HEX
Server: Apache/2.4.58 (Ubuntu)
System: Linux ns3133907 6.8.0-86-generic #87-Ubuntu SMP PREEMPT_DYNAMIC Mon Sep 22 18:03:36 UTC 2025 x86_64
User: cssnetorguk (1024)
PHP: 8.2.28
Disabled: NONE
Upload Files
File: //usr/lib/python3/dist-packages/twisted/cred/test/test_digestauth.py
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.cred._digest} and the associated bits in
L{twisted.cred.credentials}.
"""


import base64
from binascii import hexlify
from hashlib import md5, sha1

from zope.interface.verify import verifyObject

from twisted.cred.credentials import (
    DigestCredentialFactory,
    IUsernameDigestHash,
    calcHA1,
    calcHA2,
    calcResponse,
)
from twisted.cred.error import LoginFailed
from twisted.internet.address import IPv4Address
from twisted.python.compat import networkString
from twisted.trial.unittest import TestCase


def b64encode(s):
    return base64.b64encode(s).strip()


class FakeDigestCredentialFactory(DigestCredentialFactory):
    """
    A Fake Digest Credential Factory that generates a predictable
    nonce and opaque
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.privateKey = b"0"

    def _generateNonce(self):
        """
        Generate a static nonce
        """
        return b"178288758716122392881254770685"

    def _getTime(self):
        """
        Return a stable time
        """
        return 0


