はじめに
以前にgoroutineとchannelとContextという記事を書きましたが、今回は channel を使用しないで並行処理の同期や排他制御を簡単に行える、 sync パッケージについて学習したいと思います。
sync.WaitGroup
sync.WaitGroup は複数の Goroutine の完了を待つことが出来ます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | packagemain import( "fmt" "sync" "time" ) funcmain(){ varwg sync.WaitGroup fori:=0;i<100;i++{ wg.Add(1) gofunc(iint){ time.Sleep(2*time.Second) fmt.Println("End:",i) wg.Done() }(i) } wg.Wait() } |
sync.WaitGroup は複数の Goroutine の完了を待つことが出来ます。
sync.WaitGroup は基本的にはただのカウンタですが、カウンタがゼロになるまで処理を待つことができます。上記のコードだと Add(1) でカウンタをインクリメントし、 Done() でデクリメントしています。そして Wait() はカウンタがゼロになるまで待ちます。
sync.Mutex
channel 以外の方法で変数に対して異なる goroutine がアクセスしても、競合によるエラーが起きないようにするための機能です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | packagemain import( "fmt" "runtime" "sync" "time" ) funcparallel(wg *sync.WaitGroup,m *sync.Mutex){ m.Lock() deferm.Unlock() fmt.Println("あ") time.Sleep(100*time.Millisecond) fmt.Println("い") time.Sleep(100*time.Millisecond) fmt.Println("う") time.Sleep(100*time.Millisecond) fmt.Println("え") time.Sleep(100*time.Millisecond) fmt.Println("お") wg.Done() } funcmain(){ runtime.GOMAXPROCS(runtime.NumCPU()) wg:=new(sync.WaitGroup) m:=new(sync.Mutex) fori:=0;i<3;i++{ wg.Add(1) goparallel(wg,m) } wg.Wait() } |
sync.Mutex の Lock 関数と Unlock 関数で排他ロックの取得と解除ができます。排他ロックなので、排他ロックを得た goroutine が存在する場合は、ロックを得ようとする他の goroutine は処理を待ち、以下のような結果になります。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | あ い う え お あ い う え お あ い う え お |
sync.RWMutex
sync.RWMutex の Rlock 関数と Runlock 関数で共有ロックの取得と解除ができます。どういうことなのかというと、 Rlock 関数でロックを行なった goroutine 同士であれば処理を進めることができるようになります。また、共有ロックを得た goroutine が存在する場合は、排他ロックを取得しようとする goroutine は待ちます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 | packagemain import( "fmt" "math/rand" "strings" "sync" "time" ) funcinit(){ rand.Seed(time.Now().Unix()) } funcsleep(){ time.Sleep(time.Duration(rand.Intn(1000))*time.Millisecond) } funcreader(cchanint,m *sync.RWMutex,wg *sync.WaitGroup){ sleep() m.RLock() c<-1 sleep() c<--1 m.RUnlock() wg.Done() } funcwriter(cchanint,m *sync.RWMutex,wg *sync.WaitGroup){ sleep() m.Lock() c<-1 sleep() c<--1 m.Unlock() wg.Done() } funcmain(){ varmsync.RWMutex varrs,ws int rsCh:=make(chanint) wsCh:=make(chanint) gofunc(){ for{ select{ casen:=<-rsCh: rs+=n casen:=<-wsCh: ws+=n } fmt.Printf("%s%s\n",strings.Repeat("R",rs), strings.Repeat("W",ws)) } }() wg:=sync.WaitGroup{} fori:=0;i<3;i++{ wg.Add(1) goreader(rsCh,&m,&wg) } fori:=0;i<3;i++{ wg.Add(1) gowriter(wsCh,&m,&wg) } wg.Wait() } |
実行すると以下のようになります。
1 2 3 4 5 6 7 8 9 10 11 | W R RR RRR RR R W W |
sync.Map
Go の map は goroutine で Read しているときに別の goroutine から Write してはいけないというルールがあります。これを行なってしまうと panic が起きてしまいます。例えば以下の処理を実行すると異常終了を起こします。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | packagemain import( "fmt" "math/rand" ) funcmain(){ m:=map[int]struct{}{} gofunc(){ for{ m[rand.Intn(100000)]=struct{}{} } }() for{ fmt.Println(m[rand.Intn(100000)]) } } |
こちらに対応するには、上記の sync.Mutex を使うことで対応可能ですが、 sync.Map を使用することで、ロックの制御を気にせずに map を使用することができます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | packagemain import( "fmt" "math/rand" "sync" ) funcmain(){ varmsync.Map gofunc(){ for{ m.Store(rand.Intn(100000),struct{}{}) } }() for{ fmt.Println(m.Load(rand.Intn(100000))) } } |
sync.Once
sync.Once は関数を一度だけ実行するようにできます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | packagemain import( "fmt" "runtime" "sync" ) varonce=new(sync.Once) funcgreeting(wg *sync.WaitGroup){ once.Do(func(){ fmt.Println("一度だけ") }) fmt.Println("連続") wg.Done() } funcmain(){ runtime.GOMAXPROCS(runtime.NumCPU()) deferfmt.Println("最後") wg:=new(sync.WaitGroup) fori:=0;i<5;i++{ wg.Add(1) gogreeting(wg) } wg.Wait() } |
実行すると以下のようになります。
1 2 3 4 5 6 7 | 一度だけ 連続 連続 連続 連続 連続 最後 |
sync.Pool
sync.Pool はスレッドセーフなメモリプールです。使い方は、 sync.Pool 構造体を生成する時の New フィールドにオブジェクト生成の関数を指定します。必要時に New の関数が呼ばれます。あるスレッドで、 Get メソッドを呼び、もしプールに無かったら New 関数でオブジェクトを生成します。そして Get でオブジェクトを取得します。もしプールにオブジェクトが在れば、そのオブジェクトを返します。使い終わったら Put メソッドでプールに戻します。
これの繰り返しでオブジェクトをリサイクルすることが出来ます。また、スレッドセーフですので、スレッド間でメモリプールする事ができます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | packagemain import( "fmt" "sync" "time" ) typeHogestruct{ name string value int } funcmain(){ pool:=sync.Pool{ New:func()interface{}{ return&Hoge{ name: "hoge", value:0, } }, } wg:=&sync.WaitGroup{} poolFunc:=func(){ hoge:=pool.Get().(*Hoge) ifhoge.value==0{ fmt.Println("New") }else{ fmt.Println("Cache") } hoge.value+=1 pool.Put(hoge) time.Sleep(1*time.Microsecond) wg.Done() } fori:=0;i<10;i++{ wg.Add(1) gopoolFunc() } fori:=0;i<10;i++{ wg.Add(1) gopoolFunc() } wg.Wait() fmt.Println("End") } |
実行すると以下のようになります。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | New New New Cache New New New Cache New New New New New New New New New New New Cache End |
さいごに
sync パッケージの紹介でしたがいかがでしょうか。一部機能にしか触れられておらず、これからも勉強していこうと思います。