カテゴリー: BackEnd

Golangのsyncパッケージによる同期・排他制御

はじめに

以前にgoroutineとchannelとContextという記事を書きましたが、今回は channel を使用しないで並行処理の同期や排他制御を簡単に行える、 sync パッケージについて学習したいと思います。

sync.WaitGroup

sync.WaitGroup は複数の Goroutine の完了を待つことが出来ます。

package main

import (
 "fmt"
 "sync"
 "time"
)

func main() {
 var wg sync.WaitGroup

 for i := 0; i < 100; i++ {
  wg.Add(1)
  go func(i int) {
   time.Sleep(2 * time.Second)
   fmt.Println("End:", i)
   wg.Done()
  }(i)
 }

 wg.Wait()
}

sync.WaitGroup は複数の Goroutine の完了を待つことが出来ます。
sync.WaitGroup は基本的にはただのカウンタですが、カウンタがゼロになるまで処理を待つことができます。上記のコードだと Add(1) でカウンタをインクリメントし、 Done() でデクリメントしています。そして Wait() はカウンタがゼロになるまで待ちます。

sync.Mutex

channel 以外の方法で変数に対して異なる goroutine がアクセスしても、競合によるエラーが起きないようにするための機能です。

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func parallel(wg *sync.WaitGroup, m *sync.Mutex) {
    m.Lock()
    defer m.Unlock()

    fmt.Println("あ")
    time.Sleep(100 * time.Millisecond)
    fmt.Println("い")
    time.Sleep(100 * time.Millisecond)
    fmt.Println("う")
    time.Sleep(100 * time.Millisecond)
    fmt.Println("え")
    time.Sleep(100 * time.Millisecond)
    fmt.Println("お")
    wg.Done()
}

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())

    wg := new(sync.WaitGroup)
    m := new(sync.Mutex)
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go parallel(wg, m)
    }
    wg.Wait()
}

sync.Mutex の Lock 関数と Unlock 関数で排他ロックの取得と解除ができます。排他ロックなので、排他ロックを得た goroutine が存在する場合は、ロックを得ようとする他の goroutine は処理を待ち、以下のような結果になります。

あ
い
う
え
お
あ
い
う
え
お
あ
い
う
え
お

sync.RWMutex

sync.RWMutex の Rlock 関数と Runlock 関数で共有ロックの取得と解除ができます。どういうことなのかというと、 Rlock 関数でロックを行なった goroutine 同士であれば処理を進めることができるようになります。また、共有ロックを得た goroutine が存在する場合は、排他ロックを取得しようとする goroutine は待ちます。

package main

import (
    "fmt"
    "math/rand"
    "strings"
    "sync"
    "time"
)

func init() {
    rand.Seed(time.Now().Unix())
}

func sleep() {
    time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
}

func reader(c chan int, m *sync.RWMutex, wg *sync.WaitGroup) {
    sleep()
    m.RLock()
    c <- 1
    sleep()
    c <- -1
    m.RUnlock()
    wg.Done()
}

func writer(c chan int, m *sync.RWMutex, wg *sync.WaitGroup) {
    sleep()
    m.Lock()
    c <- 1
    sleep()
    c <- -1
    m.Unlock()
    wg.Done()
}

func main() {
    var m sync.RWMutex
    var rs, ws int
    rsCh := make(chan int)
    wsCh := make(chan int)
    go func() {
        for {
            select {
            case n := <-rsCh:
                rs += n
            case n := <-wsCh:
                ws += n
            }
            fmt.Printf("%s%s\n", strings.Repeat("R", rs),
                    strings.Repeat("W", ws))
        }
    }()
    wg := sync.WaitGroup{}
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go reader(rsCh, &m, &wg)
    }
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go writer(wsCh, &m, &wg)
    }
    wg.Wait()
}

実行すると以下のようになります。

W

R
RR
RRR
RR
R

W

W

sync.Map