class DigestAuthTests(TestCase):
    """
    L{TestCase} mixin class which defines a number of tests for
    L{DigestCredentialFactory}.  Because this mixin defines C{setUp}, it
    must be inherited before L{TestCase}.
    """

    def setUp(self):
        """
        Create a DigestCredentialFactory for testing
        """
        self.username = b"foobar"
        self.password = b"bazquux"
        self.realm = b"test realm"
        self.algorithm = b"md5"
        self.cnonce = b"29fc54aa1641c6fa0e151419361c8f23"
        self.qop = b"auth"
        self.uri = b"/write/"
        self.clientAddress = IPv4Address("TCP", "10.2.3.4", 43125)
        self.method = b"GET"
        self.credentialFactory = DigestCredentialFactory(self.algorithm, self.realm)

    def test_MD5HashA1(self, _algorithm=b"md5", _hash=md5):
        """
        L{calcHA1} accepts the C{'md5'} algorithm and returns an MD5 hash of
        its parameters, excluding the nonce and cnonce.
        """
        nonce = b"abc123xyz"
        hashA1 = calcHA1(
            _algorithm, self.username, self.realm, self.password, nonce, self.cnonce
        )
        a1 = b":".join((self.username, self.realm, self.password))
        expected = hexlify(_hash(a1).digest())
        self.assertEqual(hashA1, expected)

    def test_MD5SessionHashA1(self):
        """
        L{calcHA1} accepts the C{'md5-sess'} algorithm and returns an MD5 hash
        of its parameters, including the nonce and cnonce.
        """
        nonce = b"xyz321abc"
        hashA1 = calcHA1(
            b"md5-sess", self.username, self.realm, self.password, nonce, self.cnonce
        )
        a1 = self.username + b":" + self.realm + b":" + self.password
        ha1 = hexlify(md5(a1).digest())
        a1 = ha1 + b":" + nonce + b":" + self.cnonce
        expected = hexlify(md5(a1).digest())
        self.assertEqual(hashA1, expected)

    def test_SHAHashA1(self):
        """
        L{calcHA1} accepts the C{'sha'} algorithm and returns a SHA hash of its
        parameters, excluding the nonce and cnonce.
        """
        self.test_MD5HashA1(b"sha", sha1)

    def test_MD5HashA2Auth(self, _algorithm=b"md5", _hash=md5):
        """
        L{calcHA2} accepts the C{'md5'} algorithm and returns an MD5 hash of
        its arguments, excluding the entity hash for QOP other than
        C{'auth-int'}.
        """
        method = b"GET"
        hashA2 = calcHA2(_algorithm, method, self.uri, b"auth", None)
        a2 = method + b":" + self.uri
        expected = hexlify(_hash(a2).digest())
        self.assertEqual(hashA2, expected)

    def test_MD5HashA2AuthInt(self, _algorithm=b"md5", _hash=md5):
        """
        L{calcHA2} accepts the C{'md5'} algorithm and returns an MD5 hash of
        its arguments, including the entity hash for QOP of C{'auth-int'}.
        """
        method = b"GET"
        hentity = b"foobarbaz"
        hashA2 = calcHA2(_algorithm, method, self.uri, b"auth-int", hentity)
        a2 = method + b":" + self.uri + b":" + hentity
        expected = hexlify(_hash(a2).digest())
        self.assertEqual(hashA2, expected)

    def test_MD5SessHashA2Auth(self):
        """
        L{calcHA2} accepts the C{'md5-sess'} algorithm and QOP of C{'auth'} and
        returns the same value as it does for the C{'md5'} algorithm.
        """
        self.test_MD5HashA2Auth(b"md5-sess")

    def test_MD5SessHashA2AuthInt(self):
        """
        L{calcHA2} accepts the C{'md5-sess'} algorithm and QOP of C{'auth-int'}
        and returns the same value as it does for the C{'md5'} algorithm.
        """
        self.test_MD5HashA2AuthInt(b"md5-sess")

    def test_SHAHashA2Auth(self):
        """
        L{calcHA2} accepts the C{'sha'} algorithm and returns a SHA hash of
        its arguments, excluding the entity hash for QOP other than
        C{'auth-int'}.
        """
        self.test_MD5HashA2Auth(b"sha", sha1)

    def test_SHAHashA2AuthInt(self):
        """
        L{calcHA2} accepts the C{'sha'} algorithm and returns a SHA hash of
        its arguments, including the entity hash for QOP of C{'auth-int'}.
        """
        self.test_MD5HashA2AuthInt(b"sha", sha1)

    def test_MD5HashResponse(self, _algorithm=b"md5", _hash=md5):
        """
        L{calcResponse} accepts the C{'md5'} algorithm and returns an MD5 hash
        of its parameters, excluding the nonce count, client nonce, and QoP
        value if the nonce count and client nonce are L{None}
        """
        hashA1 = b"abc123"
        hashA2 = b"789xyz"
        nonce = b"lmnopq"

        response = hashA1 + b":" + nonce + b":" + hashA2
        expected = hexlify(_hash(response).digest())

        digest = calcResponse(hashA1, hashA2, _algorithm, nonce, None, None, None)
        self.assertEqual(expected, digest)

    def test_MD5SessionHashResponse(self):
        """
        L{calcResponse} accepts the C{'md5-sess'} algorithm and returns an MD5
        hash of its parameters, excluding the nonce count, client nonce, and
        QoP value if the nonce count and client nonce are L{None}
        """
        self.test_MD5HashResponse(b"md5-sess")

    def test_SHAHashResponse(self):
        """
        L{calcResponse} accepts the C{'sha'} algorithm and returns a SHA hash
        of its parameters, excluding the nonce count, client nonce, and QoP
        value if the nonce count and client nonce are L{None}
        """
        self.test_MD5HashResponse(b"sha", sha1)

    def test_MD5HashResponseExtra(self, _algorithm=b"md5", _hash=md5):
        """
        L{calcResponse} accepts the C{'md5'} algorithm and returns an MD5 hash
        of its parameters, including the nonce count, client nonce, and QoP
        value if they are specified.
        """
        hashA1 = b"abc123"
        hashA2 = b"789xyz"
        nonce = b"lmnopq"
        nonceCount = b"00000004"
        clientNonce = b"abcxyz123"
        qop = b"auth"

        response = (
            hashA1
            + b":"
            + nonce
            + b":"
            + nonceCount
            + b":"
            + clientNonce
            + b":"
            + qop
            + b":"
            + hashA2
        )
        expected = hexlify(_hash(response).digest())

        digest = calcResponse(
            hashA1, hashA2, _algorithm, nonce, nonceCount, clientNonce, qop
        )
        self.assertEqual(expected, digest)

    def test_MD5SessionHashResponseExtra(self):
        """
        L{calcResponse} accepts the C{'md5-sess'} algorithm and returns an MD5
        hash of its parameters, including the nonce count, client nonce, and
        QoP value if they are specified.
        """
        self.test_MD5HashResponseExtra(b"md5-sess")

    def test_SHAHashResponseExtra(self):
        """
        L{calcResponse} accepts the C{'sha'} algorithm and returns a SHA hash
        of its parameters, including the nonce count, client nonce, and QoP
        value if they are specified.
        """
        self.test_MD5HashResponseExtra(b"sha", sha1)

    def formatResponse(self, quotes=True, **kw):
        """
        Format all given keyword arguments and their values suitably for use as
        the value of an HTTP header.

        @types quotes: C{bool}
        @param quotes: A flag indicating whether to quote the values of each
            field in the response.

        @param **kw: Keywords and C{bytes} values which will be treated as field
            name/value pairs to include in the result.

        @rtype: C{bytes}
        @return: The given fields formatted for use as an HTTP header value.
        """
        if "username" not in kw:
            kw["username"] = self.username
        if "realm" not in kw:
            kw["realm"] = self.realm
        if "algorithm" not in kw:
            kw["algorithm"] = self.algorithm
        if "qop" not in kw:
            kw["qop"] = self.qop
        if "cnonce" not in kw:
            kw["cnonce"] = self.cnonce
        if "uri" not in kw:
            kw["uri"] = self.uri
        if quotes:
            quote = b'"'
        else:
            quote = b""

        return b", ".join(
            [
                b"".join((networkString(k), b"=", quote, v, quote))
                for (k, v) in kw.items()
                if v is not None
            ]
        )

    def getDigestResponse(self, challenge, ncount):
        """
        Calculate the response for the given challenge
        """
        nonce = challenge.get("nonce")
        algo = challenge.get("algorithm").lower()
        qop = challenge.get("qop")

        ha1 = calcHA1(
            algo, self.username, self.realm, self.password, nonce, self.cnonce
        )
        ha2 = calcHA2(algo, b"GET", self.uri, qop, None)
        expected = calcResponse(ha1, ha2, algo, nonce, ncount, self.cnonce, qop)
        return expected

    def test_response(self, quotes=True):
        """
        L{DigestCredentialFactory.decode} accepts a digest challenge response
        and parses it into an L{IUsernameHashedPassword} provider.
        """
        challenge = self.credentialFactory.getChallenge(self.clientAddress.host)

        nc = b"00000001"
        clientResponse = self.formatResponse(
            quotes=quotes,
            nonce=challenge["nonce"],
            response=self.getDigestResponse(challenge, nc),
            nc=nc,
            opaque=challenge["opaque"],
        )
        creds = self.credentialFactory.decode(
            clientResponse, self.method, self.clientAddress.host
        )
        self.assertTrue(creds.checkPassword(self.password))
        self.assertFalse(creds.checkPassword(self.password + b"wrong"))

    def test_responseWithoutQuotes(self):
        """
        L{DigestCredentialFactory.decode} accepts a digest challenge response
        which does not quote the values of its fields and parses it into an
        L{IUsernameHashedPassword} provider in the same way it would a
        response which included quoted field values.
        """
        self.test_response(False)

    def test_responseWithCommaURI(self):
        """
        L{DigestCredentialFactory.decode} accepts a digest challenge response
        which quotes the values of its fields and includes a C{b","} in the URI
        field.
        """
        self.uri = b"/some,path/"
        self.test_response(True)

    def test_caseInsensitiveAlgorithm(self):
        """
        The case of the algorithm value in the response is ignored when
        checking the credentials.
        """
        self.algorithm = b"MD5"
        self.test_response()

    def test_md5DefaultAlgorithm(self):
        """
        The algorithm defaults to MD5 if it is not supplied in the response.
        """
        self.algorithm = None
        self.test_response()

    def test_responseWithoutClientIP(self):
        """
        L{DigestCredentialFactory.decode} accepts a digest challenge response
        even if the client address it is passed is L{None}.
        """
        challenge = self.credentialFactory.getChallenge(None)

        nc = b"00000001"
        clientResponse = self.formatResponse(
            nonce=challenge["nonce"],
            response=self.getDigestResponse(challenge, nc),
            nc=nc,
            opaque=challenge["opaque"],
        )
        creds = self.credentialFactory.decode(clientResponse, self.method, None)
        self.assertTrue(creds.checkPassword(self.password))
        self.assertFalse(creds.checkPassword(self.password + b"wrong"))

    def test_multiResponse(self):
        """
        L{DigestCredentialFactory.decode} handles multiple responses to a
        single challenge.
        """
        challenge = self.credentialFactory.getChallenge(self.clientAddress.host)

        nc = b"00000001"
        clientResponse = self.formatResponse(
            nonce=challenge["nonce"],
            response=self.getDigestResponse(challenge, nc),
            nc=nc,
            opaque=challenge["opaque"],
        )

        creds = self.credentialFactory.decode(
            clientResponse, self.method, self.clientAddress.host
        )
        self.assertTrue(creds.checkPassword(self.password))
        self.assertFalse(creds.checkPassword(self.password + b"wrong"))

        nc = b"00000002"
        clientResponse = self.formatResponse(
            nonce=challenge["nonce"],
            response=self.getDigestResponse(challenge, nc),
            nc=nc,
            opaque=challenge["opaque"],
        )

        creds = self.credentialFactory.decode(
            clientResponse, self.method, self.clientAddress.host
        )
        self.assertTrue(creds.checkPassword(self.password))
        self.assertFalse(creds.checkPassword(self.password + b"wrong"))

    def test_failsWithDifferentMethod(self):
        """
        L{DigestCredentialFactory.decode} returns an L{IUsernameHashedPassword}
        provider which rejects a correct password for the given user if the
        challenge response request is made using a different HTTP method than
        was used to request the initial challenge.
        """
        challenge = self.credentialFactory.getChallenge(self.clientAddress.host)

        nc = b"00000001"
        clientResponse = self.formatResponse(
            nonce=challenge["nonce"],
            response=self.getDigestResponse(challenge, nc),
            nc=nc,
            opaque=challenge["opaque"],
        )
        creds = self.credentialFactory.decode(
            clientResponse, b"POST", self.clientAddress.host
        )
        self.assertFalse(creds.checkPassword(self.password))
        self.assertFalse(creds.checkPassword(self.password + b"wrong"))

    def test_noUsername(self):
        """
        L{DigestCredentialFactory.decode} raises L{LoginFailed} if the response
        has no username field or if the username field is empty.
        """
        # Check for no username
        e = self.assertRaises(
            LoginFailed,
            self.credentialFactory.decode,
            self.formatResponse(username=None),
            self.method,
            self.clientAddress.host,
        )
        self.assertEqual(str(e), "Invalid response, no username given.")

        # Check for an empty username
        e = self.assertRaises(
            LoginFailed,
            self.credentialFactory.decode,
            self.formatResponse(username=b""),
            self.method,
            self.clientAddress.host,
        )
        self.assertEqual(str(e), "Invalid response, no username given.")

    def test_noNonce(self):
        """
        L{DigestCredentialFactory.decode} raises L{LoginFailed} if the response
        has no nonce.
        """
        e = self.assertRaises(
            LoginFailed,
            self.credentialFactory.decode,
            self.formatResponse(opaque=b"abc123"),
            self.method,
            self.clientAddress.host,
        )
        self.assertEqual(str(e), "Invalid response, no nonce given.")

    def test_noOpaque(self):
        """
        L{DigestCredentialFactory.decode} raises L{LoginFailed} if the response
        has no opaque.
        """
        e = self.assertRaises(
            LoginFailed,
            self.credentialFactory.decode,
            self.formatResponse(),
            self.method,
            self.clientAddress.host,
        )
        self.assertEqual(str(e), "Invalid response, no opaque given.")

    def test_checkHash(self):
        """
        L{DigestCredentialFactory.decode} returns an L{IUsernameDigestHash}
        provider which can verify a hash of the form 'username:realm:password'.
        """
        challenge = self.credentialFactory.getChallenge(self.clientAddress.host)

        nc = b"00000001"
        clientResponse = self.formatResponse(
            nonce=challenge["nonce"],
            response=self.getDigestResponse(challenge, nc),
            nc=nc,
            opaque=challenge["opaque"],
        )

        creds = self.credentialFactory.decode(
            clientResponse, self.method, self.clientAddress.host
        )
        self.assertTrue(verifyObject(IUsernameDigestHash, creds))

        cleartext = self.username + b":" + self.realm + b":" + self.password
        hash = md5(cleartext)
        self.assertTrue(creds.checkHash(hexlify(hash.digest())))
        hash.update(b"wrong")
        self.assertFalse(creds.checkHash(hexlify(hash.digest())))

    def test_invalidOpaque(self):
        """
        L{DigestCredentialFactory.decode} raises L{LoginFailed} when the opaque
        value does not contain all the required parts.
        """
        credentialFactory = FakeDigestCredentialFactory(self.algorithm, self.realm)
        challenge = credentialFactory.getChallenge(self.clientAddress.host)

        exc = self.assertRaises(
            LoginFailed,
            credentialFactory._verifyOpaque,
            b"badOpaque",
            challenge["nonce"],
            self.clientAddress.host,
        )
        self.assertEqual(str(exc), "Invalid response, invalid opaque value")

        badOpaque = b"foo-" + b64encode(b"nonce,clientip")

        exc = self.assertRaises(
            LoginFailed,
            credentialFactory._verifyOpaque,
            badOpaque,
            challenge["nonce"],
            self.clientAddress.host,
        )
        self.assertEqual(str(exc), "Invalid response, invalid opaque value")

        exc = self.assertRaises(
            LoginFailed,
            credentialFactory._verifyOpaque,
            b"",
            challenge["nonce"],
            self.clientAddress.host,
        )
        self.assertEqual(str(exc), "Invalid response, invalid opaque value")

        badOpaque = b"foo-" + b64encode(
            b",".join(
                (challenge["nonce"], networkString(self.clientAddress.host), b"foobar")
            )
        )
        exc = self.assertRaises(
            LoginFailed,
            credentialFactory._verifyOpaque,
            badOpaque,
            challenge["nonce"],
            self.clientAddress.host,
        )
        self.assertEqual(str(exc), "Invalid response, invalid opaque/time values")

    def test_incompatibleNonce(self):
        """
        L{DigestCredentialFactory.decode} raises L{LoginFailed} when the given
        nonce from the response does not match the nonce encoded in the opaque.
        """
        credentialFactory = FakeDigestCredentialFactory(self.algorithm, self.realm)
        challenge = credentialFactory.getChallenge(self.clientAddress.host)

        badNonceOpaque = credentialFactory._generateOpaque(
            b"1234567890", self.clientAddress.host
        )

        exc = self.assertRaises(
            LoginFailed,
            credentialFactory._verifyOpaque,
            badNonceOpaque,
            challenge["nonce"],
            self.clientAddress.host,
        )
        self.assertEqual(str(exc), "Invalid response, incompatible opaque/nonce values")

        exc = self.assertRaises(
            LoginFailed,
            credentialFactory._verifyOpaque,
            badNonceOpaque,
            b"",
            self.clientAddress.host,
        )
        self.assertEqual(str(exc), "Invalid response, incompatible opaque/nonce values")

    def test_incompatibleClientIP(self):
        """
        L{DigestCredentialFactory.decode} raises L{LoginFailed} when the
        request comes from a client IP other than what is encoded in the
        opaque.
        """
        credentialFactory = FakeDigestCredentialFactory(self.algorithm, self.realm)
        challenge = credentialFactory.getChallenge(self.clientAddress.host)

        badAddress = "10.0.0.1"
        # Sanity check
        self.assertNotEqual(self.clientAddress.host, badAddress)

        badNonceOpaque = credentialFactory._generateOpaque(
            challenge["nonce"], badAddress
        )

        self.assertRaises(
            LoginFailed,
            credentialFactory._verifyOpaque,
            badNonceOpaque,
            challenge["nonce"],
            self.clientAddress.host,
        )

    def test_oldNonce(self):
        """
        L{DigestCredentialFactory.decode} raises L{LoginFailed} when the given
        opaque is older than C{DigestCredentialFactory.CHALLENGE_LIFETIME_SECS}
        """
        credentialFactory = FakeDigestCredentialFactory(self.algorithm, self.realm)
        challenge = credentialFactory.getChallenge(self.clientAddress.host)

        key = b",".join(
            (challenge["nonce"], networkString(self.clientAddress.host), b"-137876876")
        )
        digest = hexlify(md5(key + credentialFactory.privateKey).digest())
        ekey = b64encode(key)

        oldNonceOpaque = b"-".join((digest, ekey.strip(b"\n")))

        self.assertRaises(
            LoginFailed,
            credentialFactory._verifyOpaque,
            oldNonceOpaque,
            challenge["nonce"],
            self.clientAddress.host,
        )

    def test_mismatchedOpaqueChecksum(self):
        """
        L{DigestCredentialFactory.decode} raises L{LoginFailed} when the opaque
        checksum fails verification.
        """
        credentialFactory = FakeDigestCredentialFactory(self.algorithm, self.realm)
        challenge = credentialFactory.getChallenge(self.clientAddress.host)

        key = b",".join(
            (challenge["nonce"], networkString(self.clientAddress.host), b"0")
        )

        digest = hexlify(md5(key + b"this is not the right pkey").digest())
        badChecksum = b"-".join((digest, b64encode(key)))

        self.assertRaises(
            LoginFailed,
            credentialFactory._verifyOpaque,
            badChecksum,
            challenge["nonce"],
            self.clientAddress.host,
        )

    def test_incompatibleCalcHA1Options(self):
        """
        L{calcHA1} raises L{TypeError} when any of the pszUsername, pszRealm,
        or pszPassword arguments are specified with the preHA1 keyword
        argument.
        """
        arguments = (
            (b"user", b"realm", b"password", b"preHA1"),
            (None, b"realm", None, b"preHA1"),
            (None, None, b"password", b"preHA1"),
        )

        for pszUsername, pszRealm, pszPassword, preHA1 in arguments:
            self.assertRaises(
                TypeError,
                calcHA1,
                b"md5",
                pszUsername,
                pszRealm,
                pszPassword,
                b"nonce",
                b"cnonce",
                preHA1=preHA1,
            )

    def test_noNewlineOpaque(self):
        """
        L{DigestCredentialFactory._generateOpaque} returns a value without
        newlines, regardless of the length of the nonce.
        """
        opaque = self.credentialFactory._generateOpaque(b"long nonce " * 10, None)
        self.assertNotIn(b"\n", opaque)