Advanced Go Concurrency Patterns

Sameer Ajmani

Google

Video

This talk was presented at Google I/O in May 2013.

2

Get ready

3

Go supports concurrency

In the language and runtime, not a library.

This changes how you structure your programs.

4

Goroutines and Channels

Goroutines are independently executing functions in the same address space.

go f()
go g(1, 2)

Channels are typed values that allow goroutines to synchronize and exchange information.

c := make(chan int)
go func() { c <- 3 }()
n := <-c

For more on the basics, watch Go Concurrency Patterns (Pike, 2012).

5

Example: ping-pong

// +build ignore,OMIT

package main

import (
	"fmt"
	"time"
)

type Ball struct{ hits int }

func main() {
    table := make(chan *Ball)
    go player("ping", table)
    go player("pong", table)

    table <- new(Ball) // game on; toss the ball
    time.Sleep(1 * time.Second)
    <-table // game over; grab the ball
}

func player(name string, table chan *Ball) {
    for {
        ball := <-table
        ball.hits++
        fmt.Println(name, ball.hits)
        time.Sleep(100 * time.Millisecond)
        table <- ball
    }
}
6

Deadlock detection

// +build ignore,OMIT

package main

import (
	"fmt"
	"time"
)

type Ball struct{ hits int }

func main() {
    table := make(chan *Ball)
    go player("ping", table)
    go player("pong", table)

    // table <- new(Ball) // game on; toss the ball
    time.Sleep(1 * time.Second)
    <-table // game over; grab the ball
}

func player(name string, table chan *Ball) {
    for {
        ball := <-table
        ball.hits++
        fmt.Println(name, ball.hits)
        time.Sleep(100 * time.Millisecond)
        table <- ball
    }
}
7

Panic dumps the stacks

// +build ignore,OMIT

package main

import (
	"fmt"
	"time"
)

type Ball struct{ hits int }

func main() {
    table := make(chan *Ball)
    go player("ping", table)
    go player("pong", table)

    table <- new(Ball) // game on; toss the ball
    time.Sleep(1 * time.Second)
    <-table // game over; grab the ball

    panic("show me the stacks")
}

func player(name string, table chan *Ball) {
    for {
        ball := <-table
        ball.hits++
        fmt.Println(name, ball.hits)
        time.Sleep(100 * time.Millisecond)
        table <- ball
    }
}
8

It's easy to go, but how to stop?

Long-lived programs need to clean up.

Let's look at how to write programs that handle communication, periodic events, and cancellation.

The core is Go's select statement: like a switch, but the decision is made based on the ability to communicate.

select {
case xc <- x:
    // sent x on xc
case y := <-yc:
    // received y from yc
}
9

Example: feed reader

My favorite feed reader disappeared. I need a new one.

Why not write one?

Where do we start?

10

Find an RSS client

Searching pkg.go.dev for "rss" turns up several hits, including one that provides:

// Fetch fetches Items for uri and returns the time when the next
// fetch should be attempted.  On failure, Fetch returns an error.
func Fetch(uri string) (items []Item, next time.Time, err error)

type Item struct{
    Title, Channel, GUID string // a subset of RSS fields
}

But I want a stream:

<-chan Item

And I want multiple subscriptions.

11

Here's what we have

type Fetcher interface {
    Fetch() (items []Item, next time.Time, err error)
}

func Fetch(domain string) Fetcher {...} // fetches Items from domain
12

Here's what we want

type Subscription interface {
    Updates() <-chan Item // stream of Items
    Close() error         // shuts down the stream
}

func Subscribe(fetcher Fetcher) Subscription {...} // converts Fetches to a stream

func Merge(subs ...Subscription) Subscription {...} // merges several streams
13

Example

//go:build ignore && OMIT
// +build ignore,OMIT

// fakemain runs the Subscribe example with a fake RSS fetcher.
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// STARTITEM OMIT
// An Item is a stripped-down RSS item.
type Item struct{ Title, Channel, GUID string }

// STOPITEM OMIT

// STARTFETCHER OMIT
// A Fetcher fetches Items and returns the time when the next fetch should be
// attempted.  On failure, Fetch returns a non-nil error.
type Fetcher interface {
	Fetch() (items []Item, next time.Time, err error)
}

// STOPFETCHER OMIT

// STARTSUBSCRIPTION OMIT
// A Subscription delivers Items over a channel.  Close cancels the
// subscription, closes the Updates channel, and returns the last fetch error,
// if any.
type Subscription interface {
	Updates() <-chan Item
	Close() error
}

// STOPSUBSCRIPTION OMIT

// STARTSUBSCRIBE OMIT
// Subscribe returns a new Subscription that uses fetcher to fetch Items.
func Subscribe(fetcher Fetcher) Subscription {
	s := &sub{
		fetcher: fetcher,
		updates: make(chan Item),       // for Updates
		closing: make(chan chan error), // for Close
	}
	go s.loop()
	return s
}

// STOPSUBSCRIBE OMIT

// sub implements the Subscription interface.
type sub struct {
	fetcher Fetcher         // fetches items
	updates chan Item       // sends items to the user
	closing chan chan error // for Close
}

// STARTUPDATES OMIT
func (s *sub) Updates() <-chan Item {
	return s.updates
}

// STOPUPDATES OMIT

// STARTCLOSE OMIT
// STARTCLOSESIG OMIT
func (s *sub) Close() error {
	// STOPCLOSESIG OMIT
	errc := make(chan error)
	s.closing <- errc // HLchan
	return <-errc     // HLchan
}

// STOPCLOSE OMIT

// loopCloseOnly is a version of loop that includes only the logic
// that handles Close.
func (s *sub) loopCloseOnly() {
	// STARTCLOSEONLY OMIT
	var err error // set when Fetch fails
	for {
		select {
		case errc := <-s.closing: // HLchan
			errc <- err      // HLchan
			close(s.updates) // tells receiver we're done
			return
		}
	}
	// STOPCLOSEONLY OMIT
}

// loopFetchOnly is a version of loop that includes only the logic
// that calls Fetch.
func (s *sub) loopFetchOnly() {
	// STARTFETCHONLY OMIT
	var pending []Item // appended by fetch; consumed by send
	var next time.Time // initially January 1, year 0
	var err error
	for {
		var fetchDelay time.Duration // initially 0 (no delay)
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		startFetch := time.After(fetchDelay)

		select {
		case <-startFetch:
			var fetched []Item
			fetched, next, err = s.fetcher.Fetch()
			if err != nil {
				next = time.Now().Add(10 * time.Second)
				break
			}
			pending = append(pending, fetched...)
		}
	}
	// STOPFETCHONLY OMIT
}

