o
    b                     @   s  U d Z ddlmZ ddlmZ ddlmZ ddlmZm	Z	 ddl
mZ ddlmZmZmZ ddlmZ dd	lmZmZ dd
lmZmZ ddlmZ ddlmZ ddlmZ dZee ed< edredrddl m!Z! ddl"mZm#Z#m$Z$ ddl%m&Z& ddl'm(Z( nG dd dZ#G dd dZ$G dd de$j)Z*G dd de$j)Z+G dd de$j)Z,G d d! d!e#j-Z.eeG d"d# d#Z/eeG d$d% d%Z0eeG d&d' d'Z1eeG d(d) d)Z2G d*d+ d+ej3Z4G d,d- d-ej3Z5G d.d/ d/ej3Z6G d0d1 d1ej3Z7dS )2zT
Tests for the implementation of the ssh-userauth service.

Maintainer: Paul Swartz
    )
ModuleType)Optional)implementer)
ConchErrorValidPublicKey)ICredentialsChecker)
IAnonymousISSHPrivateKeyIUsernamePassword)UnauthorizedLogin)IRealmPortal)defertask)loopback)requireModule)unittestNkeyscryptographypyasn1)SSHProtocolChecker)r   	transportuserauth)NS)keydatac                   @      e Zd ZG dd dZdS )r   c                   @      e Zd ZdZdS )ztransport.SSHTransportBaseQ
            A stub class so that later class definitions won't die.
            N__name__
__module____qualname____doc__ r#   r#   B/usr/lib/python3/dist-packages/twisted/conch/test/test_userauth.pySSHTransportBase"       r%   N)r   r    r!   r%   r#   r#   r#   r$   r   !       r   c                   @   r   )r   c                   @   r   )zuserauth.SSHUserAuthClientr   Nr   r#   r#   r#   r$   SSHUserAuthClient(   r&   r(   N)r   r    r!   r(   r#   r#   r#   r$   r   '   r'   r   c                   @   s2   e Zd ZdZdd Zdd ZdddZd	d
 ZdS )ClientUserAuthz"
    A mock user auth client.
    c                 C   s(   | j r
tjtjS ttjtjS )z
        If this is the first time we've been called, return a blob for
        the DSA key.  Otherwise, return a blob
        for the RSA key.
        )	lastPublicKeyr   Key
fromStringr   publicRSA_opensshr   succeedpublicDSA_opensshselfr#   r#   r$   getPublicKey3   s   zClientUserAuth.getPublicKeyc                 C   s   t tjtjS )z@
        Return the private key object for the RSA key.
        )r   r.   r   r+   r,   r   privateRSA_opensshr0   r#   r#   r$   getPrivateKey>      zClientUserAuth.getPrivateKeyNc                 C   
   t dS )z/
        Return 'foo' as the password.
           foor   r.   )r1   promptr#   r#   r$   getPasswordD      
zClientUserAuth.getPasswordc                 C   r6   )z>
        Return 'foo' as the answer to two questions.
        )foor<   r8   )r1   nameinformationanswersr#   r#   r$   getGenericAnswersJ   r;   z ClientUserAuth.getGenericAnswersN)r   r    r!   r"   r2   r4   r:   r@   r#   r#   r#   r$   r)   .   s    
r)   c                   @       e Zd ZdZdd Zdd ZdS )OldClientAuthz~
    The old SSHUserAuthClient returned a cryptography key object from
    getPrivateKey() and a string from getPublicKey
    c                 C   s   t tjtjjS rA   )r   r.   r   r+   r,   r   r3   	keyObjectr0   r#   r#   r$   r4   W      zOldClientAuth.getPrivateKeyc                 C   s   t jtj S rA   )r   r+   r,   r   r-   blobr0   r#   r#   r$   r2   Z   s   zOldClientAuth.getPublicKeyNr   r    r!   r"   r4   r2   r#   r#   r#   r$   rC   Q   s    rC   c                   @   rB   )ClientAuthWithoutPrivateKeyzP
    This client doesn't have a private key, but it does have a public key.
    c                 C      d S rA   r#   r0   r#   r#   r$   r4   c      z)ClientAuthWithoutPrivateKey.getPrivateKeyc                 C   s   t jtjS rA   )r   r+   r,   r   r-   r0   r#   r#   r$   r2   f      z(ClientAuthWithoutPrivateKey.getPublicKeyNrG   r#   r#   r#   r$   rH   ^   s    rH   c                   @   sL   e Zd ZdZG dd dZG dd dZdd Zdd	 Zd
