Go Concurrency Notes

Table of Contents

Restricting Writers

Scenario

Building library that perform expensive I/O operation (eg: writing to a file). The library uses a writer code to write the data and it limits the number of writers to the number of CPU cores. This will safeguard the library write operation to work correctly and consistently in the event if the caller is calling the writer in an uncontrollable manner. Imagine an app that calls the writer in 100 goroutines in the hope it speeds up the write operation, which in reality does not happen as I/O operation are bound to the speed of the I/O peripherals.

Solution

The caller calls the writer function in a normal way. This will works normal in a sequential manner, the number of channels does not make any difference. All 10 calling will be processed.

package main

import (
	"log"
	"time"
)

type storage struct {
	workersLimitCh chan struct{}
}

func (s *storage) InsertRows(i int) error {
	insert := func() error {
		j := i
		defer func() { <-s.workersLimitCh }()
		log.Println("Sleeping...", j)
		time.Sleep(2 * time.Second)
		return nil
	}

	log.Println("Selecting...")

	select {
	case s.workersLimitCh <- struct{}{}:
		log.Println("insert()....")
		return insert()
	default:
		log.Println("defaulting...")
	}

	return nil
}

func main() {
	s := storage{}
	s.workersLimitCh = make(chan struct{}, 10)
	for i := 0; i <= 10; i++ {
		s.InsertRows(i)
	}
	for {
	}
}

The caller spins up goroutine to call the writer function. Even when the client spins up 10 goroutine to call the function, the function will limit it to only the number of channels available (which is 4). The rest of the 6 goroutine will be ignored.

package main

import (
	"log"
	"time"
)

type storage struct {
	workersLimitCh chan struct{}
}

func (s *storage) InsertRows(i int) error {
	insert := func() error {
		j := i
		defer func() { <-s.workersLimitCh }()
		log.Println("Sleeping...", j)
		time.Sleep(2 * time.Second)
		log.Println("Done Sleeping...", j)

		return nil
	}

	log.Println("Selecting...")

	select {
	case s.workersLimitCh <- struct{}{}:
		log.Println("insert()....")
		return insert()
	default:
		log.Println("defaulting...")
	}

	return nil
}

func main() {
	s := storage{}
	s.workersLimitCh = make(chan struct{}, 4)
	for i := 0; i <= 10; i++ {
		go func(i int) {
			s.InsertRows(i)

		}(i)
	}
	for {
	}
}