Event-Driven Architecture with AWS: DynamoDB Streams + Lambda 🚀
January 19, 2026
Event-Driven Architecture (EDA) has become one of the most popular patterns for building scalable, decoupled microservices. When combined with AWS services like DynamoDB Streams and Lambda functions, it becomes a powerful solution for real-time data processing and reactive systems.
In this article, I'll show you how to implement a complete event-driven architecture using AWS native services, with practical examples and best practices I've learned from production environments.
What is Event-Driven Architecture? 📖
Event-Driven Architecture is a design pattern where services communicate through events rather than direct calls. An event represents a state change or significant occurrence in the system.
Key Concepts:
- Event Producer: Service that generates events (in our case, DynamoDB)
- Event Stream: Channel that transports events (DynamoDB Streams)
- Event Consumer: Service that reacts to events (Lambda functions)
- Event: Immutable record of something that happened (a minimal sketch follows below)
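To make these concepts concrete, here's a minimal sketch (in Go, the language used throughout this article) of what an event might look like as a value. The field names are illustrative assumptions, not a fixed schema; the point is that an event carries an identity, a type, a timestamp, and just enough payload to act on:

```go
package event

import "time"

// OrderCreatedEvent is an illustrative event shape, not a fixed schema.
// An event records what happened; it never instructs consumers what to do.
type OrderCreatedEvent struct {
	EventID    string    `json:"eventId"`    // unique per event, useful for deduplication
	EventType  string    `json:"eventType"`  // e.g. "ORDER_CREATED"
	OccurredAt time.Time `json:"occurredAt"` // when the state change happened
	OrderID    string    `json:"orderId"`    // the entity the event describes
}
```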
Why DynamoDB Streams + Lambda? 🤔
This combination offers several advantages:
- Zero Infrastructure Management: Both are serverless
- Automatic Scaling: Consumers scale with the stream's throughput and shard count
- Pay-per-Use: Only pay for what you consume
- Native Integration: AWS handles the complexity
- Guaranteed Delivery: Events are processed at least once
DynamoDB Streams Explained 📊
DynamoDB Streams captures a time-ordered sequence of item-level modifications in a DynamoDB table. When you enable streams, DynamoDB captures:
- INSERT: New item added
- MODIFY: Existing item updated
- REMOVE: Item deleted
Each stream record contains:
- The type of change (INSERT, MODIFY, REMOVE)
- The primary key of the modified item
- The old and new images of the item (configurable; see the consumer sketch below)
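In Go, a consumer receives these records through the `events.DynamoDBEvent` type from `aws-lambda-go`. A minimal sketch that only logs each record's change type and primary key:

```go
package main

import (
	"context"
	"log"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

// handler logs the change type and the modified item's key for every record.
func handler(ctx context.Context, event events.DynamoDBEvent) error {
	for _, record := range event.Records {
		log.Printf("event: %s, keys: %v", record.EventName, record.Change.Keys)
	}
	return nil
}

func main() {
	lambda.Start(handler)
}
```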
Architecture Overview 🏗️
Let me show you a typical architecture pattern:
```
services/
  order-service/
    handler.go
    serverless.yml
  notification-service/
    handler.go
    serverless.yml
  analytics-service/
    handler.go
    serverless.yml
  inventory-service/
    handler.go
    serverless.yml
infrastructure/
  dynamodb.yml
  iam-roles.yml
```
Flow:
1. Order Service writes to DynamoDB (Orders table)
2. DynamoDB Stream captures the change
3. Multiple Lambda functions react to the event:
   - Notification Service sends email
   - Analytics Service updates metrics
   - Inventory Service adjusts stock
Step 1: Setting Up DynamoDB with Streams 🗄️
First, let's create a DynamoDB table with streams enabled using Serverless Framework:
`infrastructure/dynamodb.yml`:

```yaml
service: order-dynamodb

provider:
  name: aws
  region: us-east-1
  runtime: go1.x

resources:
  Resources:
    OrdersTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: Orders
        AttributeDefinitions:
          - AttributeName: orderId
            AttributeType: S
          - AttributeName: customerId
            AttributeType: S
          - AttributeName: createdAt
            AttributeType: N
        KeySchema:
          - AttributeName: orderId
            KeyType: HASH
        GlobalSecondaryIndexes:
          - IndexName: CustomerIndex
            KeySchema:
              - AttributeName: customerId
                KeyType: HASH
              - AttributeName: createdAt
                KeyType: RANGE
            Projection:
              ProjectionType: ALL
            ProvisionedThroughput:
              ReadCapacityUnits: 5
              WriteCapacityUnits: 5
        StreamSpecification:
          StreamViewType: NEW_AND_OLD_IMAGES
        ProvisionedThroughput:
          ReadCapacityUnits: 5
          WriteCapacityUnits: 5
  Outputs:
    OrdersTableStreamArn:
      Value: !GetAtt OrdersTable.StreamArn
      Export:
        Name: OrdersTableStreamArn
```
Important: StreamViewType options:
- KEYS_ONLY: Only the key attributes of the modified item
- NEW_IMAGE: The entire item after modification
- OLD_IMAGE: The entire item before modification
- NEW_AND_OLD_IMAGES: Both the new and old images (most useful)
Step 2: Creating the Order Service (Producer) 📝
This service writes orders to DynamoDB, triggering events:
`services/order-service/handler.go`:

```go
package main

import (
	"context"
	"encoding/json"
	"time"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/google/uuid"
)

type Order struct {
	OrderID    string `json:"orderId" dynamodbav:"orderId"`
	CustomerID string `json:"customerId" dynamodbav:"customerId"`
	// customerEmail is denormalized into the order so stream consumers
	// (like the notification service below) don't need an extra lookup.
	CustomerEmail string  `json:"customerEmail" dynamodbav:"customerEmail"`
	Items         []Item  `json:"items" dynamodbav:"items"`
	TotalPrice    float64 `json:"totalPrice" dynamodbav:"totalPrice"`
	Status        string  `json:"status" dynamodbav:"status"`
	CreatedAt     int64   `json:"createdAt" dynamodbav:"createdAt"`
}

type Item struct {
	ProductID string  `json:"productId" dynamodbav:"productId"`
	Quantity  int     `json:"quantity" dynamodbav:"quantity"`
	Price     float64 `json:"price" dynamodbav:"price"`
}

type CreateOrderRequest struct {
	CustomerID    string `json:"customerId"`
	CustomerEmail string `json:"customerEmail"`
	Items         []Item `json:"items"`
}

var dynamoClient *dynamodb.Client

func init() {
	cfg, err := config.LoadDefaultConfig(context.TODO())
	if err != nil {
		panic(err)
	}
	dynamoClient = dynamodb.NewFromConfig(cfg)
}

func handler(ctx context.Context, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
	var req CreateOrderRequest
	if err := json.Unmarshal([]byte(request.Body), &req); err != nil {
		return events.APIGatewayProxyResponse{
			StatusCode: 400,
			Body:       `{"error": "Invalid request body"}`,
		}, nil
	}

	// Calculate total price
	var totalPrice float64
	for _, item := range req.Items {
		totalPrice += item.Price * float64(item.Quantity)
	}

	// Create order
	order := Order{
		OrderID:       uuid.New().String(),
		CustomerID:    req.CustomerID,
		CustomerEmail: req.CustomerEmail,
		Items:         req.Items,
		TotalPrice:    totalPrice,
		Status:        "PENDING",
		CreatedAt:     time.Now().Unix(),
	}

	// Convert to DynamoDB format
	av, err := attributevalue.MarshalMap(order)
	if err != nil {
		return events.APIGatewayProxyResponse{
			StatusCode: 500,
			Body:       `{"error": "Failed to marshal order"}`,
		}, nil
	}

	// Put item in DynamoDB (this triggers the stream!)
	_, err = dynamoClient.PutItem(ctx, &dynamodb.PutItemInput{
		TableName: aws.String("Orders"),
		Item:      av,
	})
	if err != nil {
		return events.APIGatewayProxyResponse{
			StatusCode: 500,
			Body:       `{"error": "Failed to create order"}`,
		}, nil
	}

	responseBody, _ := json.Marshal(order)
	return events.APIGatewayProxyResponse{
		StatusCode: 201,
		Body:       string(responseBody),
	}, nil
}

func main() {
	lambda.Start(handler)
}
```
`services/order-service/serverless.yml`:

```yaml
service: order-service

provider:
  name: aws
  runtime: go1.x
  region: us-east-1
  iam:
    role:
      statements:
        - Effect: Allow
          Action:
            - dynamodb:PutItem
            - dynamodb:UpdateItem
          Resource:
            - arn:aws:dynamodb:us-east-1:*:table/Orders

functions:
  create:
    handler: bin/handler
    events:
      - http:
          path: orders
          method: post
          cors: true
```
Step 3: Creating Consumer Services (Lambdas) 🎯
Now let's create Lambda functions that react to DynamoDB Stream events:
Notification Service
`services/notification-service/handler.go`:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/ses"
	"github.com/aws/aws-sdk-go-v2/service/ses/types"
)

var sesClient *ses.Client

func init() {
	cfg, err := config.LoadDefaultConfig(context.TODO())
	if err != nil {
		panic(err)
	}
	sesClient = ses.NewFromConfig(cfg)
}

func handler(ctx context.Context, event events.DynamoDBEvent) error {
	for _, record := range event.Records {
		// Only process INSERT events (new orders)
		if record.EventName == "INSERT" {
			if err := processNewOrder(ctx, record); err != nil {
				log.Printf("Error processing record: %v", err)
				// Continue processing other records
				continue
			}
		}

		// Process MODIFY events (order status changes)
		if record.EventName == "MODIFY" {
			if err := processOrderUpdate(ctx, record); err != nil {
				log.Printf("Error processing update: %v", err)
				continue
			}
		}
	}
	return nil
}

func processNewOrder(ctx context.Context, record events.DynamoDBEventRecord) error {
	newImage := record.Change.NewImage
	orderId := newImage["orderId"].String()
	// customerEmail is denormalized into the order item by the producer above
	customerEmail := newImage["customerEmail"].String()
	totalPrice := newImage["totalPrice"].Number()

	log.Printf("Processing new order: %s for customer: %s", orderId, customerEmail)

	// Send email notification
	subject := "Order Confirmation"
	body := fmt.Sprintf(
		"Thank you for your order!\n\nOrder ID: %s\nTotal: $%s\n\nWe'll notify you when it ships.",
		orderId, totalPrice,
	)

	return sendEmail(ctx, customerEmail, subject, body)
}

func processOrderUpdate(ctx context.Context, record events.DynamoDBEventRecord) error {
	oldStatus := record.Change.OldImage["status"].String()
	newStatus := record.Change.NewImage["status"].String()

	// Only send notification if status changed
	if oldStatus == newStatus {
		return nil
	}

	orderId := record.Change.NewImage["orderId"].String()
	customerEmail := record.Change.NewImage["customerEmail"].String()

	log.Printf("Order %s status changed: %s -> %s", orderId, oldStatus, newStatus)

	subject := "Order Status Update"
	body := fmt.Sprintf(
		"Your order status has been updated!\n\nOrder ID: %s\nNew Status: %s",
		orderId, newStatus,
	)

	return sendEmail(ctx, customerEmail, subject, body)
}

func sendEmail(ctx context.Context, to, subject, body string) error {
	input := &ses.SendEmailInput{
		Source: aws.String("noreply@yourcompany.com"),
		Destination: &types.Destination{
			ToAddresses: []string{to},
		},
		Message: &types.Message{
			Subject: &types.Content{
				Data: aws.String(subject),
			},
			Body: &types.Body{
				Text: &types.Content{
					Data: aws.String(body),
				},
			},
		},
	}

	_, err := sesClient.SendEmail(ctx, input)
	return err
}

func main() {
	lambda.Start(handler)
}
```
`services/notification-service/serverless.yml`:

```yaml
service: notification-service

provider:
  name: aws
  runtime: go1.x
  region: us-east-1
  iam:
    role:
      statements:
        - Effect: Allow
          Action:
            - ses:SendEmail
          Resource: "*"

functions:
  processOrder:
    handler: bin/handler
    events:
      - stream:
          type: dynamodb
          arn:
            Fn::ImportValue: OrdersTableStreamArn
          batchSize: 10
          startingPosition: LATEST
          maximumRetryAttempts: 3
          enabled: true
          filterPatterns:
            - eventName: [INSERT, MODIFY]
```
Analytics Service
`services/analytics-service/handler.go`:

```go
package main

import (
	"context"
	"log"
	"strconv"
	"time"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
	"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
)

var cwClient *cloudwatch.Client

func init() {
	cfg, err := config.LoadDefaultConfig(context.TODO())
	if err != nil {
		panic(err)
	}
	cwClient = cloudwatch.NewFromConfig(cfg)
}

func handler(ctx context.Context, event events.DynamoDBEvent) error {
	var totalRevenue float64
	var orderCount int

	for _, record := range event.Records {
		if record.EventName == "INSERT" {
			orderCount++

			// Extract total price from the new image
			if priceAttr, ok := record.Change.NewImage["totalPrice"]; ok {
				price, err := strconv.ParseFloat(priceAttr.Number(), 64)
				if err != nil {
					log.Printf("Invalid totalPrice, skipping revenue for record: %v", err)
				} else {
					totalRevenue += price
				}
			}
		}
	}

	if orderCount > 0 {
		// Send metrics to CloudWatch
		if err := publishMetrics(ctx, orderCount, totalRevenue); err != nil {
			log.Printf("Error publishing metrics: %v", err)
			return err
		}
		log.Printf("Processed %d orders with total revenue: $%.2f", orderCount, totalRevenue)
	}

	return nil
}

func publishMetrics(ctx context.Context, orderCount int, totalRevenue float64) error {
	timestamp := time.Now()

	_, err := cwClient.PutMetricData(ctx, &cloudwatch.PutMetricDataInput{
		Namespace: aws.String("OrderService"),
		MetricData: []types.MetricDatum{
			{
				MetricName: aws.String("OrderCount"),
				Value:      aws.Float64(float64(orderCount)),
				Timestamp:  &timestamp,
				Unit:       types.StandardUnitCount,
			},
			{
				MetricName: aws.String("Revenue"),
				Value:      aws.Float64(totalRevenue),
				Timestamp:  &timestamp,
				Unit:       types.StandardUnitNone,
			},
		},
	})

	return err
}

func main() {
	lambda.Start(handler)
}
```
Inventory Service
`services/inventory-service/handler.go`:

```go
package main

import (
	"context"
	"fmt"
	"log"
	"strconv"
	"time"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

var dynamoClient *dynamodb.Client

type OrderItem struct {
	ProductID string `json:"productId"`
	Quantity  int    `json:"quantity"`
}

func init() {
	cfg, err := config.LoadDefaultConfig(context.TODO())
	if err != nil {
		panic(err)
	}
	dynamoClient = dynamodb.NewFromConfig(cfg)
}

func handler(ctx context.Context, event events.DynamoDBEvent) error {
	for _, record := range event.Records {
		if record.EventName == "INSERT" {
			if err := decrementInventory(ctx, record); err != nil {
				log.Printf("Error decrementing inventory: %v", err)
				// In production, you might want to send to a DLQ here
				continue
			}
		}

		// Handle order cancellations
		if record.EventName == "MODIFY" {
			oldStatus := record.Change.OldImage["status"].String()
			newStatus := record.Change.NewImage["status"].String()

			if newStatus == "CANCELLED" && oldStatus != "CANCELLED" {
				if err := incrementInventory(ctx, record); err != nil {
					log.Printf("Error incrementing inventory: %v", err)
					continue
				}
			}
		}
	}
	return nil
}

func decrementInventory(ctx context.Context, record events.DynamoDBEventRecord) error {
	// Extract items from the order's new image
	itemsAttr := record.Change.NewImage["items"]
	var items []OrderItem

	// Parse the items list
	if itemsAttr.DataType() == events.DataTypeList {
		for _, item := range itemsAttr.List() {
			itemMap := item.Map()

			qty, err := strconv.Atoi(itemMap["quantity"].Number())
			if err != nil {
				return fmt.Errorf("invalid quantity: %w", err)
			}

			items = append(items, OrderItem{
				ProductID: itemMap["productId"].String(),
				Quantity:  qty,
			})
		}
	}

	// Update inventory for each item
	for _, item := range items {
		log.Printf("Decrementing inventory for product %s by %d", item.ProductID, item.Quantity)

		_, err := dynamoClient.UpdateItem(ctx, &dynamodb.UpdateItemInput{
			TableName: aws.String("Inventory"),
			Key: map[string]types.AttributeValue{
				"productId": &types.AttributeValueMemberS{Value: item.ProductID},
			},
			UpdateExpression: aws.String("SET stock = stock - :qty, lastUpdated = :timestamp"),
			ExpressionAttributeValues: map[string]types.AttributeValue{
				":qty":       &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", item.Quantity)},
				":timestamp": &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", time.Now().Unix())},
			},
			ConditionExpression: aws.String("stock >= :qty"), // Prevent negative stock
		})
		if err != nil {
			return fmt.Errorf("failed to update inventory for product %s: %w", item.ProductID, err)
		}
	}
	return nil
}

func incrementInventory(ctx context.Context, record events.DynamoDBEventRecord) error {
	// Similar logic, but adding stock back; one possible sketch follows below.
	return nil
}

func main() {
	lambda.Start(handler)
}
```
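`incrementInventory` is stubbed out above; for completeness, here's one possible sketch under the same assumptions as `decrementInventory` (an `Inventory` table keyed by `productId` with a numeric `stock` attribute). It drops into the same file and uses the same imports; `ADD` makes the restock atomic:

```go
// One possible incrementInventory: read the items from the new image
// (the order item still exists on cancellation) and add the quantities back.
func incrementInventory(ctx context.Context, record events.DynamoDBEventRecord) error {
	itemsAttr := record.Change.NewImage["items"]
	if itemsAttr.DataType() != events.DataTypeList {
		return nil
	}

	for _, item := range itemsAttr.List() {
		itemMap := item.Map()

		qty, err := strconv.Atoi(itemMap["quantity"].Number())
		if err != nil {
			return fmt.Errorf("invalid quantity: %w", err)
		}
		productID := itemMap["productId"].String()

		// ADD is atomic, so concurrent restocks don't clobber each other.
		_, err = dynamoClient.UpdateItem(ctx, &dynamodb.UpdateItemInput{
			TableName: aws.String("Inventory"),
			Key: map[string]types.AttributeValue{
				"productId": &types.AttributeValueMemberS{Value: productID},
			},
			UpdateExpression: aws.String("ADD stock :qty SET lastUpdated = :ts"),
			ExpressionAttributeValues: map[string]types.AttributeValue{
				":qty": &types.AttributeValueMemberN{Value: strconv.Itoa(qty)},
				":ts":  &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", time.Now().Unix())},
			},
		})
		if err != nil {
			return fmt.Errorf("failed to restock product %s: %w", productID, err)
		}
	}
	return nil
}
```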
Event Filtering and Batch Processing ⚡
Lambda allows you to filter events before processing:
`serverless.yml`:

```yaml
functions:
  processHighValueOrders:
    handler: bin/handler
    events:
      - stream:
          type: dynamodb
          arn: !GetAtt OrdersTable.StreamArn
          batchSize: 100
          maximumBatchingWindowInSeconds: 10
          startingPosition: LATEST
          filterPatterns:
            # Only process orders over $100
            - eventName: [INSERT]
              dynamodb:
                NewImage:
                  totalPrice:
                    N: [{ numeric: [">=", 100] }]
```
Batch Processing Benefits:
- Process multiple events at once
- Reduce Lambda invocations (lower cost)
- Batch window allows accumulation of events
Error Handling and Retry Strategy 🔄
`services/common/error-handler.go`:

```go
package common

import (
	"context"
	"encoding/json"
	"log"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
)

type ErrorHandler struct {
	sqsClient *sqs.Client
	dlqURL    string
}

func NewErrorHandler(sqsClient *sqs.Client, dlqURL string) *ErrorHandler {
	return &ErrorHandler{
		sqsClient: sqsClient,
		dlqURL:    dlqURL,
	}
}

// ProcessWithRetry processes a record with error handling
func (h *ErrorHandler) ProcessWithRetry(
	ctx context.Context,
	record events.DynamoDBEventRecord,
	processor func(context.Context, events.DynamoDBEventRecord) error,
) error {
	err := processor(ctx, record)
	if err != nil {
		log.Printf("Error processing record: %v", err)

		// Send to Dead Letter Queue for manual inspection
		if dlqErr := h.sendToDLQ(ctx, record, err); dlqErr != nil {
			log.Printf("Failed to send to DLQ: %v", dlqErr)
		}

		// Return error to trigger Lambda retry mechanism
		return err
	}
	return nil
}

func (h *ErrorHandler) sendToDLQ(ctx context.Context, record events.DynamoDBEventRecord, processingError error) error {
	message := map[string]interface{}{
		"record": record,
		"error":  processingError.Error(),
	}

	messageBody, err := json.Marshal(message)
	if err != nil {
		return err
	}

	_, err = h.sqsClient.SendMessage(ctx, &sqs.SendMessageInput{
		QueueUrl:    aws.String(h.dlqURL),
		MessageBody: aws.String(string(messageBody)),
	})
	return err
}
```
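A complementary option for stream sources is Lambda's partial batch response. If the event source mapping enables `ReportBatchItemFailures` (in serverless.yml: `functionResponseType: ReportBatchItemFailures` on the stream event), the handler can report exactly which records failed, so Lambda retries only those instead of the whole batch blocking the shard. A minimal sketch, with `process` standing in for your business logic:

```go
package main

import (
	"context"
	"log"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

// handler returns the sequence numbers of failed records so that only those
// records are retried (requires ReportBatchItemFailures on the mapping).
func handler(ctx context.Context, event events.DynamoDBEvent) (events.DynamoDBEventResponse, error) {
	var response events.DynamoDBEventResponse

	for _, record := range event.Records {
		if err := process(ctx, record); err != nil {
			log.Printf("record %s failed: %v", record.Change.SequenceNumber, err)
			response.BatchItemFailures = append(response.BatchItemFailures,
				events.DynamoDBBatchItemFailure{ItemIdentifier: record.Change.SequenceNumber})
		}
	}
	return response, nil
}

// process is a placeholder for the per-record business logic.
func process(ctx context.Context, record events.DynamoDBEventRecord) error {
	return nil
}

func main() {
	lambda.Start(handler)
}
```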
Monitoring and Observability 📊
Key metrics to monitor:
`infrastructure/alarms.yml`:

```yaml
resources:
  Resources:
    HighErrorRateAlarm:
      Type: AWS::CloudWatch::Alarm
      Properties:
        AlarmName: NotificationService-HighErrorRate
        AlarmDescription: Alert when error rate exceeds threshold
        MetricName: Errors
        Namespace: AWS/Lambda
        Statistic: Sum
        Period: 300
        EvaluationPeriods: 2
        Threshold: 10
        ComparisonOperator: GreaterThanThreshold
        Dimensions:
          - Name: FunctionName
            Value: !Ref NotificationServiceFunction

    StreamIteratorAgeAlarm:
      Type: AWS::CloudWatch::Alarm
      Properties:
        AlarmName: DynamoDB-StreamLag
        AlarmDescription: Alert when stream processing is lagging
        MetricName: IteratorAge
        Namespace: AWS/Lambda
        Statistic: Maximum
        Period: 60
        EvaluationPeriods: 3
        Threshold: 60000 # 1 minute in milliseconds
        ComparisonOperator: GreaterThanThreshold
        Dimensions:
          - Name: FunctionName
            Value: !Ref NotificationServiceFunction
```
Advantages of This Pattern 🚀
1. Loose Coupling
Services don't know about each other. The order service doesn't call notification or inventory services directly.
2. Scalability
Each Lambda scales independently. High notification load doesn't affect inventory processing.
3. Reliability
Built-in retry mechanism. Failed batches are retried automatically until they succeed, the configured maximumRetryAttempts is reached, or the records expire from the stream.
4. Auditability
DynamoDB Streams provides a complete audit log of all changes for 24 hours.
5. Cost-Effective
Pay only for what you use. No idle servers waiting for events.
6. Easy to Extend
Adding a new consumer is simple: subscribe another Lambda function to the stream. One caveat: AWS recommends at most two concurrent readers per stream shard, so for a large number of consumers, fan the events out through SNS or EventBridge instead.
Disadvantages and Considerations ⚠️
1. Eventual Consistency
Events are processed asynchronously. There's a delay between the write and event processing.
2. Ordering Challenges
While events for the same partition key are ordered, processing them in order requires careful design.
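One common guard is to version items and let consumers drop anything older than what they have already applied. This assumes a numeric `version` attribute incremented on every write, which is not part of the `Order` struct above; it's an illustrative addition:

```go
package consumer

import (
	"strconv"

	"github.com/aws/aws-lambda-go/events"
)

// isStale reports whether this record is older than the version the consumer
// has already applied, assuming items carry a numeric "version" attribute.
func isStale(record events.DynamoDBEventRecord, lastAppliedVersion int) (bool, error) {
	version, err := strconv.Atoi(record.Change.NewImage["version"].Number())
	if err != nil {
		return false, err
	}
	return version <= lastAppliedVersion, nil
}
```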
3. Duplicate Processing
Lambda guarantees "at least once" delivery. Your code must be idempotent.
4. 24-Hour Stream Retention
DynamoDB Streams only retains data for 24 hours. If a Lambda is down longer, events are lost.
5. Cold Starts
Lambda functions may experience cold starts, adding latency to event processing.
6. Debugging Complexity
Distributed event processing can be harder to debug than synchronous calls.
Best Practices 💡
1. Make Your Handlers Idempotent
```go
// Bad: not idempotent. A retried event decrements inventory twice.
func processOrder(orderID string) {
	inventory.Decrement(productID, quantity)
}

// Good: idempotent via deduplication on a unique event ID.
// (inventory, alreadyProcessed, and markAsProcessed are illustrative helpers.)
func processOrder(orderID string, eventID string) {
	if alreadyProcessed(eventID) {
		log.Printf("Event %s already processed, skipping", eventID)
		return
	}
	inventory.Decrement(productID, quantity)
	markAsProcessed(eventID)
}
```
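`alreadyProcessed` and `markAsProcessed` above are placeholders. One way to implement them (an assumption on my part, not part of the original design) is a single conditional write to a small `ProcessedEvents` DynamoDB table, which makes check-and-mark atomic. For stream records, the record's `EventID` field is a natural deduplication key:

```go
package dedup

import (
	"context"
	"errors"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

// MarkProcessed atomically records eventID in a hypothetical ProcessedEvents
// table. It returns false if the event was already recorded, so callers can
// skip duplicates without a separate read-then-write race.
func MarkProcessed(ctx context.Context, client *dynamodb.Client, eventID string) (bool, error) {
	_, err := client.PutItem(ctx, &dynamodb.PutItemInput{
		TableName: aws.String("ProcessedEvents"),
		Item: map[string]types.AttributeValue{
			"eventId": &types.AttributeValueMemberS{Value: eventID},
		},
		ConditionExpression: aws.String("attribute_not_exists(eventId)"),
	})

	var conditionFailed *types.ConditionalCheckFailedException
	if errors.As(err, &conditionFailed) {
		return false, nil // already processed
	}
	if err != nil {
		return false, err
	}
	return true, nil
}
```

A TTL attribute on the table keeps it from growing without bound.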
2. Use Batch Processing
Process multiple records in a single invocation to reduce costs and improve throughput.
3. Implement Dead Letter Queues
Always configure a DLQ to capture events that repeatedly fail processing.
4. Monitor Iterator Age
High iterator age means your consumers are falling behind. Scale or optimize your Lambdas.
5. Use Event Filtering
Filter events at the source to reduce unnecessary Lambda invocations.
6. Structure Your Events Well
Include all necessary context in the event to avoid additional database lookups.
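As a sketch of the difference (illustrative types, not part of the services above): a skinny event forces every consumer back to the database, while a self-contained one can be acted on directly. This is also why the `Order` item carries `customerEmail` rather than only `customerId`.

```go
package design

// Skinny event: consumers must query the Orders table before doing anything.
type OrderCreatedSkinny struct {
	OrderID string `json:"orderId"`
}

// Self-contained event: consumers can act on the record alone.
type OrderCreatedFull struct {
	OrderID       string  `json:"orderId"`
	CustomerEmail string  `json:"customerEmail"`
	TotalPrice    float64 `json:"totalPrice"`
	Status        string  `json:"status"`
}
```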
Real-World Use Cases 🌍
E-commerce Order Processing
- Order created → Multiple services react (inventory, shipping, notifications)
- Order status changed → Update customer, analytics, warehouse systems
User Activity Tracking
- User action → Update recommendations, analytics, notifications
- Profile updated → Sync across systems, update caches
Financial Transactions
- Transaction created → Fraud detection, accounting, notifications
- Account updated → Compliance checks, reporting
IoT Data Processing
- Sensor data → Real-time analytics, alerting, data warehousing
- Device status change → Monitoring, maintenance scheduling
Testing Event-Driven Systems 🧪
`handler_test.go`:

```go
package main

import (
	"context"
	"testing"

	"github.com/aws/aws-lambda-go/events"
)

func TestOrderEventProcessing(t *testing.T) {
	// Create a mock DynamoDB event
	event := events.DynamoDBEvent{
		Records: []events.DynamoDBEventRecord{
			{
				EventName: "INSERT",
				Change: events.DynamoDBStreamRecord{
					NewImage: map[string]events.DynamoDBAttributeValue{
						"orderId":    events.NewStringAttribute("12345"),
						"customerId": events.NewStringAttribute("customer-1"),
						"totalPrice": events.NewNumberAttribute("99.99"),
						"status":     events.NewStringAttribute("PENDING"),
					},
				},
			},
		},
	}

	// Test the handler
	err := handler(context.Background(), event)
	if err != nil {
		t.Errorf("Expected no error, got %v", err)
	}

	// Verify expected side effects
	// (check email sent, inventory updated, etc.)
}
```
Deployment 🚀
Deploy all services at once:
```bash
# Deploy infrastructure first
cd infrastructure
serverless deploy

# Deploy all services
cd ../services
for dir in */; do
  cd "$dir"
  GOOS=linux GOARCH=amd64 go build -o bin/handler handler.go
  serverless deploy
  cd ..
done
```
Conclusion 🎯
Event-Driven Architecture with DynamoDB Streams and Lambda provides a powerful, scalable pattern for building microservices. The key advantages are:
- Decoupled services that can evolve independently
- Automatic scaling without infrastructure management
- Cost-effective pay-per-use model
- Built-in reliability with retries and error handling
- Easy to extend with new consumers
However, remember the tradeoffs:
- Eventual consistency instead of immediate
- Requires idempotent handlers
- More complex debugging
- Need proper monitoring
When implemented correctly with proper error handling, monitoring, and idempotency, this pattern enables building highly scalable and resilient systems.
If you have questions about implementing event-driven architectures in your projects, feel free to reach out!