o
    cE                     @   s  d Z ddlZddlZddlZddlZddlZddlZddlZddlm	Z	 ddlm
Z
 ddlmZ ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlZddlmZ ddlmZ ddlmZ eeZejZG dd dZG dd dZddeddfde de de!de!de!dee"e!f deee   dej#fddZ$		 	d?d!e d"eeee" ee" f  d#e%d$eeeej&ej'f   de f
d%d&Z(d'eej#ej)f dee" fd(d)Z*d*eej#ej)f dee" fd+d,Z+d*eej#ej)f dee" fd-d.Z,d*eej#ej)f dee" fd/d0Z-			2		d@d3ej.d"eee"  d4ee! d5e!d6e%d7eeej/  d8eeeej&ej'f   dej#fd9d:Z0ej1fd;eeej2 eej# f d<e!de fd=d>Z3dS )AzCrypto utilities.    N)Any)Callable)List)Mapping)Optional)Sequence)Set)Tuple)Union)crypto)SSL)errorsc                   @   sR   e Zd Zdeeeejejf f fddZ	de
jdeeejejf  fddZdS )	_DefaultCertSelectioncertsc                 C   
   || _ d S N)r   )selfr    r   =/opt/certbot/lib/python3.10/site-packages/acme/crypto_util.py__init__&      
z_DefaultCertSelection.__init__
connectionreturnc                 C   s   |  }|r| j|d S d S r   )get_servernamer   get)r   r   server_namer   r   r   __call__)   s   z_DefaultCertSelection.__call__N)__name__
__module____qualname__r   bytesr	   r   PKeyX509r   r   
Connectionr   r   r   r   r   r   r   %   s    "(r   c                   @   s   e Zd ZdZdeddfdejdeeee	e
je
jf f  dedeeejee gef  deeejgee	e
je
jf  f  ddfd	d
ZdedefddZdejddfddZG dd dZde	eef fddZdS )	SSLSocketa  SSL wrapper for sockets.

    :ivar socket sock: Original wrapped socket.
    :ivar dict certs: Mapping from domain names (`bytes`) to
        `OpenSSL.crypto.X509`.
    :ivar method: See `OpenSSL.SSL.Context` for allowed values.
    :ivar alpn_selection: Hook to select negotiated ALPN protocol for
        connection.
    :ivar cert_selection: Hook to select certificate for connection. If given,
        `certs` parameter would be ignored, and therefore must be empty.

    Nsockr   methodalpn_selectioncert_selectionr   c                 C   sX   || _ || _|| _|s|std|r|rtd|}|d u r't|r$|ni }|| _d S )Nz*Neither cert_selection or certs specified.z(Both cert_selection and certs specified.)r%   r'   r&   
ValueErrorr   r(   )r   r%   r   r&   r'   r(   actual_cert_selectionr   r   r   r   =   s   
zSSLSocket.__init__namec                 C      t | j|S r   )getattrr%   r   r+   r   r   r   __getattr__T      zSSLSocket.__getattr__r   c                 C   s   |  |}|du rtd|  dS |\}}t| j}|tj |tj	 |
| || | jdur>|| j || dS )a  SNI certificate callback.

        This method will set a new OpenSSL context object for this
        connection when an incoming connection provides an SNI name
        (in order to serve the appropriate certificate, if any).

        :param connection: The TLS connection object on which the SNI
            extension was received.
        :type connection: :class:`OpenSSL.Connection`

        Nz=Certificate selection for server name %s failed, dropping SSL)r(   loggerdebugr   r   Contextr&   set_optionsOP_NO_SSLv2OP_NO_SSLv3use_privatekeyuse_certificater'   set_alpn_select_callbackset_context)r   r   pairkeycertnew_contextr   r   r   _pick_certificate_cbW   s   



zSSLSocket._pick_certificate_cbc                   @   sH   e Zd ZdZdejddfddZdedefdd	Z	d
ede
fddZdS )zSSLSocket.FakeConnectionzFake OpenSSL.SSL.Connection.r   r   Nc                 C   r   r   )_wrapped)r   r   r   r   r   r   w   r   z!SSLSocket.FakeConnection.__init__r+   c                 C   r,   r   )r-   r@   r.   r   r   r   r/   z   r0   z$SSLSocket.FakeConnection.__getattr__unused_argsc              
   G   s2   z| j  W S  tjy } zt|d }~ww r   )r@   shutdownr   Errorsocketerror)r   rA   rE   r   r   r   rB   }   s   
z!SSLSocket.FakeConnection.shutdown)r   r   r   __doc__r   r#   r   strr   r/   boolrB   r   r   r   r   FakeConnectionr   s
    rI   c              
   C   s   | j  \}}t| j}|tj |tj || j	 | j
