System Design: LangGraph Log Analysis & Slack Notification System
Design a multi-agent system using LangGraph that reads logs/tickets from Jira/GitHub, fetches error logs, and posts actionable summaries to Slack.
Question
Difficulty: Senior | Estimated Time: 90 minutes | Tags: LangGraph, Multi-Agent, System Design, Jira, GitHub, Slack, Log Analysis, Incident Response
Part 1: Problem Statement
Problem Statement: Intelligent Log Analysis & Incident Summarization
The Scenario
You are a senior ML/platform engineer at a fast-growing tech company. The VP of Engineering approaches you:
"Our on-call engineers spend 2+ hours daily manually correlating Jira tickets, GitHub PRs, and error logs to understand what's happening in production. We need an intelligent system that automatically collects this information, identifies patterns, and posts actionable summaries to Slack. This should run every morning before standup."
Your task: Design a multi-agent system using LangGraph that automates incident analysis and reporting.
Functional Requirements
Core Capabilities
- Jira Integration: Fetch recent tickets with bug/error labels from the last 24 hours
- GitHub Integration: Collect PRs with failed CI, reverted commits, or hotfix branches
- Log Aggregation: Query CloudWatch/Datadog for error logs matching ticket keywords
- Correlation Engine: Match tickets ↔ PRs ↔ logs using timestamps, keywords, and service names
- Root Cause Analysis: Use LLM to identify patterns and potential root causes
- Summary Generation: Create executive summary with actionable insights
- Slack Notification: Post formatted summary to engineering channel with thread for details
Secondary Capabilities
- Historical Tracking: Store analyses for trend detection over time
- Priority Scoring: Rank issues by severity and frequency
- On-Demand Trigger: Allow engineers to manually trigger analysis via Slack command
- Multi-Project Support: Handle multiple Jira projects and GitHub repos
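The priority-scoring capability can be sketched as a severity-weight-times-frequency heuristic. This is a minimal illustration, not part of the original design; the weight table and the log-dampened formula are assumptions:

```python
# Hypothetical priority score: severity weight x log(1 + 24h occurrence count).
# Both the weights and the formula are illustrative assumptions.
import math

SEVERITY_WEIGHT = {"critical": 10, "high": 5, "medium": 2, "low": 1}

def priority_score(severity: str, occurrences_24h: int) -> float:
    """Rank issues so frequent, severe errors bubble to the top."""
    weight = SEVERITY_WEIGHT.get(severity, 1)
    return weight * math.log1p(occurrences_24h)

issues = [("PROJ-1", "low", 500), ("PROJ-2", "critical", 30), ("PROJ-3", "high", 5)]
ranked = sorted(issues, key=lambda i: priority_score(i[1], i[2]), reverse=True)
```

The `log1p` dampening keeps one noisy low-severity error from drowning out a rarer critical one.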
Non-Functional Requirements
Performance
| Metric | Target | Rationale |
|---|---|---|
| End-to-End Latency | < 5 minutes | Complete before standup |
| Jira API Calls | < 100/run | Stay within rate limits |
| GitHub API Calls | < 500/run | Respect GraphQL limits |
| Log Query Time | < 30 seconds | CloudWatch timeout |
Reliability
- Availability: 99% (some failures acceptable, not customer-facing)
- Graceful Degradation: Post partial results if one source fails
- Retry Logic: Handle transient API failures automatically
- Alerting: Notify if system fails 3 consecutive runs
Scale
- Jira Tickets: Process 50-200 tickets/day
- GitHub PRs: Analyze 100-500 PRs/day
- Log Volume: Query up to 1M log entries
- Slack Messages: 1 main summary + detail threads
Why LangGraph?
| Requirement | Why LangGraph (vs Simple Orchestration) |
|---|---|
| Parallel Execution | Collect Jira/GitHub/Logs simultaneously |
| State Management | TypedDict accumulates results across agents |
| Conditional Routing | Skip analysis if no errors found |
| Error Recovery | Checkpoint and resume from failures |
| Observability | LangSmith tracing for debugging |
| Human-in-Loop | Easy to add approval step before posting |
Interview Tip
When designing multi-agent systems, always clarify: (1) Which agents can run in parallel? (2) What happens if one agent fails? (3) How do you prevent infinite loops? This shows production thinking.
Part 2: High-Level Architecture
High-Level Architecture
System Overview
┌──────────────────────────────────────────────────────────────────────────┐
│                          LangGraph Orchestrator                          │
│                                                                          │
│  ┌────────────────────────────────────────────────────────────────────┐  │
│  │                             AgentState                             │  │
│  │  ┌─────────────┬─────────────┬─────────────┬─────────────┐        │  │
│  │  │ jira_tickets│ github_prs  │ error_logs  │ correlations│        │  │
│  │  │ List[Ticket]│ List[PR]    │ List[Log]   │ List[Match] │        │  │
│  │  └─────────────┴─────────────┴─────────────┴─────────────┘        │  │
│  │  ┌─────────────┬─────────────┬─────────────┐                      │  │
│  │  │ root_causes │ summary     │ slack_ts    │                      │  │
│  │  │ List[Cause] │ str         │ Optional    │                      │  │
│  │  └─────────────┴─────────────┴─────────────┘                      │  │
│  └────────────────────────────────────────────────────────────────────┘  │
│                                                                          │
│        ┌─────────────── PARALLEL COLLECTION ───────────────┐             │
│        │                       │                            │            │
│        ▼                       ▼                            ▼            │
│  ┌──────────┐            ┌──────────┐               ┌──────────┐         │
│  │   Jira   │            │  GitHub  │               │   Log    │         │
│  │Collector │            │Collector │               │ Fetcher  │         │
│  └────┬─────┘            └────┬─────┘               └────┬─────┘         │
│       │                       │                          │               │
│       └───────────────────────┼──────────────────────────┘               │
│                               │                                          │
│                               ▼         SEQUENTIAL ANALYSIS              │
│  ┌──────────┐     ┌──────────┐     ┌──────────┐     ┌──────────┐         │
│  │Correlator│ ──▶ │ Analyzer │ ──▶ │Summarizer│ ──▶ │  Slack   │         │
│  │          │     │  (LLM)   │     │  (LLM)   │     │  Poster  │         │
│  └──────────┘     └──────────┘     └──────────┘     └──────────┘         │
│                                                                          │
└──────────────────────────────────────────────────────────────────────────┘

External Services
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│   Jira   │ │  GitHub  │ │CloudWatch│ │  OpenAI  │ │  Slack   │
│   API    │ │   API    │ │ /Datadog │ │  /Claude │ │   API    │
└──────────┘ └──────────┘ └──────────┘ └──────────┘ └──────────┘
Data Flow
1. TRIGGER (Cron or Slack Command)
        │
        ▼
2. PARALLEL COLLECTION (Fan-out)
   ├── Jira:   GET /rest/api/3/search?jql=labels=bug AND created>=-24h
   ├── GitHub: GraphQL query for PRs with failed checks
   └── Logs:   CloudWatch Insights query for ERROR/EXCEPTION
        │
        ▼
3. CORRELATION (Fan-in)
   Match by: timestamp proximity, service name, error keywords
   Output: List of correlated incidents
        │
        ▼
4. ROOT CAUSE ANALYSIS (LLM)
   Input: Correlated incidents with full context
   Output: Identified patterns, potential causes
        │
        ▼
5. SUMMARIZATION (LLM)
   Generate: Executive summary, action items, priority ranking
        │
        ▼
6. SLACK POSTING
   Main message: Summary with key stats
   Thread replies: Details per incident
Component Responsibilities
| Component | Responsibility | External Dependency |
|---|---|---|
| JiraCollector | Query tickets by label, date, project | Jira REST API |
| GitHubCollector | Query PRs, commits, CI status | GitHub GraphQL API |
| LogFetcher | Query logs by time range, keywords | CloudWatch/Datadog |
| Correlator | Match related items across sources | None (local logic) |
| RootCauseAnalyzer | Identify patterns using LLM | OpenAI/Claude API |
| Summarizer | Generate human-readable summary | OpenAI/Claude API |
| SlackPoster | Format and post to channel | Slack Web API |
Interview Tip
Draw the architecture diagram first, then explain data flow. Interviewers want to see you can communicate complex systems visually. Always label external dependencies clearly.
Part 3: LangGraph State Design
LangGraph State Design
State Definition (Pseudocode)
from typing import TypedDict, Annotated, Optional, List
from datetime import datetime
import operator

# Data Models

class JiraTicket(TypedDict):
    key: str                  # "PROJ-123"
    summary: str
    description: str
    labels: List[str]
    created: datetime
    assignee: Optional[str]
    priority: str
    status: str

class GitHubPR(TypedDict):
    number: int
    title: str
    author: str
    repo: str
    merged_at: Optional[datetime]
    ci_status: str            # "success" | "failure" | "pending"
    files_changed: List[str]
    is_hotfix: bool
    is_reverted: bool

class ErrorLog(TypedDict):
    timestamp: datetime
    level: str                # "ERROR" | "WARN" | "EXCEPTION"
    service: str
    message: str
    stack_trace: Optional[str]
    request_id: Optional[str]

class Correlation(TypedDict):
    ticket: Optional[JiraTicket]
    pr: Optional[GitHubPR]
    logs: List[ErrorLog]
    confidence: float         # 0.0 - 1.0
    matched_keywords: List[str]

class RootCause(TypedDict):
    description: str
    affected_services: List[str]
    related_tickets: List[str]
    severity: str             # "critical" | "high" | "medium" | "low"
    suggested_action: str

# Main State

class LogAnalyzerState(TypedDict):
    # ─── Inputs ───
    time_range_hours: int
    project_filter: Optional[str]

    # ─── Collected Data (Accumulators) ───
    jira_tickets: Annotated[List[JiraTicket], operator.add]
    github_prs: Annotated[List[GitHubPR], operator.add]
    error_logs: Annotated[List[ErrorLog], operator.add]

    # ─── Analysis Results ───
    correlations: List[Correlation]
    root_causes: List[RootCause]

    # ─── Output ───
    summary: str
    slack_message_ts: Optional[str]

    # ─── Control Flow ───
    errors: Annotated[List[str], operator.add]
    current_step: str
    retry_count: int
Why Annotated with operator.add?
The Annotated[List, operator.add] pattern enables parallel agents to accumulate results:
# Without accumulator (WRONG - overwrites)
state["jira_tickets"] = new_tickets # Overwrites previous
# With accumulator (CORRECT - appends)
# LangGraph automatically merges: existing + new
return {"jira_tickets": new_tickets} # Appends to list
This is critical because our three collectors run in parallel and all write to state.
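The reducer behavior is easy to simulate without LangGraph: for each key annotated with `operator.add`, the framework merges updates by applying that function to the old and new values. A minimal stand-in to build intuition (the `apply_update` helper and `REDUCERS` table are illustrative, not a LangGraph API):

```python
import operator

# Reducer table mimicking Annotated[List, operator.add] on these keys
REDUCERS = {"jira_tickets": operator.add, "errors": operator.add}

def apply_update(state: dict, update: dict) -> dict:
    """Merge a node's partial return into state, LangGraph-style."""
    for key, value in update.items():
        reducer = REDUCERS.get(key)
        if reducer is not None and key in state:
            state[key] = reducer(state[key], value)  # lists: concatenate
        else:
            state[key] = value                       # plain keys: overwrite
    return state

state = {"jira_tickets": [], "summary": ""}
apply_update(state, {"jira_tickets": ["PROJ-1"]})                     # one collector
apply_update(state, {"jira_tickets": ["PROJ-2"], "summary": "done"})  # another node
```

After both updates, `jira_tickets` holds both values while `summary` (no reducer) is simply overwritten.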
State Transitions
Initial State             After Collectors           After Analysis
─────────────────         ─────────────────          ─────────────────
jira_tickets: []          jira_tickets: [50]         jira_tickets: [50]
github_prs: []      ──▶   github_prs: [120]    ──▶   github_prs: [120]
error_logs: []            error_logs: [500]          error_logs: [500]
correlations: []          correlations: []           correlations: [25]
root_causes: []           root_causes: []            root_causes: [5]
summary: ""               summary: ""                summary: "..."
Error Handling in State
# Each agent appends errors instead of throwing
def jira_collector(state: LogAnalyzerState) -> dict:
    try:
        tickets = fetch_jira_tickets(state["time_range_hours"])
        return {"jira_tickets": tickets}
    except JiraAPIError as e:
        return {
            "jira_tickets": [],
            "errors": [f"Jira fetch failed: {str(e)}"]
        }
This allows graceful degradation - if Jira fails, we still have GitHub and logs.
Interview Tip
Always explain your state design decisions. Why TypedDict over Pydantic? (Performance, LangGraph native support). Why accumulators? (Parallel execution). Why errors in state? (Graceful degradation).
Part 4: Agent Node Designs
Agent Node Designs
1. Jira Collector Agent
Purpose: Fetch recent tickets with bug/error labels
# Pseudocode
def jira_collector(state: LogAnalyzerState) -> dict:
    """Query Jira for recent error-related tickets."""
    hours = state["time_range_hours"]
    project = state.get("project_filter", "")

    # Build JQL query
    jql = f"""
        labels IN (bug, error, incident, production-issue)
        AND created >= -{hours}h
        AND status != Done
        {f'AND project = {project}' if project else ''}
        ORDER BY created DESC
    """

    # API call with pagination
    tickets = []
    start_at = 0
    while True:
        response = jira_client.search(jql, start_at=start_at, max_results=50)
        tickets.extend(parse_tickets(response.issues))
        if len(tickets) >= response.total:
            break
        start_at += 50

    return {"jira_tickets": tickets}
Key Considerations:
- Pagination for large result sets
- JQL optimization (use indexed fields)
- Rate limit: 100 requests/minute
2. GitHub Collector Agent
Purpose: Fetch PRs with failed CI, hotfixes, or reverts
# Pseudocode
def github_collector(state: LogAnalyzerState) -> dict:
    """Query GitHub for problematic PRs."""
    hours = state["time_range_hours"]
    since = datetime.utcnow() - timedelta(hours=hours)

    # GraphQL query for efficiency (single request).
    # Note: the search string must be built client-side; GraphQL does
    # not interpolate variables inside string literals.
    search_query = f"is:pr updated:>{since.isoformat()}"
    query = """
    query($q: String!) {
      search(query: $q, type: ISSUE, first: 100) {
        nodes {
          ... on PullRequest {
            number
            title
            author { login }
            repository { name }
            mergedAt
            commits(last: 1) {
              nodes {
                commit {
                  statusCheckRollup { state }
                }
              }
            }
            headRefName  # Check for "hotfix" or "revert"
          }
        }
      }
    }
    """
    prs = graphql_client.execute(query, {"q": search_query})

    # Filter for problematic PRs
    problematic = [
        pr for pr in prs
        if pr.ci_status == "FAILURE"
        or "hotfix" in pr.branch.lower()
        or "revert" in pr.title.lower()
    ]
    return {"github_prs": problematic}
Key Considerations:
- GraphQL > REST (single request for complex data)
- Filter at query level when possible
- Rate limit: 5000 points/hour
3. Log Fetcher Agent
Purpose: Query error logs from CloudWatch/Datadog
# Pseudocode
def log_fetcher(state: LogAnalyzerState) -> dict:
    """Query CloudWatch Logs Insights for errors."""
    hours = state["time_range_hours"]

    # CloudWatch Insights query. The time window comes from the
    # startTime/endTime parameters of the API call, not the query text.
    query = """
        fields @timestamp, @message, @logStream
        | filter @message like /ERROR|EXCEPTION|FATAL/
        | sort @timestamp desc
        | limit 1000
    """

    # Start async query
    query_id = cloudwatch.start_query(
        logGroupNames=["/app/production", "/app/api"],
        queryString=query,
        startTime=int((datetime.utcnow() - timedelta(hours=hours)).timestamp()),
        endTime=int(datetime.utcnow().timestamp())
    )

    # Poll for results (CloudWatch is async); give up after 30 seconds
    deadline = time.time() + 30
    while True:
        response = cloudwatch.get_query_results(queryId=query_id)
        if response["status"] == "Complete":
            break
        if time.time() > deadline:
            raise TimeoutError("CloudWatch query exceeded 30s")
        time.sleep(1)

    logs = parse_cloudwatch_results(response["results"])
    return {"error_logs": logs}
Key Considerations:
- CloudWatch Insights is async (poll for results)
- Limit results to prevent memory issues
- Consider sampling for high-volume logs
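"Consider sampling for high-volume logs" can be done with reservoir sampling, which keeps a uniform fixed-size sample in a single pass without holding all entries in memory. A sketch under assumed names (`sample_logs`, the default cap, and the fixed seed are all illustrative):

```python
import random

def sample_logs(log_stream, k: int = 1000, seed: int = 42):
    """Keep a uniform random sample of at most k entries from a stream."""
    rng = random.Random(seed)  # seeded for reproducible daily runs
    reservoir = []
    for i, entry in enumerate(log_stream):
        if len(reservoir) < k:
            reservoir.append(entry)       # fill the reservoir first
        else:
            j = rng.randint(0, i)         # keep entry with probability k/(i+1)
            if j < k:
                reservoir[j] = entry
    return reservoir

sample = sample_logs(range(1_000_000), k=100)
```

This bounds memory at `k` entries regardless of how many log lines the query returns.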
4. Correlator Agent
Purpose: Match tickets, PRs, and logs by patterns
# Pseudocode
def correlator(state: LogAnalyzerState) -> dict:
    """Correlate tickets, PRs, and logs using multiple signals."""
    correlations = []
    for ticket in state["jira_tickets"]:
        # Extract keywords from ticket (TypedDicts use key access)
        keywords = extract_keywords(ticket["summary"] + ticket["description"])
        service_name = extract_service_name(ticket)

        # Find matching logs
        matching_logs = [
            log for log in state["error_logs"]
            if any(kw in log["message"].lower() for kw in keywords)
            or log["service"] == service_name
        ]

        # Find matching PRs (by time proximity and service)
        matching_prs = [
            pr for pr in state["github_prs"]
            if any(f in pr["files_changed"] for f in service_files(service_name))
            or time_proximity(ticket["created"], pr["merged_at"]) < timedelta(hours=2)
        ]

        if matching_logs or matching_prs:
            correlations.append({
                "ticket": ticket,
                "pr": matching_prs[0] if matching_prs else None,
                "logs": matching_logs[:10],  # Limit to top 10
                "confidence": calculate_confidence(keywords, matching_logs),
                "matched_keywords": keywords
            })
    return {"correlations": correlations}
Correlation Signals:
| Signal | Weight | Example |
|---|---|---|
| Keyword match | 0.3 | "NullPointerException" in both |
| Service name | 0.3 | "payment-service" in both |
| Time proximity | 0.2 | PR merged 30min before ticket |
| Author/Assignee | 0.1 | Same person |
| File path | 0.1 | PR touched payment/api.py |
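The weight table above maps directly onto a confidence function. A hedged sketch of one possible `calculate_confidence` variant, assuming the individual signals have already been evaluated upstream and arrive as booleans (the dict-of-booleans interface is an assumption, not the signature used in the correlator pseudocode):

```python
# Weights mirror the correlation-signals table; detection of each signal
# is assumed to happen upstream and arrive here as booleans.
SIGNAL_WEIGHTS = {
    "keyword_match": 0.3,
    "service_name": 0.3,
    "time_proximity": 0.2,
    "same_author": 0.1,
    "file_path": 0.1,
}

def calculate_confidence(signals: dict) -> float:
    """Sum the weights of fired signals; result lies in [0.0, 1.0]."""
    score = sum(w for name, w in SIGNAL_WEIGHTS.items() if signals.get(name))
    return round(min(score, 1.0), 2)
```

A keyword match plus time proximity, for example, yields 0.5; all five signals together yield 1.0.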
5. Root Cause Analyzer Agent (LLM)
Purpose: Use LLM to identify patterns and root causes
# Pseudocode
def root_cause_analyzer(state: LogAnalyzerState) -> dict:
    """Use LLM to analyze correlations and identify root causes."""
    if not state["correlations"]:
        return {"root_causes": []}

    # Prepare context for LLM
    context = format_correlations_for_llm(state["correlations"])

    prompt = f"""
    Analyze these correlated incidents and identify root causes:

    {context}

    For each root cause, provide:
    1. Description of the issue
    2. Affected services
    3. Related ticket numbers
    4. Severity (critical/high/medium/low)
    5. Suggested action

    Focus on patterns: Are multiple tickets related to the same root cause?
    """
    response = llm.invoke(prompt)
    root_causes = parse_root_causes(response)
    return {"root_causes": root_causes}
6. Summarizer Agent (LLM)
Purpose: Generate executive summary for Slack
# Pseudocode
def summarizer(state: LogAnalyzerState) -> dict:
    """Generate human-readable summary."""
    prompt = f"""
    Generate a concise engineering standup summary:

    Tickets: {len(state['jira_tickets'])} in last {state['time_range_hours']}h
    Failed PRs: {len(state['github_prs'])}
    Error logs: {len(state['error_logs'])}
    Correlated incidents: {len(state['correlations'])}
    Root causes identified: {len(state['root_causes'])}

    Root Causes:
    {format_root_causes(state['root_causes'])}

    Format as:
    - 📊 Stats summary (1 line)
    - 🔥 Critical issues (if any)
    - ⚠️ Key patterns observed
    - 📋 Recommended actions

    Keep it under 500 characters for Slack readability.
    """
    summary = llm.invoke(prompt)
    return {"summary": summary}
7. Slack Poster Agent
Purpose: Post formatted summary to Slack
# Pseudocode
def slack_poster(state: LogAnalyzerState) -> dict:
    """Post summary to Slack with threaded details."""
    # Main message with Block Kit formatting. Note: header and section
    # "text" must be text objects, not bare strings.
    blocks = [
        {"type": "header",
         "text": {"type": "plain_text", "text": "📊 Daily Incident Analysis"}},
        {"type": "section",
         "text": {"type": "mrkdwn", "text": state["summary"]}},
        {"type": "context", "elements": [
            {"type": "mrkdwn",
             "text": f"Analyzed {state['time_range_hours']}h • {datetime.now().strftime('%Y-%m-%d %H:%M')}"}
        ]}
    ]

    # Post main message
    response = slack_client.chat_postMessage(
        channel="#engineering-incidents",
        blocks=blocks
    )
    message_ts = response["ts"]

    # Post details in thread
    for rc in state["root_causes"]:
        slack_client.chat_postMessage(
            channel="#engineering-incidents",
            thread_ts=message_ts,
            text=format_root_cause_detail(rc)
        )
    return {"slack_message_ts": message_ts}
Interview Tip
For each agent, be prepared to discuss: (1) Input/Output contract, (2) Error handling, (3) Rate limiting, (4) What happens if it fails. This shows production maturity.
Part 5: Graph Construction
Graph Construction
Building the LangGraph
# Pseudocode
from langgraph.graph import StateGraph, START, END

def build_log_analyzer_graph() -> StateGraph:
    # Initialize graph with state schema
    graph = StateGraph(LogAnalyzerState)

    # ─── Add Nodes ───
    graph.add_node("jira_collector", jira_collector)
    graph.add_node("github_collector", github_collector)
    graph.add_node("log_fetcher", log_fetcher)
    graph.add_node("correlator", correlator)
    graph.add_node("analyzer", root_cause_analyzer)
    graph.add_node("summarizer", summarizer)
    graph.add_node("slack_poster", slack_poster)

    # ─── Parallel Collection (Fan-out) ───
    # All three collectors start simultaneously from START
    graph.add_edge(START, "jira_collector")
    graph.add_edge(START, "github_collector")
    graph.add_edge(START, "log_fetcher")

    # ─── Synchronization Point (Fan-in) ───
    # Correlator waits for ALL collectors to complete
    graph.add_edge("jira_collector", "correlator")
    graph.add_edge("github_collector", "correlator")
    graph.add_edge("log_fetcher", "correlator")

    # ─── Sequential Analysis ───
    graph.add_edge("correlator", "analyzer")
    graph.add_edge("analyzer", "summarizer")
    graph.add_edge("summarizer", "slack_poster")
    graph.add_edge("slack_poster", END)

    # Return the uncompiled graph so the caller can compile it,
    # optionally with a checkpointer
    return graph
Execution Flow Diagram
                 START
                   │
       ┌───────────┼───────────┐
       │           │           │
       ▼           ▼           ▼
  ┌─────────┐ ┌─────────┐ ┌─────────┐
  │  Jira   │ │ GitHub  │ │  Logs   │   PARALLEL
  │Collector│ │Collector│ │ Fetcher │   (Fan-out)
  └────┬────┘ └────┬────┘ └────┬────┘
       │           │           │
       └───────────┼───────────┘
                   │
                   ▼
             ┌───────────┐
             │Correlator │   SYNC POINT
             └─────┬─────┘   (Fan-in)
                   │
                   ▼
             ┌───────────┐
             │ Analyzer  │   SEQUENTIAL
             │  (LLM)    │
             └─────┬─────┘
                   │
                   ▼
             ┌───────────┐
             │Summarizer │
             │  (LLM)    │
             └─────┬─────┘
                   │
                   ▼
             ┌───────────┐
             │  Slack    │
             │  Poster   │
             └─────┬─────┘
                   │
                   ▼
                  END
Adding Conditional Routing
# Skip analysis if no data collected
def should_analyze(state: LogAnalyzerState) -> str:
    """Route based on collected data"""
    total_items = (
        len(state.get("jira_tickets", [])) +
        len(state.get("github_prs", [])) +
        len(state.get("error_logs", []))
    )
    if total_items == 0:
        return "skip_to_notify"  # Nothing to analyze
    return "correlator"

# Attach the conditional edge to a small pass-through "join" node that
# runs after ALL three collectors; hanging it off a single collector
# would route before the other two have finished.
graph.add_node("join_collectors", lambda state: {})
for collector in ("jira_collector", "github_collector", "log_fetcher"):
    graph.add_edge(collector, "join_collectors")
graph.add_conditional_edges(
    "join_collectors",
    should_analyze,
    {
        "correlator": "correlator",
        "skip_to_notify": "slack_poster"  # Post "no issues" message
    }
)
Checkpointing for Recovery
from langgraph.checkpoint.sqlite import SqliteSaver

# Enable persistence. Note: this requires build_log_analyzer_graph()
# to return the StateGraph *before* compile(), so the checkpointer
# can be attached here.
checkpointer = SqliteSaver.from_conn_string("./checkpoints.db")
graph = build_log_analyzer_graph()
app = graph.compile(checkpointer=checkpointer)

# Run with thread_id for resumability
config = {"configurable": {"thread_id": "daily-run-2024-01-15"}}
result = app.invoke(initial_state, config)

# If a run fails midway, resume from the last checkpoint:
# result = app.invoke(None, config)  # Continues from checkpoint
Interview Tip
Explain the fan-out/fan-in pattern clearly. Interviewers love to ask: "What if one collector is slow?" Answer: "The correlator only starts when ALL parallel nodes complete. We set timeouts per agent to prevent indefinite waits."
Part 6: API Integrations
API Integrations
Jira REST API
Endpoint: GET /rest/api/3/search
Auth: Basic Auth or OAuth 2.0
Rate Limits:
- Anonymous: 50 requests/hour
- Authenticated: 100 requests/minute
JQL Query Example:
labels IN (bug, error)
AND created >= -24h
AND status NOT IN (Done, Closed)
ORDER BY created DESC
Pagination:
- startAt: 0, 50, 100...
- maxResults: 50 (default)
- total: returned in response
Response Fields to Extract:
- key: "PROJ-123"
- fields.summary
- fields.description
- fields.labels
- fields.created
- fields.assignee.displayName
- fields.priority.name
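Mapping those response fields onto the JiraTicket model is a small transform. A sketch with defensive lookups, since `assignee` and `priority` can be null on real tickets (the `parse_ticket` name and fallback values are assumptions; field paths follow the list above):

```python
def parse_ticket(issue: dict) -> dict:
    """Convert one raw Jira search result into a JiraTicket-shaped dict."""
    fields = issue.get("fields", {})
    assignee = fields.get("assignee") or {}   # null when unassigned
    priority = fields.get("priority") or {}
    return {
        "key": issue["key"],                  # e.g. "PROJ-123"
        "summary": fields.get("summary", ""),
        "description": fields.get("description") or "",
        "labels": fields.get("labels", []),
        "created": fields.get("created"),     # ISO 8601 string from the API
        "assignee": assignee.get("displayName"),
        "priority": priority.get("name", "Unknown"),
        "status": (fields.get("status") or {}).get("name", "Unknown"),
    }

raw = {"key": "PROJ-123", "fields": {"summary": "NPE in checkout",
                                     "labels": ["bug"], "assignee": None,
                                     "priority": {"name": "High"}}}
```

The `or {}` guards matter: Jira returns JSON `null` (not a missing key) for unassigned tickets.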
GitHub GraphQL API
Endpoint: POST /graphql
Auth: Bearer token (PAT or GitHub App)
Rate Limits:
- 5000 points/hour
- Complex queries cost more points
Query Example:
query {
  search(
    query: "is:pr is:merged merged:>2024-01-14"
    type: ISSUE
    first: 100
  ) {
    nodes {
      ... on PullRequest {
        number
        title
        author { login }
        repository { name owner { login } }
        mergedAt
        headRefName
        commits(last: 1) {
          nodes {
            commit {
              statusCheckRollup { state }
            }
          }
        }
        files(first: 50) {
          nodes { path }
        }
      }
    }
    pageInfo {
      hasNextPage
      endCursor
    }
  }
}
Why GraphQL over REST:
- Single request for PR + commits + files + status
- REST would need 4+ requests per PR
CloudWatch Logs Insights
Endpoint: logs.start_query / logs.get_query_results
Auth: IAM role or access keys
Query Language Example:
fields @timestamp, @message, @logStream
| filter @message like /ERROR|EXCEPTION|FATAL/
| sort @timestamp desc
| limit 1000

Note: the time window is set via the query's startTime/endTime API parameters, not in the query text. An aggregation variant for hourly error counts:

filter @message like /ERROR|EXCEPTION|FATAL/
| stats count(*) by bin(1h)
Async Pattern:
1. start_query() → returns queryId
2. Poll get_query_results(queryId) until status="Complete"
3. Parse results
Timeouts:
- Query timeout: 60 minutes max
- Set shorter timeout (30s) in application
Cost Considerations:
- $0.005 per GB scanned
- Use filter early in query to reduce scan
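That per-GB pricing makes scan cost easy to estimate before running a query. A tiny helper, assuming the $0.005/GB figure above (verify against current AWS pricing; the function name is illustrative):

```python
def insights_query_cost(gb_scanned: float, price_per_gb: float = 0.005) -> float:
    """Estimated CloudWatch Logs Insights cost for one query, in USD."""
    return round(gb_scanned * price_per_gb, 4)

# A ~20 GB daily scan, as assumed in the cost analysis in Part 8:
daily_cost = insights_query_cost(20)
```

Narrowing the log groups and filtering early in the query reduces `gb_scanned` directly, which is why those optimizations appear first in the list above.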
Slack Web API
Endpoint: POST /api/chat.postMessage
Auth: Bot token (xoxb-...)
Rate Limits:
- Tier 3: 50+ requests/minute (most methods)
- chat.postMessage: ~1 msg/sec per channel
Block Kit for Rich Formatting:
{
  "channel": "#engineering-incidents",
  "blocks": [
    {
      "type": "header",
      "text": {
        "type": "plain_text",
        "text": "📊 Daily Incident Analysis"
      }
    },
    {
      "type": "section",
      "text": {
        "type": "mrkdwn",
        "text": "*Summary*: 5 incidents, 2 critical"
      }
    },
    {
      "type": "divider"
    },
    {
      "type": "section",
      "fields": [
        {"type": "mrkdwn", "text": "*Tickets:* 12"},
        {"type": "mrkdwn", "text": "*Failed PRs:* 3"}
      ]
    }
  ]
}
Threading:
- First message: Get 'ts' from response
- Replies: Include 'thread_ts' = original 'ts'
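The threading rule can be captured in a small payload builder: the parent post has no `thread_ts`, and every reply reuses the parent's `ts`. A sketch (function name and the sample `ts` value are illustrative):

```python
def reply_payload(channel: str, parent_ts: str, text: str) -> dict:
    """Build a thread reply: same channel, thread_ts set to the parent's ts."""
    return {"channel": channel, "thread_ts": parent_ts, "text": text}

# The parent ts comes back from chat.postMessage's response
parent_ts = "1705312200.000100"
payloads = [reply_payload("#engineering-incidents", parent_ts, d)
            for d in ["Root cause 1", "Root cause 2"]]
```

Keeping payload construction separate from the API call also makes the formatting logic unit-testable without a Slack token.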
Authentication & Secrets Management
Best Practices:
1. Never hardcode credentials
2. Use environment variables or secret manager
3. Rotate tokens periodically
4. Use least-privilege permissions
Secret Sources:
- AWS Secrets Manager
- HashiCorp Vault
- Environment variables (for simple setups)
Example:
JIRA_API_TOKEN=secret://jira/api-token
GITHUB_TOKEN=secret://github/pat
SLACK_BOT_TOKEN=secret://slack/bot-token
OPENAI_API_KEY=secret://openai/key
At Runtime:
secrets = SecretManager()
jira_token = secrets.get("jira/api-token")
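For the "environment variables (for simple setups)" option, a minimal `SecretManager` stand-in can resolve those path-style keys from the environment. Entirely illustrative; a real deployment would call AWS Secrets Manager or Vault behind the same interface:

```python
import os

class EnvSecretManager:
    """Resolve 'jira/api-token' -> env var JIRA_API_TOKEN (illustrative)."""
    def get(self, path: str) -> str:
        env_name = path.replace("/", "_").replace("-", "_").upper()
        value = os.environ.get(env_name)
        if value is None:
            raise KeyError(f"secret not set: {env_name}")
        return value

os.environ["JIRA_API_TOKEN"] = "example-token"  # injected by the runtime
secrets = EnvSecretManager()
token = secrets.get("jira/api-token")
```

Failing loudly on a missing secret (rather than returning None) surfaces misconfiguration at startup instead of mid-run.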
Interview Tip
Know the rate limits! Interviewers often ask: "What happens at scale?" Be ready to discuss: pagination, rate limiting, exponential backoff, and caching to reduce API calls.
Part 7: Error Handling & Retries
Error Handling & Retries
Retry Strategy per Agent
# Pseudocode - Exponential backoff decorator
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=30),
    reraise=True
)
def jira_collector_with_retry(state: LogAnalyzerState) -> dict:
    return jira_collector(state)

# Retry behavior:
#   Attempt 1: immediate
#   Attempt 2: after ~2 second wait
#   Attempt 3: after ~4 second wait
#   Then: raise the original exception (reraise=True)
Graceful Degradation
# Agent that handles its own failures
def jira_collector(state: LogAnalyzerState) -> dict:
    try:
        tickets = fetch_jira_tickets(state["time_range_hours"])
        return {"jira_tickets": tickets}
    except JiraAPIError as e:
        # Log error but don't crash
        logger.warning(f"Jira fetch failed: {e}")
        return {
            "jira_tickets": [],
            "errors": [f"⚠️ Jira unavailable: {str(e)}"]
        }
    except Exception as e:
        logger.error(f"Unexpected error in Jira collector: {e}")
        return {
            "jira_tickets": [],
            "errors": [f"❌ Jira collector crashed: {str(e)}"]
        }
Result: System continues with GitHub + Logs even if Jira fails.
Error Aggregation in Summary
def summarizer(state: LogAnalyzerState) -> dict:
    # Include errors in summary
    errors = state.get("errors", [])
    if errors:
        error_section = "\n⚠️ **Data Collection Issues:**\n" + "\n".join(errors)
    else:
        error_section = ""
    summary = generate_summary(state) + error_section
    return {"summary": summary}
Circuit Breaker Pattern
# Prevent hammering a failing service
class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=300):
        self.failures = 0
        self.threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.last_failure = None
        self.state = "CLOSED"  # CLOSED | OPEN | HALF_OPEN

    def call(self, func, *args):
        if self.state == "OPEN":
            if time.time() - self.last_failure > self.reset_timeout:
                self.state = "HALF_OPEN"
            else:
                raise CircuitOpenError("Circuit breaker is open")
        try:
            result = func(*args)
            self.failures = 0
            self.state = "CLOSED"
            return result
        except Exception:
            self.failures += 1
            self.last_failure = time.time()
            if self.failures >= self.threshold:
                self.state = "OPEN"
            raise

# Usage
jira_breaker = CircuitBreaker(failure_threshold=3, reset_timeout=300)
tickets = jira_breaker.call(fetch_jira_tickets, time_range)
Timeout Handling
import asyncio

async def fetch_with_timeout(coro, timeout_seconds=30):
    """Wrap any coroutine with a timeout"""
    try:
        return await asyncio.wait_for(coro, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        raise TimeoutError(f"Operation timed out after {timeout_seconds}s")

# Per-agent timeouts (seconds)
AGENT_TIMEOUTS = {
    "jira_collector": 30,
    "github_collector": 30,
    "log_fetcher": 60,   # CloudWatch can be slow
    "correlator": 10,
    "analyzer": 45,      # LLM can be slow
    "summarizer": 30,
    "slack_poster": 10,
}
Dead Letter Queue
# For completely failed runs
def handle_complete_failure(state: LogAnalyzerState, error: Exception):
    """When the entire pipeline fails, save for manual review."""
    dlq_record = {
        "timestamp": datetime.utcnow().isoformat(),
        "state": state,
        "error": str(error),
        "traceback": traceback.format_exc()
    }

    # Option 1: Write to DynamoDB
    dynamodb.put_item(
        TableName="log-analyzer-dlq",
        Item=serialize(dlq_record)
    )

    # Option 2: Send to SQS
    sqs.send_message(
        QueueUrl="log-analyzer-dlq",
        MessageBody=json.dumps(dlq_record)
    )

    # Alert on-call
    slack_client.chat_postMessage(
        channel="#ops-alerts",
        text=f"🚨 Log analyzer failed: {str(error)}"
    )
Error Handling Summary
| Error Type | Strategy | Example |
|---|---|---|
| Transient (rate limit) | Retry with backoff | HTTP 429 |
| Partial failure | Graceful degradation | Jira down |
| Repeated failures | Circuit breaker | API consistently failing |
| Timeout | Per-agent limits | CloudWatch slow |
| Complete failure | DLQ + Alert | Uncaught exception |
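"Retry with backoff" for HTTP 429s is usually implemented as capped exponential delay plus jitter, so that many clients don't retry in lockstep after the same rate-limit window. A sketch of the AWS-style "full jitter" computation (base and cap values are assumptions):

```python
import random
from typing import Optional

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0,
                  rng: Optional[random.Random] = None) -> float:
    """Full jitter: a uniform delay in [0, min(cap, base * 2^attempt)]."""
    rng = rng or random.Random()
    return rng.uniform(0, min(cap, base * (2 ** attempt)))

# The envelope grows 1s, 2s, 4s, ... and is capped at 30s;
# jitter spreads retries out within it.
delays = [backoff_delay(a, rng=random.Random(0)) for a in range(5)]
```

Libraries like tenacity (used earlier in this part) provide the same behavior via `wait_random_exponential`.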
Interview Tip
Error handling is a senior engineer differentiator. Don't just say "retry." Explain: "Exponential backoff for transient failures, circuit breaker to prevent cascading failures, graceful degradation for partial results, and DLQ for forensics."
Part 8: Scaling & Production
Scaling & Production
Rate Limiting Strategy
# Token bucket rate limiter
import threading
import time

class RateLimiter:
    def __init__(self, calls_per_minute: int):
        self.rate = calls_per_minute / 60  # refill rate, calls per second
        self.tokens = calls_per_minute
        self.last_update = time.time()
        self.lock = threading.Lock()

    def acquire(self) -> bool:
        with self.lock:
            now = time.time()
            elapsed = now - self.last_update
            self.tokens = min(
                self.tokens + elapsed * self.rate,
                self.rate * 60  # max bucket size = calls_per_minute
            )
            self.last_update = now
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False

    def wait(self):
        while not self.acquire():
            time.sleep(0.1)

# Per-service limiters
rate_limiters = {
    "jira": RateLimiter(calls_per_minute=50),
    "github": RateLimiter(calls_per_minute=80),
    "slack": RateLimiter(calls_per_minute=50),
}
Caching Layer (Redis)
# Cache expensive API calls
import hashlib
import json
import redis

cache = redis.Redis(host='localhost', port=6379)

def cached_jira_query(jql: str, ttl_seconds: int = 300):
    """Cache Jira results for 5 minutes"""
    cache_key = f"jira:{hashlib.md5(jql.encode()).hexdigest()}"

    # Check cache
    cached = cache.get(cache_key)
    if cached:
        return json.loads(cached)

    # Fetch from API
    result = jira_client.search(jql)

    # Store in cache
    cache.setex(cache_key, ttl_seconds, json.dumps(result))
    return result

# What to cache:
#   ✅ Jira tickets (TTL: 5 min) - queries are expensive
#   ✅ GitHub user info (TTL: 1 hour) - rarely changes
#   ❌ Logs - too dynamic, always fresh
#   ❌ Slack posts - no point caching writes
Scheduled vs Event-Driven Execution
SCHEDULED (Cron)
────────────────
Trigger: Every day at 8:30 AM before standup
Pros: Predictable, easy to monitor
Cons: May miss urgent issues
Implementation:
  AWS EventBridge: cron(30 8 * * ? *)
  or
  Kubernetes CronJob: "30 8 * * *"

EVENT-DRIVEN
────────────────
Triggers:
- Jira webhook: New ticket with label "production-incident"
- GitHub webhook: CI failure on main branch
- PagerDuty: New incident opened
Pros: Real-time alerting
Cons: More complex, potential spam
Implementation:
  Webhook → API Gateway → Lambda → LangGraph
Observability with LangSmith
# Enable LangSmith tracing
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "log-analyzer"
os.environ["LANGCHAIN_API_KEY"] = "..."
# Automatic tracing of all LLM calls
# See in LangSmith dashboard:
# - Token usage per agent
# - Latency breakdown
# - Input/output for debugging
# - Error traces
Key Metrics to Track:
| Metric | Target | Alert Threshold |
|---|---|---|
| End-to-end latency | < 5 min | > 10 min |
| Collection success rate | > 95% | < 80% |
| Correlation count | 5-50 | 0 or > 100 |
| LLM token usage | < 10k/run | > 20k/run |
| Slack post success | 100% | < 100% |
Cost Analysis
Daily Run Cost Breakdown

| Component | Est. Cost |
|---|---|
| Jira API | Free (included in license) |
| GitHub API | Free (within rate limits) |
| CloudWatch Insights | $0.10 (scan ~20GB logs) |
| OpenAI GPT-4 | $0.50 (analysis + summary) |
| Slack API | Free |
| Lambda/Compute | $0.01 |
| **Total per run** | ~$0.61 |
| **Monthly (30 runs)** | ~$18.30 |
Cost Optimization:
1. Use GPT-3.5-turbo for summarization ($0.05 vs $0.50)
2. Cache repeated queries (reduce CloudWatch scans)
3. Batch Slack messages (fewer API calls)
Deployment Architecture
┌─────────────────────────────────────────────────────┐
│                  AWS Architecture                   │
├─────────────────────────────────────────────────────┤
│                                                     │
│  EventBridge (Cron)                                 │
│        │                                            │
│        ▼                                            │
│  Lambda Function                                    │
│   - Runs LangGraph                                  │
│   - 15 min timeout                                  │
│   - 1024MB memory                                   │
│        │                                            │
│        ├──▶ Secrets Manager (API keys)              │
│        ├──▶ ElastiCache Redis (caching)             │
│        ├──▶ DynamoDB (checkpoints, history)         │
│        └──▶ CloudWatch (logs, metrics)              │
│                                                     │
└─────────────────────────────────────────────────────┘
Alternative: Kubernetes CronJob
- More control over execution environment
- Can use larger memory/longer timeouts
- Better for complex dependencies
Interview Tip
Always discuss cost. Interviewers want to see you think about business impact. Calculate per-run and monthly costs. Mention optimization strategies like model selection and caching.
Part 9: Trade-offs & Alternatives
Trade-offs & Alternatives
LangGraph vs Alternatives
| Approach | Pros | Cons | When to Use |
|---|---|---|---|
| LangGraph | State management, parallel exec, checkpointing | Learning curve, dependency | Complex multi-agent flows |
| Simple Python | No dependencies, easy to debug | Manual state, no parallelism | Simple linear flows |
| AutoGen | Multi-agent conversation | Less control over flow | Agent-to-agent chat |
| CrewAI | Role-based agents | Opinionated structure | Team simulation |
| Temporal/Airflow | Battle-tested, durable | Overkill for this use case | Enterprise workflows |
Our Choice: LangGraph because:
- Need parallel collection (3 agents simultaneously)
- Want state persistence for debugging
- Conditional routing (skip if no errors)
- LangSmith integration for observability
Real-Time vs Batch Processing
BATCH (Our Choice)
──────────────────
- Run daily at 8:30 AM
- Comprehensive analysis
- Lower cost (one LLM call)
- Better for patterns

REAL-TIME
──────────────────
- Webhook per incident
- Immediate notification
- Higher cost (many LLM calls)
- Better for urgency

HYBRID (Best of Both)
──────────────────
- Daily batch for summary
- Real-time for critical (P0) only
- Two Slack channels:
    #incidents-daily (batch)
    #incidents-urgent (real-time)
Push vs Pull Architecture
PULL (Our Choice)
──────────────────
System queries APIs on schedule
+ Simpler to implement
+ No webhook management
+ Works with any API
- Delayed detection

PUSH
──────────────────
APIs send webhooks to our system
+ Real-time detection
+ No polling overhead
- Webhook infrastructure needed
- Not all APIs support webhooks
Our Decision: PULL for simplicity, can add PUSH for critical path later.
LLM Selection
| Model | Latency | Cost | Quality | Best For |
|---|---|---|---|---|
| GPT-4 | 5-10s | $$$$ | Excellent | Root cause analysis |
| GPT-3.5-turbo | 1-2s | $ | Good | Summarization |
| Claude Sonnet | 3-5s | $$ | Great | Complex reasoning |
| Claude Haiku | <1s | $ | Good | Simple formatting |
Our Choice:
- Root cause analysis: GPT-4 (needs reasoning)
- Summarization: GPT-3.5-turbo (faster, cheaper, sufficient)
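That split can be encoded as a simple task-to-model router so each agent picks the cheapest adequate model from one place. A sketch; the routing table mirrors the choices above, and the model identifiers are illustrative:

```python
# Task -> model routing; identifiers are illustrative placeholders.
MODEL_FOR_TASK = {
    "root_cause_analysis": "gpt-4",    # needs multi-step reasoning
    "summarization": "gpt-3.5-turbo",  # fast and cheap is sufficient
    "formatting": "claude-haiku",      # trivial transforms
}

def pick_model(task: str, default: str = "gpt-3.5-turbo") -> str:
    """Route a task to its configured model, falling back to the default."""
    return MODEL_FOR_TASK.get(task, default)
```

Centralizing the mapping makes cost experiments one-line changes instead of per-agent edits.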
Alternative Architectures Considered
Option A: Monolithic Script
# Simple sequential script
def main():
    tickets = fetch_jira()
    prs = fetch_github()
    logs = fetch_cloudwatch()
    correlations = correlate(tickets, prs, logs)
    summary = analyze(correlations)
    post_slack(summary)
Rejected because: No parallelism, no error recovery, no observability.
Option B: Airflow DAG
# Airflow task graph
with DAG("log-analyzer") as dag:
    jira = PythonOperator(task_id="jira", ...)
    github = PythonOperator(task_id="github", ...)
    # ...
Rejected because: Heavy infrastructure for simple use case.
Option C: AWS Step Functions
{
  "StartAt": "ParallelCollection",
  "States": {
    "ParallelCollection": {
      "Type": "Parallel",
      "Branches": [...]
    }
  }
}
Rejected because: JSON DSL painful for LLM integration.
Future Improvements
Phase 2 Enhancements
────────────────────
1. Slack Interactivity
   - Button to create Jira ticket from incident
   - Reaction to mark as "acknowledged"
   - Command: /analyze now
2. Historical Trends
   - Weekly trend graphs
   - "This service failed 5x this week"
   - Recurring issue detection
3. Smart Alerting
   - Only notify if severity > threshold
   - Route to specific teams based on service
   - Reduce noise during deployments
4. Multi-Tenant
   - Support multiple projects/teams
   - Separate Slack channels per team
   - RBAC for configuration
What NOT to Build
| Feature | Why Not |
|---|---|
| Auto-fix issues | Too risky, humans should decide |
| Replace Jira | Out of scope, existing tool works |
| Full log aggregator | Use CloudWatch/Datadog instead |
| Chat interface | Slack commands are simpler |
Interview Tip
Always mention what you considered and rejected. This shows you evaluated options, not just picked the first solution. "We considered Airflow but chose LangGraph because..."