7 Patrones de Concurrencia en Go que Deberías Conocer 🐹

22 de Agosto del 2024 ¿Ves algún error? Corregir artículo golang-wallpaper

La concurrencia es una de las características más potentes de Go, y dominarla es crucial para construir aplicaciones escalables y eficientes. A continuación, te presento 7 patrones de concurrencia en Go que debes conocer.

1. Worker Pool

Explicación:
El patrón Worker Pool implica crear un número fijo de goroutines que procesan tareas de una cola compartida. Este patrón es útil para controlar la cantidad de tareas concurrentes, lo cual es crucial para manejar el uso de recursos.

worker_pool.go
package main import ( "fmt" "sync" "time" ) func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) { defer wg.Done() for job := range jobs { fmt.Printf("Worker %d started job %d\n", id, job) time.Sleep(time.Second) fmt.Printf("Worker %d finished job %d\n", id, job) results <- job * 2 } } func main() { const numJobs = 5 const numWorkers = 3 jobs := make(chan int, numJobs) results := make(chan int, numJobs) var wg sync.WaitGroup for i := 1; i <= numWorkers; i++ { wg.Add(1) go worker(i, jobs, results, &wg) } for j := 1; j <= numJobs; j++ { jobs <- j } close(jobs) wg.Wait() close(results) for result := range results { fmt.Println("Result:", result) } }

Escenario Real:
Un servidor web que maneja solicitudes HTTP entrantes, donde cada solicitud es procesada por un worker del pool.

2. Fan-Out, Fan-In

Explicación:
Fan-Out ocurre cuando múltiples goroutines son iniciadas para procesar datos, y Fan-In es cuando los resultados de estas goroutines se combinan en un solo canal. Este patrón es útil para el procesamiento paralelo y luego recolectar los resultados.

fan_out_fan_in.go
package main import ( "fmt" "sync" ) func producer(id int, ch chan<- int, wg *sync.WaitGroup) { defer wg.Done() for i := 0; i < 5; i++ { ch <- i fmt.Printf("Producer %d produced %d\n", id, i) } } func consumer(id int, in <-chan int, out chan<- int, wg *sync.WaitGroup) { defer wg.Done() for v := range in { out <- v * 2 fmt.Printf("Consumer %d processed %d\n", id, v) } } func main() { numProducers := 2 numConsumers := 2 input := make(chan int, 10) output := make(chan int, 10) var wg sync.WaitGroup for i := 1; i <= numProducers; i++ { wg.Add(1) go producer(i, input, &wg) } wg.Wait() close(input) for i := 1; i <= numConsumers; i++ { wg.Add(1) go consumer(i, input, output, &wg) } wg.Wait() close(output) for result := range output { fmt.Println("Result:", result) } }

Escenario Real:
Un pipeline de procesamiento de datos donde diferentes etapas de procesamiento son manejadas por distintos conjuntos de workers.

3. Pipeline

Explicación:
El patrón Pipeline implica encadenar una serie de etapas donde cada una realiza una transformación en los datos y los pasa a la siguiente etapa. Es útil en escenarios donde los datos necesitan pasar por múltiples pasos de procesamiento secuenciales.

pipeline.go
package main import "fmt" func stage1(nums []int) <-chan int { out := make(chan int) go func() { for _, n := range nums { out <- n } close(out) }() return out } func stage2(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { out <- n * 2 } close(out) }() return out } func stage3(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { out <- n + 1 } close(out) }() return out } func main() { nums := []int{1, 2, 3, 4, 5} c1 := stage1(nums) c2 := stage2(c1) c3 := stage3(c2) for result := range c3 { fmt.Println(result) } }

Escenario Real:
Un sistema de procesamiento de imágenes donde una imagen pasa por varias etapas como redimensionamiento, filtrado y codificación.

4. Publish-Subscribe

Explicación:
El patrón Publish-Subscribe permite que los mensajes se publiquen a múltiples suscriptores. Este patrón es útil en sistemas donde diferentes servicios necesitan reaccionar independientemente a ciertos eventos o tipos de mensajes.

