o
    bXc                     @   sb  d Z ddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZmZ ddlmZmZmZ dd	lmZ dd
lmZ ddlmZ ddlmZmZ edZeejZej ej!ed< G dd deZ"ddlm#Z# ddlm$Z$ ddl%m&Z& G dd dej'Z(dZ)dZ*dZ+dZ,dZ-dZ.dZ/dZ0dZ1ddl	m2Z2 z dd l3m4Z4m5Z5m6Z6m7Z7m8Z8 dd!l9m:Z:m;Z;m<Z<m=Z=m>Z>m?Z? W n e@y   d"ZAY nGw d#ZAG d$d% d%e:jBZCG d&d' d'e;jDZEG d(d) d)e?jFZGG d*d+ d+e>jHZIG d,d- d-e7ZJG d.d/ d/e6ZKG d0d1 d1e8ZLeMeKeLe=jN G d2d3 d3e$jOZPG d4d5 d5ZQG d6d7 d7eQZRdd8lSmTZT G d9d: d:ejUeTjVZWG d;d< d<ejXeTjVZYG d=d> d>eQZZG d?d@ d@eQZ[G dAdB dBZ\G dCdD dDeZee\Z]G dEdF dFeRee\Z^ee dGG dHdI dIe[ee\Z_G dJdK dKZ`G dLdM dMeZee`ZaG dNdO dOeRee`Zbee dPG dQdR dRe[ee`ZcG dSdT dTeZddS )UzU
Tests for L{twisted.conch.recvline} and fixtures for testing related
functionality.
    N)skipIf)recvline)insults)portal)defererror)
componentsfilepathreflect)	iterbytes)requireModule)StringTransport)SkipTestTestCaseztwisted.conch.stdio
PYTHONPATHc                   @   sl   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd Zdd Zdd Zdd ZdS )ArrowsTestsc                    sF   t   _t  _t  _ fdd j_  j_	 j
 j d S )Nc                      s    j S N)p selfr   B/usr/lib/python3/dist-packages/twisted/conch/test/test_recvline.py<lambda>"   s    z#ArrowsTests.setUp.<locals>.<lambda>)r   underlyingTransportr   ServerProtocolptr   HistoricRecvLiner   protocolFactoryfactorymakeConnectionr   r   r   r   setUp   s   

zArrowsTests.setUpc                 C   s@   | j dd | j dd | j dd | | j  d dS )zy
        When L{HistoricRecvLine} receives a printable character,
        it adds it to the current line buffer.
           xN   y   z   xyz    )r   keystrokeReceivedassertEqualcurrentLineBufferr   r   r   r   test_printableCharacters&   s   z$ArrowsTests.test_printableCharactersc                    sF   fdd}t dD ]}|| q
  j d | jj   j d | jj   j d | jj   j d | jj   j d | jj   j d | jj   j d | jj   j d | jj   j d | jj   j d dS )	z
        When L{HistoricRecvLine} receives a LEFT_ARROW or
        RIGHT_ARROW keystroke it moves the cursor left or right
        in the current line buffer, respectively.
        c                        j | d S r   r   r'   chr   r   r   r   7       z3ArrowsTests.test_horizontalArrows.<locals>.<lambda>r%   r$   )   xyr#   )r!   s   yz)r&   r%   N)r   r(   r   r)   r   RIGHT_ARROW
LEFT_ARROWr   kRr.   r   r   r   test_horizontalArrows1   s,   
z!ArrowsTests.test_horizontalArrowsc                    s|    fdd}t dD ]}|| q
  j d |d |d |d   j d |d   j d	 d
S )z
        When {HistoricRecvLine} receives a newline, it adds the current
        line buffer to the end of its history buffer.
        c                    r+   r   r,   r-   r   r   r   r   ]   r/   z*ArrowsTests.test_newline.<locals>.<lambda>   xyz