d Zdd Z	dS )FakeTransporta_  
    L{userauth.SSHUserAuthServer} expects an SSH transport which has a factory
    attribute which has a portal attribute. Because the portal is important for
    testing authentication, we need to be able to provide an interesting portal
    object to the L{SSHUserAuthServer}.

    In addition, we want to be able to capture any packets sent over the
    transport.

    @ivar packets: a list of 2-tuples: (messageType, data).  Each 2-tuple is
        a sent packet.
    @type packets: C{list}
    @param lostConnecion: True if loseConnection has been called on us.
    @type lostConnection: L{bool}
    c                   @   s   e Zd ZdZdZdd ZdS )zFakeTransport.ServicezW
        A mock service, representing the other service offered by the server.
           nancyc                 C   rI   rA   r#   r0   r#   r#   r$   serviceStarted   rJ   z$FakeTransport.Service.serviceStartedN)r   r    r!   r"   r=   rN   r#   r#   r#   r$   Service{   s    rO   c                   @      e Zd ZdZdd ZdS )zFakeTransport.Factoryzg
        A mock factory, representing the factory that spawned this user auth
        service.
        c                 C   s   |dkrt jS dS )z2
            Return our fake service.
               noneN)rL   rO   )r1   r   servicer#   r#   r$   
getService   s   z FakeTransport.Factory.getServiceN)r   r    r!   r"   rS   r#   r#   r#   r$   Factory   s    rT   c                 C   s(   |   | _|| j_d| _| | _g | _d S NF)rT   factoryportallostConnectionr   packets)r1   rW   r#   r#   r$   __init__   s
   

zFakeTransport.__init__c                 C   s   | j ||f dS )z8
        Record the packet sent by the service.
        N)rY   append)r1   messageTypemessager#   r#   r$   
sendPacket   r5   zFakeTransport.sendPacketc                 C      dS )z
        Pretend that this transport encrypts traffic in both directions. The
        SSHUserAuthServer disables password authentication if the transport
        isn't encrypted.
        Tr#   )r1   	directionr#   r#   r$   isEncrypted   s   zFakeTransport.isEncryptedc                 C   s
   d| _ d S NT)rX   r0   r#   r#   r$   loseConnection   s   
zFakeTransport.loseConnectionN)
r   r    r!   r"   rO   rT   rZ   r^   ra   rc   r#   r#   r#   r$   rL   j   s    
rL   c                   @   rP   )Realmz
    A mock realm for testing L{userauth.SSHUserAuthServer}.

    This realm is not actually used in the course of testing, so it returns the
    simplest thing that could possibly work.
    c                 G   s   t |d d dd fS )Nr   c                   S   rI   rA   r#   r#   r#   r#   r$   <lambda>       z%Realm.requestAvatar.<locals>.<lambda>r8   )r1   avatarIdmind
interfacesr#   r#   r$   requestAvatar   s   zRealm.requestAvatarN)r   r    r!   r"   rj   r#   r#   r#   r$   rd      s    rd   c                   @      e Zd ZdZefZdd ZdS )PasswordCheckerz
    A very simple username/password checker which authenticates anyone whose
    password matches their username and rejects all others.
    c                 C   s&   |j |jkrt|j S ttdS )NzInvalid username/password pair)usernamepasswordr   r.   failr   )r1   credsr#   r#   r$   requestAvatarId   s   zPasswordChecker.requestAvatarIdN)r   r    r!   r"   r
   credentialInterfacesrq   r#   r#   r#   r$   rl          rl   c                   @   rk   )PrivateKeyCheckerz
    A very simple public key checker which authenticates anyone whose
    public/private keypair is the same keydata.public/privateRSA_openssh.
    c                 C   sX   |j tjtj  kr)|jd ur&tj|j }||j|jr#|j	S t t
 t rA   )rF   r   r+   r,   r   r-   	signatureverifysigDatarm   r   r   )r1   rp   objr#   r#   r$   rq      s   
