Building Event-Driven Microservices with NATS, Go Fiber, and DDD 🚀
January 23, 2026
Event-Driven Architecture (EDA) combined with Event Sourcing and Domain-Driven Design (DDD) is one of the most powerful patterns for building scalable, resilient microservices. In this comprehensive guide, we'll build a complete e-commerce system using NATS as our event store and message broker, Go Fiber for REST APIs, PostgreSQL for read models only, and Cobra CLI for our command-line interface.
The key insight: events are the source of truth. We'll minimize database writes by replaying events to rebuild state, making our system more auditable, scalable, and fault-tolerant.
Why This Stack? 🤔
NATS JetStream
- Persistent event streaming with at-least-once delivery (exactly-once semantics are possible with message deduplication and double acks)
- Event replay capability for rebuilding state
- Lightweight and fast (written in Go)
- Built-in clustering for high availability
- Stream retention policies for event storage
Go Fiber
- Express-like API for building REST endpoints
- Extremely fast (built on Fasthttp)
- Low memory footprint
- Easy middleware integration
Domain-Driven Design (DDD)
- Clear separation of concerns (Domain, Application, Infrastructure)
- Business logic encapsulation in domain entities
- Ubiquitous language shared with business stakeholders
Event Sourcing Pattern
- Events as source of truth instead of current state
- Full audit trail of all changes
- Time travel - rebuild state at any point
- Eventual consistency with CQRS (Command Query Responsibility Segregation)
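To make "events as source of truth" and "time travel" concrete, here is a minimal, self-contained sketch. The `event` type and the `replay`, `sampleEvents`, and `statusAfterHours` functions are hypothetical names used only for illustration; they are not the aggregate built later in this guide.

```go
package main

import (
	"fmt"
	"time"
)

// event is a hypothetical, simplified domain event used only in this sketch.
type event struct {
	Kind string    // e.g. "created", "confirmed", "shipped"
	At   time.Time // when the fact occurred
}

// replay folds events, in order, into a status string.
// Events that occurred after asOf are ignored, which gives "time travel".
func replay(events []event, asOf time.Time) string {
	status := "NONE"
	for _, e := range events {
		if e.At.After(asOf) {
			break // events are assumed to be ordered by time
		}
		switch e.Kind {
		case "created":
			status = "PENDING"
		case "confirmed":
			status = "CONFIRMED"
		case "shipped":
			status = "SHIPPED"
		}
	}
	return status
}

// sampleEvents returns a fixed, made-up history for one order.
func sampleEvents() []event {
	t0 := time.Date(2026, 1, 1, 9, 0, 0, 0, time.UTC)
	return []event{
		{Kind: "created", At: t0},
		{Kind: "confirmed", At: t0.Add(1 * time.Hour)},
		{Kind: "shipped", At: t0.Add(24 * time.Hour)},
	}
}

// statusAfterHours rebuilds the state as it was h hours after the first event.
func statusAfterHours(h int) string {
	evs := sampleEvents()
	return replay(evs, evs[0].At.Add(time.Duration(h)*time.Hour))
}

func main() {
	fmt.Println(statusAfterHours(2))  // state two hours after creation
	fmt.Println(statusAfterHours(48)) // state after the whole history
}
```

The current state is never stored here; it is always derived from the history, so asking "what did this order look like yesterday?" is the same operation as asking for the latest state.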
PostgreSQL with GORM
- Read models only (projections from events)
- Query optimization for specific use cases
- Materialized views of event stream
Architecture Overview 🏗️
Our e-commerce system will have:
- Command Service (Fiber API) - Handles write operations, publishes events
- Event Store (NATS JetStream) - Persists all domain events
- Event Handlers - Subscribe to events, update read models
- Query Service (Fiber API) - Serves read models from PostgreSQL
- CLI (Cobra) - Administrative commands and event replay
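The separation between the write path and the read path can be sketched in memory before any infrastructure is involved. Everything below (`eventLog`, `readModel`, `catchUp`, and so on) is a hypothetical stand-in: the slice plays the role of JetStream and the map plays the role of PostgreSQL.

```go
package main

import "fmt"

// storedEvent is a hypothetical, simplified event record.
type storedEvent struct {
	OrderID string
	Type    string
}

// eventLog stands in for the event store (NATS JetStream in this guide).
type eventLog struct {
	events []storedEvent
}

// add is the write path: commands only ever add events.
func (l *eventLog) add(e storedEvent) {
	l.events = append(l.events, e)
}

// readModel stands in for the PostgreSQL projection.
type readModel struct {
	status map[string]string
	offset int // index of the next event to project
}

func newReadModel() *readModel {
	return &readModel{status: map[string]string{}}
}

// catchUp plays the event handler: it projects events not yet seen.
func (r *readModel) catchUp(l *eventLog) {
	for ; r.offset < len(l.events); r.offset++ {
		e := l.events[r.offset]
		switch e.Type {
		case "order.created":
			r.status[e.OrderID] = "PENDING"
		case "order.confirmed":
			r.status[e.OrderID] = "CONFIRMED"
		}
	}
}

// query is the read path: it never touches the event log.
func (r *readModel) query(orderID string) (string, bool) {
	s, ok := r.status[orderID]
	return s, ok
}

func main() {
	log := &eventLog{}
	rm := newReadModel()

	log.add(storedEvent{OrderID: "o-1", Type: "order.created"})
	_, found := rm.query("o-1")
	fmt.Println("visible before projection:", found)

	rm.catchUp(log)
	s, _ := rm.query("o-1")
	fmt.Println("status after projection:", s)
}
```

The gap between `add` and `catchUp` is the eventual-consistency window: a query issued right after a command may not see the change yet.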
    cmd/
        server/
            main.go
        cli/
            main.go
            commands/
                root.go
                replay.go
    internal/
        domain/
            order/
                aggregate.go
                events.go
                value_objects.go
        application/
            event_handlers.go
            commands/
                create_order.go
            queries/
                get_order.go
        infrastructure/
            nats/
                event_store.go
                publisher.go
                subscriber.go
            postgres/
                repository.go
                models.go
            http/
                handlers.go
                routes.go
Step 1: Project Setup with Cobra CLI 🛠️
First, let's set up our project structure using Cobra for CLI commands:
    # Initialize Go module
    go mod init github.com/yourusername/event-driven-ecommerce

    # Install dependencies
    go get github.com/gofiber/fiber/v2
    go get github.com/nats-io/nats.go
    go get github.com/spf13/cobra
    go get gorm.io/gorm
    go get gorm.io/driver/postgres
    go get github.com/google/uuid
cmd/cli/main.go

    package main

    import (
        "fmt"
        "os"

        "github.com/yourusername/event-driven-ecommerce/cmd/cli/commands"
    )

    func main() {
        if err := commands.Execute(); err != nil {
            fmt.Fprintf(os.Stderr, "Error: %v\n", err)
            os.Exit(1)
        }
    }
cmd/cli/commands/root.go

    package commands

    import (
        "github.com/spf13/cobra"
    )

    var rootCmd = &cobra.Command{
        Use:   "ecommerce",
        Short: "Event-Driven E-commerce CLI",
        Long:  "A CLI tool for managing event-driven e-commerce system",
    }

    // Execute runs the root command and returns any error to the caller.
    func Execute() error {
        return rootCmd.Execute()
    }

    func init() {
        // Register subcommands. Add further commands (for example a serve
        // command) here once they are defined.
        rootCmd.AddCommand(replayCmd)
    }
cmd/cli/commands/replay.go

    package commands

    import (
        "fmt"
        "log"
        "time"

        "github.com/spf13/cobra"

        "github.com/yourusername/event-driven-ecommerce/internal/infrastructure/nats"
    )

    var (
        streamName string
        fromTime   string
    )

    var replayCmd = &cobra.Command{
        Use:   "replay",
        Short: "Replay events from NATS JetStream",
        Long:  "Rebuild read models by replaying events from a specific point in time",
        Run: func(cmd *cobra.Command, args []string) {
            fmt.Printf("Replaying events from stream: %s\n", streamName)

            // An empty --from leaves startTime as the zero value (replay everything).
            var startTime time.Time
            var err error
            if fromTime != "" {
                startTime, err = time.Parse(time.RFC3339, fromTime)
                if err != nil {
                    log.Fatalf("Invalid time format: %v", err)
                }
            }

            // Initialize NATS connection
            eventStore, err := nats.NewEventStore("nats://localhost:4222", streamName)
            if err != nil {
                log.Fatalf("Failed to connect to NATS: %v", err)
            }
            defer eventStore.Close()

            // Replay events
            if err := eventStore.ReplayFrom(startTime); err != nil {
                log.Fatalf("Failed to replay events: %v", err)
            }

            fmt.Println("Event replay completed successfully!")
        },
    }

    func init() {
        replayCmd.Flags().StringVarP(&streamName, "stream", "s", "ORDERS", "NATS stream name")
        replayCmd.Flags().StringVarP(&fromTime, "from", "f", "", "Replay from this time (RFC3339 format)")
    }
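The `--from` handling is easy to get subtly wrong, so it can help to isolate it in a small function that is testable without NATS. The sketch below uses a hypothetical helper name, `parseStart`; it treats an empty value as "replay everything" by returning the zero time.

```go
package main

import (
	"fmt"
	"time"
)

// parseStart converts the --from flag value into a start time.
// An empty string yields the zero time, meaning "from the beginning".
func parseStart(from string) (time.Time, error) {
	if from == "" {
		return time.Time{}, nil
	}
	t, err := time.Parse(time.RFC3339, from)
	if err != nil {
		return time.Time{}, fmt.Errorf("invalid --from value %q: %w", from, err)
	}
	return t.UTC(), nil
}

func main() {
	for _, in := range []string{"", "2026-01-01T00:00:00Z", "yesterday"} {
		t, err := parseStart(in)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("input=%q zero=%v time=%s\n", in, t.IsZero(), t.Format(time.RFC3339))
	}
}
```

A caller can then check `t.IsZero()` to decide between replaying the full stream and replaying from a timestamp.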
Step 2: Domain Layer (DDD) 🎯
Let's implement our domain using DDD principles. We'll start with the Order aggregate:
internal/domain/order/value_objects.go

    package order

    import (
        "errors"

        "github.com/google/uuid"
    )

    // OrderID is a value object representing an order identifier
    type OrderID struct {
        value string
    }

    func NewOrderID() OrderID {
        return OrderID{value: uuid.New().String()}
    }

    func OrderIDFromString(id string) (OrderID, error) {
        if id == "" {
            return OrderID{}, errors.New("order ID cannot be empty")
        }
        return OrderID{value: id}, nil
    }

    func (o OrderID) String() string {
        return o.value
    }

    // Money value object
    type Money struct {
        Amount   int64 // Stored in cents to avoid floating point issues
        Currency string
    }

    func NewMoney(amount int64, currency string) (Money, error) {
        if amount < 0 {
            return Money{}, errors.New("amount cannot be negative")
        }
        if currency == "" {
            return Money{}, errors.New("currency is required")
        }
        return Money{Amount: amount, Currency: currency}, nil
    }

    func (m Money) Add(other Money) (Money, error) {
        if m.Currency != other.Currency {
            return Money{}, errors.New("cannot add money with different currencies")
        }
        return Money{Amount: m.Amount + other.Amount, Currency: m.Currency}, nil
    }

    // OrderItem entity
    type OrderItem struct {
        ProductID string
        Quantity  int
        UnitPrice Money
    }

    func NewOrderItem(productID string, quantity int, unitPrice Money) (OrderItem, error) {
        if productID == "" {
            return OrderItem{}, errors.New("product ID is required")
        }
        if quantity <= 0 {
            return OrderItem{}, errors.New("quantity must be positive")
        }
        return OrderItem{
            ProductID: productID,
            Quantity:  quantity,
            UnitPrice: unitPrice,
        }, nil
    }

    func (i OrderItem) TotalPrice() Money {
        return Money{
            Amount:   i.UnitPrice.Amount * int64(i.Quantity),
            Currency: i.UnitPrice.Currency,
        }
    }

    // OrderStatus value object
    type OrderStatus string

    const (
        OrderStatusPending   OrderStatus = "PENDING"
        OrderStatusConfirmed OrderStatus = "CONFIRMED"
        OrderStatusShipped   OrderStatus = "SHIPPED"
        OrderStatusDelivered OrderStatus = "DELIVERED"
        OrderStatusCancelled OrderStatus = "CANCELLED"
    )
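As a worked example of the integer-cents approach, the sketch below totals the two line items used in the test request later in this guide (2 × 2999 and 1 × 4999 cents). The `money`, `lineItem`, and `orderTotal` names are hypothetical, self-contained stand-ins for the value objects above.

```go
package main

import "fmt"

// money mirrors the idea of the Money value object: an integer amount in cents.
type money struct {
	Cents    int64
	Currency string
}

// String renders a non-negative amount as units and cents.
func (m money) String() string {
	return fmt.Sprintf("%d.%02d %s", m.Cents/100, m.Cents%100, m.Currency)
}

// lineItem is a simplified order item.
type lineItem struct {
	Quantity  int
	UnitPrice money
}

// orderTotal sums quantity × unit price and rejects mixed currencies.
func orderTotal(items []lineItem) (money, error) {
	if len(items) == 0 {
		return money{}, fmt.Errorf("order must have at least one item")
	}
	total := money{Currency: items[0].UnitPrice.Currency}
	for _, it := range items {
		if it.UnitPrice.Currency != total.Currency {
			return money{}, fmt.Errorf("mixed currencies: %s and %s",
				total.Currency, it.UnitPrice.Currency)
		}
		total.Cents += it.UnitPrice.Cents * int64(it.Quantity)
	}
	return total, nil
}

func main() {
	total, err := orderTotal([]lineItem{
		{Quantity: 2, UnitPrice: money{Cents: 2999, Currency: "USD"}},
		{Quantity: 1, UnitPrice: money{Cents: 4999, Currency: "USD"}},
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(total) // 109.97 USD
}
```

All arithmetic stays in `int64`, so the total is exactly 10997 cents; conversion to a decimal string happens only at the display boundary.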
internal/domain/order/events.go

    package order

    import "time"

    // Domain Events - These are the facts that happened in our system
    type DomainEvent interface {
        EventType() string
        AggregateID() string
        OccurredAt() time.Time
    }

    // BaseEvent provides common event fields
    type BaseEvent struct {
        EventID   string    `json:"event_id"`
        AggID     string    `json:"aggregate_id"`
        EventTime time.Time `json:"event_time"`
        EventVer  int       `json:"event_version"`
    }

    func (e BaseEvent) AggregateID() string {
        return e.AggID
    }

    func (e BaseEvent) OccurredAt() time.Time {
        return e.EventTime
    }

    // OrderCreatedEvent
    type OrderCreatedEvent struct {
        BaseEvent
        CustomerID string      `json:"customer_id"`
        Items      []OrderItem `json:"items"`
        TotalPrice Money       `json:"total_price"`
    }

    func (e OrderCreatedEvent) EventType() string {
        return "order.created"
    }

    // OrderConfirmedEvent
    type OrderConfirmedEvent struct {
        BaseEvent
        ConfirmedAt time.Time `json:"confirmed_at"`
    }

    func (e OrderConfirmedEvent) EventType() string {
        return "order.confirmed"
    }

    // OrderShippedEvent
    type OrderShippedEvent struct {
        BaseEvent
        TrackingNumber string    `json:"tracking_number"`
        ShippedAt      time.Time `json:"shipped_at"`
    }

    func (e OrderShippedEvent) EventType() string {
        return "order.shipped"
    }

    // OrderCancelledEvent
    type OrderCancelledEvent struct {
        BaseEvent
        Reason      string    `json:"reason"`
        CancelledAt time.Time `json:"cancelled_at"`
    }

    func (e OrderCancelledEvent) EventType() string {
        return "order.cancelled"
    }

    // ItemAddedToOrderEvent
    type ItemAddedToOrderEvent struct {
        BaseEvent
        Item OrderItem `json:"item"`
    }

    func (e ItemAddedToOrderEvent) EventType() string {
        return "order.item.added"
    }
internal/domain/order/aggregate.go

    package order

    import (
        "errors"
        "time"

        "github.com/google/uuid"
    )

    // OrderAggregate is our domain aggregate root.
    // It enforces business rules and emits domain events.
    type OrderAggregate struct {
        id                OrderID
        customerID        string
        items             []OrderItem
        status            OrderStatus
        totalPrice        Money
        version           int
        uncommittedEvents []DomainEvent
    }

    // NewOrderAggregate creates a new order (command)
    func NewOrderAggregate(customerID string, items []OrderItem) (*OrderAggregate, error) {
        if customerID == "" {
            return nil, errors.New("customer ID is required")
        }
        if len(items) == 0 {
            return nil, errors.New("order must have at least one item")
        }

        // Calculate total price
        totalPrice := Money{Amount: 0, Currency: items[0].UnitPrice.Currency}
        for _, item := range items {
            var err error
            totalPrice, err = totalPrice.Add(item.TotalPrice())
            if err != nil {
                return nil, err
            }
        }

        aggregate := &OrderAggregate{}

        // Create and apply event
        event := OrderCreatedEvent{
            BaseEvent: BaseEvent{
                EventID:   uuid.New().String(),
                AggID:     NewOrderID().String(),
                EventTime: time.Now(),
                EventVer:  1,
            },
            CustomerID: customerID,
            Items:      items,
            TotalPrice: totalPrice,
        }
        if err := aggregate.raise(event); err != nil {
            return nil, err
        }

        return aggregate, nil
    }

    // LoadFromEvents rebuilds aggregate state from event history
    func LoadFromEvents(events []DomainEvent) (*OrderAggregate, error) {
        if len(events) == 0 {
            return nil, errors.New("cannot load aggregate from empty event list")
        }

        aggregate := &OrderAggregate{}
        for _, event := range events {
            if err := aggregate.ApplyEvent(event); err != nil {
                return nil, err
            }
        }
        return aggregate, nil
    }

    // ApplyEvent applies an event to the aggregate and bumps its version
    func (a *OrderAggregate) ApplyEvent(event DomainEvent) error {
        switch e := event.(type) {
        case OrderCreatedEvent:
            a.applyOrderCreated(e)
        case OrderConfirmedEvent:
            a.applyOrderConfirmed(e)
        case OrderShippedEvent:
            a.applyOrderShipped(e)
        case OrderCancelledEvent:
            a.applyOrderCancelled(e)
        case ItemAddedToOrderEvent:
            a.applyItemAdded(e)
        default:
            return errors.New("unknown event type")
        }
        a.version++
        return nil
    }

    // raise applies a newly created event through ApplyEvent, so the version
    // advances exactly as it does during replay, and records it as uncommitted.
    func (a *OrderAggregate) raise(event DomainEvent) error {
        if err := a.ApplyEvent(event); err != nil {
            return err
        }
        a.uncommittedEvents = append(a.uncommittedEvents, event)
        return nil
    }

    // nextBaseEvent builds the common fields for the next event of this aggregate.
    func (a *OrderAggregate) nextBaseEvent(now time.Time) BaseEvent {
        return BaseEvent{
            EventID:   uuid.New().String(),
            AggID:     a.id.String(),
            EventTime: now,
            EventVer:  a.version + 1,
        }
    }

    // Business logic methods that emit events

    func (a *OrderAggregate) Confirm() error {
        if a.status != OrderStatusPending {
            return errors.New("only pending orders can be confirmed")
        }
        now := time.Now()
        return a.raise(OrderConfirmedEvent{
            BaseEvent:   a.nextBaseEvent(now),
            ConfirmedAt: now,
        })
    }

    func (a *OrderAggregate) Ship(trackingNumber string) error {
        if a.status != OrderStatusConfirmed {
            return errors.New("only confirmed orders can be shipped")
        }
        if trackingNumber == "" {
            return errors.New("tracking number is required")
        }
        now := time.Now()
        return a.raise(OrderShippedEvent{
            BaseEvent:      a.nextBaseEvent(now),
            TrackingNumber: trackingNumber,
            ShippedAt:      now,
        })
    }

    func (a *OrderAggregate) Cancel(reason string) error {
        if a.status == OrderStatusShipped || a.status == OrderStatusDelivered {
            return errors.New("shipped or delivered orders cannot be cancelled")
        }
        if a.status == OrderStatusCancelled {
            return errors.New("order is already cancelled")
        }
        now := time.Now()
        return a.raise(OrderCancelledEvent{
            BaseEvent:   a.nextBaseEvent(now),
            Reason:      reason,
            CancelledAt: now,
        })
    }

    // Event application methods (state changes)

    func (a *OrderAggregate) applyOrderCreated(event OrderCreatedEvent) {
        id, _ := OrderIDFromString(event.AggID)
        a.id = id
        a.customerID = event.CustomerID
        a.items = event.Items
        a.totalPrice = event.TotalPrice
        a.status = OrderStatusPending
    }

    func (a *OrderAggregate) applyOrderConfirmed(event OrderConfirmedEvent) {
        a.status = OrderStatusConfirmed
    }

    func (a *OrderAggregate) applyOrderShipped(event OrderShippedEvent) {
        a.status = OrderStatusShipped
    }

    func (a *OrderAggregate) applyOrderCancelled(event OrderCancelledEvent) {
        a.status = OrderStatusCancelled
    }

    func (a *OrderAggregate) applyItemAdded(event ItemAddedToOrderEvent) {
        a.items = append(a.items, event.Item)
        itemTotal := event.Item.TotalPrice()
        a.totalPrice, _ = a.totalPrice.Add(itemTotal)
    }

    // Getters

    func (a *OrderAggregate) ID() OrderID {
        return a.id
    }

    func (a *OrderAggregate) Status() OrderStatus {
        return a.status
    }

    func (a *OrderAggregate) GetUncommittedEvents() []DomainEvent {
        return a.uncommittedEvents
    }

    func (a *OrderAggregate) ClearUncommittedEvents() {
        a.uncommittedEvents = nil
    }

    func (a *OrderAggregate) Version() int {
        return a.version
    }
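The guard clauses in `Confirm`, `Ship`, and `Cancel` together define a small state machine. One way to make that explicit is a transition table, sketched below with hypothetical names (`allowedTransitions`, `canTransition`). The SHIPPED → DELIVERED edge is an assumption, since the listing above defines the `DELIVERED` status but no method that reaches it.

```go
package main

import "fmt"

// allowedTransitions lists, for each status, the statuses it may move to.
// SHIPPED -> DELIVERED is an assumed edge; the other edges mirror the
// guard clauses in Confirm, Ship, and Cancel.
var allowedTransitions = map[string][]string{
	"PENDING":   {"CONFIRMED", "CANCELLED"},
	"CONFIRMED": {"SHIPPED", "CANCELLED"},
	"SHIPPED":   {"DELIVERED"},
	// DELIVERED and CANCELLED are terminal: no outgoing edges.
}

// canTransition reports whether moving from one status to another is allowed.
func canTransition(from, to string) bool {
	for _, next := range allowedTransitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition("PENDING", "CONFIRMED"))   // true
	fmt.Println(canTransition("PENDING", "SHIPPED"))     // false
	fmt.Println(canTransition("SHIPPED", "CANCELLED"))   // false
	fmt.Println(canTransition("CANCELLED", "CONFIRMED")) // false
}
```

A table like this is mainly useful as documentation and as a fixture for tests; the aggregate's own methods remain the place where the rules are enforced.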
Step 3: NATS Event Store Infrastructure 📨
Now let's implement NATS JetStream as our event store:
internal/infrastructure/nats/event_store.go

    package nats

    import (
        "encoding/json"
        "errors"
        "fmt"
        "strings"
        "time"

        "github.com/nats-io/nats.go"

        "github.com/yourusername/event-driven-ecommerce/internal/domain/order"
    )

    type EventStore struct {
        conn   *nats.Conn
        js     nats.JetStreamContext
        stream string
    }

    func NewEventStore(url, streamName string) (*EventStore, error) {
        // Connect to NATS
        nc, err := nats.Connect(url)
        if err != nil {
            return nil, fmt.Errorf("failed to connect to NATS: %w", err)
        }

        // Get JetStream context
        js, err := nc.JetStream()
        if err != nil {
            nc.Close()
            return nil, fmt.Errorf("failed to get JetStream context: %w", err)
        }

        // Create or update stream
        streamConfig := &nats.StreamConfig{
            Name: streamName,
            // ">" matches one or more tokens, so "ORDERS.order.created" is
            // captured; "*" would match only a single token.
            Subjects:  []string{streamName + ".>"},
            Storage:   nats.FileStorage,
            Retention: nats.LimitsPolicy,
            // Events older than MaxAge are discarded. If events must remain the
            // source of truth indefinitely, consider removing this limit.
            MaxAge:   365 * 24 * time.Hour,
            Replicas: 1,
        }

        if _, err = js.AddStream(streamConfig); err != nil {
            // Stream might already exist, try to update
            if _, err = js.UpdateStream(streamConfig); err != nil {
                nc.Close()
                return nil, fmt.Errorf("failed to create/update stream: %w", err)
            }
        }

        return &EventStore{conn: nc, js: js, stream: streamName}, nil
    }

    // JetStream exposes the JetStream context so that other components
    // (for example subscribers) can share the same connection.
    func (es *EventStore) JetStream() (nats.JetStreamContext, error) {
        if es.js == nil {
            return nil, errors.New("JetStream context is not initialized")
        }
        return es.js, nil
    }

    func (es *EventStore) SaveEvents(aggregateID string, events []order.DomainEvent) error {
        for _, event := range events {
            data, err := json.Marshal(event)
            if err != nil {
                return fmt.Errorf("failed to marshal event: %w", err)
            }

            // Deduplicate on the unique event ID. Using the aggregate ID here
            // would make JetStream drop later events of the same aggregate that
            // arrive inside the duplicate window.
            var meta struct {
                EventID string `json:"event_id"`
            }
            if err := json.Unmarshal(data, &meta); err != nil || meta.EventID == "" {
                return fmt.Errorf("event %s has no event_id", event.EventType())
            }

            subject := fmt.Sprintf("%s.%s", es.stream, event.EventType())
            if _, err = es.js.Publish(subject, data, nats.MsgId(meta.EventID)); err != nil {
                return fmt.Errorf("failed to publish event: %w", err)
            }
        }
        return nil
    }

    func (es *EventStore) LoadEvents(aggregateID string) ([]order.DomainEvent, error) {
        // An ephemeral pull consumer (empty durable name) starts from the
        // beginning of the stream on every call and leaves no state behind.
        subject := fmt.Sprintf("%s.>", es.stream)
        sub, err := es.js.PullSubscribe(subject, "", nats.DeliverAll())
        if err != nil {
            return nil, fmt.Errorf("failed to subscribe: %w", err)
        }
        defer sub.Unsubscribe()

        const batchSize = 100
        var events []order.DomainEvent

        // Fetch in batches until the stream is drained. This scans the whole
        // stream and filters in memory, which is only suitable for small streams.
        for {
            msgs, err := sub.Fetch(batchSize, nats.MaxWait(2*time.Second))
            if errors.Is(err, nats.ErrTimeout) {
                break
            }
            if err != nil {
                return nil, fmt.Errorf("failed to fetch messages: %w", err)
            }

            for _, msg := range msgs {
                event, err := es.deserializeEvent(msg.Data, msg.Subject)
                if err != nil {
                    return nil, err
                }
                // Only include events for this aggregate
                if event.AggregateID() == aggregateID {
                    events = append(events, event)
                }
                msg.Ack()
            }

            if len(msgs) < batchSize {
                break
            }
        }

        return events, nil
    }

    // decode unmarshals data into the concrete event type T.
    func decode[T order.DomainEvent](data []byte) (order.DomainEvent, error) {
        var event T
        if err := json.Unmarshal(data, &event); err != nil {
            return nil, err
        }
        return event, nil
    }

    func (es *EventStore) deserializeEvent(data []byte, subject string) (order.DomainEvent, error) {
        // Extract event type from subject (e.g., "ORDERS.order.created" -> "order.created")
        eventType := strings.TrimPrefix(subject, es.stream+".")

        switch eventType {
        case "order.created":
            return decode[order.OrderCreatedEvent](data)
        case "order.confirmed":
            return decode[order.OrderConfirmedEvent](data)
        case "order.shipped":
            return decode[order.OrderShippedEvent](data)
        case "order.cancelled":
            return decode[order.OrderCancelledEvent](data)
        case "order.item.added":
            return decode[order.ItemAddedToOrderEvent](data)
        default:
            return nil, fmt.Errorf("unknown event type: %s", subject)
        }
    }

    func (es *EventStore) ReplayFrom(startTime time.Time) error {
        // A zero start time means "replay the whole stream".
        startOpt := nats.DeliverAll()
        if !startTime.IsZero() {
            startOpt = nats.StartTime(startTime)
        }

        sub, err := es.js.Subscribe(
            fmt.Sprintf("%s.>", es.stream),
            func(msg *nats.Msg) {
                // Process each event to rebuild read models
                fmt.Printf("Replaying event: %s\n", msg.Subject)
                // Here you would trigger your event handlers
                msg.Ack()
            },
            startOpt,
            nats.ManualAck(),
        )
        if err != nil {
            return err
        }
        defer sub.Unsubscribe()

        // Crude completion signal: wait a fixed time for the replay to finish.
        // Long streams need a real end-of-stream check instead.
        time.Sleep(5 * time.Second)
        return nil
    }

    func (es *EventStore) Close() {
        if es.conn != nil {
            es.conn.Close()
        }
    }
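Subject wildcards matter for the stream configuration. NATS subjects are dot-separated tokens; `*` matches exactly one token and `>` matches one or more trailing tokens. Because event types such as `order.created` themselves contain a dot, `ORDERS.order.created` has three tokens. The sketch below re-implements these matching rules in plain Go purely for illustration; `subjectMatches` is a hypothetical helper, not part of the NATS client.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether subject matches pattern under the NATS
// wildcard rules: "*" matches exactly one token, ">" (last token only)
// matches one or more remaining tokens.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the final pattern token and consume at least one token.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

func main() {
	subject := "ORDERS.order.created"
	for _, pattern := range []string{"ORDERS.*", "ORDERS.*.*", "ORDERS.>"} {
		fmt.Printf("%-12s matches %s: %v\n", pattern, subject, subjectMatches(pattern, subject))
	}
}
```

With these rules, a stream bound to `ORDERS.*` would not capture `ORDERS.order.created`, and `ORDERS.*.*` would miss the four-token `ORDERS.order.item.added`; `ORDERS.>` covers both.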
internal/infrastructure/nats/subscriber.go

    package nats

    import (
        "fmt"
        "log"
        "strings"
        "time"

        "github.com/nats-io/nats.go"
    )

    type EventHandler func(eventType string, data []byte) error

    type Subscriber struct {
        js           nats.JetStreamContext
        stream       string
        consumerName string
        handlers     map[string]EventHandler
    }

    func NewSubscriber(js nats.JetStreamContext, stream, consumerName string) *Subscriber {
        return &Subscriber{
            js:           js,
            stream:       stream,
            consumerName: consumerName,
            handlers:     make(map[string]EventHandler),
        }
    }

    func (s *Subscriber) RegisterHandler(eventType string, handler EventHandler) {
        s.handlers[eventType] = handler
    }

    func (s *Subscriber) Start() error {
        // ">" is required because event types such as "order.created"
        // contain a dot and therefore span more than one subject token.
        subject := fmt.Sprintf("%s.>", s.stream)

        // Create durable consumer
        _, err := s.js.Subscribe(
            subject,
            s.handleMessage,
            nats.Durable(s.consumerName),
            nats.ManualAck(),
            nats.DeliverAll(),
            nats.AckWait(30*time.Second),
        )
        return err
    }

    func (s *Subscriber) handleMessage(msg *nats.Msg) {
        // Extract event type from subject
        parts := strings.Split(msg.Subject, ".")
        if len(parts) < 2 {
            log.Printf("Invalid subject format: %s", msg.Subject)
            msg.Nak()
            return
        }
        eventType := strings.Join(parts[1:], ".")

        // Find and execute handler
        handler, ok := s.handlers[eventType]
        if !ok {
            log.Printf("No handler for event type: %s", eventType)
            msg.Ack() // Acknowledge anyway to avoid redelivery
            return
        }

        if err := handler(eventType, msg.Data); err != nil {
            log.Printf("Error handling event %s: %v", eventType, err)
            msg.Nak()
            return
        }
        msg.Ack()
    }
Step 4: PostgreSQL Read Models (Projections Only) 💾
PostgreSQL is used ONLY for read models - materialized views of our event stream:
internal/infrastructure/postgres/models.go

    package postgres

    import (
        "time"

        "gorm.io/gorm"
    )

    // OrderReadModel - Read-only projection built from events
    type OrderReadModel struct {
        ID             string `gorm:"primaryKey"`
        CustomerID     string `gorm:"index"`
        Status         string `gorm:"index"`
        TotalAmount    int64  // In cents
        Currency       string
        TrackingNumber string
        CreatedAt      time.Time `gorm:"index"`
        UpdatedAt      time.Time
        CancelledAt    *time.Time
        ShippedAt      *time.Time
    }

    // OrderItemReadModel
    type OrderItemReadModel struct {
        ID        uint   `gorm:"primaryKey"`
        OrderID   string `gorm:"index"`
        ProductID string
        Quantity  int
        UnitPrice int64 // In cents
        Currency  string
    }

    // Migrate creates the tables
    func Migrate(db *gorm.DB) error {
        return db.AutoMigrate(
            &OrderReadModel{},
            &OrderItemReadModel{},
        )
    }
internal/infrastructure/postgres/repository.go

    package postgres

    import (
        "gorm.io/gorm"
    )

    type ReadRepository struct {
        db *gorm.DB
    }

    func NewReadRepository(db *gorm.DB) *ReadRepository {
        return &ReadRepository{db: db}
    }

    // UpdateOrderProjection updates the read model from an event
    func (r *ReadRepository) UpdateOrderProjection(order *OrderReadModel) error {
        // Upsert operation
        return r.db.Save(order).Error
    }

    func (r *ReadRepository) AddOrderItem(item *OrderItemReadModel) error {
        return r.db.Create(item).Error
    }

    func (r *ReadRepository) GetOrder(orderID string) (*OrderReadModel, error) {
        var order OrderReadModel
        err := r.db.First(&order, "id = ?", orderID).Error
        if err != nil {
            return nil, err
        }
        return &order, nil
    }

    func (r *ReadRepository) GetOrderWithItems(orderID string) (*OrderReadModel, []OrderItemReadModel, error) {
        var order OrderReadModel
        var items []OrderItemReadModel

        if err := r.db.First(&order, "id = ?", orderID).Error; err != nil {
            return nil, nil, err
        }
        if err := r.db.Find(&items, "order_id = ?", orderID).Error; err != nil {
            return nil, nil, err
        }
        return &order, items, nil
    }

    func (r *ReadRepository) ListOrders(customerID string, status string, limit, offset int) ([]OrderReadModel, error) {
        var orders []OrderReadModel
        query := r.db.Model(&OrderReadModel{})

        if customerID != "" {
            query = query.Where("customer_id = ?", customerID)
        }
        if status != "" {
            query = query.Where("status = ?", status)
        }

        err := query.
            Order("created_at DESC").
            Limit(limit).
            Offset(offset).
            Find(&orders).Error
        return orders, err
    }
Step 5: Event Handlers (Update Read Models) 🔄
Event handlers listen to NATS and update PostgreSQL read models:
internal/application/event_handlers.go

    package application

    import (
        "encoding/json"
        "log"

        "github.com/yourusername/event-driven-ecommerce/internal/domain/order"
        "github.com/yourusername/event-driven-ecommerce/internal/infrastructure/postgres"
    )

    type EventHandlers struct {
        repo *postgres.ReadRepository
    }

    func NewEventHandlers(repo *postgres.ReadRepository) *EventHandlers {
        return &EventHandlers{repo: repo}
    }

    func (h *EventHandlers) HandleOrderCreated(eventType string, data []byte) error {
        var event order.OrderCreatedEvent
        if err := json.Unmarshal(data, &event); err != nil {
            return err
        }
        log.Printf("Handling OrderCreated: %s", event.AggID)

        // Create read model
        orderModel := &postgres.OrderReadModel{
            ID:          event.AggID,
            CustomerID:  event.CustomerID,
            Status:      string(order.OrderStatusPending),
            TotalAmount: event.TotalPrice.Amount,
            Currency:    event.TotalPrice.Currency,
            CreatedAt:   event.EventTime,
            UpdatedAt:   event.EventTime,
        }
        if err := h.repo.UpdateOrderProjection(orderModel); err != nil {
            return err
        }

        // Create order items
        for _, item := range event.Items {
            itemModel := &postgres.OrderItemReadModel{
                OrderID:   event.AggID,
                ProductID: item.ProductID,
                Quantity:  item.Quantity,
                UnitPrice: item.UnitPrice.Amount,
                Currency:  item.UnitPrice.Currency,
            }
            if err := h.repo.AddOrderItem(itemModel); err != nil {
                return err
            }
        }
        return nil
    }

    func (h *EventHandlers) HandleOrderConfirmed(eventType string, data []byte) error {
        var event order.OrderConfirmedEvent
        if err := json.Unmarshal(data, &event); err != nil {
            return err
        }
        log.Printf("Handling OrderConfirmed: %s", event.AggID)

        orderModel, err := h.repo.GetOrder(event.AggID)
        if err != nil {
            return err
        }
        orderModel.Status = string(order.OrderStatusConfirmed)
        orderModel.UpdatedAt = event.EventTime
        return h.repo.UpdateOrderProjection(orderModel)
    }

    func (h *EventHandlers) HandleOrderShipped(eventType string, data []byte) error {
        var event order.OrderShippedEvent
        if err := json.Unmarshal(data, &event); err != nil {
            return err
        }
        log.Printf("Handling OrderShipped: %s", event.AggID)

        orderModel, err := h.repo.GetOrder(event.AggID)
        if err != nil {
            return err
        }
        orderModel.Status = string(order.OrderStatusShipped)
        orderModel.TrackingNumber = event.TrackingNumber
        shippedAt := event.ShippedAt
        orderModel.ShippedAt = &shippedAt
        orderModel.UpdatedAt = event.EventTime
        return h.repo.UpdateOrderProjection(orderModel)
    }

    func (h *EventHandlers) HandleOrderCancelled(eventType string, data []byte) error {
        var event order.OrderCancelledEvent
        if err := json.Unmarshal(data, &event); err != nil {
            return err
        }
        log.Printf("Handling OrderCancelled: %s", event.AggID)

        orderModel, err := h.repo.GetOrder(event.AggID)
        if err != nil {
            return err
        }
        orderModel.Status = string(order.OrderStatusCancelled)
        cancelledAt := event.CancelledAt
        orderModel.CancelledAt = &cancelledAt
        orderModel.UpdatedAt = event.EventTime
        return h.repo.UpdateOrderProjection(orderModel)
    }
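Because events can be redelivered (after a Nak, an ack timeout, or a deliberate replay), projection handlers should tolerate seeing the same event twice. The `HandleOrderCreated` handler above inserts item rows with a plain create, so a second delivery would duplicate them. One common remedy is to remember which event IDs have been applied. The sketch below shows the idea with an in-memory map; `projector` and `applyItemAdded` are hypothetical names, and a real projection would more likely persist the processed IDs in the same database transaction as the update.

```go
package main

import "fmt"

// projector is a hypothetical in-memory projection that counts items per order.
type projector struct {
	seen      map[string]bool // event IDs that were already applied
	itemCount map[string]int  // order ID -> number of items
}

func newProjector() *projector {
	return &projector{seen: map[string]bool{}, itemCount: map[string]int{}}
}

// applyItemAdded applies the event once. It returns false when the event
// was already processed, so a redelivery does not change the read model.
func (p *projector) applyItemAdded(eventID, orderID string) bool {
	if p.seen[eventID] {
		return false
	}
	p.seen[eventID] = true
	p.itemCount[orderID]++
	return true
}

func main() {
	p := newProjector()
	fmt.Println(p.applyItemAdded("evt-1", "order-1")) // first delivery is applied
	fmt.Println(p.applyItemAdded("evt-1", "order-1")) // redelivery is skipped
	fmt.Println(p.applyItemAdded("evt-2", "order-1")) // a new event is applied
	fmt.Println(p.itemCount["order-1"])
}
```

The same effect can sometimes be achieved with naturally idempotent writes (for example an upsert keyed on order ID and product ID), which avoids the extra bookkeeping.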
Step 6: Application Commands and Queries (CQRS) 📝
Separate read and write operations:
internal/application/commands/create_order.go

    package commands

    import (
        "github.com/yourusername/event-driven-ecommerce/internal/domain/order"
        natsinfra "github.com/yourusername/event-driven-ecommerce/internal/infrastructure/nats"
    )

    type CreateOrderCommand struct {
        CustomerID string
        Items      []order.OrderItem
    }

    type CreateOrderHandler struct {
        eventStore *natsinfra.EventStore
    }

    func NewCreateOrderHandler(eventStore *natsinfra.EventStore) *CreateOrderHandler {
        return &CreateOrderHandler{eventStore: eventStore}
    }

    func (h *CreateOrderHandler) Handle(cmd CreateOrderCommand) (string, error) {
        // Create new aggregate
        aggregate, err := order.NewOrderAggregate(cmd.CustomerID, cmd.Items)
        if err != nil {
            return "", err
        }

        // Save events to NATS (event store)
        events := aggregate.GetUncommittedEvents()
        if err := h.eventStore.SaveEvents(aggregate.ID().String(), events); err != nil {
            return "", err
        }
        aggregate.ClearUncommittedEvents()

        return aggregate.ID().String(), nil
    }

    // ConfirmOrderCommand
    type ConfirmOrderCommand struct {
        OrderID string
    }

    type ConfirmOrderHandler struct {
        eventStore *natsinfra.EventStore
    }

    func NewConfirmOrderHandler(eventStore *natsinfra.EventStore) *ConfirmOrderHandler {
        return &ConfirmOrderHandler{eventStore: eventStore}
    }

    func (h *ConfirmOrderHandler) Handle(cmd ConfirmOrderCommand) error {
        // Load aggregate from events
        events, err := h.eventStore.LoadEvents(cmd.OrderID)
        if err != nil {
            return err
        }
        aggregate, err := order.LoadFromEvents(events)
        if err != nil {
            return err
        }

        // Execute business logic
        if err := aggregate.Confirm(); err != nil {
            return err
        }

        // Save new events
        newEvents := aggregate.GetUncommittedEvents()
        if err := h.eventStore.SaveEvents(aggregate.ID().String(), newEvents); err != nil {
            return err
        }
        aggregate.ClearUncommittedEvents()
        return nil
    }
internal/application/queries/get_order.go

    package queries

    import (
        "time"

        "github.com/yourusername/event-driven-ecommerce/internal/infrastructure/postgres"
    )

    type GetOrderQuery struct {
        OrderID string
    }

    type OrderDTO struct {
        ID             string
        CustomerID     string
        Status         string
        TotalAmount    int64
        Currency       string
        TrackingNumber string
        Items          []OrderItemDTO
        CreatedAt      string
        UpdatedAt      string
    }

    type OrderItemDTO struct {
        ProductID string
        Quantity  int
        UnitPrice int64
        Currency  string
    }

    type GetOrderHandler struct {
        repo *postgres.ReadRepository
    }

    func NewGetOrderHandler(repo *postgres.ReadRepository) *GetOrderHandler {
        return &GetOrderHandler{repo: repo}
    }

    func (h *GetOrderHandler) Handle(query GetOrderQuery) (*OrderDTO, error) {
        order, items, err := h.repo.GetOrderWithItems(query.OrderID)
        if err != nil {
            return nil, err
        }

        dto := &OrderDTO{
            ID:             order.ID,
            CustomerID:     order.CustomerID,
            Status:         order.Status,
            TotalAmount:    order.TotalAmount,
            Currency:       order.Currency,
            TrackingNumber: order.TrackingNumber,
            CreatedAt:      order.CreatedAt.Format(time.RFC3339),
            UpdatedAt:      order.UpdatedAt.Format(time.RFC3339),
            Items:          make([]OrderItemDTO, 0, len(items)),
        }

        for _, item := range items {
            dto.Items = append(dto.Items, OrderItemDTO{
                ProductID: item.ProductID,
                Quantity:  item.Quantity,
                UnitPrice: item.UnitPrice,
                Currency:  item.Currency,
            })
        }
        return dto, nil
    }
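The load, decide, append cycle used by the command handlers has a race: two concurrent commands can load the same history and both append a conflicting event. A common safeguard is optimistic concurrency, where the append succeeds only if the aggregate is still at the version that was loaded. The sketch below illustrates the idea with an in-memory store; `memoryStore`, `appendEvent`, and `errVersionConflict` are hypothetical names. The nats.go client exposes publish options for expected sequences (for example `ExpectLastSequencePerSubject`) that may serve a similar purpose; check them against the client version you use.

```go
package main

import (
	"errors"
	"fmt"
)

// errVersionConflict signals that another writer appended first.
var errVersionConflict = errors.New("version conflict: aggregate was modified concurrently")

// memoryStore is a hypothetical in-memory event store keyed by aggregate ID.
type memoryStore struct {
	events map[string][]string
}

func newMemoryStore() *memoryStore {
	return &memoryStore{events: map[string][]string{}}
}

// appendEvent adds an event only if the aggregate currently has exactly
// expectedVersion events, i.e. nobody else appended since the caller loaded it.
func (s *memoryStore) appendEvent(aggregateID string, expectedVersion int, eventType string) error {
	if len(s.events[aggregateID]) != expectedVersion {
		return errVersionConflict
	}
	s.events[aggregateID] = append(s.events[aggregateID], eventType)
	return nil
}

func main() {
	store := newMemoryStore()

	// The order is created: version goes from 0 to 1.
	fmt.Println(store.appendEvent("order-1", 0, "order.created"))

	// Two handlers both loaded version 1 and try to append.
	fmt.Println(store.appendEvent("order-1", 1, "order.confirmed")) // wins
	fmt.Println(store.appendEvent("order-1", 1, "order.cancelled")) // conflict
}
```

On a conflict, the losing handler would typically reload the events, re-run the business rule against the new state, and either retry or report the rejection.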
Step 7: Go Fiber HTTP Handlers 🌐
Create REST API endpoints using Fiber:
internal/infrastructure/http/handlers.go

    package http

    import (
        "github.com/gofiber/fiber/v2"

        "github.com/yourusername/event-driven-ecommerce/internal/application/commands"
        "github.com/yourusername/event-driven-ecommerce/internal/application/queries"
        "github.com/yourusername/event-driven-ecommerce/internal/domain/order"
    )

    type OrderHandlers struct {
        createOrderHandler  *commands.CreateOrderHandler
        confirmOrderHandler *commands.ConfirmOrderHandler
        getOrderHandler     *queries.GetOrderHandler
    }

    func NewOrderHandlers(
        createOrderHandler *commands.CreateOrderHandler,
        confirmOrderHandler *commands.ConfirmOrderHandler,
        getOrderHandler *queries.GetOrderHandler,
    ) *OrderHandlers {
        return &OrderHandlers{
            createOrderHandler:  createOrderHandler,
            confirmOrderHandler: confirmOrderHandler,
            getOrderHandler:     getOrderHandler,
        }
    }

    type CreateOrderRequest struct {
        CustomerID string             `json:"customer_id"`
        Items      []OrderItemRequest `json:"items"`
    }

    type OrderItemRequest struct {
        ProductID string `json:"product_id"`
        Quantity  int    `json:"quantity"`
        UnitPrice int64  `json:"unit_price"`
        Currency  string `json:"currency"`
    }

    func (h *OrderHandlers) CreateOrder(c *fiber.Ctx) error {
        var req CreateOrderRequest
        if err := c.BodyParser(&req); err != nil {
            return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
                "error": "Invalid request body",
            })
        }

        // Convert to domain entities
        items := make([]order.OrderItem, 0, len(req.Items))
        for _, itemReq := range req.Items {
            money, err := order.NewMoney(itemReq.UnitPrice, itemReq.Currency)
            if err != nil {
                return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
                    "error": err.Error(),
                })
            }
            item, err := order.NewOrderItem(itemReq.ProductID, itemReq.Quantity, money)
            if err != nil {
                return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
                    "error": err.Error(),
                })
            }
            items = append(items, item)
        }

        // Execute command
        cmd := commands.CreateOrderCommand{
            CustomerID: req.CustomerID,
            Items:      items,
        }
        orderID, err := h.createOrderHandler.Handle(cmd)
        if err != nil {
            return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
                "error": err.Error(),
            })
        }

        return c.Status(fiber.StatusCreated).JSON(fiber.Map{
            "order_id": orderID,
            "message":  "Order created successfully",
        })
    }

    func (h *OrderHandlers) GetOrder(c *fiber.Ctx) error {
        orderID := c.Params("id")

        query := queries.GetOrderQuery{OrderID: orderID}
        orderDTO, err := h.getOrderHandler.Handle(query)
        if err != nil {
            return c.Status(fiber.StatusNotFound).JSON(fiber.Map{
                "error": "Order not found",
            })
        }
        return c.JSON(orderDTO)
    }

    func (h *OrderHandlers) ConfirmOrder(c *fiber.Ctx) error {
        orderID := c.Params("id")

        cmd := commands.ConfirmOrderCommand{OrderID: orderID}
        if err := h.confirmOrderHandler.Handle(cmd); err != nil {
            return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
                "error": err.Error(),
            })
        }
        return c.JSON(fiber.Map{
            "message": "Order confirmed successfully",
        })
    }
internal/infrastructure/http/routes.go

    package http

    import (
        "github.com/gofiber/fiber/v2"
        "github.com/gofiber/fiber/v2/middleware/logger"
        "github.com/gofiber/fiber/v2/middleware/recover"
    )

    func SetupRoutes(app *fiber.App, handlers *OrderHandlers) {
        // Middleware
        app.Use(logger.New())
        app.Use(recover.New())

        // Health check
        app.Get("/health", func(c *fiber.Ctx) error {
            return c.JSON(fiber.Map{"status": "ok"})
        })

        // API routes
        api := app.Group("/api/v1")

        // Orders
        orders := api.Group("/orders")
        orders.Post("/", handlers.CreateOrder)
        orders.Get("/:id", handlers.GetOrder)
        orders.Post("/:id/confirm", handlers.ConfirmOrder)
    }
Step 8: Main Server Application 🚀
Wire everything together:
cmd/server/main.go

```go
package main

import (
	"log"
	"os"

	"github.com/gofiber/fiber/v2"
	"gorm.io/driver/postgres"
	"gorm.io/gorm"

	"github.com/yourusername/event-driven-ecommerce/internal/application"
	"github.com/yourusername/event-driven-ecommerce/internal/application/commands"
	"github.com/yourusername/event-driven-ecommerce/internal/application/queries"
	httpinfra "github.com/yourusername/event-driven-ecommerce/internal/infrastructure/http"
	natsinfra "github.com/yourusername/event-driven-ecommerce/internal/infrastructure/nats"
	postgresinfra "github.com/yourusername/event-driven-ecommerce/internal/infrastructure/postgres"
)

func main() {
	// Load configuration from environment
	natsURL := getEnv("NATS_URL", "nats://localhost:4222")
	postgresURL := getEnv("POSTGRES_URL", "postgresql://user:password@localhost:5432/ecommerce?sslmode=disable")
	port := getEnv("PORT", "3000")

	// Connect to PostgreSQL
	db, err := gorm.Open(postgres.Open(postgresURL), &gorm.Config{})
	if err != nil {
		log.Fatalf("Failed to connect to database: %v", err)
	}

	// Run migrations
	if err := postgresinfra.Migrate(db); err != nil {
		log.Fatalf("Failed to migrate database: %v", err)
	}

	// Initialize repositories
	readRepo := postgresinfra.NewReadRepository(db)

	// Initialize NATS event store
	eventStore, err := natsinfra.NewEventStore(natsURL, "ORDERS")
	if err != nil {
		log.Fatalf("Failed to initialize NATS event store: %v", err)
	}
	defer eventStore.Close()

	// Initialize command handlers
	createOrderHandler := commands.NewCreateOrderHandler(eventStore)
	confirmOrderHandler := commands.NewConfirmOrderHandler(eventStore)

	// Initialize query handlers
	getOrderHandler := queries.NewGetOrderHandler(readRepo)

	// Initialize event handlers
	eventHandlers := application.NewEventHandlers(readRepo)

	// Setup NATS subscriber (do not discard the JetStream error)
	js, err := eventStore.JetStream()
	if err != nil {
		log.Fatalf("Failed to get JetStream context: %v", err)
	}
	subscriber := natsinfra.NewSubscriber(js, "ORDERS", "order-projector")
	subscriber.RegisterHandler("order.created", eventHandlers.HandleOrderCreated)
	subscriber.RegisterHandler("order.confirmed", eventHandlers.HandleOrderConfirmed)
	subscriber.RegisterHandler("order.shipped", eventHandlers.HandleOrderShipped)
	subscriber.RegisterHandler("order.cancelled", eventHandlers.HandleOrderCancelled)

	// Start subscriber
	if err := subscriber.Start(); err != nil {
		log.Fatalf("Failed to start subscriber: %v", err)
	}

	// Initialize Fiber app
	app := fiber.New(fiber.Config{
		AppName: "Event-Driven E-commerce API",
	})

	// Setup HTTP handlers
	orderHandlers := httpinfra.NewOrderHandlers(
		createOrderHandler,
		confirmOrderHandler,
		getOrderHandler,
	)
	httpinfra.SetupRoutes(app, orderHandlers)

	// Start server
	log.Printf("Server starting on port %s", port)
	if err := app.Listen(":" + port); err != nil {
		log.Fatalf("Failed to start server: %v", err)
	}
}

// getEnv returns the value of key, or defaultValue when it is unset or empty.
func getEnv(key, defaultValue string) string {
	if value := os.Getenv(key); value != "" {
		return value
	}
	return defaultValue
}
```
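To make the wiring above more concrete, here is a minimal in-memory sketch of what a handler registry behind `subscriber.RegisterHandler` might do: keep a map from event type to handler and route each incoming message through it. The `Dispatcher` type and `ErrNoHandler` are hypothetical stand-ins written for illustration; they are not the article's `natsinfra.Subscriber`, and no NATS connection is involved.

```go
package main

import (
	"errors"
	"fmt"
)

// HandlerFunc mirrors the handler signature used by the event handlers in this article.
type HandlerFunc func(eventType string, data []byte) error

// ErrNoHandler is returned when no handler is registered for an event type.
var ErrNoHandler = errors.New("no handler registered")

// Dispatcher is a hypothetical in-memory registry of event handlers.
type Dispatcher struct {
	handlers map[string]HandlerFunc
}

// NewDispatcher returns an empty registry.
func NewDispatcher() *Dispatcher {
	return &Dispatcher{handlers: make(map[string]HandlerFunc)}
}

// RegisterHandler associates an event type with a handler.
func (d *Dispatcher) RegisterHandler(eventType string, h HandlerFunc) {
	d.handlers[eventType] = h
}

// Dispatch routes a message to the handler registered for its event type.
func (d *Dispatcher) Dispatch(eventType string, data []byte) error {
	h, ok := d.handlers[eventType]
	if !ok {
		return fmt.Errorf("%w: %s", ErrNoHandler, eventType)
	}
	return h(eventType, data)
}

func main() {
	d := NewDispatcher()
	d.RegisterHandler("order.created", func(eventType string, data []byte) error {
		fmt.Printf("projecting %s: %s\n", eventType, data)
		return nil
	})

	fmt.Println(d.Dispatch("order.created", []byte(`{"order_id":"o-1"}`)))
	fmt.Println(d.Dispatch("order.refunded", nil))
}
```

Returning an error for unknown event types, rather than silently dropping them, lets the caller decide whether to acknowledge or retry the message.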
Step 9: Docker Compose Setup 🐳
docker-compose.yml

```yaml
version: '3.8'

services:
  nats:
    image: nats:latest
    ports:
      - "4222:4222"
      - "8222:8222"
    # -js enables JetStream, -sd stores its data on the mounted volume,
    # -m exposes the monitoring endpoint
    command: ["-js", "-sd", "/data", "-m", "8222"]
    volumes:
      - nats-data:/data

  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
      POSTGRES_DB: ecommerce
    ports:
      - "5432:5432"
    volumes:
      - postgres-data:/var/lib/postgresql/data

  app:
    build: .
    ports:
      - "3000:3000"
    environment:
      NATS_URL: "nats://nats:4222"
      POSTGRES_URL: "postgresql://user:password@postgres:5432/ecommerce?sslmode=disable"
    depends_on:
      - nats
      - postgres

volumes:
  nats-data:
  postgres-data:
```
Testing the System 🧪
```bash
# Start services
docker-compose up -d

# Create an order
curl -X POST http://localhost:3000/api/v1/orders \
  -H "Content-Type: application/json" \
  -d '{
    "customer_id": "customer-123",
    "items": [
      {
        "product_id": "product-1",
        "quantity": 2,
        "unit_price": 2999,
        "currency": "USD"
      },
      {
        "product_id": "product-2",
        "quantity": 1,
        "unit_price": 4999,
        "currency": "USD"
      }
    ]
  }'
# Response: {"order_id": "uuid", "message": "Order created successfully"}

# Get order
curl http://localhost:3000/api/v1/orders/<order-id>

# Confirm order
curl -X POST http://localhost:3000/api/v1/orders/<order-id>/confirm

# Replay events from beginning (using CLI)
go run cmd/cli/main.go replay --stream ORDERS --from 2026-01-01T00:00:00Z
```
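The request payload above appears to express prices as integers in minor units (2999 for $29.99). Assuming that reading is right, the following sketch decodes the same body and computes the order total with integer arithmetic, which avoids floating-point rounding. The `CreateOrderRequest` and `Item` types here are hypothetical mirrors of the JSON, not the article's domain types.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// Item mirrors one entry of the "items" array in the request body.
type Item struct {
	ProductID string `json:"product_id"`
	Quantity  int64  `json:"quantity"`
	UnitPrice int64  `json:"unit_price"` // assumed to be minor units (cents)
	Currency  string `json:"currency"`
}

// CreateOrderRequest mirrors the JSON body sent to POST /api/v1/orders.
type CreateOrderRequest struct {
	CustomerID string `json:"customer_id"`
	Items      []Item `json:"items"`
}

// Total sums quantity * unit price over all items, in minor units.
func (r CreateOrderRequest) Total() int64 {
	var total int64
	for _, it := range r.Items {
		total += it.Quantity * it.UnitPrice
	}
	return total
}

func main() {
	body := []byte(`{
		"customer_id": "customer-123",
		"items": [
			{"product_id": "product-1", "quantity": 2, "unit_price": 2999, "currency": "USD"},
			{"product_id": "product-2", "quantity": 1, "unit_price": 4999, "currency": "USD"}
		]
	}`)

	var req CreateOrderRequest
	if err := json.Unmarshal(body, &req); err != nil {
		log.Fatalf("decode request: %v", err)
	}
	fmt.Println(req.Total()) // 2*2999 + 1*4999 = 10997
}
```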
Advantages of This Architecture 🌟
1. Event Sourcing Benefits
- Complete audit trail - Every state change is recorded
- Time travel - Rebuild state at any point in history
- Event replay - Recover from data corruption by replaying events
- Debugging - See exact sequence of events that led to current state
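The "time travel" benefit can be sketched as a fold over the event history that stops at a chosen instant. The `Event` type, the status names, and the sample history below are simplified assumptions for illustration, not the article's domain model.

```go
package main

import (
	"fmt"
	"time"
)

// Event is a simplified, hypothetical stand-in for a stored domain event.
type Event struct {
	Type string
	At   time.Time
}

// StatusAt folds every event recorded at or before t into an order status.
// Events are assumed to be ordered by time, as they would be in a stream.
func StatusAt(events []Event, t time.Time) string {
	status := "none"
	for _, e := range events {
		if e.At.After(t) {
			break // everything from here on happened after t
		}
		switch e.Type {
		case "order.created":
			status = "pending"
		case "order.confirmed":
			status = "confirmed"
		case "order.shipped":
			status = "shipped"
		case "order.cancelled":
			status = "cancelled"
		}
	}
	return status
}

// day returns midnight UTC on the given day of January 2026.
func day(d int) time.Time {
	return time.Date(2026, time.January, d, 0, 0, 0, 0, time.UTC)
}

// sampleHistory returns an illustrative history for a single order.
func sampleHistory() []Event {
	return []Event{
		{Type: "order.created", At: day(1)},
		{Type: "order.confirmed", At: day(2)},
		{Type: "order.shipped", At: day(5)},
	}
}

func main() {
	history := sampleHistory()
	fmt.Println(StatusAt(history, day(1))) // pending
	fmt.Println(StatusAt(history, day(3))) // confirmed
	fmt.Println(StatusAt(history, day(9))) // shipped
}
```

The same fold, run without a cutoff, is what a full replay does when it rebuilds current state.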
2. Minimal Database Writes
- Events are the source of truth - PostgreSQL only for read optimization
- Reduced database load - PostgreSQL is written only by projection handlers, never on the command path
- Better scalability - Event store handles writes, DB handles reads
3. CQRS Separation
- Optimized reads - Query models tailored for specific use cases
- Optimized writes - Domain models focus on business logic
- Independent scaling - Scale read and write sides independently
4. Domain-Driven Design
- Clear business logic - Domain layer contains all rules
- Ubiquitous language - Code matches business terminology
- Clear boundaries - Each aggregate enforces its own invariants, and each bounded context owns its own model
5. Resilience
- Event persistence - JetStream stores published events durably (on disk with file storage), so acknowledged events survive restarts
- Retry mechanisms - Failed event processing can be retried
- Eventual consistency - System tolerates temporary inconsistencies
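The retry idea can be sketched with a small helper that re-runs a failing handler with exponential backoff. This is a generic, in-process illustration; `Retry` and `errTransient` are hypothetical names, and with JetStream a message that is not acknowledged is typically redelivered by the server, so an in-process loop like this is only one of the available options.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTransient is a hypothetical error standing in for a temporary failure,
// such as the projection database being briefly unavailable.
var errTransient = errors.New("transient failure")

// Retry calls fn up to attempts times, doubling the delay after each failure.
// It returns nil on the first success, or the last error wrapped with context.
func Retry(attempts int, delay time.Duration, fn func() error) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		if i < attempts-1 {
			time.Sleep(delay)
			delay *= 2
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", attempts, err)
}

func main() {
	calls := 0
	err := Retry(3, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	})
	fmt.Println(calls, err) // 3 <nil>
}
```

Retries are only safe when the handler is idempotent, because a handler may have partially succeeded before it reported an error.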
Disadvantages and Challenges ⚠️
1. Complexity
Event sourcing adds significant complexity compared to CRUD operations. The learning curve is steep.
2. Eventual Consistency
Read models are updated asynchronously. Queries might not reflect the latest writes immediately.
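One common way to soften this is to let a caller wait until the projection has caught up with the event it just wrote. The sketch below assumes the command side returns the sequence number of the stored event and that the projector records the last sequence it applied; the `Projection` type and its methods are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Projection tracks the highest event sequence applied to the read model.
type Projection struct {
	mu      sync.Mutex
	lastSeq uint64
}

// Apply records that the event with the given sequence has been projected.
func (p *Projection) Apply(seq uint64) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if seq > p.lastSeq {
		p.lastSeq = seq
	}
}

// LastSeq returns the highest sequence applied so far.
func (p *Projection) LastSeq() uint64 {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.lastSeq
}

// WaitFor polls until the projection has applied seq or the timeout elapses.
// It reports whether the read model caught up in time.
func (p *Projection) WaitFor(seq uint64, timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for {
		if p.LastSeq() >= seq {
			return true
		}
		if time.Now().After(deadline) {
			return false
		}
		time.Sleep(time.Millisecond)
	}
}

func main() {
	p := &Projection{}
	go func() {
		time.Sleep(10 * time.Millisecond) // simulated projection lag
		p.Apply(1)
	}()
	fmt.Println(p.WaitFor(1, time.Second)) // true once the projector catches up
}
```

An alternative is to return the new order's data directly from the command response so the client does not need to query the read model immediately.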
3. Event Schema Evolution
Changing an event's structure requires a versioning strategy, because old events stay in the stream forever and must remain processable.
4. Debugging Distributed Systems
Tracing issues across events, handlers, and projections can be challenging.
5. Query Limitations
You can only query what you've projected. Ad-hoc queries require new projections.
6. Storage Growth
Events accumulate over time, so you need strategies for retention, archival, and snapshots.
Best Practices 💡
1. Event Versioning
```go
type OrderCreatedEventV1 struct {
	BaseEvent
	CustomerID string
	Items      []OrderItem
}

type OrderCreatedEventV2 struct {
	BaseEvent
	CustomerID     string
	Items          []OrderItem
	ShippingMethod string // New field
}

// Handle both versions
func deserializeOrderCreated(version int, data []byte) (DomainEvent, error) {
	switch version {
	case 1:
		var event OrderCreatedEventV1
		if err := json.Unmarshal(data, &event); err != nil {
			return nil, fmt.Errorf("decode OrderCreated v1: %w", err)
		}
		// Migrate to V2 if needed
		return event, nil
	case 2:
		var event OrderCreatedEventV2
		if err := json.Unmarshal(data, &event); err != nil {
			return nil, fmt.Errorf("decode OrderCreated v2: %w", err)
		}
		return event, nil
	default:
		return nil, fmt.Errorf("unknown OrderCreated version %d", version)
	}
}
```
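The "migrate to V2" step can be sketched as an upcaster: old payloads are decoded and lifted into the newest shape so that handlers only ever see one version. The trimmed-down `OrderCreatedV2` struct, the JSON field names, and the `"standard"` default below are assumptions made for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// OrderCreatedV2 is a trimmed-down, hypothetical version of the V2 event.
type OrderCreatedV2 struct {
	CustomerID     string `json:"customer_id"`
	ShippingMethod string `json:"shipping_method"`
}

// UpcastOrderCreated decodes a V1 or V2 payload and always returns the V2 shape.
func UpcastOrderCreated(version int, data []byte) (OrderCreatedV2, error) {
	if version != 1 && version != 2 {
		return OrderCreatedV2{}, fmt.Errorf("unknown OrderCreated version %d", version)
	}
	var e OrderCreatedV2
	if err := json.Unmarshal(data, &e); err != nil {
		return OrderCreatedV2{}, fmt.Errorf("decode OrderCreated v%d: %w", version, err)
	}
	if version == 1 {
		// V1 events predate the field, so fill in an assumed default.
		e.ShippingMethod = "standard"
	}
	return e, nil
}

func main() {
	v1, err := UpcastOrderCreated(1, []byte(`{"customer_id":"customer-123"}`))
	fmt.Println(v1.ShippingMethod, err) // standard <nil>

	v2, err := UpcastOrderCreated(2, []byte(`{"customer_id":"customer-123","shipping_method":"express"}`))
	fmt.Println(v2.ShippingMethod, err) // express <nil>
}
```

Upcasting at read time leaves the stored events untouched, which preserves the audit trail.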
2. Idempotent Event Handlers
```go
func (h *EventHandlers) HandleOrderCreated(eventType string, data []byte) error {
	var event order.OrderCreatedEvent
	if err := json.Unmarshal(data, &event); err != nil {
		return fmt.Errorf("decode %s: %w", eventType, err)
	}

	// Check if already processed (idempotency)
	existing, err := h.repo.GetOrder(event.AggID)
	if err == nil && existing != nil {
		log.Printf("Order %s already exists, skipping", event.AggID)
		return nil // Already processed
	}

	// Process event...
	return nil
}
```
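An existence check works for `order.created`, but it does not cover later events such as `order.confirmed`, where the row already exists. A more general approach is to remember which event IDs have been applied. The sketch below keeps that set in memory; `IdempotentProjector` is a hypothetical type, and it assumes every event carries a unique ID. In a real projection the processed-ID record and the read-model update would normally be written in the same database transaction.

```go
package main

import "fmt"

// IdempotentProjector applies each event at most once by remembering event IDs.
type IdempotentProjector struct {
	seen   map[string]struct{} // IDs of events already applied
	Status map[string]string   // read model: order ID -> status
}

// NewIdempotentProjector returns a projector with an empty read model.
func NewIdempotentProjector() *IdempotentProjector {
	return &IdempotentProjector{
		seen:   make(map[string]struct{}),
		Status: make(map[string]string),
	}
}

// Handle applies the status change unless eventID was processed before.
// It reports whether the event was applied.
func (p *IdempotentProjector) Handle(eventID, orderID, status string) bool {
	if _, dup := p.seen[eventID]; dup {
		return false // redelivery: ignore
	}
	p.Status[orderID] = status
	p.seen[eventID] = struct{}{}
	return true
}

func main() {
	p := NewIdempotentProjector()
	fmt.Println(p.Handle("evt-1", "order-1", "pending"))   // true
	fmt.Println(p.Handle("evt-2", "order-1", "confirmed")) // true
	fmt.Println(p.Handle("evt-1", "order-1", "pending"))   // false: redelivered
	fmt.Println(p.Status["order-1"])                       // confirmed
}
```

Note that the redelivered `evt-1` does not roll the order back to `pending`, which is the failure an existence check alone cannot prevent.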
3. Snapshot Pattern for Large Aggregates
```go
type OrderSnapshot struct {
	AggregateID string
	Version     int
	State       OrderAggregate
	CreatedAt   time.Time
}

// Load from snapshot + subsequent events
func LoadFromSnapshot(snapshot OrderSnapshot, events []DomainEvent) (*OrderAggregate, error) {
	aggregate := snapshot.State

	for _, event := range events {
		if event.Version() <= snapshot.Version {
			continue // Skip already applied events
		}
		aggregate.ApplyEvent(event)
	}

	return &aggregate, nil
}
```
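The other half of the pattern is deciding when to take a snapshot. A simple policy is "every N events". The sketch below uses a deliberately tiny aggregate (`OrderState`, just an item counter) and a hypothetical interval of 3 to show that restoring from a snapshot plus later events yields the same state as a full replay.

```go
package main

import "fmt"

// Event is a simplified versioned event that changes an item count.
type Event struct {
	Version int
	Delta   int
}

// OrderState is a tiny hypothetical aggregate used only for this example.
type OrderState struct {
	Version   int
	ItemCount int
}

// Apply folds one event into the aggregate state.
func (o *OrderState) Apply(e Event) {
	o.ItemCount += e.Delta
	o.Version = e.Version
}

// Snapshot captures the aggregate state at a given version.
type Snapshot struct {
	Version int
	State   OrderState
}

// snapshotEvery is an assumed interval; tune it to the aggregate's event volume.
const snapshotEvery = 3

// ShouldSnapshot reports whether a snapshot is due at this version.
func ShouldSnapshot(version int) bool {
	return version > 0 && version%snapshotEvery == 0
}

// Restore rebuilds state from a snapshot plus the events recorded after it.
func Restore(s Snapshot, events []Event) OrderState {
	state := s.State
	for _, e := range events {
		if e.Version <= s.Version {
			continue // already reflected in the snapshot
		}
		state.Apply(e)
	}
	return state
}

func main() {
	events := []Event{{1, 2}, {2, 1}, {3, -1}, {4, 3}, {5, 1}}

	var full OrderState
	var snap Snapshot
	for _, e := range events {
		full.Apply(e)
		if ShouldSnapshot(full.Version) {
			snap = Snapshot{Version: full.Version, State: full}
		}
	}

	restored := Restore(snap, events)
	fmt.Println(snap.Version, restored == full) // 3 true
}
```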
4. Event Metadata
```go
type EventMetadata struct {
	UserID        string
	CorrelationID string
	CausationID   string
	IP            string
	Timestamp     time.Time
}

type EnrichedEvent struct {
	Event    DomainEvent
	Metadata EventMetadata
}
```
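Correlation and causation IDs are most useful when they are propagated consistently. A common convention, assumed here, is that the first event in a flow uses its own ID as the correlation ID, and every event emitted while handling another event inherits the parent's correlation ID and records the parent's event ID as its cause. The `Metadata` type and its `EventID` field are hypothetical simplifications of the struct above.

```go
package main

import "fmt"

// Metadata is a reduced, hypothetical form of the event metadata above.
type Metadata struct {
	EventID       string
	CorrelationID string
	CausationID   string
}

// NewRoot starts a new flow: the event correlates to itself and has no cause.
func NewRoot(eventID string) Metadata {
	return Metadata{EventID: eventID, CorrelationID: eventID}
}

// CausedBy derives metadata for an event emitted while handling parent.
func CausedBy(parent Metadata, eventID string) Metadata {
	return Metadata{
		EventID:       eventID,
		CorrelationID: parent.CorrelationID, // same flow as the parent
		CausationID:   parent.EventID,       // direct cause
	}
}

func main() {
	created := NewRoot("evt-1")               // order.created
	reserved := CausedBy(created, "evt-2")    // e.g. stock reserved for the order
	confirmed := CausedBy(reserved, "evt-3")  // e.g. order.confirmed

	fmt.Printf("%+v\n", confirmed)
	// {EventID:evt-3 CorrelationID:evt-1 CausationID:evt-2}
}
```

Filtering by correlation ID then returns the whole flow, while following causation IDs reconstructs the order in which events triggered one another.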
Real-World Scenarios 🌍
Scenario 1: Order Cancellation with Inventory Restoration
When an order is cancelled, inventory must be restored. Events make this easy:
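```go
// Handler for order cancelled event
func (h *InventoryHandlers) HandleOrderCancelled(eventType string, data []byte) error {
	var event order.OrderCancelledEvent
	if err := json.Unmarshal(data, &event); err != nil {
		return fmt.Errorf("decode %s: %w", eventType, err)
	}

	// Get original order creation event to know what to restore
	events, err := h.eventStore.LoadEvents(event.AggID)
	if err != nil {
		return fmt.Errorf("load events for order %v: %w", event.AggID, err)
	}

	var createEvent order.OrderCreatedEvent
	found := false
	for _, e := range events {
		if e.EventType() != "order.created" {
			continue
		}
		// Checked assertion: a mismatched payload must not panic the handler
		if created, ok := e.(order.OrderCreatedEvent); ok {
			createEvent, found = created, true
			break
		}
	}
	if !found {
		return fmt.Errorf("no order.created event for order %v", event.AggID)
	}

	// Restore inventory for each item
	for _, item := range createEvent.Items {
		h.inventoryService.RestoreStock(item.ProductID, item.Quantity)
	}

	return nil
}
```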
Scenario 2: Generating Reports from Events
```go
// Generate revenue report by replaying events
func GenerateRevenueReport(from, to time.Time) (*Report, error) {
	var totalRevenue int64
	var orderCount int

	events := eventStore.LoadEventsBetween("ORDERS", from, to)

	for _, event := range events {
		if event.EventType() != "order.created" {
			continue
		}
		// Checked assertion: report a bad payload instead of panicking
		e, ok := event.(OrderCreatedEvent)
		if !ok {
			return nil, fmt.Errorf("unexpected payload type %T for order.created", event)
		}
		totalRevenue += e.TotalPrice.Amount
		orderCount++
	}

	return &Report{
		TotalRevenue: totalRevenue,
		OrderCount:   orderCount,
		Period:       Period{From: from, To: to},
	}, nil
}
```
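The report above counts every `order.created` event, including orders that were later cancelled. If cancelled orders should not count as revenue, the replay can fold both event types. The sketch below is one way to do that with a simplified, hypothetical `Event` type; amounts are assumed to be in minor units.

```go
package main

import "fmt"

// Event is a simplified, hypothetical event carrying only what the report needs.
type Event struct {
	Type    string
	OrderID string
	Amount  int64 // order total in minor units; only set on order.created
}

// NetRevenue replays events and returns revenue and order count,
// excluding orders that were cancelled later in the stream.
func NetRevenue(events []Event) (total int64, orders int) {
	amounts := make(map[string]int64)
	for _, e := range events {
		switch e.Type {
		case "order.created":
			amounts[e.OrderID] = e.Amount
		case "order.cancelled":
			delete(amounts, e.OrderID)
		}
	}
	for _, a := range amounts {
		total += a
	}
	return total, len(amounts)
}

func main() {
	events := []Event{
		{Type: "order.created", OrderID: "order-1", Amount: 10997},
		{Type: "order.created", OrderID: "order-2", Amount: 5000},
		{Type: "order.cancelled", OrderID: "order-2"},
	}
	total, orders := NetRevenue(events)
	fmt.Println(total, orders) // 10997 1
}
```

Because the report is derived purely from events, changing the definition of revenue later only requires a new fold, not a data migration.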
Conclusion 🎯
Building event-driven microservices with NATS, Go Fiber, and Domain-Driven Design provides a powerful architecture for scalable e-commerce systems. Key takeaways:
- Events as the source of truth keep PostgreSQL off the command path; only projections write to it
- NATS JetStream provides reliable event persistence and replay
- DDD keeps business logic clean and maintainable
- CQRS optimizes reads and writes independently
- Cobra CLI enables powerful administrative operations
- Event Sourcing provides complete audit trails and time travel
This architecture is ideal for:
- Systems requiring full audit trails (financial, medical, legal)
- High-scale applications with complex business logic
- Systems that need to reconstruct state from history
- Applications with separate read and write scalability needs
However, consider the added complexity and eventual consistency tradeoffs. For simple CRUD applications, traditional architecture may be more appropriate.
The complete source code for this example is available on GitHub. Happy building! 🚀