// loopSendOnly is a version of loop that includes only the logic for
// sending items to s.updates.
func (s *sub) loopSendOnly() {
	// STARTSENDONLY OMIT
	var pending []Item // appended by fetch; consumed by send
	for {
		var first Item
		var updates chan Item // HLupdates
		if len(pending) > 0 {
			first = pending[0]
			updates = s.updates // enable send case // HLupdates
		}

		select {
		case updates <- first:
			pending = pending[1:]
		}
	}
	// STOPSENDONLY OMIT
}

// mergedLoop is a version of loop that combines loopCloseOnly,
// loopFetchOnly, and loopSendOnly.
func (s *sub) mergedLoop() {
	// STARTFETCHVARS OMIT
	var pending []Item
	var next time.Time
	var err error
	// STOPFETCHVARS OMIT
	for {
		// STARTNOCAP OMIT
		var fetchDelay time.Duration
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		startFetch := time.After(fetchDelay)
		// STOPNOCAP OMIT
		var first Item
		var updates chan Item
		if len(pending) > 0 {
			first = pending[0]
			updates = s.updates // enable send case
		}

		// STARTSELECT OMIT
		select {
		case errc := <-s.closing: // HLcases
			errc <- err
			close(s.updates)
			return
			// STARTFETCHCASE OMIT
		case <-startFetch: // HLcases
			var fetched []Item
			fetched, next, err = s.fetcher.Fetch() // HLfetch
			if err != nil {
				next = time.Now().Add(10 * time.Second)
				break
			}
			pending = append(pending, fetched...) // HLfetch
			// STOPFETCHCASE OMIT
		case updates <- first: // HLcases
			pending = pending[1:]
		}
		// STOPSELECT OMIT
	}
}

// dedupeLoop extends mergedLoop with deduping of fetched items.
func (s *sub) dedupeLoop() {
	const maxPending = 10
	// STARTSEEN OMIT
	var pending []Item
	var next time.Time
	var err error
	var seen = make(map[string]bool) // set of item.GUIDs // HLseen
	// STOPSEEN OMIT
	for {
		// STARTCAP OMIT
		var fetchDelay time.Duration
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		var startFetch <-chan time.Time // HLcap
		if len(pending) < maxPending {  // HLcap
			startFetch = time.After(fetchDelay) // enable fetch case  // HLcap
		} // HLcap
		// STOPCAP OMIT
		var first Item
		var updates chan Item
		if len(pending) > 0 {
			first = pending[0]
			updates = s.updates // enable send case
		}
		select {
		case errc := <-s.closing:
			errc <- err
			close(s.updates)
			return
		// STARTDEDUPE OMIT
		case <-startFetch:
			var fetched []Item
			fetched, next, err = s.fetcher.Fetch() // HLfetch
			if err != nil {
				next = time.Now().Add(10 * time.Second)
				break
			}
			for _, item := range fetched {
				if !seen[item.GUID] { // HLdupe
					pending = append(pending, item) // HLdupe
					seen[item.GUID] = true          // HLdupe
				} // HLdupe
			}
			// STOPDEDUPE OMIT
		case updates <- first:
			pending = pending[1:]
		}
	}
}

// loop periodically fetches Items, sends them on s.updates, and exits
// when Close is called.  It extends dedupeLoop with logic to run
// Fetch asynchronously.
func (s *sub) loop() {
	const maxPending = 10
	type fetchResult struct {
		fetched []Item
		next    time.Time
		err     error
	}
	// STARTFETCHDONE OMIT
	var fetchDone chan fetchResult // if non-nil, Fetch is running // HL
	// STOPFETCHDONE OMIT
	var pending []Item
	var next time.Time
	var err error
	var seen = make(map[string]bool)
	for {
		var fetchDelay time.Duration
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		// STARTFETCHIF OMIT
		var startFetch <-chan time.Time
		if fetchDone == nil && len(pending) < maxPending { // HLfetch
			startFetch = time.After(fetchDelay) // enable fetch case
		}
		// STOPFETCHIF OMIT
		var first Item
		var updates chan Item
		if len(pending) > 0 {
			first = pending[0]
			updates = s.updates // enable send case
		}
		// STARTFETCHASYNC OMIT
		select {
		case <-startFetch: // HLfetch
			fetchDone = make(chan fetchResult, 1) // HLfetch
			go func() {
				fetched, next, err := s.fetcher.Fetch()
				fetchDone <- fetchResult{fetched, next, err}
			}()
		case result := <-fetchDone: // HLfetch
			fetchDone = nil // HLfetch
			// Use result.fetched, result.next, result.err
			// STOPFETCHASYNC OMIT
			fetched := result.fetched
			next, err = result.next, result.err
			if err != nil {
				next = time.Now().Add(10 * time.Second)
				break
			}
			for _, item := range fetched {
				if id := item.GUID; !seen[id] { // HLdupe
					pending = append(pending, item)
					seen[id] = true // HLdupe
				}
			}
		case errc := <-s.closing:
			errc <- err
			close(s.updates)
			return
		case updates <- first:
			pending = pending[1:]
		}
	}
}

// naiveMerge is a version of Merge that doesn't quite work right.  In
// particular, the goroutines it starts may block forever on m.updates
// if the receiver stops receiving.
type naiveMerge struct {
	subs    []Subscription
	updates chan Item
}

// STARTNAIVEMERGE OMIT
func NaiveMerge(subs ...Subscription) Subscription {
	m := &naiveMerge{
		subs:    subs,
		updates: make(chan Item),
	}
	// STARTNAIVEMERGELOOP OMIT
	for _, sub := range subs {
		go func(s Subscription) {
			for it := range s.Updates() {
				m.updates <- it // HL
			}
		}(sub)
	}
	// STOPNAIVEMERGELOOP OMIT
	return m
}

// STOPNAIVEMERGE OMIT

// STARTNAIVEMERGECLOSE OMIT
func (m *naiveMerge) Close() (err error) {
	for _, sub := range m.subs {
		if e := sub.Close(); err == nil && e != nil {
			err = e
		}
	}
	close(m.updates) // HL
	return
}

// STOPNAIVEMERGECLOSE OMIT

func (m *naiveMerge) Updates() <-chan Item {
	return m.updates
}

type merge struct {
	subs    []Subscription
	updates chan Item
	quit    chan struct{}
	errs    chan error
}

// STARTMERGESIG OMIT
// Merge returns a Subscription that merges the item streams from subs.
// Closing the merged subscription closes subs.
func Merge(subs ...Subscription) Subscription {
	// STOPMERGESIG OMIT
	m := &merge{
		subs:    subs,
		updates: make(chan Item),
		quit:    make(chan struct{}),
		errs:    make(chan error),
	}
	// STARTMERGE OMIT
	for _, sub := range subs {
		go func(s Subscription) {
			for {
				var it Item
				select {
				case it = <-s.Updates():
				case <-m.quit: // HL
					m.errs <- s.Close() // HL
					return              // HL
				}
				select {
				case m.updates <- it:
				case <-m.quit: // HL
					m.errs <- s.Close() // HL
					return              // HL
				}
			}
		}(sub)
	}
	// STOPMERGE OMIT
	return m
}

