o
    b%                     @   s   d Z ddlmZ ddlmZmZ ddlmZ ddlm	Z	 dZ
zddlmZ ddlmZ W n ey7   d	Z
Y nw dd
lmZ ddlmZ G dd deZG dd de	ZG dd de	ZG dd deZG dd de	ZG dd de	ZG dd de	ZdS )z>
Tests for L{twisted.internet.posixbase} and supporting code.
    )Deferred)PosixReactorBase_Waker)ServerFactory)TestCaseN)unix)ClientProtoz)Platform does not support AF_UNIX socketsreactor)Portc                   @   s4   e Zd Zdd Zdd Zdd Zdd Zd	d
 ZdS )TrivialReactorc                 C   s   i | _ i | _t|  d S N)_readers_writersr   __init__self r   F/usr/lib/python3/dist-packages/twisted/internet/test/test_posixbase.pyr      s   zTrivialReactor.__init__c                 C      d| j |< d S NTr   r   readerr   r   r   	addReader      zTrivialReactor.addReaderc                 C      | j |= d S r   r   r   r   r   r   removeReader"      zTrivialReactor.removeReaderc                 C   r   r   r   r   writerr   r   r   	addWriter%   r   zTrivialReactor.addWriterc                 C   r   r   r   r    r   r   r   removeWriter(   r   zTrivialReactor.removeWriterN)__name__
__module____qualname__r   r   r   r"   r#   r   r   r   r   r      s    r   c                   @   s0   e Zd ZdZdd Zdd Zdd Zdd	 Zd
S )PosixReactorBaseTestsz(
    Tests for L{PosixReactorBase}.
    c                 C   s2   |  |jt | |j|j | |j|j d S r   )assertIsInstancewakerr   assertIn_internalReadersr   r   r
   r   r   r   _checkWaker1   s   z!PosixReactorBaseTests._checkWakerc                 C   s   t  }| | dS )z
        When L{PosixReactorBase} is instantiated, it creates a waker and adds
        it to its internal readers set.
        N)r   r-   r,   r   r   r   test_wakerIsInternalReader6   s   z0PosixReactorBaseTests.test_wakerIsInternalReaderc                 C   s\   t  }t }|j| || ||j|j | | | 	||j | 	||j dS )z
        Any L{IReadDescriptors} in L{PosixReactorBase._internalReaders} are
        left alone by L{PosixReactorBase._removeAll}.
        N)
r   objectr+   addr   
_removeAllr   r   r-   r*   )r   r
   extrar   r   r   "test_removeAllSkipsInternalReaders>   s   

z8PosixReactorBaseTests.test_removeAllSkipsInternalReadersc                 C   sj   t  }t }t }|| || ||j|j}| t|||h | 	||j | 	||j dS )z
        L{PosixReactorBase._removeAll} returns a list of removed
        L{IReadDescriptor} and L{IWriteDescriptor} objects.
        N)
r   r/   r   r"   r1   r   r   assertEqualsetassertNotIn)r   r
   r   r!   removedr   r   r   'test_removeAllReturnsRemovedDescriptorsL   s   

z=PosixReactorBaseTests.test_removeAllReturnsRemovedDescriptorsN)r$   r%   r&   __doc__r-   r.   r3   r8   r   r   r   r   r'   ,   s    r'   c                   @   s&   e Zd ZdZeeesdZdd ZdS )TCPPortTestsz1
    Tests for L{twisted.internet.tcp.Port}.
    zNon-posixbase reactorc                 C   s,   t dt }d|_dd |_| | tS )z
        L{Port.stopListening} returns a L{Deferred} which errbacks if
        L{Port.connectionLost} raises an exception.
        i90  Tc                 S   s   dd S )N   r   r   )reasonr   r   r   <lambda>k       z8TCPPortTests.test_connectionLostFailed.<locals>.<lambda>)r   r   	connectedconnectionLostassertFailurestopListeningZeroDivisionError)r   portr   r   r   test_connectionLostFailedd   s   
z&TCPPortTests.test_connectionLostFailedN)	r$   r%   r&   r9   
isinstancer
   r   skiprE   r   r   r   r   r:   \   s
    
