Concurrency Design Pattern in Go

Design Patterns

Concurrency in Go is simple to get started with, and it is a powerful tool. It is important to understand the basics of concurrency before going any further. Once those basics are in place, the next step is to go deeper by looking at the different concurrency patterns. Knowing these patterns makes it easier to pick the right solution for a given concurrency problem, and it also reduces boilerplate code.

Pipeline

Closing the done channel (close(done)) has a cascading effect: every goroutine started by generator(..), add(..) and multiply(..) returns, and the channel each of them owns is closed in turn.

package main

import "fmt"

func main() {
    generator := func(done <-chan interface{}, integers ...int) <-chan int {
        intStream := make(chan int)
        go func() {
            defer close(intStream)
            for _, i := range integers {
                select {
                case <-done:
                    return
                case intStream <- i:
                }
            }
        }()
        return intStream
    }

    multiply := func(
        done <-chan interface{},
        intStream <-chan int,
        multiplier int,
    ) <-chan int {
        multipliedStream := make(chan int)
        go func() {
            defer close(multipliedStream)
            for i := range intStream {
                select {
                case <-done:
                    return
                case multipliedStream <- i*multiplier:
                }
            }
        }()
        return multipliedStream
    }

    add := func(
        done <-chan interface{},
        intStream <-chan int,
        additive int,
    ) <-chan int {
        addedStream := make(chan int)
        go func() {
            defer close(addedStream)
            for i := range intStream {
                select {
                case <-done:
                    return
                case addedStream <- i+additive:
                }
            }
        }()
        return addedStream
    }

    done := make(chan interface{})
    defer close(done)
    intStream := generator(done, 1, 2, 3, 4)
    pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)
    for v := range pipeline {
        fmt.Println(v)
    } 
}
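The cascade is easiest to see when the consumer stops early. The following is a minimal sketch, not part of the original example: the names double and firstN are made up for illustration, and the done channel uses struct{} instead of interface{}. The consumer reads only the first n values and then closes done, which lets every stage that is still blocked on a send return.

```go
package main

import "fmt"

// generator emits the given integers until they run out or done is closed.
func generator(done <-chan struct{}, integers ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, i := range integers {
			select {
			case <-done:
				return
			case out <- i:
			}
		}
	}()
	return out
}

// double is a single pipeline stage that multiplies every value by two.
func double(done <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			select {
			case <-done:
				return
			case out <- v * 2:
			}
		}
	}()
	return out
}

// firstN reads at most n values from the pipeline and then cancels it.
func firstN(n int, integers ...int) []int {
	if n <= 0 {
		return nil
	}
	done := make(chan struct{})
	defer close(done) // unblocks every stage that is still trying to send

	var got []int
	for v := range double(done, generator(done, integers...)) {
		got = append(got, v)
		if len(got) == n {
			break
		}
	}
	return got
}

func main() {
	fmt.Println(firstN(2, 1, 2, 3, 4)) // [2 4]
}
```

Without the done channel, the goroutines inside generator and double would stay blocked on their next send after the consumer stops reading.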

Tee-Channel

A tee takes a single input channel and an arbitrary number of output channels and duplicates each input value into every output. When the input channel is closed, all output channels are closed as well (provided closeWhenDone is set). The eapache channels project provides this pattern; the following snippet is its implementation.

func tee(input SimpleOutChannel, outputs []SimpleInChannel, closeWhenDone bool) {
	cases := make([]reflect.SelectCase, len(outputs))
	for i := range cases {
		cases[i].Dir = reflect.SelectSend
	}
	for elem := range input.Out() {
		for i := range cases {
			cases[i].Chan = reflect.ValueOf(outputs[i].In())
			cases[i].Send = reflect.ValueOf(elem)
		}
		for _ = range cases {
			chosen, _, _ := reflect.Select(cases)
			cases[chosen].Chan = reflect.ValueOf(nil)
		}
	}
	if closeWhenDone {
		for i := range outputs {
			outputs[i].Close()
		}
	}
}

The library uses the SelectCase type and the Select function from Go's standard reflect package to operate on a set of channels whose size is only known at run time.

type SelectCase struct {
    Dir  SelectDir // direction of case
    Chan Value     // channel to use (for send or receive)
    Send Value     // value to send (for send)
}

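Each SelectCase describes a single case of a select statement: Dir gives the direction (send, receive or default), Chan the channel to operate on, and Send the value to send. reflect.Select(..) picks one case that is ready to proceed. A case whose Chan is the zero Value is ignored, which is how tee(..) marks an output that has already received the current element.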
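As an illustration of the receive direction, here is a small sketch that waits on a slice of channels with reflect.Select. The helper name recvAny is an assumption made for this example and is not part of the channels library.

```go
package main

import (
	"fmt"
	"reflect"
)

// recvAny blocks until one of the channels delivers a value and reports
// which channel it came from. The number of channels is only known at
// run time, which an ordinary select statement cannot express.
func recvAny(chans []chan int) (index int, value int, ok bool) {
	cases := make([]reflect.SelectCase, len(chans))
	for i, ch := range chans {
		cases[i] = reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(ch),
		}
	}
	chosen, recv, recvOK := reflect.Select(cases)
	if !recvOK {
		return chosen, 0, false // the chosen channel was closed
	}
	return chosen, int(recv.Int()), true
}

func main() {
	chans := []chan int{make(chan int, 1), make(chan int, 1), make(chan int, 1)}
	chans[1] <- 42 // only the second channel is ready

	i, v, ok := recvAny(chans)
	fmt.Println(i, v, ok) // 1 42 true
}
```

Reflection-based selects are slower than a plain select statement, so they are usually reserved for cases like tee(..) where the channel set is dynamic.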

Fan-in-Fan-out channel

This pattern collates the results of goroutines that have been spawned to do background processing into a single channel.

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := fanIn(generator("Hello"), generator("Bye"))
    for i := 0; i < 10; i++ {
        fmt.Println(<- ch)
    }
}

// fanIn is itself a generator
func fanIn(ch1, ch2 <-chan string) <-chan string { // takes two receive-only channels
    merged := make(chan string)
    // Two goroutines loop forever, each piping one input into the merged channel.
    go func() { for { merged <- <-ch1 } }()
    go func() { for { merged <- <-ch2 } }()
    return merged
}

func generator(msg string) <-chan string { // returns receive-only channel
    ch := make(chan string)
    go func() { // anonymous goroutine
        for i := 0; ; i++ {
            ch <- fmt.Sprintf("%s %d", msg, i)
            time.Sleep(time.Second)
        }
    }()
    return ch
}    
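The fanIn above never stops its goroutines and never closes the merged channel, which is fine for endless generators. For finite inputs, a common variation (sketched here under the assumption that every producer closes its channel when done; emit is a hypothetical producer) uses a sync.WaitGroup to close the merged channel once all inputs are drained.

```go
package main

import (
	"fmt"
	"sync"
)

// emit is a hypothetical producer that sends n messages and then closes.
func emit(msg string, n int) <-chan string {
	ch := make(chan string)
	go func() {
		defer close(ch)
		for i := 0; i < n; i++ {
			ch <- fmt.Sprintf("%s %d", msg, i)
		}
	}()
	return ch
}

// fanIn merges any number of inputs into one channel and closes the
// merged channel once every input has been drained.
func fanIn(inputs ...<-chan string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(in <-chan string) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	count := 0
	for v := range fanIn(emit("Hello", 3), emit("Bye", 2)) {
		fmt.Println(v) // the interleaving varies between runs
		count++
	}
	fmt.Println("received", count, "messages") // received 5 messages
}
```

Because the merged channel is closed, the consumer can simply range over it instead of counting messages.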

Context

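Context is a good fit for cancelling or terminating goroutines by propagation. For this to work, the context must be passed as an argument to every function and goroutine involved. In the example below, genGreeting(..) derives a context with context.WithTimeout(..) that expires after 1 second. At that point the Done() channel selected on inside locale(..) is closed and locale(..) returns ctx.Err(). The error travels back up to main, which calls cancel() on the parent context, so the pending locale(..) call made on behalf of printFarewell(..) is cancelled as well.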

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    wg.Add(1)
    go func() {
        defer wg.Done()
        if err := printGreeting(ctx); err != nil {
            fmt.Printf("cannot print greeting: %v\n", err)
            cancel()
        }
    }()
    wg.Add(1)
    go func() {
        defer wg.Done()
        if err := printFarewell(ctx); err != nil {
            fmt.Printf("cannot print farewell: %v\n", err)
        }
    }()
    wg.Wait()
}

func printGreeting(ctx context.Context) error {
    greeting, err := genGreeting(ctx)
    if err != nil {
        return err
    }
    fmt.Printf("%s world!\n", greeting)
    return nil
}

func printFarewell(ctx context.Context) error {
    farewell, err := genFarewell(ctx)
    if err != nil {
        return err
    }
    fmt.Printf("%s world!\n", farewell)
    return nil
}

func genGreeting(ctx context.Context) (string, error) {
    ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
    defer cancel()
    switch locale, err := locale(ctx); {
    case err != nil:
        return "", err
    case locale == "EN/US":
        return "hello", nil
    }
    return "", fmt.Errorf("unsupported locale")
}

func genFarewell(ctx context.Context) (string, error) {
    switch locale, err := locale(ctx); {
    case err != nil:
        return "", err
    case locale == "EN/US":
        return "goodbye", nil
    }
    return "", fmt.Errorf("unsupported locale")
}

func locale(ctx context.Context) (string, error) {
    select {
    case <-ctx.Done():
        return "", ctx.Err()
    case <-time.After(1 * time.Minute):
        fmt.Println("Expired")
    }
    return "EN/US", nil
}
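The direction of propagation is worth spelling out: a deadline on a derived context ends only that context, while cancelling a parent ends everything derived from it. The sketch below illustrates both cases; slowLookup and observe are names invented for this example, with slowLookup standing in for locale(..).

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// slowLookup stands in for locale(..): it needs far longer than any
// caller is willing to wait, unless the context ends first.
func slowLookup(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(time.Minute):
		return nil
	}
}

// observe reports what a parent and its derived contexts see in two
// situations: a child timing out, and the parent being cancelled.
func observe() (childTimedOut, parentUntouched, childCancelled bool) {
	parent, cancel := context.WithCancel(context.Background())
	defer cancel()

	// 1. A timeout on the child ends the child only.
	child, stop := context.WithTimeout(parent, 50*time.Millisecond)
	defer stop()
	childTimedOut = errors.Is(slowLookup(child), context.DeadlineExceeded)
	parentUntouched = parent.Err() == nil

	// 2. Cancelling the parent reaches every context derived from it.
	cancel()
	late, stopLate := context.WithTimeout(parent, time.Minute)
	defer stopLate()
	childCancelled = errors.Is(slowLookup(late), context.Canceled)
	return
}

func main() {
	fmt.Println(observe()) // true true true
}
```

This is why main in the example above has to call cancel() itself after the greeting fails: the timeout inside genGreeting(..) does not reach printFarewell(..) on its own.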

Bridge Channel

.........

..........

.......

Error Propagation

.........

..........

.......

Concurrency Project

This section discusses the go-resiliency project in detail. It is a useful project for learning about concurrency in more depth, and it implements several patterns that are handy in real applications. The other project worth studying is the channels project, which shows different ways of using Go channels.

Blackhole

BlackHole receives data, discards it, and counts how many items it has received. This is useful when we want to check that a concurrent application is receiving as many items as expected.

package channels

// BlackHole implements the InChannel interface and provides an analogue for the "Discard" variable in
// the ioutil package - it never blocks, and simply discards every value it reads. The number of items
// discarded in this way is counted and returned from Len.
type BlackHole struct {
	input  chan interface{}
	length chan int
	count  int
}

func NewBlackHole() *BlackHole {
	ch := &BlackHole{
		input:  make(chan interface{}),
		length: make(chan int),
	}
	go ch.discard()
	return ch
}

func (ch *BlackHole) In() chan<- interface{} {
	return ch.input
}

func (ch *BlackHole) Len() int {
	val, open := <-ch.length
	if open {
		return val
	} else {
		return ch.count
	}
}

func (ch *BlackHole) Cap() BufferCap {
	return Infinity
}

func (ch *BlackHole) Close() {
	close(ch.input)
}

func (ch *BlackHole) discard() {
	for {
		select {
		case _, open := <-ch.input:
			if !open {
				close(ch.length)
				return
			}
			ch.count++
		case ch.length <- ch.count:
		}
	}
}

The following sample shows how to use BlackHole:

func TestBlackHole(t *testing.T) {
	discard := NewBlackHole()

	for i := 0; i < 1000; i++ {
		discard.In() <- i
	}

	discard.Close()

	if discard.Len() != 1000 {
		t.Error("blackhole expected 1000 was", discard.Len())
	}
}

Deadline

There are cases where we want the application to run a background task, but with a time limit imposed on it. The following snippet shows how to use this functionality:

func ExampleDeadline() {
	dl := New(1 * time.Second)

	err := dl.Run(func(stopper <-chan struct{}) error {
		// do something possibly slow
		// check stopper function and give up if timed out
		return nil
	})

	switch err {
	case ErrTimedOut:
		// execution took too long, oops
	default:
		// some other error
	}
}
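To make the mechanics concrete, here is a minimal sketch of how such a deadline could be built from a goroutine, a result channel and time.After. It is written for illustration only and is not the go-resiliency source; runWithDeadline and errTimedOut are assumed names standing in for the library's Run and ErrTimedOut.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimedOut is a stand-in for the library's ErrTimedOut.
var errTimedOut = errors.New("timed out waiting for function to finish")

// runWithDeadline runs work in its own goroutine and waits at most
// timeout for it. When the deadline passes, stopper is closed so that
// work can notice and give up.
func runWithDeadline(timeout time.Duration, work func(stopper <-chan struct{}) error) error {
	result := make(chan error, 1) // buffered so a late result never blocks work
	stopper := make(chan struct{})

	go func() {
		result <- work(stopper)
	}()

	select {
	case err := <-result:
		return err
	case <-time.After(timeout):
		close(stopper)
		return errTimedOut
	}
}

func main() {
	fast := func(stopper <-chan struct{}) error { return nil }
	slow := func(stopper <-chan struct{}) error {
		<-stopper // pretend to work until told to stop
		return nil
	}
	fmt.Println(runWithDeadline(100*time.Millisecond, fast)) // <nil>
	fmt.Println(runWithDeadline(100*time.Millisecond, slow)) // timed out waiting for function to finish
}
```

The work function is not killed when the deadline passes; it only stops if it checks the stopper channel, which is why the example comment says to check it.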

Circuit Breaker

The implementation is a simple counter of the successes and failures of a particular task. The only goroutine it uses runs the timer that enforces the timeout. The breaker has three states: closed, half-open and open.

  • closed – the normal state: everything is fine and work is allowed to run
  • half-open – the breaker moves to this state from open when the timeout timer has expired
  • open – the breaker moves to this state when an error happens in one of two situations: the number of errors has reached the error threshold, or the current state is half-open

The function processResult(..) contains the main logic, performing the transitions between states based on the success and error thresholds.

func (b *Breaker) processResult(result error, panicValue interface{}) {
	b.lock.Lock()
	defer b.lock.Unlock()

	if result == nil && panicValue == nil {
		if b.state == halfOpen {
			b.successes++
			if b.successes == b.successThreshold {
				b.closeBreaker()
			}
		}
	} else {
		if b.errors > 0 {
			expiry := b.lastError.Add(b.timeout)
			if time.Now().After(expiry) {
				b.errors = 0
			}
		}

		switch b.state {
		case closed:
			b.errors++
			if b.errors == b.errorThreshold {
				b.openBreaker()
			} else {
				b.lastError = time.Now()
			}
		case halfOpen:
			b.openBreaker()
		}
	}
}

The following diagram explains how the logic inside processResult works. It splits the logic into two categories, success and error, each with its own rules for moving between states.

[Diagram: state transitions inside processResult]
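The three states can be exercised end to end with a stripped-down breaker. The sketch below is a simplification written for this article, not the go-resiliency implementation: miniBreaker, errOpen and errBoom are hypothetical names, failures are counted consecutively rather than within a time window, and a single success in the half-open state closes the breaker.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

type state int

const (
	closed state = iota
	halfOpen
	open
)

var (
	errOpen = errors.New("circuit breaker is open")
	errBoom = errors.New("boom") // sample failure used by the demo
)

// miniBreaker is a hypothetical, stripped-down breaker. It opens after
// errorThreshold consecutive failures and closes again after a single
// success in the half-open state.
type miniBreaker struct {
	mu             sync.Mutex
	state          state
	failures       int
	errorThreshold int
	timeout        time.Duration
}

// Run executes work unless the breaker is open.
func (b *miniBreaker) Run(work func() error) error {
	b.mu.Lock()
	isOpen := b.state == open
	b.mu.Unlock()
	if isOpen {
		return errOpen
	}

	err := work()

	b.mu.Lock()
	defer b.mu.Unlock()
	if err == nil {
		if b.state != open {
			b.state, b.failures = closed, 0
		}
		return nil
	}
	b.failures++
	if b.state != open && (b.state == halfOpen || b.failures >= b.errorThreshold) {
		b.state = open
		// The only goroutine involved: a timer that moves open -> half-open.
		time.AfterFunc(b.timeout, func() {
			b.mu.Lock()
			b.state = halfOpen
			b.mu.Unlock()
		})
	}
	return err
}

func main() {
	b := &miniBreaker{errorThreshold: 2, timeout: 50 * time.Millisecond}
	fail := func() error { return errBoom }
	ok := func() error { return nil }

	fmt.Println(b.Run(fail))           // boom
	fmt.Println(b.Run(fail))           // boom (threshold reached, breaker opens)
	fmt.Println(b.Run(ok))             // circuit breaker is open
	time.Sleep(200 * time.Millisecond) // timer fires: open -> half-open
	fmt.Println(b.Run(ok))             // <nil> (success closes the breaker)
}
```

While the breaker is open, work is rejected without being called, which is what protects a struggling dependency from further load.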

Batcher

Below is a high-level diagram of how the pieces of this design pattern interact internally.

[Diagram: batcher internals]

This pattern handles the situation where we need to run a particular task, but want to wait a fixed amount of time before running it. For example, we collect all the parameters that arrive within 1 minute and then execute a single function/task that processes them together.

The code uses 2 different channels:

  • future – carries the error value returned by the executed task back to the caller
  • submit – used as part of the timeout feature. Each item pushed onto this channel contains
    • the parameter to be collected and passed to the target task, and
    • the future channel that will be used to send back the error value

The submitWork method is called when a timeout duration has been specified, because the task then has to be executed asynchronously. The first call creates the submit channel and starts the batch() goroutine.

func (b *Batcher) submitWork(w *work) {
	b.lock.Lock()
	defer b.lock.Unlock()

	if b.submit == nil {
		b.submit = make(chan *work, 4)
		go b.batch()
	}

	b.submit <- w
}

The batch() method is the crux of the whole process. It keeps reading from the b.submit channel, collecting the params interface{} and the future channel of each item. The loop exits when the b.submit channel is closed by the b.timer() function (once the timeout has expired).

Once out of the loop, it executes the task by calling b.doWork(..), and on completion the resulting error (ret) is pushed to all the future channels.

func (b *Batcher) batch() {
	var params []interface{}
	var futures []chan error
	input := b.submit

	go b.timer()

	for work := range input {
		params = append(params, work.param)   
		futures = append(futures, work.future)
	}

	ret := b.doWork(params)

	for _, future := range futures {
		future <- ret
		close(future)
	}
}
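Putting submitWork and batch() together, the whole flow fits in a small program. The following is a simplified sketch under stated assumptions, not the go-resiliency code: miniBatcher is a hypothetical type, the timer is implemented with time.AfterFunc, and Run blocks the caller until its batch has been processed.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type work struct {
	param  interface{}
	future chan error
}

// miniBatcher is a hypothetical, simplified batcher: it collects the
// params submitted within timeout and processes them in one doWork call.
type miniBatcher struct {
	mu      sync.Mutex
	submit  chan *work
	timeout time.Duration
	doWork  func(params []interface{}) error
}

// Run queues param and blocks until the batch containing it has run.
func (b *miniBatcher) Run(param interface{}) error {
	w := &work{param: param, future: make(chan error, 1)}

	b.mu.Lock()
	if b.submit == nil { // first item of a new batch: start collecting
		b.submit = make(chan *work, 4)
		go b.batch(b.submit)
	}
	b.submit <- w
	b.mu.Unlock()

	return <-w.future
}

func (b *miniBatcher) batch(input chan *work) {
	// When the timer fires, detach the channel and close it, which ends
	// the collection loop below.
	time.AfterFunc(b.timeout, func() {
		b.mu.Lock()
		b.submit = nil
		b.mu.Unlock()
		close(input)
	})

	var params []interface{}
	var futures []chan error
	for w := range input {
		params = append(params, w.param)
		futures = append(futures, w.future)
	}

	err := b.doWork(params)
	for _, f := range futures {
		f <- err
		close(f)
	}
}

func main() {
	b := &miniBatcher{
		timeout: 100 * time.Millisecond,
		doWork: func(params []interface{}) error {
			fmt.Println("processing a batch of", len(params))
			return nil
		},
	}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = b.Run(i)
		}(i)
	}
	wg.Wait()
}
```

With all three submissions arriving well inside the 100 ms window, the three callers typically end up in a single batch. The submit channel is detached under the lock before it is closed, so no caller can send on a closed channel.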

The following diagram outlines the channels used by this pattern:

[Diagram: batcher channels]