d ur*|| j
 | t||}|  td| z	|  W ||fS  tjyX } zt|d }~ww )NzPerforming handshake with %s)r%   acceptr   r3   r&   r4   r5   r6   set_tlsext_servername_callbackr?   r'   r9   rI   r#   set_accept_stater1   r2   do_handshakerC   rD   rE   )r   r%   addrcontextssl_sockrE   r   r   r   rJ      s"   


zSSLSocket.accept)r   r   r   rF   _DEFAULT_SSL_METHODrD   r   r   r    r	   r   r!   r"   intr   r   r#   r   r   rG   r   r/   r?   rI   rJ   r   r   r   r   r$   0   s2    


r$   i  i,  ) r   r+   hostporttimeoutr&   source_addressalpn_protocolsr   c                 C   s4  t |}|| d|i}z%td||t|r"d|d |d nd ||f}	tj|	fi |}
W n tj	yE } zt
|d}~ww t|
=}t ||}|  ||  |durd|| z
|  |  W n t jy } zt
|d}~ww W d   n1 sw   Y  | }|sJ |S )a	  Probe SNI server for SSL certificate.

    :param bytes name: Byte string to send as the server name in the
        client hello message.
    :param bytes host: Host to connect to.
    :param int port: Port to connect to.
    :param int timeout: Timeout in seconds.
    :param method: See `OpenSSL.SSL.Context` for allowed values.
    :param tuple source_address: Enables multi-path probing (selection
        of source interface). See `socket.creation_connection` for more
        info. Available only in Python 2.7+.
    :param alpn_protocols: Protocols to request using ALPN.
    :type alpn_protocols: `Sequence` of `bytes`

    :raises acme.errors.Error: In case of any problems.

    :returns: SSL certificate presented by the server.
    :rtype: OpenSSL.crypto.X509

    rW   z!Attempting to connect to %s:%d%s.z from {0}:{1}r      rS   N)r   r3   set_timeoutr1   r2   anyformatrD   create_connectionrE   r   rC   
contextlibclosingr#   set_connect_stateset_tlsext_host_nameset_alpn_protosrM   rB   get_peer_certificate)r+   rT   rU   rV   r&   rW   rX   rO   socket_kwargssocket_tupler%   rE   client
client_sslr=   r   r   r   	probe_sni   sJ   





rh   Fprivate_key_pemdomainsmust_stapleipaddrsc                 C   s   t t j| }t  }g }|du rg }|du rg }t|t| dkr'td|D ]	}|d|  q)|D ]
}|d|j  q5d|	d}	t j
dd	|	d
g}
|r^|
t j
dd	dd
 ||
 || |d ||d t t j|S )a  Generate a CSR containing domains or IPs as subjectAltNames.

    :param buffer private_key_pem: Private key, in PEM PKCS#8 format.
    :param list domains: List of DNS names to include in subjectAltNames of CSR.
    :param bool must_staple: Whether to include the TLS Feature extension (aka
        OCSP Must Staple: https://tools.ietf.org/html/rfc7633).
    :param list ipaddrs: List of IPaddress(type ipaddress.IPv4Address or ipaddress.IPv6Address)
    names to include in subbjectAltNames of CSR.
    params ordered this way for backward competablity when called by positional argument.
    :returns: buffer PEM-encoded Certificate Signing Request.
    Nr   zAAt least one of domains or ipaddrs parameter need to be not emptyDNS:IP:, ascii   subjectAltNameFcriticalvalues   1.3.6.1.5.5.7.1.24s   DER:30:03:02:01:05sha256)r   load_privatekeyFILETYPE_PEMX509Reqlenr)   appendexplodedjoinencodeX509Extensionadd_extensions
set_pubkeyset_versionsigndump_certificate_request)ri   rj   rk   rl   private_keycsrsanlistaddressips
san_string
extensionsr   r   r   make_csr   sF   


r   loaded_cert_or_reqc                    s6   |   j t| } d u r|S  g fdd|D  S )Nc                    s   g | ]}| kr|qS r   r   ).0dcommon_namer   r   
<listcomp>  s    z4_pyopenssl_cert_or_req_all_names.<locals>.<listcomp>)get_subjectCN_pyopenssl_cert_or_req_san)r   sansr   r   r    _pyopenssl_cert_or_req_all_names  s
   
r   cert_or_reqc                    s(   d d  t | } fdd|D S )a  Get Subject Alternative Names from certificate or CSR using pyOpenSSL.

    .. todo:: Implement directly in PyOpenSSL!

    .. note:: Although this is `acme` internal API, it is used by
        `letsencrypt`.

    :param cert_or_req: Certificate or CSR.
    :type cert_or_req: `OpenSSL.crypto.X509` or `OpenSSL.crypto.X509Req`.

    :returns: A list of Subject Alternative Names that is DNS.
    :rtype: `list` of `str`

    :DNSc                    s$   g | ]}| r| d  qS )rY   )