func (m *merge) Updates() <-chan Item {
	return m.updates
}

// STARTMERGECLOSE OMIT
func (m *merge) Close() (err error) {
	close(m.quit) // HL
	for _ = range m.subs {
		if e := <-m.errs; e != nil { // HL
			err = e
		}
	}
	close(m.updates) // HL
	return
}

// STOPMERGECLOSE OMIT

// NaiveDedupe converts a stream of Items that may contain duplicates
// into one that doesn't.
func NaiveDedupe(in <-chan Item) <-chan Item {
	out := make(chan Item)
	go func() {
		seen := make(map[string]bool)
		for it := range in {
			if !seen[it.GUID] {
				// BUG: this send blocks if the
				// receiver closes the Subscription
				// and stops receiving.
				out <- it // HL
				seen[it.GUID] = true
			}
		}
		close(out)
	}()
	return out
}

type deduper struct {
	s       Subscription
	updates chan Item
	closing chan chan error
}

// Dedupe converts a Subscription that may send duplicate Items into
// one that doesn't.
func Dedupe(s Subscription) Subscription {
	d := &deduper{
		s:       s,
		updates: make(chan Item),
		closing: make(chan chan error),
	}
	go d.loop()
	return d
}

func (d *deduper) loop() {
	in := d.s.Updates() // enable receive
	var pending Item
	var out chan Item // disable send
	seen := make(map[string]bool)
	for {
		select {
		case it := <-in:
			if !seen[it.GUID] {
				pending = it
				in = nil        // disable receive
				out = d.updates // enable send
				seen[it.GUID] = true
			}
		case out <- pending:
			in = d.s.Updates() // enable receive
			out = nil          // disable send
		case errc := <-d.closing:
			err := d.s.Close()
			errc <- err
			close(d.updates)
			return
		}
	}
}

func (d *deduper) Close() error {
	errc := make(chan error)
	d.closing <- errc
	return <-errc
}

func (d *deduper) Updates() <-chan Item {
	return d.updates
}

// Fetch returns a Fetcher for Items from domain.
func Fetch(domain string) Fetcher {
	return fakeFetch(domain)
}

func fakeFetch(domain string) Fetcher {
	return &fakeFetcher{channel: domain}
}

type fakeFetcher struct {
	channel string
	items   []Item
}

// FakeDuplicates causes the fake fetcher to return duplicate items.
var FakeDuplicates bool

func (f *fakeFetcher) Fetch() (items []Item, next time.Time, err error) {
	now := time.Now()
	next = now.Add(time.Duration(rand.Intn(5)) * 500 * time.Millisecond)
	item := Item{
		Channel: f.channel,
		Title:   fmt.Sprintf("Item %d", len(f.items)),
	}
	item.GUID = item.Channel + "/" + item.Title
	f.items = append(f.items, item)
	if FakeDuplicates {
		items = f.items
	} else {
		items = []Item{item}
	}
	return
}

func init() {
	rand.Seed(time.Now().UnixNano())
}

// STARTMAIN OMIT
func main() {
    // Subscribe to some feeds, and create a merged update stream.
    merged := Merge(
        Subscribe(Fetch("blog.golang.org")),
        Subscribe(Fetch("googleblog.blogspot.com")),
        Subscribe(Fetch("googledevelopers.blogspot.com")))

    // Close the subscriptions after some time.
    time.AfterFunc(3*time.Second, func() {
        fmt.Println("closed:", merged.Close())
    })

    // Print the stream.
    for it := range merged.Updates() {
        fmt.Println(it.Channel, it.Title)
    }

    panic("show me the stacks")
}

// STOPMAIN OMIT
14

Subscribe

Subscribe creates a new Subscription that repeatedly fetches items until Close is called.

func Subscribe(fetcher Fetcher) Subscription {
    s := &sub{
        fetcher: fetcher,
        updates: make(chan Item), // for Updates
    }
    go s.loop()
    return s
}

// sub implements the Subscription interface.
type sub struct {
    fetcher Fetcher   // fetches items
    updates chan Item // delivers items to the user
}

// loop fetches items using s.fetcher and sends them
// on s.updates.  loop exits when s.Close is called.
func (s *sub) loop() {...}
15

Implementing Subscription

To implement the Subscription interface, define Updates and Close.

func (s *sub) Updates() <-chan Item {
    return s.updates
}
func (s *sub) Close() error {
    // TODO: make loop exit
    // TODO: find out about any error
    return err
}
16

What does loop do?

17

Naive Implementation

// naivemain runs the Subscribe example with the naive Subscribe
// implementation and a fake RSS fetcher.
//go:build ignore && OMIT
// +build ignore,OMIT

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// STARTITEM OMIT
// An Item is a stripped-down RSS item.
type Item struct{ Title, Channel, GUID string }

// STOPITEM OMIT

// STARTFETCHER OMIT
// A Fetcher fetches Items and returns the time when the next fetch should be
// attempted.  On failure, Fetch returns a non-nil error.
type Fetcher interface {
	Fetch() (items []Item, next time.Time, err error)
}

// STOPFETCHER OMIT

// STARTSUBSCRIPTION OMIT
// A Subscription delivers Items over a channel.  Close cancels the
// subscription, closes the Updates channel, and returns the last fetch error,
// if any.
type Subscription interface {
	Updates() <-chan Item
	Close() error
}

// STOPSUBSCRIPTION OMIT

// STARTSUBSCRIBE OMIT
// Subscribe returns a new Subscription that uses fetcher to fetch Items.
func Subscribe(fetcher Fetcher) Subscription {
	s := &sub{
		fetcher: fetcher,
		updates: make(chan Item),       // for Updates
		closing: make(chan chan error), // for Close
	}
	go s.loop()
	return s
}

// STOPSUBSCRIBE OMIT

// sub implements the Subscription interface.
type sub struct {
	fetcher Fetcher         // fetches items
	updates chan Item       // sends items to the user
	closing chan chan error // for Close
}

// STARTUPDATES OMIT
func (s *sub) Updates() <-chan Item {
	return s.updates
}

// STOPUPDATES OMIT

// STARTCLOSE OMIT
// STARTCLOSESIG OMIT
func (s *sub) Close() error {
	// STOPCLOSESIG OMIT
	errc := make(chan error)
	s.closing <- errc // HLchan
	return <-errc     // HLchan
}

// STOPCLOSE OMIT

// loopCloseOnly is a version of loop that includes only the logic
// that handles Close.
func (s *sub) loopCloseOnly() {
	// STARTCLOSEONLY OMIT
	var err error // set when Fetch fails
	for {
		select {
		case errc := <-s.closing: // HLchan
			errc <- err      // HLchan
			close(s.updates) // tells receiver we're done
			return
		}
	}
	// STOPCLOSEONLY OMIT
}