z!PrivateKeyChecker.requestAvatarIdN)r   r    r!   r"   r	   rr   rq   r#   r#   r#   r$   rt      rs   rt   c                   @   rk   )AnonymousCheckerzI
    A simple checker which isn't supported by L{SSHUserAuthServer}.
    c                 C   rI   rA   r#   )r1   credentialsr#   r#   r$   rq      s   z AnonymousChecker.requestAvatarIdN)r   r    r!   r"   r   rr   rq   r#   r#   r#   r$   ry      s    ry   c                   @   s   e Zd ZdZedu rdZdd Zdd Zdd	 Zd
d Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zd d! Zd"d# Zd$d% Zd&d' Zd(d) Zd*d+ ZdS ),SSHUserAuthServerTestsz&
    Tests for SSHUserAuthServer.
    Ncannot run without cryptographyc                 C   sb   t  | _t| j| _| jt  | jt  t | _	t
| j| j	_| j	  | j	j  d S rA   )rd   realmr   rW   registerCheckerrl   rt   r   SSHUserAuthServer
authServerrL   r   rN   supportedAuthenticationssortr0   r#   r#   r$   setUp   s   

zSSHUserAuthServerTests.setUpc                 C      | j   d | _ d S rA   )r   serviceStoppedr0   r#   r#   r$   tearDown      

zSSHUserAuthServerTests.tearDownc                 C   s(   |  | jjjd tjtdd f dS )z;
        Check that the authentication has failed.
        s   password,publickey    N)assertEqualr   r   rY   r   MSG_USERAUTH_FAILUREr   r1   ignoredr#   r#   r$   _checkFailed   s   z#SSHUserAuthServerTests._checkFailedc                 C   s,   | j tdtd td }|| jS )z
        A client may request a list of authentication 'method name' values
        that may continue by using the "none" authentication 'method name'.

        See RFC 4252 Section 5.2.
        r7   s   servicerQ   )r   ssh_USERAUTH_REQUESTr   addCallbackr   )r1   dr#   r#   r$   test_noneAuthentication  s   z.SSHUserAuthServerTests.test_noneAuthenticationc                    sF   d tdtdtddtdg} j|} fdd}||S )z
        When provided with correct password authentication information, the
        server should respond by sending a MSG_USERAUTH_SUCCESS message with
        no other data.

        See RFC 4252, Section 5.1.
            r7   rQ      passwordr   c                          jjjtjdfg d S Nr   r   r   r   rY   r   MSG_USERAUTH_SUCCESSr   r0   r#   r$   check     
zKSSHUserAuthServerTests.test_successfulPasswordAuthentication.<locals>.check)joinr   r   r   r   r1   packetr   r   r#   r0   r$   %test_successfulPasswordAuthentication  s   $
z<SSHUserAuthServerTests.test_successfulPasswordAuthenticationc                 C   sh   d tdtdtddtdg}t | j_| j|}| | jjj	g  | jj
d || jS )a;  
        When provided with invalid authentication details, the server should
        respond by sending a MSG_USERAUTH_FAILURE message which states whether
        the authentication was partially successful, and provides other, open
        options for authentication.

        See RFC 4252, Section 5.1.
        r   r7   rQ   r   r      bar   )r   r   r   Clockr   clockr   r   r   rY   advancer   r   r1   r   r   r#   r#   r$   !test_failedPasswordAuthentication'  s   $
z8SSHUserAuthServerTests.test_failedPasswordAuthenticationc                    s   t jtj }t jtj}tdtd td d t|  t| }d j	j
_|tdttjf | }|t|7 } j	|} fdd}||S )zN
        Test that private key authentication completes successfully,
        r7   rQ   	   publickey      testc                    r   r   r   r   r0   r#   r$   r   M  r   zMSSHUserAuthServerTests.test_successfulPrivateKeyAuthentication.<locals>.check)r   r+   r,   r   r-   rF   r3   r   sshTyper   r   	sessionIDsignbytesr   MSG_USERAUTH_REQUESTr   r   )r1   rF   rx   r   ru   r   r   r#   r0   r$   'test_successfulPrivateKeyAuthentication8  s,   


