o
    b                     @   sL   d Z ddlmZ ddlmZ eejejG dd dZG dd deZdS )	z
Producer-Consumer Proxy.
    )implementer)
interfacesc                   @   sz   e Zd ZdZdZdZdZdZdZdZ	dZ
dd Zdd Zd	d
 Zdd Zdd Zdd Zdd Zdd ZdefddZdS )BasicProducerConsumerProxyaa  
    I can act as a man in the middle between any Producer and Consumer.

    @ivar producer: the Producer I subscribe to.
    @type producer: L{IProducer<interfaces.IProducer>}
    @ivar consumer: the Consumer I publish to.
    @type consumer: L{IConsumer<interfaces.IConsumer>}
    @ivar paused: As a Producer, am I paused?
    @type paused: bool
    NTFc                 C   s*   g | _ |d ur|| _|| | j d S d S N)_bufferconsumerregisterProduceriAmStreaming)selfr    r   7/usr/lib/python3/dist-packages/twisted/protocols/pcp.py__init__#   s
   z#BasicProducerConsumerProxy.__init__c                 C   s   d| _ | jr| j  d S d S NT)pausedproducerpauseProducingr
   r   r   r   r   +   s   z)BasicProducerConsumerProxy.pauseProducingc                 C   sX   d| _ | jr| jd| j g | jd d < n| jsd| _| jd ur*| j  d S d S )NF T)	r   r   r   writejoinr	   outstandingPullr   resumeProducingr   r   r   r   r   0   s   
z*BasicProducerConsumerProxy.resumeProducingc                 C   s*   | j d ur
| j   | jd ur| `d S d S r   )r   stopProducingr   r   r   r   r   r   =   s
   


z(BasicProducerConsumerProxy.stopProducingc                 C   sF   | j s	| js| js| j| d S | jd ur!| j| d| _d S d S NF)r   r	   r   r   appendr   r   r
   datar   r   r   r   E   s   

z BasicProducerConsumerProxy.writec                 C   s    | j d ur
| j   |   d S r   )r   finishunregisterProducerr   r   r   r   r   N   s   

z!BasicProducerConsumerProxy.finishc                 C   s   || _ || _d S r   )r   producerIsStreamingr
   r   	streamingr   r   r   r   S   s   
z+BasicProducerConsumerProxy.registerProducerc                 C   s*   | j d ur	| ` | `| jr| j  d S d S r   )r   r   r   r   r   r   r   r   r   W   s   
z-BasicProducerConsumerProxy.unregisterProducerreturnc                 C   s"   d| j  dt| dd| j dS )N<@xz around >)	__class__idr   r   r   r   r   __repr__^   s   "z#BasicProducerConsumerProxy.__repr__)__name__
__module____qualname____doc__r   r   r   r	   r   r   stoppedr   r   r   r   r   r   r   r   strr)   r   r   r   r   r      s$    	r   c                   @   sL   e Zd ZdZdZdZdZdd Zdd Zdd	 Z	d
d Z
dd Zdd ZdS )ProducerConsumerProxyzProducerConsumerProxy with a finite buffer.

    When my buffer fills up, I have my parent Producer pause until my buffer
    has room in it again.
    i   Fc                 C   s
   d| _ d S r   )r   r   r   r   r   r   o   s   
z$ProducerConsumerProxy.pauseProducingc                 C   s   d| _ | jr5d| j}| |}|t|k r-||d  }| jr$J d|g| jd d < n
g | jd d < nd}| jrI|rI| jsI| jd urI| j  | jsP| | _	| j
d ur{tdd | jD }| jrq|| jk rqd| _| j
  d S | j	r}| j
  d S d S d S )NFr   .Streaming producer did not write all its data.r   c                 s       | ]}t |V  qd S r   len.0sr   r   r   	<genexpr>       z8ProducerConsumerProxy.resumeProducing.<locals>.<genexpr>)r   r   r   _writeSomeDatar4   r	   unregisteredr   r   r   r   sumproducerPaused
bufferSizer   )r
   r   	bytesSentunsentbytesBufferedr   r   r   r   t   s@   



z%ProducerConsumerProxy.resumeProducingc                 C   s   | j s	| js| js| j| n+| jd ur;| jrJ d| |}d| _|t|ks;| jr1J d| j||d   | jd ur\| j	r^t
dd | jD }|| jkr`| j  d| _d S d S d S d S )Nz9Writing fresh data to consumer before my buffer is empty!Fr1   c                 s   r2   r   r3   r5   r   r   r   r8      r9   z.ProducerConsumerProxy.write.<locals>.<genexpr>T)r   r	   r   r   r   r   r:   r4   r   r   r<   r>   r   r=   )r
   r   r?   rA   r   r   r   r      s,   




zProducerConsumerProxy.writec                 C   s(   d| _ t| || |s|  d S d S r   )r;   r   r   r   r    r   r   r   r      s
   z&ProducerConsumerProxy.registerProducerc                 C   s:   | j d ur	| ` | `d| _| jr| js| j  d S d S d S r   )r   r   r;   r   r   r   r   r   r   r   r      s   
z(ProducerConsumerProxy.unregisterProducerc                 C   s"   | j du rdS | j | t|S )z`Write as much of this data as possible.

        @returns: The number of bytes written.
        Nr   )r   r   r4   r   r   r   r   r:      s   
z$ProducerConsumerProxy._writeSomeDataN)r*   r+   r,   r-   r>   r=   r;   r   r   r   r   r   r:   r   r   r   r   r0   b   s    -r0   N)	r-   zope.interfacer   twisted.internetr   	IProducer	IConsumerr   r0   r   r   r   r   <module>   s   S