abc
123
r%      abc   123r      c   b   a   
))r%   r9   r:   s   cbar   N)r   r(   r   currentHistoryBufferr3   r   r   r   test_newlineX   s   

zArrowsTests.test_newlinec                    s6   fdd}t dD ]}|| q
  j d   j d | jj   j d   j d | jj   j d   j d	 | jj   j d
   j d | jj   j d
   j d tdD ]}| jj q  j d dS )a  
        When L{HistoricRecvLine} receives UP_ARROW or DOWN_ARROW
        keystrokes it move the current index in the current history
        buffer up or down, and resets the current line buffer to the
        previous or next line in history, respectively for each.
        c                    r+   r   r,   r-   r   r   r   r   u   r/   z1ArrowsTests.test_verticalArrows.<locals>.<lambda>r6   r7   r&   r&   ))r%   r9   )r:   )r:   r&   ))r%   )r9   r:   )r9   r&   )r   r8   r$      N)	r   r(   r   r?   r)   r   UP_ARROWrange
DOWN_ARROW)r   r4   r.   ir   r   r   test_verticalArrowsn   s(   
zArrowsTests.test_verticalArrowsc                    sV    fdd}t dD ]}|| q
  j d | jj   j d dS )z
        When L{HistoricRecvLine} receives a HOME keystroke it moves the
        cursor to the beginning of the current line buffer.
        c                    r+   r   r,   r-   r   r   r   r      r/   z'ArrowsTests.test_home.<locals>.<lambda>   hello, worldrH   r&   )r&   rH   N)r   r(   r   r)   r   HOMEr3   r   r   r   	test_home   s   
zArrowsTests.test_homec                    sb    fdd}t dD ]}|| q
  j d | jj | jj   j d dS )z
        When L{HistoricRecvLine} receives an END keystroke it moves the cursor
        to the end of the current line buffer.
        c                    r+   r   r,   r-   r   r   r   r      r/   z&ArrowsTests.test_end.<locals>.<lambda>rH   rI   N)r   r(   r   r)   r   rJ   ENDr3   r   r   r   test_end   s   
zArrowsTests.test_endc                    s    fdd}t dD ]}|| q
  j d | jj   j d | jj | jj   j d | jj   j d dS )z
        When L{HistoricRecvLine} receives a BACKSPACE keystroke it deletes
        the character immediately before the cursor.
        c                    r+   r   r,   r-   r   r   r   r      r/   z,ArrowsTests.test_backspace.<locals>.<lambda>r%   r$   r0   r&   )r&   r"   N)r   r(   r   r)   r   	BACKSPACEr2   r3   r   r   r   test_backspace   s   
zArrowsTests.test_backspacec                    s    fdd}t dD ]}|| q
  j d | jj   j d | jj | jj   j d | jj | jj   j d | jj | jj   j d | jj   j d dS )	z
        When L{HistoricRecvLine} receives a DELETE keystroke, it
        delets the character immediately after the cursor.
        c                    r+   r   r,   r-   r   r   r   r      r/   z)ArrowsTests.test_delete.<locals>.<lambda>r%   r$   rN   )r!   r&   rA   N)r   r(   r   r)   r   DELETEr2   r3   r   r   r   test_delete   s"   
zArrowsTests.test_deletec                    sr    fdd}t dD ]}|| q
| jj |d   j d | jj |d   j d dS )	z
        When not in INSERT mode, L{HistoricRecvLine} inserts the typed
        character at the cursor before the next character.
        c                    r+   r   r,   r-   r   r   r   r      r/   z)ArrowsTests.test_insert.<locals>.<lambda>r%      A)   xyAr#      B)   xyBs   AzN)r   r   r2   r(   r   r)   r3   r   r   r   test_insert   s   
zArrowsTests.test_insertc                    s~    fdd}t dD ]}|| q
| jj | jj |d   j d | jj |d   j d dS )	a  
        When in INSERT mode and upon receiving a keystroke with a printable
        character, L{HistoricRecvLine} replaces the character at
        the cursor with the typed character rather than inserting before.
        Ah, the ironies of INSERT mode.
        c                    r+   r   r,   r-   r   r   r   r      r/   z+ArrowsTests.test_typeover.<locals>.<lambda>r%   rS   )rT   r&   rU   )rV   r&   N)r   r   INSERTr2   r(   r   r)   r3   r   r   r   test_typeover   s   