z>SSHUserAuthServerTests.test_successfulPrivateKeyAuthenticationc                    s   t   dd }dd } fdd}| | jd| | | jd| | | jd	| td
td td td }| j| |  tS )z
        ssh_USERAUTH_REQUEST should raise a ConchError if tryAuth returns
        None. Added to catch a bug noticed by pyflakes.
        c                 S   s   |  d d S )Nz&request should have raised ConochError)ro   r   r#   r#   r$   mockCbFinishedAuth\  rK   zOSSHUserAuthServerTests.test_requestRaisesConchError.<locals>.mockCbFinishedAuthc                 S   rI   rA   r#   )kinduserdatar#   r#   r$   mockTryAuth_  rJ   zHSSHUserAuthServerTests.test_requestRaisesConchError.<locals>.mockTryAuthc                    s     | j d S rA   )errbackvalue)reasonr   r#   r$   mockEbBadAuthb  s   zJSSHUserAuthServerTests.test_requestRaisesConchError.<locals>.mockEbBadAuthtryAuth_cbFinishedAuth
_ebBadAuths   userrQ   s
   public-keys   data)r   Deferredpatchr   r   r   assertFailurer   )r1   r   r   r   r   r#   r   r$   test_requestRaisesConchErrorU  s    z3SSHUserAuthServerTests.test_requestRaisesConchErrorc                    sb   t jtj  tdtd td d td t  }j|} fdd}|	|S )z@
        Test that verifying a valid private key works.
        r7   rQ   r   r      ssh-rsac                    s*    jjjtjtdt  fg d S )Nr   )r   r   r   rY   r   MSG_USERAUTH_PK_OKr   r   rF   r1   r#   r$   r   ~  s   z@SSHUserAuthServerTests.test_verifyValidPrivateKey.<locals>.check)
r   r+   r,   r   r-   rF   r   r   r   r   r   r#   r   r$   test_verifyValidPrivateKeyo  s    
z1SSHUserAuthServerTests.test_verifyValidPrivateKeyc                 C   sV   t jtj }tdtd td d td t| }| j|}|	| j
S )d
        Test that private key authentication fails when the public key
        is invalid.
        r7   rQ   r   r   s   ssh-dsar   r+   r,   r   r/   rF   r   r   r   r   r   r1   rF   r   r   r#   r#   r$   3test_failedPrivateKeyAuthenticationWithoutSignature  s   zJSSHUserAuthServerTests.test_failedPrivateKeyAuthenticationWithoutSignaturec                 C   s|   t jtj }t jtj}tdtd td d td t| t|| }d| j	j
_| j	|}|| jS )r   r7   rQ   r   r   r   r   )r   r+   r,   r   r-   rF   r3   r   r   r   r   r   r   r   r   )r1   rF   rx   r   r   r#   r#   r$   0test_failedPrivateKeyAuthenticationWithSignature  s&   
	zGSSHUserAuthServerTests.test_failedPrivateKeyAuthenticationWithSignaturec                 C   sj   t jtj }td|dd  }tdtd td d td t| }| j|}|	| j
S )	z
        Private key authentication fails when the public key type is
        unsupported or the public key is corrupt.
        s   ssh-bad-type   Nr7   rQ   r   r   r   r   r   r#   r#   r$   test_unsupported_publickey  s    z1SSHUserAuthServerTests.test_unsupported_publickeyc                 C   sR   t  }t| j|_| jt  |  |  |j	
  | |j	ddg dS )ah  
        L{SSHUserAuthServer} sets up
        C{SSHUserAuthServer.supportedAuthentications} by checking the portal's
        credentials interfaces and mapping them to SSH authentication method
        strings.  If the Portal advertises an interface that
        L{SSHUserAuthServer} can't map, it should be ignored.  This is a white
        box test.
        r   r   N)r   r   rL   rW   r   r~   ry   rN   r   r   r   r   r1   serverr#   r#   r$    test_ignoreUnknownCredInterfaces  s   	
