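A pipeline is another common pattern in concurrent programming: data flows through a series of stages, each of which receives values from the previous stage, does some work on them, and passes the results on to the next stage.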

For example, the stages of a data-processing pipeline could be:

  • Read the file
  • Process the data
  • Transform the data
  • Write the data to another file

Let’s look at an example of such a pipeline in Go.

package pipeline

import (
    "fmt"
    "time"
)

// FileData is the value handed from one pipeline stage to the next.
type FileData struct {
    src, data string // src: path the item was read from; data: payload each stage appends to
}

// readData is the first pipeline stage. It starts a goroutine that walks
// filePaths in order. For each path it sleeps one second to simulate reading
// the file, then sends a FileData whose src is the path and whose data is the
// placeholder string "data". The returned channel is unbuffered, so each send
// waits for the next stage to receive it. The channel is closed after the last
// path has been sent.
func readData(filePaths []string) <-chan FileData {
    results := make(chan FileData)
    go func() {
        // Closing on exit signals downstream stages that no more items follow.
        defer close(results)
        for i := range filePaths {
            time.Sleep(time.Second) // Simulate file read
            results <- FileData{src: filePaths[i], data: "data"}
        }
    }()
    return results
}

// processData is the second pipeline stage. For every FileData received from
// in, it sleeps one second to simulate processing. It then forwards the item
// with " processed" appended to its data; src is passed through unchanged.
// The returned channel is closed once in has been closed and drained.
func processData(in <-chan FileData) <-chan FileData {
    processed := make(chan FileData)
    go func() {
        defer close(processed)
        for item := range in {
            time.Sleep(time.Second) // Simulate data processing
            // item is a per-iteration copy, so updating it in place is safe.
            item.data += " processed"
            processed <- item
        }
    }()
    return processed
}

// transformData is the third pipeline stage. For every FileData received from
// in, it sleeps one second to simulate a transformation. It then sends a
// FileData with the same src and " transformed" appended to data. The
// returned channel is closed once in has been closed and drained.
func transformData(in <-chan FileData) <-chan FileData {
    transformed := make(chan FileData)
    go func() {
        defer close(transformed)
        for {
            item, ok := <-in
            if !ok {
                return // upstream closed: nothing left to transform
            }
            time.Sleep(time.Second) // Simulate data transformation
            transformed <- FileData{src: item.src, data: item.data + " transformed"}
        }
    }()
    return transformed
}

// writeData is the final pipeline stage. Unlike the other stages, it runs on
// the caller's goroutine rather than starting its own. For every FileData
// received from in, it sleeps one second to simulate a write and prints
// "Writing data to <src>". It returns only after in has been closed and
// drained, and it always returns true.
// NOTE(review): the printed path is the item's src (the path it was read
// from); no separate destination is tracked.
func writeData(in <-chan FileData) bool {
    for {
        item, open := <-in
        if !open {
            break
        }
        time.Sleep(time.Second) // Simulate file write
        fmt.Printf("Writing data to %s\n", item.src)
    }
    return true
}

// RunPipeline runs the pipeline
// each stage is a go routine
// and they are connected via channels
func RunPipeline(filePaths []string) {
    readDataCh := readData(filePaths)
    processDataCh := processData(readDataCh)
    transformDataCh := transformData(processDataCh)
    writeData(transformDataCh)
}

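In the example above, we have implemented a pipeline with four stages connected by channels. The first three stages (readData, processData and transformData) each run in their own goroutine. The last stage, writeData, runs on the goroutine that called RunPipeline and blocks until every item has been written.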

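We can run the pipeline by calling the RunPipeline function with a slice of file paths. Because the stages run concurrently, different files can be in different stages at the same time. With N files and the one-second delays above, the run takes roughly N + 3 seconds instead of the 4N seconds a sequential version would need.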