// loopFetchOnly is a version of loop that includes only the logic
// that calls Fetch.
func (s *sub) loopFetchOnly() {
	// STARTFETCHONLY OMIT
	var pending []Item // appended by fetch; consumed by send
	var next time.Time // initially January 1, year 0
	var err error
	for {
		var fetchDelay time.Duration // initially 0 (no delay)
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		startFetch := time.After(fetchDelay)

		select {
		case <-startFetch:
			var fetched []Item
			fetched, next, err = s.fetcher.Fetch()
			if err != nil {
				next = time.Now().Add(10 * time.Second)
				break
			}
			pending = append(pending, fetched...)
		}
	}
	// STOPFETCHONLY OMIT
}

// loopSendOnly is a version of loop that includes only the logic for
// sending items to s.updates.
func (s *sub) loopSendOnly() {
	// STARTSENDONLY OMIT
	var pending []Item // appended by fetch; consumed by send
	for {
		var first Item
		var updates chan Item // HLupdates
		if len(pending) > 0 {
			first = pending[0]
			updates = s.updates // enable send case // HLupdates
		}

		select {
		case updates <- first:
			pending = pending[1:]
		}
	}
	// STOPSENDONLY OMIT
}

// mergedLoop is a version of loop that combines loopCloseOnly,
// loopFetchOnly, and loopSendOnly.
func (s *sub) mergedLoop() {
	// STARTFETCHVARS OMIT
	var pending []Item
	var next time.Time
	var err error
	// STOPFETCHVARS OMIT
	for {
		// STARTNOCAP OMIT
		var fetchDelay time.Duration
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		startFetch := time.After(fetchDelay)
		// STOPNOCAP OMIT
		var first Item
		var updates chan Item
		if len(pending) > 0 {
			first = pending[0]
			updates = s.updates // enable send case
		}

		// STARTSELECT OMIT
		select {
		case errc := <-s.closing: // HLcases
			errc <- err
			close(s.updates)
			return
			// STARTFETCHCASE OMIT
		case <-startFetch: // HLcases
			var fetched []Item
			fetched, next, err = s.fetcher.Fetch() // HLfetch
			if err != nil {
				next = time.Now().Add(10 * time.Second)
				break
			}
			pending = append(pending, fetched...) // HLfetch
			// STOPFETCHCASE OMIT
		case updates <- first: // HLcases
			pending = pending[1:]
		}
		// STOPSELECT OMIT
	}
}

// dedupeLoop extends mergedLoop with deduping of fetched items.
func (s *sub) dedupeLoop() {
	const maxPending = 10
	// STARTSEEN OMIT
	var pending []Item
	var next time.Time
	var err error
	var seen = make(map[string]bool) // set of item.GUIDs // HLseen
	// STOPSEEN OMIT
	for {
		// STARTCAP OMIT
		var fetchDelay time.Duration
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		var startFetch <-chan time.Time // HLcap
		if len(pending) < maxPending {  // HLcap
			startFetch = time.After(fetchDelay) // enable fetch case  // HLcap
		} // HLcap
		// STOPCAP OMIT
		var first Item
		var updates chan Item
		if len(pending) > 0 {
			first = pending[0]
			updates = s.updates // enable send case
		}
		select {
		case errc := <-s.closing:
			errc <- err
			close(s.updates)
			return
		// STARTDEDUPE OMIT
		case <-startFetch:
			var fetched []Item
			fetched, next, err = s.fetcher.Fetch() // HLfetch
			if err != nil {
				next = time.Now().Add(10 * time.Second)
				break
			}
			for _, item := range fetched {
				if !seen[item.GUID] { // HLdupe
					pending = append(pending, item) // HLdupe
					seen[item.GUID] = true          // HLdupe
				} // HLdupe
			}
			// STOPDEDUPE OMIT
		case updates <- first:
			pending = pending[1:]
		}
	}
}

// loop periodically fetches Items, sends them on s.updates, and exits
// when Close is called.  It extends dedupeLoop with logic to run
// Fetch asynchronously.
func (s *sub) loop() {
	const maxPending = 10
	type fetchResult struct {
		fetched []Item
		next    time.Time
		err     error
	}
	// STARTFETCHDONE OMIT
	var fetchDone chan fetchResult // if non-nil, Fetch is running // HL
	// STOPFETCHDONE OMIT
	var pending []Item
	var next time.Time
	var err error
	var seen = make(map[string]bool)
	for {
		var fetchDelay time.Duration
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		// STARTFETCHIF OMIT
		var startFetch <-chan time.Time
		if fetchDone == nil && len(pending) < maxPending { // HLfetch
			startFetch = time.After(fetchDelay) // enable fetch case
		}
		// STOPFETCHIF OMIT
		var first Item
		var updates chan Item
		if len(pending) > 0 {
			first = pending[0]
			updates = s.updates // enable send case
		}
		// STARTFETCHASYNC OMIT
		select {
		case <-startFetch: // HLfetch
			fetchDone = make(chan fetchResult, 1) // HLfetch
			go func() {
				fetched, next, err := s.fetcher.Fetch()
				fetchDone <- fetchResult{fetched, next, err}
			}()
		case result := <-fetchDone: // HLfetch
			fetchDone = nil // HLfetch
			// Use result.fetched, result.next, result.err
			// STOPFETCHASYNC OMIT
			fetched := result.fetched
			next, err = result.next, result.err
			if err != nil {
				next = time.Now().Add(10 * time.Second)
				break
			}
			for _, item := range fetched {
				if id := item.GUID; !seen[id] { // HLdupe
					pending = append(pending, item)
					seen[id] = true // HLdupe
				}
			}
		case errc := <-s.closing:
			errc <- err
			close(s.updates)
			return
		case updates <- first:
			pending = pending[1:]
		}
	}
}

// naiveMerge is a version of Merge that doesn't quite work right.  In
// particular, the goroutines it starts may block forever on m.updates
// if the receiver stops receiving.
type naiveMerge struct {
	subs    []Subscription
	updates chan Item
}

// STARTNAIVEMERGE OMIT
func NaiveMerge(subs ...Subscription) Subscription {
	m := &naiveMerge{
		subs:    subs,
		updates: make(chan Item),
	}
	// STARTNAIVEMERGELOOP OMIT
	for _, sub := range subs {
		go func(s Subscription) {
			for it := range s.Updates() {
				m.updates <- it // HL
			}
		}(sub)
	}
	// STOPNAIVEMERGELOOP OMIT
	return m
}

// STOPNAIVEMERGE OMIT

// STARTNAIVEMERGECLOSE OMIT
func (m *naiveMerge) Close() (err error) {
	for _, sub := range m.subs {
		if e := sub.Close(); err == nil && e != nil {
			err = e
		}
	}
	close(m.updates) // HL
	return
}

// STOPNAIVEMERGECLOSE OMIT

func (m *naiveMerge) Updates() <-chan Item {
	return m.updates
}

type merge struct {
	subs    []Subscription
	updates chan Item
	quit    chan struct{}
	errs    chan error
}