z7SSHUserAuthServerTests.test_ignoreUnknownCredInterfacesc                 C   s   |  d| jj t }t| j|_dd |j_|	  |
  | d|j t }t| j|_dd |j_|	  |
  |  d|j dS )z
        Test that the userauth service does not advertise password
        authentication if the password would be send in cleartext.
        r   c                 S   r_   rU   r#   xr#   r#   r$   re     rf   zISSHUserAuthServerTests.test_removePasswordIfUnencrypted.<locals>.<lambda>c                 S      | dkS Ninr#   r   r#   r#   r$   re         N)assertInr   r   r   r   rL   rW   r   ra   rN   r   assertNotIn)r1   clearAuthServerhalfAuthServerr#   r#   r$    test_removePasswordIfUnencrypted  s   z7SSHUserAuthServerTests.test_removePasswordIfUnencryptedc                 C   s   t | j}|t  t }t||_dd |j_|	  |
  | |jdg t }t||_dd |j_|	  |
  | |jdg dS )z
        If the L{SSHUserAuthServer} is not advertising passwords, then an
        unencrypted connection should not cause any warnings or exceptions.
        This is a white box test.
        c                 S   r_   rU   r#   r   r#   r#   r$   re     rf   zSSSHUserAuthServerTests.test_unencryptedConnectionWithoutPasswords.<locals>.<lambda>r   c                 S   r   r   r#   r   r#   r#   r$   re     r   N)r   r}   r~   rt   r   r   rL   r   ra   rN   r   r   r   )r1   rW   r   r   r#   r#   r$   *test_unencryptedConnectionWithoutPasswords  s   


zASSHUserAuthServerTests.test_unencryptedConnectionWithoutPasswordsc                 C   s   t  }t |_t| j|_|  |j	d |
  | |jjtjdttjf td td fg | |jj dS )z0
        Test that the login times out.
        鰚        s   you took too longr   N)r   r   r   r   r   rL   rW   r   rN   r   r   r   rY   MSG_DISCONNECTr   )DISCONNECT_NO_MORE_AUTH_METHODS_AVAILABLEr   
assertTruerX   r1   timeoutAuthServerr#   r#   r$   test_loginTimeout  s(   

z(SSHUserAuthServerTests.test_loginTimeoutc                 C   s\   t  }t |_t| j|_|  |	  |j
d | |jjg  | |jj dS )zN
        Test that stopping the service also stops the login timeout.
        r   N)r   r   r   r   r   rL   rW   r   rN   r   r   r   rY   assertFalserX   r   r#   r#   r$   test_cancelLoginTimeout  s   
z.SSHUserAuthServerTests.test_cancelLoginTimeoutc                    sn   d tdtdtddtdg}t  j_tdD ]} j|} jjd q fd	d
}|	|S )zm
        Test that the server disconnects if the client fails authentication
        too many times.
        r   r7   rQ   r   r   r      r   c                    s<      jjjd tjdttjf td td f d S )Nr   r   s   too many bad authsr   )r   r   r   rY   r   r   r   r   r   r0   r#   r$   r   1  s   
z:SSHUserAuthServerTests.test_tooManyAttempts.<locals>.check)
r   r   r   r   r   r   ranger   r   r   )r1   r   ir   r   r#   r0   r$   test_tooManyAttempts&  s   $
z+SSHUserAuthServerTests.test_tooManyAttemptsc                 C   sH   t dt d t d d t d }t | j_| j|}|| jS )zo
        If the user requests a service that we don't support, the
        authentication should fail.
        r7   r   r   r   )r   r   r   r   r   r   r   r   r   r#   r#   r$   test_failIfUnknownService?  s   $z0SSHUserAuthServerTests.test_failIfUnknownServicec                    sV   dd }   jd|    jdd  fdd} jddd} |t|S )	aZ  
        tryAuth() has two edge cases that are difficult to reach.

        1) an authentication method auth_* returns None instead of a Deferred.
        2) an authentication type that is defined does not have a matching
           auth_* method.

        Both these cases should return a Deferred which fails with a
        ConchError.
        c                 S   rI   rA   r#   )r   r#   r#   r$   mockAuthU  rJ   z>SSHUserAuthServerTests.test_tryAuthEdgeCases.<locals>.mockAuthauth_publickeyauth_passwordNc                    s    j dd d } |tS )Nr   )r   r   r   r   )r   d2r0   r#   r$   
