カテゴリー: BackEnd

Go言語 gocraft/workを使って常駐プロセスでジョブを処理させる

はじめに

こんにちは。カイザーです。今回、Goシリーズ第6回目を担当させていただきます!
今回はGoでバックグラウンドでジョブを処理する「gocraft/work」を使用して、常駐プロセスでのジョブ処理について説明します。(以下workと呼びます)
https://github.com/gocraft/work

workの特徴

公式のREADMEに挙げられていますが、その中からピックアップして紹介します。

  • 実行速度が速く、かつ効率的に動作するとのことです。Benchmarksによると、他の3ライブラリと比較して、最も早いことが分かります。
  • プロセスがクラッシュしてもジョブが失われない。
  • Middlewareという仕組みがあり、ジョブ実行時に呼び出され、メトリック計測・ログ出力に適しています。
  • cronライクなスケジュールでジョブをエンキューすることも可能

様々な特徴がありますが、サンプルを動かしながら主要な機能を説明していきます。

workを使えるようにする

Redisのインストール

workによるジョブ実行のバックエンドには、Redisの通知があるため、Redisをインストールする必要があります。もうインストールしている人は飛ばしていただいて構いません。
macOSの人はHomeBrewでインストールできます。

$ brew install redis

Go用のライブラリインストール

「go get」コマンドを使用し、今回使用するworkと、依存するredigoのredisをインストールします。

$ go get github.com/gocraft/work
$ go get github.com/gomodule/redigo/redis

これで準備完了です。

Enqueue〜ジョブ実行まで

公式のサンプルを参考に、Enqueue〜ジョブ実行まで作ってみました。
また、Enqueue側とジョブ実行は、別のGoプロジェクトとしました。

Enqueue側

ほぼ公式サンプルですが、1回実行するごとに、1回エンキューするシンプルなものです。
(日本語でのコメントを記入しました。)

package main

import (
 "log"

 "github.com/gocraft/work"
 "github.com/gomodule/redigo/redis"
)

// Redisプールの作成
var redisPool = &redis.Pool{
 MaxActive: 5,
 MaxIdle:   5,
 Wait:      true,
 Dial: func() (redis.Conn, error) {
  return redis.Dial("tcp", ":6379")
 },
}

// 新規Enqueuerを作成します。第一引数はRedis namespaceです。
var enqueuer = work.NewEnqueuer("my_app_namespace", redisPool)

func main() {
 // ジョブをエンキューします。ジョブ名は"send_email"で、addressやsubjectといったパラメータを付加しています。
 _, err := enqueuer.Enqueue("send_email", work.Q{"address": "test@example.com", "subject": "hello world"})
 if err != nil {
  log.Fatal(err)
 }
}

「Redisプールの作成」はRedis周りの設定になります。この場合はエンキュー側の設定になるため、エンキューされる契機が多ければ、MaxActiveやMaxIdleの設定を上げても良いと思います。
こちらの設定については、こちらの記事が参考になります。

あとは、main関数で、ジョブに名前を付け、データを指定することができます。

ジョブ実行側

こちらは起動させておいて、エンキューされたらジョブを実行する側になります。

package main

import (
 "fmt"
 "os"
 "os/signal"
 "time"

 "github.com/gocraft/work"
 "github.com/gomodule/redigo/redis"
)

// Redisプールの作成
var redisPool = &redis.Pool{
 MaxActive: 5,
 MaxIdle:   5,
 Wait:      true,
 Dial: func() (redis.Conn, error) {
  return redis.Dial("tcp", ":6379")
 },
}

// Context ジョブを実行するための構造体
type Context struct {
}

func main() {
 // ジョブワーカーを管理するWorkerPoolを作成します。
 // 第一引数: ジョブをリクエストされた時のContextとなる構造体インスタンス
 // 第二引数: 並行処理可能な最大数
 // 第三引数: Redis namespace (エンキュー側で設定したもの)
 // 第四引数: Redis pool
 pool := work.NewWorkerPool(Context{}, 10, "my_app_namespace", redisPool)

 // 各ジョブに対して実行されるミドルウェアを追加する (今回はログ取り機能)
 pool.Middleware((*Context).Log)

 // ジョブ名とそのハンドラーとなる関数をマッピングする
 pool.Job("send_email", (*Context).SendEmail)

 // Workerプールを開始する
 pool.Start()

 // OSシグナルで終了するまで待機する (その間、Redisからの通知によりジョブを処理する)
 signalChan := make(chan os.Signal, 1)
 signal.Notify(signalChan, os.Interrupt, os.Kill, syscall.SIGHUP)
 <-signalChan

 // シグナルを受信すると待機をやめ、ここまで進むので、WorkerPoolを止めます。
 pool.Stop()
}

