はじめに
こんにちは。カイザーです。今回、Goシリーズ第6回目を担当させていただきます!
今回はGoでバックグラウンドでジョブを処理する「gocraft/work」を使用して、常駐プロセスでのジョブ処理について説明します。(以下workと呼びます)
https://github.com/gocraft/work
workの特徴
公式のREADMEに挙げられていますが、その中からピックアップして紹介します。
- 実行速度が速く、かつ効率的に動作するとのことです。Benchmarksによると、他の3ライブラリと比較して、最も早いことが分かります。
- プロセスがクラッシュしてもジョブが失われない。
- Middlewareという仕組みがあり、ジョブ実行時に呼び出され、メトリック計測・ログ出力に適しています。
- cronライクなスケジュールでジョブをエンキューすることも可能
様々な特徴がありますが、サンプルを動かしながら主要な機能を説明していきます。
workを使えるようにする
Redisのインストール
workによるジョブ実行のバックエンドには、Redisの通知があるため、Redisをインストールする必要があります。もうインストールしている人は飛ばしていただいて構いません。
macOSの人はHomeBrewでインストールできます。
1 | $ brew install redis |
Go用のライブラリインストール
「go get」コマンドを使用し、今回使用するworkと、依存するredigoのredisをインストールします。
1 2 | $ go get github.com/gocraft/work $ go get github.com/gomodule/redigo/redis |
これで準備完了です。
Enqueue〜ジョブ実行まで
公式のサンプルを参考に、Enqueue〜ジョブ実行まで作ってみました。
また、Enqueue側とジョブ実行は、別のGoプロジェクトとしました。
Enqueue側
ほぼ公式サンプルですが、1回実行するごとに、1回エンキューするシンプルなものです。
(日本語でのコメントを記入しました。)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | package main import ( "log" "github.com/gocraft/work" "github.com/gomodule/redigo/redis" ) // Redisプールの作成 var redisPool = &redis.Pool{ MaxActive: 5, MaxIdle: 5, Wait: true, Dial: func() (redis.Conn, error) { return redis.Dial("tcp", ":6379") }, } // 新規Enqueuerを作成します。第一引数はRedis namespaceです。 var enqueuer = work.NewEnqueuer("my_app_namespace", redisPool) func main() { // ジョブをエンキューします。ジョブ名は"send_email"で、addressやsubjectといったパラメータを付加しています。 _, err := enqueuer.Enqueue("send_email", work.Q{"address": "test@example.com", "subject": "hello world"}) if err != nil { log.Fatal(err) } } |
「Redisプールの作成」はRedis周りの設定になります。この場合はエンキュー側の設定になるため、エンキューされる契機が多ければ、MaxActiveやMaxIdleの設定を上げても良いと思います。
こちらの設定については、こちらの記事が参考になります。
あとは、main関数で、ジョブに名前を付け、データを指定することができます。
ジョブ実行側
こちらは起動させておいて、エンキューされたらジョブを実行する側になります。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 | package main import ( "fmt" "os" "os/signal" "time" "github.com/gocraft/work" "github.com/gomodule/redigo/redis" ) // Redisプールの作成 var redisPool = &redis.Pool{ MaxActive: 5, MaxIdle: 5, Wait: true, Dial: func() (redis.Conn, error) { return redis.Dial("tcp", ":6379") }, } // Context ジョブを実行するための構造体 type Context struct { } func main() { // ジョブワーカーを管理するWorkerPoolを作成します。 // 第一引数: ジョブをリクエストされた時のContextとなる構造体インスタンス // 第二引数: 並行処理可能な最大数 // 第三引数: Redis namespace (エンキュー側で設定したもの) // 第四引数: Redis pool pool := work.NewWorkerPool(Context{}, 10, "my_app_namespace", redisPool) // 各ジョブに対して実行されるミドルウェアを追加する (今回はログ取り機能) pool.Middleware((*Context).Log) // ジョブ名とそのハンドラーとなる関数をマッピングする pool.Job("send_email", (*Context).SendEmail) // Workerプールを開始する pool.Start() // OSシグナルで終了するまで待機する (その間、Redisからの通知によりジョブを処理する) signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, os.Interrupt, os.Kill, syscall.SIGHUP) <-signalChan // シグナルを受信すると待機をやめ、ここまで進むので、WorkerPoolを止めます。 pool.Stop() } func (c *Context) Log(job *work.Job, next work.NextMiddlewareFunc) error { fmt.Println("Starting job: ", job.Name) return next() } func (c *Context) SendEmail(job *work.Job) error { // エンキュー時に登録したパラメータを取得する addr := job.ArgString("address") if err := job.ArgError(); err != nil { return err } // 10秒待つ (本来であればメール送信ですが、省略します) time.Sleep(10 * time.Second) fmt.Println("Job Processed!!!!!") return nil } |
main関数の流れを簡単に説明します。
まず、Workerプールを作成し、ミドルウェアの設定、ジョブと関数のマッピングといった設定を行います。
設定が完了したら、Workerプールを開始します。
その後は、OSシグナルで終了するまで待機させておくことにより、その間エンキューを受け付けて処理を実行させることが可能になります。
ミドルウェア用の関数や、ジョブ処理用の関数は、Context構造体のメソッドとなっています。(後述のContextで説明します)
一通りの流れを試してみるには、まずジョブ実行側を「go run」しておき、後からEnqueue側を「go run」すると、ビルド実行側にログが表示されることが分かります。(あらかじめRedisは起動しておいてください)
また、ジョブ実行側はOSシグナルが無い限り待機し続けるため、Enqueue側を何度も実行すれば、ジョブ実行側にその分ログが出力されます。
バックエンドはRedisなので、先にEnqueue側を何度か実行しておけば、ジョブ実行側が後から立ち上がった時に、Enqueueされた分だけジョブが実行されます。
Context
workにおけるContextは実装者が自由に定義できる構造体ですが、WorkerPoolに必要です。
フィールドは空でも、様々なフィールドがあっても構いません。
また、Middlewareやジョブ実行用のメソッドはこのContextに実装する必要があります。
そのため、Middlewareやジョブ実行時に、Contextのフィールドを書き換えることも可能です。
Middleware
Middlewareはジョブ実行時に、任意のコードを実行できる機能です。
ジョブでは本質的に行いたい機能を、Middlewareではログ取りや計測といった、付属的かつ各ジョブでの共通機能、といった使い分けが可能です。
ジョブ実行中のタスクキル
ジョブ実行側では、Redisからの通知を待機するため、「signal.Notify()」を使用して、OSシグナルを受信するまで待機するように実装しました。
1 2 3 4 5 6 7 | // OSシグナルで終了するまで待機する (その間、Redisからの通知によりジョブを処理する) signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, os.Interrupt, os.Kill, syscall.SIGHUP) <-signalChan // シグナルを受信すると待機をやめ、ここまで進むので、WorkerPoolを止めます。 pool.Stop() |
signalがos.Interrupt, os.Kill, syscall.SIGHUPのいずれかを受信すると、signalChanに受信され、5行目以降の処理に移ります。
その後はそのまま「pool.Stop()」でWorkerPoolを止めますが、実行中のジョブに関しては、完了を待ってから終了されます。
そのため、ジョブの実行中(今回の場合は10秒待機中)に「control+c」や「 kill -9」、「kill -HUP」でシグナル送信を行うと、ジョブが完了するまでは、プロセスキルされません。
Cronライクなスケジュール
PeriodicallyEnqueueを使用すると、WorkerPoolを使用して、Cronライクにジョブの定期実行ができます。
GoのCron構文仕様は下記が参考になります。
https://godoc.org/github.com/robfig/cron#hdr-CRON_Expression_Format
例えば、「0 0 * * * *」の場合は、毎時実行されます。
(フィールドは、それぞれ秒、分、時、日、曜日を表します。)
PeriodicallyEnqueueは、先ほどのEnqueueとは異なり、WorkerPoolに対して設定します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | func main() { pool := work.NewWorkerPool(Context{}, 10, "my_app_namespace", redisPool) // Periodic Enqueueing (Cron)設定 (ジョブ名とマッピング) pool.PeriodicallyEnqueue("* * * * * *", "sample_job") // ジョブ名とそのハンドラーとなる関数をマッピングする pool.Job("sample_job", (*Context).SampleProcess) // Workerプールを開始する pool.Start() // OSシグナルで終了するまで待機する (その間、Redisからの通知によりジョブを処理する) signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, os.Interrupt, os.Kill) <-signalChan // シグナルを受信すると待機をやめ、ここまで進むので、Workersを止めます。 pool.Stop() } // PeriodicallyEnqueue(Cron)で呼び出されるメソッド func (c *Context) SampleProcess(job *work.Job) error { fmt.Println("Cron Job Processed!!!!", time.Now()) return nil } |
実行すると、毎秒SampleProcess()が呼び出され、ログ出力されます。
Enqueueした時のRedisについて
最初のサンプルでEnqueueした直後に、redis-cliで実際の値を確認してみました。
1 2 3 4 5 | 127.0.0.1:6379> lrange my_app_namespace:jobs:send_email 0 -1 1) "{\"name\":\"send_email\",\"id\":\"f2c6c796f0c3342876c5a478\",\"t\":1539574270,\"args\":{\"address\":\"test@example.com\",\"subject\":\"hello world\"}}" 2) "{\"name\":\"send_email\",\"id\":\"6568297e8de239c8310aa9c8\",\"t\":1539574175,\"args\":{\"address\":\"test@example.com\",\"subject\":\"hello world\"}}" 127.0.0.1:6379> smembers my_app_namespace:known_jobs 1) "send_email" |
これを読んでみると、「設定した名前空間:jobs:設定したジョブ名」をキーにして、JSON文字列でEnqueueされたジョブ情報が入っています。(リスト型)
また、「設定した名前空間:known_jobs」には、上記でEnqueueしたジョブ名が入っています。こちらはセット型なので、ジョブ名は重複せずに管理されていることが分かります。
上記を踏まえ、RubyからEnqueueするプログラムを作成してみました。
(「$ gem install redis」が必須です)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | require 'redis' require 'securerandom' require 'json' redis = Redis.new value = { name: 'send_email', # ジョブ名 id: SecureRandom.hex(12), # 12バイトの乱数を指定 t: Time.now.to_i, # 現在時刻をタイムスタンプで args: {address: 'test@example.com', subject: 'hello world'} # 任意の引数 } # キーは「名前空間:jobs:ジョブ名」で、値はJSON化しておく redis.rpush('my_app_namespace:jobs:send_email', value.to_json) # 「名前空間:known_jobs」にジョブ名を入れておく redis.sadd('my_app_namespace:known_jobs', 'send_email') |
今回は、先ほどのサンプルのEnqueueをRubyで呼び出してみました。workで動作させることが目的であるため、Redisに格納するキーと値は、workの仕様に基づいたものになります。
valueのidは、12バイトの乱数をHex文字列で渡します。tは現在時刻をタイムスタンプで渡します。
Redisを使用しているので、Enqueueする側の自由度はかなり高いですね。
WebUI
同じくgocraftが提供している「workwebui」を導入すると、ジョブの状況について、WebUIで可視化できます。
workwebuiの導入〜起動
「go get」した後、「go install」することで使用可能になります。
1 2 | $ go get github.com/gocraft/work/cmd/workwebui $ go install github.com/gocraft/work/cmd/workwebui |
下記コマンドでWebUIが起動します。
1 | $ workwebui -redis="redis:6379" -ns="work" -listen=":5040" |
起動できたらブラウザから「http://localhost:5040/」にアクセスしてください。
先ほどのサンプルをWebUIで見てみる
このように、実行中のジョブを確認することができます。処理中のジョブ、再施行されたジョブ、スケジュールされた(Cronライクな)ジョブ等、各ジョブの状況ごとに表示することができます。
さいごに
workはシンプルなインターフェースでありながら、柔軟性の高いライブラリでした。
Redisを使用しているため、エンキュー側は何でもよく、ワーカー側もContextへの実装により様々なジョブ処理が可能です。
Goでバックグラウンドジョブを実装する際は、ぜひ使って見てください。
Go記事の連載などは、こちらをご覧ください。