はじめに
以前にgoroutineとchannelとContextという記事を書きましたが、今回は channel を使用しないで並行処理の同期や排他制御を簡単に行える、 sync パッケージについて学習したいと思います。
sync.WaitGroup
sync.WaitGroup は複数の Goroutine の完了を待つことが出来ます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | package main import ( "fmt" "sync" "time" ) func main() { var wg sync.WaitGroup for i := 0; i < 100; i++ { wg.Add(1) go func(i int) { time.Sleep(2 * time.Second) fmt.Println("End:", i) wg.Done() }(i) } wg.Wait() } |
sync.WaitGroup は複数の Goroutine の完了を待つことが出来ます。
sync.WaitGroup は基本的にはただのカウンタですが、カウンタがゼロになるまで処理を待つことができます。上記のコードだと Add(1) でカウンタをインクリメントし、 Done() でデクリメントしています。そして Wait() はカウンタがゼロになるまで待ちます。
sync.Mutex
channel 以外の方法で変数に対して異なる goroutine がアクセスしても、競合によるエラーが起きないようにするための機能です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | package main import ( "fmt" "runtime" "sync" "time" ) func parallel(wg *sync.WaitGroup, m *sync.Mutex) { m.Lock() defer m.Unlock() fmt.Println("あ") time.Sleep(100 * time.Millisecond) fmt.Println("い") time.Sleep(100 * time.Millisecond) fmt.Println("う") time.Sleep(100 * time.Millisecond) fmt.Println("え") time.Sleep(100 * time.Millisecond) fmt.Println("お") wg.Done() } func main() { runtime.GOMAXPROCS(runtime.NumCPU()) wg := new(sync.WaitGroup) m := new(sync.Mutex) for i := 0; i < 3; i++ { wg.Add(1) go parallel(wg, m) } wg.Wait() } |
sync.Mutex の Lock 関数と Unlock 関数で排他ロックの取得と解除ができます。排他ロックなので、排他ロックを得た goroutine が存在する場合は、ロックを得ようとする他の goroutine は処理を待ち、以下のような結果になります。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | あ い う え お あ い う え お あ い う え お |
sync.RWMutex
sync.RWMutex の Rlock 関数と Runlock 関数で共有ロックの取得と解除ができます。どういうことなのかというと、 Rlock 関数でロックを行なった goroutine 同士であれば処理を進めることができるようになります。また、共有ロックを得た goroutine が存在する場合は、排他ロックを取得しようとする goroutine は待ちます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 | package main import ( "fmt" "math/rand" "strings" "sync" "time" ) func init() { rand.Seed(time.Now().Unix()) } func sleep() { time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) } func reader(c chan int, m *sync.RWMutex, wg *sync.WaitGroup) { sleep() m.RLock() c <- 1 sleep() c <- -1 m.RUnlock() wg.Done() } func writer(c chan int, m *sync.RWMutex, wg *sync.WaitGroup) { sleep() m.Lock() c <- 1 sleep() c <- -1 m.Unlock() wg.Done() } func main() { var m sync.RWMutex var rs, ws int rsCh := make(chan int) wsCh := make(chan int) go func() { for { select { case n := <-rsCh: rs += n case n := <-wsCh: ws += n } fmt.Printf("%s%s\n", strings.Repeat("R", rs), strings.Repeat("W", ws)) } }() wg := sync.WaitGroup{} for i := 0; i < 3; i++ { wg.Add(1) go reader(rsCh, &m, &wg) } for i := 0; i < 3; i++ { wg.Add(1) go writer(wsCh, &m, &wg) } wg.Wait() } |
実行すると以下のようになります。
1 2 3 4 5 6 7 8 9 10 11 | W R RR RRR RR R W W |
sync.Map
Go の map は goroutine で Read しているときに別の goroutine から Write してはいけないというルールがあります。これを行なってしまうと panic が起きてしまいます。例えば以下の処理を実行すると異常終了を起こします。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | package main import ( "fmt" "math/rand" ) func main() { m := map[int]struct{}{} go func() { for { m[rand.Intn(100000)] = struct{}{} } }() for { fmt.Println(m[rand.Intn(100000)]) } } |
こちらに対応するには、上記の sync.Mutex を使うことで対応可能ですが、 sync.Map を使用することで、ロックの制御を気にせずに map を使用することができます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | package main import ( "fmt" "math/rand" "sync" ) func main() { var m sync.Map go func() { for { m.Store(rand.Intn(100000), struct{}{}) } }() for { fmt.Println(m.Load(rand.Intn(100000))) } } |
sync.Once
sync.Once は関数を一度だけ実行するようにできます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | package main import ( "fmt" "runtime" "sync" ) var once = new(sync.Once) func greeting(wg *sync.WaitGroup) { once.Do(func() { fmt.Println("一度だけ") }) fmt.Println("連続") wg.Done() } func main() { runtime.GOMAXPROCS(runtime.NumCPU()) defer fmt.Println("最後") wg := new(sync.WaitGroup) for i := 0; i < 5; i++ { wg.Add(1) go greeting(wg) } wg.Wait() } |
実行すると以下のようになります。
1 2 3 4 5 6 7 | 一度だけ 連続 連続 連続 連続 連続 最後 |
sync.Pool
sync.Pool はスレッドセーフなメモリプールです。使い方は、 sync.Pool 構造体を生成する時の New フィールドにオブジェクト生成の関数を指定します。必要時に New の関数が呼ばれます。あるスレッドで、 Get メソッドを呼び、もしプールに無かったら New 関数でオブジェクトを生成します。そして Get でオブジェクトを取得します。もしプールにオブジェクトが在れば、そのオブジェクトを返します。使い終わったら Put メソッドでプールに戻します。
これの繰り返しでオブジェクトをリサイクルすることが出来ます。また、スレッドセーフですので、スレッド間でメモリプールする事ができます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | package main import ( "fmt" "sync" "time" ) type Hoge struct { name string value int } func main() { pool := sync.Pool{ New: func() interface{} { return &Hoge{ name: "hoge", value: 0, } }, } wg := &sync.WaitGroup{} poolFunc := func() { hoge := pool.Get().(*Hoge) if hoge.value == 0 { fmt.Println("New") } else { fmt.Println("Cache") } hoge.value += 1 pool.Put(hoge) time.Sleep(1 * time.Microsecond) wg.Done() } for i := 0; i < 10; i++ { wg.Add(1) go poolFunc() } for i := 0; i < 10; i++ { wg.Add(1) go poolFunc() } wg.Wait() fmt.Println("End") } |
実行すると以下のようになります。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | New New New Cache New New New Cache New New New New New New New New New New New Cache End |
さいごに
sync パッケージの紹介でしたがいかがでしょうか。一部機能にしか触れられておらず、これからも勉強していこうと思います。