func (c *Context) Log(job *work.Job, next work.NextMiddlewareFunc) error {
 fmt.Println("Starting job: ", job.Name)
 return next()
}


func (c *Context) SendEmail(job *work.Job) error {
 // エンキュー時に登録したパラメータを取得する
 addr := job.ArgString("address")
 if err := job.ArgError(); err != nil {
  return err
 }

 // 10秒待つ (本来であればメール送信ですが、省略します)
 time.Sleep(10 * time.Second)
        fmt.Println("Job Processed!!!!!")

 return nil
}

main関数の流れを簡単に説明します。
まず、Workerプールを作成し、ミドルウェアの設定、ジョブと関数のマッピングといった設定を行います。
設定が完了したら、Workerプールを開始します。
その後は、OSシグナルで終了するまで待機させておくことにより、その間エンキューを受け付けて処理を実行させることが可能になります。

ミドルウェア用の関数や、ジョブ処理用の関数は、Context構造体のメソッドとなっています。(後述のContextで説明します)

一通りの流れを試してみるには、まずジョブ実行側を「go run」しておき、後からEnqueue側を「go run」すると、ビルド実行側にログが表示されることが分かります。(あらかじめRedisは起動しておいてください)

また、ジョブ実行側はOSシグナルが無い限り待機し続けるため、Enqueue側を何度も実行すれば、ジョブ実行側にその分ログが出力されます。

バックエンドはRedisなので、先にEnqueue側を何度か実行しておけば、ジョブ実行側が後から立ち上がった時に、Enqueueされた分だけジョブが実行されます。

Context

workにおけるContextは実装者が自由に定義できる構造体ですが、WorkerPoolに必要です。
フィールドは空でも、様々なフィールドがあっても構いません。
また、Middlewareやジョブ実行用のメソッドはこのContextに実装する必要があります。
そのため、Middlewareやジョブ実行時に、Contextのフィールドを書き換えることも可能です。

Middleware

Middlewareはジョブ実行時に、任意のコードを実行できる機能です。
ジョブでは本質的に行いたい機能を、Middlewareではログ取りや計測といった、付属的かつ各ジョブでの共通機能、といった使い分けが可能です。

ジョブ実行中のタスクキル

ジョブ実行側では、Redisからの通知を待機するため、「signal.Notify()」を使用して、OSシグナルを受信するまで待機するように実装しました。

// OSシグナルで終了するまで待機する (その間、Redisからの通知によりジョブを処理する)
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, os.Kill, syscall.SIGHUP)
<-signalChan

// シグナルを受信すると待機をやめ、ここまで進むので、WorkerPoolを止めます。
pool.Stop()

signalがos.Interrupt, os.Kill, syscall.SIGHUPのいずれかを受信すると、signalChanに受信され、5行目以降の処理に移ります。
その後はそのまま「pool.Stop()」でWorkerPoolを止めますが、実行中のジョブに関しては、完了を待ってから終了されます。

そのため、ジョブの実行中(今回の場合は10秒待機中)に「control+c」や「 kill -9」、「kill -HUP」でシグナル送信を行うと、ジョブが完了するまでは、プロセスキルされません。

Cronライクなスケジュール

PeriodicallyEnqueueを使用すると、WorkerPoolを使用して、Cronライクにジョブの定期実行ができます。
GoのCron構文仕様は下記が参考になります。
https://godoc.org/github.com/robfig/cron#hdr-CRON_Expression_Format

例えば、「0 0 * * * *」の場合は、毎時実行されます。
(フィールドは、それぞれ秒、分、時、日、曜日を表します。)

PeriodicallyEnqueueは、先ほどのEnqueueとは異なり、WorkerPoolに対して設定します。

func main() {
 pool := work.NewWorkerPool(Context{}, 10, "my_app_namespace", redisPool)
 // Periodic Enqueueing (Cron)設定 (ジョブ名とマッピング)
 pool.PeriodicallyEnqueue("* * * * * *", "sample_job")

 // ジョブ名とそのハンドラーとなる関数をマッピングする
 pool.Job("sample_job", (*Context).SampleProcess)

 // Workerプールを開始する
 pool.Start()

 // OSシグナルで終了するまで待機する (その間、Redisからの通知によりジョブを処理する)
 signalChan := make(chan os.Signal, 1)
 signal.Notify(signalChan, os.Interrupt, os.Kill)
 <-signalChan

 // シグナルを受信すると待機をやめ、ここまで進むので、Workersを止めます。
 pool.Stop()
}