pub_sub.go
package main import ( "fmt" "sync" "time" ) type PubSub struct { mu sync.Mutex channels map[string][]chan string } func NewPubSub() *PubSub { return &PubSub{ channels: make(map[string][]chan string), } } func (ps *PubSub) Subscribe(topic string) <-chan string { ch := make(chan string) ps.mu.Lock() ps.channels[topic] = append(ps.channels[topic], ch) ps.mu.Unlock() return ch } func (ps *PubSub) Publish(topic, msg string) { ps.mu.Lock() for _, ch := range ps.channels[topic] { ch <- msg } ps.mu.Unlock() } func (ps *PubSub) Close(topic string) { ps.mu.Lock() for _, ch := range ps.channels[topic] { close(ch) } ps.mu.Unlock() } func main() { ps := NewPubSub() subscriber1 := ps.Subscribe("news") subscriber2 := ps.Subscribe("news") var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() for msg := range subscriber1 { fmt.Println("Subscriber 1 received:", msg) } }() go func() { defer wg.Done() for msg := range subscriber2 { fmt.Println("Subscriber 2 received:", msg) } }() ps.Publish("news", "Breaking News!") ps.Publish("news", "Another News!") time.Sleep(time.Second) ps.Close("news") wg.Wait() }

Escenario Real:
Un sistema de mensajería donde diferentes servicios se suscriben a ciertos tipos de eventos o mensajes.

5. Select con Timeout

Explicación:
Usar la instrucción select con un timeout permite evitar bloqueos indefinidos. Este patrón es útil cuando deseas realizar una acción o abortar si una operación tarda demasiado.

select_timeout.go
package main import ( "fmt" "time" ) func main() { c := make(chan string) go func() { time.Sleep(2 * time.Second) c <- "result" }() select { case res := <-c: fmt.Println("Received:", res) case <-time.After(1 * time.Second): fmt.Println("Timeout") } }

Escenario Real:
Un cliente de red que intenta conectarse a un servidor y se detiene si el servidor no responde a tiempo.

6. Semaphore

Explicación:
Un Semaphore limita el número de goroutines que pueden acceder a un recurso particular de manera concurrente. Este patrón es útil para controlar la concurrencia y evitar sobrecargar los recursos.

semaphore.go
package main import ( "fmt" "sync" "time" ) func worker(id int, sem chan struct{}, wg *sync.WaitGroup) { defer wg.Done() sem <- struct{}{} // Adquirir semáforo fmt.Printf("Worker %d starting\n", id) time.Sleep(time.Second) fmt.Printf("Worker %d done\n", id) <-sem // Liberar semáforo } func main() { const numWorkers = 5 const maxConcurrent = 2 sem := make(chan struct{}, maxConcurrent) var wg sync.WaitGroup for i := 1; i <= numWorkers; i++ { wg.Add(1) go worker(i, sem, &wg) } wg.Wait() }

Escenario Real:
Un pool de conexiones de base de datos donde un número limitado de conexiones están permitidas a la vez.

7. Rate Limiting

Explicación:
Rate Limiting controla la tasa a la que se procesan los eventos utilizando un ticker. Este patrón es útil cuando necesitas controlar la frecuencia de ciertas tareas, como las solicitudes a una API.

rate_limiting.go
package main import ( "fmt" "time" ) func main() { rate := time.Second ticker := time.NewTicker(rate) defer ticker.Stop() requests := make(chan int, 5) for i := 1; i <= 5; i++ { requests <- i } close(requests) for req := range requests { <-ticker.C // Esperar el siguiente tick fmt.Println("Processing request", req) } }

Escenario Real:
Un gateway de API que limita la cantidad de solicitudes que un usuario puede hacer en un período de tiempo determinado.

Conclusión

Los patrones de concurrencia en Go son esenciales para construir aplicaciones eficientes y escalables. Dominar estos patrones te permitirá manejar la concurrencia de manera efectiva, optimizando el uso de recursos y mejorando el rendimiento de tus aplicaciones.


¡Gracias por leer! Si te ha gustado este post, asegúrate de compartirlo y seguirme para más contenido sobre Go y desarrollo de software.

Conviértete en un Go Ninja 🥷.Suscríbete a mi newsletter y recibe las últimas novedades en Go.