File: //lib/python3/dist-packages/twisted/test/__pycache__/test_iutils.cpython-312.pyc
�

Ϫ�f@5����dZddlZddlZddlZddlZddlZddlmZddlm	Z	m
Z
mZmZddl
mZddlmZddlmZddlmZmZGd	�d
e�ZGd�de�ZGd
�de�Zy)zD
Test running processes with the APIs in L{twisted.internet.utils}.
�N)�skipIf)�error�
interfaces�reactor�utils)�Deferred)�platform)�SuppressedWarningsTests)�SynchronousTestCase�TestCasec���eZdZdZej
ed��dZdZdZ	e
jZd�Z
d�Zd�Zd�Zd�Zd	�Zeej*�d
�d��Zd�Zd
�Zd�Zd�Zd�Zd�Zd�Zd�Zd�Zy)�ProcessUtilsTestszt
    Test running a process using L{getProcessOutput}, L{getProcessValue}, and
    L{getProcessOutputAndValue}.
    Nz)reactor doesn't implement IReactorProcessc� �|j�}t|d�5}|jtjj|�tjz�ddd�tjj|�S#1swY�(xYw)zj
        Write the given list of lines to a text file and return the absolute
        path to it.
        �wtN)�mktemp�open�write�os�linesep�join�path�abspath)�self�sourceLines�script�
scriptFiles    �:/usr/lib/python3/dist-packages/twisted/test/test_iutils.py�makeSourceFilez ProcessUtilsTests.makeSourceFile$sk��
�����
�&�$�
�	H�:����R�Z�Z�_�_�[�9�B�J�J�F�G�	H��w�w���v�&�&�	H�	H�s�AB�B
c��|jgd��}tj|jd|g�}|j	|j
d�S)z�
        L{getProcessOutput} returns a L{Deferred} which fires with the complete
        output of the process it runs after that process exits.
        )�
import syszfor s in b'hello world\n':z    s = bytes([s])z    sys.stdout.buffer.write(s)z    sys.stdout.flush()�-ushello world
�rr�getProcessOutput�exe�addCallback�assertEqual�rr�ds   r�test_outputzProcessUtilsTests.test_output.sN��
�(�(�
�
�
�
�"�"�4�8�8�d�J�-?�@���}�}�T�-�-�/?�@�@�c�����jddg�}tj�jd|g�}�j	|t
�}�fd�}|j
|�|S)z�
        The L{Deferred} returned by L{getProcessOutput} is fired with an
        L{IOError} L{Failure} if the child process writes to stderr.
        r z!sys.stderr.write("hello world\n")r!c�X���j|jtj�S�N)�
assertFailure�processEndedr�ProcessDone)�errrs �r�cbFailedz?ProcessUtilsTests.test_outputWithErrorIgnored.<locals>.cbFailedLs"����%�%�c�&6�&6��8I�8I�J�Jr*)rrr#r$r.�IOErrorr%)rrr(r2s`   r�test_outputWithErrorIgnoredz-ProcessUtilsTests.test_outputWithErrorIgnored?sf����(�(�
�?�@�
�
�
�"�"�4�8�8�d�J�-?�@�����q�'�*��	K�	
�
�
�h���r*c��|jgd��}tj|jd|gd��}|j	|j
d�S)z�
        If a C{True} value is supplied for the C{errortoo} parameter to
        L{getProcessOutput}, the returned L{Deferred} fires with the child's
        stderr output as well as its stdout output.
        )r zsys.stdout.write("foo")�sys.stdout.flush()zsys.stderr.write("foo")�sys.stderr.flush()r!T)�errortoosfoofoor"r's   r�test_outputWithErrorCollectedz/ProcessUtilsTests.test_outputWithErrorCollectedRsO���(�(�
�

�
�
�"�"�4�8�8�d�J�-?�$�O���}�}�T�-�-�y�9�9r*c��|jdg�}tj|jd|g�}|j	|j
d�S)z|
        The L{Deferred} returned by L{getProcessValue} is fired with the exit
        status of the child process.
        zraise SystemExit(1)r!�)rr�getProcessValuer$r%r&r's   r�
test_valuezProcessUtilsTests.test_valuegsJ��
�(�(�*?�)@�A�
��!�!�$�(�(�T�:�,>�?���}�}�T�-�-�q�1�1r*c����jgd��}�fd�}tj�jd|g�}|j	|�S)a
        The L{Deferred} returned by L{getProcessOutputAndValue} fires with a
        three-tuple, the elements of which give the data written to the child's
        stdout, the data written to the child's stderr, and the exit status of
        the child.
        )r z*sys.stdout.buffer.write(b'hello world!\n')z,sys.stderr.buffer.write(b'goodbye world!\n')zsys.exit(1)c�~��|\}}}�j|d��j|d��j|d�y)Ns
hello world!
sgoodbye world!
r;�r&)�out_err_code�outr1�coders    �r�gotOutputAndValuez@ProcessUtilsTests.test_outputAndValue.<locals>.gotOutputAndValue�s@���)�N�C��d����S�"3�4����S�"5�6����T�1�%r*r!�rr�getProcessOutputAndValuer$r%�rrrDr(s`   r�test_outputAndValuez%ProcessUtilsTests.test_outputAndValueqsL����(�(�
�
�
�	&�
�*�*�4�8�8�d�J�5G�H���}�}�.�/�/r*z"Windows doesn't have real signals.c�����jgd��}�fd�}tj�jd|g�}�j	|t
�}|j
|�S)z�
        If the child process exits because of a signal, the L{Deferred}
        returned by L{getProcessOutputAndValue} fires a L{Failure} of a tuple
        containing the child's stdout, stderr, and the signal which caused
        it to exit.
        )zimport sys, os, signalz"sys.stdout.write('stdout bytes\n')z"sys.stderr.write('stderr bytes\n')r6r7z$os.kill(os.getpid(), signal.SIGKILL)c���|\}}}�j|d��j|d��j|tj�y)Ns
stdout bytes
s
stderr bytes
)r&�signal�SIGKILL)�out_err_sigrBr1�sigrs    �rrDz>ProcessUtilsTests.test_outputSignal.<locals>.gotOutputAndValue�sD���'�M�C��c����S�"3�4����S�"3�4����S�&�.�.�1r*r!)rrrFr$r.�tupler%rGs`   r�test_outputSignalz#ProcessUtilsTests.test_outputSignal�s^����(�(�
�	
�
�	2�
�*�*�4�8�8�d�J�5G�H�����q�%�(���}�}�.�/�/r*c�D�tjj|j��}tj|�|jddg�}||jd|g|��}|j||jtj���|S)N�import os, syszsys.stdout.write(os.getcwd())r!)r)rrrr�makedirsrr$r%�encode�sys�getfilesystemencoding)r�utilFunc�check�dirrr(s      r�	_pathTestzProcessUtilsTests._pathTest�s}���g�g�o�o�d�k�k�m�,��
���C���(�(�
�>�?�
�
�
�T�X�X��j�1��<��	�
�
�e�S�Z�Z��(A�(A�(C�D�E��r*c�V�|jtj|j�S)z
        L{getProcessOutput} runs the given command with the working directory
        given by the C{path} parameter.
        )rZrr#r&�rs r�test_getProcessOutputPathz+ProcessUtilsTests.test_getProcessOutputPath�s!��
�~�~�e�4�4�d�6F�6F�G�Gr*c�N���fd�}�jtj|�S)z~
        L{getProcessValue} runs the given command with the working directory
        given by the C{path} parameter.
        c�*���j|d�y�Nrr@��result�ignoredrs  �rrXz9ProcessUtilsTests.test_getProcessValuePath.<locals>.check��������V�Q�'r*)rZrr<�rrXs` r�test_getProcessValuePathz*ProcessUtilsTests.test_getProcessValuePath�s!���	(��~�~�e�3�3�U�;�;r*c�N���fd�}�jtj|�S)z�
        L{getProcessOutputAndValue} runs the given command with the working
        directory given by the C{path} parameter.
        c�Z��|\}}}�j||��j|d�yr`r@��out_err_statusrYrBr1�statusrs     �rrXzBProcessUtilsTests.test_getProcessOutputAndValuePath.<locals>.check��/���-��C��f����S�#�&����V�Q�'r*)rZrrFres` r�!test_getProcessOutputAndValuePathz3ProcessUtilsTests.test_getProcessOutputAndValuePath�s!���	(�
�~�~�e�<�<�e�D�Dr*c���tjj|j��}tj|�|jgd��}|j
tjtj��tj|�tjtjd�j�}tj|tjtjz�|j
tj||�||jd|g�}|j!||j#t%j&���|S)N)rRzcdir = os.getcwd()zsys.stdout.write(cdir)�.r!)rrrrrSr�
addCleanup�chdir�getcwd�stat�S_IMODE�st_mode�chmod�S_IXUSR�S_IRUSRr$r%rTrUrV)rrWrXrYr�originalModer(s       r�_defaultPathTestz"ProcessUtilsTests._defaultPathTest�s����g�g�o�o�d�k�k�m�,��
���C���(�(�N�
�
�
	
������"�)�)�+�.�
����
��|�|�B�G�G�C�L�$8�$8�9��
	����d�l�l�T�\�\�1�2�
	
������#�|�4��T�X�X��j�1�2��	�
�
�e�S�Z�Z��(A�(A�(C�D�E��r*c�V�|jtj|j�S)a
        If no value is supplied for the C{path} parameter, L{getProcessOutput}
        runs the given command in the same working directory as the parent
        process and succeeds even if the current working directory is not
        accessible.
        )rzrr#r&r\s r� test_getProcessOutputDefaultPathz2ProcessUtilsTests.test_getProcessOutputDefaultPath�s#���$�$�U�%;�%;�T�=M�=M�N�Nr*c�N���fd�}�jtj|�S)a
        If no value is supplied for the C{path} parameter, L{getProcessValue}
        runs the given command in the same working directory as the parent
        process and succeeds even if the current working directory is not
        accessible.
        c�*���j|d�yr`r@ras  �rrXz@ProcessUtilsTests.test_getProcessValueDefaultPath.<locals>.checkrdr*)rzrr<res` r�test_getProcessValueDefaultPathz1ProcessUtilsTests.test_getProcessValueDefaultPath�s#���	(��$�$�U�%:�%:�E�B�Br*c�N���fd�}�jtj|�S)a	
        If no value is supplied for the C{path} parameter,
        L{getProcessOutputAndValue} runs the given command in the same working
        directory as the parent process and succeeds even if the current
        working directory is not accessible.
        c�Z��|\}}}�j||��j|d�yr`r@ris     �rrXzIProcessUtilsTests.test_getProcessOutputAndValueDefaultPath.<locals>.checkrlr*)rzrrFres` r�(test_getProcessOutputAndValueDefaultPathz:ProcessUtilsTests.test_getProcessOutputAndValueDefaultPaths#���	(�
�$�$�U�%C�%C�U�K�Kr*c�����jddg�}d�tj�jd|g���}��fd�}|j	|�|S)z�
        Standard input can be made available to the child process by passing
        bytes for the `stdinBytes` parameter.
        r z"sys.stdout.write(sys.stdin.read())sThese are the bytes to see.r!)�
stdinBytesc�Z��|\}}}�j�|��jd|�yr`)�assertInr&)rArBr1rCrr�s    ��rrDzPProcessUtilsTests.test_get_processOutputAndValueStdin.<locals>.gotOutputAndValue(s.���)�N�C��d�
�M�M�*�c�*����Q��%r*rE)rrr(rDr�s`   @r�#test_get_processOutputAndValueStdinz5ProcessUtilsTests.test_get_processOutputAndValueStdinsb���
�(�(��4�
�
�
�4�
��*�*��H�H�
�:��!�
��	&�	
�
�
�'�(��r*) �__name__�
__module__�__qualname__�__doc__r�IReactorProcessr�skip�output�valuerU�
executabler$rr)r4r9r=rHrr	�	isWindowsrPrZr]rfrmrzr|rr�r��r*rrrs����
"�z�!�!�'�4�0�8�:��
�F��E�

�.�.�C�'�A�"�&:�*2�0�2��H��� �"F�G�0�H�0�>�H�	<�E��<O�C�
L�r*rc��eZdZdZd�Zy)�SuppressWarningsTestsz.
    Tests for L{utils.suppressWarnings}.
    c�^��g��fd�}|jtd|�d�}tj|dt	d��f�}|d�|jt
��d�|d�|jt
��d�|d	�|jt
��d
�y)zs
        L{utils.suppressWarnings} decorates a function so that the given
        warnings are suppressed.
        c�,���j||f�yr-)�append)r�a�kwrbs   �r�showwarningz@SuppressWarningsTests.test_suppressWarnings.<locals>.showwarning@s����M�M�1�b�'�"r*r�c�.�tj|�yr-)�warnings�warn)�msgs r�fz6SuppressWarningsTests.test_suppressWarnings.<locals>.fEs���M�M�#�r*)�ignorezThis is message)�messagezSanity check messager;zUnignored message�N)�patchr�r�suppressWarnings�dictr&�len)rr�r��grbs    @r�test_suppressWarningsz+SuppressWarningsTests.test_suppressWarnings9s����
