Event-Driven Architecture with AWS: DynamoDB Streams + Lambda 🚀

January 19, 2026

Event-Driven Architecture (EDA) has become one of the most popular patterns for building scalable, decoupled microservices. When combined with AWS services like DynamoDB Streams and Lambda functions, it becomes a powerful solution for real-time data processing and reactive systems.

In this article, I'll show you how to implement a complete event-driven architecture using AWS native services, with practical examples and best practices I've learned from production environments.

What is Event-Driven Architecture? 📖

Event-Driven Architecture is a design pattern where services communicate through events rather than direct calls. An event represents a state change or significant occurrence in the system.

Key Concepts:

  • Event Producer: Service that generates events (in our case, DynamoDB)
  • Event Stream: Channel that transports events (DynamoDB Streams)
  • Event Consumer: Service that reacts to events (Lambda functions)
  • Event: Immutable record of something that happened

Why DynamoDB Streams + Lambda? 🤔

This combination offers several advantages:

  1. Zero Infrastructure Management: Both are serverless
  2. Automatic Scaling: Lambda processes stream shards in parallel, so consumers scale out as the table's partitions grow
  3. Pay-per-Use: Only pay for what you consume
  4. Native Integration: AWS handles the complexity
  5. Guaranteed Delivery: Events are processed at least once

DynamoDB Streams Explained 📊

DynamoDB Streams captures a time-ordered sequence of item-level modifications in a DynamoDB table. When you enable streams, DynamoDB captures:

  • INSERT: New item added
  • MODIFY: Existing item updated
  • REMOVE: Item deleted

Each stream record contains:

  • The type of change (INSERT, MODIFY, REMOVE)
  • The primary key of the modified item
  • The old and new images of the item (configurable)
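To make the record layout concrete, here is a minimal sketch that decodes a hand-written MODIFY record with the standard library only. The sample payload, the `StreamRecord` and `AttributeValue` types, and the `describe` helper are assumptions for illustration; in a real Lambda you would normally use the `events.DynamoDBEvent` types from aws-lambda-go instead.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// AttributeValue models only the string ("S") and number ("N") variants
// of a DynamoDB attribute; real records can carry other types too.
type AttributeValue struct {
	S *string `json:"S,omitempty"`
	N *string `json:"N,omitempty"`
}

// StreamRecord holds the three pieces of a stream record listed above.
type StreamRecord struct {
	EventName string `json:"eventName"` // INSERT, MODIFY or REMOVE
	Change    struct {
		Keys     map[string]AttributeValue `json:"Keys"`
		OldImage map[string]AttributeValue `json:"OldImage"`
		NewImage map[string]AttributeValue `json:"NewImage"`
	} `json:"dynamodb"`
}

// sampleRecord is a hand-written MODIFY record for a hypothetical order.
const sampleRecord = `{
  "eventName": "MODIFY",
  "dynamodb": {
    "Keys":     {"orderId": {"S": "order-1"}},
    "OldImage": {"orderId": {"S": "order-1"}, "status": {"S": "PENDING"}},
    "NewImage": {"orderId": {"S": "order-1"}, "status": {"S": "SHIPPED"}}
  }
}`

// str returns the string value stored under key, or "" if it is absent.
func str(image map[string]AttributeValue, key string) string {
	if v, ok := image[key]; ok && v.S != nil {
		return *v.S
	}
	return ""
}

// describe decodes one record and summarizes its status transition.
func describe(raw string) (string, error) {
	var rec StreamRecord
	if err := json.Unmarshal([]byte(raw), &rec); err != nil {
		return "", err
	}
	return fmt.Sprintf("%s %s: %s -> %s",
		rec.EventName,
		str(rec.Change.Keys, "orderId"),
		str(rec.Change.OldImage, "status"),
		str(rec.Change.NewImage, "status")), nil
}

func main() {
	summary, err := describe(sampleRecord)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(summary) // MODIFY order-1: PENDING -> SHIPPED
}
```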

Architecture Overview 🏗️

Let me show you a typical architecture pattern:

    services/
      order-service/
        handler.go
        serverless.yml
      notification-service/
        handler.go
        serverless.yml
      analytics-service/
        handler.go
        serverless.yml
      inventory-service/
        handler.go
        serverless.yml
    infrastructure/
      dynamodb.yml
      iam-roles.yml

Flow:

  1. Order Service writes to DynamoDB (Orders table)
  2. DynamoDB Stream captures the change
  3. Multiple Lambda functions react to the event:
    • Notification Service sends email
    • Analytics Service updates metrics
    • Inventory Service adjusts stock
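The flow above is a fan-out: every consumer sees every change independently of the others. A minimal in-process sketch of that idea follows, with no AWS involved; the `Event` and `Consumer` types and the `fanOut` function are hypothetical names used only to illustrate the shape.

```go
package main

import "fmt"

// Event is a simplified stand-in for a stream record.
type Event struct {
	Name    string // INSERT, MODIFY or REMOVE
	OrderID string
}

// Consumer pairs a service name with the function that reacts to an event.
type Consumer struct {
	Name   string
	Handle func(Event) string
}

// fanOut delivers every event to every consumer and collects what each did.
// Consumers never call each other; they only share the event.
func fanOut(events []Event, consumers []Consumer) []string {
	var actions []string
	for _, c := range consumers {
		for _, e := range events {
			actions = append(actions, c.Name+": "+c.Handle(e))
		}
	}
	return actions
}

func main() {
	consumers := []Consumer{
		{"notification", func(e Event) string { return "email for " + e.OrderID }},
		{"analytics", func(e Event) string { return "metric for " + e.OrderID }},
		{"inventory", func(e Event) string { return "stock update for " + e.OrderID }},
	}
	events := []Event{{Name: "INSERT", OrderID: "order-1"}}
	for _, line := range fanOut(events, consumers) {
		fmt.Println(line)
	}
}
```

Adding a fourth consumer here means appending one entry to the slice, which mirrors how a new Lambda subscription leaves the order service untouched.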

Step 1: Setting Up DynamoDB with Streams 🗄️

First, let's create a DynamoDB table with streams enabled using Serverless Framework:

infrastructure/dynamodb.yml
    service: order-dynamodb

    provider:
      name: aws
      region: us-east-1
      # Note: AWS has deprecated the go1.x runtime. New deployments need
      # provided.al2023 with the compiled binary named "bootstrap".
      runtime: go1.x

    resources:
      Resources:
        OrdersTable:
          Type: AWS::DynamoDB::Table
          Properties:
            TableName: Orders
            AttributeDefinitions:
              - AttributeName: orderId
                AttributeType: S
              - AttributeName: customerId
                AttributeType: S
              - AttributeName: createdAt
                AttributeType: N
            KeySchema:
              - AttributeName: orderId
                KeyType: HASH
            GlobalSecondaryIndexes:
              - IndexName: CustomerIndex
                KeySchema:
                  - AttributeName: customerId
                    KeyType: HASH
                  - AttributeName: createdAt
                    KeyType: RANGE
                Projection:
                  ProjectionType: ALL
                ProvisionedThroughput:
                  ReadCapacityUnits: 5
                  WriteCapacityUnits: 5
            StreamSpecification:
              StreamViewType: NEW_AND_OLD_IMAGES
            ProvisionedThroughput:
              ReadCapacityUnits: 5
              WriteCapacityUnits: 5
      Outputs:
        OrdersTableStreamArn:
          Value: !GetAtt OrdersTable.StreamArn
          Export:
            Name: OrdersTableStreamArn

Important: StreamViewType options:

  • KEYS_ONLY: Only the key attributes
  • NEW_IMAGE: The entire item after modification
  • OLD_IMAGE: The entire item before modification
  • NEW_AND_OLD_IMAGES: Both new and old images (most useful)
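The view type decides which images a consumer can rely on, and the event type narrows that further: an INSERT has no previous item and a REMOVE has no resulting item. The sketch below encodes that rule; `imagesFor` is a hypothetical helper written for this article, not part of any AWS SDK.

```go
package main

import "fmt"

// imagesFor reports which item images a stream record carries for a given
// StreamViewType and event name.
func imagesFor(viewType, eventName string) (hasOld, hasNew bool, err error) {
	var viewOld, viewNew bool
	switch viewType {
	case "KEYS_ONLY":
		// Only the key attributes are written; no images at all.
	case "NEW_IMAGE":
		viewNew = true
	case "OLD_IMAGE":
		viewOld = true
	case "NEW_AND_OLD_IMAGES":
		viewOld, viewNew = true, true
	default:
		return false, false, fmt.Errorf("unknown StreamViewType %q", viewType)
	}
	// An INSERT has no previous item and a REMOVE has no resulting item.
	return viewOld && eventName != "INSERT", viewNew && eventName != "REMOVE", nil
}

func main() {
	for _, event := range []string{"INSERT", "MODIFY", "REMOVE"} {
		hasOld, hasNew, err := imagesFor("NEW_AND_OLD_IMAGES", event)
		if err != nil {
			fmt.Println(err)
			return
		}
		fmt.Printf("%s: old=%t new=%t\n", event, hasOld, hasNew)
	}
}
```

Consumers that compare the old and new status, as the notification and inventory services in this article do, need NEW_AND_OLD_IMAGES and should only run that comparison on MODIFY events.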

Step 2: Creating the Order Service (Producer) 📝

This service writes orders to DynamoDB, triggering events:

services/order-service/handler.go
    package main

    import (
        "context"
        "encoding/json"
        "time"

        "github.com/aws/aws-lambda-go/events"
        "github.com/aws/aws-lambda-go/lambda"
        "github.com/aws/aws-sdk-go-v2/aws"
        "github.com/aws/aws-sdk-go-v2/config"
        "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
        "github.com/aws/aws-sdk-go-v2/service/dynamodb"
        "github.com/google/uuid"
    )

    type Order struct {
        OrderID    string `json:"orderId" dynamodbav:"orderId"`
        CustomerID string `json:"customerId" dynamodbav:"customerId"`
        // CustomerEmail is stored on the item because the notification
        // consumer reads it from the stream image.
        CustomerEmail string  `json:"customerEmail" dynamodbav:"customerEmail"`
        Items         []Item  `json:"items" dynamodbav:"items"`
        TotalPrice    float64 `json:"totalPrice" dynamodbav:"totalPrice"`
        Status        string  `json:"status" dynamodbav:"status"`
        CreatedAt     int64   `json:"createdAt" dynamodbav:"createdAt"`
    }

    type Item struct {
        ProductID string  `json:"productId" dynamodbav:"productId"`
        Quantity  int     `json:"quantity" dynamodbav:"quantity"`
        Price     float64 `json:"price" dynamodbav:"price"`
    }

    type CreateOrderRequest struct {
        CustomerID    string `json:"customerId"`
        CustomerEmail string `json:"customerEmail"`
        Items         []Item `json:"items"`
    }

    var dynamoClient *dynamodb.Client

    // init creates the DynamoDB client once per Lambda execution environment.
    func init() {
        cfg, err := config.LoadDefaultConfig(context.TODO())
        if err != nil {
            panic(err)
        }
        dynamoClient = dynamodb.NewFromConfig(cfg)
    }

    func handler(ctx context.Context, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
        var req CreateOrderRequest
        if err := json.Unmarshal([]byte(request.Body), &req); err != nil {
            return events.APIGatewayProxyResponse{
                StatusCode: 400,
                Body:       `{"error": "Invalid request body"}`,
            }, nil
        }

        // Calculate total price
        var totalPrice float64
        for _, item := range req.Items {
            totalPrice += item.Price * float64(item.Quantity)
        }

        // Create order
        order := Order{
            OrderID:       uuid.New().String(),
            CustomerID:    req.CustomerID,
            CustomerEmail: req.CustomerEmail,
            Items:         req.Items,
            TotalPrice:    totalPrice,
            Status:        "PENDING",
            CreatedAt:     time.Now().Unix(),
        }

        // Convert to DynamoDB format
        av, err := attributevalue.MarshalMap(order)
        if err != nil {
            return events.APIGatewayProxyResponse{
                StatusCode: 500,
                Body:       `{"error": "Failed to marshal order"}`,
            }, nil
        }

        // Put item in DynamoDB (this triggers the stream!)
        _, err = dynamoClient.PutItem(ctx, &dynamodb.PutItemInput{
            TableName: aws.String("Orders"),
            Item:      av,
        })
        if err != nil {
            return events.APIGatewayProxyResponse{
                StatusCode: 500,
                Body:       `{"error": "Failed to create order"}`,
            }, nil
        }

        responseBody, _ := json.Marshal(order)
        return events.APIGatewayProxyResponse{
            StatusCode: 201,
            Body:       string(responseBody),
        }, nil
    }

    func main() {
        lambda.Start(handler)
    }
services/order-service/serverless.yml
    service: order-service

    provider:
      name: aws
      runtime: go1.x
      region: us-east-1
      iam:
        role:
          statements:
            - Effect: Allow
              Action:
                - dynamodb:PutItem
                - dynamodb:UpdateItem
              Resource:
                - arn:aws:dynamodb:us-east-1:*:table/Orders

    functions:
      create:
        handler: bin/handler
        events:
          - http:
              path: orders
              method: post
              cors: true

Step 3: Creating Consumer Services (Lambdas) 🎯

Now let's create Lambda functions that react to DynamoDB Stream events:

Notification Service

services/notification-service/handler.go
    package main

    import (
        "context"
        "fmt"
        "log"

        "github.com/aws/aws-lambda-go/events"
        "github.com/aws/aws-lambda-go/lambda"
        "github.com/aws/aws-sdk-go-v2/aws"
        "github.com/aws/aws-sdk-go-v2/config"
        "github.com/aws/aws-sdk-go-v2/service/ses"
        "github.com/aws/aws-sdk-go-v2/service/ses/types"
    )

    var sesClient *ses.Client

    func init() {
        cfg, err := config.LoadDefaultConfig(context.TODO())
        if err != nil {
            panic(err)
        }
        sesClient = ses.NewFromConfig(cfg)
    }

    func handler(ctx context.Context, event events.DynamoDBEvent) error {
        for _, record := range event.Records {
            switch record.EventName {
            case "INSERT":
                // New orders
                if err := processNewOrder(ctx, record); err != nil {
                    log.Printf("Error processing record: %v", err)
                    // Continue processing other records
                    continue
                }
            case "MODIFY":
                // Order status changes
                if err := processOrderUpdate(ctx, record); err != nil {
                    log.Printf("Error processing update: %v", err)
                    continue
                }
            }
        }
        return nil
    }

    // stringAttr returns the string stored under key, or "" when the attribute
    // is missing or not a string. Calling String() on a missing attribute panics.
    func stringAttr(image map[string]events.DynamoDBAttributeValue, key string) string {
        if v, ok := image[key]; ok && v.DataType() == events.DataTypeString {
            return v.String()
        }
        return ""
    }

    func processNewOrder(ctx context.Context, record events.DynamoDBEventRecord) error {
        newImage := record.Change.NewImage

        orderId := stringAttr(newImage, "orderId")
        customerEmail := stringAttr(newImage, "customerEmail")
        if customerEmail == "" {
            return fmt.Errorf("order %s has no customerEmail attribute", orderId)
        }

        totalPrice := "0"
        if v, ok := newImage["totalPrice"]; ok && v.DataType() == events.DataTypeNumber {
            totalPrice = v.Number()
        }

        log.Printf("Processing new order: %s for customer: %s", orderId, customerEmail)

        // Send email notification
        subject := "Order Confirmation"
        body := fmt.Sprintf(
            "Thank you for your order!\n\nOrder ID: %s\nTotal: $%s\n\nWe'll notify you when it ships.",
            orderId, totalPrice,
        )
        return sendEmail(ctx, customerEmail, subject, body)
    }

    func processOrderUpdate(ctx context.Context, record events.DynamoDBEventRecord) error {
        oldStatus := stringAttr(record.Change.OldImage, "status")
        newStatus := stringAttr(record.Change.NewImage, "status")

        // Only send notification if status changed
        if oldStatus == newStatus {
            return nil
        }

        orderId := stringAttr(record.Change.NewImage, "orderId")
        customerEmail := stringAttr(record.Change.NewImage, "customerEmail")
        if customerEmail == "" {
            return fmt.Errorf("order %s has no customerEmail attribute", orderId)
        }

        log.Printf("Order %s status changed: %s -> %s", orderId, oldStatus, newStatus)

        subject := "Order Status Update"
        body := fmt.Sprintf(
            "Your order status has been updated!\n\nOrder ID: %s\nNew Status: %s",
            orderId, newStatus,
        )
        return sendEmail(ctx, customerEmail, subject, body)
    }

    func sendEmail(ctx context.Context, to, subject, body string) error {
        input := &ses.SendEmailInput{
            Source: aws.String("noreply@yourcompany.com"),
            Destination: &types.Destination{
                ToAddresses: []string{to},
            },
            Message: &types.Message{
                Subject: &types.Content{Data: aws.String(subject)},
                Body: &types.Body{
                    Text: &types.Content{Data: aws.String(body)},
                },
            },
        }
        _, err := sesClient.SendEmail(ctx, input)
        return err
    }

    func main() {
        lambda.Start(handler)
    }
services/notification-service/serverless.yml
    service: notification-service

    provider:
      name: aws
      runtime: go1.x
      region: us-east-1
      iam:
        role:
          statements:
            - Effect: Allow
              Action:
                - ses:SendEmail
              Resource: "*"

    functions:
      processOrder:
        handler: bin/handler
        events:
          - stream:
              type: dynamodb
              arn:
                Fn::ImportValue: OrdersTableStreamArn
              batchSize: 10
              startingPosition: LATEST
              maximumRetryAttempts: 3
              enabled: true
              filterPatterns:
                - eventName: [INSERT, MODIFY]

Analytics Service

services/analytics-service/handler.go
    package main

    import (
        "context"
        "log"
        "strconv"
        "time"

        "github.com/aws/aws-lambda-go/events"
        "github.com/aws/aws-lambda-go/lambda"
        "github.com/aws/aws-sdk-go-v2/aws"
        "github.com/aws/aws-sdk-go-v2/config"
        "github.com/aws/aws-sdk-go-v2/service/cloudwatch"
        "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
    )

    var cwClient *cloudwatch.Client

    func init() {
        cfg, err := config.LoadDefaultConfig(context.TODO())
        if err != nil {
            panic(err)
        }
        cwClient = cloudwatch.NewFromConfig(cfg)
    }

    func handler(ctx context.Context, event events.DynamoDBEvent) error {
        var totalRevenue float64
        var orderCount int

        for _, record := range event.Records {
            if record.EventName != "INSERT" {
                continue
            }
            orderCount++

            // Extract total price from new image. Stream numbers arrive as
            // strings, so parse them and report values that cannot be read.
            priceAttr, ok := record.Change.NewImage["totalPrice"]
            if !ok || priceAttr.DataType() != events.DataTypeNumber {
                continue
            }
            price, err := strconv.ParseFloat(priceAttr.Number(), 64)
            if err != nil {
                log.Printf("Ignoring unparsable totalPrice %q: %v", priceAttr.Number(), err)
                continue
            }
            totalRevenue += price
        }

        if orderCount > 0 {
            // Send metrics to CloudWatch
            if err := publishMetrics(ctx, orderCount, totalRevenue); err != nil {
                log.Printf("Error publishing metrics: %v", err)
                return err
            }
            log.Printf("Processed %d orders with total revenue: $%.2f", orderCount, totalRevenue)
        }
        return nil
    }

    func publishMetrics(ctx context.Context, orderCount int, totalRevenue float64) error {
        timestamp := time.Now()
        _, err := cwClient.PutMetricData(ctx, &cloudwatch.PutMetricDataInput{
            Namespace: aws.String("OrderService"),
            MetricData: []types.MetricDatum{
                {
                    MetricName: aws.String("OrderCount"),
                    Value:      aws.Float64(float64(orderCount)),
                    Timestamp:  &timestamp,
                    Unit:       types.StandardUnitCount,
                },
                {
                    MetricName: aws.String("Revenue"),
                    Value:      aws.Float64(totalRevenue),
                    Timestamp:  &timestamp,
                    Unit:       types.StandardUnitNone,
                },
            },
        })
        return err
    }

    func main() {
        lambda.Start(handler)
    }

Inventory Service

services/inventory-service/handler.go
    package main

    import (
        "context"
        "fmt"
        "log"
        "strconv"
        "time"

        "github.com/aws/aws-lambda-go/events"
        "github.com/aws/aws-lambda-go/lambda"
        "github.com/aws/aws-sdk-go-v2/aws"
        "github.com/aws/aws-sdk-go-v2/config"
        "github.com/aws/aws-sdk-go-v2/service/dynamodb"
        "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
    )

    var dynamoClient *dynamodb.Client

    type OrderItem struct {
        ProductID string `json:"productId"`
        Quantity  int    `json:"quantity"`
    }

    func init() {
        cfg, err := config.LoadDefaultConfig(context.TODO())
        if err != nil {
            panic(err)
        }
        dynamoClient = dynamodb.NewFromConfig(cfg)
    }

    func handler(ctx context.Context, event events.DynamoDBEvent) error {
        for _, record := range event.Records {
            if record.EventName == "INSERT" {
                if err := decrementInventory(ctx, record); err != nil {
                    log.Printf("Error decrementing inventory: %v", err)
                    // In production, you might want to send to DLQ here
                    continue
                }
            }

            // Handle order cancellations
            if record.EventName == "MODIFY" {
                oldStatus := record.Change.OldImage["status"].String()
                newStatus := record.Change.NewImage["status"].String()
                if newStatus == "CANCELLED" && oldStatus != "CANCELLED" {
                    if err := incrementInventory(ctx, record); err != nil {
                        log.Printf("Error incrementing inventory: %v", err)
                        continue
                    }
                }
            }
        }
        return nil
    }

    func decrementInventory(ctx context.Context, record events.DynamoDBEventRecord) error {
        // Extract items from order
        itemsAttr := record.Change.NewImage["items"]
        var items []OrderItem

        // Parse items list
        if itemsAttr.DataType() == events.DataTypeList {
            for _, item := range itemsAttr.List() {
                itemMap := item.Map()
                productID := itemMap["productId"].String()
                // Stream numbers arrive as strings
                qty, err := strconv.Atoi(itemMap["quantity"].Number())
                if err != nil {
                    return fmt.Errorf("invalid quantity for product %s: %w", productID, err)
                }
                items = append(items, OrderItem{ProductID: productID, Quantity: qty})
            }
        }

        // Update inventory for each item
        for _, item := range items {
            log.Printf("Decrementing inventory for product %s by %d", item.ProductID, item.Quantity)
            _, err := dynamoClient.UpdateItem(ctx, &dynamodb.UpdateItemInput{
                TableName: aws.String("Inventory"),
                Key: map[string]types.AttributeValue{
                    "productId": &types.AttributeValueMemberS{Value: item.ProductID},
                },
                UpdateExpression: aws.String("SET stock = stock - :qty, lastUpdated = :timestamp"),
                ExpressionAttributeValues: map[string]types.AttributeValue{
                    ":qty":       &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", item.Quantity)},
                    ":timestamp": &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", time.Now().Unix())},
                },
                ConditionExpression: aws.String("stock >= :qty"), // Prevent negative stock
            })
            if err != nil {
                return fmt.Errorf("failed to update inventory for product %s: %w", item.ProductID, err)
            }
        }
        return nil
    }

    func incrementInventory(ctx context.Context, record events.DynamoDBEventRecord) error {
        // Similar logic but with ADD instead of subtract
        // Implementation left as exercise
        return nil
    }

    func main() {
        lambda.Start(handler)
    }

Event Filtering and Batch Processing ⚡

Lambda allows you to filter events before processing:

serverless.yml
    functions:
      processHighValueOrders:
        handler: bin/handler
        events:
          - stream:
              type: dynamodb
              arn: !GetAtt OrdersTable.StreamArn
              batchSize: 100
              # Serverless Framework's name for the batching window, in seconds
              batchWindow: 10
              startingPosition: LATEST
              filterPatterns:
                # Only process orders over $100
                - eventName: [INSERT]
                  dynamodb:
                    NewImage:
                      totalPrice:
                        N: [{ numeric: [">=", 100] }]

Batch Processing Benefits:

  • Process multiple events at once
  • Reduce Lambda invocations (lower cost)
  • Batch window allows accumulation of events
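As a rough illustration of how filtering and batching interact, the sketch below applies the same rule as the filter pattern above (INSERT events with a totalPrice of at least 100) and then groups the surviving records into batches. It is a simplified local model with hypothetical `Record`, `matches` and `batches` names; the real service also flushes a batch when the batching window expires and batches per shard.

```go
package main

import "fmt"

// Record is a simplified stand-in for a stream record.
type Record struct {
	EventName  string
	TotalPrice float64
}

// matches mirrors the filter pattern: INSERT events with totalPrice >= 100.
func matches(r Record) bool {
	return r.EventName == "INSERT" && r.TotalPrice >= 100
}

// batches drops records that do not match the filter, then groups the rest
// into slices of at most batchSize. Each slice stands for one invocation.
func batches(records []Record, batchSize int) [][]Record {
	if batchSize <= 0 {
		return nil
	}
	var kept []Record
	for _, r := range records {
		if matches(r) {
			kept = append(kept, r)
		}
	}
	var out [][]Record
	for start := 0; start < len(kept); start += batchSize {
		end := start + batchSize
		if end > len(kept) {
			end = len(kept)
		}
		out = append(out, kept[start:end])
	}
	return out
}

func main() {
	var records []Record
	for i := 0; i < 250; i++ {
		records = append(records, Record{EventName: "INSERT", TotalPrice: 150})
	}
	for i := 0; i < 50; i++ {
		records = append(records, Record{EventName: "MODIFY", TotalPrice: 150})
	}
	// 300 records in, 250 pass the filter, 3 invocations instead of 250.
	fmt.Println(len(batches(records, 100)))
}
```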

Error Handling and Retry Strategy 🔄

services/common/error-handler.go
    package common

    import (
        "context"
        "encoding/json"
        "log"

        "github.com/aws/aws-lambda-go/events"
        "github.com/aws/aws-sdk-go-v2/aws"
        "github.com/aws/aws-sdk-go-v2/service/sqs"
    )

    type ErrorHandler struct {
        sqsClient *sqs.Client
        dlqURL    string
    }

    func NewErrorHandler(sqsClient *sqs.Client, dlqURL string) *ErrorHandler {
        return &ErrorHandler{
            sqsClient: sqsClient,
            dlqURL:    dlqURL,
        }
    }

    // ProcessWithRetry processes a record with error handling
    func (h *ErrorHandler) ProcessWithRetry(
        ctx context.Context,
        record events.DynamoDBEventRecord,
        processor func(context.Context, events.DynamoDBEventRecord) error,
    ) error {
        err := processor(ctx, record)
        if err != nil {
            log.Printf("Error processing record: %v", err)

            // Send to Dead Letter Queue for manual inspection
            if dlqErr := h.sendToDLQ(ctx, record, err); dlqErr != nil {
                log.Printf("Failed to send to DLQ: %v", dlqErr)
            }

            // Return error to trigger Lambda retry mechanism
            return err
        }
        return nil
    }

    func (h *ErrorHandler) sendToDLQ(ctx context.Context, record events.DynamoDBEventRecord, processingError error) error {
        message := map[string]interface{}{
            "record": record,
            "error":  processingError.Error(),
        }

        messageBody, err := json.Marshal(message)
        if err != nil {
            return err
        }

        _, err = h.sqsClient.SendMessage(ctx, &sqs.SendMessageInput{
            QueueUrl:    aws.String(h.dlqURL),
            MessageBody: aws.String(string(messageBody)),
        })
        return err
    }

Monitoring and Observability 📊

Key metrics to monitor:

infrastructure/alarms.yml
    resources:
      Resources:
        HighErrorRateAlarm:
          Type: AWS::CloudWatch::Alarm
          Properties:
            AlarmName: NotificationService-HighErrorRate
            AlarmDescription: Alert when error rate exceeds threshold
            MetricName: Errors
            Namespace: AWS/Lambda
            Statistic: Sum
            Period: 300
            EvaluationPeriods: 2
            Threshold: 10
            ComparisonOperator: GreaterThanThreshold
            Dimensions:
              - Name: FunctionName
                Value: !Ref NotificationServiceFunction

        StreamIteratorAgeAlarm:
          Type: AWS::CloudWatch::Alarm
          Properties:
            AlarmName: DynamoDB-StreamLag
            AlarmDescription: Alert when stream processing is lagging
            MetricName: IteratorAge
            Namespace: AWS/Lambda
            Statistic: Maximum
            Period: 60
            EvaluationPeriods: 3
            Threshold: 60000 # 1 minute in milliseconds
            ComparisonOperator: GreaterThanThreshold
            Dimensions:
              - Name: FunctionName
                Value: !Ref NotificationServiceFunction

Advantages of This Pattern 🚀

1. Loose Coupling

Services don't know about each other. The order service doesn't call notification or inventory services directly.

2. Scalability

Each Lambda scales independently. High notification load doesn't affect inventory processing.

3. Reliability

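Built-in retry mechanism. When a handler returns an error, Lambda retries the batch until it succeeds, the configured maximumRetryAttempts is exhausted, or the records expire from the stream. This only covers errors the handler returns: records whose errors are logged and skipped are not retried.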

4. Auditability

DynamoDB Streams provides a complete audit log of all changes for 24 hours.

5. Cost-Effective

Pay only for what you use. No idle servers waiting for events.

6. Easy to Extend

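Adding a new consumer is simple: create a new Lambda subscription to the stream. Keep in mind that AWS recommends no more than two consumers reading from the same stream shard, since additional readers can be throttled.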

Disadvantages and Considerations ⚠️

1. Eventual Consistency

Events are processed asynchronously. There's a delay between the write and event processing.

2. Ordering Challenges

While events for the same partition key are ordered, processing them in order requires careful design.
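One way to keep that guarantee inside a handler is to group a batch by key before processing: records for the same order stay in arrival order, while different orders can be handled independently. This is a minimal sketch with hypothetical `Record` and `groupByKey` names, not a description of how Lambda itself schedules work.

```go
package main

import "fmt"

// Record is a simplified stand-in for a stream record.
type Record struct {
	OrderID string
	Status  string
}

// groupByKey returns the distinct order IDs in first-seen order, plus the
// records for each ID in their original arrival order.
func groupByKey(records []Record) ([]string, map[string][]Record) {
	var keys []string
	groups := make(map[string][]Record)
	for _, r := range records {
		if _, seen := groups[r.OrderID]; !seen {
			keys = append(keys, r.OrderID)
		}
		groups[r.OrderID] = append(groups[r.OrderID], r)
	}
	return keys, groups
}

func main() {
	batch := []Record{
		{OrderID: "a", Status: "PENDING"},
		{OrderID: "b", Status: "PENDING"},
		{OrderID: "a", Status: "SHIPPED"},
	}
	keys, groups := groupByKey(batch)
	for _, k := range keys {
		var statuses []string
		for _, r := range groups[k] {
			statuses = append(statuses, r.Status)
		}
		fmt.Println(k, statuses)
	}
}
```

Each group could then be processed sequentially in its own goroutine, so a slow order does not hold up unrelated ones.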

3. Duplicate Processing

Lambda guarantees "at least once" delivery. Your code must be idempotent.

4. 24-Hour Stream Retention

DynamoDB Streams only retains data for 24 hours. If a Lambda is down longer, events are lost.

5. Cold Starts

Lambda functions may experience cold starts, adding latency to event processing.

6. Debugging Complexity

Distributed event processing can be harder to debug than synchronous calls.

Best Practices 💡

1. Make Your Handlers Idempotent

    // Bad: not idempotent. A redelivered event decrements stock twice.
    func processOrder(productId string, quantity int) {
        inventory.Decrement(productId, quantity)
    }

    // Good: idempotent with deduplication keyed on the event ID
    func processOrder(eventId, productId string, quantity int) {
        if alreadyProcessed(eventId) {
            log.Printf("Event %s already processed, skipping", eventId)
            return
        }
        inventory.Decrement(productId, quantity)
        markAsProcessed(eventId)
    }
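A runnable version of the deduplication idea might look like the sketch below. It uses an in-memory set as a stand-in for a durable store; `ProcessedStore`, `Inventory` and `applyOrder` are hypothetical names, and in production the check-and-mark step would more likely be a conditional write to a table so it survives across invocations.

```go
package main

import (
	"fmt"
	"sync"
)

// ProcessedStore remembers which event IDs were already handled.
// It is an in-memory stand-in for a durable deduplication table.
type ProcessedStore struct {
	mu   sync.Mutex
	seen map[string]bool
}

func NewProcessedStore() *ProcessedStore {
	return &ProcessedStore{seen: make(map[string]bool)}
}

// MarkIfNew records the ID and reports whether this is the first time it
// was seen. Checking and marking happen under one lock, so two concurrent
// deliveries of the same event cannot both pass.
func (s *ProcessedStore) MarkIfNew(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.seen[id] {
		return false
	}
	s.seen[id] = true
	return true
}

// Inventory is a toy stock table keyed by product ID.
type Inventory struct {
	stock map[string]int
}

// applyOrder decrements stock once per event ID and reports whether the
// event was applied or skipped as a duplicate.
func applyOrder(store *ProcessedStore, inv *Inventory, eventID, productID string, qty int) bool {
	if !store.MarkIfNew(eventID) {
		return false
	}
	inv.stock[productID] -= qty
	return true
}

func main() {
	store := NewProcessedStore()
	inv := &Inventory{stock: map[string]int{"p1": 10}}

	fmt.Println(applyOrder(store, inv, "evt-1", "p1", 2)) // true
	fmt.Println(applyOrder(store, inv, "evt-1", "p1", 2)) // false (duplicate)
	fmt.Println(inv.stock["p1"])                          // 8
}
```

Note the trade-off: this sketch marks the event before applying it, which protects against concurrent duplicates but would drop the event if the update failed afterwards. With a real database, doing both in one transaction avoids that.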

2. Use Batch Processing

Process multiple records in a single invocation to reduce costs and improve throughput.

3. Implement Dead Letter Queues

Always configure a DLQ to capture events that repeatedly fail processing.

4. Monitor Iterator Age

High iterator age means your consumers are falling behind. Scale or optimize your Lambdas.

5. Use Event Filtering

Filter events at the source to reduce unnecessary Lambda invocations.

6. Structure Your Events Well

Include all necessary context in the event to avoid additional database lookups.

Real-World Use Cases 🌍

E-commerce Order Processing

  • Order created → Multiple services react (inventory, shipping, notifications)
  • Order status changed → Update customer, analytics, warehouse systems

User Activity Tracking

  • User action → Update recommendations, analytics, notifications
  • Profile updated → Sync across systems, update caches

Financial Transactions

  • Transaction created → Fraud detection, accounting, notifications
  • Account updated → Compliance checks, reporting

IoT Data Processing

  • Sensor data → Real-time analytics, alerting, data warehousing
  • Device status change → Monitoring, maintenance scheduling

Testing Event-Driven Systems 🧪

handler_test.go
    package main

    import (
        "context"
        "testing"

        "github.com/aws/aws-lambda-go/events"
    )

    func TestOrderEventProcessing(t *testing.T) {
        // Create a mock DynamoDB event
        event := events.DynamoDBEvent{
            Records: []events.DynamoDBEventRecord{
                {
                    EventName: "INSERT",
                    Change: events.DynamoDBStreamRecord{
                        NewImage: map[string]events.DynamoDBAttributeValue{
                            "orderId":    events.NewStringAttribute("12345"),
                            "customerId": events.NewStringAttribute("customer-1"),
                            "totalPrice": events.NewNumberAttribute("99.99"),
                            "status":     events.NewStringAttribute("PENDING"),
                        },
                    },
                },
            },
        }

        // Test the handler. Note that the handlers in this article call AWS
        // through package-level clients, so stub those before running this.
        err := handler(context.Background(), event)
        if err != nil {
            t.Errorf("Expected no error, got %v", err)
        }

        // Verify expected side effects
        // (check email sent, inventory updated, etc.)
    }
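Verifying side effects is easier when the handler receives its dependencies through an interface, so a test can pass in a fake. The sketch below shows one possible shape using only the standard library; `EmailSender`, `fakeSender`, `OrderEvent` and `notify` are hypothetical names and not part of the services shown earlier.

```go
package main

import (
	"errors"
	"fmt"
)

// EmailSender is the only capability the notification logic needs.
type EmailSender interface {
	Send(to, subject, body string) error
}

// fakeSender records calls instead of talking to an email service.
type fakeSender struct {
	sent []string
}

func (f *fakeSender) Send(to, subject, body string) error {
	if to == "" {
		return errors.New("missing recipient")
	}
	f.sent = append(f.sent, to+"|"+subject)
	return nil
}

// OrderEvent is a simplified stand-in for a stream record.
type OrderEvent struct {
	EventName     string
	OrderID       string
	CustomerEmail string
}

// notify sends a confirmation for every INSERT event and stops at the
// first send error.
func notify(sender EmailSender, events []OrderEvent) error {
	for _, e := range events {
		if e.EventName != "INSERT" {
			continue
		}
		err := sender.Send(e.CustomerEmail, "Order Confirmation", "Order ID: "+e.OrderID)
		if err != nil {
			return fmt.Errorf("order %s: %w", e.OrderID, err)
		}
	}
	return nil
}

func main() {
	fake := &fakeSender{}
	err := notify(fake, []OrderEvent{
		{EventName: "INSERT", OrderID: "12345", CustomerEmail: "a@example.com"},
		{EventName: "REMOVE", OrderID: "67890", CustomerEmail: "b@example.com"},
	})
	fmt.Println(err, fake.sent) // <nil> [a@example.com|Order Confirmation]
}
```

In a test file, the same fake lets you assert exactly which emails would have been sent without any network access.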

Deployment 🚀

Deploy all services at once:

    # Deploy infrastructure first
    cd infrastructure
    serverless deploy

    # Deploy all services
    cd ../services
    for dir in */; do
      cd "$dir"
      GOOS=linux GOARCH=amd64 go build -o bin/handler handler.go
      serverless deploy
      cd ..
    done

Conclusion 🎯

Event-Driven Architecture with DynamoDB Streams and Lambda provides a powerful, scalable pattern for building microservices. The key advantages are:

  1. Decoupled services that can evolve independently
  2. Automatic scaling without infrastructure management
  3. Cost-effective pay-per-use model
  4. Built-in reliability with retries and error handling
  5. Easy to extend with new consumers

However, remember the tradeoffs:

  • Eventual consistency instead of immediate
  • Requires idempotent handlers
  • More complex debugging
  • Need proper monitoring

When implemented correctly with proper error handling, monitoring, and idempotency, this pattern enables building highly scalable and resilient systems.

If you have questions about implementing event-driven architectures in your projects, feel free to reach out!

Visit my GitHub