【AWS】Step Functions使ってみた。
Overview
バッチ処理をコンテナ上で動作させるための仕組みを作成しました。
コンテナ起動に失敗することが何度か発生していたので、回避するためStep Functionsを利用することにしました。
Step Functionsとは?
分散アプリケーションの構築、IT およびビジネスプロセスの自動化、AWS のサービスを利用したデータと機械学習のパイプラインの構築に使用するローコードのビジュアルワークフローサービスです。
ちょっと難しいですね。 自分なりに一言で言うならマネージドサービスを束ねてシームレスな処理を作れるサービス!ですかね。
作成したワークフロー
コンテナ起動に失敗するケース対応できるフローです。
試行回数のループはAWS公式リファレンスを参考にして作成しています。
- 試行回数の設定
- Fargateタスク起動
- 実行結果をハンドリング
- 失敗時
- 試行回数をカウント
- 試行回数到達チェック
- 試行回数到達時はエラー通知
- 2.へ戻る
- 成功時
- 処理完了
ステートマシン
マネージドコンソールからステートマシンを確認できるので便利です。
参考例
ステートマシンのコードサンプルです。
States.Formatを使った埋め込みは少し分かりづらかったりするので、少し注意が必要です。
StepFunctions: Type: AWS::StepFunctions::StateMachine Properties: DefinitionString: !Sub - |- { "Comment": "Fargate実行サンプル", "StartAt": "ConfigureCount", "States": { "ConfigureCount": { "Type": "Pass", "Result": { "count": 2, "index": 0, "step": 1 }, "ResultPath": "$.iterator", "Next": "Fargate Task" }, "Fargate Task": { "Type": "Task", "Resource": "arn:aws:states:::ecs:runTask.sync", "Parameters": { 略 }, "Overrides": { "ContainerOverrides": [ { "Name": "コンテナ名", "Command.$": "$.commands" # 引数を渡してFargate上でジョブを実行 } ] } }, "Catch": [ { "ErrorEquals": [ "States.ALL" ], "ResultPath": "$.result", "Next": "Error Handler" } ], "ResultPath": "$.result", "Next": "Error Handler" }, "Error Handler": { "Type": "Task", "Resource": "arn:aws:lambda:region:account_id:function:function_name", "Catch": [ { "ErrorEquals": [ "Exception" # Lambda Functionから発生させるException ], "ResultPath": "$.result", "Next": "Iterator" } ], "ResultPath": "$.result", "Next": "Done" }, "Iterator": { "Type": "Task", "Resource": "arn:aws:lambda:region:account_id:function:function_name", "ResultPath": "$.iterator", "Next": "IsCountReached" }, "IsCountReached": { "Type": "Choice", "Choices": [ { "Variable": "$.iterator.continue", "BooleanEquals": true, "Next": "Fargate Task" } ], "Default": "Retry Failure" }, "Done": { "Type": "Pass", "End": true }, "Retry Failure": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "Message.$": "States.Format('文字列連携サンプル. Command: {}', $.commands)", "TopicArn": "arn:aws:sns:region:account_id:alert" }, "End": true } } } - { 埋め込み用パラメータ定義 } LoggingConfiguration: Destinations: - CloudWatchLogsLogGroup: LogGroupArn: "target" IncludeExecutionData: "true" Level: ALL RoleArn: "target" StateMachineName: step-functions-sample StateMachineType: STANDARD
最後に
ちょっと雑なまとめ方ですがうまくいかない方の参考になれば幸いです。
【AWS】バッチ処理をFargateで実現するために
Overview
EC2上のcronで実行しているバッチ処理をECS(Fargate)に切り替えましたが何がベストなんだろうか。
途中で気がついた課題
CloudWatch Eventsの特性
結構色んな人がいっていますが公式リファレンスに記載の通り、特定のトリガーされたルールに対して同じターゲットを複数回起動したりする場合があります
これは1回しか実行したくないバッチ処理であっても複数回実行されてしまう可能性があるという話です。
回避策としては下記の2通りがあるかなと思っています。
Cron on EC2
従来のcronからFargateのタスクを実行する方法です。
AWS CLIにはECS用のrun-task
コマンドがあります。
run-task
コマンドをcron経由で実行することにより必ず1回しか実行されないように回避することができます。
しかしデメリットも存在します。
cron用のEC2を用意するということは1台だけで実行することになるのでSPOFとなってしまいます。
この辺りの可用性はトレードオフになるのでどこを重要視するか検討していく必要があります。
AWS Step Functionsを利用する方法
classmethodさんの記事がわかりやすいです。
EventBridge → Step Functions → ECSタスクのような構成にし、重複実行のチェックをLambdaで実施する方法です。
冪等性以外の観点でもエラーハンドリングを実施することができるのでマネージドのみの構成にする場合は現状一番良いソリューションなのではないでしょうか。
実装例でAWSの人のサンプルもありました。
今回については見送り Cron on EC2で実施しました。
まとめ
AWS Step Functionsはすごい万能そうですが、独自のASL(Amazon States Language)を書く必要があるみたいです。
今回は工数や学習コスト面から見送りましたが、近いうちに切り替えていきたいなと思いました。
【MongoDB】Passengerプロセスがリコネクトできない時の対処方法
Overview
MongoDBのPrimaryを切り替えたときにPassengerプロセスが切り替わったPrimaryへ接続できなかったのでそのときのまとめです。
環境
MongoDB: 3.6
mongoid: 6.4.2
Ruby MongoDB Driver: 2.8
Rails: 5.2
なぜリコネクトできないのか
公式リファレンスによると、、、
When a process forks, Ruby threads are not transfered to the child processes and the Ruby driver Client objects lose their background monitoring. The close/reconnect pattern described here should be used with Ruby driver version 2.6.2 or higher. Previous driver versions did not recreate monitoring threads when reconnecting.
子プロセス側に監視スレッドが引き継がれないので、フォーク時にリコネクトするようにし子プロセス側でも監視スレッドが立ち上がるよう対応してね、ということみたい。
追加する設定
Passengerの場合はこんな感じに書く
config.ru
や config/application.rb
のようなアプリケーション起動時に読み込まれるファイルへ記述すること。
※ 久しぶりに見るとpassengerの設定も追加されていた。
if defined?(PhusionPassenger) PhusionPassenger.on_event(:starting_worker_process) do |forked| if forked Mongoid::Clients.clients.each do |_name, client| client.close client.reconnect end end end end
実際にPrimary切り替え
事前にレプリカセットを組んでいることが前提です。
Primaryへログインしmongo shellを起動し、priorityをどのノードよりも高く設定します。
$ mongo データベース名 > conf = rs.conf() > conf.members[0].priority = 128 rs.reconfig(conf)
切り替え後にアプリケーションログを見てみると、正常に切り替わり後続処理が問題なく完了していることが確認できると思います。
まとめ
上記設定を追加後にPrimaryを切り替えてみると、問題なくリコネクトできていることが確認できると思います。
もちろん書き込み先が変わるのでわずかに書き込みエラーは発生しますが、読み込みエラーなどは限りなく少なくなります。
Capistrano Tasks内でRuby処理を書く際に気をつけること
Overview
Capistrano Tasks内でプレーンなRuby処理を記述したときにどうなるかをまとめです。
前提
capistrano実行場所: Macbookローカル, ユーザ: hoge-man
リモートサーバ: Amazon Linux 2 on EC2, ユーザ: ec2-user
サンプル
以下のようなsampleタスクがあったとします。
executeはリモートサーバに対して処理を実行できます。
では、puts
で書いたコードはどこで処理されるのでしょうか?
実際にどのユーザが実行しているか確認していきます。
task :sample do on roles(:all) do execute "echo hoge" puts "hoge" end end
whoami
loggingさせるためinfoで実行ユーザが誰か確認します。
task :sample do on roles(:all) do info "#{whoami}" ← New execute "echo hoge" end end
result
リモートサーバのec2-userが処理を実行していることがわかります。
00:01 sample ec2-user ← infoのlogging 01 hoge ✔ 01 ec2-user@ip-XX-XX-XX-XX.ap-northeast-1.compute.internal 0.131s
Ruby処理
どのユーザが実行しているか確認するためのコードを追記します。
task :sample do on roles(:all) do puts "hoge" Process.uid ← 実行ユーザのプロセスIDを出力 ENV['USER'] ← 実行ユーザ名を出力 end end
result
先程とは違いリモートサーバで実行された形跡がなく、実行場所であるローカルのユーザが出力されます。
00:01 sample hoge 502 hoge-man
まとめ
とあるタスクにてプレーンなRubyがそのまま書かれていたので今回のような確認をしてみました。
localhostからcapistranoを実行する分にはそのまま記述して問題ありませんが、複数ホストにまたがる場合はタスクが失敗することがあります。
基本的にはプレーンなRuby処理はラップしておいて、execute
経由のrails runner
などで実行した方がいいのかなと思いました。
aws-vault使ってらくらくAWS CLI
Overview
ローカルからaws cliをセキュアに使いたい人向け
MFAで認証させてから利用します。
※この方法ではMFA用のアクセスキー/シークレットキーを発行します。
※手順はmacOS用です。
事前作業
AWS CLIのインストール
まずはこちらの手順にてaws cliをインストールします。
docs.aws.amazon.com
Session Manager Pluginのインストール
続いても手順にてSession Manager Pluginをインストールします。 docs.aws.amazon.com
aws-vaultのインストール
brew install --cask aws-vault
IAMから対象ユーザのアクセスキー/シークレットキーを発行
IAM
→ ユーザ
→ 対象ユーザ
→ 認証情報タブ
→ アクセスキーの作成
ここで発行したアクセスキー/シークレットキーは控えておく。
MFA用ポリシーを作成
適当な名前で下記アクションが許可されたポリシーを作成し、対象ユーザへポリシーを割り当てる。
※手動で管理しきれないと思うのでCFnで一元管理を推奨。
ResyncMFADevice DeactivateMFADevice EnableMFADevice CreateVirtualMFADevice DeleteVirtualMFADevice
プロファイル作成
aws-vault add プロファイル名
プロファイル名は相応しい命名を。
aws-vault add test Enter Access Key ID: XXX Enter Secret Access Key: YYY Added credentials to profile "test" in vault
XXX: 発行済アクセスキーを入力
YYY: 発行済シークレットキーを入力
そしてプロファイル用の妥当なパスワードを入力。
プロファイルとMFAの紐付け
[profile test] mfa_serial=arn:aws:iam::xxxxxxxxxxxxxxxx:mfa/対象ユーザ
実行方法
aws-vault exec test -- コマンド
で実行できる。
aws-vault exec test -- aws ssm start-session --target 対象インスタンス --region 対象リージョン
初回アクセス時にMFAトークンを入力後、キーチェーンのダイアログが出力されるので登録したパスワードを入力。
あとはssmのコンソールに繋がるので、良しなにご利用ください。
参考
【Rails】MongoDBとMySQLそれぞれへのindexの貼り方とか
Overview
いつも仕事でメインはMongoDB、サブはMySQLみたいな使い方をしていて、MySQLってどうやるんだっけ?と忘れることがあるのでその備忘。
MySQL
みなさんご存知のRDBです。
migrateを実行してDBに各種定義を反映していきます。
table & column作成
bundle exec rails g migration AddHogehogeTable
上記で作成されたmigrateファイルを編集します。
class AddHogehogeTable < ActiveRecord::Migration[5.X] def change create_table :hogehoges do |t| t.string :fugafuga end end end
index作成
class AddHogehogeTable < ActiveRecord::Migration[5.X] def change create_table :hogehoges do |t| t.string :fugafuga end add_index :hogehoges, :fugafuga ← NEW end end
migrate実行
ステータスの確認をしてupになっていれば問題なし。
あとはrails consoleからデータを作成できるかも確認しましょう。
bundle exec rake db:migrate
bundle exec rake db:migrate:status
MongoDB
NoSQL Databaseです。
基本モデルと連動しているので、再読み込みさせることによって各種定義が反映されます。
table(collection)作成
rails g model Hogehoge
column(field)作成
hogehogeファイルを開いて、下記を記載するだけ。
includeしているモジュールはmongoid(ORM)経由でMongoDBに定義するため必要なものになります。
class Hogehoge include Mongoid::Document field :fugafuga end
index作成
# ターミナルにてrails consoleに入ります $ bundle exec rails console -e development 略 pry(main)> Hogehoge.create_indexs # index作成 pry(main)> Hogehoge.collection.indexes.to_a # 作成されたindexが確認できる
まとめ
MongoDBはschema定義がそのままモデルに記載されているので、MySQLより簡単にDB定義を変更できます。
ですが、トランザクションがないため整合性を重要視するプロダクトには向かないかもです。(一応MongoDB4.0からトランザクション機能が増えたので、かなりよくなっているかもですが)
MySQLはトランザクションがあるので、整合性に関しても安心できそうです。
その分こういうのがあったりしますが。
schema定義はそのままschema.rb
にあるので見やすいですが、ある程度MongoDBに慣れてしまうとmigrate実行漏れがあったりするので気をつけましょう。
ResqueのFailed JobsをCLIで一気にrequeueする方法
TL; DR
障害などでResqueジョブがコケてしまい一気に再実行したい人向けのスクリプト
resque_web
からだと1個ずつちまちま実行していかなければならない。
つまりここ
Failureジョブの取得方法について
ApplicationJob
が継承されていない古いジョブもあったりするので、ActiveJob::QueueAdapters::ResqueAdapter::JobWrapper
で分岐させています。
基本的にpayload出力結果が変わっていたり、エラー表示のさせかたが違います。
最終的には失敗ジョブ名 => 件数として出力される
offset = 0 limit = -1 # 全件取得の場合 Resque::Failure.all(offset, limit). map {|f| f.dig("payload", "class") == "ActiveJob::QueueAdapters::ResqueAdapter::JobWrapper" ? f.dig("payload", "args", 0, "job_class") : f.dig("payload", "class")}. each_with_object(Hash.new(0)) {|v, o| o[v] += 1} { "HogehogeJob" => 100, "FugaFugaJob" => 100, }
特定期間にスコープを絞る場合
failed_at
という失敗日時が入るフィールドが存在するので、from toを用いてスコープを絞ることもできます。
offset = 0 limit = -1 # 全件取得の場合 from_failed_at = "20XX/XX/XX 00:00:00 JST" to_failed_at = "2020/XX/XX 00:00:00 JST" Resque::Failure.all(offset, limit). select {|f| from_failed_at < f.dig("failed_at") && f.dig("failed_at") < to_failed_at}. map {|f| f.dig("payload", "class") == "ActiveJob::QueueAdapters::ResqueAdapter::JobWrapper" ? f.dig("payload", "args", 0, "job_class") : f.dig("payload", "class")}. each_with_object(Hash.new(0)) {|v, o| o[v] += 1} { "HogehogeJob" => 50, "FugaFugaJob" => 50, }
対象ジョブの再実行
上記までで何が何件失敗したか把握できたと思います。
最後はどのジョブを再実行させるかです。
not_run_job_class = %w[FugaFugaJob] # 実行したくないクラスを指定 offset = 0 limit = -1 # 全件取得の場合 from_failed_at = "20XX/XX/XX 00:00:00 JST" to_failed_at = "2020/XX/XX 00:00:00 JST" # 対象のjobを取得 failed_jobs = Resque::Failure.all(offset, limit). each_with_object({}).with_index do |(f, h), i| job_class = f.dig("payload", "class") == "ActiveJob::QueueAdapters::ResqueAdapter::JobWrapper" ? f.dig("payload", "args", 0, "job_class") : f.dig("payload", "class") next if not_run_job_class.include?(job_class) next unless from_failed_at < f.dig("failed_at") && f.dig("failed_at") < to_failed_at h[i] = job_class end { 1 => "HogehogeJob", 2 => "HogehogeJob", 3 => "HogehogeJob" }
対象ジョブをfailed_jobs
に放り込んだので、ここからは実行に問題ないかチェック
# 再実行件数確認 failed_jobs.count => 50 # 実行されるジョブ確認 failed_jobs.values.uniq [ [0] "HogehogeJob" ] # 内訳 failed_jobs.values.each_with_object(Hash.new(0)) {|v, o| o[v] += 1}
問題なければ再実行
実行結果の確認用に一番最後 + 1のindexを取得しておく。
from_index = Resque::Failure.count failed_jobs.keys.each do |i| Resque::Failure.requeue(i) end
実行結果
空のhashが返ってくれば問題なく再実行 or キューに放り込まれているはず。
offset = from_index limit = -1 # 全件取得の場合 Resque::Failure.all(offset, limit). map {|f| f.dig("payload", "class") == "ActiveJob::QueueAdapters::ResqueAdapter::JobWrapper" ? f.dig("payload", "args", 0, "job_class") : f.dig("payload", "class")}. each_with_object(Hash.new(0)) {|v, o| o[v] += 1} => {}
最後に
こんな感じで実行すれば障害発生時に慌てずジョブの再実行ができます。
Sidekiqの方が色々便利なので、できれば移行した方がよさそうですね。