System Design: AI Customer Support with AWS Bedrock
Design an AI-powered customer support system using AWS Bedrock. Covers multi-model routing, RAG with Knowledge Base, Guardrails, conversation management, and production deployment.
Question
System Design: AI Customer Support with AWS Bedrock
Difficulty: senior Estimated Time: 155 minutes Tags: AWS, Bedrock, System Design, LLM, RAG, Production ML, Claude, Knowledge Base
Part 1: Problem Statement
Problem Statement: AI-Powered Customer Support System
The Scenario
You are a senior ML engineer at a global e-commerce company. The VP of Customer Experience approaches you with a challenge:
"Our customer support team is overwhelmed. We're handling 10,000+ tickets daily, with 40% being simple questions about orders, returns, and product information. We need an AI-powered solution that can handle routine inquiries automatically while ensuring complex issues reach human agents. Oh, and we operate in 5 countries, so we need multilingual support."
Your task: Design a production-grade AI customer support system using AWS Bedrock.
Functional Requirements
Core Capabilities
- Conversational AI Interface: Natural language chat widget embedded in website and mobile app
- Knowledge-Based Answers: Answer questions using product documentation, FAQs, return policies, and shipping information
- Order Status Lookups: Retrieve real-time order information from backend systems
- Multilingual Support: Handle conversations in English, Spanish, Chinese (Simplified), French, and German
- Smart Escalation: Route complex issues to human agents with full conversation context
- Conversation History: Maintain context across multiple turns and sessions
Secondary Capabilities
- Conversation Quality Scoring: Rate each interaction for continuous improvement
- Customer Satisfaction Tracking: Collect and analyze CSAT scores
- Agent Dashboard: Provide human agents with AI-suggested responses
- Analytics & Reporting: Track resolution rates, common topics, and system performance
Non-Functional Requirements
Performance
| Metric | Target | Rationale |
|---|---|---|
| Response Latency (p50) | < 1.5 seconds | User expectation for chat |
| Response Latency (p95) | < 3 seconds | Acceptable wait time |
| Response Latency (p99) | < 5 seconds | Maximum before timeout |
| Availability | 99.9% | Critical customer touchpoint |
| Throughput | 100+ req/sec peak | Handle traffic spikes |
Scale Requirements
- Daily Volume: 10,000+ conversations
- Concurrent Users: 500+ simultaneous chats
- Peak Load: 10x normal during sales events (Black Friday, Prime Day)
- Knowledge Base Size: 50,000+ documents (product pages, FAQs, policies)
- Message History: 90 days retention for compliance
Compliance & Security
- GDPR Compliance: Right to deletion, data portability, consent management
- PII Protection: Detect and redact sensitive information
- Data Residency: Keep EU customer data in EU regions
- Audit Logging: Full trail of all AI decisions and escalations
- Access Control: Role-based access for support agents and admins
Why AWS Bedrock?
Before diving into the design, let's understand why Bedrock is a strong choice:
| Requirement | How Bedrock Helps |
|---|---|
| Enterprise Security | VPC endpoints, PrivateLink, no data leaving AWS |
| Compliance | SOC2, HIPAA, GDPR certifications inherited from AWS |
| Multi-model Flexibility | Switch between Claude, Titan, Llama without code changes |
| Managed RAG | Knowledge Bases handle ingestion, chunking, retrieval |
| Content Safety | Guardrails for PII detection, content filtering |
| Native Integration | Seamless with Lambda, DynamoDB, S3, CloudWatch |
Interview Tip
When given a system design question, always start by clarifying requirements. Ask about scale, latency targets, compliance needs, and budget constraints. This shows you think about production realities, not just happy-path demos.
What We'll Design
In the following sections, we'll build:
- High-Level Architecture: End-to-end system with all AWS components
- Component Deep-Dives: Detailed design of each major subsystem
- Integration Patterns: Reusable patterns for Bedrock integration
- Scaling Strategy: How to handle 10x traffic spikes
- Trade-off Analysis: Why Bedrock vs alternatives
- Interview Q&As: 30 questions you might be asked
- Production Code: Complete, working examples
Let's begin with the architecture.
Part 2: High-Level Architecture
High-Level Architecture
System Architecture Diagram
+--------------------------------------------------------------------------------+
|                          AI Customer Support System                            |
+--------------------------------------------------------------------------------+

Client Layer
  Web/Mobile Chat Widget (React/Next.js, WebSocket streaming)
        |
        v
API Layer
  API Gateway (REST + WebSocket)  -->  Lambda Functions (orchestration)
        |
        v
Bedrock Services
  Knowledge Base (RAG)    Guardrails (Safety)     Bedrock Runtime        Bedrock Agents
  - S3 documents          - PII detection         - Claude 3.5 Sonnet    - Tools
  - OpenSearch            - Content filtering     - Claude 3 Haiku       - Actions
  - Titan Embeddings      - Denied topics         - Titan                - Memory
        |
        v
Data & State Layer
  DynamoDB (sessions, history, analytics)      S3 (documents, logs, exports)
  Secrets Manager (API keys, configs)          ElastiCache/Redis (response cache, session state)

External Integrations
  Order API (internal)    CRM/Zendesk (escalation)    Amazon Translate    Human Agent Dashboard

Monitoring & Observability
  CloudWatch (metrics)    X-Ray (tracing)    CloudTrail (audit)    Cost Explorer (billing)
Data Flow: User Query to Response
User -> Chat Widget -> API Gateway -> Lambda (orchestrator) -> Model Router -> Bedrock

Inside the Lambda orchestrator:
  1. Load session state           (DynamoDB)
  2. Retrieve relevant context    (Knowledge Base)
  3. Assemble prompt              (history + retrieved context)
  4. Generate response            (Claude 3.5 Sonnet or Haiku, per router)
  5. Apply Guardrails             (safety checks on input and output)
  6. Stream response back to the user
Component Responsibilities
1. Client Layer (Chat Widget)
Technology: React/Next.js with WebSocket support
Responsibilities:
- Render chat interface
- Handle WebSocket connection for streaming
- Manage local message state
- Support file uploads (screenshots, documents)
- Detect user language preference
- Collect CSAT feedback
2. API Layer
Technology: API Gateway (REST + WebSocket) + Lambda
Responsibilities:
- Authenticate requests (Cognito/API Key)
- Rate limiting and throttling
- Request validation
- Route to appropriate Lambda handlers
- WebSocket connection management for streaming
3. Orchestration Layer (Lambda)
Technology: Python Lambda functions
Responsibilities:
- Load/create conversation session
- Determine query type (simple vs complex)
- Route to appropriate model (Haiku for simple, Sonnet for complex)
- Retrieve relevant context from Knowledge Base
- Assemble prompt with history + context
- Call Bedrock and stream response
- Apply Guardrails for safety
- Detect escalation triggers
- Log analytics events
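To make this flow concrete, here is a minimal Lambda handler sketch that chains these responsibilities together. All helper names (load_session, classify_query, retrieve_context, build_prompt, invoke_model_stream, apply_guardrails, should_escalate, escalate, log_analytics_event) are hypothetical placeholders for the components designed in the rest of this section.

# Hypothetical orchestration sketch; helper functions are placeholders, not a fixed API
import json

def lambda_handler(event, context):
    body = json.loads(event['body'])
    user_id, query = body['user_id'], body['message']

    session = load_session(user_id, body.get('session_id'))           # DynamoDB
    route = classify_query(query)                                      # Haiku-based router
    docs = retrieve_context(query)                                     # Knowledge Base
    prompt = build_prompt(query, docs, session['messages'])

    answer = invoke_model_stream(prompt, model_id=route['model_id'])   # Bedrock Runtime
    answer = apply_guardrails(answer)                                  # Guardrails

    if should_escalate(session, answer):
        return escalate(session)

    log_analytics_event(session, route, answer)
    return {'statusCode': 200, 'body': json.dumps({'reply': answer})}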
4. Bedrock Services
Knowledge Base
- Purpose: Store and retrieve product documentation
- Documents: Product pages, FAQs, return policies, shipping info
- Vector Store: Amazon OpenSearch Serverless
- Embeddings: Amazon Titan Embeddings v2
Guardrails
- Purpose: Ensure safe, compliant responses
- Features: PII detection, content filtering, denied topics
- Applied: On both input and output
Runtime (Models)
- Claude 3.5 Sonnet: Complex queries, nuanced responses
- Claude 3 Haiku: Simple queries, fast responses, routing
- Amazon Titan: Embeddings, backup model
Agents (Optional)
- Purpose: Handle multi-step tasks
- Tools: Order lookup, refund processing, appointment scheduling
- When to Use: Actions requiring backend system integration
5. Data Layer
DynamoDB Tables
conversations
├── PK: user_id
├── SK: conversation_id
├── messages: [...]
├── created_at: timestamp
├── language: string
├── status: active | escalated | resolved
└── metadata: {...}

session_state
├── PK: session_id
├── context: {...}
├── turn_count: number
└── ttl: timestamp
S3 Buckets
support-kb-documents/             # Knowledge base source
├── products/
├── faqs/
├── policies/
└── shipping/

support-conversation-logs/        # Audit trail
├── 2024/01/01/
└── ...
ElastiCache (Redis)
- Response caching (common questions)
- Session state (fast access)
- Rate limiting counters
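As one example of the rate-limiting use, here is a hedged sketch of a fixed-window rate limiter on Redis; the key scheme, host, and limits are illustrative assumptions, not part of the design above.

# Hedged sketch: fixed-window rate limiter backed by ElastiCache/Redis
import time
import redis

r = redis.Redis(host='cache.example.com', port=6379)  # placeholder endpoint

def allow_request(user_id: str, limit: int = 20, window_s: int = 60) -> bool:
    """Allow at most `limit` requests per user per time window."""
    key = f"ratelimit:{user_id}:{int(time.time() // window_s)}"
    count = r.incr(key)            # atomic increment
    if count == 1:
        r.expire(key, window_s)    # first hit in the window sets the TTL
    return count <= limit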
6. External Integrations
| System | Purpose | Integration Method |
|---|---|---|
| Order API | Retrieve order status | REST API (VPC) |
| Zendesk | Escalate to human | Zendesk API |
| Amazon Translate | Language detection/translation | AWS SDK |
| SNS/SQS | Event notifications | AWS SDK |
Technology Choices: Rationale
Why API Gateway + Lambda (vs ECS/EKS)?
- Serverless: Auto-scaling, no infrastructure management
- Cost: Pay per request, ideal for variable load
- Integration: Native WebSocket support
- Cold Start: Acceptable for chat (<500ms with provisioned concurrency)
Why DynamoDB (vs Aurora)?
- Scale: Handles 10K+ writes/sec without provisioning
- Latency: Single-digit ms reads
- Serverless: On-demand capacity mode
- TTL: Built-in expiration for session data
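A minimal sketch of what this looks like in practice, assuming the conversations table schema shown above: create the table in on-demand mode and enable TTL on the ttl attribute (in a real setup you would wait for the table to become ACTIVE before the TTL call).

# Hedged sketch: on-demand DynamoDB table with TTL, matching the schema above
import boto3

dynamodb = boto3.client('dynamodb')

dynamodb.create_table(
    TableName='conversations',
    AttributeDefinitions=[
        {'AttributeName': 'user_id', 'AttributeType': 'S'},
        {'AttributeName': 'conversation_id', 'AttributeType': 'S'},
    ],
    KeySchema=[
        {'AttributeName': 'user_id', 'KeyType': 'HASH'},
        {'AttributeName': 'conversation_id', 'KeyType': 'RANGE'},
    ],
    BillingMode='PAY_PER_REQUEST',  # on-demand capacity, no provisioning
)

# Expired sessions are deleted automatically once TTL is enabled
dynamodb.update_time_to_live(
    TableName='conversations',
    TimeToLiveSpecification={'Enabled': True, 'AttributeName': 'ttl'},
)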
Why OpenSearch Serverless (vs Pinecone/Weaviate)?
- Managed: No cluster management
- Integration: Native Bedrock KB support
- Security: VPC, encryption, IAM
- Cost: Pay per OCU-hour, scales to zero
Why Claude 3.5 Sonnet (vs GPT-4)?
- Quality: Excellent at nuanced, helpful responses
- Bedrock Native: No external API calls needed
- Context Window: 200K tokens for long conversations
- Speed: Faster than GPT-4 with similar quality
- Cost: Competitive pricing in Bedrock
Interview Tip
When presenting architecture, always explain WHY you chose each component, not just WHAT you chose. Interviewers want to see your decision-making process and understanding of trade-offs.
Part 3: Component Deep-Dives
Component Deep-Dives
3.1 Bedrock Knowledge Base Setup
Document Ingestion Pipeline
Source Documents        S3 Bucket          Bedrock Ingestion       OpenSearch Vectors
- Confluence       ->   - Raw docs    ->   - Chunking         ->   - Embeddings
- Notion                - PDFs             - Embedding             - Metadata
- CMS                   - HTML             - Indexing              - Vector search

Ingestion jobs are kicked off by a Lambda trigger:
- S3 events (new or updated documents)
- Scheduled sync (EventBridge)
- Manual (on-demand)
Chunking Strategy
Chunking is critical for RAG quality. Bad chunks = bad retrieval = bad answers.
| Strategy | Chunk Size | Overlap | Best For |
|---|---|---|---|
| Fixed Size | 512 tokens | 50 tokens | Simple docs |
| Semantic | Variable | Context-aware | Technical docs |
| Hierarchical | Parent/Child | Linked | Long documents |
Our Choice: Hierarchical Chunking
# Bedrock KB Configuration
chunking_config = {
"chunkingStrategy": "HIERARCHICAL",
"hierarchicalChunkingConfiguration": {
"levelConfigurations": [
{"maxTokens": 1500}, # Parent chunks (broad context)
{"maxTokens": 300} # Child chunks (precise retrieval)
],
"overlapTokens": 60
}
}
Why Hierarchical?
- Parent chunks: Provide broader context to the model
- Child chunks: Enable precise retrieval
- Result: Model sees both the specific answer AND surrounding context
Embedding Model Selection
| Model | Dimensions | Max Tokens | Cost | Quality |
|---|---|---|---|---|
| Titan Embeddings v2 | 1024 | 8192 | $0.00002/1K tokens | Excellent |
| Cohere Embed v3 | 1024 | 512 | $0.00010/1K tokens | Excellent |
| Titan Embeddings v1 | 1536 | 8192 | $0.00010/1K tokens | Good |
Our Choice: Titan Embeddings v2
Reasons:
- 8K token support: Can embed entire pages
- Cost: 5x cheaper than Cohere
- Native integration: No additional setup
- Quality: On par with leading models
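For reference, a hedged sketch of calling Titan Embeddings v2 directly through the Runtime API. The Knowledge Base performs this step automatically during ingestion; direct calls are useful for ad-hoc similarity checks or cache keys.

# Hedged sketch: embedding text with Titan Embeddings v2 via Bedrock Runtime
import json
import boto3

bedrock_runtime = boto3.client('bedrock-runtime')

def embed_text(text: str) -> list[float]:
    response = bedrock_runtime.invoke_model(
        modelId='amazon.titan-embed-text-v2:0',
        body=json.dumps({
            "inputText": text,
            "dimensions": 1024,   # matches the KB index dimension used above
            "normalize": True
        })
    )
    result = json.loads(response['body'].read())
    return result['embedding']    # 1024-dimensional vector

vector = embed_text("What is the return window for laptops?")
print(len(vector))  # 1024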
Vector Store: OpenSearch Serverless
# Collection configuration
collection_config = {
"name": "customer-support-kb",
"type": "VECTORSEARCH",
"description": "Product knowledge base vectors"
}
# Index configuration (created by Bedrock)
index_mapping = {
"settings": {
"index.knn": True,
"index.knn.algo_param.ef_search": 512
},
"mappings": {
"properties": {
"bedrock-knowledge-base-default-vector": {
"type": "knn_vector",
"dimension": 1024,
"method": {
"name": "hnsw",
"engine": "nmslib",
"space_type": "cosinesimil"
}
}
}
}
}
Why OpenSearch Serverless over Aurora pgvector?
- No provisioning: Scales automatically
- Native support: Bedrock KB integrates directly
- Performance: HNSW algorithm for fast ANN search
- Cost: Pay per OCU-hour, can scale to zero
Sync and Update Strategy
import boto3
def sync_knowledge_base():
"""Trigger KB sync after document updates"""
client = boto3.client('bedrock-agent')
response = client.start_ingestion_job(
knowledgeBaseId='KB_ID',
dataSourceId='DS_ID',
description='Scheduled sync'
)
return response['ingestionJob']['ingestionJobId']
# Schedule: Every 6 hours via EventBridge
# On-demand: S3 event trigger for urgent updates
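A hedged sketch of the EventBridge schedule, assuming the sync function above is deployed as a Lambda; the rule name and function ARN are placeholders.

# Hedged sketch: schedule the KB sync every 6 hours with EventBridge
import boto3

events = boto3.client('events')

events.put_rule(
    Name='kb-sync-every-6-hours',
    ScheduleExpression='rate(6 hours)',
    State='ENABLED',
)

events.put_targets(
    Rule='kb-sync-every-6-hours',
    Targets=[{
        'Id': 'kb-sync-lambda',
        'Arn': 'arn:aws:lambda:us-east-1:123456789012:function:sync-knowledge-base',  # placeholder
    }],
)
# The target Lambda also needs a resource-based permission allowing
# events.amazonaws.com to invoke it (lambda add-permission).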
3.2 LLM Selection and Configuration
Model Comparison for Customer Support
| Model | Latency (p50) | Cost per 1M tokens (in/out) | Context | Best Use Case |
|---|---|---|---|---|
| Claude 3.5 Sonnet | ~1.5s | $3/$15 (in/out) | 200K | Complex queries |
| Claude 3 Haiku | ~0.4s | $0.25/$1.25 | 200K | Simple queries, routing |
| Claude 3 Opus | ~3s | $15/$75 | 200K | Not recommended (cost) |
| Amazon Titan Text | ~0.8s | $0.50/$1.50 | 8K | Backup/fallback |
| Llama 3.1 70B | ~2s | $2.65/$3.50 | 128K | Alternative to Claude |
Multi-Model Routing Strategy
import json

import boto3

bedrock = boto3.client('bedrock-runtime')

def route_to_model(query: str, session: dict) -> str:
"""Route query to appropriate model based on complexity"""
# Use Haiku to classify query complexity
classification = classify_query(query)
if classification['type'] == 'simple':
# FAQ, order status, basic questions
return 'anthropic.claude-3-haiku-20240307-v1:0'
elif classification['type'] == 'complex':
# Technical issues, complaints, nuanced requests
return 'anthropic.claude-3-5-sonnet-20241022-v2:0'
elif classification['type'] == 'escalate':
# Human handoff needed
return 'ESCALATE_TO_HUMAN'
else:
# Default to Sonnet for safety
return 'anthropic.claude-3-5-sonnet-20241022-v2:0'
def classify_query(query: str) -> dict:
"""Use Haiku to classify query type (fast, cheap)"""
prompt = f"""Classify this customer query:
Query: {query}
Categories:
- simple: FAQ, order status, basic product info
- complex: Technical issues, complaints, returns, refunds
- escalate: Angry customer, legal issues, sensitive topics
Respond with JSON: {{"type": "simple|complex|escalate", "confidence": 0.0-1.0}}"""
response = bedrock.invoke_model(
modelId='anthropic.claude-3-haiku-20240307-v1:0',
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 100,
"messages": [{"role": "user", "content": prompt}]
})
)
    result = json.loads(response['body'].read())
    # The classification JSON is inside the model's text output, not the response envelope
    return json.loads(result['content'][0]['text'])
Prompt Engineering Best Practices
SYSTEM_PROMPT = """You are a helpful customer support agent for TechMart, an online electronics retailer.
## Your Role
- Answer customer questions accurately using the provided knowledge base
- Be friendly, professional, and concise
- If you don't know something, say so honestly
- Never make up information about orders, policies, or products
## Guidelines
1. Start with a brief, direct answer
2. Provide supporting details if helpful
3. Offer next steps when appropriate
4. Keep responses under 150 words unless the question requires more detail
## Tone
- Professional but warm
- Empathetic when customer is frustrated
- Clear and jargon-free
## Escalation Triggers
Escalate to human agent if:
- Customer explicitly requests human help
- Issue involves refunds over $500
- Customer mentions legal action
- You cannot resolve after 3 attempts
- Conversation becomes hostile
## Language
Respond in the same language as the customer's query.
"""
def build_prompt(query: str, context: list, history: list) -> str:
"""Assemble complete prompt with context and history"""
# Format retrieved context
context_text = "\n\n".join([
f"[Source: {doc['source']}]\n{doc['content']}"
for doc in context
])
# Format conversation history (last 5 turns)
history_text = "\n".join([
f"{msg['role'].upper()}: {msg['content']}"
for msg in history[-10:] # Last 5 exchanges
])
return f"""## Knowledge Base Context
{context_text}
## Conversation History
{history_text}
## Current Query
{query}
Provide a helpful response based on the context above."""
3.3 Conversation Management
Session State Schema (DynamoDB)
# DynamoDB table: conversations
{
"user_id": "user_123", # Partition key
"conversation_id": "conv_456", # Sort key
"created_at": "2024-01-15T10:30:00Z",
"updated_at": "2024-01-15T10:35:00Z",
"status": "active", # active | escalated | resolved
"language": "en",
"channel": "web", # web | mobile | email
"messages": [
{
"id": "msg_001",
"role": "user",
"content": "Where is my order?",
"timestamp": "2024-01-15T10:30:00Z"
},
{
"id": "msg_002",
"role": "assistant",
"content": "I'd be happy to help...",
"timestamp": "2024-01-15T10:30:02Z",
"model": "claude-3-haiku",
"tokens": {"input": 150, "output": 89}
}
],
"metadata": {
"order_id": "ORD-789",
"customer_tier": "premium",
"escalation_reason": null,
"csat_score": null
},
"ttl": 1710500000 # 90 days from creation
}
History Truncation Strategy
Claude has a 200K context window, but we shouldn't fill it all:
def truncate_history(messages: list, max_tokens: int = 4000) -> list:
"""Keep recent history within token budget"""
# Always keep system prompt (~500 tokens)
# Reserve ~2000 tokens for KB context
# Reserve ~1000 tokens for response
# Leaves ~4000 for history
truncated = []
total_tokens = 0
# Process from most recent to oldest
for msg in reversed(messages):
msg_tokens = count_tokens(msg['content'])
if total_tokens + msg_tokens > max_tokens:
break
truncated.insert(0, msg)
total_tokens += msg_tokens
# Always include the first message for context
if messages and messages[0] not in truncated:
truncated.insert(0, messages[0])
return truncated
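The function above assumes a count_tokens helper. A rough heuristic is enough for budgeting; here is a sketch assuming roughly four characters per token for English text (a real tokenizer would be more precise).

# Rough token-count heuristic assumed by truncate_history above
def count_tokens(text: str) -> int:
    return max(1, len(text) // 4)  # ~4 chars per token for English text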
3.4 Guardrails Implementation
Guardrail Configuration
import boto3
def create_guardrail():
"""Create Bedrock Guardrail for customer support"""
client = boto3.client('bedrock')
response = client.create_guardrail(
name='customer-support-guardrail',
description='Safety guardrail for customer support AI',
# Content filtering
contentPolicyConfig={
'filtersConfig': [
{'type': 'SEXUAL', 'inputStrength': 'HIGH', 'outputStrength': 'HIGH'},
{'type': 'VIOLENCE', 'inputStrength': 'HIGH', 'outputStrength': 'HIGH'},
{'type': 'HATE', 'inputStrength': 'HIGH', 'outputStrength': 'HIGH'},
{'type': 'INSULTS', 'inputStrength': 'MEDIUM', 'outputStrength': 'HIGH'},
{'type': 'MISCONDUCT', 'inputStrength': 'HIGH', 'outputStrength': 'HIGH'},
{'type': 'PROMPT_ATTACK', 'inputStrength': 'HIGH', 'outputStrength': 'NONE'}
]
},
# Denied topics
topicPolicyConfig={
'topicsConfig': [
{
'name': 'competitor-comparison',
'definition': 'Questions comparing our products to competitors',
'examples': ['Is your product better than Amazon?', 'Why should I buy from you instead of Best Buy?'],
'type': 'DENY'
},
{
'name': 'legal-advice',
'definition': 'Requests for legal advice or opinions',
'examples': ['Can I sue you for this?', 'What are my legal rights?'],
'type': 'DENY'
}
]
},
# PII handling
sensitiveInformationPolicyConfig={
'piiEntitiesConfig': [
{'type': 'EMAIL', 'action': 'ANONYMIZE'},
{'type': 'PHONE', 'action': 'ANONYMIZE'},
{'type': 'SSN', 'action': 'BLOCK'},
{'type': 'CREDIT_DEBIT_CARD_NUMBER', 'action': 'BLOCK'},
{'type': 'US_BANK_ACCOUNT_NUMBER', 'action': 'BLOCK'}
]
},
# Word filters
wordPolicyConfig={
'wordsConfig': [
{'text': 'stupid company'},
{'text': 'worst service ever'}
],
'managedWordListsConfig': [
{'type': 'PROFANITY'}
]
},
blockedInputMessaging='I apologize, but I cannot process that request. How else can I help you today?',
blockedOutputsMessaging='I apologize, but I cannot provide that information. Is there something else I can help with?'
)
return response['guardrailId']
Applying Guardrails
def invoke_with_guardrails(prompt: str, model_id: str, guardrail_id: str) -> dict:
"""Invoke Bedrock with Guardrails applied"""
response = bedrock_runtime.invoke_model(
modelId=model_id,
guardrailIdentifier=guardrail_id,
guardrailVersion='DRAFT', # or specific version
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 500,
"messages": [{"role": "user", "content": prompt}]
})
)
result = json.loads(response['body'].read())
    # Check whether the guardrail intervened: when a guardrail is attached, the
    # parsed response body carries an 'amazon-bedrock-guardrailAction' field
    if result.get('amazon-bedrock-guardrailAction') == 'INTERVENED':
        # Log intervention for review
        log_guardrail_intervention(prompt, result)
return result
3.5 Escalation Logic
Escalation Decision Tree
class EscalationEngine:
"""Determine when to escalate to human agent"""
def __init__(self):
self.confidence_threshold = 0.7
self.max_turns_before_escalate = 5
self.escalation_keywords = [
'speak to human', 'real person', 'manager',
'supervisor', 'escalate', 'lawyer', 'sue'
]
def should_escalate(self, session: dict, response: dict) -> tuple[bool, str]:
"""Returns (should_escalate, reason)"""
# 1. Explicit request
last_message = session['messages'][-1]['content'].lower()
if any(kw in last_message for kw in self.escalation_keywords):
return True, 'customer_request'
# 2. Low confidence
if response.get('confidence', 1.0) < self.confidence_threshold:
return True, 'low_confidence'
# 3. Too many turns without resolution
turn_count = len([m for m in session['messages'] if m['role'] == 'user'])
if turn_count > self.max_turns_before_escalate:
return True, 'max_turns_exceeded'
# 4. High-value customer issue
if session['metadata'].get('customer_tier') == 'premium':
if 'refund' in last_message or 'cancel' in last_message:
return True, 'premium_customer_sensitive'
# 5. Sentiment analysis
sentiment = analyze_sentiment(last_message)
if sentiment['score'] < -0.7: # Very negative
return True, 'negative_sentiment'
# 6. Guardrail intervention
if response.get('guardrail_intervened'):
return True, 'guardrail_block'
return False, None
def execute_escalation(self, session: dict, reason: str):
"""Hand off to human agent"""
# 1. Update session status
session['status'] = 'escalated'
session['metadata']['escalation_reason'] = reason
# 2. Create Zendesk ticket
ticket = create_zendesk_ticket(
subject=f"Escalated: {session['conversation_id']}",
description=format_conversation_summary(session),
priority='high' if reason in ['negative_sentiment', 'premium_customer_sensitive'] else 'normal',
tags=['ai-escalated', reason]
)
# 3. Notify agent queue
notify_agent_queue(ticket['id'], session)
# 4. Send customer message
return {
'content': "I'm connecting you with a customer support specialist who can better assist you. They'll be with you shortly. In the meantime, is there anything else I can help clarify?",
'escalated': True,
'ticket_id': ticket['id']
}
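The should_escalate check above calls an analyze_sentiment helper. One way to back it, sketched here with Amazon Comprehend; collapsing the result into a single [-1, 1] score is an assumption made to match the score < -0.7 threshold used above.

# Hedged sketch of the analyze_sentiment helper, built on Amazon Comprehend
import boto3

comprehend = boto3.client('comprehend')

def analyze_sentiment(text: str, language: str = 'en') -> dict:
    result = comprehend.detect_sentiment(Text=text, LanguageCode=language)
    scores = result['SentimentScore']  # Positive/Negative/Neutral/Mixed, each 0-1
    return {
        'label': result['Sentiment'],                      # e.g. NEGATIVE
        'score': scores['Positive'] - scores['Negative'],  # collapse to [-1, 1] (assumption)
    }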
Human Agent Dashboard Integration
def prepare_agent_handoff(session: dict) -> dict:
"""Prepare context for human agent"""
return {
'conversation_id': session['conversation_id'],
'customer': {
'id': session['user_id'],
'tier': session['metadata'].get('customer_tier'),
'language': session['language']
},
'summary': generate_conversation_summary(session),
'ai_suggested_response': generate_suggested_response(session),
'relevant_kb_articles': get_relevant_articles(session),
'order_info': fetch_order_details(session['metadata'].get('order_id')),
'escalation_reason': session['metadata'].get('escalation_reason'),
'sentiment_trend': calculate_sentiment_trend(session['messages']),
'full_transcript': session['messages']
}
Part 4: Bedrock Integration Patterns
Bedrock Integration Patterns
This section provides seven reusable patterns for integrating AWS Bedrock into production systems.
Pattern 1: Basic Inference
The simplest pattern for calling Bedrock models.
import boto3
import json
class BedrockClient:
"""Basic Bedrock inference client"""
def __init__(self, region: str = 'us-east-1'):
self.client = boto3.client('bedrock-runtime', region_name=region)
self.default_model = 'anthropic.claude-3-5-sonnet-20241022-v2:0'
def invoke(self, prompt: str, model_id: str = None, max_tokens: int = 1000) -> str:
"""Invoke Bedrock model and return response text"""
model_id = model_id or self.default_model
body = {
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": max_tokens,
"messages": [
{"role": "user", "content": prompt}
]
}
response = self.client.invoke_model(
modelId=model_id,
contentType='application/json',
accept='application/json',
body=json.dumps(body)
)
result = json.loads(response['body'].read())
return result['content'][0]['text']
# Usage
client = BedrockClient()
response = client.invoke("What is the return policy for electronics?")
print(response)
Pattern 2: RAG with Knowledge Base
Retrieve relevant context before generating response.
class RAGClient:
"""RAG pattern using Bedrock Knowledge Base"""
def __init__(self, knowledge_base_id: str):
self.kb_client = boto3.client('bedrock-agent-runtime')
self.bedrock = boto3.client('bedrock-runtime')
self.kb_id = knowledge_base_id
def retrieve(self, query: str, num_results: int = 5) -> list:
"""Retrieve relevant documents from Knowledge Base"""
response = self.kb_client.retrieve(
knowledgeBaseId=self.kb_id,
retrievalQuery={'text': query},
retrievalConfiguration={
'vectorSearchConfiguration': {
'numberOfResults': num_results
}
}
)
return [
{
'content': r['content']['text'],
'source': r['location']['s3Location']['uri'],
'score': r['score']
}
for r in response['retrievalResults']
]
def generate_with_context(self, query: str, context: list) -> str:
"""Generate response using retrieved context"""
context_text = "\n\n".join([
f"[Source: {doc['source']}]\n{doc['content']}"
for doc in context
])
prompt = f"""Use the following context to answer the question.
Context:
{context_text}
Question: {query}
Answer based only on the provided context. If the context doesn't contain
the answer, say "I don't have that information."
"""
response = self.bedrock.invoke_model(
modelId='anthropic.claude-3-5-sonnet-20241022-v2:0',
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 500,
"messages": [{"role": "user", "content": prompt}]
})
)
result = json.loads(response['body'].read())
return result['content'][0]['text']
def query(self, question: str) -> dict:
"""Full RAG pipeline: retrieve + generate"""
# Step 1: Retrieve
context = self.retrieve(question)
# Step 2: Generate
answer = self.generate_with_context(question, context)
return {
'answer': answer,
'sources': [doc['source'] for doc in context],
'context_used': len(context)
}
# Usage
rag = RAGClient(knowledge_base_id='KB_12345')
result = rag.query("What is the warranty on laptops?")
print(f"Answer: {result['answer']}")
print(f"Sources: {result['sources']}")
Pattern 3: Streaming Response
Stream responses for real-time chat experience.
import json
class StreamingClient:
"""Stream Bedrock responses for real-time display"""
def __init__(self):
self.client = boto3.client('bedrock-runtime')
def stream(self, prompt: str, model_id: str = 'anthropic.claude-3-5-sonnet-20241022-v2:0'):
"""Yield response chunks as they arrive"""
response = self.client.invoke_model_with_response_stream(
modelId=model_id,
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 1000,
"messages": [{"role": "user", "content": prompt}]
})
)
for event in response['body']:
chunk = json.loads(event['chunk']['bytes'])
if chunk['type'] == 'content_block_delta':
yield chunk['delta'].get('text', '')
elif chunk['type'] == 'message_stop':
break
# Usage with WebSocket
async def handle_chat_message(websocket, message):
client = StreamingClient()
for chunk in client.stream(message):
await websocket.send_json({
'type': 'chunk',
'content': chunk
})
await websocket.send_json({'type': 'done'})
Pattern 4: Multi-Model Routing
Route queries to appropriate models based on complexity.
class ModelRouter:
"""Route queries to optimal model based on complexity"""
MODELS = {
'simple': 'anthropic.claude-3-haiku-20240307-v1:0',
'complex': 'anthropic.claude-3-5-sonnet-20241022-v2:0',
'embedding': 'amazon.titan-embed-text-v2:0'
}
def __init__(self):
self.client = boto3.client('bedrock-runtime')
def classify_complexity(self, query: str) -> str:
"""Use Haiku to classify query complexity (fast, cheap)"""
classification_prompt = f"""Classify this query's complexity:
Query: {query}
Categories:
- simple: Basic FAQ, yes/no questions, simple lookups
- complex: Nuanced questions, comparisons, explanations, complaints
Respond with only: simple or complex"""
response = self.client.invoke_model(
modelId=self.MODELS['simple'],
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 10,
"messages": [{"role": "user", "content": classification_prompt}]
})
)
result = json.loads(response['body'].read())
complexity = result['content'][0]['text'].strip().lower()
return complexity if complexity in ['simple', 'complex'] else 'complex'
def route_and_invoke(self, query: str, context: str = None) -> dict:
"""Classify, route, and invoke appropriate model"""
complexity = self.classify_complexity(query)
model_id = self.MODELS[complexity]
prompt = query
if context:
prompt = f"Context:\n{context}\n\nQuestion: {query}"
response = self.client.invoke_model(
modelId=model_id,
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 500,
"messages": [{"role": "user", "content": prompt}]
})
)
result = json.loads(response['body'].read())
return {
'response': result['content'][0]['text'],
'model_used': model_id,
'complexity': complexity,
'tokens': result['usage']
}
# Usage
router = ModelRouter()
result = router.route_and_invoke("What are your store hours?")
print(f"Model: {result['model_used']}") # Haiku for simple question
print(f"Response: {result['response']}")
Pattern 5: Conversation Memory
Maintain context across multiple conversation turns.
import boto3
from datetime import datetime
import uuid
class ConversationManager:
"""Manage multi-turn conversations with DynamoDB"""
def __init__(self, table_name: str = 'conversations'):
self.dynamodb = boto3.resource('dynamodb')
self.table = self.dynamodb.Table(table_name)
self.bedrock = boto3.client('bedrock-runtime')
def create_session(self, user_id: str) -> str:
"""Create new conversation session"""
session_id = str(uuid.uuid4())
self.table.put_item(Item={
'user_id': user_id,
'session_id': session_id,
'messages': [],
'created_at': datetime.utcnow().isoformat(),
'status': 'active'
})
return session_id
def get_session(self, user_id: str, session_id: str) -> dict:
"""Retrieve existing session"""
response = self.table.get_item(
Key={'user_id': user_id, 'session_id': session_id}
)
return response.get('Item')
def add_message(self, user_id: str, session_id: str, role: str, content: str):
"""Add message to conversation history"""
message = {
'id': str(uuid.uuid4()),
'role': role,
'content': content,
'timestamp': datetime.utcnow().isoformat()
}
self.table.update_item(
Key={'user_id': user_id, 'session_id': session_id},
UpdateExpression='SET messages = list_append(messages, :msg)',
ExpressionAttributeValues={':msg': [message]}
)
def chat(self, user_id: str, session_id: str, user_message: str) -> str:
"""Process chat message with conversation context"""
# Get existing conversation
session = self.get_session(user_id, session_id)
# Build messages for Bedrock
messages = [
{"role": msg['role'], "content": msg['content']}
for msg in session.get('messages', [])[-10:] # Last 10 messages
]
messages.append({"role": "user", "content": user_message})
# Call Bedrock
response = self.bedrock.invoke_model(
modelId='anthropic.claude-3-5-sonnet-20241022-v2:0',
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 500,
"system": "You are a helpful customer support agent.",
"messages": messages
})
)
result = json.loads(response['body'].read())
assistant_message = result['content'][0]['text']
# Save both messages
self.add_message(user_id, session_id, 'user', user_message)
self.add_message(user_id, session_id, 'assistant', assistant_message)
return assistant_message
# Usage
cm = ConversationManager()
session_id = cm.create_session('user_123')
response1 = cm.chat('user_123', session_id, "I need help with my order")
response2 = cm.chat('user_123', session_id, "The order number is 12345")
# Second message has context from first
Pattern 6: Error Handling & Retry
Production-grade error handling with exponential backoff.
import time
from botocore.exceptions import ClientError
class ResilientBedrockClient:
"""Bedrock client with retry logic and fallbacks"""
def __init__(self):
self.client = boto3.client('bedrock-runtime')
self.max_retries = 3
self.base_delay = 1.0
# Fallback model chain
self.models = [
'anthropic.claude-3-5-sonnet-20241022-v2:0',
'anthropic.claude-3-haiku-20240307-v1:0',
'amazon.titan-text-premier-v1:0'
]
def invoke_with_retry(self, prompt: str, model_id: str = None) -> dict:
"""Invoke with exponential backoff retry"""
model_id = model_id or self.models[0]
for attempt in range(self.max_retries):
try:
response = self.client.invoke_model(
modelId=model_id,
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 500,
"messages": [{"role": "user", "content": prompt}]
})
)
result = json.loads(response['body'].read())
return {'success': True, 'response': result, 'model': model_id}
except ClientError as e:
error_code = e.response['Error']['Code']
if error_code == 'ThrottlingException':
# Exponential backoff
delay = self.base_delay * (2 ** attempt)
time.sleep(delay)
continue
elif error_code == 'ModelNotReadyException':
time.sleep(5)
continue
elif error_code in ['ValidationException', 'AccessDeniedException']:
# Don't retry these
return {'success': False, 'error': str(e), 'retryable': False}
else:
raise
return {'success': False, 'error': 'Max retries exceeded', 'retryable': True}
def invoke_with_fallback(self, prompt: str) -> dict:
"""Try models in order until one succeeds"""
for model_id in self.models:
result = self.invoke_with_retry(prompt, model_id)
if result['success']:
return result
if not result.get('retryable', True):
return result
return {'success': False, 'error': 'All models failed'}
# Usage
client = ResilientBedrockClient()
result = client.invoke_with_fallback("What is your return policy?")
if result['success']:
print(f"Response from {result['model']}: {result['response']}")
Pattern 7: Cost Tracking
Monitor and track token usage for cost management.
from dataclasses import dataclass
from typing import Optional
import boto3
@dataclass
class TokenUsage:
input_tokens: int
output_tokens: int
model_id: str
@property
def total_tokens(self) -> int:
return self.input_tokens + self.output_tokens
def estimate_cost(self) -> float:
"""Estimate cost in USD"""
# Pricing per 1K tokens (approximate)
pricing = {
'anthropic.claude-3-5-sonnet': {'input': 0.003, 'output': 0.015},
'anthropic.claude-3-haiku': {'input': 0.00025, 'output': 0.00125},
'amazon.titan-text': {'input': 0.0005, 'output': 0.0015}
}
# Find matching pricing
for model_prefix, costs in pricing.items():
if model_prefix in self.model_id:
input_cost = (self.input_tokens / 1000) * costs['input']
output_cost = (self.output_tokens / 1000) * costs['output']
return input_cost + output_cost
return 0.0
class CostTrackingClient:
"""Track Bedrock usage and costs"""
def __init__(self):
self.client = boto3.client('bedrock-runtime')
self.usage_log = []
def invoke(self, prompt: str, model_id: str) -> tuple[str, TokenUsage]:
"""Invoke and track usage"""
response = self.client.invoke_model(
modelId=model_id,
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 500,
"messages": [{"role": "user", "content": prompt}]
})
)
result = json.loads(response['body'].read())
usage = TokenUsage(
input_tokens=result['usage']['input_tokens'],
output_tokens=result['usage']['output_tokens'],
model_id=model_id
)
self.usage_log.append(usage)
return result['content'][0]['text'], usage
def get_session_cost(self) -> float:
"""Get total cost for this session"""
return sum(u.estimate_cost() for u in self.usage_log)
def get_usage_summary(self) -> dict:
"""Get usage summary"""
return {
'total_requests': len(self.usage_log),
'total_input_tokens': sum(u.input_tokens for u in self.usage_log),
'total_output_tokens': sum(u.output_tokens for u in self.usage_log),
'estimated_cost_usd': self.get_session_cost()
}
# Usage
client = CostTrackingClient()
response, usage = client.invoke("Hello", 'anthropic.claude-3-haiku-20240307-v1:0')
print(f"Cost: ${usage.estimate_cost():.6f}")
print(f"Session total: {client.get_usage_summary()}")
Part 5: Scaling & Production Considerations
Scaling & Production Considerations
5.1 Performance Optimization
Latency Targets
| Endpoint | p50 Target | p95 Target | p99 Target |
|---|---|---|---|
| Simple query (Haiku) | 400ms | 800ms | 1.5s |
| Complex query (Sonnet) | 1.5s | 3s | 5s |
| RAG query (KB + Sonnet) | 2s | 4s | 6s |
| Streaming first token | 200ms | 500ms | 1s |
Latency Optimization Techniques
import hashlib
import json
import time

import boto3
import redis
from botocore.config import Config

class OptimizedBedrockClient:
    """Performance-optimized Bedrock client"""
    def __init__(self):
        # Use a regional endpoint and tight timeouts for lower latency
        self.client = boto3.client(
            'bedrock-runtime',
            region_name='us-east-1',
            config=Config(
                connect_timeout=5,
                read_timeout=60,
                retries={'max_attempts': 2}
            )
        )
        # Response cache (Redis)
        self.cache = redis.Redis(host='cache.example.com')
        self.cache_ttl = 3600  # 1 hour
def get_cached_response(self, prompt_hash: str) -> str | None:
"""Check cache for identical queries"""
return self.cache.get(f"response:{prompt_hash}")
def cache_response(self, prompt_hash: str, response: str):
"""Cache response for future identical queries"""
self.cache.setex(f"response:{prompt_hash}", self.cache_ttl, response)
def invoke_optimized(self, prompt: str, use_cache: bool = True) -> dict:
"""Invoke with caching and optimization"""
        prompt_hash = hashlib.sha256(prompt.encode()).hexdigest()[:16]
# Check cache first
if use_cache:
cached = self.get_cached_response(prompt_hash)
if cached:
return {'response': cached, 'cached': True, 'latency_ms': 5}
start = time.time()
response = self.client.invoke_model(
modelId='anthropic.claude-3-5-sonnet-20241022-v2:0',
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 500,
"messages": [{"role": "user", "content": prompt}]
})
)
latency_ms = (time.time() - start) * 1000
result = json.loads(response['body'].read())
response_text = result['content'][0]['text']
# Cache for future use
if use_cache:
self.cache_response(prompt_hash, response_text)
return {
'response': response_text,
'cached': False,
'latency_ms': latency_ms,
'tokens': result['usage']
}
Caching Strategy
| Cache Type | TTL | Use Case | Hit Rate |
|---|---|---|---|
| Response cache | 1 hour | Identical FAQ queries | 30-40% |
| KB retrieval cache | 15 min | Same document lookups | 20-30% |
| Session cache | 30 min | Active conversations | 80%+ |
| Embedding cache | 24 hours | Document embeddings | 90%+ |
5.2 Cost Management
Cost Breakdown (10K conversations/day)
Daily Volume: 10,000 conversations
Avg turns per conversation: 5
Avg tokens per turn: 500 input, 200 output
Model Mix:
- 60% Haiku (simple queries): 6,000 conversations × 5 turns = 30,000 calls
- 40% Sonnet (complex queries): 4,000 conversations × 5 turns = 20,000 calls
Haiku Cost:
- Input: 30,000 × 500 / 1000 × $0.00025 = $3.75
- Output: 30,000 × 200 / 1000 × $0.00125 = $7.50
- Subtotal: $11.25/day
Sonnet Cost:
- Input: 20,000 × 500 / 1000 × $0.003 = $30
- Output: 20,000 × 200 / 1000 × $0.015 = $60
- Subtotal: $90/day
Knowledge Base (Titan Embeddings):
- 50,000 queries/day × 500 tokens / 1000 × $0.00002 = $0.50/day
Total Daily: ~$102/day = ~$3,060/month
Cost per conversation: ~$0.01
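The same arithmetic as a small script, so the volume and price assumptions can be adjusted; prices are the per-1K-token figures used in Pattern 7.

# Cost model above as a script; tweak the assumptions to re-run the estimate
def daily_llm_cost(calls: int, in_tok: int, out_tok: int, in_price: float, out_price: float) -> float:
    """Daily cost given calls/day, tokens per call, and per-1K-token prices."""
    return calls * (in_tok / 1000 * in_price + out_tok / 1000 * out_price)

haiku = daily_llm_cost(30_000, 500, 200, 0.00025, 0.00125)   # ~$11.25
sonnet = daily_llm_cost(20_000, 500, 200, 0.003, 0.015)      # ~$90.00
embeddings = 50_000 * 500 / 1000 * 0.00002                   # ~$0.50

total = haiku + sonnet + embeddings
print(f"~${total:.2f}/day, ~${total / 10_000:.4f} per conversation")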
Cost Optimization Strategies
class CostOptimizer:
"""Strategies to reduce Bedrock costs"""
def __init__(self):
self.daily_budget = 100.0 # USD
self.current_spend = 0.0
def optimize_prompt(self, prompt: str) -> str:
"""Reduce prompt token count while preserving meaning"""
# Remove excessive whitespace
prompt = ' '.join(prompt.split())
# Truncate context if too long
max_context_tokens = 2000
if len(prompt) > max_context_tokens * 4: # Rough char estimate
prompt = prompt[:max_context_tokens * 4] + "..."
return prompt
def select_model_for_budget(self, query_type: str) -> str:
"""Select model based on remaining budget"""
remaining = self.daily_budget - self.current_spend
budget_percentage = remaining / self.daily_budget
if budget_percentage < 0.1:
# Low budget: Use Haiku for everything
return 'anthropic.claude-3-haiku-20240307-v1:0'
elif budget_percentage < 0.3:
# Medium budget: Haiku for simple, Sonnet only for complex
if query_type == 'complex':
return 'anthropic.claude-3-5-sonnet-20241022-v2:0'
return 'anthropic.claude-3-haiku-20240307-v1:0'
else:
# Normal operation
return self.get_optimal_model(query_type)
def should_use_provisioned_throughput(self, daily_requests: int) -> bool:
"""Determine if provisioned throughput is cost-effective"""
# Provisioned: $XX/hour for guaranteed capacity
# On-demand: Pay per token
# Break-even typically around 50K+ requests/day
return daily_requests > 50000
Provisioned Throughput vs On-Demand
| Factor | On-Demand | Provisioned |
|---|---|---|
| Pricing | Per token | Per model unit/hour |
| Best for | Variable load | Steady high load |
| Latency | Variable | Consistent |
| Availability | Shared capacity | Guaranteed |
| Break-even | <50K req/day | >50K req/day |
5.3 Reliability & Availability
Multi-Region Architecture
                     Route 53
               (latency-based routing)
                        |
        +---------------+---------------+
        |               |               |
        v               v               v
    us-east-1       eu-west-1       ap-south-1
    (Primary)         (EU)            (APAC)
        |               |               |
        v               v               v
   Bedrock + KB    Bedrock + KB    Bedrock + KB
Failover Configuration
class MultiRegionClient:
"""Bedrock client with multi-region failover"""
def __init__(self):
self.regions = ['us-east-1', 'us-west-2', 'eu-west-1']
self.clients = {
region: boto3.client('bedrock-runtime', region_name=region)
for region in self.regions
}
self.primary_region = 'us-east-1'
def invoke_with_failover(self, prompt: str, model_id: str) -> dict:
"""Try primary region, failover to others on failure"""
regions_to_try = [self.primary_region] + [
r for r in self.regions if r != self.primary_region
]
last_error = None
for region in regions_to_try:
try:
response = self.clients[region].invoke_model(
modelId=model_id,
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 500,
"messages": [{"role": "user", "content": prompt}]
})
)
return {
'success': True,
'region': region,
'response': json.loads(response['body'].read())
}
except Exception as e:
last_error = e
continue
return {'success': False, 'error': str(last_error)}
Circuit Breaker Pattern
from datetime import datetime, timedelta
class CircuitBreaker:
"""Prevent cascade failures with circuit breaker"""
def __init__(self, failure_threshold: int = 5, reset_timeout: int = 60):
self.failure_threshold = failure_threshold
self.reset_timeout = reset_timeout
self.failures = 0
self.last_failure_time = None
self.state = 'CLOSED' # CLOSED, OPEN, HALF_OPEN
def can_execute(self) -> bool:
"""Check if request should be allowed"""
if self.state == 'CLOSED':
return True
if self.state == 'OPEN':
# Check if timeout has passed
if datetime.now() - self.last_failure_time > timedelta(seconds=self.reset_timeout):
self.state = 'HALF_OPEN'
return True
return False
if self.state == 'HALF_OPEN':
return True
return False
def record_success(self):
"""Record successful execution"""
self.failures = 0
self.state = 'CLOSED'
def record_failure(self):
"""Record failed execution"""
self.failures += 1
self.last_failure_time = datetime.now()
if self.failures >= self.failure_threshold:
self.state = 'OPEN'
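A hedged usage sketch showing how the breaker wraps Bedrock calls; client is assumed to be the ResilientBedrockClient from Pattern 6.

# Hedged usage sketch: circuit breaker around Bedrock invocations
breaker = CircuitBreaker(failure_threshold=5, reset_timeout=60)
client = ResilientBedrockClient()  # from Pattern 6

def guarded_invoke(prompt: str) -> dict:
    if not breaker.can_execute():
        # Fail fast: return a fallback instead of piling onto a struggling service
        return {'success': False, 'error': 'circuit_open', 'fallback': True}
    result = client.invoke_with_retry(prompt)
    if result['success']:
        breaker.record_success()
    else:
        breaker.record_failure()
    return result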
5.4 Security & Compliance
VPC Configuration
# Bedrock VPC Endpoint (no internet access needed)
vpc_endpoint_config = {
"VpcEndpointType": "Interface",
"ServiceName": "com.amazonaws.us-east-1.bedrock-runtime",
"SubnetIds": ["subnet-private-1", "subnet-private-2"],
"SecurityGroupIds": ["sg-bedrock-access"],
"PrivateDnsEnabled": True
}
# Security Group
security_group = {
"GroupName": "bedrock-access",
"Description": "Allow HTTPS to Bedrock",
"IpPermissions": [
{
"IpProtocol": "tcp",
"FromPort": 443,
"ToPort": 443,
"UserIdGroupPairs": [{"GroupId": "sg-lambda"}]
}
]
}
IAM Policy (Least Privilege)
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "BedrockInvoke",
"Effect": "Allow",
"Action": [
"bedrock:InvokeModel",
"bedrock:InvokeModelWithResponseStream"
],
"Resource": [
"arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-3-5-sonnet-*",
"arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-3-haiku-*"
]
},
{
"Sid": "KnowledgeBaseAccess",
"Effect": "Allow",
"Action": [
"bedrock:Retrieve",
"bedrock:RetrieveAndGenerate"
],
"Resource": "arn:aws:bedrock:us-east-1:123456789:knowledge-base/KB_ID"
},
{
"Sid": "GuardrailAccess",
"Effect": "Allow",
"Action": "bedrock:ApplyGuardrail",
"Resource": "arn:aws:bedrock:us-east-1:123456789:guardrail/GR_ID"
}
]
}
GDPR Compliance
from datetime import datetime, timedelta

import boto3
from boto3.dynamodb.conditions import Key

class GDPRCompliantStorage:
    """Handle data according to GDPR requirements"""
    def __init__(self, table_name: str):
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table(table_name)
def store_conversation(self, user_id: str, conversation: dict):
"""Store with consent tracking"""
item = {
'user_id': user_id,
'conversation_id': conversation['id'],
'data': conversation,
'consent_timestamp': datetime.utcnow().isoformat(),
'data_retention_days': 90,
'ttl': int((datetime.utcnow() + timedelta(days=90)).timestamp())
}
self.table.put_item(Item=item)
def delete_user_data(self, user_id: str):
"""Right to deletion - remove all user data"""
# Query all items for user
response = self.table.query(
KeyConditionExpression=Key('user_id').eq(user_id)
)
# Delete each item
with self.table.batch_writer() as batch:
for item in response['Items']:
batch.delete_item(Key={
'user_id': item['user_id'],
'conversation_id': item['conversation_id']
})
# Also delete from S3, Redis, etc.
self.delete_from_s3(user_id)
self.delete_from_cache(user_id)
def export_user_data(self, user_id: str) -> dict:
"""Right to portability - export all user data"""
response = self.table.query(
KeyConditionExpression=Key('user_id').eq(user_id)
)
return {
'user_id': user_id,
'export_timestamp': datetime.utcnow().isoformat(),
'conversations': response['Items']
}
5.5 Monitoring & Observability
CloudWatch Metrics
import boto3
class BedrockMetrics:
"""Custom metrics for Bedrock monitoring"""
def __init__(self, namespace: str = 'CustomerSupport/Bedrock'):
self.cloudwatch = boto3.client('cloudwatch')
self.namespace = namespace
def record_latency(self, model_id: str, latency_ms: float):
"""Record model invocation latency"""
self.cloudwatch.put_metric_data(
Namespace=self.namespace,
MetricData=[{
'MetricName': 'InvocationLatency',
'Value': latency_ms,
'Unit': 'Milliseconds',
'Dimensions': [
{'Name': 'ModelId', 'Value': model_id}
]
}]
)
def record_tokens(self, model_id: str, input_tokens: int, output_tokens: int):
"""Record token usage"""
self.cloudwatch.put_metric_data(
Namespace=self.namespace,
MetricData=[
{
'MetricName': 'InputTokens',
'Value': input_tokens,
'Unit': 'Count',
'Dimensions': [{'Name': 'ModelId', 'Value': model_id}]
},
{
'MetricName': 'OutputTokens',
'Value': output_tokens,
'Unit': 'Count',
'Dimensions': [{'Name': 'ModelId', 'Value': model_id}]
}
]
)
def record_error(self, model_id: str, error_type: str):
"""Record errors"""
self.cloudwatch.put_metric_data(
Namespace=self.namespace,
MetricData=[{
'MetricName': 'Errors',
'Value': 1,
'Unit': 'Count',
'Dimensions': [
{'Name': 'ModelId', 'Value': model_id},
{'Name': 'ErrorType', 'Value': error_type}
]
}]
)
Key Dashboards
| Dashboard | Metrics | Alerts |
|---|---|---|
| Latency | p50, p95, p99 by model | p95 > 5s |
| Throughput | Requests/min, Success rate | Success < 99% |
| Tokens | Input/output by model | Daily spend > budget |
| Errors | Error rate, Error types | Error rate > 1% |
| RAG | Retrieval latency, Hit rate | Hit rate < 80% |
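As an example of wiring the alerts column, here is a hedged sketch of the "p95 > 5s" latency alarm on the custom metric emitted by BedrockMetrics above; the SNS topic ARN is a placeholder.

# Hedged sketch: CloudWatch alarm for the p95 latency alert in the table above
import boto3

cloudwatch = boto3.client('cloudwatch')

cloudwatch.put_metric_alarm(
    AlarmName='bedrock-p95-latency-high',
    Namespace='CustomerSupport/Bedrock',
    MetricName='InvocationLatency',
    Dimensions=[{'Name': 'ModelId', 'Value': 'anthropic.claude-3-5-sonnet-20241022-v2:0'}],
    ExtendedStatistic='p95',
    Period=300,
    EvaluationPeriods=3,
    Threshold=5000,                    # milliseconds
    ComparisonOperator='GreaterThanThreshold',
    AlarmActions=['arn:aws:sns:us-east-1:123456789012:support-ai-alerts'],  # placeholder
)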
Part 6: Trade-offs & Alternatives
Trade-offs & Alternatives
Bedrock vs Direct API Calls (OpenAI/Anthropic)
| Factor | AWS Bedrock | Direct Anthropic API | Direct OpenAI API |
|---|---|---|---|
| Models | Multiple (Claude, Titan, Llama, Cohere) | Claude only | GPT only |
| Network | VPC PrivateLink (no internet) | Public internet | Public internet |
| Security | AWS IAM, VPC, encryption | API keys | API keys |
| Compliance | SOC2, HIPAA, FedRAMP via AWS | SOC2 | SOC2 |
| Billing | Consolidated AWS bill | Separate | Separate |
| Latency | Same region, low | Variable | Variable |
| Features | KB, Agents, Guardrails | Direct model access | Assistants, threads |
| Data Privacy | AWS data processing terms | Anthropic terms | OpenAI terms |
| Custom Fine-tuning | Limited | Not available | Available |
When to Choose Bedrock
1. Enterprise Security Requirements
   - Need VPC isolation (no internet egress)
   - Require AWS compliance certifications
   - Need IAM-based access control
   - Must keep data within AWS
2. Multi-Model Flexibility
   - Want to switch models without code changes
   - Need to compare different providers
   - Want fallback options
3. AWS-Native Architecture
   - Already running on AWS
   - Need integration with Lambda, DynamoDB, etc.
   - Want unified billing and monitoring
4. Managed RAG
   - Need quick Knowledge Base setup
   - Don't want to manage a vector database
   - Want automatic document syncing
When NOT to Choose Bedrock
1. Cutting-Edge Features
   - Need features available only in the direct APIs (e.g., computer use)
   - Require the newest model versions immediately
   - Need custom fine-tuning (OpenAI)
2. Cost-Sensitive at High Volume
   - Direct APIs may offer volume discounts
   - Provider credits and free tiers
3. Provider-Specific Optimizations
   - Anthropic prompt caching (direct API only, currently)
   - OpenAI Assistants for complex workflows
Bedrock Knowledge Base vs Custom RAG
| Factor | Bedrock KB | Custom RAG (LangChain + Pinecone) |
|---|---|---|
| Setup Time | Hours | Days to weeks |
| Maintenance | Fully managed | Self-managed |
| Flexibility | Limited chunking options | Full control |
| Cost at Scale | Higher per query | Lower marginal cost |
| Retrieval Quality | Good default | Can be optimized |
| Hybrid Search | Limited | Full control |
| Re-ranking | Not available | Can add Cohere, custom |
| Document Types | PDF, TXT, HTML, DOCX | Any with custom parsers |
When to Choose Bedrock KB
- Quick prototype or MVP
- Team doesn't have RAG expertise
- Standard document formats
- <100K documents
- Good-enough default retrieval quality
When to Choose Custom RAG
- Need fine-grained control over chunking
- Require hybrid search (keyword + semantic)
- Need re-ranking for better relevance
- Processing millions of documents
- Custom document types (code, tables, images)
- Need metadata filtering
Hybrid Approach
class HybridRAG:
"""Use Bedrock KB for simple queries, custom RAG for complex"""
def __init__(self):
self.bedrock_kb = BedrockKBClient(kb_id='...')
self.custom_rag = PineconeRAG(index='...')
def query(self, question: str, use_custom: bool = False) -> dict:
if use_custom or self.requires_advanced_retrieval(question):
# Custom RAG with re-ranking
docs = self.custom_rag.retrieve(question)
docs = self.rerank(docs, question)
return self.generate(question, docs)
else:
# Simple Bedrock KB
return self.bedrock_kb.retrieve_and_generate(question)
def requires_advanced_retrieval(self, question: str) -> bool:
# Complex queries need custom RAG
indicators = ['compare', 'versus', 'difference', 'all', 'list']
return any(ind in question.lower() for ind in indicators)
Bedrock Agents vs LangChain/LangGraph
| Factor | Bedrock Agents | LangChain/LangGraph |
|---|---|---|
| Infrastructure | Fully managed | Self-hosted |
| Tool Ecosystem | AWS-focused | 100+ integrations |
| Customization | Limited | Extensive |
| State Management | Built-in | Manual |
| Debugging | CloudWatch traces | LangSmith |
| Cost | Per invocation | Compute + API |
| Learning Curve | Lower | Higher |
Bedrock Agents: Best For
- Simple action-oriented tasks
- AWS service integrations (Lambda, Step Functions)
- Teams without agent expertise
- Quick deployment
LangChain/LangGraph: Best For
- Complex multi-step workflows
- Need custom tools and integrations
- Require advanced prompting strategies
- Want open-source flexibility
- Need detailed debugging and tracing
Model Selection Decision Tree
                  What's the task?
                        |
        +---------------+---------------+
        |               |               |
        v               v               v
    Simple          Complex         Embedding /
    FAQ / Chat      Reasoning       Search
        |               |               |
        v               v               v
     Haiku           Sonnet           Titan
    $0.25/1M         $3/1M          Embeddings
     Fast            Smart           $0.02/1M
Model Selection Guidelines
| Query Type | Recommended Model | Reasoning |
|---|---|---|
| Yes/No questions | Haiku | Fast, cheap |
| Order status | Haiku | Structured data |
| FAQ answers | Haiku | Well-defined answers |
| Complaints | Sonnet | Needs empathy, nuance |
| Technical issues | Sonnet | Complex reasoning |
| Policy questions | Sonnet | Nuanced interpretation |
| Routing/Classification | Haiku | Fast decision |
Architecture Decision Matrix
| Requirement | Recommended Choice |
|---|---|
| < 1s latency | Haiku + caching |
| Enterprise security | Bedrock + VPC |
| Multi-language | Bedrock (native support) |
| 50K+ docs | Custom RAG |
| Quick prototype | Bedrock KB + Agents |
| Complex workflows | LangGraph |
| Cost-sensitive | Haiku + aggressive caching |
| High availability | Multi-region Bedrock |
Interview Tip
When discussing trade-offs, always present both sides fairly. Show that you understand the context matters:
"Bedrock is ideal when you need enterprise security and quick setup, but if you need cutting-edge features or have extreme cost sensitivity at scale, direct APIs might be better. The right choice depends on your specific requirements."
This shows mature engineering judgment, not just advocacy for one solution.