// STARTMERGESIG OMIT
// Merge returns a Subscription that merges the item streams from subs.
// Closing the merged subscription closes subs.
func Merge(subs ...Subscription) Subscription {
	// STOPMERGESIG OMIT
	m := &merge{
		subs:    subs,
		updates: make(chan Item),
		quit:    make(chan struct{}),
		errs:    make(chan error),
	}
	// STARTMERGE OMIT
	for _, sub := range subs {
		go func(s Subscription) {
			for {
				var it Item
				select {
				case it = <-s.Updates():
				case <-m.quit: // HL
					m.errs <- s.Close() // HL
					return              // HL
				}
				select {
				case m.updates <- it:
				case <-m.quit: // HL
					m.errs <- s.Close() // HL
					return              // HL
				}
			}
		}(sub)
	}
	// STOPMERGE OMIT
	return m
}

func (m *merge) Updates() <-chan Item {
	return m.updates
}

// STARTMERGECLOSE OMIT
func (m *merge) Close() (err error) {
	close(m.quit) // HL
	for _ = range m.subs {
		if e := <-m.errs; e != nil { // HL
			err = e
		}
	}
	close(m.updates) // HL
	return
}

// STOPMERGECLOSE OMIT

// NaiveDedupe converts a stream of Items that may contain duplicates
// into one that doesn't.
func NaiveDedupe(in <-chan Item) <-chan Item {
	out := make(chan Item)
	go func() {
		seen := make(map[string]bool)
		for it := range in {
			if !seen[it.GUID] {
				// BUG: this send blocks if the
				// receiver closes the Subscription
				// and stops receiving.
				out <- it // HL
				seen[it.GUID] = true
			}
		}
		close(out)
	}()
	return out
}

type deduper struct {
	s       Subscription
	updates chan Item
	closing chan chan error
}

// Dedupe converts a Subscription that may send duplicate Items into
// one that doesn't.
func Dedupe(s Subscription) Subscription {
	d := &deduper{
		s:       s,
		updates: make(chan Item),
		closing: make(chan chan error),
	}
	go d.loop()
	return d
}

func (d *deduper) loop() {
	in := d.s.Updates() // enable receive
	var pending Item
	var out chan Item // disable send
	seen := make(map[string]bool)
	for {
		select {
		case it := <-in:
			if !seen[it.GUID] {
				pending = it
				in = nil        // disable receive
				out = d.updates // enable send
				seen[it.GUID] = true
			}
		case out <- pending:
			in = d.s.Updates() // enable receive
			out = nil          // disable send
		case errc := <-d.closing:
			err := d.s.Close()
			errc <- err
			close(d.updates)
			return
		}
	}
}

func (d *deduper) Close() error {
	errc := make(chan error)
	d.closing <- errc
	return <-errc
}

func (d *deduper) Updates() <-chan Item {
	return d.updates
}

// Fetch returns a Fetcher for Items from domain.
func Fetch(domain string) Fetcher {
	return fakeFetch(domain)
}

func fakeFetch(domain string) Fetcher {
	return &fakeFetcher{channel: domain}
}

type fakeFetcher struct {
	channel string
	items   []Item
}

// FakeDuplicates causes the fake fetcher to return duplicate items.
var FakeDuplicates bool

func (f *fakeFetcher) Fetch() (items []Item, next time.Time, err error) {
	now := time.Now()
	next = now.Add(time.Duration(rand.Intn(5)) * 500 * time.Millisecond)
	item := Item{
		Channel: f.channel,
		Title:   fmt.Sprintf("Item %d", len(f.items)),
	}
	item.GUID = item.Channel + "/" + item.Title
	f.items = append(f.items, item)
	if FakeDuplicates {
		items = f.items
	} else {
		items = []Item{item}
	}
	return
}

func NaiveSubscribe(fetcher Fetcher) Subscription {
	s := &naiveSub{
		fetcher: fetcher,
		updates: make(chan Item),
	}
	go s.loop()
	return s
}

type naiveSub struct {
	fetcher Fetcher
	updates chan Item
	closed  bool
	err     error
}

func (s *naiveSub) Updates() <-chan Item {
	return s.updates
}

func (s *naiveSub) loop() {
    for {
        if s.closed {
            close(s.updates)
            return
        }
        items, next, err := s.fetcher.Fetch()
        if err != nil {
            s.err = err                 
            time.Sleep(10 * time.Second)
            continue
        }
        for _, item := range items {
            s.updates <- item
        }
        if now := time.Now(); next.After(now) {
            time.Sleep(next.Sub(now))
        }
    }
}

func (s *naiveSub) Close() error {
	s.closed = true // HLsync
	return s.err    // HLsync
}

func init() {
	rand.Seed(time.Now().UnixNano())
}

func main() {
	// Subscribe to some feeds, and create a merged update stream.
	merged := Merge(
		NaiveSubscribe(Fetch("blog.golang.org")),
		NaiveSubscribe(Fetch("googleblog.blogspot.com")),
		NaiveSubscribe(Fetch("googledevelopers.blogspot.com")))

	// Close the subscriptions after some time.
	time.AfterFunc(3*time.Second, func() {
		fmt.Println("closed:", merged.Close())
	})

	// Print the stream.
	for it := range merged.Updates() {
		fmt.Println(it.Channel, it.Title)
	}

	// The loops are still running.  Let the race detector notice.
	time.Sleep(1 * time.Second)

	panic("show me the stacks")
}
func (s *naiveSub) Close() error {
    s.closed = true
    return s.err   
}
18

Bug 1: unsynchronized access to s.closed/s.err

    for {
        if s.closed {
            close(s.updates)
            return
        }
        items, next, err := s.fetcher.Fetch()
        if err != nil {
            s.err = err
            time.Sleep(10 * time.Second)
            continue
        }
        for _, item := range items {
            s.updates <- item
        }
        if now := time.Now(); next.After(now) {
            time.Sleep(next.Sub(now))
        }
    }
func (s *naiveSub) Close() error {
    s.closed = true
    return s.err
}
19

Race Detector

go run -race naivemain.go
// naivemain runs the Subscribe example with the naive Subscribe
// implementation and a fake RSS fetcher.
//go:build ignore && OMIT
// +build ignore,OMIT

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// STARTITEM OMIT
// An Item is a stripped-down RSS item.
type Item struct{ Title, Channel, GUID string }

// STOPITEM OMIT

// STARTFETCHER OMIT
// A Fetcher fetches Items and returns the time when the next fetch should be
// attempted.  On failure, Fetch returns a non-nil error.
type Fetcher interface {
	Fetch() (items []Item, next time.Time, err error)
}

// STOPFETCHER OMIT

// STARTSUBSCRIPTION OMIT
// A Subscription delivers Items over a channel.  Close cancels the
// subscription, closes the Updates channel, and returns the last fetch error,
// if any.
type Subscription interface {
	Updates() <-chan Item
	Close() error
}

