o
    b1                     @   s   d Z ddlZddlZddlZddlmZ ddlmZmZm	Z	m
Z
mZ ddlmZmZ ddlmZ ddlmZ ddlmZ dd	lmZmZ d
ZeejZejejed< G dd de	jZ G dd deZ!dS )z
Tests for L{twisted.internet.stdio}.

@var properEnv: A copy of L{os.environ} which has L{bytes} keys/values on POSIX
    platforms and native L{str} keys/values on Windows.
    N)skipIf)defererrorprotocolreactorstdio)filepathlog)requireModule)platform)ConnectionLostNotifyingProtocol)SkipTestTestCases   xyz123abc Twisted is great!
PYTHONPATHc                   @   s4   e Zd ZdZdZdd Zdd Zdd Zd	d
 ZdS )StandardIOTestProcessProtocola  
    Test helper for collecting output from a child process and notifying
    something when it exits.

    @ivar onConnection: A L{defer.Deferred} which will be called back with
    L{None} when the connection to the child process is established.

    @ivar onCompletion: A L{defer.Deferred} which will be errbacked with the
    failure associated with the child process exiting when it exits.

    @ivar onDataReceived: A L{defer.Deferred} which will be called back with
    this instance whenever C{childDataReceived} is called, or L{None} to
    suppress these callbacks.

    @ivar data: A C{dict} mapping file descriptors to strings containing all
    bytes received from the child process on each file descriptor.
    Nc                 C   s   t  | _t  | _i | _d S N)r   DeferredonConnectiononCompletiondataself r   9/usr/lib/python3/dist-packages/twisted/test/test_stdio.py__init__8   s   


z&StandardIOTestProcessProtocol.__init__c                 C   s   | j d  d S r   )r   callbackr   r   r   r   connectionMade=      z,StandardIOTestProcessProtocol.connectionMadec                 C   sB   | j |d| | j |< | jdur| jd}| _||  dS dS )z
        Record all bytes received from the child process in the C{data}
        dictionary.  Fire C{onDataReceived} if it is not L{None}.
            N)r   getonDataReceivedr   )r   namebytesdr   r   r   childDataReceived@   s
   
z/StandardIOTestProcessProtocol.childDataReceivedc                 C   s   | j | d S r   )r   r   )r   reasonr   r   r   processEndedJ   r   z*StandardIOTestProcessProtocol.processEnded)	__name__
__module____qualname____doc__r    r   r   r$   r&   r   r   r   r   r   #   s    
r   c                   @   s   e Zd Ze reddu rdZdd Zdd Zdd	 Z	d
d Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zee ddd ZdS )StandardInputOutputTestswin32processNzIOn windows, spawnProcess is not available in the absence of win32process.c                 O   s:   t jdd| tjjgt| }tj|t j|fdti|S )a_  
        Launch a child Python process and communicate with it using the
        given ProcessProtocol.

        @param proto: A L{ProcessProtocol} instance which will be connected
        to the child process.

        @param sibling: The basename of a file containing the Python program
        to run in the child process.

        @param *args: strings which will be passed to the child process on
        the command line as C{argv[2:]}.

        @param **kw: additional arguments to pass to L{reactor.spawnProcess}.

        @return: The L{IProcessTransport} provider for the spawned process.
        s   -ms   twisted.test.env)sys
executabler   	__class__r(   listspawnProcess	properEnv)r   protosiblingargskwr   r   r   _spawnProcessV   s   z&StandardInputOutputTests._spawnProcessc                    s$   fdd} fdd}| ||S )Nc                    s     d|  d S )Nz%Process terminated with non-Failure: )fail)resultr   r   r   cbq   s   z4StandardInputOutputTests._requireFailure.<locals>.cbc                    s    | S r   r   )err)r   r   r   ebt   s   z4StandardInputOutputTests._requireFailure.<locals>.eb)addCallbacks)r   r#   r   r;   r=   r   )r   r   r   _requireFailurep   s   z(StandardInputOutputTests._requireFailurec                    sL      td   t j}d   fdd}||S )z
        Verify that a protocol connected to L{StandardIO} can disconnect
        itself using C{transport.loseConnection}.
        Child process logging to s   stdio_test_loseconnc                    sb   t  }|D ]}td|   qW d    n1 sw   Y  dj | tj d S )NzChild logged:    )	openr	   msgrstripfailIfInr   trapr   ProcessDone)r%   flineerrorLogFilepr   r   r   r&      s   