Go の map は goroutine で Read しているときに別の goroutine から Write してはいけないというルールがあります。これを行なってしまうと panic が起きてしまいます。例えば以下の処理を実行すると異常終了を起こします。

package main

import (
 "fmt"
 "math/rand"
)

func main() {
 m := map[int]struct{}{}
 go func() {
  for {
   m[rand.Intn(100000)] = struct{}{}
  }
 }()

 for {
  fmt.Println(m[rand.Intn(100000)])
 }
}

こちらに対応するには、上記の sync.Mutex を使うことで対応可能ですが、 sync.Map を使用することで、ロックの制御を気にせずに map を使用することができます。

package main

import (
 "fmt"
 "math/rand"
 "sync"
)

func main() {
 var m sync.Map

 go func() {
  for {
   m.Store(rand.Intn(100000), struct{}{})
  }
 }()

 for {
  fmt.Println(m.Load(rand.Intn(100000)))
 }
}

sync.Once

sync.Once は関数を一度だけ実行するようにできます。

package main

import (
    "fmt"
    "runtime"
    "sync"
)

var once = new(sync.Once)

func greeting(wg *sync.WaitGroup) {
    once.Do(func() {
        fmt.Println("一度だけ")
    })

    fmt.Println("連続")
    wg.Done()
}

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())

    defer fmt.Println("最後")

    wg := new(sync.WaitGroup)
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go greeting(wg)
    }
    wg.Wait()
}

実行すると以下のようになります。

一度だけ
連続
連続
連続
連続
連続
最後

sync.Pool

sync.Pool はスレッドセーフなメモリプールです。使い方は、 sync.Pool 構造体を生成する時の New フィールドにオブジェクト生成の関数を指定します。必要時に New の関数が呼ばれます。あるスレッドで、 Get メソッドを呼び、もしプールに無かったら New 関数でオブジェクトを生成します。そして Get でオブジェクトを取得します。もしプールにオブジェクトが在れば、そのオブジェクトを返します。使い終わったら Put メソッドでプールに戻します。
これの繰り返しでオブジェクトをリサイクルすることが出来ます。また、スレッドセーフですので、スレッド間でメモリプールする事ができます。

package main

import (
 "fmt"
 "sync"
 "time"
)

type Hoge struct {
 name  string
 value int
}

func main() {

 pool := sync.Pool{
  New: func() interface{} {
   return &Hoge{
    name:  "hoge",
    value: 0,
   }
  },
 }
 wg := &sync.WaitGroup{}

 poolFunc := func() {
  hoge := pool.Get().(*Hoge)
  if hoge.value == 0 {
   fmt.Println("New")
  } else {
   fmt.Println("Cache")
  }
  hoge.value += 1
  pool.Put(hoge)
  time.Sleep(1 * time.Microsecond)
  wg.Done()
 }

 for i := 0; i < 10; i++ {
  wg.Add(1)
  go poolFunc()
 }

 for i := 0; i < 10; i++ {
  wg.Add(1)
  go poolFunc()
 }

 wg.Wait()
 fmt.Println("End")
}

実行すると以下のようになります。

New
New
New
Cache
New
New
New
Cache
New
New
New
New
New
New
New
New
New
New
New
Cache
End

さいごに

sync パッケージの紹介でしたがいかがでしょうか。一部機能にしか触れられておらず、これからも勉強していこうと思います。

おすすめ書籍

   

nukky

シェア
執筆者:
nukky
タグ: golang

最近の投稿

フロントエンドで動画デコレーション&レンダリング

はじめに 今回は、以下のように…

2週間 前

Goのクエリビルダー goqu を使ってみる

はじめに 最近携わっているとあ…

4週間 前

【Xcode15】プライバシーマニフェスト対応に備えて

はじめに こんにちは、suzu…

2か月 前

FSMを使った状態管理をGoで実装する

はじめに 一般的なアプリケーシ…

3か月 前