Overview
One of the unique features of Go is the use of channels to communicate safely between goroutines. In this article, you'll learn what channels are, how to use them effectively, and some common patterns.
What Is a Channel?
A channel is a synchronized in-memory queue that goroutines and regular functions can use to send and receive typed values. Communication is serialized through the channel.
You create a channel using make() and specify the type of values the channel accepts:
ch := make(chan int)
Go provides a nice arrow syntax for sending and receiving to/from channels:
// send value to a channel
ch <- 5

// receive value from a channel
x := <-ch

You don't have to use the received value. It's fine to just pop a value off a channel and discard it:
<-ch
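A discarded receive is handy when only the arrival of a value matters, for example as a "finished" signal. The following is a minimal sketch of that idea; the names doWork, waitForWork, and done are illustrative and not part of the article's code.

```go
package main

import "fmt"

// doWork is a hypothetical task. It records that it ran and then
// signals completion by sending on done.
func doWork(done chan<- bool, finished *bool) {
	*finished = true
	done <- true
}

// waitForWork runs doWork in a goroutine and blocks until it signals.
func waitForWork() bool {
	done := make(chan bool)
	finished := false
	go doWork(done, &finished)
	<-done // receive and discard: only the arrival of the value matters
	return finished
}

func main() {
	fmt.Println("work finished:", waitForWork())
}
```

Because the receive blocks until the goroutine sends, waitForWork() cannot return before doWork() has run.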
Channels are blocking by default. If you send a value to a channel, you'll block until someone receives it. Similarly, if you receive from a channel, you'll block until someone sends a value to the channel.
The following program demonstrates this. The main() function makes a channel and starts a goroutine that prints "start", reads a value from the channel, and prints that value too. Then main() starts another goroutine that just prints a dash ("-") every second. Then, it sleeps for 2.5 seconds, sends a value to the channel, and sleeps 3 more seconds to let all goroutines finish.
package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan int)

	// Start a goroutine that reads a value from a channel and prints it
	go func(ch chan int) {
		fmt.Println("start")
		fmt.Println(<-ch)
	}(ch)

	// Start a goroutine that prints a dash every second
	go func() {
		for i := 0; i < 5; i++ {
			time.Sleep(time.Second)
			fmt.Println("-")
		}
	}()

	// Sleep for 2.5 seconds
	time.Sleep(2500 * time.Millisecond)

	// Send a value to the channel
	ch <- 5

	// Sleep three more seconds to let all goroutines finish
	time.Sleep(3 * time.Second)
}
This program demonstrates the blocking nature of the channel very well. The first goroutine prints "start" right away, but then blocks trying to receive from the channel until the main() function, which sleeps for 2.5 seconds, sends the value. The other goroutine just provides a visual indication of the flow of time by printing a dash every second.
Here is the output:
start
-
-
5
-
-
-
Buffered Channels
This behavior tightly couples senders to receivers and sometimes is not what you want. Go provides several mechanisms to address that.
Buffered channels are channels that can hold a certain (predefined) number of values so that senders don't block until the buffer is full, even if no one is receiving.
To create a buffered channel, just add a capacity as a second argument:
ch := make(chan int, 5)
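To see that sends into a buffered channel succeed without any receiver, here is a small sketch. It uses Go's built-in len() and cap() functions, which report the number of queued values and the buffer capacity; the helper name fillBuffer is made up for this illustration.

```go
package main

import "fmt"

// fillBuffer sends n values into a channel with the given capacity while
// no one is receiving, then reports how many values are queued and the
// channel's capacity. n must not exceed capacity, otherwise the extra
// send would block forever.
func fillBuffer(capacity, n int) (queued, total int) {
	ch := make(chan int, capacity)
	for i := 0; i < n; i++ {
		ch <- i // does not block while the buffer has room
	}
	return len(ch), cap(ch)
}

func main() {
	queued, total := fillBuffer(5, 3)
	fmt.Printf("%d of %d slots used\n", queued, total)
}
```

With a capacity of 5 and three sends, this prints "3 of 5 slots used".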
The following program illustrates the behavior of buffered channels. The main() function defines a buffered channel with a capacity of 3. Then it starts one goroutine that reads a value from the channel every second and prints it, and another goroutine that just prints a dash every second to give a visual indication of the progress of time. Then, it sends five values to the channel.
package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan int, 3)

	// Start a goroutine that reads a value from the channel every second and prints it
	go func(ch chan int) {
		for {
			time.Sleep(time.Second)
			fmt.Printf("Goroutine received: %d\n", <-ch)
		}
	}(ch)

	// Start a goroutine that prints a dash every second
	go func() {
		for i := 0; i < 5; i++ {
			time.Sleep(time.Second)
			fmt.Println("-")
		}
	}()

	// Push values to the channel as fast as possible
	for i := 0; i < 5; i++ {
		ch <- i
		fmt.Printf("main() pushed: %d\n", i)
	}

	// Sleep five more seconds to let all goroutines finish
	time.Sleep(5 * time.Second)
}
What happens at runtime? The first three values are buffered by the channel immediately, and the main() function blocks on the fourth send. After a second, a value is received by the goroutine, and the main() function can push another value. Another second goes by, the goroutine receives another value, and the main() function can push the last value. From then on, the goroutine keeps receiving the remaining values from the channel, one per second.
Here is the output:
main() pushed: 0
main() pushed: 1
main() pushed: 2
-
Goroutine received: 0
main() pushed: 3
-
Goroutine received: 1
main() pushed: 4
-
Goroutine received: 2
-
Goroutine received: 3
-
Goroutine received: 4
Select
Buffered channels (as long as the buffer is big enough) can address the issue of temporary fluctuations where there aren't enough receivers to process all the sent messages. But there is also the opposite problem of blocked receivers waiting for messages to process. Go has got you covered.
What if you want your goroutine to do something else when there are no messages to process in a channel? A good example is if your receiver is waiting for messages from multiple channels. You don't want to block on channel A if channel B has messages right now. The following program attempts to compute the sum of 3 and 5 using the full power of the machine.
The idea is to simulate a complex operation (e.g. a remote query to a distributed DB) with redundancy. The sum() function (note how it's defined as a nested function inside main()) accepts two int parameters and returns an int channel. It starts an anonymous goroutine that sleeps for a random time of up to one second, writes the sum to the channel, and closes it. The sum() function itself returns the channel right away, without waiting for the goroutine.
Now, main() calls sum(3, 5) four times and stores the resulting channels in variables ch1 to ch4. The four calls to sum() return immediately because the random sleeping happens inside the goroutine that each sum() call starts.
Here comes the cool part. The select statement lets the main() function wait on all channels and respond to the first one that delivers a value. The select statement operates a little like the switch statement, except that each case is a channel operation.
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	r := rand.New(rand.NewSource(time.Now().UnixNano()))

	sum := func(a int, b int) <-chan int {
		ch := make(chan int)
		// Random time up to one second. It is picked here rather than
		// inside the goroutine because r is not safe for concurrent use.
		delay := time.Duration(r.Intn(1000)) * time.Millisecond
		go func() {
			time.Sleep(delay)
			ch <- a + b
			close(ch)
		}()
		return ch
	}

	// Call sum 4 times with the same parameters
	ch1 := sum(3, 5)
	ch2 := sum(3, 5)
	ch3 := sum(3, 5)
	ch4 := sum(3, 5)

	// wait for the first goroutine to write to its channel
	select {
	case result := <-ch1:
		fmt.Printf("ch1: 3 + 5 = %d\n", result)
	case result := <-ch2:
		fmt.Printf("ch2: 3 + 5 = %d\n", result)
	case result := <-ch3:
		fmt.Printf("ch3: 3 + 5 = %d\n", result)
	case result := <-ch4:
		fmt.Printf("ch4: 3 + 5 = %d\n", result)
	}
}
Sometimes you don't want the main() function to block waiting even for the first goroutine to finish. In this case, you can add a default case that will execute if all channels are blocked.
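A select with a default case can be sketched as a non-blocking receive. The helper name tryReceive below is hypothetical and chosen only for this illustration.

```go
package main

import "fmt"

// tryReceive returns a value from ch if one is ready, without blocking.
// Note that a closed channel always counts as ready and yields the zero value.
func tryReceive(ch <-chan int) (int, bool) {
	select {
	case v := <-ch:
		return v, true
	default:
		// No value is ready right now, so return instead of waiting.
		return 0, false
	}
}

func main() {
	ch := make(chan int, 1)

	// The channel is empty, so the default case runs.
	if _, ok := tryReceive(ch); !ok {
		fmt.Println("nothing ready yet")
	}

	// After a send, the receive case can proceed.
	ch <- 8
	if v, ok := tryReceive(ch); ok {
		fmt.Println("received:", v)
	}
}
```

The first call takes the default branch because the channel is empty. The second call finds the buffered value and returns it.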
A Web Crawler Example
In my previous article, I showed a solution to the web crawler exercise from the Tour of Go. I've used goroutines and a synchronized map. I also solved the exercise using channels. The complete source code for both solutions is available on GitHub.
Let's look at the relevant parts. First, here is a struct that will be sent to a channel whenever a goroutine parses a page. It contains the current depth and all URLs found on the page.
type links struct {
	urls  []string
	depth int
}
The fetchURL() function accepts a URL, a depth, and an output channel. It uses the fetcher (provided by the exercise) to get the URLs of all the links on the page. It sends the list of URLs as a single message to the candidates channel as a links struct with a decremented depth. The depth represents how much further we should crawl. When depth reaches 0, no further processing should take place.
func fetchURL(url string, depth int, candidates chan links) {
	body, urls, err := fetcher.Fetch(url)
	if err != nil {
		fmt.Println(err)
	} else {
		fmt.Printf("found: %s %q\n", url, body)
	}
	// Always send, even on error, so the receiver's goroutine count stays accurate
	candidates <- links{urls, depth - 1}
}
The ChannelCrawl() function coordinates everything. It keeps track of all the URLs that were already fetched in a map. There is no need to synchronize access because no other function or goroutine touches the map. It also defines a candidates channel that all the goroutines will write their results to.
Then, it starts invoking fetchURL() as a goroutine for each new URL. The logic keeps track of how many goroutines were launched by managing a counter. Whenever a value is read from the channel, the counter is decremented (because the sending goroutine exits after sending), and whenever a new goroutine is launched, the counter is incremented. If the depth gets to zero then no new goroutines will be launched, and ChannelCrawl() will keep reading from the channel until all goroutines are done.
// ChannelCrawl crawls links from a seed url
func ChannelCrawl(url string, depth int, fetcher Fetcher) {
	candidates := make(chan links)
	// Mark the seed as fetched so a link back to it does not trigger a second fetch
	fetched := map[string]bool{url: true}
	counter := 1

	// Fetch initial url to seed the candidates channel
	go fetchURL(url, depth, candidates)

	for counter > 0 {
		candidateLinks := <-candidates
		counter--
		depth = candidateLinks.depth
		for _, candidate := range candidateLinks.urls {
			// Already fetched. Continue...
			if fetched[candidate] {
				continue
			}

			// Add to fetched map
			fetched[candidate] = true

			if depth > 0 {
				counter++
				go fetchURL(candidate, depth, candidates)
			}
		}
	}
}
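The crawler above depends on the fetcher supplied by the exercise, so it cannot run on its own. To try the counter pattern in isolation, here is a self-contained sketch that swaps the fetcher for a small in-memory map. The names fakeSite, fetchLinks, and crawl are assumptions made for this illustration, not part of the original solution.

```go
package main

import (
	"fmt"
	"sort"
)

type links struct {
	urls  []string
	depth int
}

// fakeSite is a hypothetical stand-in for the exercise's fetcher:
// it maps each URL to the links found on that page.
var fakeSite = map[string][]string{
	"a": {"b", "c"},
	"b": {"a", "d"},
	"c": {"d"},
	"d": {},
}

// fetchLinks looks up a page and sends its links with a decremented depth.
func fetchLinks(url string, depth int, candidates chan<- links) {
	candidates <- links{fakeSite[url], depth - 1}
}

// crawl returns the sorted list of every URL seen starting from seed.
func crawl(seed string, depth int) []string {
	candidates := make(chan links)
	seen := map[string]bool{seed: true}
	counter := 1 // number of goroutines that have not reported back yet

	go fetchLinks(seed, depth, candidates)

	for counter > 0 {
		found := <-candidates
		counter-- // the sending goroutine exits after its send
		for _, u := range found.urls {
			if seen[u] {
				continue
			}
			seen[u] = true
			if found.depth > 0 {
				counter++
				go fetchLinks(u, found.depth, candidates)
			}
		}
	}

	visited := make([]string, 0, len(seen))
	for u := range seen {
		visited = append(visited, u)
	}
	sort.Strings(visited)
	return visited
}

func main() {
	fmt.Println(crawl("a", 3)) // prints [a b c d]
}
```

Only the crawl() goroutine reads or writes the seen map, so no locking is needed. The loop ends once every launched goroutine has sent its result.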
Conclusion
Go's channels provide a lot of options for safe communication between goroutines. The syntax support is both concise and illustrative. It's a real boon for expressing concurrent algorithms. There is much more to channels than I presented here. I encourage you to dive in and get familiar with the various concurrency patterns they enable.