zArrowsTests.test_typeoverc                    sr    fdd} j }|j|j|j|j|j|j|j|j|j	|j
|j|j|j|jfD ]}||   j d q'dS )z
        When L{HistoricRecvLine} receives a keystroke for an unprintable
        function key with no assigned behavior, the line buffer is unmodified.
        c                    r+   r   r,   r-   r   r   r   r     r/   z8ArrowsTests.test_unprintableCharacters.<locals>.<lambda>rA   N)r   F1F2F3F4F5F6F7F8F9F10F11F12PGUPPGDNr(   r   r)   )r   r4   r   r.   r   r   r   test_unprintableCharacters  s(   z&ArrowsTests.test_unprintableCharactersN)__name__
__module____qualname__r    r*   r5   r@   rG   rK   rM   rP   rR   rW   rY   rh   r   r   r   r   r      s    '#r   )telnet)helper)LoopbackRelayc                   @      e Zd Zdd ZdS )
EchoServerc                 C   s    | j |d | j| j   d S )Nr>   )terminalwritepspn)r   liner   r   r   lineReceived*  s    zEchoServer.lineReceivedN)ri   rj   rk   rv   r   r   r   r   rp   )      rp   s   [Ds   [Cs   [As   [Bs   [2~s   [1~s   [3~s   [4~   )checkers)ConchFactoryTerminalRealmTerminalSessionTerminalSessionTransportTerminalUser)channel
connectionkeyssession	transportuserauthFTc                   @   s0   e Zd ZdZdd Zdd Zdd Zdd	 Zd
S )SessionChannels   sessionc                 O   <   t jj| g|R i | || _|| _|| _|| _|| _d S r   )r   
SSHChannel__init__r   protocolArgsprotocolKwArgswidthheightr   r   r   r   r   r   akwr   r   r   r   S     
zSessionChannel.__init__c                 C   sh   t d| j| jddfd}| j| d| | j| dd | j| ji | j| _	| | j	_
| j	|  d S )Ns   vt102r   r&   s   pty-reqs   shell)r   packRequest_pty_reqr   r   connsendRequestr   r   r   _protocolInstancer   r   )r   datatermr   r   r   channelOpen_  s   zSessionChannel.channelOpenc                 C   s   | j t  d S r   )r   connectionLostr   ConnectionDoner   r   r   r   closedl  s   zSessionChannel.closedc                 C      | j | d S r   )r   dataReceivedr   r   r   r   r   r   o     zSessionChannel.dataReceivedN)ri   rj   rk   namer   r   r   r   r   r   r   r   r   P  s    r   c                   @   $   e Zd Zdd Zdd Zdd ZdS )TestConnectionc                 O   r   r   )r   SSHConnectionr   r   r   r   r   r   r   r   r   r   r   s  r   zTestConnection.__init__c                 C   s,   t | j| j| j| j| j| _| | j d S r   )r   r   r   r   r   r   _TestConnection__channelopenChannelr   r   r   r   serviceStarted  s   zTestConnection.serviceStartedc                 C      | j |S r   )r   rr   r   r   r   r   rr        zTestConnection.writeN)ri   rj   rk   r   r   rr   r   r   r   r   r   r  s    
r   c                   @      e Zd Zdd Zdd ZdS )TestAuthc                 O   s&   t jj| |g|R i | || _d S r   )r   SSHUserAuthClientr   password)r   usernamer   r   r   r   r   r   r     s   
zTestAuth.__init__c                 C   s   t | jS r   )r   succeedr   r   r   r   r   getPassword  r   zTestAuth.getPasswordN)ri   rj   rk   r   r   r   r   r   r   r     s    r   c                   @   s,   e Zd Zdd Zdd Zdd Zdd Zd	S )
TestTransportc           
      O   s.   || _ || _|| _|| _|| _|| _|| _d S r   )r   r   r   r   r   r   r   )
r   r   r   r   r   r   r   r   r   r   r   r   r   r     s   
zTestTransport.__init__c                 C   s
   t dS )NT)r   r   )r   hostKeyfingerprintr   r   r   verifyHostKey  s   