// PeriodicallyEnqueue(Cron)で呼び出されるメソッド
func (c *Context) SampleProcess(job *work.Job) error {
 fmt.Println("Cron Job Processed!!!!", time.Now())
 return nil
}

実行すると、毎秒SampleProcess()が呼び出され、ログ出力されます。

Enqueueした時のRedisについて

最初のサンプルでEnqueueした直後に、redis-cliで実際の値を確認してみました。

127.0.0.1:6379> lrange my_app_namespace:jobs:send_email 0 -1
1) "{\"name\":\"send_email\",\"id\":\"f2c6c796f0c3342876c5a478\",\"t\":1539574270,\"args\":{\"address\":\"test@example.com\",\"subject\":\"hello world\"}}"
2) "{\"name\":\"send_email\",\"id\":\"6568297e8de239c8310aa9c8\",\"t\":1539574175,\"args\":{\"address\":\"test@example.com\",\"subject\":\"hello world\"}}"
127.0.0.1:6379> smembers my_app_namespace:known_jobs
1) "send_email"

これを読んでみると、「設定した名前空間:jobs:設定したジョブ名」をキーにして、JSON文字列でEnqueueされたジョブ情報が入っています。(リスト型)
また、「設定した名前空間:known_jobs」には、上記でEnqueueしたジョブ名が入っています。こちらはセット型なので、ジョブ名は重複せずに管理されていることが分かります。
上記を踏まえ、RubyからEnqueueするプログラムを作成してみました。
(「$ gem install redis」が必須です)

require 'redis'
require 'securerandom'
require 'json'

redis = Redis.new

value = {
    name: 'send_email', # ジョブ名
    id: SecureRandom.hex(12), # 12バイトの乱数を指定
    t: Time.now.to_i, # 現在時刻をタイムスタンプで
    args: {address: 'test@example.com', subject: 'hello world'} # 任意の引数
}

# キーは「名前空間:jobs:ジョブ名」で、値はJSON化しておく
redis.rpush('my_app_namespace:jobs:send_email', value.to_json)
# 「名前空間:known_jobs」にジョブ名を入れておく
redis.sadd('my_app_namespace:known_jobs', 'send_email')

今回は、先ほどのサンプルのEnqueueをRubyで呼び出してみました。workで動作させることが目的であるため、Redisに格納するキーと値は、workの仕様に基づいたものになります。
valueのidは、12バイトの乱数をHex文字列で渡します。tは現在時刻をタイムスタンプで渡します。
Redisを使用しているので、Enqueueする側の自由度はかなり高いですね。

WebUI

同じくgocraftが提供している「workwebui」を導入すると、ジョブの状況について、WebUIで可視化できます。

workwebuiの導入〜起動

「go get」した後、「go install」することで使用可能になります。

$ go get github.com/gocraft/work/cmd/workwebui
$ go install github.com/gocraft/work/cmd/workwebui

下記コマンドでWebUIが起動します。

$ workwebui -redis="redis:6379" -ns="work" -listen=":5040"

起動できたらブラウザから「http://localhost:5040/」にアクセスしてください。

先ほどのサンプルをWebUIで見てみる


このように、実行中のジョブを確認することができます。処理中のジョブ、再施行されたジョブ、スケジュールされた(Cronライクな)ジョブ等、各ジョブの状況ごとに表示することができます。

さいごに

workはシンプルなインターフェースでありながら、柔軟性の高いライブラリでした。
Redisを使用しているため、エンキュー側は何でもよく、ワーカー側もContextへの実装により様々なジョブ処理が可能です。
Goでバックグラウンドジョブを実装する際は、ぜひ使って見てください。

Go記事の連載などは、こちらをご覧ください。

おすすめ書籍

      

カイザー

シェア
執筆者:
カイザー
タグ: golangGo言語

最近の投稿

フロントエンドで動画デコレーション&レンダリング

はじめに 今回は、以下のように…

4週間 前

Goのクエリビルダー goqu を使ってみる

はじめに 最近携わっているとあ…

1か月 前

【Xcode15】プライバシーマニフェスト対応に備えて

はじめに こんにちは、suzu…

2か月 前

FSMを使った状態管理をGoで実装する

はじめに 一般的なアプリケーシ…

3か月 前