secondTest[  s   z@SSHUserAuthServerTests.test_tryAuthEdgeCases.<locals>.secondTestr   )r   r   r   r   r   r   )r1   r   r   d1r#   r0   r$   test_tryAuthEdgeCasesI  s   z,SSHUserAuthServerTests.test_tryAuthEdgeCases)r   r    r!   r"   r   skipr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r#   r#   r#   r$   r{      s0    	
r{   c                   @   s   e Zd ZdZedu rdZdd Zdd Zdd	 Zd
d Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zd d! ZdS )"SSHUserAuthClientTestsz&
    Tests for SSHUserAuthClient.
    Nr|   c                 C   s4   t dt | _td | j_d| jj_| j  d S )Nr7   r   )r)   rL   rO   
authClientr   r   rN   r0   r#   r#   r$   r   k  s   
zSSHUserAuthClientTests.setUpc                 C   r   rA   )r   r   r0   r#   r#   r$   r   q  r   zSSHUserAuthClientTests.tearDownc                 C   sT   |  | jjd |  | jjjd |  | jjjtjt	dt	d t	d fg dS )z;
        Test that client is initialized properly.
        r7   rM   rQ   N)
r   r   r   instancer=   r   rY   r   r   r   r0   r#   r#   r$   	test_initu  s   z SSHUserAuthClientTests.test_initc                    s@   dg  fdd}|| j j_| j d |  d | j j dS )z9
        Test that the client succeeds properly.
        Nc                    s   |  d< d S )Nr   r#   )rR   r   r#   r$   stubSetService  s   zDSSHUserAuthClientTests.test_USERAUTH_SUCCESS.<locals>.stubSetServicer   r   )r   r   
setServicessh_USERAUTH_SUCCESSr   r   )r1   r   r#   r   r$   test_USERAUTH_SUCCESS  s
   
z,SSHUserAuthClientTests.test_USERAUTH_SUCCESSc              	   C   s  | j tdd  | | j jjd tjtdtd td d td ttj	
tj  f | j tdd  ttj	
tj }| | j jjd tjtdtd td d td | f | j tdttj	
tj   t| j jjttjf td td td d td | }tj	
tj}| | j jjd tjtdtd td d td | t|| f d	S )
zJ
        Test that the client can authenticate with a public key.
        r   r   r   r7   rM   s   ssh-dssr      N)r   ssh_USERAUTH_FAILUREr   r   r   rY   r   r   r   r+   r,   r   r/   rF   r-   ssh_USERAUTH_PK_OKr   r   r3   r   )r1   rF   rw   rx   r#   r#   r$   test_publickey  s   

z%SSHUserAuthClientTests.test_publickeyc                 C   sz   t dt }td|_d|j_|  |d g |j_| |	d | 
|jjtjtdtd td fg dS )z
        If the SSHUserAuthClient doesn't return anything from signData,
        the client should start the authentication over again by requesting
        'none' authentication.
        r7   Nr   r   r   rM   rQ   )rH   rL   rO   r   r   rN   r   rY   assertIsNoner   r   r   r   r   )r1   r   r#   r#   r$   !test_publickey_without_privatekey  s   

