はじめに
こんにちは。カイザーです。今回、Goシリーズ第6回目を担当させていただきます!
今回はGoでバックグラウンドでジョブを処理する「gocraft/work」を使用して、常駐プロセスでのジョブ処理について説明します。(以下workと呼びます)
https://github.com/gocraft/work
workの特徴
公式のREADMEに挙げられていますが、その中からピックアップして紹介します。
- 実行速度が速く、かつ効率的に動作するとのことです。Benchmarksによると、他の3ライブラリと比較して、最も早いことが分かります。
- プロセスがクラッシュしてもジョブが失われない。
- Middlewareという仕組みがあり、ジョブ実行時に呼び出され、メトリック計測・ログ出力に適しています。
- cronライクなスケジュールでジョブをエンキューすることも可能
様々な特徴がありますが、サンプルを動かしながら主要な機能を説明していきます。
workを使えるようにする
Redisのインストール
workによるジョブ実行のバックエンドには、Redisの通知があるため、Redisをインストールする必要があります。もうインストールしている人は飛ばしていただいて構いません。
macOSの人はHomeBrewでインストールできます。
1 | $brew install redis |
Go用のライブラリインストール
「go get」コマンドを使用し、今回使用するworkと、依存するredigoのredisをインストールします。
1 2 | $go getgithub.com/gocraft/work $go getgithub.com/gomodule/redigo/redis |
これで準備完了です。
Enqueue〜ジョブ実行まで
公式のサンプルを参考に、Enqueue〜ジョブ実行まで作ってみました。
また、Enqueue側とジョブ実行は、別のGoプロジェクトとしました。
Enqueue側
ほぼ公式サンプルですが、1回実行するごとに、1回エンキューするシンプルなものです。
(日本語でのコメントを記入しました。)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | packagemain import( "log" "github.com/gocraft/work" "github.com/gomodule/redigo/redis" ) // Redisプールの作成 varredisPool=&redis.Pool{ MaxActive:5, MaxIdle: 5, Wait: true, Dial:func()(redis.Conn,error){ returnredis.Dial("tcp",":6379") }, } // 新規Enqueuerを作成します。第一引数はRedis namespaceです。 varenqueuer=work.NewEnqueuer("my_app_namespace",redisPool) funcmain(){ // ジョブをエンキューします。ジョブ名は"send_email"で、addressやsubjectといったパラメータを付加しています。 _,err:=enqueuer.Enqueue("send_email",work.Q{"address":"test@example.com","subject":"hello world"}) iferr!=nil{ log.Fatal(err) } } |
「Redisプールの作成」はRedis周りの設定になります。この場合はエンキュー側の設定になるため、エンキューされる契機が多ければ、MaxActiveやMaxIdleの設定を上げても良いと思います。
こちらの設定については、こちらの記事が参考になります。
あとは、main関数で、ジョブに名前を付け、データを指定することができます。
ジョブ実行側
こちらは起動させておいて、エンキューされたらジョブを実行する側になります。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 | packagemain import( "fmt" "os" "os/signal" "time" "github.com/gocraft/work" "github.com/gomodule/redigo/redis" ) // Redisプールの作成 varredisPool=&redis.Pool{ MaxActive:5, MaxIdle: 5, Wait: true, Dial:func()(redis.Conn,error){ returnredis.Dial("tcp",":6379") }, } // Context ジョブを実行するための構造体 typeContextstruct{ } funcmain(){ // ジョブワーカーを管理するWorkerPoolを作成します。 // 第一引数: ジョブをリクエストされた時のContextとなる構造体インスタンス // 第二引数: 並行処理可能な最大数 // 第三引数: Redis namespace (エンキュー側で設定したもの) // 第四引数: Redis pool pool:=work.NewWorkerPool(Context{},10,"my_app_namespace",redisPool) // 各ジョブに対して実行されるミドルウェアを追加する (今回はログ取り機能) pool.Middleware((*Context).Log) // ジョブ名とそのハンドラーとなる関数をマッピングする pool.Job("send_email",(*Context).SendEmail) // Workerプールを開始する pool.Start() // OSシグナルで終了するまで待機する (その間、Redisからの通知によりジョブを処理する) signalChan:=make(chanos.Signal,1) signal.Notify(signalChan,os.Interrupt,os.Kill,syscall.SIGHUP) <-signalChan // シグナルを受信すると待機をやめ、ここまで進むので、WorkerPoolを止めます。 pool.Stop() } func(c *Context)Log(job *work.Job,next work.NextMiddlewareFunc)error{ fmt.Println("Starting job: ",job.Name) returnnext() } func(c *Context)SendEmail(job *work.Job)error{ // エンキュー時に登録したパラメータを取得する addr:=job.ArgString("address") iferr:=job.ArgError();err!=nil{ returnerr } // 10秒待つ (本来であればメール送信ですが、省略します) time.Sleep(10*time.Second) fmt.Println("Job Processed!!!!!") returnnil } |
main関数の流れを簡単に説明します。
まず、Workerプールを作成し、ミドルウェアの設定、ジョブと関数のマッピングといった設定を行います。
設定が完了したら、Workerプールを開始します。
その後は、OSシグナルで終了するまで待機させておくことにより、その間エンキューを受け付けて処理を実行させることが可能になります。
ミドルウェア用の関数や、ジョブ処理用の関数は、Context構造体のメソッドとなっています。(後述のContextで説明します)
一通りの流れを試してみるには、まずジョブ実行側を「go run」しておき、後からEnqueue側を「go run」すると、ビルド実行側にログが表示されることが分かります。(あらかじめRedisは起動しておいてください)
また、ジョブ実行側はOSシグナルが無い限り待機し続けるため、Enqueue側を何度も実行すれば、ジョブ実行側にその分ログが出力されます。
バックエンドはRedisなので、先にEnqueue側を何度か実行しておけば、ジョブ実行側が後から立ち上がった時に、Enqueueされた分だけジョブが実行されます。
Context
workにおけるContextは実装者が自由に定義できる構造体ですが、WorkerPoolに必要です。
フィールドは空でも、様々なフィールドがあっても構いません。
また、Middlewareやジョブ実行用のメソッドはこのContextに実装する必要があります。
そのため、Middlewareやジョブ実行時に、Contextのフィールドを書き換えることも可能です。
Middleware
Middlewareはジョブ実行時に、任意のコードを実行できる機能です。
ジョブでは本質的に行いたい機能を、Middlewareではログ取りや計測といった、付属的かつ各ジョブでの共通機能、といった使い分けが可能です。
ジョブ実行中のタスクキル
ジョブ実行側では、Redisからの通知を待機するため、「signal.Notify()」を使用して、OSシグナルを受信するまで待機するように実装しました。
1 2 3 4 5 6 7 | // OSシグナルで終了するまで待機する (その間、Redisからの通知によりジョブを処理する) signalChan:=make(chanos.Signal,1) signal.Notify(signalChan,os.Interrupt,os.Kill,syscall.SIGHUP) <-signalChan // シグナルを受信すると待機をやめ、ここまで進むので、WorkerPoolを止めます。 pool.Stop() |
signalがos.Interrupt, os.Kill, syscall.SIGHUPのいずれかを受信すると、signalChanに受信され、5行目以降の処理に移ります。
その後はそのまま「pool.Stop()」でWorkerPoolを止めますが、実行中のジョブに関しては、完了を待ってから終了されます。
そのため、ジョブの実行中(今回の場合は10秒待機中)に「control+c」や「 kill -9」、「kill -HUP」でシグナル送信を行うと、ジョブが完了するまでは、プロセスキルされません。
Cronライクなスケジュール
PeriodicallyEnqueueを使用すると、WorkerPoolを使用して、Cronライクにジョブの定期実行ができます。
GoのCron構文仕様は下記が参考になります。
https://godoc.org/github.com/robfig/cron#hdr-CRON_Expression_Format
例えば、「0 0 * * * *」の場合は、毎時実行されます。
(フィールドは、それぞれ秒、分、時、日、曜日を表します。)
PeriodicallyEnqueueは、先ほどのEnqueueとは異なり、WorkerPoolに対して設定します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | funcmain(){ pool:=work.NewWorkerPool(Context{},10,"my_app_namespace",redisPool) // Periodic Enqueueing (Cron)設定 (ジョブ名とマッピング) pool.PeriodicallyEnqueue("* * * * * *","sample_job") // ジョブ名とそのハンドラーとなる関数をマッピングする pool.Job("sample_job",(*Context).SampleProcess) // Workerプールを開始する pool.Start() // OSシグナルで終了するまで待機する (その間、Redisからの通知によりジョブを処理する) signalChan:=make(chanos.Signal,1) signal.Notify(signalChan,os.Interrupt,os.Kill) <-signalChan // シグナルを受信すると待機をやめ、ここまで進むので、Workersを止めます。 pool.Stop() } // PeriodicallyEnqueue(Cron)で呼び出されるメソッド func(c *Context)SampleProcess(job *work.Job)error{ fmt.Println("Cron Job Processed!!!!",time.Now()) returnnil } |
実行すると、毎秒SampleProcess()が呼び出され、ログ出力されます。
Enqueueした時のRedisについて
最初のサンプルでEnqueueした直後に、redis-cliで実際の値を確認してみました。
1 2 3 4 5 | 127.0.0.1:6379>lrange my_app_namespace:jobs:send_email0-1 1)"{\"name\":\"send_email\",\"id\":\"f2c6c796f0c3342876c5a478\",\"t\":1539574270,\"args\":{\"address\":\"test@example.com\",\"subject\":\"hello world\"}}" 2)"{\"name\":\"send_email\",\"id\":\"6568297e8de239c8310aa9c8\",\"t\":1539574175,\"args\":{\"address\":\"test@example.com\",\"subject\":\"hello world\"}}" 127.0.0.1:6379>smembers my_app_namespace:known_jobs 1)"send_email" |
これを読んでみると、「設定した名前空間:jobs:設定したジョブ名」をキーにして、JSON文字列でEnqueueされたジョブ情報が入っています。(リスト型)
また、「設定した名前空間:known_jobs」には、上記でEnqueueしたジョブ名が入っています。こちらはセット型なので、ジョブ名は重複せずに管理されていることが分かります。
上記を踏まえ、RubyからEnqueueするプログラムを作成してみました。
(「$ gem install redis」が必須です)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | require'redis' require'securerandom' require'json' redis=Redis.new value={ name:'send_email',# ジョブ名 id:SecureRandom.hex(12),# 12バイトの乱数を指定 t:Time.now.to_i,# 現在時刻をタイムスタンプで args:{address:'test@example.com',subject:'hello world'}# 任意の引数 } # キーは「名前空間:jobs:ジョブ名」で、値はJSON化しておく redis.rpush('my_app_namespace:jobs:send_email',value.to_json) # 「名前空間:known_jobs」にジョブ名を入れておく redis.sadd('my_app_namespace:known_jobs','send_email') |
今回は、先ほどのサンプルのEnqueueをRubyで呼び出してみました。workで動作させることが目的であるため、Redisに格納するキーと値は、workの仕様に基づいたものになります。
valueのidは、12バイトの乱数をHex文字列で渡します。tは現在時刻をタイムスタンプで渡します。
Redisを使用しているので、Enqueueする側の自由度はかなり高いですね。
WebUI
同じくgocraftが提供している「workwebui」を導入すると、ジョブの状況について、WebUIで可視化できます。
workwebuiの導入〜起動
「go get」した後、「go install」することで使用可能になります。
1 2 | $go getgithub.com/gocraft/work/cmd/workwebui $go install github.com/gocraft/work/cmd/workwebui |
下記コマンドでWebUIが起動します。
1 | $workwebui-redis="redis:6379"-ns="work"-listen=":5040" |
起動できたらブラウザから「http://localhost:5040/」にアクセスしてください。
先ほどのサンプルをWebUIで見てみる
このように、実行中のジョブを確認することができます。処理中のジョブ、再施行されたジョブ、スケジュールされた(Cronライクな)ジョブ等、各ジョブの状況ごとに表示することができます。
さいごに
workはシンプルなインターフェースでありながら、柔軟性の高いライブラリでした。
Redisを使用しているため、エンキュー側は何でもよく、ワーカー側もContextへの実装により様々なジョブ処理が可能です。
Goでバックグラウンドジョブを実装する際は、ぜひ使って見てください。
Go記事の連載などは、こちらをご覧ください。