// STOPSUBSCRIPTION OMIT

// STARTSUBSCRIBE OMIT
// Subscribe returns a new Subscription that uses fetcher to fetch Items.
func Subscribe(fetcher Fetcher) Subscription {
	s := &sub{
		fetcher: fetcher,
		updates: make(chan Item),       // for Updates
		closing: make(chan chan error), // for Close
	}
	go s.loop()
	return s
}

// STOPSUBSCRIBE OMIT

// sub implements the Subscription interface.
type sub struct {
	fetcher Fetcher         // fetches items
	updates chan Item       // sends items to the user
	closing chan chan error // for Close
}

// STARTUPDATES OMIT
func (s *sub) Updates() <-chan Item {
	return s.updates
}

// STOPUPDATES OMIT

// STARTCLOSE OMIT
// STARTCLOSESIG OMIT
func (s *sub) Close() error {
	// STOPCLOSESIG OMIT
	errc := make(chan error)
	s.closing <- errc // HLchan
	return <-errc     // HLchan
}

// STOPCLOSE OMIT

// loopCloseOnly is a version of loop that includes only the logic
// that handles Close.
func (s *sub) loopCloseOnly() {
	// STARTCLOSEONLY OMIT
	var err error // set when Fetch fails
	for {
		select {
		case errc := <-s.closing: // HLchan
			errc <- err      // HLchan
			close(s.updates) // tells receiver we're done
			return
		}
	}
	// STOPCLOSEONLY OMIT
}

// loopFetchOnly is a version of loop that includes only the logic
// that calls Fetch.
func (s *sub) loopFetchOnly() {
	// STARTFETCHONLY OMIT
	var pending []Item // appended by fetch; consumed by send
	var next time.Time // initially January 1, year 0
	var err error
	for {
		var fetchDelay time.Duration // initially 0 (no delay)
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		startFetch := time.After(fetchDelay)

		select {
		case <-startFetch:
			var fetched []Item
			fetched, next, err = s.fetcher.Fetch()
			if err != nil {
				next = time.Now().Add(10 * time.Second)
				break
			}
			pending = append(pending, fetched...)
		}
	}
	// STOPFETCHONLY OMIT
}

// loopSendOnly is a version of loop that includes only the logic for
// sending items to s.updates.
func (s *sub) loopSendOnly() {
	// STARTSENDONLY OMIT
	var pending []Item // appended by fetch; consumed by send
	for {
		var first Item
		var updates chan Item // HLupdates
		if len(pending) > 0 {
			first = pending[0]
			updates = s.updates // enable send case // HLupdates
		}

		select {
		case updates <- first:
			pending = pending[1:]
		}
	}
	// STOPSENDONLY OMIT
}

// mergedLoop is a version of loop that combines loopCloseOnly,
// loopFetchOnly, and loopSendOnly.
func (s *sub) mergedLoop() {
	// STARTFETCHVARS OMIT
	var pending []Item
	var next time.Time
	var err error
	// STOPFETCHVARS OMIT
	for {
		// STARTNOCAP OMIT
		var fetchDelay time.Duration
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		startFetch := time.After(fetchDelay)
		// STOPNOCAP OMIT
		var first Item
		var updates chan Item
		if len(pending) > 0 {
			first = pending[0]
			updates = s.updates // enable send case
		}

		// STARTSELECT OMIT
		select {
		case errc := <-s.closing: // HLcases
			errc <- err
			close(s.updates)
			return
			// STARTFETCHCASE OMIT
		case <-startFetch: // HLcases
			var fetched []Item
			fetched, next, err = s.fetcher.Fetch() // HLfetch
			if err != nil {
				next = time.Now().Add(10 * time.Second)
				break
			}
			pending = append(pending, fetched...) // HLfetch
			// STOPFETCHCASE OMIT
		case updates <- first: // HLcases
			pending = pending[1:]
		}
		// STOPSELECT OMIT
	}
}

// dedupeLoop extends mergedLoop with deduping of fetched items.
func (s *sub) dedupeLoop() {
	const maxPending = 10
	// STARTSEEN OMIT
	var pending []Item
	var next time.Time
	var err error
	var seen = make(map[string]bool) // set of item.GUIDs // HLseen
	// STOPSEEN OMIT
	for {
		// STARTCAP OMIT
		var fetchDelay time.Duration
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		var startFetch <-chan time.Time // HLcap
		if len(pending) < maxPending {  // HLcap
			startFetch = time.After(fetchDelay) // enable fetch case  // HLcap
		} // HLcap
		// STOPCAP OMIT
		var first Item
		var updates chan Item
		if len(pending) > 0 {
			first = pending[0]
			updates = s.updates // enable send case
		}
		select {
		case errc := <-s.closing:
			errc <- err
			close(s.updates)
			return
		// STARTDEDUPE OMIT
		case <-startFetch:
			var fetched []Item
			fetched, next, err = s.fetcher.Fetch() // HLfetch
			if err != nil {
				next = time.Now().Add(10 * time.Second)
				break
			}
			for _, item := range fetched {
				if !seen[item.GUID] { // HLdupe
					pending = append(pending, item) // HLdupe
					seen[item.GUID] = true          // HLdupe
				} // HLdupe
			}
			// STOPDEDUPE OMIT
		case updates <- first:
			pending = pending[1:]
		}
	}
}

// loop periodically fetches Items, sends them on s.updates, and exits
// when Close is called.  It extends dedupeLoop with logic to run
// Fetch asynchronously.
func (s *sub) loop() {
	const maxPending = 10
	type fetchResult struct {
		fetched []Item
		next    time.Time
		err     error
	}
	// STARTFETCHDONE OMIT
	var fetchDone chan fetchResult // if non-nil, Fetch is running // HL
	// STOPFETCHDONE OMIT
	var pending []Item
	var next time.Time
	var err error
	var seen = make(map[string]bool)
	for {
		var fetchDelay time.Duration
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		// STARTFETCHIF OMIT
		var startFetch <-chan time.Time
		if fetchDone == nil && len(pending) < maxPending { // HLfetch
			startFetch = time.After(fetchDelay) // enable fetch case
		}
		// STOPFETCHIF OMIT
		var first Item
		var updates chan Item
		if len(pending) > 0 {
			first = pending[0]
			updates = s.updates // enable send case
		}
		// STARTFETCHASYNC OMIT
		select {
		case <-startFetch: // HLfetch
			fetchDone = make(chan fetchResult, 1) // HLfetch
			go func() {
				fetched, next, err := s.fetcher.Fetch()
				fetchDone <- fetchResult{fetched, next, err}
			}()
		case result := <-fetchDone: // HLfetch
			fetchDone = nil // HLfetch
			// Use result.fetched, result.next, result.err
			// STOPFETCHASYNC OMIT
			fetched := result.fetched
			next, err = result.next, result.err
			if err != nil {
				next = time.Now().Add(10 * time.Second)
				break
			}
			for _, item := range fetched {
				if id := item.GUID; !seen[id] { // HLdupe
					pending = append(pending, item)
					seen[id] = true // HLdupe
				}
			}
		case errc := <-s.closing:
			errc <- err
			close(s.updates)
			return
		case updates <- first:
			pending = pending[1:]
		}
	}
}