zBStandardInputOutputTests.test_loseConnection.<locals>.processEnded)mktempr	   rC   r   r   r8   r?   r   r#   r&   r   rJ   r   test_loseConnectiony   s   	z,StandardInputOutputTests.test_loseConnectionc                    sf   |   }td|  t  t  _ fdd} j| dd }|  j|}| 	 d| |S )z
        When stdin is closed and the protocol connected to it implements
        L{IHalfCloseableProtocol}, the protocol's C{readConnectionLost} method
        is called.
        r@   c                    s    j } j  |S r   )r   	transport
closeStdin)ignoredr#   rL   r   r   cbBytes   s   
zAStandardInputOutputTests.test_readConnectionLost.<locals>.cbBytesc                 S   s   |  tj d S r   )rF   r   rG   r%   r   r   r   r&      r   zFStandardInputOutputTests.test_readConnectionLost.<locals>.processEndeds   stdio_test_halfclose)
rM   r	   rC   r   r   r   r    addCallbackr?   r8   )r   rK   rT   r&   r#   r   rS   r   test_readConnectionLost   s   
z0StandardInputOutputTests.test_readConnectionLostc              
      s^   t   zj dtdd W n ty  } ztt|d}~ww  fdd} j|S )z
        Verify that a write made directly to stdout using L{os.write}
        after StandardIO has finished is reliably received by the
        process reading that stdout.
        s   stdio_test_lastwriteT)usePTYNc                    s2     jd td jd | tj dS )z
            Asserts that the parent received the bytes written by the child
            immediately after the child starts.
            rA   z	Received z) from child, did not find expected bytes.N)
assertTruer   endswithUNIQUE_LAST_WRITE_STRINGrF   r   rG   rU   rL   r   r   r   r&      s
   zEStandardInputOutputTests.test_lastWriteReceived.<locals>.processEnded)r   r8   r[   
ValueErrorr   strr?   r   )r   er&   r   r\   r   test_lastWriteReceived   s   
z/StandardInputOutputTests.test_lastWriteReceivedc                    2   t    j} d  fdd}||S )z
        Verify that the transport of a protocol connected to L{StandardIO}
        has C{getHost} and C{getPeer} methods.
        s   stdio_test_hostpeerc                    s6    j d  \}}| | | tj d S )NrA   )r   
splitlinesrY   rF   r   rG   )r%   hostpeerr\   r   r   r&      s   

z?StandardInputOutputTests.test_hostAndPeer.<locals>.processEndedr   r   r8   r?   rN   r   r\   r   test_hostAndPeer   s
   z)StandardInputOutputTests.test_hostAndPeerc                    ra   )z
        Verify that the C{write} method of the transport of a protocol
        connected to L{StandardIO} sends bytes to standard out.
        s   stdio_test_writec                    "     jd d | tj d S NrA   s   ok!assertEqualr   rF   r   rG   rU   r\   r   r   r&         z9StandardInputOutputTests.test_write.<locals>.processEndedre   rN   r   r\   r   
