Understanding Concurrency in Go - Part 2

Concurrency Patterns

Confinement

When working with concurrent code, there are broadly two ways to perform operations safely:

  1. Explicit Techniques

    1. Synchronization primitives for sharing memory (e.g. sync.Mutex) - see the sketch after this list
    2. Synchronization via communicating (e.g. channels)
  2. Implicit Techniques

    1. Immutable data
    2. Data protected by confinement
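
For contrast, here is a minimal sketch of the first explicit technique - guarding shared memory with a sync.Mutex. The counter type and its fields are made up purely for illustration:

        package main

        import (
            "fmt"
            "sync"
        )

        // counter is a hypothetical value shared across goroutines,
        // so every access to it is guarded by a mutex.
        type counter struct {
            mu    sync.Mutex
            value int
        }

        func (c *counter) increment() {
            c.mu.Lock()
            defer c.mu.Unlock()
            c.value++
        }

        func main() {
            var c counter
            var wg sync.WaitGroup
            for i := 0; i < 100; i++ {
                wg.Add(1)
                go func() {
                    defer wg.Done()
                    c.increment()
                }()
            }
            wg.Wait()
            fmt.Println(c.value) // always prints 100, because access is synchronized
        }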

Immutable data is ideal because it is implicitly concurrency-safe. Each concurrent process can operate on the same data but cannot modify it. If a process wants different data, it must create a new copy with the desired modifications. In Go, this can be achieved by writing code that uses copies of values instead of pointers.
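
As a rough illustration of this idea (the config type below is hypothetical), each goroutine receives its own copy of the value, so nothing it does can affect the original or the other goroutines:

        package main

        import (
            "fmt"
            "sync"
        )

        // config is a hypothetical value type; passing it by value gives each
        // goroutine its own copy instead of a shared pointer.
        type config struct {
            name  string
            limit int
        }

        func main() {
            base := config{name: "base", limit: 10}

            var wg sync.WaitGroup
            for i := 0; i < 3; i++ {
                wg.Add(1)
                go func(c config, id int) { // c is a copy, not a pointer
                    defer wg.Done()
                    c.limit += id // modifies only this goroutine's copy
                    fmt.Println(c.name, c.limit)
                }(base, i)
            }
            wg.Wait()

            fmt.Println("original untouched:", base.limit) // still 10
        }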

Confinement is simple and powerful: it ensures information is only ever available from one concurrent process. When this is achieved, a concurrent program is implicitly safe and no synchronization is needed. There are two kinds of confinement:

  1. ad hoc confinement: This is achieved via a convention - and sticking to a convention is difficult unless you have tools to enforce it. For example:
        data := make([]int, 4)

        loopData := func(handleData chan<- int) {
            defer close(handleData)

            for i := range data {
                handleData <- data[i]
            }
        }

        handleData := make(chan int)
        go loopData(handleData)

        for num := range handleData {
            fmt.Println(num)
        }

We can see that the data slice of integers is available from both the loopData function and the loop over the handleData channel; however, by convention we’re only accessing it from the loopData function. But as the code is touched by many people, and deadlines loom, mistakes might be made, and the confinement might break down and cause issues.

  2. lexical confinement: This is the preferred technique, as it involves using the lexical scope to expose only the correct data and concurrency primitives for multiple concurrent processes to use. One way of doing this is by exposing the read or write aspects of a channel at different parts of the program:
        chanOwner := func() <-chan int {
            results := make(chan int, 5)
            go func() {
                defer close(results)
                for i := 0; i <= 5; i++ {
                    results <- i
                }
            }()
            return results
        }
        consumer := func(results <-chan int) {
            for result := range results {
                fmt.Printf("Received: %d\n", result)
            }
            fmt.Println("Done receiving!")
        }

        results := chanOwner()        
        consumer(results)

Here the channel is instantiated within the lexical scope of the chanOwner function. This limits the scope of the write aspect of the channel to the closure and prevents other goroutines from writing to it.

The consumer receives a read-only aspect of the channel and can only read from it. This is an ideal case for confinement since channels are concurrency-safe. A more compelling example is the confinement of a data structure that is not concurrency-safe, like an instance of bytes.Buffer:

        printData := func(wg *sync.WaitGroup, data []byte) {
            defer wg.Done()

            var buff bytes.Buffer
            for _, b := range data {
                fmt.Fprintf(&buff, "%c", b)
            }
            fmt.Println(buff.String())
        }

        var wg sync.WaitGroup
        wg.Add(2)
        data := []byte("golang")
        go printData(&wg, data[:3]) // pass in "gol"
        go printData(&wg, data[3:]) // pass in "ang"

        wg.Wait()

Here the printData function doesn’t close over the data slice; it cannot access it directly and must take in a slice of bytes to operate on. We pass in different subsets of the slice, thus constraining the goroutines we start to only the part of the slice we’re passing in. Because of the lexical scope, we’ve made it impossible to do the wrong thing, so we don’t need to synchronize memory access or share data through communication.

The for-select Loop

The general syntax of this type of loop is:

        for { // Either loop infinitely or range over something
            select {
            // Do some work with channels
            }
        }

Some scenarios where this is useful:

  1. Sending iteration variables out on a channel:
        for _, s := range []string{"a", "b", "c"} {
            select {
            case <-done:
                return
            case stringStream <- s:
            }
        }
  2. Looping infinitely, waiting to be stopped. It’s very common to create goroutines that loop infinitely until they are stopped. There are two variations of this:

This keeps the select statement as short as possible:

        for {
            select {
            case <-done:
                return
            default:
            }

            // Do non-preemptable work
        }

The second variation embeds work in a default clause of the select statement:

        for {
            select {
            case <-done:
                return
            default:
                // Do non-preemptable work
            }
        }

When we enter the select statement, if the done channel hasn’t been closed, we’ll execute the default clause instead.
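
The snippets above assume the done and stringStream channels already exist. As a rough, self-contained sketch (the channel names and the timing are chosen purely for illustration), a cancellable sender built from the same for-select shape might look like this:

        package main

        import (
            "fmt"
            "time"
        )

        func main() {
            done := make(chan interface{})
            stringStream := make(chan string)

            // Sender: a for-select loop that pushes values out until done is closed.
            go func() {
                defer close(stringStream)
                for _, s := range []string{"a", "b", "c"} {
                    select {
                    case <-done:
                        return
                    case stringStream <- s:
                    }
                }
            }()

            // Close done after a short while, whether or not the sender has finished.
            go func() {
                time.Sleep(100 * time.Millisecond)
                close(done)
            }()

            for s := range stringStream {
                fmt.Println(s)
            }
        }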

Preventing Goroutine Leaks

The ease with which goroutines are created, their small memory footprint, and the level of abstraction provided by the runtime do not mean that a goroutine requires no cleanup: goroutines are not garbage collected by the runtime. A goroutine has a few paths to termination:

  1. When it has completed work.
  2. When it cannot continue due to an unrecoverable error.
  3. When it’s told to stop working.

The first two paths come for free as part of how we write our programs, but the third path requires the main or parent goroutine to have contextual knowledge of its child goroutines so that it can signal them at any time to terminate and clean up.

A simple example of a goroutine leak:

    doWork := func(strings <-chan string) <-chan interface{} {
        completed := make(chan interface{})
        go func() {
            defer fmt.Println("doWork exited...")
            defer close(completed)

            for s := range strings {
                // Do something
                fmt.Println(s)
            }
        }()
        return completed
    }

    doWork(nil)
    fmt.Println("Done")

Here we see that the main goroutine passes a nil channel into doWork. Therefore, the strings channel will never actually have any strings written onto it, and the goroutine containing doWork will remain in memory for the lifetime of the process (we would even deadlock if we joined the goroutine within doWork and the main goroutine).

The way to mitigate this is to establish a signal between the parent goroutine and its children that allows the parent to signal cancellation to its children. By convention, this signal is usually a read-only channel named done. The parent goroutine passes this channel to the child goroutine and then closes the channel when it wants to cancel the child.

        // Passing the done channel
        doWork := func(done <-chan interface{}, strings <-chan string) <-chan interface{} {
            terminated := make(chan interface{})
            go func() {
                defer fmt.Println("doWork Exited")
                defer close(terminated)
                for {
                    select {
                    case s := <-strings:
                        fmt.Println(s)
                    // Check whether the done channel has been signalled
                    case <-done:
                        return
                    }
                }
            }()
            return terminated
        }

        done := make(chan interface{})
        terminated := doWork(done, nil)

        // Another goroutine that will cancel the goroutine spawned in doWork
        go func() {
            // Cancel the operation after 1 second
            time.Sleep(1 * time.Second)
            fmt.Println("Cancelling the doWork goroutine....")
            close(done)
        }()

        // This is where the goroutine spawned in doWork is joined with the main goroutine
        <-terminated
        fmt.Println("Done")

Here we did not get any deadlock, because before we join the two goroutines, we create a third goroutine to cancel the goroutine within doWork after a second.

There can be another situation where a goroutine is blocked in an attempt to write a value to a channel:


        newRandStream := func() <-chan int {
            randStream := make(chan int)
            go func() {
                defer fmt.Println("newRandStream closure exited.")
                defer close(randStream)
                for {
                    randStream <- rand.Int()
                }
            }()

            return randStream
        }

        randStream := newRandStream()
        fmt.Println("3 random ints:")
        for i := 1; i <= 3; i++ {
            fmt.Printf("%d: %d\n", i, <-randStream)
        }

Running the code produces:

        3 random ints:
        1: 5577006791947779410
        2: 8674665223082153551
        3: 6129484611666145821

The deferred fmt.Println statement never gets run. After the third iteration of our loop, our goroutine blocks trying to send the next random integer to a channel that is no longer being read from. We have no way of telling the producer it can stop.

The problem can be fixed by providing the producer with a channel informing it to exit:


    newRandStream := func(done <-chan interface{}) <-chan int {
        randStream := make(chan int)
        go func() {
            defer fmt.Println("newRandStream closure exited.")
            defer close(randStream)
            for {
                select {
                case randStream <- rand.Int():
                case <-done:
                    return
                }
            }
        }()

        return randStream
    }

    done := make(chan interface{})
    randStream := newRandStream(done)
    fmt.Println("3 random ints:")
    for i := 1; i <= 3; i++ {
        fmt.Printf("%d: %d\n", i, <-randStream)
    }
    close(done)

    // Simulate ongoing work
    time.Sleep(1 * time.Second)

This produces the correct output now:

        3 random ints:
        1: 5577006791947779410
        2: 8674665223082153551
        3: 6129484611666145821
        newRandStream closure exited.

The convention here is: If a goroutine is responsible for creating a goroutine, it is also responsible for ensuring it can stop the goroutine.

I am still reading up on more patterns and will update the blog post soon. These patterns are:

  1. The Or Channel
  2. Error Propagation
  3. Pipelines