r:   c                   @   s8   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d ZdS )TimeoutReportReactorz
    A reactor which is just barely runnable and which cannot monitor any
    readers or writers, and which fires a L{Deferred} with the timeout
    passed to its C{doIteration} method as soon as that method is invoked.
    c                 C   s   t |  t | _d| _d S )Nd   )r   r   r   iterationTimeoutnowr   r   r   r   r   v   s   

zTimeoutReportReactor.__init__c                 C   s   dS )z
        Ignore the reader.  This is necessary because the waker will be
        added.  However, we won't actually monitor it for any events.
        Nr   r   r   r   r   r   {       zTimeoutReportReactor.addReaderc                 C   s   g S )z
        There are no readers or writers, so there is nothing to remove.
        This will be called when the reactor stops, though, so it must be
        implemented.
        r   r   r   r   r   	removeAll   s   zTimeoutReportReactor.removeAllc                 C   s   | j S )zx
        Override the real clock with a deterministic one that can be easily
        controlled in a unit test.
        )rK   r   r   r   r   seconds   s   zTimeoutReportReactor.secondsc                 C   s&   | j }|d urd | _ || d S d S r   )rJ   callback)r   timeoutdr   r   r   doIteration   s
   z TimeoutReportReactor.doIterationN)	r$   r%   r&   r9   r   r   rM   rN   rR   r   r   r   r   rH   o   s    rH   c                   @   sP   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d Zdd Z	dd Z
dd ZdS )IterationTimeoutTestsz
    Tests for the timeout argument L{PosixReactorBase.run} calls
    L{PosixReactorBase.doIteration} with in the presence of various delayed
    calls.
    c                    s6   g } j |j  j  fdd    |d S )Nc                    s      S r   )stop)ignoredr	   r   r   r=      r>   z>IterationTimeoutTests._checkIterationTimeout.<locals>.<lambda>r   )rJ   addCallbackappendrunr   r
   rP   r   r	   r   _checkIterationTimeout   s
   z,IterationTimeoutTests._checkIterationTimeoutc                 C   s   t  }| |}| | dS )zl
        If there are no delayed calls, C{doIteration} is called with a
        timeout of L{None}.
        N)rH   rZ   assertIsNonerY   r   r   r   test_noCalls   s   
z"IterationTimeoutTests.test_noCallsc                 C   s0   t  }|ddd  | |}| |d dS )z
        If there is a delayed call, C{doIteration} is called with a timeout
        which is the difference between the current time and the time at
        which that call is to run.
        rI   c                   S      d S r   r   r   r   r   r   r=      rL   z8IterationTimeoutTests.test_delayedCall.<locals>.<lambda>NrH   	callLaterrZ   r4   rY   r   r   r   test_delayedCall   s   
z&IterationTimeoutTests.test_delayedCallc                 C   s>   t  }|ddd  | jd7  _| |}| |d dS )z
        If a delayed call is scheduled and then some time passes, the
        timeout passed to C{doIteration} is reduced by the amount of time
        which passed.
        rI   c                   S   r]   r   r   r   r   r   r   r=      rL   z7IterationTimeoutTests.test_timePasses.<locals>.<lambda>   K   N)rH   r_   rK   rZ   r4   rY   r   r   r   test_timePasses   s
   
z%IterationTimeoutTests.test_timePassesc                 C   sP   t  }|ddd  |ddd  |ddd  | |}| |d dS )	z
        If there are several delayed calls, C{doIteration} is called with a
        timeout which is the difference between the current time and the
        time at which the earlier of the two calls is to run.
        2   c                   S   r]   r   r   r   r   r   r   r=      rL   zAIterationTimeoutTests.test_multipleDelayedCalls.<locals>.<lambda>
   c                   S   r]   r   r   r   r   r   r   r=      rL   rI   c                   S   r]   r   r   r   r   r   r   r=      rL   Nr^   rY   r   r   r   test_multipleDelayedCalls   s   
