Golang has excellent support for concurrency. Let’s see how to implement a common pattern - worker pool in Golang.

Worker pool

A worker pool is a collection of threads that are waiting for tasks to be assigned. We can limit the number of concurrent operations at a time with this approach. Usually starting a new concurrent thread or routine is not practical for every task especially when the number of tasks is large.

Example

Let’s see an example of a worker pool. We will create a pool of workers and assign tasks to them.

package workerpool

import (
    "fmt"
    "sync"
    "time"
)

// A task should have process method
type Task interface {
    Process()
}

// WorkerPool is a collection of workers
type WorkerPool struct {
    taskList  []Task
	workers   int
	taskQueue chan Task
	wg        sync.WaitGroup
}

// AddTask adds a task to the worker pool
func (wp *WorkerPool) AddTask(task Task) {
    wp.taskList = append(wp.taskList, task)
}

// worker is a worker function that runs process method of a task
// it will be called as a go routine
func (wp *WorkerPool) work(id int) {
    // this will wait for the task to be assigned
    // task will be received from the taskQueue channel
	for task := range wp.taskQueue {
		task.Process()
	}
    // calls done after channel is closed
	wp.wg.Done()
}

// Run starts the worker pool, creates workers and assigns tasks
func (wp *WorkerPool) Run() {
    wp.taskQueue = make(chan Task, len(wp.TaskList))

    // create workers
    for i := 0; i < wp.workers; i++ {
        go wp.work(i)
    }

    // wait config for all workers
    wp.wg.Add(wp.workers)
    // send tasks to workers via taskQueue channel
    for _, task := range wp.TaskList {
        wp.taskQueue <- task
    }
    // close the taskQueue channel after all tasks are assigned
    close(wp.taskQueue)

    // wait for all workers to finish
    wp.wg.Wait()
}

We can implement a task by implementing the Task interface. and these tasks can be assigned to the worker pool.

Example tasks:

type EmailTask struct {
    To      string
    Subject string
    Body    string
}

func (e *EmailTask) Process() {
    fmt.Printf("Sending email to %s\n", e.To)
    time.Sleep(1 * time.Second)
}

type ApiCallTask struct {
    Url string
}

func (a *ApiCallTask) Process() {
    fmt.Printf("Calling API %s\n", a.Url)
    time.Sleep(1 * time.Second)
}

Finally, let’s see a sample usage of the worker pool.

package main

import (
    "worker-pattern/workerpool"
)

func main() {
    wp := workerpool.WorkerPool{
        TaskList: tasks,
        workers:  3,
    }

    wp.AddTask(&EmailTask{})
    wp.AddTask(&ApiCallTask{})
    wp.AddTask(&EmailTask{})
    wp.AddTask(&ApiCallTask{})
    wp.AddTask(&EmailTask{})
    wp.AddTask(&ApiCallTask{})
    wp.AddTask(&EmailTask{})
    wp.AddTask(&ApiCallTask{})
    wp.AddTask(&EmailTask{})
    wp.AddTask(&ApiCallTask{})
    wp.AddTask(&EmailTask{})
    wp.AddTask(&ApiCallTask{})

    wp.Run()
}

In the above example, we have total 12 tasks and 3 workers. So, 3 workers will process 12 tasks in a controlled manner.

This pattern will be useful to limit the number of concurrent goroutines.