o
    `                     @   s   d dl Z d dlmZmZmZ d dlmZmZ d dlm	Z	m
Z
 ddlmZ e jje ddZe jd	d
 Zdd ZeddddZdd Zdd Zdd Zdd Zdd ZdS )    N)new_method_callMessageTypeDBusAddress)message_bus	MatchRule)open_dbus_connectionProxy   )have_session_buszTests require DBus session bus)reasonc                  c   s8    t dd} | V  W d    d S 1 sw   Y  d S )NSESSION)bus)r   )conn r   @/usr/lib/python3/dist-packages/jeepney/io/tests/test_blocking.pysession_conn   s   "r   c                 C   s   | j dsJ d S )N:)unique_name
startswith)r   r   r   r   test_connect   s   r   zorg.freedesktop.DBusz/org/freedesktop/DBuszorg.freedesktop.DBus.Peer)bus_nameobject_path	interfacec                 C   sd   t td}| j|ddd}|jjtjksJ |jdksJ t td}| j|ddd}|dks0J d S )NPing   F)timeoutunwrapr   T)r   bus_peersend_and_get_replyheadermessage_typer   method_returnbody)r   	ping_callreply
reply_bodyr   r   r   test_send_and_get_reply   s   

r&   c                 C   sH   t t| dd}d}||}|dv sJ |j|dd\}|du s"J d S )Nr   r   z+io.gitlab.takluyver.jeepney.examples.Server>   r	         )_timeoutT)r   r   RequestNameNameHasOwner)r   proxynameres	has_ownerr   r   r   
test_proxy%   s   
r3   c                 C   s   t t| }d}tdtjtjdtjd}|d| || | |'}|	|\}|dks0J | j
|dd}|j|d	| jfksBJ W d    d S 1 sMw   Y  d S )
Nz6io.gitlab.takluyver.jeepney.tests.blocking_test_filtersignalNameOwnerChanged)typesenderr   memberpathr   r	   r*   r'    )r   r   r   r   r   r   add_arg_conditionAddMatchfilterr-   recv_until_filteredr"   r   )r   r   r0   
match_rulematchesr1   
signal_msgr   r   r   test_filter.   s"   

"rB   c                 C   s   t | d}tddd}|j|dd}W d    n1 sw   Y  |jjtju s+J |jd d}|	 d	ks<J W d    d S 1 sGw   Y  d S )
NGetFDr   Tr   
enable_fdsr   r'   r   zw+readme)
r   r   r   r   r    r   r!   r"   to_fileread)respond_with_fd
getfd_callr   r$   fr   r   r   test_recv_fdG   s   
"rL   c                 C   sz   | \}}t |dd|f}tddd}|j|dd}W d    n1 s$w   Y  |jjtju s2J |jd |ks;J d S )	NReadFDhr   TrD   r   r'   r   )r   r   r   r   r    r   r!   r"   )temp_file_and_contentsread_from_fd	temp_filedatareadfd_callr   r$   r   r   r   test_send_fdQ   s   rT   )pytestjeepneyr   r   r   jeepney.bus_messagesr   r   jeepney.io.blockingr   r   utilsr
   markskipif
pytestmarkfixturer   r   r   r&   r3   rB   rL   rT   r   r   r   r   <module>   s*    

	