startswithsplitr   partpart_separatorprefixr   r   r   4  s    
z._pyopenssl_cert_or_req_san.<locals>.<listcomp>_pyopenssl_extract_san_list_raw)r   
sans_partsr   r   r   r     s   r   c                    s&   d}d|  t | } fdd|D S )ae  Get Subject Alternative Names IPs from certificate or CSR using pyOpenSSL.

    :param cert_or_req: Certificate or CSR.
    :type cert_or_req: `OpenSSL.crypto.X509` or `OpenSSL.crypto.X509Req`.

    :returns: A list of Subject Alternative Names that are IP Addresses.
    :rtype: `list` of `str`. note that this returns as string, not IPaddress object

    r   z
IP Addressc                    s&   g | ]}|  r|t d  qS r   )r   ry   r   r   r   r   r   I  s   & z1_pyopenssl_cert_or_req_san_ip.<locals>.<listcomp>r   )r   r   r   r   r   r   _pyopenssl_cert_or_req_san_ip8  s   r   c                 C   sj   t | tjrttj| d}n
ttj| d}td|}d}|du r+g }|S |	d
|}|S )a  Get raw SAN string from cert or csr, parse it as UTF-8 and return.

    :param cert_or_req: Certificate or CSR.
    :type cert_or_req: `OpenSSL.crypto.X509` or `OpenSSL.crypto.X509Req`.

    :returns: raw san strings, parsed byte as utf-8
    :rtype: `list` of `str`

    zutf-8z5X509v3 Subject Alternative Name:(?: critical)?\s*(.*)ro   NrY   )
isinstancer   r"   dump_certificateFILETYPE_TEXTdecoder   researchgroupr   )r   textraw_sanparts_separatorr   r   r   r   r   L  s   r   :	 Tr<   
not_beforevalidity	force_sanr   r   c                 C   sZ  |s|sJ dt  }|tttdd |d |du r%g }|du r+g }|du r1g }|	t 
ddd t|dkrH|d | _||  g }|D ]	}	|	d	|	  qS|D ]
}
|	d
|
j  q_d|d}|st|dkst|dkr|	t j
dd|d || ||du rdn| || ||  || d |S )at  Generate new self-signed certificate.

    :type domains: `list` of `str`
    :param OpenSSL.crypto.PKey key:
    :param bool force_san:
    :param extensions: List of additional extensions to include in the cert.
    :type extensions: `list` of `OpenSSL.crypto.X509Extension`
    :type ips: `list` of (`ipaddress.IPv4Address` or `ipaddress.IPv6Address`)

    If more than one domain is provided, all of the domains are put into
    ``subjectAltName`` X.509 extension and first domain is set as the
    subject CN. If only one domain is provided no ``subjectAltName``
    extension is used, unless `force_san` is ``True``.

    z7Must provide one or more hostnames or IPs for the cert.      Ns   basicConstraintsTs   CA:TRUE, pathlen:0r   rm   rn   ro   rp   rY   rq   Frr   ru   )r   r"   set_serial_numberrR   binasciihexlifyosurandomr   rz   r~   ry   r   r   
set_issuerr{   r|   r}   r   gmtime_adj_notBeforegmtime_adj_notAfterr   r   )r<   rj   r   r   r   r   r   r=   r   r   ipr   r   r   r   gen_ss_certk  sH   



r   chainfiletypec                    s:   dt tjtjf dtffdd d fdd| D S )zDump certificate chain into a bundle.

    :param list chain: List of `OpenSSL.crypto.X509` (or wrapped in
        :class:`josepy.util.ComparableX509`).

    :returns: certificate chain bundle
    :rtype: bytes

    r=   r   c                    s6   t | tjrt | jtjrtd| j} t | S )NzUnexpected CSR provided.)	r   joseComparableX509wrappedr   rx   r   rC   r   )r=   )r   r   r   
_dump_cert  s
   
z(dump_pyopenssl_chain.<locals>._dump_cert    c                 3   s    | ]} |V  qd S r   r   )r   r=   )r   r   r   	<genexpr>  s    z'dump_pyopenssl_chain.<locals>.<genexpr>)r
   r   r   r   r"   r    r|   )r   r   r   )r   r   r   dump_pyopenssl_chain  s   "	r   )NFN)NNr   TNN)4rF   r   r^   	ipaddressloggingr   r   rD   typingr   r   r   r   r   r   r   r	   r
   josepyr   OpenSSLr   r   acmer   	getLoggerr   r1   SSLv23_METHODrQ   r   r$   r    rR   rG   r"   rh   rH   IPv4AddressIPv6Addressr   rx   r   r   r   r   r!   r~   r   rw   r   r   r   r   r   r   <module>   s    
	r


9 
7
"""
C