From 05d6765bb8de994195c5e37ebef0cd56b22fed6d Mon Sep 17 00:00:00 2001 From: Marcus Boorstin Date: Tue, 4 Nov 2025 00:15:44 -0500 Subject: [PATCH 1/2] Add support for recipient for KMS Decrypt --- .../localstack/services/kms/provider.py | 44 ++++++- localstack-core/localstack/utils/crypto.py | 116 ++++++++++++++++++ pyproject.toml | 2 + requirements-base-runtime.txt | 4 + requirements-basic.txt | 4 + requirements-dev.txt | 8 ++ requirements-runtime.txt | 8 ++ requirements-test.txt | 8 ++ requirements-typehint.txt | 8 ++ tests/aws/services/kms/test_kms.py | 110 +++++++++++++++++ 10 files changed, 309 insertions(+), 3 deletions(-) diff --git a/localstack-core/localstack/services/kms/provider.py b/localstack-core/localstack/services/kms/provider.py index be5abc2728832..5e73d1788b282 100644 --- a/localstack-core/localstack/services/kms/provider.py +++ b/localstack-core/localstack/services/kms/provider.py @@ -4,10 +4,12 @@ import logging import os +from cbor2._decoder import loads as cbor2_loads from cryptography.exceptions import InvalidTag from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes, keywrap from cryptography.hazmat.primitives.asymmetric import padding +from cryptography.hazmat.primitives.serialization import load_der_public_key from localstack.aws.api import CommonServiceException, RequestContext, handler from localstack.aws.api.kms import ( @@ -138,6 +140,7 @@ from localstack.utils.aws.arns import get_partition, kms_alias_arn, parse_arn from localstack.utils.collections import PaginatedList from localstack.utils.common import select_attributes +from localstack.utils.crypto import pkcs7_envelope_encrypt from localstack.utils.strings import short_uid, to_bytes, to_str LOG = logging.getLogger(__name__) @@ -1075,6 +1078,25 @@ def decrypt( self._validate_key_for_encryption_decryption(context, key) self._validate_key_state_not_pending_import(key) + # Handle the recipient field. This is used by AWS Nitro to re-encrypt the plaintext to the key specified + # by the enclave. Proper support for this will take significant work to figure out how to model enforcing + # the attestation measurements; for now, if recipient is specified and has an attestation doc in it including + # a public key where it's expected to be, we encrypt to that public key. This at least allows users to use + # localstack as a drop-in replacement for AWS when testing without having to skip the secondary decryption + # when using localstack. + recipient_pubkey = None + if recipient: + attestation_document = recipient["AttestationDocument"] + # We do all of this in a try/catch and warn if it fails so that if users are currently passing a nonsense + # value we don't break it for them. In the future we could do a breaking change to require a valid attestation + # (or at least one that contains the public key). + try: + recipient_pubkey = self._extract_attestation_pubkey(attestation_document) + except Exception as e: + logging.warning( + "Unable to extract public key from non-empty attestation document: %s", e + ) + try: # TODO: Extend the implementation to handle additional encryption/decryption scenarios # beyond the current support for offline encryption and online decryption using RSA keys if key id exists in @@ -1088,20 +1110,27 @@ def decrypt( plaintext = key.decrypt(ciphertext, encryption_context) except InvalidTag: raise InvalidCiphertextException() + # For compatibility, we return EncryptionAlgorithm values expected from AWS. But LocalStack currently always # encrypts with symmetric encryption no matter the key settings. # # We return a key ARN instead of KeyId despite the name of the parameter, as this is what AWS does and states # in its docs. - # TODO add support for "recipient" # https://docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html#API_Decrypt_RequestSyntax # TODO add support for "dry_run" - return DecryptResponse( + response = DecryptResponse( KeyId=key.metadata.get("Arn"), - Plaintext=plaintext, EncryptionAlgorithm=encryption_algorithm, ) + # Encrypt to the recipient pubkey if specified. Otherwise, return the actual plaintext + if recipient_pubkey: + response["CiphertextForRecipient"] = pkcs7_envelope_encrypt(plaintext, recipient_pubkey) + else: + response["Plaintext"] = plaintext + + return response + def get_parameters_for_import( self, context: RequestContext, @@ -1559,6 +1588,15 @@ def _validate_grant_request(self, data: dict): f" constraint: [Member must satisfy enum value set: {VALID_OPERATIONS}]" ) + def _extract_attestation_pubkey(self, attestation_document: bytes): + # The attestation document comes as a COSE (CBOR Object Signing and Encryption) object: the CBOR + # attestation is signed and then the attestation and signature are again CBOR-encoded. For now + # we don't bother validating the signature, though in the future we could. + cose_document = cbor2_loads(attestation_document) + attestation = cbor2_loads(cose_document[2]) + public_key_bytes = attestation["public_key"] + return load_der_public_key(public_key_bytes) + def _decrypt_wrapped_key_material( self, import_state: KeyImportState, diff --git a/localstack-core/localstack/utils/crypto.py b/localstack-core/localstack/utils/crypto.py index dc9f50947c905..a7b69754d8837 100644 --- a/localstack-core/localstack/utils/crypto.py +++ b/localstack-core/localstack/utils/crypto.py @@ -4,7 +4,15 @@ import re import threading +from asn1crypto import algos, cms, core +from asn1crypto import x509 as asn1_x509 +from Crypto.Cipher import AES, PKCS1_OAEP +from Crypto.Hash import SHA256 +from Crypto.PublicKey import RSA as CryptoRSA +from Crypto.Random import get_random_bytes +from Crypto.Util.Padding import pad as crypto_pad from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from .files import TMP_FILES, file_exists_not_empty, load_file, new_tmp_file, save_file @@ -26,6 +34,11 @@ PEM_KEY_START_REGEX = r"-----BEGIN(.*)PRIVATE KEY-----" PEM_KEY_END_REGEX = r"-----END(.*)PRIVATE KEY-----" +OID_AES256_CBC = "2.16.840.1.101.3.4.1.42" +OID_MGF1 = "1.2.840.113549.1.1.8" +OID_RSAES_OAEP = "1.2.840.113549.1.1.7" +OID_SHA256 = "2.16.840.1.101.3.4.2.1" + @synchronized(lock=SSL_CERT_LOCK) def generate_ssl_cert( @@ -183,3 +196,106 @@ def decrypt( decrypted = decryptor.update(encrypted) + decryptor.finalize() decrypted = unpad(decrypted) return decrypted + + +def pkcs7_envelope_encrypt(plaintext: bytes, recipient_pubkey) -> bytes: + """ + Create a PKCS7 wrapper of some plaintext decryptable by recipient_pubkey. Uses RSA-OAEP with SHA-256 + to encrypt the AES-256-CBC content key. Hazmat's PKCS7EnvelopeBuilder doesn't support RSA-OAEP with SHA-256, + so we need to build the envelope manually with asn1crypto and PyCryptoDome. + """ + + # Get the public key to PyCryptoDome's object + pem_bytes = recipient_pubkey.public_bytes( + encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo + ) + crypto_pubkey = CryptoRSA.import_key(pem_bytes) + + # Generate the AES session key + session_key = get_random_bytes(32) + + # Encrypt the sesssion key with RSA-OAEP-SHA256 to recipient_pubkey + cipher_rsa = PKCS1_OAEP.new(crypto_pubkey, hashAlgo=SHA256) + encrypted_session_key = cipher_rsa.encrypt(session_key) + + # Encrypt the context with the session key + iv = get_random_bytes(16) + cipher_aes = AES.new(session_key, AES.MODE_CBC, iv) + padded_plaintext = crypto_pad(plaintext, AES.block_size) + encrypted_content = cipher_aes.encrypt(padded_plaintext) + + # Finally, build the envelope + + # Add the recipient + recipient_identifier = cms.RecipientIdentifier( + name="issuer_and_serial_number", + value=cms.IssuerAndSerialNumber( + { + "issuer": asn1_x509.Name.build({"common_name": "recipient"}), + "serial_number": 1, + } + ), + ) + key_enc_algorithm = cms.KeyEncryptionAlgorithm( + { + "algorithm": OID_RSAES_OAEP, + "parameters": algos.RSAESOAEPParams( + { + "hash_algorithm": algos.DigestAlgorithm( + { + "algorithm": OID_SHA256, + } + ), + "mask_gen_algorithm": algos.MaskGenAlgorithm( + { + "algorithm": OID_MGF1, + "parameters": algos.DigestAlgorithm( + { + "algorithm": OID_SHA256, + } + ), + } + ), + } + ), + } + ) + recipient_info = cms.KeyTransRecipientInfo( + { + "version": "v0", + "rid": recipient_identifier, + "key_encryption_algorithm": key_enc_algorithm, + "encrypted_key": encrypted_session_key, + } + ) + + # Add the encrypted content + content_enc_algorithm = cms.EncryptionAlgorithm( + { + "algorithm": OID_AES256_CBC, + "parameters": core.OctetString(iv), + } + ) + encrypted_content_info = cms.EncryptedContentInfo( + { + "content_type": "data", + "content_encryption_algorithm": content_enc_algorithm, + "encrypted_content": encrypted_content, + } + ) + enveloped_data = cms.EnvelopedData( + { + "version": "v0", + "recipient_infos": [recipient_info], + "encrypted_content_info": encrypted_content_info, + } + ) + + # Finally add a wrapper and return its bytes + content_info = cms.ContentInfo( + { + "content_type": "enveloped_data", + "content": enveloped_data, + } + ) + return content_info.dump() diff --git a/pyproject.toml b/pyproject.toml index 9558af558020b..ae5a2dbedbbd6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,7 @@ description = "The core library and runtime of LocalStack" license = "Apache-2.0" requires-python = ">=3.10" dependencies = [ + "asn1crypto>=1.5.1", "click>=7.1", "cachetools>=5.0", "cryptography", @@ -20,6 +21,7 @@ dependencies = [ "dnspython>=1.16.0", "plux>=1.10", "psutil>=5.4.8", + "pycryptodome>=3.19.0", "python-dotenv>=0.19.1", "pyyaml>=5.1", "rich>=12.3.0", diff --git a/requirements-base-runtime.txt b/requirements-base-runtime.txt index d0deaabf9ffc1..e70f53f452a39 100644 --- a/requirements-base-runtime.txt +++ b/requirements-base-runtime.txt @@ -4,6 +4,8 @@ # # pip-compile --extra=base-runtime --output-file=requirements-base-runtime.txt --strip-extras --unsafe-package=distribute --unsafe-package=localstack-core --unsafe-package=pip --unsafe-package=setuptools pyproject.toml # +asn1crypto==1.5.1 + # via localstack-core (pyproject.toml) attrs==25.4.0 # via # jsonschema @@ -127,6 +129,8 @@ psutil==7.1.2 # via localstack-core (pyproject.toml) pycparser==2.23 # via cffi +pycryptodome==3.23.0 + # via localstack-core (pyproject.toml) pygments==2.19.2 # via rich pyopenssl==25.3.0 diff --git a/requirements-basic.txt b/requirements-basic.txt index 39ca1266b01ca..91a87386ee491 100644 --- a/requirements-basic.txt +++ b/requirements-basic.txt @@ -4,6 +4,8 @@ # # pip-compile --output-file=requirements-basic.txt --strip-extras --unsafe-package=distribute --unsafe-package=localstack-core --unsafe-package=pip --unsafe-package=setuptools pyproject.toml # +asn1crypto==1.5.1 + # via localstack-core (pyproject.toml) cachetools==6.2.1 # via localstack-core (pyproject.toml) certifi==2025.10.5 @@ -34,6 +36,8 @@ psutil==7.1.2 # via localstack-core (pyproject.toml) pycparser==2.23 # via cffi +pycryptodome==3.23.0 + # via localstack-core (pyproject.toml) pygments==2.19.2 # via rich python-dotenv==1.2.1 diff --git a/requirements-dev.txt b/requirements-dev.txt index 7a69afd81b8b6..2f14d3bf7529a 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -18,6 +18,10 @@ apispec==6.8.4 # via localstack-core argparse==1.4.0 # via kclpy-ext +asn1crypto==1.5.1 + # via + # localstack-core + # localstack-core (pyproject.toml) attrs==25.4.0 # via # cattrs @@ -340,6 +344,10 @@ pyasn1==0.6.1 # via rsa pycparser==2.23 # via cffi +pycryptodome==3.23.0 + # via + # localstack-core + # localstack-core (pyproject.toml) pydantic==2.12.3 # via # aws-sam-translator diff --git a/requirements-runtime.txt b/requirements-runtime.txt index 361610fb8f02b..37d49dfaedf72 100644 --- a/requirements-runtime.txt +++ b/requirements-runtime.txt @@ -16,6 +16,10 @@ apispec==6.8.4 # via localstack-core (pyproject.toml) argparse==1.4.0 # via kclpy-ext +asn1crypto==1.5.1 + # via + # localstack-core + # localstack-core (pyproject.toml) attrs==25.4.0 # via # jsonschema @@ -237,6 +241,10 @@ pyasn1==0.6.1 # via rsa pycparser==2.23 # via cffi +pycryptodome==3.23.0 + # via + # localstack-core + # localstack-core (pyproject.toml) pydantic==2.12.3 # via # aws-sam-translator diff --git a/requirements-test.txt b/requirements-test.txt index b34b16daa9ce4..b71bfca034ebc 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -18,6 +18,10 @@ apispec==6.8.4 # via localstack-core argparse==1.4.0 # via kclpy-ext +asn1crypto==1.5.1 + # via + # localstack-core + # localstack-core (pyproject.toml) attrs==25.4.0 # via # cattrs @@ -299,6 +303,10 @@ pyasn1==0.6.1 # via rsa pycparser==2.23 # via cffi +pycryptodome==3.23.0 + # via + # localstack-core + # localstack-core (pyproject.toml) pydantic==2.12.3 # via # aws-sam-translator diff --git a/requirements-typehint.txt b/requirements-typehint.txt index 1678e15dc6c00..64114c91996fb 100644 --- a/requirements-typehint.txt +++ b/requirements-typehint.txt @@ -18,6 +18,10 @@ apispec==6.8.4 # via localstack-core argparse==1.4.0 # via kclpy-ext +asn1crypto==1.5.1 + # via + # localstack-core + # localstack-core (pyproject.toml) attrs==25.4.0 # via # cattrs @@ -550,6 +554,10 @@ pyasn1==0.6.1 # via rsa pycparser==2.23 # via cffi +pycryptodome==3.23.0 + # via + # localstack-core + # localstack-core (pyproject.toml) pydantic==2.12.3 # via # aws-sam-translator diff --git a/tests/aws/services/kms/test_kms.py b/tests/aws/services/kms/test_kms.py index ed8ec0ed43bbb..d8c9ceeb28f3e 100644 --- a/tests/aws/services/kms/test_kms.py +++ b/tests/aws/services/kms/test_kms.py @@ -6,9 +6,15 @@ from datetime import datetime from random import getrandbits +import cbor2 import pytest +from asn1crypto import cms from botocore.config import Config from botocore.exceptions import ClientError +from Crypto.Cipher import AES, PKCS1_OAEP +from Crypto.Hash import SHA256 +from Crypto.PublicKey import RSA as CryptoRSA +from Crypto.Util.Padding import unpad as crypto_unpad from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes, hmac, serialization from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa, utils @@ -2096,6 +2102,110 @@ def test_encrypt_decrypt_encryption_context(self, kms_create_key, snapshot, aws_ ) snapshot.match("decrypt_response_with_invalid_ciphertext", e.value.response) + @markers.aws.only_localstack + def test_decrypt_recipient(self, kms_create_key, snapshot, aws_client): + """ + Test that decryption to a Nitro recipient creates a correct PKCS7 envelope to that recipient. This is done + as an only-Localstack test because of the difficulty of spinning up a true Nitro enclave to generate a real + attestation for real AWS. + """ + + # Setup + key_id = kms_create_key(KeyUsage="ENCRYPT_DECRYPT", KeySpec="SYMMETRIC_DEFAULT")["KeyId"] + message = b"test message 123 !%$@ 1234567890" + ciphertext = aws_client.kms.encrypt( + KeyId=key_id, + Plaintext=base64.b64encode(message), + EncryptionAlgorithm="SYMMETRIC_DEFAULT", + )["CiphertextBlob"] + + # Set up our fake attestation. For now we just build a mini-attestation with only the public key, and + # skipping the other fields in it and the wrapping signature; in the future we could provide a more complete one. + rsa_key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + backend=default_backend(), + ) + der_public_key = rsa_key.public_key().public_bytes( + encoding=serialization.Encoding.DER, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + inner_attestation = cbor2.dumps({"public_key": der_public_key}) + wrapped_attestation = cbor2.dumps(["", "", inner_attestation, ""]) + recipient = {"AttestationDocument": wrapped_attestation} + + # Do the KMS call + ciphertext_for_recipient = aws_client.kms.decrypt( + KeyId=key_id, + CiphertextBlob=ciphertext, + EncryptionAlgorithm="SYMMETRIC_DEFAULT", + Recipient=recipient, + )["CiphertextForRecipient"] + + # Decrypt the PKCS7 envelope to recover the plaintext. Hazmat's pkcs7_decrypt_pem() doesn't support the RSA-OAEP + # with SHA-256 that AWS uses, so we have to do it manually. + + # Parse the PKCS7 envelope + content_info = cms.ContentInfo.load(ciphertext_for_recipient) + enveloped_data = content_info["content"] + + # Extract the encrypted session key and encrypted content + recipient_info = enveloped_data["recipient_infos"][0].chosen + encrypted_session_key = recipient_info["encrypted_key"].native + encrypted_content_info = enveloped_data["encrypted_content_info"] + encrypted_content = encrypted_content_info["encrypted_content"].native + + # Get the IV from the content encryption algorithm parameters + iv = encrypted_content_info["content_encryption_algorithm"]["parameters"].native + + # Convert the private key to PyCryptoDome format + pem_bytes = rsa_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + crypto_privkey = CryptoRSA.import_key(pem_bytes) + + # Decrypt the session key using RSA-OAEP with SHA-256 + cipher_rsa = PKCS1_OAEP.new(crypto_privkey, hashAlgo=SHA256) + session_key = cipher_rsa.decrypt(encrypted_session_key) + + # Decrypt the content using the session key and AES-256-CBC + cipher_aes = AES.new(session_key, AES.MODE_CBC, iv) + padded_plaintext = cipher_aes.decrypt(encrypted_content) + plaintext = crypto_unpad(padded_plaintext, AES.block_size) + + # Make sure we did the decryption correctly + assert base64.b64decode(plaintext) == message + + @markers.aws.only_localstack + def test_decrypt_recipient_invalid_attestation(self, kms_create_key, snapshot, aws_client): + """ + Test that if we are unable to extract the public key from the attestation, we ignore the recipient + and provide the plaintext in the response. + """ + + # Setup + key_id = kms_create_key(KeyUsage="ENCRYPT_DECRYPT", KeySpec="SYMMETRIC_DEFAULT")["KeyId"] + message = b"test message 123 !%$@ 1234567890" + ciphertext = aws_client.kms.encrypt( + KeyId=key_id, + Plaintext=base64.b64encode(message), + EncryptionAlgorithm="SYMMETRIC_DEFAULT", + )["CiphertextBlob"] + + # Invalid attestation document + recipient = {"AttestationDocument": base64.b64encode(b"invalidattestation")} + plaintext = aws_client.kms.decrypt( + KeyId=key_id, + CiphertextBlob=ciphertext, + EncryptionAlgorithm="SYMMETRIC_DEFAULT", + Recipient=recipient, + )["Plaintext"] + + # Still get the plaintext back + assert base64.b64decode(plaintext) == message + @markers.aws.validated def test_get_parameters_for_import(self, kms_create_key, snapshot, aws_client): sign_verify_key = kms_create_key( From 6d5101dfb73f06133bdf043d24af49fa27f7a976 Mon Sep 17 00:00:00 2001 From: Marcus Boorstin Date: Mon, 10 Nov 2025 09:39:09 -0500 Subject: [PATCH 2/2] Update for code review comments --- .../localstack/services/kms/provider.py | 5 +- localstack-core/localstack/utils/crypto.py | 49 ++++++++----------- pyproject.toml | 1 - requirements-base-runtime.txt | 2 - requirements-basic.txt | 2 - requirements-dev.txt | 4 -- requirements-runtime.txt | 4 -- requirements-test.txt | 4 -- requirements-typehint.txt | 4 -- tests/aws/services/kms/test_kms.py | 36 ++++++-------- 10 files changed, 39 insertions(+), 72 deletions(-) diff --git a/localstack-core/localstack/services/kms/provider.py b/localstack-core/localstack/services/kms/provider.py index 5e73d1788b282..3d930d2f321a1 100644 --- a/localstack-core/localstack/services/kms/provider.py +++ b/localstack-core/localstack/services/kms/provider.py @@ -4,11 +4,12 @@ import logging import os -from cbor2._decoder import loads as cbor2_loads +from cbor2 import loads as cbor2_loads from cryptography.exceptions import InvalidTag from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes, keywrap from cryptography.hazmat.primitives.asymmetric import padding +from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey from cryptography.hazmat.primitives.serialization import load_der_public_key from localstack.aws.api import CommonServiceException, RequestContext, handler @@ -1588,7 +1589,7 @@ def _validate_grant_request(self, data: dict): f" constraint: [Member must satisfy enum value set: {VALID_OPERATIONS}]" ) - def _extract_attestation_pubkey(self, attestation_document: bytes): + def _extract_attestation_pubkey(self, attestation_document: bytes) -> RSAPublicKey: # The attestation document comes as a COSE (CBOR Object Signing and Encryption) object: the CBOR # attestation is signed and then the attestation and signature are again CBOR-encoded. For now # we don't bother validating the signature, though in the future we could. diff --git a/localstack-core/localstack/utils/crypto.py b/localstack-core/localstack/utils/crypto.py index a7b69754d8837..70de3cec0da2b 100644 --- a/localstack-core/localstack/utils/crypto.py +++ b/localstack-core/localstack/utils/crypto.py @@ -6,13 +6,11 @@ from asn1crypto import algos, cms, core from asn1crypto import x509 as asn1_x509 -from Crypto.Cipher import AES, PKCS1_OAEP -from Crypto.Hash import SHA256 -from Crypto.PublicKey import RSA as CryptoRSA -from Crypto.Random import get_random_bytes -from Crypto.Util.Padding import pad as crypto_pad from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives import padding as sym_padding +from cryptography.hazmat.primitives.asymmetric import padding +from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from .files import TMP_FILES, file_exists_not_empty, load_file, new_tmp_file, save_file @@ -198,35 +196,30 @@ def decrypt( return decrypted -def pkcs7_envelope_encrypt(plaintext: bytes, recipient_pubkey) -> bytes: +def pkcs7_envelope_encrypt(plaintext: bytes, recipient_pubkey: RSAPublicKey) -> bytes: """ Create a PKCS7 wrapper of some plaintext decryptable by recipient_pubkey. Uses RSA-OAEP with SHA-256 to encrypt the AES-256-CBC content key. Hazmat's PKCS7EnvelopeBuilder doesn't support RSA-OAEP with SHA-256, - so we need to build the envelope manually with asn1crypto and PyCryptoDome. + so we need to build the pieces manually and then put them together in an envelope with asn1crypto. """ - # Get the public key to PyCryptoDome's object - pem_bytes = recipient_pubkey.public_bytes( - encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo + # Encrypt the plaintext with an AES session key, then encrypt the session key to the recipient_pubkey + session_key = os.urandom(32) + iv = os.urandom(16) + encrypted_session_key = recipient_pubkey.encrypt( + session_key, + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None + ), ) - crypto_pubkey = CryptoRSA.import_key(pem_bytes) - - # Generate the AES session key - session_key = get_random_bytes(32) - - # Encrypt the sesssion key with RSA-OAEP-SHA256 to recipient_pubkey - cipher_rsa = PKCS1_OAEP.new(crypto_pubkey, hashAlgo=SHA256) - encrypted_session_key = cipher_rsa.encrypt(session_key) - - # Encrypt the context with the session key - iv = get_random_bytes(16) - cipher_aes = AES.new(session_key, AES.MODE_CBC, iv) - padded_plaintext = crypto_pad(plaintext, AES.block_size) - encrypted_content = cipher_aes.encrypt(padded_plaintext) - - # Finally, build the envelope + cipher = Cipher(algorithms.AES(session_key), modes.CBC(iv), backend=default_backend()) + encryptor = cipher.encryptor() + padder = sym_padding.PKCS7(algorithms.AES.block_size).padder() + padded_plaintext = padder.update(plaintext) + padder.finalize() + encrypted_content = encryptor.update(padded_plaintext) + encryptor.finalize() - # Add the recipient + # Now put together the envelope. + # Add the recipient with their copy of the session key recipient_identifier = cms.RecipientIdentifier( name="issuer_and_serial_number", value=cms.IssuerAndSerialNumber( diff --git a/pyproject.toml b/pyproject.toml index ae5a2dbedbbd6..c88c302f073d7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,6 @@ dependencies = [ "dnspython>=1.16.0", "plux>=1.10", "psutil>=5.4.8", - "pycryptodome>=3.19.0", "python-dotenv>=0.19.1", "pyyaml>=5.1", "rich>=12.3.0", diff --git a/requirements-base-runtime.txt b/requirements-base-runtime.txt index e70f53f452a39..027a6852b4674 100644 --- a/requirements-base-runtime.txt +++ b/requirements-base-runtime.txt @@ -129,8 +129,6 @@ psutil==7.1.2 # via localstack-core (pyproject.toml) pycparser==2.23 # via cffi -pycryptodome==3.23.0 - # via localstack-core (pyproject.toml) pygments==2.19.2 # via rich pyopenssl==25.3.0 diff --git a/requirements-basic.txt b/requirements-basic.txt index 91a87386ee491..49d93fe44eefb 100644 --- a/requirements-basic.txt +++ b/requirements-basic.txt @@ -36,8 +36,6 @@ psutil==7.1.2 # via localstack-core (pyproject.toml) pycparser==2.23 # via cffi -pycryptodome==3.23.0 - # via localstack-core (pyproject.toml) pygments==2.19.2 # via rich python-dotenv==1.2.1 diff --git a/requirements-dev.txt b/requirements-dev.txt index 2f14d3bf7529a..daba7e163da71 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -344,10 +344,6 @@ pyasn1==0.6.1 # via rsa pycparser==2.23 # via cffi -pycryptodome==3.23.0 - # via - # localstack-core - # localstack-core (pyproject.toml) pydantic==2.12.3 # via # aws-sam-translator diff --git a/requirements-runtime.txt b/requirements-runtime.txt index 37d49dfaedf72..0c9c5bd07ca58 100644 --- a/requirements-runtime.txt +++ b/requirements-runtime.txt @@ -241,10 +241,6 @@ pyasn1==0.6.1 # via rsa pycparser==2.23 # via cffi -pycryptodome==3.23.0 - # via - # localstack-core - # localstack-core (pyproject.toml) pydantic==2.12.3 # via # aws-sam-translator diff --git a/requirements-test.txt b/requirements-test.txt index b71bfca034ebc..96cefd854cc33 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -303,10 +303,6 @@ pyasn1==0.6.1 # via rsa pycparser==2.23 # via cffi -pycryptodome==3.23.0 - # via - # localstack-core - # localstack-core (pyproject.toml) pydantic==2.12.3 # via # aws-sam-translator diff --git a/requirements-typehint.txt b/requirements-typehint.txt index 64114c91996fb..d379e0887eceb 100644 --- a/requirements-typehint.txt +++ b/requirements-typehint.txt @@ -554,10 +554,6 @@ pyasn1==0.6.1 # via rsa pycparser==2.23 # via cffi -pycryptodome==3.23.0 - # via - # localstack-core - # localstack-core (pyproject.toml) pydantic==2.12.3 # via # aws-sam-translator diff --git a/tests/aws/services/kms/test_kms.py b/tests/aws/services/kms/test_kms.py index d8c9ceeb28f3e..acf024206e866 100644 --- a/tests/aws/services/kms/test_kms.py +++ b/tests/aws/services/kms/test_kms.py @@ -11,13 +11,10 @@ from asn1crypto import cms from botocore.config import Config from botocore.exceptions import ClientError -from Crypto.Cipher import AES, PKCS1_OAEP -from Crypto.Hash import SHA256 -from Crypto.PublicKey import RSA as CryptoRSA -from Crypto.Util.Padding import unpad as crypto_unpad from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes, hmac, serialization from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa, utils +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.keywrap import aes_key_wrap_with_padding from cryptography.hazmat.primitives.serialization import load_der_public_key @@ -2103,7 +2100,7 @@ def test_encrypt_decrypt_encryption_context(self, kms_create_key, snapshot, aws_ snapshot.match("decrypt_response_with_invalid_ciphertext", e.value.response) @markers.aws.only_localstack - def test_decrypt_recipient(self, kms_create_key, snapshot, aws_client): + def test_decrypt_recipient(self, kms_create_key, aws_client): """ Test that decryption to a Nitro recipient creates a correct PKCS7 envelope to that recipient. This is done as an only-Localstack test because of the difficulty of spinning up a true Nitro enclave to generate a real @@ -2143,7 +2140,8 @@ def test_decrypt_recipient(self, kms_create_key, snapshot, aws_client): )["CiphertextForRecipient"] # Decrypt the PKCS7 envelope to recover the plaintext. Hazmat's pkcs7_decrypt_pem() doesn't support the RSA-OAEP - # with SHA-256 that AWS uses, so we have to do it manually. + # with SHA-256 that AWS uses, so we use as1ncrypto to unpack the envelope and then Hazmat to decrypt + # the session key and then the plaintext. # Parse the PKCS7 envelope content_info = cms.ContentInfo.load(ciphertext_for_recipient) @@ -2158,28 +2156,24 @@ def test_decrypt_recipient(self, kms_create_key, snapshot, aws_client): # Get the IV from the content encryption algorithm parameters iv = encrypted_content_info["content_encryption_algorithm"]["parameters"].native - # Convert the private key to PyCryptoDome format - pem_bytes = rsa_key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption(), + # Decrypt the session key + session_key = rsa_key.decrypt( + encrypted_session_key, + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None + ), ) - crypto_privkey = CryptoRSA.import_key(pem_bytes) - - # Decrypt the session key using RSA-OAEP with SHA-256 - cipher_rsa = PKCS1_OAEP.new(crypto_privkey, hashAlgo=SHA256) - session_key = cipher_rsa.decrypt(encrypted_session_key) - # Decrypt the content using the session key and AES-256-CBC - cipher_aes = AES.new(session_key, AES.MODE_CBC, iv) - padded_plaintext = cipher_aes.decrypt(encrypted_content) - plaintext = crypto_unpad(padded_plaintext, AES.block_size) + # Decrypt the content using the session key and iv + cipher = Cipher(algorithms.AES(session_key), modes.CBC(iv), backend=default_backend()) + decryptor = cipher.decryptor() + plaintext = decryptor.update(encrypted_content) + decryptor.finalize() # Make sure we did the decryption correctly assert base64.b64decode(plaintext) == message @markers.aws.only_localstack - def test_decrypt_recipient_invalid_attestation(self, kms_create_key, snapshot, aws_client): + def test_decrypt_recipient_invalid_attestation(self, kms_create_key, aws_client): """ Test that if we are unable to extract the public key from the attestation, we ignore the recipient and provide the plaintext in the response.