// naiveMerge is a version of Merge that doesn't quite work right.  In
// particular, the goroutines it starts may block forever on m.updates
// if the receiver stops receiving.
type naiveMerge struct {
	subs    []Subscription
	updates chan Item
}

// STARTNAIVEMERGE OMIT
func NaiveMerge(subs ...Subscription) Subscription {
	m := &naiveMerge{
		subs:    subs,
		updates: make(chan Item),
	}
	// STARTNAIVEMERGELOOP OMIT
	for _, sub := range subs {
		go func(s Subscription) {
			for it := range s.Updates() {
				m.updates <- it // HL
			}
		}(sub)
	}
	// STOPNAIVEMERGELOOP OMIT
	return m
}

// STOPNAIVEMERGE OMIT

// STARTNAIVEMERGECLOSE OMIT
func (m *naiveMerge) Close() (err error) {
	for _, sub := range m.subs {
		if e := sub.Close(); err == nil && e != nil {
			err = e
		}
	}
	close(m.updates) // HL
	return
}

// STOPNAIVEMERGECLOSE OMIT

func (m *naiveMerge) Updates() <-chan Item {
	return m.updates
}

type merge struct {
	subs    []Subscription
	updates chan Item
	quit    chan struct{}
	errs    chan error
}

// STARTMERGESIG OMIT
// Merge returns a Subscription that merges the item streams from subs.
// Closing the merged subscription closes subs.
func Merge(subs ...Subscription) Subscription {
	// STOPMERGESIG OMIT
	m := &merge{
		subs:    subs,
		updates: make(chan Item),
		quit:    make(chan struct{}),
		errs:    make(chan error),
	}
	// STARTMERGE OMIT
	for _, sub := range subs {
		go func(s Subscription) {
			for {
				var it Item
				select {
				case it = <-s.Updates():
				case <-m.quit: // HL
					m.errs <- s.Close() // HL
					return              // HL
				}
				select {
				case m.updates <- it:
				case <-m.quit: // HL
					m.errs <- s.Close() // HL
					return              // HL
				}
			}
		}(sub)
	}
	// STOPMERGE OMIT
	return m
}

func (m *merge) Updates() <-chan Item {
	return m.updates
}

// STARTMERGECLOSE OMIT
func (m *merge) Close() (err error) {
	close(m.quit) // HL
	for _ = range m.subs {
		if e := <-m.errs; e != nil { // HL
			err = e
		}
	}
	close(m.updates) // HL
	return
}

// STOPMERGECLOSE OMIT

// NaiveDedupe converts a stream of Items that may contain duplicates
// into one that doesn't.
func NaiveDedupe(in <-chan Item) <-chan Item {
	out := make(chan Item)
	go func() {
		seen := make(map[string]bool)
		for it := range in {
			if !seen[it.GUID] {
				// BUG: this send blocks if the
				// receiver closes the Subscription
				// and stops receiving.
				out <- it // HL
				seen[it.GUID] = true
			}
		}
		close(out)
	}()
	return out
}

type deduper struct {
	s       Subscription
	updates chan Item
	closing chan chan error
}

// Dedupe converts a Subscription that may send duplicate Items into
// one that doesn't.
func Dedupe(s Subscription) Subscription {
	d := &deduper{
		s:       s,
		updates: make(chan Item),
		closing: make(chan chan error),
	}
	go d.loop()
	return d
}

func (d *deduper) loop() {
	in := d.s.Updates() // enable receive
	var pending Item
	var out chan Item // disable send
	seen := make(map[string]bool)
	for {
		select {
		case it := <-in:
			if !seen[it.GUID] {
				pending = it
				in = nil        // disable receive
				out = d.updates // enable send
				seen[it.GUID] = true
			}
		case out <- pending:
			in = d.s.Updates() // enable receive
			out = nil          // disable send
		case errc := <-d.closing:
			err := d.s.Close()
			errc <- err
			close(d.updates)
			return
		}
	}
}

func (d *deduper) Close() error {
	errc := make(chan error)
	d.closing <- errc
	return <-errc
}

func (d *deduper) Updates() <-chan Item {
	return d.updates
}

// Fetch returns a Fetcher for Items from domain.
func Fetch(domain string) Fetcher {
	return fakeFetch(domain)
}

func fakeFetch(domain string) Fetcher {
	return &fakeFetcher{channel: domain}
}

type fakeFetcher struct {
	channel string
	items   []Item
}

// FakeDuplicates causes the fake fetcher to return duplicate items.
var FakeDuplicates bool

func (f *fakeFetcher) Fetch() (items []Item, next time.Time, err error) {
	now := time.Now()
	next = now.Add(time.Duration(rand.Intn(5)) * 500 * time.Millisecond)
	item := Item{
		Channel: f.channel,
		Title:   fmt.Sprintf("Item %d", len(f.items)),
	}
	item.GUID = item.Channel + "/" + item.Title
	f.items = append(f.items, item)
	if FakeDuplicates {
		items = f.items
	} else {
		items = []Item{item}
	}
	return
}

func NaiveSubscribe(fetcher Fetcher) Subscription {
	s := &naiveSub{
		fetcher: fetcher,
		updates: make(chan Item),
	}
	go s.loop()
	return s
}

type naiveSub struct {
	fetcher Fetcher
	updates chan Item
	closed  bool
	err     error
}

func (s *naiveSub) Updates() <-chan Item {
	return s.updates
}

func (s *naiveSub) loop() {
    for {
        if s.closed {
            close(s.updates)
            return
        }
        items, next, err := s.fetcher.Fetch()
        if err != nil {
            s.err = err
			time.Sleep(10 * time.Second) // HLsleep
			continue
		}
		for _, item := range items {
			s.updates <- item // HLsend
		}
		if now := time.Now(); next.After(now) {
			time.Sleep(next.Sub(now)) // HLsleep
		}
	}
	// STOPNAIVE OMIT
}

func (s *naiveSub) Close() error {
	s.closed = true // HLsync
	return s.err    // HLsync
}

func init() {
	rand.Seed(time.Now().UnixNano())
}

func main() {
	// Subscribe to some feeds, and create a merged update stream.
	merged := Merge(
		NaiveSubscribe(Fetch("blog.golang.org")),
		NaiveSubscribe(Fetch("googleblog.blogspot.com")),
		NaiveSubscribe(Fetch("googledevelopers.blogspot.com")))

	// Close the subscriptions after some time.
	time.AfterFunc(3*time.Second, func() {
		fmt.Println("closed:", merged.Close())
	})

	// Print the stream.
	for it := range merged.Updates() {
		fmt.Println(it.Channel, it.Title)
	}

	// The loops are still running.  Let the race detector notice.
	time.Sleep(1 * time.Second)

	panic("show me the stacks")
}
func (s *naiveSub) Close() error {
    s.closed = true
    return s.err
}
20