zTestTransport.verifyHostKeyc                 C   s8   t | j| j| j| j| j| _| t| j	| j
| j d S r   )r   r   r   r   r   r   _TestTransport__connectionrequestServicer   r   r   r   r   r   r   connectionSecure  s   zTestTransport.connectionSecurec                 C   r   r   )r   rr   r   r   r   r   rr     r   zTestTransport.writeN)ri   rj   rk   r   r   r   rr   r   r   r   r   r     s
    r   c                   @   ro   )TestSessionTransportc                 C   s   | j jjj S r   )avatarr   r   r   serverProtocolr   r   r   r   r     r   z$TestSessionTransport.protocolFactoryN)ri   rj   rk   r   r   r   r   r   r     rw   r   c                   @   s   e Zd ZeZdS )TestSessionN)ri   rj   rk   r   transportFactoryr   r   r   r   r     s    r   c                   @      e Zd ZdS )TestUserNri   rj   rk   r   r   r   r   r         r   c                   @   r   )NotifyingExpectableBufferc                 C   s   t  | _t  | _d S r   )r   DeferredonConnectiononDisconnectionr   r   r   r   r     s   
z"NotifyingExpectableBuffer.__init__c                 C   s   t j|  | j|  d S r   )rm   ExpectableBufferconnectionMader   callbackr   r   r   r   r     s   z(NotifyingExpectableBuffer.connectionMadec                 C   r   r   )r   errback)r   reasonr   r   r   r     r   z(NotifyingExpectableBuffer.connectionLostN)ri   rj   rk   r   r   r   r   r   r   r   r     s    r   c                   @   s$   e Zd ZdZdZdd Zdd ZdS )
_BaseMixinP      c                 C   s   | j   }|dg| jt| d   }| t|t| tt|D ]-}| || || d|td|d |d  d d|td|d |d    q%d S )Nr&      r   s    != )	recvlineClient	__bytes__
splitlinesHEIGHTlenr(   rD   joinmax)r   linesreceivedLinesexpectedLinesrF   r   r   r   _assertBuffer  s   z_BaseMixin._assertBufferc                    s.   j d}|  fdd}||S )Ns   donec                    s      d S r   )r   )ignoutputr   r   r   finished  s   z)_BaseMixin._trivialTest.<locals>.finished)r   expect
_testwriteaddCallback)r   	inputLiner   doner   r   r   r   _trivialTest  s   

z_BaseMixin._trivialTestN)ri   rj   rk   WIDTHr   r   r   r   r   r   r   r     s
    r   c                   @   r   )	_SSHMixinc                    s>  t stdd\}}t }t|_fdd|_t }||| t	
|}|| t|}tjt|  dd}||jd< ||jd< | j|_|  |  tfdd|d }t|}	t tfd	d t fd
ddi ||| j| j}
t|
}|
|	 || | _ |
| _!|| _"|	| _#|| _$j%S )NzMcryptography requirements missing, can't run historic recvline tests over ssh)s   testusers   testpassc                          S r   r   r   insultsServerr   r   r         z!_SSHMixin.setUp.<locals>.<lambda>i   )keySizes   ssh-rsac                      r   r   r   r   recvlineServerr   r   r     r   c                      r   r   r   r   r   r   r   r     r   c                      r   r   r   r   insultsClientr   r   r     r   r   )&sshr   r{   r   userFactorychainedProtocolFactoryry   'InMemoryUsernamePasswordDatabaseDontUseaddUserr   PortalregisterCheckerrz   r   _getPersistentRSAKeyr	   FilePathmktemp
publicKeysprivateKeysr   startFactoryr   r   buildProtocolrn   r   ClientProtocolr   r   r   r   r   	sshClient	sshServerclientTransportserverTransportr   )r   ur   rlmcheckerptl
sshFactorysshKeyr   r   r   r   r   r   r   r   r   r   r      sL   






z_SSHMixin.setUpc                 C   r   r   )r   rr   r   r   r   r   r   $  r   z_SSHMixin._testwriteNri   rj   rk   r    r   r   r   r   r   r     s    2r   )test_telnetc                   @   r   )TestInsultsClientProtocolNr   r   r   r   r   r	  +  r   r	  c                   @   r   )TestInsultsServerProtocolNr   r   r   r   r   r
  /  r   r
  c                   @   r   )_TelnetMixinc                    s   |   tfddtfdd}t|}t tfdd t fdd}t|}|| || |  |  | _	|| _