z8SSHUserAuthClientTests.test_publickey_without_privatekeyc                    s.   dd  j _ j d} fdd}||S )z{
        If there's no public key, auth_publickey should return a Deferred
        called back with a False value.
        c                 S   rI   rA   r#   r   r#   r#   r$   re     rf   z:SSHUserAuthClientTests.test_no_publickey.<locals>.<lambda>r   c                    s     |  d S rA   )r   resultr0   r#   r$   r     rK   z7SSHUserAuthClientTests.test_no_publickey.<locals>.check)r   r2   r   r   )r1   r   r   r#   r0   r$   test_no_publickey  s   
z(SSHUserAuthClientTests.test_no_publickeyc                 C   s   | j tdd  | | j jjd tjtdtd td d td f | j tdtd  | | j jjd tjtdtd td d tdd  f d	S )
zx
        Test that the client can authentication with a password.  This
        includes changing the password.
        r   r   r   r7   rM   r   r   r   N)	r   r   r   r   r   rY   r   r   r   r0   r#   r#   r$   test_password  s   "&z$SSHUserAuthClientTests.test_passwordc                 C   s"   dd | j _| | j d dS )zK
        If getPassword returns None, tryAuth should return False.
        c                   S   rI   rA   r#   r#   r#   r#   r$   re     rf   z9SSHUserAuthClientTests.test_no_password.<locals>.<lambda>r   N)r   r:   r   r   r0   r#   r#   r$   test_no_password  s   z'SSHUserAuthClientTests.test_no_passwordc                 C   s`   | j tdtd td d td d  | | j jjd tjdtd td f dS )	zj
        Make sure that the client can authenticate with the keyboard
        interactive method.
        r   s      s
   Password: r   r   s      r7   N)r   'ssh_USERAUTH_PK_OK_keyboard_interactiver   r   r   rY   r   MSG_USERAUTH_INFO_RESPONSEr0   r#   r#   r$   test_keyboardInteractive  s&   z/SSHUserAuthClientTests.test_keyboardInteractivec                 C   sP   d| j _g | j j_| j d | | j jjtjtdtd td fg dS )z
        If C{SSHUserAuthClient} gets a MSG_USERAUTH_PK_OK packet when it's not
        expecting it, it should fail the current authentication and move on to
        the next type.
        s   unknownr   r7   rM   rQ   N)	r   lastAuthr   rY   r   r   r   r   r   r0   r#   r#   r$   "test_USERAUTH_PK_OK_unknown_method  s   
z9SSHUserAuthClientTests.test_USERAUTH_PK_OK_unknown_methodc                    s    fdd} fdd}| j _| j _ j tdd    j jjd tj	tdtd	 td
 d td f  j tdd    j jjdd ddg dS )z
        ssh_USERAUTH_FAILURE should sort the methods by their position
        in SSHUserAuthClient.preferredOrder.  Methods that are not in
        preferredOrder should be sorted at the end of that list.
        c                      s    j jdd d S )N      here is datar   r   r^   r#   r0   r#   r$   auth_firstmethod2  s   zNSSHUserAuthClientTests.test_USERAUTH_FAILURE_sorting.<locals>.auth_firstmethodc                      s    j jdd dS )N   
   other dataTr  r#   r0   r#   r$   auth_anothermethod5  s   zPSSHUserAuthClientTests.test_USERAUTH_FAILURE_sorting.<locals>.auth_anothermethods   anothermethod,passwordr   r   r7   rM   r   s"   firstmethod,anothermethod,passwordr   N)r  r  )r  r  )
r   r  r  r   r   r   r   rY   r   r   )r1   r  r  r#   r0   r$   test_USERAUTH_FAILURE_sorting+  s$   "
z4SSHUserAuthClientTests.test_USERAUTH_FAILURE_sortingc                 C   sT   | j tdd  | j tdd  | | j jjd tjdtd d f dS )	z
        If there are no more available user authentication messages,
        the SSHUserAuthClient should disconnect with code
        DISCONNECT_NO_MORE_AUTH_METHODS_AVAILABLE.
        r   r   r   r   s      s(   no more authentication methods availables       N)r   r   r   r   r   rY   r   r0   r#   r#   r$   %test_disconnectIfNoMoreAuthenticationO  s   z<SSHUserAuthClientTests.test_disconnectIfNoMoreAuthenticationc                 C   sH   g | j j_| j d | | j jjtjtdtd td fg dS )z
        _ebAuth (the generic authentication error handler) should send
        a request for the 'none' authentication method.
        Nr7   rM   rQ   )r   r   rY   _ebAuthr   r   r   r   r0   r#   r#   r$   test_ebAutha  s   
z"SSHUserAuthClientTests.test_ebAuthc                    s`   t dt      fdd} fdddd   }|j	|S )z
        getPublicKey() should return None.  getPrivateKey() should return a
        failed Deferred.  getPassword() should return a failed Deferred.
        getGenericAnswers() should return a failed Deferred.
        r7   c                    s$   |  t   }|jS rA   )trapNotImplementedErrorr:   r   ro   
