しゅがーブログ

技術ネタとか書いていけたらな…

【AWS】Step Functions使ってみた。

Overview

バッチ処理をコンテナ上で動作させるための仕組みを作成しました。
コンテナ起動に失敗することが何度か発生していたので、回避するためStep Functionsを利用することにしました。

Step Functionsとは?

分散アプリケーションの構築、IT およびビジネスプロセスの自動化、AWS のサービスを利用したデータと機械学習のパイプラインの構築に使用するローコードのビジュアルワークフローサービスです。

ちょっと難しいですね。 自分なりに一言で言うならマネージドサービスを束ねてシームレスな処理を作れるサービス!ですかね。

作成したワークフロー

コンテナ起動に失敗するケース対応できるフローです。
試行回数のループはAWS公式リファレンスを参考にして作成しています。

  1. 試行回数の設定
  2. Fargateタスク起動
  3. 実行結果をハンドリング
  4. 失敗時
    1. 試行回数をカウント
    2. 試行回数到達チェック
      1. 試行回数到達時はエラー通知
    3. 2.へ戻る
  5. 成功時
    1. 処理完了

ステートマシン

マネージドコンソールからステートマシンを確認できるので便利です。
f:id:app_engineer:20220315190634p:plain

参考例

ステートマシンのコードサンプルです。
States.Formatを使った埋め込みは少し分かりづらかったりするので、少し注意が必要です。

  StepFunctions:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      DefinitionString: !Sub
        - |-
          {
            "Comment": "Fargate実行サンプル",
            "StartAt": "ConfigureCount",
            "States": {
              "ConfigureCount": {
                  "Type": "Pass",
                  "Result": {
                      "count": 2,
                      "index": 0,
                      "step": 1
                  },
                  "ResultPath": "$.iterator",
                  "Next": "Fargate Task"
              },
              "Fargate Task": {
                "Type": "Task",
                "Resource": "arn:aws:states:::ecs:runTask.sync",
                "Parameters": {
                    略
                  },
                  "Overrides": {
                    "ContainerOverrides": [
                      {
                        "Name": "コンテナ名",
                        "Command.$": "$.commands" # 引数を渡してFargate上でジョブを実行
                      }
                    ]
                  }
                },
                "Catch": [
                  {
                    "ErrorEquals": [
                      "States.ALL"
                    ],
                    "ResultPath": "$.result",
                    "Next": "Error Handler"
                  }
                ],
                "ResultPath": "$.result",
                "Next": "Error Handler"
              },
              "Error Handler": {
                "Type": "Task",
                "Resource": "arn:aws:lambda:region:account_id:function:function_name",
                "Catch": [
                  {
                    "ErrorEquals": [
                      "Exception" # Lambda Functionから発生させるException
                    ],
                    "ResultPath": "$.result",
                    "Next": "Iterator"
                  }
                ],
                "ResultPath": "$.result",
                "Next": "Done"
              },
              "Iterator": {
                "Type": "Task",
                "Resource": "arn:aws:lambda:region:account_id:function:function_name",
                "ResultPath": "$.iterator",
                "Next": "IsCountReached"
              },
              "IsCountReached": {
                "Type": "Choice",
                "Choices": [
                  {
                      "Variable": "$.iterator.continue",
                      "BooleanEquals": true,
                      "Next": "Fargate Task"
                  }
                ],
                "Default": "Retry Failure"
              },
              "Done": {
                "Type": "Pass",
                "End": true
              },
              "Retry Failure": {
                "Type": "Task",
                "Resource": "arn:aws:states:::sns:publish",
                "Parameters": {
                  "Message.$": "States.Format('文字列連携サンプル. Command: {}', $.commands)",
                  "TopicArn": "arn:aws:sns:region:account_id:alert"
                },
                "End": true
              }
            }
          }
        - {
            埋め込み用パラメータ定義
          }
      LoggingConfiguration:
        Destinations:
          - CloudWatchLogsLogGroup:
              LogGroupArn: "target"
        IncludeExecutionData: "true"
        Level: ALL
      RoleArn: "target"
      StateMachineName: step-functions-sample
      StateMachineType: STANDARD

最後に

ちょっと雑なまとめ方ですがうまくいかない方の参考になれば幸いです。

【AWS】バッチ処理をFargateで実現するために

Overview

EC2上のcronで実行しているバッチ処理をECS(Fargate)に切り替えましたが何がベストなんだろうか。

途中で気がついた課題

CloudWatch Eventsの特性

結構色んな人がいっていますが公式リファレンスに記載の通り、特定のトリガーされたルールに対して同じターゲットを複数回起動したりする場合があります
これは1回しか実行したくないバッチ処理であっても複数回実行されてしまう可能性があるという話です。
回避策としては下記の2通りがあるかなと思っています。