test_write   
   z#StandardInputOutputTests.test_writec                    ra   )z
        Verify that the C{writeSequence} method of the transport of a
        protocol connected to L{StandardIO} sends bytes to standard out.
        s   stdio_test_writeseqc                    rg   rh   ri   rU   r\   r   r   r&      rk   zAStandardInputOutputTests.test_writeSequence.<locals>.processEndedre   rN   r   r\   r   test_writeSequence   rm   z+StandardInputOutputTests.test_writeSequencec                 C   sV   |   }t|d}tdD ]
}|d|f  qW d    |S 1 s$w   Y  |S )Nwbi      %d
)rM   rB   rangewrite)r   junkPathjunkFileir   r   r   	_junkPath  s   
z"StandardInputOutputTests._junkPathc                    sd   t  j}g ttd fdd dj  fdd}||S )z
        Verify that the transport of a protocol connected to L{StandardIO}
        is a working L{IProducer} provider.
        d   c                    s<   r d f  d  td d  d S d S )Nrp   g{Gz?)appendpoprr   r   	callLater)ign)r   proctoWritewrittenr   r   r     s
   z>StandardInputOutputTests.test_producer.<locals>.connectionMades   stdio_test_producerc                    s>     jd d dtf  | tj d S )NrA   r   z*Connection lost with %d writes left to go.)rj   r   joinassertFalselenrF   r   rG   rU   )rL   r   r~   r   r   r   r&     s
   z<StandardInputOutputTests.test_producer.<locals>.processEnded)r   r   r1   rq   r8   r   rV   r?   rN   r   )r   rL   r}   r   r~   r   r   test_producer  s   z&StandardInputOutputTests.test_producerc                    s>   t  j}  d   fdd}||S )z
        Verify that the transport of a protocol connected to L{StandardIO}
        is a working L{IConsumer} provider.
        s   stdio_test_consumerc                    sP   t  d}jd |  W d    n1 sw   Y  | tj d S )NrbrA   )rB   rj   r   readrF   r   rG   )r%   rH   rs   rL   r   r   r   r&   2  s   z<StandardInputOutputTests.test_consumer.<locals>.processEnded)r   r   rv   r8   r?   rN   r   r   r   test_consumer&  s   z&StandardInputOutputTests.test_consumerzpStandardIO does not accept stdout as an argument to Windows.  Testing redirection to a file is therefore harder.c                    s   t  }t|}t d _}|j	 t
| d}t s@t \}}tj	| tj	| ||d< tj|fi | dt  fddtd fdd	}|| |S )
aE  
        If L{StandardIO} is created with a file descriptor which refers to a
        normal file (ie, a file from the filesystem), L{StandardIO.write}
        writes bytes to that file.  In particular, it does not immediately
        consider the file closed or call its protocol's C{connectionLost}
        method.
        ro   )stdoutstdin   c                     s@   D ]} | kr     d S  d| f   td d S )N   %dr   )loseConnectionrr   r   r{   )value)
connectioncounthowManyspinr   r   r   ]  s   zAStandardInputOutputTests.test_normalFileStandardOut.<locals>.spinr   c                    s<    t d    ddd tD  d S )NrA   r   c                 s   s    | ]}d |f V  qdS )r   Nr   ).0ru   r   r   r   	<genexpr>m  s    zVStandardInputOutputTests.test_normalFileStandardOut.<locals>.cbLost.<locals>.<genexpr>)rj   next
getContentr   rq   rU   )r   r   pathr   r   r   cbLostj  s   zCStandardInputOutputTests.test_normalFileStandardOut.<locals>.cbLost)r   r   r   r   FilePathrM   rB   normal
addCleanupclosedictfilenor   	isWindowsospiper   
StandardIO	itertoolsr   r   r{   rV   )r   
onConnLostr4   r   kwargsrwr   r   )r   r   r   r   r   r   r   test_normalFileStandardOut9  s&   	
z3StandardInputOutputTests.test_normalFileStandardOut)r'   r(   r)   r   r   r
   skipr8   r?   rO   rW   r`   rf   rl   rn   rv   r   r   r   r   r   r   r   r   r+   N   s(    	'r+   )"r*   r   r   r.   unittestr   twisted.internetr   r   r   r   r   twisted.pythonr   r	   twisted.python.reflectr
   twisted.python.runtimer   twisted.test.test_tcpr   twisted.trial.unittestr   r   r[   r   environr3   pathsepr   r   ProcessProtocolr   r+   r   r   r   r   <module>   s    
+