z/IterationTimeoutTests.test_multipleDelayedCallsc                 C   sH   t  }|ddd }| jd7  _|d | |}| |d dS )z
        If a delayed call is reset, the timeout passed to C{doIteration} is
        based on the interval between the time when reset is called and the
        new delay of the call.
        rd   c                   S   r]   r   r   r   r   r   r   r=      rL   z=IterationTimeoutTests.test_resetDelayedCall.<locals>.<lambda>ra      N)rH   r_   rK   resetrZ   r4   r   r
   callrP   r   r   r   test_resetDelayedCall   s   

z+IterationTimeoutTests.test_resetDelayedCallc                 C   sH   t  }|ddd }| jd7  _|d | |}| |d dS )z
        If a delayed call is re-delayed, the timeout passed to
        C{doIteration} is based on the remaining time before the call would
        have been made and the additional amount of time passed to the delay
        method.
        rd   c                   S   r]   r   r   r   r   r   r   r=      rL   z=IterationTimeoutTests.test_delayDelayedCall.<locals>.<lambda>re      <   N)rH   r_   rK   delayrZ   r4   ri   r   r   r   test_delayDelayedCall   s   

z+IterationTimeoutTests.test_delayDelayedCallc                 C   s6   t  }|ddd }|  | |}| | dS )zp
        If the only delayed call is canceled, L{None} is the timeout passed
        to C{doIteration}.
        rd   c                   S   r]   r   r   r   r   r   r   r=      rL   z>IterationTimeoutTests.test_cancelDelayedCall.<locals>.<lambda>N)rH   r_   cancelrZ   r[   ri   r   r   r   test_cancelDelayedCall   s
   
z,IterationTimeoutTests.test_cancelDelayedCallN)r$   r%   r&   r9   rZ   r\   r`   rc   rf   rk   ro   rq   r   r   r   r   rS      s    	rS   c                   @   s,   e Zd ZdZedureZdd Zdd ZdS )ConnectedDatagramPortTestsz/
    Test connected datagram UNIX sockets.
    Nc                    s.    fdd}t dt }||_|d dS )z
        L{ConnectedDatagramPort} does not call the deprecated C{loseConnection}
        in L{ConnectedDatagramPort.connectionFailed}.
        c                      s     d dS )z
            Dummy C{loseConnection} method. C{loseConnection} is deprecated and
            should not get called.
            z7loseConnection is deprecated and should not get called.N)failr   r   r   r   loseConnection  s   z`ConnectedDatagramPortTests.test_connectionFailedDoesntCallLoseConnection.<locals>.loseConnectionNgoodbye)r   ConnectedDatagramPortr   rt   connectionFailed)r   rt   rD   r   r   r   -test_connectionFailedDoesntCallLoseConnection  s   zHConnectedDatagramPortTests.test_connectionFailedDoesntCallLoseConnectionc                    s@   d _  fdd}tdt }||_|d   j  dS )z
        L{ConnectedDatagramPort} calls L{ConnectedDatagramPort.stopListening}
        instead of the deprecated C{loseConnection} in
        L{ConnectedDatagramPort.connectionFailed}.
        Fc                      s
   d _ dS )z8
            Dummy C{stopListening} method.
            TN)calledr   r   r   r   rB     s   
zYConnectedDatagramPortTests.test_connectionFailedCallsStopListening.<locals>.stopListeningNru   )ry   r   rv   r   rB   rw   
assertTrue)r   rB   rD   r   r   r   'test_connectionFailedCallsStopListening  s   
zBConnectedDatagramPortTests.test_connectionFailedCallsStopListening)r$   r%   r&   r9   skipSocketsrG   rx   r{   r   r   r   r   rr      s    rr   c                   @   s   e Zd Zdd ZdS )
WakerTestsc                 C   s0   t d d}|  }|d  | t|d d S )Nr	   r   )r   flushWarningsr@   r4   len)r   r)   warningsr   r   r    test_noWakerConstructionWarnings'  s   

z+WakerTests.test_noWakerConstructionWarningsN)r$   r%   r&   r   r   r   r   r   r}   &  s    r}   )r9   twisted.internet.deferr   twisted.internet.posixbaser   r   twisted.internet.protocolr   twisted.trial.unittestr   r|   twisted.internetr   twisted.test.test_unixr   ImportErrorr
   twisted.internet.tcpr   r   r'   r:   rH   rS   rr   r}   r   r   r   r   <module>   s*   0(b-