|| _|| _jS )Nc                      r   r   r   r   r   r   r   r   6  r   z$_TelnetMixin.setUp.<locals>.<lambda>c                      r   r   r   r   r   r   r   r   7  r   c                      r   r   r   r   r   r   r   r   ;  r   c                      r   r   r   r   r   r   r   r   <  r   )r   r
  rl   TelnetTransportrn   r   r	  r   clearBufferr   telnetClientr   r   r   )r   telnetServerr   r  r   r   r  r   r    4  s"   

z_TelnetMixin.setUpc                 C   r   r   )r  rr   r   r   r   r   r   L  r   z_TelnetMixin._testwriteNr  r   r   r   r   r  3  s    r  c                   @   r   )_StdioMixinc                    s   t   t fdd}t|}tj}tj}|ds!|dr'|d d }||t	
| jg}ddlm} |j|||tdd	}  | _| _|| _|| _ttd |j d
gS )Nc                      r   r   r   r   testTerminalr   r   r   [  r   z#_StdioMixin.setUp.<locals>.<lambda>z.pycz.pyor   )reactorT)envusePTYs   >>> )r   r   r   stdioTerminalProcessProtocolsys
executable__file__endswithr
   qualr   twisted.internetr  spawnProcess	properEnvr   r  processClientr   r   gatherResultsfilterr   r   )r   r   r!  exemoduleargsr  r   r   r  r   r    Q  s$   

z_StdioMixin.setUpc              	      sD   z j d W n tjtfy   Y nw  fdd} jj|S )NKILLc                    s.   |  tj  | jj  | jjd d S )N	   )trapr   ProcessTerminatedassertIsNonevalueexitCoder(   status)failurer   r   r   r)    s   z"_StdioMixin.tearDown.<locals>.trap)r   signalProcessr   ProcessExitedAlreadyOSErrorr  r   
addErrback)r   r)  r   r   r   tearDown|  s   z_StdioMixin.tearDownc                 C   r   r   )r   rr   r   r   r   r   r     r   z_StdioMixin._testwriteN)ri   rj   rk   r    r4  r   r   r   r   r   r  P  s    +r  c                   @   sX   e Zd ZeZdd Zdd Zdd Zdd Zd	d
 Z	dd Z
dd Zdd Zdd ZdS )RecvlineLoopbackMixinc                 C   s   |  dg dS )Ns   first line
done)   >>> first line
   first line   >>> done)r   r   r   r   r   
testSimple  s   z RecvlineLoopbackMixin.testSimplec                 C   s    |  td td  d g dS )Nr7  rB   	   xxxx
done)s   >>> first xxxxs
   first xxxxr8  )r   insertleftr   r   r   r   testLeftArrow     z#RecvlineLoopbackMixin.testLeftArrowc                 C   s(   |  td td  td  d g dS )Ns
   right linerB      s   xx
done)s   >>> right lixxs
   right lixxr8  )r   r;  r<  rightr   r   r   r   testRightArrow  s   z$RecvlineLoopbackMixin.testRightArrowc                 C      |  dtd  d g dS )N   second linerB   r:  )s   >>> second xxxxs   second xxxxr8  )r   	backspacer   r   r   r   testBackspace     z#RecvlineLoopbackMixin.testBackspacec                 C   s$   |  dtd  td  d g dS )Ns   delete xxxxrB   	   line
done)s   >>> delete lines   delete liner8  )r   r<  deleter   r   r   r   
testDelete  s   z RecvlineLoopbackMixin.testDeletec                 C   rB  )Ns	   third ine      l
done)s   >>> third lines
   third liner8  )r   r<  r   r   r   r   
