"""
Security plugin for Bedrock Agents SDK.
"""
from bedrock_agents_sdk.plugins.base import AgentPlugin
class SecurityPlugin(AgentPlugin):
    """Plugin for adding security features to Bedrock Agents.

    When constructed with a KMS key ARN, the plugin copies that ARN into
    invocation parameters and into the agent resource of a deployment
    template, and appends a KMS permission statement to the agent role's
    inline policies. Without a key ARN every hook returns its input
    unchanged.
    """

    def __init__(self, customer_encryption_key_arn=None):
        """Initialize the security plugin.

        Args:
            customer_encryption_key_arn: The ARN of the KMS key to use for
                encryption. ``None`` (the default) disables the plugin.
        """
        self.customer_encryption_key_arn = customer_encryption_key_arn

    def pre_invoke(self, params):
        """Add the KMS key ARN to the request parameters.

        Args:
            params: Mutable mapping of request parameters. It is updated in
                place; an existing ``customerEncryptionKeyArn`` entry is
                never overwritten.

        Returns:
            The same ``params`` object.
        """
        if self.customer_encryption_key_arn and "customerEncryptionKeyArn" not in params:
            params["customerEncryptionKeyArn"] = self.customer_encryption_key_arn
        return params

    def pre_deploy(self, template):
        """Add the KMS key ARN and matching IAM permissions to the template.

        The ``BedrockAgent`` resource receives the key ARN in its properties
        (unless one is already present), and every inline policy of the
        ``BedrockAgentRole`` resource that has a ``Statement`` entry receives
        an ``Allow`` statement for ``kms:Decrypt`` and
        ``kms:GenerateDataKey`` on the key, unless one of its statements
        already targets that key.

        Args:
            template: Template dictionary; modified in place.

        Returns:
            The same ``template`` object.
        """
        kms_arn = self.customer_encryption_key_arn
        resources = template.get("Resources")
        # Nothing to inject without a key, and nothing to inject into
        # without resources; bail out before touching the template.
        if not kms_arn or not resources:
            return template

        agent = resources.get("BedrockAgent")
        if agent is not None:
            agent_props = agent.get("Properties")
            if agent_props is None:
                # A resource without properties is still a valid target for
                # the key, so create the section instead of raising KeyError.
                agent_props = agent["Properties"] = {}
            # NOTE(review): CloudFormation documents this AWS::Bedrock::Agent
            # property as ``CustomerEncryptionKeyArn`` (PascalCase) - confirm
            # that the camelCase key is translated before deployment.
            agent_props.setdefault("customerEncryptionKeyArn", kms_arn)

        role = resources.get("BedrockAgentRole")
        if role is not None:
            role_props = role.get("Properties") or {}
            for policy in role_props.get("Policies") or []:
                self._grant_kms_access(policy, kms_arn)

        return template

    def _grant_kms_access(self, policy, kms_arn):
        """Append a KMS statement for ``kms_arn`` to ``policy`` if it lacks one."""
        document = policy.get("PolicyDocument")
        if not document or "Statement" not in document:
            return

        statements = document["Statement"]
        if isinstance(statements, dict):
            # IAM accepts a single statement object in place of a list;
            # normalise it so the statement can be scanned and extended.
            statements = document["Statement"] = [statements]

        if any(self._is_same_kms_resource(stmt, kms_arn) for stmt in statements):
            return

        # A fresh dict per policy keeps policies from sharing mutable state.
        statements.append(
            {
                "Effect": "Allow",
                "Action": [
                    "kms:Decrypt",
                    "kms:GenerateDataKey",
                ],
                "Resource": kms_arn,
            }
        )

    def _is_same_kms_resource(self, statement, kms_arn):
        """Check if a statement refers to the same KMS resource.

        ``Resource`` may be a single value or a list of values; the statement
        matches when ``kms_arn`` equals the value or is one of the listed
        values. Only the resource is compared, not the effect or actions.
        """
        resource = statement.get("Resource")
        if isinstance(resource, (list, tuple)):
            return kms_arn in resource
        return resource == kms_arn