Cron on EC2

従来のcronからFargateのタスクを実行する方法です。
AWS CLIにはECS用のrun-taskコマンドがあります。
run-taskコマンドをcron経由で実行することにより必ず1回しか実行されないように回避することができます。

しかしデメリットも存在します。
cron用のEC2を用意するということは1台だけで実行することになるのでSPOFとなってしまいます。
この辺りの可用性はトレードオフになるのでどこを重要視するか検討していく必要があります。

AWS Step Functionsを利用する方法

classmethodさんの記事がわかりやすいです。
EventBridge → Step Functions → ECSタスクのような構成にし、重複実行のチェックをLambdaで実施する方法です。
冪等性以外の観点でもエラーハンドリングを実施することができるのでマネージドのみの構成にする場合は現状一番良いソリューションなのではないでしょうか。
実装例でAWSの人のサンプルもありました。
今回については見送り Cron on EC2で実施しました。

まとめ

AWS Step Functionsはすごい万能そうですが、独自のASL(Amazon States Language)を書く必要があるみたいです。
今回は工数や学習コスト面から見送りましたが、近いうちに切り替えていきたいなと思いました。

【MongoDB】Passengerプロセスがリコネクトできない時の対処方法

Overview

MongoDBのPrimaryを切り替えたときにPassengerプロセスが切り替わったPrimaryへ接続できなかったのでそのときのまとめです。

環境

MongoDB: 3.6
mongoid: 6.4.2
Ruby MongoDB Driver: 2.8
Rails: 5.2

なぜリコネクトできないのか

公式リファレンスによると、、、

When a process forks, Ruby threads are not transfered to the child processes and the Ruby driver Client objects lose their background monitoring. The close/reconnect pattern described here should be used with Ruby driver version 2.6.2 or higher. Previous driver versions did not recreate monitoring threads when reconnecting.

子プロセス側に監視スレッドが引き継がれないので、フォーク時にリコネクトするようにし子プロセス側でも監視スレッドが立ち上がるよう対応してね、ということみたい。

追加する設定

Passengerの場合はこんな感じに書く
config.ruconfig/application.rb のようなアプリケーション起動時に読み込まれるファイルへ記述すること。
※ 久しぶりに見るとpassengerの設定も追加されていた。

if defined?(PhusionPassenger)
  PhusionPassenger.on_event(:starting_worker_process) do |forked|
    if forked
      Mongoid::Clients.clients.each do |_name, client|
        client.close
        client.reconnect
      end
    end
  end
end

実際にPrimary切り替え

事前にレプリカセットを組んでいることが前提です。

Primaryへログインしmongo shellを起動し、priorityをどのノードよりも高く設定します。

$ mongo データベース名
> conf = rs.conf()
> conf.members[0].priority = 128
rs.reconfig(conf)

切り替え後にアプリケーションログを見てみると、正常に切り替わり後続処理が問題なく完了していることが確認できると思います。

まとめ

上記設定を追加後にPrimaryを切り替えてみると、問題なくリコネクトできていることが確認できると思います。
もちろん書き込み先が変わるのでわずかに書き込みエラーは発生しますが、読み込みエラーなどは限りなく少なくなります。

Capistrano Tasks内でRuby処理を書く際に気をつけること

Overview

Capistrano Tasks内でプレーンなRuby処理を記述したときにどうなるかをまとめです。

前提

capistrano実行場所: Macbookローカル, ユーザ: hoge-man
リモートサーバ: Amazon Linux 2 on EC2, ユーザ: ec2-user

サンプル

以下のようなsampleタスクがあったとします。

executeはリモートサーバに対して処理を実行できます。
では、putsで書いたコードはどこで処理されるのでしょうか?
実際にどのユーザが実行しているか確認していきます。

task :sample do
  on roles(:all) do
    execute "echo hoge"
    puts "hoge"
  end
end

whoami

loggingさせるためinfoで実行ユーザが誰か確認します。

task :sample do
  on roles(:all) do
    info "#{whoami}" ← New
    execute "echo hoge"
  end
end

result

リモートサーバのec2-userが処理を実行していることがわかります。

00:01 sample
      ec2-user ← infoのlogging
      01 hoge
    ✔ 01 ec2-user@ip-XX-XX-XX-XX.ap-northeast-1.compute.internal 0.131s

Ruby処理

どのユーザが実行しているか確認するためのコードを追記します。

task :sample do
  on roles(:all) do
    puts "hoge"
    Process.uid ← 実行ユーザのプロセスIDを出力
    ENV['USER'] ← 実行ユーザ名を出力
  end
end

result