addErrbackr  r   )r   check2r1   r#   r$   r   v  s   
z3SSHUserAuthClientTests.test_defaults.<locals>.checkc                    s*   |  t  d d d }|jS rA   )r  r  r@   r   ro   r  r  )r   check3r1   r#   r$   r  {  s   
z4SSHUserAuthClientTests.test_defaults.<locals>.check2c                 S   s   |  t d S rA   )r  r  r  r#   r#   r$   r    rK   z4SSHUserAuthClientTests.test_defaults.<locals>.check3)
r   r(   rL   rO   r   r2   r4   r   ro   r  )r1   r   r   r#   )r   r  r  r1   r$   test_defaultsm  s   z$SSHUserAuthClientTests.test_defaults)r   r    r!   r"   r   r   r   r   r   r   r   r   r  r  r  r  r
  r  r  r  r  r#   r#   r#   r$   r   c  s&    >$r   c                   @   s.   e Zd Zedu r
dZG dd dZdd ZdS )LoopbackTestsN)cannot run without cryptography or PyASN1c                   @   s"   e Zd ZG dd dZdd ZdS )zLoopbackTests.Factoryc                   @   rB   )zLoopbackTests.Factory.Service   TestServicec                 C   s   | j   d S rA   )r   rc   r0   r#   r#   r$   rN     rK   z,LoopbackTests.Factory.Service.serviceStartedc                 C   rI   rA   r#   r0   r#   r#   r$   r     rJ   z,LoopbackTests.Factory.Service.serviceStoppedN)r   r    r!   r=   rN   r   r#   r#   r#   r$   rO     s    rO   c                 C   s   | j S rA   )rO   )r1   avatarr=   r#   r#   r$   rS     s   z LoopbackTests.Factory.getServiceN)r   r    r!   rO   rS   r#   r#   r#   r$   rT     s    	rT   c                    s   t  tdj }t _j_dd j_t |_||j_d j_	|j_	dd  j_
|j_
 j_d_t }t|}t   t   t   fdd _|  |jj_tj|j}dd jj_d	d |jj_  |  fd
d}||S )zW
        Test that the userauth server and client play nicely with each other.
        r7   c                 S   r_   rb   r#   r   r#   r#   r$   re     rf   z-LoopbackTests.test_loopback.<locals>.<lambda>r   c                   S   rI   rA   r#   r#   r#   r#   r$   re     rf   r   c                    s   t  j|  dkS )Nr   )lensuccessfulCredentials)aId)checkerr#   r$   re     s    c                   S   r_   )N_ServerLoopbackr#   r#   r#   r#   r$   re     rf   c                   S   r_   )N_ClientLoopbackr#   r#   r#   r#   r$   re     rf   c                    s     jjjd d S )Nr   )r   r   rR   r=   r   r   r#   r$   r     rE   z*LoopbackTests.test_loopback.<locals>.check)r   r   r)   rT   rO   r   r%   rR   ra   r   sendKexInitrV   passwordDelayrd   r   r   r~   rl   rt   areDonerW   r   loopbackAsync	logPrefixrN   r   )r1   clientr}   rW   r   r   r#   )r%  r1   r   r$   test_loopback  s4   




zLoopbackTests.test_loopback)r   r    r!   r   r   rT   r.  r#   r#   r#   r$   r    s
    r  c                   @   s    e Zd Zedu r
dZdd ZdS )ModuleInitializationTestsNr  c                 C   s,   |  tjjd d |  tjjd d d S )N<   r   )r   r   r   protocolMessagesr(   r0   r#   r#   r$   test_messages  s   z'ModuleInitializationTests.test_messages)r   r    r!   r   r   r2  r#   r#   r#   r$   r/    s    r/  )8r"   typesr   typingr   zope.interfacer   twisted.conch.errorr   r   twisted.cred.checkersr   twisted.cred.credentialsr   r	   r
   twisted.cred.errorr   twisted.cred.portalr   r   twisted.internetr   r   twisted.protocolsr   twisted.python.reflectr   twisted.trialr   r   __annotations__twisted.conch.checkersr   twisted.conch.sshr   r   twisted.conch.ssh.commonr   twisted.conch.testr   r(   r)   rC   rH   r%   rL   rd   rl   rt   ry   TestCaser{   r   r  r/  r#   r#   r#   r$   <module>   sR   #A  }  &<