testInsert  rF  z RecvlineLoopbackMixin.testInsertc                 C       |  dtd  t d g dS )Ns   fourth xinerB   rK  )s   >>> fourth lines   fourth liner8  )r   r<  r;  r   r   r   r   testTypeover  r>  z"RecvlineLoopbackMixin.testTypeoverc                 C   s   |  td t d g dS )Ns	   blah lines	   home
done)s   >>> home lines	   home liner8  )r   r;  homer   r   r   r   testHome  rF  zRecvlineLoopbackMixin.testHomec                 C   rM  )Ns   end rB   rG  )s   >>> end lines   end liner8  )r   r<  endr   r   r   r   testEnd  r>  zRecvlineLoopbackMixin.testEndN)ri   rj   rk   rp   r   r9  r=  rA  rE  rI  rL  rN  rP  rR  r   r   r   r   r5    s    r5  c                   @   r   )RecvlineLoopbackTelnetTestsNr   r   r   r   r   rS    r   rS  c                   @   r   )RecvlineLoopbackSSHTestsNr   r   r   r   r   rT    r   rT  zBTerminal requirements missing, can't run recvline tests over stdioc                   @   r   )RecvlineLoopbackStdioTestsNr   r   r   r   r   rU    s    rU  c                   @   s(   e Zd ZeZdd Zdd Zdd ZdS )HistoricRecvlineLoopbackMixinc                 C   s   |  dt d g dS )Ns   first line
   
done)r6  r7  r6  r7  r8  )r   upr   r   r   r   testUpArrow  s   
z)HistoricRecvlineLoopbackMixin.testUpArrowc                 C   s   |  dt t d g dS )aH  
        Pressing down arrow to visit an entry that was added to the
        history by pressing the up arrow instead of return does not
        raise a L{TypeError}.

        @see: U{http://twistedmatrix.com/trac/ticket/9031}

        @return: A L{defer.Deferred} that fires when C{b"done"} is
            echoed back.
        s   first line
partial linerW  )r6  r7  s   >>> partial lines   partial liner8  r   rX  downr   r   r   r   $test_DownArrowToPartialLineInHistory  s   zBHistoricRecvlineLoopbackMixin.test_DownArrowToPartialLineInHistoryc                 C   rM  )Ns   first line
second line
r?  rW  )r6  r7     >>> second linerC  r]  rC  r8  rZ  r   r   r   r   testDownArrow  r>  z+HistoricRecvlineLoopbackMixin.testDownArrowN)ri   rj   rk   rp   r   rY  r\  r^  r   r   r   r   rV    s
    rV  c                   @   r   )#HistoricRecvlineLoopbackTelnetTestsNr   r   r   r   r   r_  	      r_  c                   @   r   ) HistoricRecvlineLoopbackSSHTestsNr   r   r   r   r   ra    r`  ra  zKTerminal requirements missing, can't run historic recvline tests over stdioc                   @   r   )"HistoricRecvlineLoopbackStdioTestsNr   r   r   r   r   rb    s    rb  c                   @   s   e Zd ZdZdd ZdS )TransportSequenceTestsz5
    L{twisted.conch.recvline.TransportSequence}
    c                 C   s   |  ttj dS )zh
        Initializing a L{recvline.TransportSequence} with no args
        raises an assertion.
        N)assertRaisesAssertionErrorr   TransportSequencer   r   r   r   test_invalidSequence$  s   z+TransportSequenceTests.test_invalidSequenceN)ri   rj   rk   __doc__rg  r   r   r   r   rc    s    rc  )erh  osr  unittestr   twisted.conchr   twisted.conch.insultsr   twisted.credr   r  r   r   twisted.pythonr   r	   r
   twisted.python.compatr   twisted.python.reflectr   twisted.test.proto_helpersr   twisted.trial.unittestr   r   r  dictenvironr   pathsepr   pathr   rl   rm   twisted.conch.test.loopbackrn   r   rp   r<  r@  rX  r[  r;  rO  rH  rQ  rD  ry   twisted.conch.manhole_sshrz   r{   r|   r}   r~   twisted.conch.sshr   r   r   r   r   r   ImportErrorr   r   r   r   r   r   r   SSHClientTransportr   r   r   r   registerAdapterISessionr   r   r   r   twisted.conch.testr  r   TestProtocolr	  r   r
  r  r  r5  rS  rT  rU  rV  r_  ra  rb  rc  r   r   r   r   <module>   s   
  	$"'7>9

5

