
Construyendo Microservicios Event-Driven con NATS, Go Fiber y DDD
La Arquitectura Basada en Eventos (EDA) combinada con Event Sourcing y Domain-Driven Design (DDD) es uno de los patrones más poderosos para construir microservicios escalables y resilientes. En esta guía completa, construiremos un sistema de e-commerce completo usando NATS como nuestro event store y message broker, Go Fiber para APIs REST, PostgreSQL solo para modelos de lectura, y Cobra CLI para nuestra interfaz de línea de comandos.
La clave: los eventos son la fuente de verdad. Minimizaremos las escrituras en base de datos reproduciendo eventos para reconstruir el estado, haciendo nuestro sistema más auditable, escalable y tolerante a fallos.
¿Por qué este Stack? 🤔
NATS JetStream
- Event streaming persistente con entrega exactly-once
- Capacidad de replay de eventos para reconstruir estado
- Ligero y rápido (escrito en Go)
- Clustering integrado para alta disponibilidad
- Políticas de retención de streams para almacenamiento de eventos
Go Fiber
- API estilo Express para construir endpoints REST
- Extremadamente rápido (construido sobre Fasthttp)
- Bajo consumo de memoria
- Fácil integración de middleware
Domain-Driven Design (DDD)
- Clara separación de responsabilidades (Dominio, Aplicación, Infraestructura)
- Encapsulación de lógica de negocio en entidades de dominio
- Lenguaje ubicuo compartido con stakeholders del negocio
Patrón Event Sourcing
- Eventos como fuente de verdad en lugar del estado actual
- Rastro de auditoría completo de todos los cambios
- Viaje en el tiempo - reconstruir estado en cualquier punto
- Consistencia eventual con CQRS (Command Query Responsibility Segregation)
PostgreSQL con GORM
- Solo modelos de lectura (proyecciones de eventos)
- Optimización de consultas para casos de uso específicos
- Vistas materializadas del stream de eventos
Visión General de la Arquitectura 🏗️
Nuestro sistema de e-commerce tendrá:
- Servicio de Comandos (Fiber API) - Maneja operaciones de escritura, publica eventos
- Event Store (NATS JetStream) - Persiste todos los eventos de dominio
- Event Handlers - Se suscriben a eventos, actualizan modelos de lectura
- Servicio de Consultas (Fiber API) - Sirve modelos de lectura desde PostgreSQL
- CLI (Cobra) - Comandos administrativos y replay de eventos
cmd
server
main.go
cli
main.go
root.go
replay.go
internal
domain
order
aggregate.go
events.go
value_objects.go
application
commands
create_order.go
queries
get_order.go
infrastructure
nats
event_store.go
publisher.go
subscriber.go
postgres
repository.go
models.go
http
handlers.go
routes.go
Paso 1: Configuración del Proyecto con Cobra CLI 🛠️
Primero, configuremos la estructura de nuestro proyecto usando Cobra para comandos CLI:
~# Inicializar módulo Go go mod init github.com/yourusername/event-driven-ecommerce # Instalar dependencias go get github.com/gofiber/fiber/v2 go get github.com/nats-io/nats.go go get github.com/spf13/cobra go get gorm.io/gorm go get gorm.io/driver/postgres go get github.com/google/uuid
cmd/cli/main.gopackage main import ( "fmt" "os" "github.com/yourusername/event-driven-ecommerce/cmd/cli/commands" ) func main() { if err := commands.Execute(); err != nil { fmt.Fprintf(os.Stderr, "Error: %v\n", err) os.Exit(1) } }
cmd/cli/commands/root.gopackage commands import ( "github.com/spf13/cobra" ) var rootCmd = &cobra.Command{ Use: "ecommerce", Short: "Event-Driven E-commerce CLI", Long: "A CLI tool for managing event-driven e-commerce system", } func Execute() error { return rootCmd.Execute() } func init() { // Add subcommands rootCmd.AddCommand(replayCmd) rootCmd.AddCommand(serveCmd) }
cmd/cli/commands/replay.gopackage commands import ( "fmt" "log" "time" "github.com/spf13/cobra" "github.com/yourusername/event-driven-ecommerce/internal/infrastructure/nats" ) var ( streamName string fromTime string ) var replayCmd = &cobra.Command{ Use: "replay", Short: "Replay events from NATS JetStream", Long: "Rebuild read models by replaying events from a specific point in time", Run: func(cmd *cobra.Command, args []string) { fmt.Printf("Replaying events from stream: %s\n", streamName) var startTime time.Time var err error if fromTime != "" { startTime, err = time.Parse(time.RFC3339, fromTime) if err != nil { log.Fatalf("Invalid time format: %v", err) } } // Initialize NATS connection eventStore, err := nats.NewEventStore("nats://localhost:4222", streamName) if err != nil { log.Fatalf("Failed to connect to NATS: %v", err) } defer eventStore.Close() // Replay events if err := eventStore.ReplayFrom(startTime); err != nil { log.Fatalf("Failed to replay events: %v", err) } fmt.Println("Event replay completed successfully!") }, } func init() { replayCmd.Flags().StringVarP(&streamName, "stream", "s", "ORDERS", "NATS stream name") replayCmd.Flags().StringVarP(&fromTime, "from", "f", "", "Replay from this time (RFC3339 format)") }
Paso 2: Capa de Dominio (DDD) 🎯
Implementemos nuestro dominio usando principios DDD. Comenzaremos con el agregado Order:
internal/domain/order/value_objects.gopackage order import ( "errors" "github.com/google/uuid" ) // OrderID es un value object que representa un identificador de orden type OrderID struct { value string } func NewOrderID() OrderID { return OrderID{value: uuid.New().String()} } func OrderIDFromString(id string) (OrderID, error) { if id == "" { return OrderID{}, errors.New("order ID cannot be empty") } return OrderID{value: id}, nil } func (o OrderID) String() string { return o.value } // Money value object type Money struct { Amount int64 // Almacenado en centavos para evitar problemas con punto flotante Currency string } func NewMoney(amount int64, currency string) (Money, error) { if amount < 0 { return Money{}, errors.New("amount cannot be negative") } if currency == "" { return Money{}, errors.New("currency is required") } return Money{Amount: amount, Currency: currency}, nil } func (m Money) Add(other Money) (Money, error) { if m.Currency != other.Currency { return Money{}, errors.New("cannot add money with different currencies") } return Money{Amount: m.Amount + other.Amount, Currency: m.Currency}, nil } // OrderItem entity type OrderItem struct { ProductID string Quantity int UnitPrice Money } func NewOrderItem(productID string, quantity int, unitPrice Money) (OrderItem, error) { if productID == "" { return OrderItem{}, errors.New("product ID is required") } if quantity <= 0 { return OrderItem{}, errors.New("quantity must be positive") } return OrderItem{ ProductID: productID, Quantity: quantity, UnitPrice: unitPrice, }, nil } func (i OrderItem) TotalPrice() Money { return Money{ Amount: i.UnitPrice.Amount * int64(i.Quantity), Currency: i.UnitPrice.Currency, } } // OrderStatus value object type OrderStatus string const ( OrderStatusPending OrderStatus = "PENDING" OrderStatusConfirmed OrderStatus = "CONFIRMED" OrderStatusShipped OrderStatus = "SHIPPED" OrderStatusDelivered OrderStatus = "DELIVERED" OrderStatusCancelled OrderStatus = "CANCELLED" )
internal/domain/order/events.gopackage order import "time" // Domain Events - Estos son los hechos que ocurrieron en nuestro sistema type DomainEvent interface { EventType() string AggregateID() string OccurredAt() time.Time } // BaseEvent proporciona campos comunes de eventos type BaseEvent struct { EventID string `json:"event_id"` AggID string `json:"aggregate_id"` EventTime time.Time `json:"event_time"` EventVer int `json:"event_version"` } func (e BaseEvent) AggregateID() string { return e.AggID } func (e BaseEvent) OccurredAt() time.Time { return e.EventTime } // OrderCreatedEvent type OrderCreatedEvent struct { BaseEvent CustomerID string `json:"customer_id"` Items []OrderItem `json:"items"` TotalPrice Money `json:"total_price"` } func (e OrderCreatedEvent) EventType() string { return "order.created" } // OrderConfirmedEvent type OrderConfirmedEvent struct { BaseEvent ConfirmedAt time.Time `json:"confirmed_at"` } func (e OrderConfirmedEvent) EventType() string { return "order.confirmed" } // OrderShippedEvent type OrderShippedEvent struct { BaseEvent TrackingNumber string `json:"tracking_number"` ShippedAt time.Time `json:"shipped_at"` } func (e OrderShippedEvent) EventType() string { return "order.shipped" } // OrderCancelledEvent type OrderCancelledEvent struct { BaseEvent Reason string `json:"reason"` CancelledAt time.Time `json:"cancelled_at"` } func (e OrderCancelledEvent) EventType() string { return "order.cancelled" } // ItemAddedToOrderEvent type ItemAddedToOrderEvent struct { BaseEvent Item OrderItem `json:"item"` } func (e ItemAddedToOrderEvent) EventType() string { return "order.item.added" }
internal/domain/order/aggregate.gopackage order import ( "errors" "time" "github.com/google/uuid" ) // OrderAggregate es nuestro agregado raíz de dominio // Aplica reglas de negocio y emite eventos de dominio type OrderAggregate struct { id OrderID customerID string items []OrderItem status OrderStatus totalPrice Money version int uncommittedEvents []DomainEvent } // NewOrderAggregate crea una nueva orden (comando) func NewOrderAggregate(customerID string, items []OrderItem) (*OrderAggregate, error) { if customerID == "" { return nil, errors.New("customer ID is required") } if len(items) == 0 { return nil, errors.New("order must have at least one item") } // Calcular precio total totalPrice := Money{Amount: 0, Currency: items[0].UnitPrice.Currency} for _, item := range items { itemTotal := item.TotalPrice() var err error totalPrice, err = totalPrice.Add(itemTotal) if err != nil { return nil, err } } aggregate := &OrderAggregate{} // Crear y aplicar evento event := OrderCreatedEvent{ BaseEvent: BaseEvent{ EventID: uuid.New().String(), AggID: NewOrderID().String(), EventTime: time.Now(), EventVer: 1, }, CustomerID: customerID, Items: items, TotalPrice: totalPrice, } aggregate.applyOrderCreated(event) aggregate.uncommittedEvents = append(aggregate.uncommittedEvents, event) return aggregate, nil } // LoadFromEvents reconstruye el estado del agregado desde el historial de eventos func LoadFromEvents(events []DomainEvent) (*OrderAggregate, error) { if len(events) == 0 { return nil, errors.New("cannot load aggregate from empty event list") } aggregate := &OrderAggregate{} for _, event := range events { if err := aggregate.ApplyEvent(event); err != nil { return nil, err } } return aggregate, nil } // ApplyEvent aplica un evento al agregado func (a *OrderAggregate) ApplyEvent(event DomainEvent) error { switch e := event.(type) { case OrderCreatedEvent: a.applyOrderCreated(e) case OrderConfirmedEvent: a.applyOrderConfirmed(e) case OrderShippedEvent: a.applyOrderShipped(e) case OrderCancelledEvent: a.applyOrderCancelled(e) case ItemAddedToOrderEvent: a.applyItemAdded(e) default: return errors.New("unknown event type") } a.version++ return nil } // Métodos de lógica de negocio que emiten eventos func (a *OrderAggregate) Confirm() error { if a.status != OrderStatusPending { return errors.New("only pending orders can be confirmed") } event := OrderConfirmedEvent{ BaseEvent: BaseEvent{ EventID: uuid.New().String(), AggID: a.id.String(), EventTime: time.Now(), EventVer: a.version + 1, }, ConfirmedAt: time.Now(), } a.applyOrderConfirmed(event) a.uncommittedEvents = append(a.uncommittedEvents, event) return nil } func (a *OrderAggregate) Ship(trackingNumber string) error { if a.status != OrderStatusConfirmed { return errors.New("only confirmed orders can be shipped") } if trackingNumber == "" { return errors.New("tracking number is required") } event := OrderShippedEvent{ BaseEvent: BaseEvent{ EventID: uuid.New().String(), AggID: a.id.String(), EventTime: time.Now(), EventVer: a.version + 1, }, TrackingNumber: trackingNumber, ShippedAt: time.Now(), } a.applyOrderShipped(event) a.uncommittedEvents = append(a.uncommittedEvents, event) return nil } func (a *OrderAggregate) Cancel(reason string) error { if a.status == OrderStatusShipped || a.status == OrderStatusDelivered { return errors.New("shipped or delivered orders cannot be cancelled") } if a.status == OrderStatusCancelled { return errors.New("order is already cancelled") } event := OrderCancelledEvent{ BaseEvent: BaseEvent{ EventID: uuid.New().String(), AggID: a.id.String(), EventTime: time.Now(), EventVer: a.version + 1, }, Reason: reason, CancelledAt: time.Now(), } a.applyOrderCancelled(event) a.uncommittedEvents = append(a.uncommittedEvents, event) return nil } // Métodos de aplicación de eventos (cambios de estado) func (a *OrderAggregate) applyOrderCreated(event OrderCreatedEvent) { id, _ := OrderIDFromString(event.AggID) a.id = id a.customerID = event.CustomerID a.items = event.Items a.totalPrice = event.TotalPrice a.status = OrderStatusPending } func (a *OrderAggregate) applyOrderConfirmed(event OrderConfirmedEvent) { a.status = OrderStatusConfirmed } func (a *OrderAggregate) applyOrderShipped(event OrderShippedEvent) { a.status = OrderStatusShipped } func (a *OrderAggregate) applyOrderCancelled(event OrderCancelledEvent) { a.status = OrderStatusCancelled } func (a *OrderAggregate) applyItemAdded(event ItemAddedToOrderEvent) { a.items = append(a.items, event.Item) itemTotal := event.Item.TotalPrice() a.totalPrice, _ = a.totalPrice.Add(itemTotal) } // Getters func (a *OrderAggregate) ID() OrderID { return a.id } func (a *OrderAggregate) Status() OrderStatus { return a.status } func (a *OrderAggregate) GetUncommittedEvents() []DomainEvent { return a.uncommittedEvents } func (a *OrderAggregate) ClearUncommittedEvents() { a.uncommittedEvents = nil } func (a *OrderAggregate) Version() int { return a.version }
Paso 3: Infraestructura de Event Store con NATS 📨
Ahora implementemos NATS JetStream como nuestro event store:
internal/infrastructure/nats/event_store.gopackage nats import ( "encoding/json" "fmt" "time" "github.com/nats-io/nats.go" "github.com/yourusername/event-driven-ecommerce/internal/domain/order" ) type EventStore struct { conn *nats.Conn js nats.JetStreamContext stream string } func NewEventStore(url, streamName string) (*EventStore, error) { // Conectar a NATS nc, err := nats.Connect(url) if err != nil { return nil, fmt.Errorf("failed to connect to NATS: %w", err) } // Obtener contexto JetStream js, err := nc.JetStream() if err != nil { return nil, fmt.Errorf("failed to get JetStream context: %w", err) } // Crear o actualizar stream streamConfig := &nats.StreamConfig{ Name: streamName, Subjects: []string{streamName + ".*"}, Storage: nats.FileStorage, Retention: nats.LimitsPolicy, MaxAge: 365 * 24 * time.Hour, // Mantener eventos por 1 año Replicas: 1, } _, err = js.AddStream(streamConfig) if err != nil { // El stream podría ya existir, intentar actualizar _, err = js.UpdateStream(streamConfig) if err != nil { return nil, fmt.Errorf("failed to create/update stream: %w", err) } } return &EventStore{ conn: nc, js: js, stream: streamName, }, nil } func (es *EventStore) SaveEvents(aggregateID string, events []order.DomainEvent) error { for _, event := range events { data, err := json.Marshal(event) if err != nil { return fmt.Errorf("failed to marshal event: %w", err) } subject := fmt.Sprintf("%s.%s", es.stream, event.EventType()) // Publicar a JetStream con metadata _, err = es.js.Publish(subject, data, nats.MsgId(event.AggregateID())) if err != nil { return fmt.Errorf("failed to publish event: %w", err) } } return nil } func (es *EventStore) LoadEvents(aggregateID string) ([]order.DomainEvent, error) { // Crear un consumer para este agregado específico consumerName := fmt.Sprintf("loader_%s", aggregateID) // Suscribirse a todos los eventos de este agregado subject := fmt.Sprintf("%s.*", es.stream) sub, err := es.js.PullSubscribe(subject, consumerName, nats.DeliverAll()) if err != nil { return nil, fmt.Errorf("failed to subscribe: %w", err) } defer sub.Unsubscribe() var events []order.DomainEvent // Obtener eventos (máximo 100 a la vez) msgs, err := sub.Fetch(100, nats.MaxWait(2*time.Second)) if err != nil && err != nats.ErrTimeout { return nil, fmt.Errorf("failed to fetch messages: %w", err) } for _, msg := range msgs { // Deserializar basado en tipo de evento event, err := es.deserializeEvent(msg.Data, msg.Subject) if err != nil { return nil, err } // Solo incluir eventos de este agregado if event.AggregateID() == aggregateID { events = append(events, event) msg.Ack() } } return events, nil } func (es *EventStore) deserializeEvent(data []byte, subject string) (order.DomainEvent, error) { // Extraer tipo de evento del subject (ej. "ORDERS.order.created" -> "order.created") var baseEvent order.BaseEvent if err := json.Unmarshal(data, &baseEvent); err != nil { return nil, err } // Deserializar a tipo concreto basado en subject switch { case contains(subject, "order.created"): var event order.OrderCreatedEvent if err := json.Unmarshal(data, &event); err != nil { return nil, err } return event, nil case contains(subject, "order.confirmed"): var event order.OrderConfirmedEvent if err := json.Unmarshal(data, &event); err != nil { return nil, err } return event, nil case contains(subject, "order.shipped"): var event order.OrderShippedEvent if err := json.Unmarshal(data, &event); err != nil { return nil, err } return event, nil case contains(subject, "order.cancelled"): var event order.OrderCancelledEvent if err := json.Unmarshal(data, &event); err != nil { return nil, err } return event, nil default: return nil, fmt.Errorf("unknown event type: %s", subject) } } func (es *EventStore) ReplayFrom(startTime time.Time) error { // Suscribirse desde un tiempo específico sub, err := es.js.Subscribe( fmt.Sprintf("%s.*", es.stream), func(msg *nats.Msg) { // Procesar cada evento para reconstruir modelos de lectura fmt.Printf("Replaying event: %s\n", msg.Subject) // Aquí dispararías tus event handlers msg.Ack() }, nats.DeliverByStartTime(startTime), nats.ManualAck(), ) if err != nil { return err } defer sub.Unsubscribe() // Esperar a que complete el replay time.Sleep(5 * time.Second) return nil } func (es *EventStore) Close() { if es.conn != nil { es.conn.Close() } } func contains(s, substr string) bool { return len(s) >= len(substr) && s[len(s)-len(substr):] == substr }
internal/infrastructure/nats/subscriber.gopackage nats import ( "encoding/json" "fmt" "log" "github.com/nats-io/nats.go" ) type EventHandler func(eventType string, data []byte) error type Subscriber struct { js nats.JetStreamContext stream string consumerName string handlers map[string]EventHandler } func NewSubscriber(js nats.JetStreamContext, stream, consumerName string) *Subscriber { return &Subscriber{ js: js, stream: stream, consumerName: consumerName, handlers: make(map[string]EventHandler), } } func (s *Subscriber) RegisterHandler(eventType string, handler EventHandler) { s.handlers[eventType] = handler } func (s *Subscriber) Start() error { subject := fmt.Sprintf("%s.*", s.stream) // Crear consumer durable _, err := s.js.Subscribe( subject, s.handleMessage, nats.Durable(s.consumerName), nats.ManualAck(), nats.DeliverAll(), nats.AckWait(30*time.Second), ) return err } func (s *Subscriber) handleMessage(msg *nats.Msg) { // Extraer tipo de evento del subject parts := strings.Split(msg.Subject, ".") if len(parts) < 2 { log.Printf("Invalid subject format: %s", msg.Subject) msg.Nak() return } eventType := strings.Join(parts[1:], ".") // Encontrar y ejecutar handler if handler, ok := s.handlers[eventType]; ok { if err := handler(eventType, msg.Data); err != nil { log.Printf("Error handling event %s: %v", eventType, err) msg.Nak() return } msg.Ack() } else { log.Printf("No handler for event type: %s", eventType) msg.Ack() // Acknowledge de todos modos para evitar reentrega } }
Paso 4: Modelos de Lectura PostgreSQL (Solo Proyecciones) 💾
PostgreSQL se usa SOLO para modelos de lectura - vistas materializadas de nuestro stream de eventos:
internal/infrastructure/postgres/models.gopackage postgres import ( "time" "gorm.io/gorm" ) // OrderReadModel - Proyección de solo lectura construida desde eventos type OrderReadModel struct { ID string `gorm:"primaryKey"` CustomerID string `gorm:"index"` Status string `gorm:"index"` TotalAmount int64 // En centavos Currency string TrackingNumber string CreatedAt time.Time `gorm:"index"` UpdatedAt time.Time CancelledAt *time.Time ShippedAt *time.Time } // OrderItemReadModel type OrderItemReadModel struct { ID uint `gorm:"primaryKey"` OrderID string `gorm:"index"` ProductID string Quantity int UnitPrice int64 // En centavos Currency string } // Migrate crea las tablas func Migrate(db *gorm.DB) error { return db.AutoMigrate( &OrderReadModel{}, &OrderItemReadModel{}, ) }
internal/infrastructure/postgres/repository.gopackage postgres import ( "fmt" "gorm.io/gorm" ) type ReadRepository struct { db *gorm.DB } func NewReadRepository(db *gorm.DB) *ReadRepository { return &ReadRepository{db: db} } // UpdateOrderProjection actualiza el modelo de lectura desde un evento func (r *ReadRepository) UpdateOrderProjection(order *OrderReadModel) error { // Operación upsert return r.db.Save(order).Error } func (r *ReadRepository) AddOrderItem(item *OrderItemReadModel) error { return r.db.Create(item).Error } func (r *ReadRepository) GetOrder(orderID string) (*OrderReadModel, error) { var order OrderReadModel err := r.db.First(&order, "id = ?", orderID).Error if err != nil { return nil, err } return &order, nil } func (r *ReadRepository) GetOrderWithItems(orderID string) (*OrderReadModel, []OrderItemReadModel, error) { var order OrderReadModel var items []OrderItemReadModel if err := r.db.First(&order, "id = ?", orderID).Error; err != nil { return nil, nil, err } if err := r.db.Find(&items, "order_id = ?", orderID).Error; err != nil { return nil, nil, err } return &order, items, nil } func (r *ReadRepository) ListOrders(customerID string, status string, limit, offset int) ([]OrderReadModel, error) { var orders []OrderReadModel query := r.db.Model(&OrderReadModel{}) if customerID != "" { query = query.Where("customer_id = ?", customerID) } if status != "" { query = query.Where("status = ?", status) } err := query. Order("created_at DESC"). Limit(limit). Offset(offset). Find(&orders).Error return orders, err }
Paso 5: Event Handlers (Actualizar Modelos de Lectura) 🔄
Los event handlers escuchan a NATS y actualizan los modelos de lectura en PostgreSQL:
internal/application/event_handlers.gopackage application import ( "encoding/json" "log" "time" "github.com/yourusername/event-driven-ecommerce/internal/domain/order" "github.com/yourusername/event-driven-ecommerce/internal/infrastructure/postgres" ) type EventHandlers struct { repo *postgres.ReadRepository } func NewEventHandlers(repo *postgres.ReadRepository) *EventHandlers { return &EventHandlers{repo: repo} } func (h *EventHandlers) HandleOrderCreated(eventType string, data []byte) error { var event order.OrderCreatedEvent if err := json.Unmarshal(data, &event); err != nil { return err } log.Printf("Handling OrderCreated: %s", event.AggID) // Crear modelo de lectura orderModel := &postgres.OrderReadModel{ ID: event.AggID, CustomerID: event.CustomerID, Status: string(order.OrderStatusPending), TotalAmount: event.TotalPrice.Amount, Currency: event.TotalPrice.Currency, CreatedAt: event.EventTime, UpdatedAt: event.EventTime, } if err := h.repo.UpdateOrderProjection(orderModel); err != nil { return err } // Crear items de la orden for _, item := range event.Items { itemModel := &postgres.OrderItemReadModel{ OrderID: event.AggID, ProductID: item.ProductID, Quantity: item.Quantity, UnitPrice: item.UnitPrice.Amount, Currency: item.UnitPrice.Currency, } if err := h.repo.AddOrderItem(itemModel); err != nil { return err } } return nil } func (h *EventHandlers) HandleOrderConfirmed(eventType string, data []byte) error { var event order.OrderConfirmedEvent if err := json.Unmarshal(data, &event); err != nil { return err } log.Printf("Handling OrderConfirmed: %s", event.AggID) orderModel, err := h.repo.GetOrder(event.AggID) if err != nil { return err } orderModel.Status = string(order.OrderStatusConfirmed) orderModel.UpdatedAt = event.EventTime return h.repo.UpdateOrderProjection(orderModel) } func (h *EventHandlers) HandleOrderShipped(eventType string, data []byte) error { var event order.OrderShippedEvent if err := json.Unmarshal(data, &event); err != nil { return err } log.Printf("Handling OrderShipped: %s", event.AggID) orderModel, err := h.repo.GetOrder(event.AggID) if err != nil { return err } orderModel.Status = string(order.OrderStatusShipped) orderModel.TrackingNumber = event.TrackingNumber shippedAt := event.ShippedAt orderModel.ShippedAt = &shippedAt orderModel.UpdatedAt = event.EventTime return h.repo.UpdateOrderProjection(orderModel) } func (h *EventHandlers) HandleOrderCancelled(eventType string, data []byte) error { var event order.OrderCancelledEvent if err := json.Unmarshal(data, &event); err != nil { return err } log.Printf("Handling OrderCancelled: %s", event.AggID) orderModel, err := h.repo.GetOrder(event.AggID) if err != nil { return err } orderModel.Status = string(order.OrderStatusCancelled) cancelledAt := event.CancelledAt orderModel.CancelledAt = &cancelledAt orderModel.UpdatedAt = event.EventTime return h.repo.UpdateOrderProjection(orderModel) }
Paso 6: Comandos y Consultas de Aplicación (CQRS) 📝
Separar operaciones de lectura y escritura:
internal/application/commands/create_order.gopackage commands import ( "github.com/yourusername/event-driven-ecommerce/internal/domain/order" natsinfra "github.com/yourusername/event-driven-ecommerce/internal/infrastructure/nats" ) type CreateOrderCommand struct { CustomerID string Items []order.OrderItem } type CreateOrderHandler struct { eventStore *natsinfra.EventStore } func NewCreateOrderHandler(eventStore *natsinfra.EventStore) *CreateOrderHandler { return &CreateOrderHandler{eventStore: eventStore} } func (h *CreateOrderHandler) Handle(cmd CreateOrderCommand) (string, error) { // Crear nuevo agregado aggregate, err := order.NewOrderAggregate(cmd.CustomerID, cmd.Items) if err != nil { return "", err } // Guardar eventos en NATS (event store) events := aggregate.GetUncommittedEvents() if err := h.eventStore.SaveEvents(aggregate.ID().String(), events); err != nil { return "", err } aggregate.ClearUncommittedEvents() return aggregate.ID().String(), nil } // ConfirmOrderCommand type ConfirmOrderCommand struct { OrderID string } type ConfirmOrderHandler struct { eventStore *natsinfra.EventStore } func NewConfirmOrderHandler(eventStore *natsinfra.EventStore) *ConfirmOrderHandler { return &ConfirmOrderHandler{eventStore: eventStore} } func (h *ConfirmOrderHandler) Handle(cmd ConfirmOrderCommand) error { // Cargar agregado desde eventos events, err := h.eventStore.LoadEvents(cmd.OrderID) if err != nil { return err } aggregate, err := order.LoadFromEvents(events) if err != nil { return err } // Ejecutar lógica de negocio if err := aggregate.Confirm(); err != nil { return err } // Guardar nuevos eventos newEvents := aggregate.GetUncommittedEvents() if err := h.eventStore.SaveEvents(aggregate.ID().String(), newEvents); err != nil { return err } aggregate.ClearUncommittedEvents() return nil }
internal/application/queries/get_order.gopackage queries import ( "github.com/yourusername/event-driven-ecommerce/internal/infrastructure/postgres" ) type GetOrderQuery struct { OrderID string } type OrderDTO struct { ID string CustomerID string Status string TotalAmount int64 Currency string TrackingNumber string Items []OrderItemDTO CreatedAt string UpdatedAt string } type OrderItemDTO struct { ProductID string Quantity int UnitPrice int64 Currency string } type GetOrderHandler struct { repo *postgres.ReadRepository } func NewGetOrderHandler(repo *postgres.ReadRepository) *GetOrderHandler { return &GetOrderHandler{repo: repo} } func (h *GetOrderHandler) Handle(query GetOrderQuery) (*OrderDTO, error) { order, items, err := h.repo.GetOrderWithItems(query.OrderID) if err != nil { return nil, err } dto := &OrderDTO{ ID: order.ID, CustomerID: order.CustomerID, Status: order.Status, TotalAmount: order.TotalAmount, Currency: order.Currency, TrackingNumber: order.TrackingNumber, CreatedAt: order.CreatedAt.Format(time.RFC3339), UpdatedAt: order.UpdatedAt.Format(time.RFC3339), Items: make([]OrderItemDTO, 0, len(items)), } for _, item := range items { dto.Items = append(dto.Items, OrderItemDTO{ ProductID: item.ProductID, Quantity: item.Quantity, UnitPrice: item.UnitPrice, Currency: item.Currency, }) } return dto, nil }
Paso 7: Handlers HTTP con Go Fiber 🌐
Crear endpoints de API REST usando Fiber:
internal/infrastructure/http/handlers.gopackage http import ( "github.com/gofiber/fiber/v2" "github.com/yourusername/event-driven-ecommerce/internal/application/commands" "github.com/yourusername/event-driven-ecommerce/internal/application/queries" "github.com/yourusername/event-driven-ecommerce/internal/domain/order" ) type OrderHandlers struct { createOrderHandler *commands.CreateOrderHandler confirmOrderHandler *commands.ConfirmOrderHandler getOrderHandler *queries.GetOrderHandler } func NewOrderHandlers( createOrderHandler *commands.CreateOrderHandler, confirmOrderHandler *commands.ConfirmOrderHandler, getOrderHandler *queries.GetOrderHandler, ) *OrderHandlers { return &OrderHandlers{ createOrderHandler: createOrderHandler, confirmOrderHandler: confirmOrderHandler, getOrderHandler: getOrderHandler, } } type CreateOrderRequest struct { CustomerID string `json:"customer_id"` Items []OrderItemRequest `json:"items"` } type OrderItemRequest struct { ProductID string `json:"product_id"` Quantity int `json:"quantity"` UnitPrice int64 `json:"unit_price"` Currency string `json:"currency"` } func (h *OrderHandlers) CreateOrder(c *fiber.Ctx) error { var req CreateOrderRequest if err := c.BodyParser(&req); err != nil { return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{ "error": "Invalid request body", }) } // Convertir a entidades de dominio items := make([]order.OrderItem, 0, len(req.Items)) for _, itemReq := range req.Items { money, err := order.NewMoney(itemReq.UnitPrice, itemReq.Currency) if err != nil { return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{ "error": err.Error(), }) } item, err := order.NewOrderItem(itemReq.ProductID, itemReq.Quantity, money) if err != nil { return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{ "error": err.Error(), }) } items = append(items, item) } // Ejecutar comando cmd := commands.CreateOrderCommand{ CustomerID: req.CustomerID, Items: items, } orderID, err := h.createOrderHandler.Handle(cmd) if err != nil { return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{ "error": err.Error(), }) } return c.Status(fiber.StatusCreated).JSON(fiber.Map{ "order_id": orderID, "message": "Order created successfully", }) } func (h *OrderHandlers) GetOrder(c *fiber.Ctx) error { orderID := c.Params("id") query := queries.GetOrderQuery{OrderID: orderID} orderDTO, err := h.getOrderHandler.Handle(query) if err != nil { return c.Status(fiber.StatusNotFound).JSON(fiber.Map{ "error": "Order not found", }) } return c.JSON(orderDTO) } func (h *OrderHandlers) ConfirmOrder(c *fiber.Ctx) error { orderID := c.Params("id") cmd := commands.ConfirmOrderCommand{OrderID: orderID} if err := h.confirmOrderHandler.Handle(cmd); err != nil { return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{ "error": err.Error(), }) } return c.JSON(fiber.Map{ "message": "Order confirmed successfully", }) }
internal/infrastructure/http/routes.gopackage http import ( "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/logger" "github.com/gofiber/fiber/v2/middleware/recover" ) func SetupRoutes(app *fiber.App, handlers *OrderHandlers) { // Middleware app.Use(logger.New()) app.Use(recover.New()) // Health check app.Get("/health", func(c *fiber.Ctx) error { return c.JSON(fiber.Map{"status": "ok"}) }) // Rutas de API api := app.Group("/api/v1") // Orders orders := api.Group("/orders") orders.Post("/", handlers.CreateOrder) orders.Get("/:id", handlers.GetOrder) orders.Post("/:id/confirm", handlers.ConfirmOrder) }
Paso 8: Aplicación Principal del Servidor 🚀
Integrar todo:
cmd/server/main.gopackage main import ( "log" "os" "github.com/gofiber/fiber/v2" "gorm.io/driver/postgres" "gorm.io/gorm" "github.com/yourusername/event-driven-ecommerce/internal/application" "github.com/yourusername/event-driven-ecommerce/internal/application/commands" "github.com/yourusername/event-driven-ecommerce/internal/application/queries" httpinfra "github.com/yourusername/event-driven-ecommerce/internal/infrastructure/http" natsinfra "github.com/yourusername/event-driven-ecommerce/internal/infrastructure/nats" postgresinfra "github.com/yourusername/event-driven-ecommerce/internal/infrastructure/postgres" ) func main() { // Cargar configuración desde variables de entorno natsURL := getEnv("NATS_URL", "nats://localhost:4222") postgresURL := getEnv("POSTGRES_URL", "postgresql://user:password@localhost:5432/ecommerce?sslmode=disable") port := getEnv("PORT", "3000") // Conectar a PostgreSQL db, err := gorm.Open(postgres.Open(postgresURL), &gorm.Config{}) if err != nil { log.Fatalf("Failed to connect to database: %v", err) } // Ejecutar migraciones if err := postgresinfra.Migrate(db); err != nil { log.Fatalf("Failed to migrate database: %v", err) } // Inicializar repositorios readRepo := postgresinfra.NewReadRepository(db) // Inicializar event store de NATS eventStore, err := natsinfra.NewEventStore(natsURL, "ORDERS") if err != nil { log.Fatalf("Failed to initialize NATS event store: %v", err) } defer eventStore.Close() // Inicializar command handlers createOrderHandler := commands.NewCreateOrderHandler(eventStore) confirmOrderHandler := commands.NewConfirmOrderHandler(eventStore) // Inicializar query handlers getOrderHandler := queries.NewGetOrderHandler(readRepo) // Inicializar event handlers eventHandlers := application.NewEventHandlers(readRepo) // Configurar suscriptor de NATS js, _ := eventStore.JetStream() subscriber := natsinfra.NewSubscriber(js, "ORDERS", "order-projector") subscriber.RegisterHandler("order.created", eventHandlers.HandleOrderCreated) subscriber.RegisterHandler("order.confirmed", eventHandlers.HandleOrderConfirmed) subscriber.RegisterHandler("order.shipped", eventHandlers.HandleOrderShipped) subscriber.RegisterHandler("order.cancelled", eventHandlers.HandleOrderCancelled) // Iniciar suscriptor if err := subscriber.Start(); err != nil { log.Fatalf("Failed to start subscriber: %v", err) } // Inicializar app de Fiber app := fiber.New(fiber.Config{ AppName: "Event-Driven E-commerce API", }) // Configurar HTTP handlers orderHandlers := httpinfra.NewOrderHandlers( createOrderHandler, confirmOrderHandler, getOrderHandler, ) httpinfra.SetupRoutes(app, orderHandlers) // Iniciar servidor log.Printf("Server starting on port %s", port) if err := app.Listen(":" + port); err != nil { log.Fatalf("Failed to start server: %v", err) } } func getEnv(key, defaultValue string) string { if value := os.Getenv(key); value != "" { return value } return defaultValue }
Paso 9: Configuración de Docker Compose 🐳
docker-compose.ymlversion: '3.8' services: nats: image: nats:latest ports: - "4222:4222" - "8222:8222" command: ["-js", "-m", "8222"] volumes: - nats-data:/data postgres: image: postgres:15 environment: POSTGRES_USER: user POSTGRES_PASSWORD: password POSTGRES_DB: ecommerce ports: - "5432:5432" volumes: - postgres-data:/var/lib/postgresql/data app: build: . ports: - "3000:3000" environment: NATS_URL: "nats://nats:4222" POSTGRES_URL: "postgresql://user:password@postgres:5432/ecommerce?sslmode=disable" depends_on: - nats - postgres volumes: nats-data: postgres-data:
Probando el Sistema 🧪
~# Iniciar servicios docker-compose up -d # Crear una orden curl -X POST http://localhost:3000/api/v1/orders \ -H "Content-Type: application/json" \ -d '{ "customer_id": "customer-123", "items": [ { "product_id": "product-1", "quantity": 2, "unit_price": 2999, "currency": "USD" }, { "product_id": "product-2", "quantity": 1, "unit_price": 4999, "currency": "USD" } ] }' # Respuesta: {"order_id": "uuid", "message": "Order created successfully"} # Obtener orden curl http://localhost:3000/api/v1/orders/<order-id> # Confirmar orden curl -X POST http://localhost:3000/api/v1/orders/<order-id>/confirm # Reproducir eventos desde el inicio (usando CLI) go run cmd/cli/main.go replay --stream ORDERS --from 2026-01-01T00:00:00Z
Ventajas de Esta Arquitectura 🌟
1. Beneficios de Event Sourcing
- Rastro de auditoría completo - Cada cambio de estado está registrado
- Viaje en el tiempo - Reconstruir estado en cualquier punto de la historia
- Replay de eventos - Recuperarse de corrupción de datos reproduciendo eventos
- Debugging - Ver la secuencia exacta de eventos que llevó al estado actual
2. Escrituras Mínimas en Base de Datos
- Los eventos son la fuente de verdad - PostgreSQL solo para optimización de lectura
- Carga reducida en la base de datos - Solo escribir al crear proyecciones
- Mejor escalabilidad - Event store maneja escrituras, DB maneja lecturas
3. Separación CQRS
- Lecturas optimizadas - Modelos de consulta adaptados a casos de uso específicos
- Escrituras optimizadas - Modelos de dominio enfocados en lógica de negocio
- Escalado independiente - Escalar lados de lectura y escritura independientemente
4. Domain-Driven Design
- Lógica de negocio clara - La capa de dominio contiene todas las reglas
- Lenguaje ubicuo - El código coincide con la terminología del negocio
- Contextos acotados - Cada agregado es autocontenido
5. Resiliencia
- Persistencia de eventos - NATS JetStream asegura que no se pierdan eventos
- Mecanismos de reintento - El procesamiento de eventos fallidos puede reintentarse
- Consistencia eventual - El sistema tolera inconsistencias temporales
Desventajas y Desafíos ⚠️
1. Complejidad
Event sourcing añade complejidad significativa comparado con operaciones CRUD. La curva de aprendizaje es pronunciada.
2. Consistencia Eventual
Los modelos de lectura se actualizan de forma asíncrona. Las consultas podrían no reflejar las últimas escrituras inmediatamente.
3. Evolución del Schema de Eventos
Cambiar la estructura de eventos requiere estrategia de versionado. Los eventos antiguos deben seguir siendo procesables.
4. Complejidad de Debugging
Rastrear problemas a través de eventos, handlers y proyecciones puede ser desafiante.
5. Limitaciones de Consulta
Solo puedes consultar lo que has proyectado. Consultas ad-hoc requieren nuevas proyecciones.
6. Crecimiento de Almacenamiento
Los eventos se acumulan con el tiempo. Se necesitan estrategias para archivo y limpieza.
Mejores Prácticas 💡
1. Versionado de Eventos
~type OrderCreatedEventV1 struct { BaseEvent CustomerID string Items []OrderItem } type OrderCreatedEventV2 struct { BaseEvent CustomerID string Items []OrderItem ShippingMethod string // Nuevo campo } // Manejar ambas versiones func deserializeOrderCreated(version int, data []byte) (DomainEvent, error) { switch version { case 1: var event OrderCreatedEventV1 json.Unmarshal(data, &event) // Migrar a V2 si es necesario return event, nil case 2: var event OrderCreatedEventV2 json.Unmarshal(data, &event) return event, nil default: return nil, errors.New("unknown version") } }
2. Event Handlers Idempotentes
~func (h *EventHandlers) HandleOrderCreated(eventType string, data []byte) error { var event order.OrderCreatedEvent json.Unmarshal(data, &event) // Verificar si ya fue procesado (idempotencia) existing, err := h.repo.GetOrder(event.AggID) if err == nil && existing != nil { log.Printf("Order %s already exists, skipping", event.AggID) return nil // Ya procesado } // Procesar evento... }
3. Patrón de Snapshot para Agregados Grandes
~type OrderSnapshot struct { AggregateID string Version int State OrderAggregate CreatedAt time.Time } // Cargar desde snapshot + eventos subsecuentes func LoadFromSnapshot(snapshot OrderSnapshot, events []DomainEvent) (*OrderAggregate, error) { aggregate := snapshot.State for _, event := range events { if event.Version() <= snapshot.Version { continue // Saltar eventos ya aplicados } aggregate.ApplyEvent(event) } return &aggregate, nil }
4. Metadata de Eventos
~type EventMetadata struct { UserID string CorrelationID string CausationID string IP string Timestamp time.Time } type EnrichedEvent struct { Event DomainEvent Metadata EventMetadata }
Escenarios del Mundo Real 🌍
Escenario 1: Cancelación de Orden con Restauración de Inventario
Cuando se cancela una orden, el inventario debe restaurarse. Los eventos hacen esto fácil:
~// Handler para evento de orden cancelada func (h *InventoryHandlers) HandleOrderCancelled(data []byte) error { var event order.OrderCancelledEvent json.Unmarshal(data, &event) // Obtener evento original de creación de orden para saber qué restaurar events, _ := h.eventStore.LoadEvents(event.AggID) var createEvent order.OrderCreatedEvent for _, e := range events { if e.EventType() == "order.created" { createEvent = e.(order.OrderCreatedEvent) break } } // Restaurar inventario para cada item for _, item := range createEvent.Items { h.inventoryService.RestoreStock(item.ProductID, item.Quantity) } return nil }
Escenario 2: Generar Reportes desde Eventos
~// Generar reporte de ingresos reproduciendo eventos func GenerateRevenueReport(from, to time.Time) (*Report, error) { var totalRevenue int64 var orderCount int events := eventStore.LoadEventsBetween("ORDERS", from, to) for _, event := range events { if event.EventType() == "order.created" { e := event.(OrderCreatedEvent) totalRevenue += e.TotalPrice.Amount orderCount++ } } return &Report{ TotalRevenue: totalRevenue, OrderCount: orderCount, Period: Period{From: from, To: to}, }, nil }
Conclusión 🎯
Construir microservicios event-driven con NATS, Go Fiber y Domain-Driven Design proporciona una arquitectura poderosa para sistemas de e-commerce escalables. Puntos clave:
- Eventos como fuente de verdad eliminan escrituras en base de datos para la mayoría de operaciones
- NATS JetStream proporciona persistencia de eventos confiable y replay
- DDD mantiene la lógica de negocio limpia y mantenible
- CQRS optimiza lecturas y escrituras independientemente
- Cobra CLI habilita operaciones administrativas poderosas
- Event Sourcing proporciona rastros de auditoría completos y viaje en el tiempo
Esta arquitectura es ideal para:
- Sistemas que requieren rastros de auditoría completos (financieros, médicos, legales)
- Aplicaciones de alta escala con lógica de negocio compleja
- Sistemas que necesitan reconstruir estado desde historial
- Aplicaciones con necesidades de escalabilidad separadas para lectura y escritura
Sin embargo, considera la complejidad añadida y los trade-offs de consistencia eventual. Para aplicaciones CRUD simples, la arquitectura tradicional puede ser más apropiada.
El código fuente completo para este ejemplo está disponible en GitHub. ¡Feliz construcción! 🚀
Visita mi GitHub