先程とは違いリモートサーバで実行された形跡がなく、実行場所であるローカルのユーザが出力されます。

00:01 sample
hoge
502
hoge-man

まとめ

とあるタスクにてプレーンなRubyがそのまま書かれていたので今回のような確認をしてみました。
localhostからcapistranoを実行する分にはそのまま記述して問題ありませんが、複数ホストにまたがる場合はタスクが失敗することがあります。
基本的にはプレーンなRuby処理はラップしておいて、execute経由のrails runnerなどで実行した方がいいのかなと思いました。

aws-vault使ってらくらくAWS CLI

Overview

ローカルからaws cliをセキュアに使いたい人向け
MFAで認証させてから利用します。
※この方法ではMFA用のアクセスキー/シークレットキーを発行します。
※手順はmacOS用です。

事前作業

AWS CLIのインストール

まずはこちらの手順にてaws cliをインストールします。
docs.aws.amazon.com

Session Manager Pluginのインストール

続いても手順にてSession Manager Pluginをインストールします。 docs.aws.amazon.com

aws-vaultのインストール

brew install --cask aws-vault

IAMから対象ユーザのアクセスキー/シークレットキーを発行

IAMユーザ対象ユーザ認証情報タブアクセスキーの作成
ここで発行したアクセスキー/シークレットキーは控えておく。

MFA用ポリシーを作成

適当な名前で下記アクションが許可されたポリシーを作成し、対象ユーザへポリシーを割り当てる。
※手動で管理しきれないと思うのでCFnで一元管理を推奨。

ResyncMFADevice
DeactivateMFADevice
EnableMFADevice
CreateVirtualMFADevice
DeleteVirtualMFADevice

プロファイル作成

aws-vault add プロファイル名
プロファイル名は相応しい命名を。

aws-vault add test
Enter Access Key ID: XXX
Enter Secret Access Key: YYY
Added credentials to profile "test" in vault

XXX: 発行済アクセスキーを入力
YYY: 発行済シークレットキーを入力

そしてプロファイル用の妥当なパスワードを入力。
f:id:app_engineer:20210712164033p:plain

プロファイルとMFAの紐付け

[profile test]
mfa_serial=arn:aws:iam::xxxxxxxxxxxxxxxx:mfa/対象ユーザ

実行方法

aws-vault exec test -- コマンド で実行できる。

aws-vault exec test -- aws ssm start-session --target 対象インスタンス --region 対象リージョン

初回アクセス時にMFAトークンを入力後、キーチェーンのダイアログが出力されるので登録したパスワードを入力。
あとはssmのコンソールに繋がるので、良しなにご利用ください。

参考

aws-vault についてのあれこれ - Qiita

GitHub - 99designs/aws-vault: A vault for securely storing and accessing AWS credentials in development environments

【Rails】MongoDBとMySQLそれぞれへのindexの貼り方とか

Overview

いつも仕事でメインはMongoDB、サブはMySQLみたいな使い方をしていて、MySQLってどうやるんだっけ?と忘れることがあるのでその備忘。

MySQL

みなさんご存知のRDBです。
migrateを実行してDBに各種定義を反映していきます。

table & column作成

bundle exec rails g migration AddHogehogeTable 上記で作成されたmigrateファイルを編集します。

class AddHogehogeTable < ActiveRecord::Migration[5.X]
  def change
      create_table :hogehoges do |t|
         t.string :fugafuga
      end
  end
end

index作成

class AddHogehogeTable < ActiveRecord::Migration[5.X]
  def change
      create_table :hogehoges do |t|
         t.string :fugafuga
      end

      add_index :hogehoges, :fugafuga ← NEW
  end
end

migrate実行

ステータスの確認をしてupになっていれば問題なし。
あとはrails consoleからデータを作成できるかも確認しましょう。
bundle exec rake db:migrate
bundle exec rake db:migrate:status

MongoDB

NoSQL Databaseです。
基本モデルと連動しているので、再読み込みさせることによって各種定義が反映されます。

table(collection)作成

rails g model Hogehoge

column(field)作成

hogehogeファイルを開いて、下記を記載するだけ。
includeしているモジュールはmongoid(ORM)経由でMongoDBに定義するため必要なものになります。

class Hogehoge
  include Mongoid::Document

  field :fugafuga
end

index作成

# ターミナルにてrails consoleに入ります
$ bundle exec rails console -e development
略
pry(main)> Hogehoge.create_indexs # index作成
pry(main)> Hogehoge.collection.indexes.to_a # 作成されたindexが確認できる

まとめ

