7 Concurrency Patterns in Go You Should Know 🐹

August 22, 2024

Concurrency is one of Go's most powerful features, and mastering it is crucial for building scalable and efficient applications. Below, I present 7 concurrency patterns in Go that you should know.

1. Worker Pool

Explanation:
The Worker Pool pattern involves creating a fixed number of goroutines that process tasks from a shared queue. This pattern is useful for controlling the number of concurrent tasks, which is crucial for managing resource usage.

worker_pool.go
    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
        defer wg.Done()
        for job := range jobs {
            fmt.Printf("Worker %d started job %d\n", id, job)
            time.Sleep(time.Second)
            fmt.Printf("Worker %d finished job %d\n", id, job)
            results <- job * 2
        }
    }

    func main() {
        const numJobs = 5
        const numWorkers = 3

        jobs := make(chan int, numJobs)
        results := make(chan int, numJobs)

        var wg sync.WaitGroup
        for i := 1; i <= numWorkers; i++ {
            wg.Add(1)
            go worker(i, jobs, results, &wg)
        }

        for j := 1; j <= numJobs; j++ {
            jobs <- j
        }
        close(jobs)

        wg.Wait()
        close(results)

        for result := range results {
            fmt.Println("Result:", result)
        }
    }

Real-World Scenario:
A web server handling incoming HTTP requests, where each request is processed by a worker from the pool.
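As a complement to the example above, here is a minimal sketch of the same idea wrapped in a reusable function. The name `processAll` and its signature are hypothetical, chosen only for illustration, and the sketch assumes `numWorkers` is at least 1. Workers receive indexes rather than values so that results come back in input order.

```go
package main

import (
	"fmt"
	"sync"
)

// processAll is a hypothetical helper: it applies fn to every input using
// numWorkers goroutines and returns the results in input order.
// It assumes numWorkers >= 1.
func processAll(numWorkers int, inputs []int, fn func(int) int) []int {
	results := make([]int, len(inputs))
	jobs := make(chan int) // carries indexes into inputs

	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range jobs {
				// Each index is handed to exactly one worker,
				// so no two goroutines write the same slot.
				results[i] = fn(inputs[i])
			}
		}()
	}

	for i := range inputs {
		jobs <- i
	}
	close(jobs) // lets the workers' range loops end
	wg.Wait()
	return results
}

func main() {
	doubled := processAll(3, []int{1, 2, 3, 4, 5}, func(n int) int { return n * 2 })
	fmt.Println(doubled) // [2 4 6 8 10]
}
```

Writing into a pre-sized slice avoids a results channel entirely, which is a reasonable choice when the number of jobs is known up front.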

2. Fan-Out, Fan-In

Explanation:
Fan-Out distributes work across multiple goroutines that all read from the same source, and Fan-In merges the results from those goroutines into a single channel. This pattern is useful for processing data in parallel and then collecting the results in one place.

fan_out_fan_in.go
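    package main

    import (
        "fmt"
        "sync"
    )

    func producer(id int, ch chan<- int, wg *sync.WaitGroup) {
        defer wg.Done()
        for i := 0; i < 5; i++ {
            ch <- i
            fmt.Printf("Producer %d produced %d\n", id, i)
        }
    }

    func consumer(id int, in <-chan int, out chan<- int, wg *sync.WaitGroup) {
        defer wg.Done()
        for v := range in {
            out <- v * 2
            fmt.Printf("Consumer %d processed %d\n", id, v)
        }
    }

    func main() {
        numProducers := 2
        numConsumers := 2

        input := make(chan int, 10)
        output := make(chan int, 10)

        // Separate WaitGroups let producers and consumers run at the same
        // time, so the program no longer depends on the buffer sizes.
        var producers, consumers sync.WaitGroup

        for i := 1; i <= numProducers; i++ {
            producers.Add(1)
            go producer(i, input, &producers)
        }

        // Fan-out: every consumer reads from the same input channel.
        for i := 1; i <= numConsumers; i++ {
            consumers.Add(1)
            go consumer(i, input, output, &consumers)
        }

        // Close each channel once all of its senders have finished.
        go func() {
            producers.Wait()
            close(input)
        }()
        go func() {
            consumers.Wait()
            close(output)
        }()

        // Fan-in: results from all consumers arrive on one channel.
        for result := range output {
            fmt.Println("Result:", result)
        }
    }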

Real-World Scenario:
A data processing pipeline where different stages of processing are handled by different sets of workers.
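The fan-in step is often written as a standalone `merge` function that combines any number of channels into one. The sketch below is one common way to do it, not the only one; `generate`, `square`, and `merge` are illustrative names rather than anything from a library.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// generate emits the given numbers on a channel, then closes it.
func generate(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// square reads numbers from in and emits their squares.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

// merge fans in: it forwards every value from chans onto a single channel
// and closes that channel after all inputs are drained.
func merge(chans ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(chans))
	for _, c := range chans {
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	src := generate(1, 2, 3, 4, 5, 6)

	// Fan-out: three workers share the same source channel.
	w1, w2, w3 := square(src), square(src), square(src)

	// Fan-in: collect everything from the merged channel.
	var results []int
	for v := range merge(w1, w2, w3) {
		results = append(results, v)
	}
	sort.Ints(results)   // arrival order is not deterministic
	fmt.Println(results) // [1 4 9 16 25 36]
}
```

Because several workers compete for values, the order of results is not guaranteed, which is why the example sorts before printing.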

3. Pipeline

Explanation:
The Pipeline pattern involves chaining a series of stages where each one performs a transformation on the data and passes it to the next stage. It's useful in scenarios where data needs to go through multiple sequential processing steps.

pipeline.go
    package main

    import "fmt"

    func stage1(nums []int) <-chan int {
        out := make(chan int)
        go func() {
            for _, n := range nums {
                out <- n
            }
            close(out)
        }()
        return out
    }

    func stage2(in <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            for n := range in {
                out <- n * 2
            }
            close(out)
        }()
        return out
    }

    func stage3(in <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            for n := range in {
                out <- n + 1
            }
            close(out)
        }()
        return out
    }

    func main() {
        nums := []int{1, 2, 3, 4, 5}

        c1 := stage1(nums)
        c2 := stage2(c1)
        c3 := stage3(c2)

        for result := range c3 {
            fmt.Println(result)
        }
    }

Real-World Scenario:
An image processing system where an image goes through several stages such as resizing, filtering, and encoding.
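One thing the basic pipeline does not cover is what happens when the consumer stops reading early: upstream stages would block forever on their sends. A minimal sketch of one way to handle this, assuming you pass a `context.Context` through each stage, is shown here. The stage names `count` and `double` and the helper `firstN` are hypothetical, and `firstN` assumes `n` is at least 1.

```go
package main

import (
	"context"
	"fmt"
)

// count emits 1, 2, 3, ... until ctx is cancelled.
func count(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := 1; ; n++ {
			select {
			case out <- n:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// double multiplies each value by 2 and stops early if ctx is cancelled.
func double(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case out <- n * 2:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// firstN takes n values from the pipeline, then cancels the context so
// that both stages exit instead of blocking on a send nobody will read.
func firstN(n int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var got []int
	for v := range double(ctx, count(ctx)) {
		got = append(got, v)
		if len(got) >= n {
			break
		}
	}
	return got
}

func main() {
	fmt.Println(firstN(3)) // [2 4 6]
}
```

Each send sits in a `select` next to `ctx.Done()`, so a stage can always give up on a send when the pipeline is cancelled.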

4. Publish-Subscribe

Explanation:
The Publish-Subscribe pattern allows messages to be published to multiple subscribers. This pattern is useful in systems where different services need to react independently to certain events or message types.

pub_sub.go
    package main

    import (
        "fmt"
        "sync"
    )

    type PubSub struct {
        mu       sync.Mutex
        channels map[string][]chan string
    }

    func NewPubSub() *PubSub {
        return &PubSub{
            channels: make(map[string][]chan string),
        }
    }

    func (ps *PubSub) Subscribe(topic string) <-chan string {
        ch := make(chan string)
        ps.mu.Lock()
        defer ps.mu.Unlock()
        ps.channels[topic] = append(ps.channels[topic], ch)
        return ch
    }

    // Publish blocks until every subscriber of the topic has received msg.
    func (ps *PubSub) Publish(topic, msg string) {
        ps.mu.Lock()
        defer ps.mu.Unlock()
        for _, ch := range ps.channels[topic] {
            ch <- msg
        }
    }

    // Close closes all subscriber channels for the topic and removes it,
    // so a later Publish cannot send on a closed channel.
    func (ps *PubSub) Close(topic string) {
        ps.mu.Lock()
        defer ps.mu.Unlock()
        for _, ch := range ps.channels[topic] {
            close(ch)
        }
        delete(ps.channels, topic)
    }

    func main() {
        ps := NewPubSub()

        subscriber1 := ps.Subscribe("news")
        subscriber2 := ps.Subscribe("news")

        var wg sync.WaitGroup
        wg.Add(2)

        go func() {
            defer wg.Done()
            for msg := range subscriber1 {
                fmt.Println("Subscriber 1 received:", msg)
            }
        }()

        go func() {
            defer wg.Done()
            for msg := range subscriber2 {
                fmt.Println("Subscriber 2 received:", msg)
            }
        }()

        ps.Publish("news", "Breaking News!")
        ps.Publish("news", "Another News!")

        // The channels are unbuffered, so both messages have already been
        // received by the time Publish returns; no sleep is needed.
        ps.Close("news")
        wg.Wait()
    }

Real-World Scenario:
A messaging system where different services subscribe to certain types of events or messages.
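With unbuffered channels, one slow subscriber holds up delivery to all the others. If subscribers really need to react independently, one option is to give each a buffer and drop messages when it is full. The sketch below illustrates that trade-off; the `Broker` type and its methods are hypothetical names, and dropping messages is an assumption that will not suit every system.

```go
package main

import (
	"fmt"
	"sync"
)

// Broker is a hypothetical pub-sub hub whose Publish never blocks.
type Broker struct {
	mu   sync.RWMutex
	subs map[string][]chan string
}

func NewBroker() *Broker {
	return &Broker{subs: make(map[string][]chan string)}
}

// Subscribe registers a subscriber with its own buffer of size buf.
func (b *Broker) Subscribe(topic string, buf int) <-chan string {
	ch := make(chan string, buf)
	b.mu.Lock()
	defer b.mu.Unlock()
	b.subs[topic] = append(b.subs[topic], ch)
	return ch
}

// Publish offers msg to every subscriber of topic and returns how many
// accepted it. A subscriber whose buffer is full misses the message.
func (b *Broker) Publish(topic, msg string) int {
	b.mu.RLock()
	defer b.mu.RUnlock()

	delivered := 0
	for _, ch := range b.subs[topic] {
		select {
		case ch <- msg:
			delivered++
		default: // buffer full: drop instead of blocking
		}
	}
	return delivered
}

func main() {
	b := NewBroker()
	fast := b.Subscribe("news", 2)
	slow := b.Subscribe("news", 1)

	fmt.Println(b.Publish("news", "first"))  // 2: both buffers have room
	fmt.Println(b.Publish("news", "second")) // 1: slow's buffer is already full

	fmt.Println(<-fast, <-fast) // first second
	fmt.Println(<-slow)         // first
}
```

Returning the delivery count gives the publisher a cheap way to notice that some subscribers are falling behind.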

5. Select with Timeout

Explanation:
Using the select statement with a timeout allows you to avoid indefinite blocking. This pattern is useful when you want to perform an action or abort if an operation takes too long.

select_timeout.go
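    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        // A buffer of 1 lets the goroutine complete its send and exit even
        // if the timeout fires first; with an unbuffered channel it would
        // block forever once nobody is receiving.
        c := make(chan string, 1)

        go func() {
            time.Sleep(2 * time.Second)
            c <- "result"
        }()

        select {
        case res := <-c:
            fmt.Println("Received:", res)
        case <-time.After(1 * time.Second):
            fmt.Println("Timeout")
        }
    }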

Real-World Scenario:
A network client attempting to connect to a server and stopping if the server doesn't respond in time.
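For calls that accept a `context.Context`, the same timeout can be expressed with `context.WithTimeout`, which also tells the slow operation to stop working. The following is a rough sketch; `slowOperation` is a hypothetical stand-in for a real network call, and `callWithTimeout`, `shortWait`, and `longWait` are illustrative names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

const (
	shortWait = 20 * time.Millisecond
	longWait  = 200 * time.Millisecond
)

// slowOperation is a hypothetical stand-in for a network call that takes
// d to complete. It returns early if ctx is cancelled or times out.
func slowOperation(ctx context.Context, d time.Duration) (string, error) {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-timer.C:
		return "result", nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// callWithTimeout runs slowOperation with a deadline of timeout.
func callWithTimeout(work, timeout time.Duration) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel() // releases the context's timer
	return slowOperation(ctx, work)
}

func main() {
	// The work takes longer than the deadline allows.
	_, err := callWithTimeout(longWait, shortWait)
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true

	// The work finishes well within the deadline.
	res, err := callWithTimeout(shortWait, longWait)
	fmt.Println(res, err) // result <nil>
}
```

Compared with `time.After` in a `select`, the context approach propagates the deadline into the callee, so the work itself can be abandoned rather than merely ignored.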

6. Semaphore

Explanation:
A Semaphore limits the number of goroutines that can access a particular resource concurrently. This pattern is useful for controlling concurrency and avoiding resource overload.

semaphore.go
    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    func worker(id int, sem chan struct{}, wg *sync.WaitGroup) {
        defer wg.Done()

        sem <- struct{}{}        // Acquire a semaphore slot
        defer func() { <-sem }() // Release the slot on return

        fmt.Printf("Worker %d starting\n", id)
        time.Sleep(time.Second)
        fmt.Printf("Worker %d done\n", id)
    }

    func main() {
        const numWorkers = 5
        const maxConcurrent = 2

        // The channel's capacity is the maximum number of concurrent workers.
        sem := make(chan struct{}, maxConcurrent)

        var wg sync.WaitGroup
        for i := 1; i <= numWorkers; i++ {
            wg.Add(1)
            go worker(i, sem, &wg)
        }

        wg.Wait()
    }

Real-World Scenario:
A database connection pool where a limited number of connections are allowed at once.
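To check that the limit actually holds, you can count how many goroutines are inside the protected section at once. The sketch below wraps the buffered channel in a small type and records the peak; `Semaphore`, `NewSemaphore`, and `peakConcurrency` are hypothetical names used only for this illustration.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Semaphore is a counting semaphore backed by a buffered channel.
type Semaphore chan struct{}

// NewSemaphore returns a semaphore that admits at most n holders.
func NewSemaphore(n int) Semaphore { return make(Semaphore, n) }

// Acquire blocks until a slot is free.
func (s Semaphore) Acquire() { s <- struct{}{} }

// Release frees a slot previously taken with Acquire.
func (s Semaphore) Release() { <-s }

// peakConcurrency starts tasks goroutines guarded by a semaphore of size
// limit and returns the highest number seen running at the same time.
func peakConcurrency(tasks, limit int) int {
	sem := NewSemaphore(limit)

	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		running int
		peak    int
	)

	for i := 0; i < tasks; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem.Acquire()
			defer sem.Release()

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			time.Sleep(10 * time.Millisecond) // simulated work

			mu.Lock()
			running--
			mu.Unlock()
		}()
	}

	wg.Wait()
	return peak
}

func main() {
	// The reported peak never exceeds the limit of 3.
	fmt.Println("peak concurrency:", peakConcurrency(10, 3))
}
```

Unlike the worker pool, this approach still starts one goroutine per task; the semaphore only bounds how many of them do work at the same time.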

7. Rate Limiting

Explanation:
Rate Limiting caps how often events are processed; in Go, a time.Ticker is a simple way to space them out at a fixed interval. This pattern is useful when you need to control the frequency of certain tasks, such as API requests.

rate_limiting.go
    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        rate := time.Second
        ticker := time.NewTicker(rate)
        defer ticker.Stop()

        requests := make(chan int, 5)
        for i := 1; i <= 5; i++ {
            requests <- i
        }
        close(requests)

        for req := range requests {
            <-ticker.C // Wait for the next tick
            fmt.Println("Processing request", req)
        }
    }

Real-World Scenario:
An API gateway that limits the number of requests a user can make in a given time period.
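A plain ticker enforces a strict interval between requests. Many limiters also allow short bursts, and one way to sketch that is a buffered channel of tokens that a ticker refills. The example below is an illustration under assumptions: `newBurstyLimiter`, `timeRequests`, and `tick` are hypothetical names, and the short interval is chosen only to keep the demo fast.

```go
package main

import (
	"fmt"
	"time"
)

const tick = 50 * time.Millisecond

// newBurstyLimiter returns a token channel that starts with burst tokens
// and gains one more per interval, up to burst. Calling stop ends the
// background refill goroutine.
func newBurstyLimiter(burst int, interval time.Duration) (tokens <-chan struct{}, stop func()) {
	ch := make(chan struct{}, burst)
	for i := 0; i < burst; i++ {
		ch <- struct{}{}
	}

	done := make(chan struct{})
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				select {
				case ch <- struct{}{}:
				default: // bucket already full: discard the token
				}
			case <-done:
				return
			}
		}
	}()

	return ch, func() { close(done) }
}

// timeRequests processes n requests through the limiter and returns how
// long that took.
func timeRequests(n, burst int, interval time.Duration) time.Duration {
	tokens, stop := newBurstyLimiter(burst, interval)
	defer stop()

	start := time.Now()
	for i := 1; i <= n; i++ {
		<-tokens // wait for a token before handling the request
		fmt.Println("Processing request", i)
	}
	return time.Since(start)
}

func main() {
	// The first 3 requests use the initial burst; the last 2 each wait
	// for a refill, so the total is roughly two intervals.
	elapsed := timeRequests(5, 3, tick)
	fmt.Println("elapsed:", elapsed.Round(10*time.Millisecond))
}
```

A per-user limit like the API gateway scenario would need one such limiter per user, for example stored in a map keyed by user ID.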

Conclusion

Concurrency patterns in Go are essential for building efficient and scalable applications. Mastering these patterns will allow you to handle concurrency effectively, optimizing resource usage and improving your application's performance.


Thanks for reading! If you enjoyed this post, make sure to share it and follow me for more content about Go and software development.