はじめに
最近、Goを本格的に勉強しており、並行処理として使用機会も多く自分なりに理解が追いついていない goroutine と channel について勉強していきたいと思います。
並行処理と並列処理
goroutine と channel はgo言語の並行実行機能になります。並行処理と並列処理の違いは以下になります。
- 並行(concurrent)処理とは、複数のタスクを非同期で同時に実行すること。
- 並列(parallel)処理とは、複数のCPUを使って複数のタスクを同時に実行すること。
相互補完的な概念ですが、同じものではありません。並行性は複数のタスクが同じ時間帯に開始され、処理を実行し、終了することをさし、タスク間の相互作用が可能です。
並列性の場合は、単に複数のタスクが同時に実行されているだけになります。
goroutine
goroutine とは並行処理を実現するための軽量なスレッドのようなのもので、関数やメソッドの前に go をつけると goroutine として実行されます。
1 2 3 4 5 | func main() { go func() { fmt.Println("goroutine") }() } |
例えば、上記のコードを実行すると何も出力されません。これは goroutine の実行より先にプログラムが終了するからです。
1 2 3 4 5 6 | func main() { go func() { fmt.Println("goroutine") }() time.Sleep(1 * time.Second) } |
“goroutine” と表示されるはずです。 main 関数中で1秒スリープしているため、 goroutine がプログラムの終了より先に実行されます。
goroutine 実行すると、内部ではそれをスケジューリングし実行します。複数の goroutine を実行すると、それらはキューに積まれて処理されていきます。そのため、最初の例のように、処理が goroutine に切り替わる(スイッチング)より先に main 関数が終了する場合があります。 goroutine の終了を待つのに、 sleep で goroutine の終了を待つ、 sync.WaitGroup で同期をする等、処理の流れを制御する必要があります。
ただし、 goroutine だけでは並行処理のタスク間の相互作用ができません、そこで使用するのが channel になります。
channel
goroutine 間でデータを送受信するためのもので、 channel のサイズ ( buffer )で、1度に扱えるデータのサイズを指定できます。
channel は、 goroutine 間でデータの通信を同期して処理するための配列のようなものです。容量なしの channel は unbuffered channel と呼ばれ、容量ありの channel は、 buffered channel と呼ばれます。
1 2 3 4 | // unbuffered channel messages := make(chan string) // buffered channel messages := make(chan string, 2) |
channelからデータ受信
1 2 3 4 5 6 7 8 9 | func sender(ch chan string) { ch <- "data" // channelにデータを送信する } func main() { ch := make(chan string) // string型のデータを扱うchannel go sender(ch) // goroutineにchannelを引数として渡す fmt.Println(<-ch) // chからデータを受信する } |
このプログラムを実行すると、”data” と表示されます。前章までの流れだと、 goroutine より先に main 関数が終了するように思えますが、 channel を用いることで、 channel にデータを受信するまで処理が止まります。これはブロックと呼ばれ、 channel はデータを受信するまで処理をブロックする性質があります。そのため、この例では goroutine を実行してから main 関数が終了します。
for-range でのデータ受信
受信するデータの数がわからないときは、このように range 関数と channel を用いて、 goroutine を制御することもできます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | func goroutine(ch chan int) { for i := 0; i < 10; i++ { ch <- i } close(ch) //使い終わったchannelはcloseする。 } func main() { ch := make(chan int) go goroutine(ch) for v := range ch { // nilなchannelから読み込むとerrorになるのでclose()が必要 fmt.Println(v) } } |
複数のchannelを受信
上記の例では channel のデータの数を制御しましたが、複数の channel を受信したい場合もあると思います。そんなときは、 select – case を用いることで複数の channel の受信を制御することができます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | func goroutine1(ch chan string) { for { ch <- "packet from 1" time.Sleep(time.Second) } } func goroutine2(ch chan string) { for { time.Sleep(3 * time.Second) ch <- "packet from 2" } } func main() { ch1 := make(chan string) ch2 := make(chan string) go goroutine1(ch1) go goroutine2(ch2) for { select { case msg1 := <-ch1: fmt.Println(msg1) case msg2 := <-ch2: fmt.Println(msg2) } } } |
buffered channel
channel は、 make 時に buffer (容量)を指定することができます。
1 2 3 4 5 6 7 8 9 | func main() { messages := make(chan string, 2) // size : 2 messages <- "buffered" messages <- "channel" fmt.Println(<-messages) fmt.Println(<-messages) } |
unbuffer なchannel には、続けてデータを送信することはできません。 channel に空きがないときにデータを送信しようとするとブロックになってしまうからです。 size を指定した buffered channel は、その数だけデータをキューイングすることができます。上記のプログラムのように、データを2つバッファされ、続けてデータが受信されます。上記のプログラムを実行すると、 “buffered” 、 “channel” が順に出力されます。
例えば、 buffer がいっぱいになったとき、 buffered channel は、 unbuffered channel と良く似た動作をします。つまり、 channel に空きができるまでブロックが起こります。
下記の処理を実行すると、
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | func thrower(c chan int) { for i := 0; i < 5; i++ { c <- i fmt.Println("Threw >>",i) } } func catcher (c chan int) { for i := 0; i < 5; i++ { num := <-c fmt.Println("Caught >>",num) } } func main() { c := make(chan int, 3) go thrower(c) go catcher(c) time.Sleep(100 * time.Millisecond) } |
以下のように出力されます。
1 2 3 4 5 6 7 8 9 10 | Threw >> 0 Threw >> 1 Threw >> 2 Threw >> 3 Caught >> 0 Caught >> 1 Caught >> 2 Threw >> 4 Caught >> 3 Caught >> 4 |
ブロックが発生するまでに3つの数でいっぱいになっていることがわかると思います。
Context
並行処理のプログラムではタイムアウトやキャンセル、あるいはシステムの別の箇所での失敗により、しばしば中断する必要があります。
単純なキャンセルの通知に付随して追加の情報も伝達できると便利です。例えば、キャンセルが発生した理由や、関数の処理を終わらせるべきデッドラインがあるか、などです。
contextパッケージはそういった背景をもとに、標準ライブラリに追加され、これを並行なコードを扱う際に考慮すべき標準なGoイディオムとしました。
contextパッケージには2つの主な目的があります。
- コールグラフの各枝をキャンセルするAPIを提供する。
- コールグラフを通じてリクエストに関するデータを渡すデータの置き場所を提供する。
今回は前者の、キャンセルAPIの提供について触れていきたいと思います。
基本的な使い方
goroutine を呼び出す元の方でオブジェクトを生成します。 context.Background() は空のコンテキストを生成します。
1 | ctx := context.Background() |
これを goroutine に引き渡し伝搬させていきます。
1 | go hoge(ctx) |
WithCancel
手動でキャンセルする場合はWithCancelを使います。
1 2 3 | ctx, cancel := context.WithCancel(ctx) // 処理中略 cancel() // 即キャンセル |
WithTimeout
タイムアウトでキャンセルする場合はWithTimeoutをつかいます。
1 2 3 | // 1秒後にキャンセル ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() |
WithDeadline
WithDeadline は指定した時刻を超えると、処理がキャンセルされます。
1 2 3 | // 現在の時刻から3秒先を deadline として登録する ctx, cancel := context.WithTimeout(ctx, time.Now().Add(3 * time.Second)) defer cancel() |
Done
キャンセルされたかの判断はselectでコンテキストのDoneを待ちます。
1 2 3 4 | select { case <-ctx.Done(): fmt.Println(ctx.Err()) // context deadline exceeded } |
タイムアウトのサンプル
上記を踏まえたタイムアウトのサンプルになります。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | package main import ( "context" "fmt" "time" ) func infiniteLoop(ctx context.Context) { innerCtx, cancel := context.WithCancel(ctx) defer cancel() for { fmt.Println("Waiting for time out") select { case <-innerCtx.Done(): fmt.Println("Exit now!") return default: } } } func main() { // コンテキストの生成 ctx := context.Background() // 5秒後にキャンセル ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() go infiniteLoop(ctx) select { case <-ctx.Done(): fmt.Println(ctx.Err()) // context deadline exceeded } } |
実行結果
1 2 3 4 5 6 7 8 9 | Waiting for time out Waiting for time out Waiting for time out . . . Waiting for time out Exit now! context deadline exceeded |
さいごに
Go言語を使用するにあたって並行処理は必須かと思いますので、応用も含めて勉強していきたいと思います。 sync.WaitGroup や sync.Mutex は次回のブログで解説できればと思います。