MongoDBはschema定義がそのままモデルに記載されているので、MySQLより簡単にDB定義を変更できます。
ですが、トランザクションがないため整合性を重要視するプロダクトには向かないかもです。(一応MongoDB4.0からトランザクション機能が増えたので、かなりよくなっているかもですが)
MySQLトランザクションがあるので、整合性に関しても安心できそうです。
その分こういうのがあったりしますが。
schema定義はそのままschema.rbにあるので見やすいですが、ある程度MongoDBに慣れてしまうとmigrate実行漏れがあったりするので気をつけましょう。

ResqueのFailed JobsをCLIで一気にrequeueする方法

TL; DR

障害などでResqueジョブがコケてしまい一気に再実行したい人向けのスクリプト
resque_webからだと1個ずつちまちま実行していかなければならない。
つまりここ

Failureジョブの取得方法について

ApplicationJobが継承されていない古いジョブもあったりするので、ActiveJob::QueueAdapters::ResqueAdapter::JobWrapperで分岐させています。
基本的にpayload出力結果が変わっていたり、エラー表示のさせかたが違います。

最終的には失敗ジョブ名 => 件数として出力される

offset = 0
limit = -1 # 全件取得の場合
Resque::Failure.all(offset, limit).
  map {|f| f.dig("payload", "class") == "ActiveJob::QueueAdapters::ResqueAdapter::JobWrapper" ? f.dig("payload", "args", 0, "job_class") : f.dig("payload", "class")}.
  each_with_object(Hash.new(0)) {|v, o| o[v] += 1}

{
    "HogehogeJob" => 100,
    "FugaFugaJob" => 100,
}

特定期間にスコープを絞る場合

failed_at という失敗日時が入るフィールドが存在するので、from toを用いてスコープを絞ることもできます。

offset = 0
limit = -1 # 全件取得の場合
from_failed_at = "20XX/XX/XX 00:00:00 JST"
to_failed_at = "2020/XX/XX 00:00:00 JST"
Resque::Failure.all(offset, limit).
  select {|f| from_failed_at < f.dig("failed_at") && f.dig("failed_at") < to_failed_at}.
  map {|f| f.dig("payload", "class") == "ActiveJob::QueueAdapters::ResqueAdapter::JobWrapper" ? f.dig("payload", "args", 0, "job_class") : f.dig("payload", "class")}.
  each_with_object(Hash.new(0)) {|v, o| o[v] += 1}

{
    "HogehogeJob" => 50,
    "FugaFugaJob" => 50,
}

対象ジョブの再実行

上記までで何が何件失敗したか把握できたと思います。
最後はどのジョブを再実行させるかです。

not_run_job_class = %w[FugaFugaJob] # 実行したくないクラスを指定

offset = 0
limit = -1 # 全件取得の場合
from_failed_at = "20XX/XX/XX 00:00:00 JST"
to_failed_at = "2020/XX/XX 00:00:00 JST"

# 対象のjobを取得
failed_jobs = Resque::Failure.all(offset, limit).
  each_with_object({}).with_index do |(f, h), i|
    job_class = f.dig("payload", "class") == "ActiveJob::QueueAdapters::ResqueAdapter::JobWrapper" ? f.dig("payload", "args", 0, "job_class") : f.dig("payload", "class")
    next if not_run_job_class.include?(job_class)
    next unless from_failed_at < f.dig("failed_at") && f.dig("failed_at") < to_failed_at

    h[i] = job_class
  end

{
    1 => "HogehogeJob",
    2 => "HogehogeJob",
    3 => "HogehogeJob"
}

対象ジョブをfailed_jobsに放り込んだので、ここからは実行に問題ないかチェック

# 再実行件数確認
failed_jobs.count
=> 50

# 実行されるジョブ確認
failed_jobs.values.uniq
[
    [0] "HogehogeJob"
]

# 内訳
failed_jobs.values.each_with_object(Hash.new(0)) {|v, o| o[v] += 1}

問題なければ再実行
実行結果の確認用に一番最後 + 1のindexを取得しておく。

from_index = Resque::Failure.count
failed_jobs.keys.each do |i|
  Resque::Failure.requeue(i)
end

実行結果
空のhashが返ってくれば問題なく再実行 or キューに放り込まれているはず。

offset = from_index
limit = -1 # 全件取得の場合

Resque::Failure.all(offset, limit).
  map {|f| f.dig("payload", "class") == "ActiveJob::QueueAdapters::ResqueAdapter::JobWrapper" ? f.dig("payload", "args", 0, "job_class") : f.dig("payload", "class")}.
  each_with_object(Hash.new(0)) {|v, o| o[v] += 1}

=> {}

最後に

こんな感じで実行すれば障害発生時に慌てずジョブの再実行ができます。
Sidekiqの方が色々便利なので、できれば移行した方がよさそうですね。