Bug 2: time.Sleep may keep loop running

    for {
        if s.closed {
            close(s.updates)
            return
        }
        items, next, err := s.fetcher.Fetch()
        if err != nil {
            s.err = err                 
            time.Sleep(10 * time.Second)
            continue
        }
        for _, item := range items {
            s.updates <- item
        }
        if now := time.Now(); next.After(now) {
            time.Sleep(next.Sub(now))
        }
    }
21

Bug 3: loop may block forever on s.updates

    for {
        if s.closed {
            close(s.updates)
            return
        }
        items, next, err := s.fetcher.Fetch()
        if err != nil {
            s.err = err                 
            time.Sleep(10 * time.Second)
            continue
        }
        for _, item := range items {
            s.updates <- item
        }
        if now := time.Now(); next.After(now) {
            time.Sleep(next.Sub(now))
        }
    }
22

Solution

Change the body of loop to a select with three cases:

23

Structure: for-select loop

loop runs in its own goroutine.

select lets loop avoid blocking indefinitely in any one state.

func (s *sub) loop() {
    ... declare mutable state ...
    for {
        ... set up channels for cases ...
        select {
        case <-c1:
            ... read/write state ...
        case c2 <- x:
            ... read/write state ...
        case y := <-c3:
            ... read/write state ...
        }
    }
}

The cases interact via local state in loop.

24

Case 1: Close

Close communicates with loop via s.closing.

type sub struct {
    closing chan chan error
}

The service (loop) listens for requests on its channel (s.closing).

The client (Close) sends a request on s.closing: exit and reply with the error

In this case, the only thing in the request is the reply channel.

25

Case 1: Close

Close asks loop to exit and waits for a response.

func (s *sub) Close() error {
    errc := make(chan error)
    s.closing <- errc
    return <-errc
}

loop handles Close by replying with the Fetch error and exiting.

    var err error // set when Fetch fails
    for {
        select {
        case errc := <-s.closing:
            errc <- err
            close(s.updates) // tells receiver we're done
            return
        }
    }
26

Case 2: Fetch

Schedule the next Fetch after some delay.

    var pending []Item // appended by fetch; consumed by send
    var next time.Time // initially January 1, year 0
    var err error
    for {
        var fetchDelay time.Duration // initially 0 (no delay)
        if now := time.Now(); next.After(now) {
            fetchDelay = next.Sub(now)
        }
        startFetch := time.After(fetchDelay)

        select {
        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            pending = append(pending, fetched...)
        }
    }
27

Case 3: Send

Send the fetched items, one at a time.

var pending []Item // appended by fetch; consumed by send
for {
    select {
    case s.updates <- pending[0]:
        pending = pending[1:]
    }
}

Whoops. This crashes.

28

Select and nil channels

Sends and receives on nil channels block.

Select never selects a blocking case.

// +build ignore,OMIT

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func init() {
	rand.Seed(time.Now().UnixNano())
}

func main() {
    a, b := make(chan string), make(chan string)
    go func() { a <- "a" }()
    go func() { b <- "b" }()
    if rand.Intn(2) == 0 {
        a = nil
        fmt.Println("nil a")
    } else {
        b = nil
        fmt.Println("nil b")
    }
    select {
    case s := <-a:
        fmt.Println("got", s)
    case s := <-b:
        fmt.Println("got", s)
    }
}
29

Case 3: Send (fixed)

Enable send only when pending is non-empty.

    var pending []Item // appended by fetch; consumed by send
    for {
        var first Item
        var updates chan Item
        if len(pending) > 0 {
            first = pending[0]
            updates = s.updates // enable send case
        }

        select {
        case updates <- first:
            pending = pending[1:]
        }
    }
30

Select

Put the three cases together:

        select {
        case errc := <-s.closing:
            errc <- err
            close(s.updates)
            return
        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            pending = append(pending, fetched...)
        case updates <- first:
            pending = pending[1:]
        }

The cases interact via err, next, and pending.

No locks, no condition variables, no callbacks.

31

Bugs fixed

        select {
        case errc := <-s.closing:
            errc <- err
            close(s.updates)
            return
        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            pending = append(pending, fetched...)
        case updates <- first:
            pending = pending[1:]
        }
32

We can improve loop further

33

Issue: Fetch may return duplicates

    var pending []Item
    var next time.Time
    var err error
        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            pending = append(pending, fetched...)
34

Fix: Filter items before adding to pending

    var pending []Item
    var next time.Time
    var err error
    var seen = make(map[string]bool) // set of item.GUIDs
        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            for _, item := range fetched {
                if !seen[item.GUID] {
                    pending = append(pending, item)
                    seen[item.GUID] = true
                }
            }
35

Issue: Pending queue grows without bound

        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            for _, item := range fetched {
                if !seen[item.GUID] {
                    pending = append(pending, item)
                    seen[item.GUID] = true
                }
            }
36

Fix: Disable fetch case when too much pending

const maxPending = 10
        var fetchDelay time.Duration
        if now := time.Now(); next.After(now) {
            fetchDelay = next.Sub(now)
        }
        var startFetch <-chan time.Time
        if len(pending) < maxPending {
            startFetch = time.After(fetchDelay) // enable fetch case
        }

Could instead drop older items from the head of pending.

37

Issue: Loop blocks on Fetch

        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            for _, item := range fetched {
                if !seen[item.GUID] {
                    pending = append(pending, item)
                    seen[item.GUID] = true         
                }
            }
38

Fix: Run Fetch asynchronously

Add a new select case for fetchDone.

type fetchResult struct{ fetched []Item; next time.Time; err error }
    var fetchDone chan fetchResult // if non-nil, Fetch is running
        var startFetch <-chan time.Time
        if fetchDone == nil && len(pending) < maxPending {
            startFetch = time.After(fetchDelay) // enable fetch case
        }
        select {
        case <-startFetch:
            fetchDone = make(chan fetchResult, 1)
            go func() {
                fetched, next, err := s.fetcher.Fetch()
                fetchDone <- fetchResult{fetched, next, err}
            }()
        case result := <-fetchDone:
            fetchDone = nil
            // Use result.fetched, result.next, result.err
39

Implemented Subscribe

Responsive. Cleans up. Easy to read and change.

Three techniques:

More details online, including Merge.

40

Conclusion

Concurrent programming can be tricky.

Go makes it easier:

41

Links

Go Concurrency Patterns (2012)

Concurrency is not parallelism

Share memory by communicating

Go Tour (learn Go in your browser)

42

Thank you

Use the left and right arrow keys or click the left and right edges of the page to navigate between slides.
(Press 'H' or navigate to hide this message.)