HEX
Server: Apache/2.4.58 (Ubuntu)
System: Linux ns3133907 6.8.0-86-generic #87-Ubuntu SMP PREEMPT_DYNAMIC Mon Sep 22 18:03:36 UTC 2025 x86_64
User: cssnetorguk (1024)
PHP: 8.2.28
Disabled: NONE
Upload Files
File: //lib/python3/dist-packages/boto3/crt.py
# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# https://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""
This file contains private functionality for interacting with the AWS
Common Runtime library (awscrt) in boto3.

All code contained within this file is for internal usage within this
project and is not intended for external consumption. All interfaces
contained within are subject to abrupt breaking changes.
"""

import threading

import botocore.exceptions
from botocore.session import Session
from s3transfer.crt import (
    BotocoreCRTCredentialsWrapper,
    BotocoreCRTRequestSerializer,
    CRTTransferManager,
    acquire_crt_s3_process_lock,
    create_s3_crt_client,
)

# Singletons for CRT-backed transfers
CRT_S3_CLIENT = None
BOTOCORE_CRT_SERIALIZER = None

CLIENT_CREATION_LOCK = threading.Lock()
PROCESS_LOCK_NAME = 'boto3'


def _create_crt_client(session, config, region_name, cred_provider):
    """Create a CRT S3 Client for file transfer.

    Instantiating many of these may lead to degraded performance or
    system resource exhaustion.
    """
    create_crt_client_kwargs = {
        'region': region_name,
        'use_ssl': True,
        'crt_credentials_provider': cred_provider,
    }
    return create_s3_crt_client(**create_crt_client_kwargs)


def _create_crt_request_serializer(session, region_name):
    return BotocoreCRTRequestSerializer(
        session, {'region_name': region_name, 'endpoint_url': None}
    )


def _create_crt_s3_client(
    session, config, region_name, credentials, lock, **kwargs
):
    """Create boto3 wrapper class to manage crt lock reference and S3 client."""
    cred_wrapper = BotocoreCRTCredentialsWrapper(credentials)
    cred_provider = cred_wrapper.to_crt_credentials_provider()
    return CRTS3Client(
        _create_crt_client(session, config, region_name, cred_provider),
        lock,
        region_name,
        cred_wrapper,
    )


def _initialize_crt_transfer_primatives(client, config):
    lock = acquire_crt_s3_process_lock(PROCESS_LOCK_NAME)
    if lock is None:
        # If we're unable to acquire the lock, we cannot
        # use the CRT in this process and should default to
        # the classic s3transfer manager.
        return None, None

    session = Session()
    region_name = client.meta.region_name
    credentials = client._get_credentials()

    serializer = _create_crt_request_serializer(session, region_name)
    s3_client = _create_crt_s3_client(
        session, config, region_name, credentials, lock
    )
    return serializer, s3_client


def get_crt_s3_client(client, config):
    global CRT_S3_CLIENT
    global BOTOCORE_CRT_SERIALIZER

    with CLIENT_CREATION_LOCK:
        if CRT_S3_CLIENT is None:
            serializer, s3_client = _initialize_crt_transfer_primatives(
                client, config
            )
            BOTOCORE_CRT_SERIALIZER = serializer
            CRT_S3_CLIENT = s3_client

    return CRT_S3_CLIENT


class CRTS3Client:
    """
    This wrapper keeps track of our underlying CRT client, the lock used to
    acquire it and the region we've used to instantiate the client.

    Due to limitations in the existing CRT interfaces, we can only make calls
    in a single region and does not support redirects. We track the region to
    ensure we don't use the CRT client when a successful request cannot be made.
    """

    def __init__(self, crt_client, process_lock, region, cred_provider):
        self.crt_client = crt_client
        self.process_lock = process_lock
        self.region = region
        self.cred_provider = cred_provider


def is_crt_compatible_request(client, crt_s3_client):
    """
    Boto3 client must use same signing region and credentials
    as the CRT_S3_CLIENT singleton. Otherwise fallback to classic.
    """
    if crt_s3_client is None:
        return False

    boto3_creds = client._get_credentials()
    if boto3_creds is None:
        return False

    is_same_identity = compare_identity(
        boto3_creds.get_frozen_credentials(), crt_s3_client.cred_provider
    )
    is_same_region = client.meta.region_name == crt_s3_client.region
    return is_same_region and is_same_identity


def compare_identity(boto3_creds, crt_s3_creds):
    try:
        crt_creds = crt_s3_creds()
    except botocore.exceptions.NoCredentialsError:
        return False

    is_matching_identity = (
        boto3_creds.access_key == crt_creds.access_key_id
        and boto3_creds.secret_key == crt_creds.secret_access_key
        and boto3_creds.token == crt_creds.session_token
    )
    return is_matching_identity


def create_crt_transfer_manager(client, config):
    """Create a CRTTransferManager for optimized data transfer."""
    crt_s3_client = get_crt_s3_client(client, config)
    if is_crt_compatible_request(client, crt_s3_client):
        return CRTTransferManager(
            crt_s3_client.crt_client, BOTOCORE_CRT_SERIALIZER
        )
    return None