��	#�	
�
�
�8�]�K�8�	�
�"�"�1�{�D�AR�4S�&T�U��	
�
 �!�����V��a�(�	
�
������V��a�(�	
�
������V��a�(r*N)r�r�r�r�r�r�r*rr�r�4s���)r*r�c�@�eZdZdZeej�Zd�Zd�Zy)�DeferredSuppressedWarningsTestsz`
    Tests for L{utils.runWithWarningsSuppressed}, the version that supports
    Deferreds.
    c�8��difdifg}t��|j|�fd��tjd��j	d�tjd�|jdg|j
�D�cgc]}|d��	c}�ycc}w)	z�
        If the function called by L{utils.runWithWarningsSuppressed} returns a
        C{Deferred}, the warning filters aren't removed until the Deferred
        fires.
        �r�z.*foo.*�r�z.*bar.*c����Sr-r��rbs�r�<lambda>zGDeferredSuppressedWarningsTests.test_deferredCallback.<locals>.<lambda>ls����r*�
ignore foo��ignore foo 2r�N)r�runWithWarningsSuppressedr�r��callbackr&�
flushWarnings)r�filters�wrbs   @r�test_deferredCallbackz5DeferredSuppressedWarningsTests.test_deferredCallbackds����*�2�.�1F��0K�L������&�&�w��?��
�
�l�#�������
�
�n�%����.�)�$�BT�BT�BV�+W�Q�A�i�L�+W�X��+Ws�B
c�l��difdifg}t��|j|�fd��}tjd��j	t��|j
d��tjd�|jdg|j�D�cgc]}|d��	c}�ycc}w)	z�
        If the function called by L{utils.runWithWarningsSuppressed} returns a
        C{Deferred}, the warning filters aren't removed until the Deferred
        fires with an errback.
        r�r�c����Sr-r�r�s�rr�zFDeferredSuppressedWarningsTests.test_deferredErrback.<locals>.<lambda>zs���F�r*r�c�,�|jt�Sr-)�trap�ZeroDivisionError)r�s rr�zFDeferredSuppressedWarningsTests.test_deferredErrback.<locals>.<lambda>}s��q�v�v�&7�8�r*r�r�N)	rr�r�r��errbackr��
addErrbackr&r�)rr�r(r�rbs    @r�test_deferredErrbackz4DeferredSuppressedWarningsTests.test_deferredErrbackrs����*�2�.�1F��0K�L������*�*�7�N�C���
�
�l�#����(�*�+�	���8�9��
�
�n�%����.�)�$�BT�BT�BV�+W�Q�A�i�L�+W�X��+Ws�B1
N)	r�r�r�r��staticmethodrr�r�r�r�r*rr�r�Zs&���!-�U�-L�-L� M��Y�
Yr*r�)r�rrKrsrUr��unittestr�twisted.internetrrrr�twisted.internet.deferr�twisted.python.runtimer	�twisted.python.test.test_utilr
�twisted.trial.unittestrrrr�r�r�r*r�<module>r�s\���

�
��
���>�>�+�+�A�@�Z��Z�z#)�/�#)�L%Y�&=�%Yr*