しゅがーブログ

技術ネタとか書いていけたらな…

ResqueのFailed JobsをCLIで一気にrequeueする方法

TL; DR

障害などでResqueジョブがコケてしまい一気に再実行したい人向けのスクリプト
resque_webからだと1個ずつちまちま実行していかなければならない。
つまりここ

Failureジョブの取得方法について

ApplicationJobが継承されていない古いジョブもあったりするので、ActiveJob::QueueAdapters::ResqueAdapter::JobWrapperで分岐させています。
基本的にpayload出力結果が変わっていたり、エラー表示のさせかたが違います。

最終的には失敗ジョブ名 => 件数として出力される

offset = 0
limit = -1 # 全件取得の場合
Resque::Failure.all(offset, limit).
  map {|f| f.dig("payload", "class") == "ActiveJob::QueueAdapters::ResqueAdapter::JobWrapper" ? f.dig("payload", "args", 0, "job_class") : f.dig("payload", "class")}.
  each_with_object(Hash.new(0)) {|v, o| o[v] += 1}

{
    "HogehogeJob" => 100,
    "FugaFugaJob" => 100,
}

特定期間にスコープを絞る場合

failed_at という失敗日時が入るフィールドが存在するので、from toを用いてスコープを絞ることもできます。

offset = 0
limit = -1 # 全件取得の場合
from_failed_at = "20XX/XX/XX 00:00:00 JST"
to_failed_at = "2020/XX/XX 00:00:00 JST"
Resque::Failure.all(offset, limit).
  select {|f| from_failed_at < f.dig("failed_at") && f.dig("failed_at") < to_failed_at}.
  map {|f| f.dig("payload", "class") == "ActiveJob::QueueAdapters::ResqueAdapter::JobWrapper" ? f.dig("payload", "args", 0, "job_class") : f.dig("payload", "class")}.
  each_with_object(Hash.new(0)) {|v, o| o[v] += 1}

{
    "HogehogeJob" => 50,
    "FugaFugaJob" => 50,
}

対象ジョブの再実行

上記までで何が何件失敗したか把握できたと思います。
最後はどのジョブを再実行させるかです。

not_run_job_class = %w[FugaFugaJob] # 実行したくないクラスを指定

offset = 0
limit = -1 # 全件取得の場合
from_failed_at = "20XX/XX/XX 00:00:00 JST"
to_failed_at = "2020/XX/XX 00:00:00 JST"

# 対象のjobを取得
failed_jobs = Resque::Failure.all(offset, limit).
  each_with_object({}).with_index do |(f, h), i|
    job_class = f.dig("payload", "class") == "ActiveJob::QueueAdapters::ResqueAdapter::JobWrapper" ? f.dig("payload", "args", 0, "job_class") : f.dig("payload", "class")
    next if not_run_job_class.include?(job_class)
    next unless from_failed_at < f.dig("failed_at") && f.dig("failed_at") < to_failed_at

    h[i] = job_class
  end

{
    1 => "HogehogeJob",
    2 => "HogehogeJob",
    3 => "HogehogeJob"
}

対象ジョブをfailed_jobsに放り込んだので、ここからは実行に問題ないかチェック

# 再実行件数確認
failed_jobs.count
=> 50

# 実行されるジョブ確認
failed_jobs.values.uniq
[
    [0] "HogehogeJob"
]

# 内訳
failed_jobs.values.each_with_object(Hash.new(0)) {|v, o| o[v] += 1}

問題なければ再実行
実行結果の確認用に一番最後 + 1のindexを取得しておく。

from_index = Resque::Failure.count
failed_jobs.keys.each do |i|
  Resque::Failure.requeue(i)
end

実行結果
空のhashが返ってくれば問題なく再実行 or キューに放り込まれているはず。

offset = from_index
limit = -1 # 全件取得の場合

Resque::Failure.all(offset, limit).
  map {|f| f.dig("payload", "class") == "ActiveJob::QueueAdapters::ResqueAdapter::JobWrapper" ? f.dig("payload", "args", 0, "job_class") : f.dig("payload", "class")}.
  each_with_object(Hash.new(0)) {|v, o| o[v] += 1}

=> {}

最後に

こんな感じで実行すれば障害発生時に慌てずジョブの再実行ができます。
Sidekiqの方が色々便利なので、できれば移行した方がよさそうですね。