o
    `                     @   sR  d dl mZmZmZ d dlmZ d dlZd dlmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZ d d	lmZmZ d d
lmZmZmZmZ d dlmZ d dlmZmZmZmZ d dl m!Z!m"Z" d dl#m$Z$ d dl%m&Z& ddl'm(Z(m)Z)m*Z*m+Z+m,Z, e
ddd G dd dZ-d"ddZ.G dd dZ/G dd de!Z0G dd dZ1d"d d!Z2dS )#    )as_completedFuturewait_for)countN)Optional)warn)IOLoop)IOStream)Event)Queue	QueueFull)
SASLParsermake_auth_externalBEGINAuthenticationError)get_bus)ParserMessageTypeMessageMessageFlag)	ProxyBase
unwrap_msg)Router)message_bus   )MessageFiltersFilterHandleReplyMatcherRouterClosedcheck_replyablezsjeepney.io.tornado is deprecated. Tornado is now built on top of asyncio, so please use jeepney.io.asyncio instead.   )
stacklevelc                   @   sD   e Zd ZdefddZdddefddZd	efd
dZdd ZdS )DBusConnectionstreamc                 C   s$   || _ t | _tdd| _d | _d S )Nr   )start)r#   r   parserr   outgoing_serialunique_name)selfr#    r)   4/usr/lib/python3/dist-packages/jeepney/io/tornado.py__init__   s   
zDBusConnection.__init__Nserialmessagec                   s0   |d u r
t | j}| j||I d H  d S N)nextr&   r#   write	serialiser(   r.   r-   r)   r)   r*   send!   s   
zDBusConnection.sendreturnc                    s>   	 | j  }|d ur|S | jjdddI d H }| j | q)NTi   partial)r%   get_next_messager#   
read_bytesadd_data)r(   msgbr)   r)   r*   receive'   s   
zDBusConnection.receivec                 C      | j   d S r/   )r#   closer(   r)   r)   r*   r?   0      zDBusConnection.close)	__name__
__module____qualname__r	   r+   r   r4   r=   r?   r)   r)   r)   r*   r"      s
    	r"   SESSIONc                    s   t | }ttjtjd}||I d H  |dt  I d H  t }|js?|	|j
dddI d H  |jr<t|j|jr'|tI d H  t|}t|}ttt| dI d H }|d |_W d    |S 1 smw   Y  |S )N)family    i   Tr6   
   r   )r   r	   socketAF_UNIXconnectr1   r   r   authenticatedfeedr9   errorr   r   r"   
DBusRouterr   Proxyr   Hellor'   )busbus_addrr#   auth_parserconnrouter
reply_bodyr)   r)   r*   open_dbus_connection4   s(   


rX   c                   @   s   e Zd ZdefddZddddZdd	 Zdd
ddee fddZ	dd Z
dd Zdd Zedd ZdefddZdefddZdd ZdS )rO   rU   c                 C   s<   || _ t | _t | _t | _t 	| j
 tt| _d S r/   )rU   r   _repliesr   _filtersr
   _stop_receivingr   currentadd_callback	_receiverr   r   rV   )r(   rU   r)   r)   r*   r+   L   s   zDBusRouter.__init__Nr,   c                   s   | j j||dI d H  d S )Nr,   )rU   r4   r3   r)   r)   r*   r4   V   s   zDBusRouter.sendc                    sz   t | | j rtdt| jj}| j|t	 }| j
||dI d H  |I d H W  d    S 1 s6w   Y  d S )NzThis DBusRouter has stoppedr,   )r   r[   is_setr   r0   rU   r&   rY   catchr   r4   )r(   r.   r-   	reply_futr)   r)   r*   send_and_get_replyY   s   
$zDBusRouter.send_and_get_replyr   )queuebufsizerc   c                C   s   t | j||p	t|S )a  Create a filter for incoming messages

        Usage::

            with router.filter(rule) as queue:
                matching_msg = await queue.get()

        :param jeepney.MatchRule rule: Catch messages matching this rule
        :param tornado.queues.Queue queue: Matched messages will be added to this
        :param int bufsize: If no queue is passed in, create one with this size
        )r   rZ   r   )r(   rulerc   rd   r)   r)   r*   filterd   s   zDBusRouter.filterc                 C   r>   r/   )r[   setr@   r)   r)   r*   stopr   rA   zDBusRouter.stopc                 C   s   | S r/   r)   r@   r)   r)   r*   	__enter__u   s   zDBusRouter.__enter__c                 C   s   |    dS )NF)rh   r(   exc_typeexc_valexc_tbr)   r)   r*   __exit__x   s   zDBusRouter.__exit__c                 C   s   | j jS r/   )rU   r'   r@   r)   r)   r*   r'   ~   s   zDBusRouter.unique_namer.   c                    sF   |j jtjkr|j jtj@ st| |I d H S | 	|I d H  d S r/   )
headermessage_typer   method_returnflagsr   no_reply_expectedr   rb   r4   )r(   r.   r)   r)   r*   send_message   s   zDBusRouter.send_messager;   c              	   C   sH   | j |rdS | j|D ]}z|j| W q ty!   Y qw dS )zHandle one received messageN)rY   dispatchrZ   matchesrc   
put_nowaitr   )r(   r;   rf   r)   r)   r*   	_dispatch   s   zDBusRouter._dispatchc                    sx   z1	 t | j | j gD ]"}|I dH }|du r& W d| _| j  dS | | | j	
| qqd| _| j  w )z'Receiver loop - runs in a separate taskTNF)r   rU   r=   r[   wait
is_runningrY   drop_allrx   rV   incoming)r(   coror;   r)   r)   r*   r^      s   

zDBusRouter._receiver)rB   rC   rD   r"   r+   r4   rb   r   r   rf   rh   ri   rn   propertyr'   r   rt   rx   r^   r)   r)   r)   r*   rO   K   s    

rO   c                       s2   e Zd Zdef fddZdd Zdd Z  ZS )rP   rV   c                    s   t  | || _d S r/   )superr+   _router)r(   msggenrV   	__class__r)   r*   r+      s   
zProxy.__init__c                 C   s   d | j| jS )NzProxy({}, {}))format_msggenr   r@   r)   r)   r*   __repr__   s   zProxy.__repr__c                    s    fdd}|S )Nc                     s8    | i |}|j jtju sJ tj|I d H S r/   )ro   rp   r   method_callr   r   rb   )argskwargsr;   make_msgr(   r)   r*   inner   s   z!Proxy._method_call.<locals>.innerr)   )r(   r   r   r)   r   r*   _method_call   s   zProxy._method_call)rB   rC   rD   rO   r+   r   r   __classcell__r)   r)   r   r*   rP      s    rP   c                   @   s.   e Zd ZdZdZd	ddZdd Zdd ZdS )
_RouterContextNrE   c                 C   s
   || _ d S r/   rR   )r(   rR   r)   r)   r*   r+      s   
z_RouterContext.__init__c                    s&   t | jI d H | _t| j| _| jS r/   )rX   rR   rU   rO   rV   r@   r)   r)   r*   
__aenter__   s   z_RouterContext.__aenter__c                    s   | j   | j  d S r/   )rV   rh   rU   r?   rj   r)   r)   r*   	__aexit__   s   
z_RouterContext.__aexit__rE   )rB   rC   rD   rU   rV   r+   r   r   r)   r)   r)   r*   r      s    
r   c                 C   s   t | S )a  Open a D-Bus 'router' to send and receive messages.

    Use as an async context manager::

        async with open_dbus_router() as req:
            ...

    :param str bus: 'SESSION' or 'SYSTEM' or a supported address.
    :return: :class:`DBusRouter`

    This is a shortcut for::

        conn = await open_dbus_connection()
        async with conn:
            async with conn.router() as req:
                ...
    )r   r   r)   r)   r*   open_dbus_router   s   r   r   )3asyncior   r   r   	itertoolsr   rI   typingr   warningsr   tornado.ioloopr   tornado.iostreamr	   tornado.locksr
   tornado.queuesr   r   jeepney.authr   r   r   r   jeepney.busr   jeepney.low_levelr   r   r   r   jeepney.wrappersr   r   jeepney.routingr   jeepney.bus_messagesr   commonr   r   r   r   r   r"   rX   rO   rP   r   r   r)   r)   r)   r*   <module>   s2    
^