kikumotoのメモ帳

インフラ・ミドル周りを中心に、興味をもったことを適当な感じで。twitter : @takakiku

AWS Fargateで稼働するAtlantisから、GCPリソースを構築する - 修正版

google/internal/externalaccount: Adding metadata verification · golang/oauth2@ec4a9b2 · GitHub

の変更により、以下の仕組みは hashicorp/google バージョン v4.59.0 移行では動作しなくなりました。


AWS FargateでAtlantisを動かして、AWSリソースに対するTerraformのPlanのレビューやApplyをPR上で行なっていたのですが、GCPリソースに対しても必要になってきたので、その時に行った作業のメモ。

なお、AWS Fargateで稼働するAtlantisから、GCPリソースを構築する - 失敗編 - kikumotoのメモ帳 で一度環境を作ったのですが、やり方がまずかったので改めて取り組んだメモとなります。
こちらの記事だけで話しが完結するように、一部失敗編と同じ内容の記載はあります。

前提

前提としては、GCPの鍵ファイルを生成せずにやりたいというのがあります。
鍵ファイルの管理・ローテーションは煩わしいし、穴になる可能性もあるので。

タスクロール

Fargate上のタスクとしてAtlantis を動かしており、すでにタスクロールがあるのでこのタスクロールを利用します。

問題点と解決方法の概要

問題点

以下のGCP側の手順の最後で、構成ファイルをダウンロードするのですが、このファイルに関連する問題があります。

なお、構成ファイルは以下のようなものです。

{
  "type": "external_account",
  "audience": "//iam.googleapis.com/projects/123456789012/locations/global/workloadIdentityPools/atlantis/providers/xxxxxxxx",
  "subject_token_type": "urn:ietf:params:aws:token-type:aws4_request",
  "service_account_impersonation_url": "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/yyyyyyy@project-name.iam.gserviceaccount.com:generateAccessToken",
  "token_url": "https://sts.googleapis.com/v1/token",
  "credential_source": {
    "environment_id": "aws1",
    "region_url": "http://169.254.169.254/latest/meta-data/placement/availability-zone",
    "url": "http://169.254.169.254/latest/meta-data/iam/security-credentials",
    "regional_cred_verification_url": "https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15"
  }
}
問題1:シークレットをファイルとして扱えない

環境変数 GOOGLE_APPLICATION_CREDENTIALS に構成ファイルのパスを指定して利用します。
Google Cloud 上では、Google Cloud Secret Manager に登録したシークレットを、Cloud Run などでファイルパスとしてマウントできるのですが、AWSの Secrets Manager や Systems Manager Parameter Store に登録したシークレットをECS タスクにファイルパスとして提供する方法がありません。

構成ファイルは鍵ファイルほど秘匿性が高くないとは思っているのですが、それでもあまり漏らしたくもないので、シークレットとして扱いたいです。
なので、シークレットとして扱いつつ、ファイルとして扱う方法を考える必要があります。

問題2;認証構成ファイルのメタデータURLがEC2用

構成ファイルの credential_source -> url ですが、これは EC2 用のメターデータURLです。 一方でECSの場合は、

にあるように

なものとなります。
また、戻ってくる値も異なります。
このECS用のメタデータURLを、Terraformが(Terraform が利用しているGCPライブラリが)正しく処理できないという点が大きな問題です。

解決方針

問題1

これは、 qiita.com

を参考にさせていただきました。

  • シークレットにファイルの内容を保存
  • 該当シークレットを環境変数に指定(コンテナ定義)
  • 環境変数の内容をファイルに書き出す。

という感じです。

このために、カスタムな Atlantis イメージを作成します。

問題2

上記のEC2メタデータURLにアクセスすると、ロール名が返ってきます。続いて、メタデータURLに返ってきたロール名を付け加えて(例;http://169.254.169.254/latest/meta-data/iam/security-credentials/sampleEC2Role)アクセスすると、以下のような構造のJSONが得られます。

{
    "Code": "Success",
    "LastUpdated" : "<LAST_UPDATED_DATE>",
    "Type" : "AWS-HMAC",
    "AccessKeyId": "<ACCESS_KEY_ID>",
    "SecretAccessKey": "<SECRET_ACCESS_KEY>",
    "Token": "<SECURITY_TOKEN_STRING>",
    "Expiration": "<EXPIRATION_DATE>"
}

一方で、ECSメタデータURLにアクセスすると、その時点で以下のような構造のJSONが得られます。

{
    "AccessKeyId": "<ACCESS_KEY_ID>",
    "Expiration": "<EXPIRATION_DATE>",
    "RoleArn": "<TASK_ROLE_ARN>",
    "SecretAccessKey": "<SECRET_ACCESS_KEY>",
    "Token": "<SECURITY_TOKEN_STRING>"
}

ECSメタデータURLの情報からEC2メタデータURLが返すデータは生成ができそうなので、この変換をするWebアプリを、Atlantisと同じタスク内に別コンテナとして用意することにしました。
このWebアプリのエンドポイントを構成ファイルの credential_source -> url に指定するようにします。

対応作業

GCPGoogle Cloud)側

GCP側では、サービスアカウント、Workload Identity Pool の作成を行なっていきます。 ドキュメントとしては「Workload Identity 連携の構成」のところ。

必要なリソースをTerraformで書くと以下のような感じに。 aws_account_idaws_task_role_name は適宜、自分の環境のものを設定する感じです。

locals {
  aws_account_id     = "000000000000"
  aws_task_role_name = "ecs_task_role"
}

####
# Atlantis が利用するサービスアカウント

resource "google_service_account" "atlantis" {
  account_id  = "atlantis"
  description = "Service Account for Atlantis (Managed by Terraform)"
}

resource "google_project_iam_member" "atlantis" {
  project = var.gcp_project
  role    = "roles/editor"
  member  = "serviceAccount:${google_service_account.atlantis.email}"
}

####
# AWS 連携のための Workload Identity

resource "google_iam_workload_identity_pool" "atlantis" {
  workload_identity_pool_id = "atlantis"
  display_name              = "atlantis"
  description               = "Pool for Atlantis (Managed by Terraform)"
}

resource "google_iam_workload_identity_pool_provider" "atlantis" {
  workload_identity_pool_provider_id = "aws-atlantis"
  workload_identity_pool_id          = google_iam_workload_identity_pool.atlantis.workload_identity_pool_id

  aws {
    account_id = local.aws_account_id
  }

  attribute_mapping = {
    "google.subject"     = "assertion.arn"
    "attribute.aws_role" = "assertion.arn.contains('assumed-role') ? assertion.arn.extract('{account_arn}assumed-role/') + 'assumed-role/' + assertion.arn.extract('assumed-role/{role_name}/') : assertion.arn"
  }

  attribute_condition = "attribute.aws_role=='arn:aws:sts::${local.aws_account_id}:assumed-role/${local.aws_task_role_name}'"
}

####
# Workload Identity とサービスアカウントの関連付け

resource "google_service_account_iam_binding" "atlantis" {
  service_account_id = google_service_account.atlantis.id
  role               = "roles/iam.workloadIdentityUser"

  members = [
    "principalSet://iam.googleapis.com/${google_iam_workload_identity_pool.atlantis.name}/attribute.aws_role/arn:aws:sts::${local.aws_account_id}:assumed-role/${local.aws_task_role_name}"
  ]
}

これをapplyしたら、構成ファイルを取得します。 プール詳細画面の右側にあるペインで、接続済みサービスアカウント に行けば、ダウンロードできます。

AWS

パラメータストアの準備

問題1の解決の流れと、問題2の解決に関連したファイル内容の修正となります。

私の場合は、パラメータストアに構成ファイルのデータを保存し、それを GOOGLE_APPLICATION_CREDENTIALS_DATA 環境変数で取得できるようにして、 GOOGLE_APPLICATION_CREDENTIALS 環境変数の指すファイルに保存しています。

保存するデータについては、

  • credential_source -> region_url は削除
  • credential_source -> url は、http://127.0.0.1:8080/latest/meta-data/iam/security-credentials というようにしています。同居コンテナのWebアプリはローカルホストとして参照できるので、合わせてそのWebアプリがListenしているポートを指定している感じです。
カスタム Atlantis イメージの作成

構成ファイルの環境変数からのファイル化をするために、カスタム Atlantis Dockerイメージを作成します。

こんな wrapper.sh

#!/usr/bin/dumb-init /bin/sh
set -e

echo $GOOGLE_APPLICATION_CREDENTIALS_DATA | jq . > $GOOGLE_APPLICATION_CREDENTIALS

を用意して、Docerfile が以下のような感じにしています。

FROM ghcr.io/runatlantis/atlantis:latest

RUN apk update && apk add jq

COPY wrapper.sh /usr/local/bin/wrapper.sh

ENTRYPOINT ["wrapper.sh"]
CMD ["server"]

なお、wrapper.sh のところで、jq を通して $GOOGLE_APPLICATION_CREDENTIALS に書き出していますが、これは必須ではなくて ECS Exec で入って調査するような時に読みやすいようにしておきたかった、ぐらいな感じです。

そしてDockerビルドしたイメージをECRレポジトリに登録しておきます。

メタデータ変換Webアプリ

github.com

にWebアプリの実装を置いています。

これをDockerビルドしイメージをECRレポジトリに登録しておきます。

タスク定義・コンテナ定義

もとのタスク定義・コンテナ定義を修正します。

{
    "containerDefinitions": [
        {
            "name": "atlantis",
            "image": "<作成したカスタムAtlantisイメージのレポジトリURL>",
            "environment": [
-- snip --
                {
                    "name": "GOOGLE_APPLICATION_CREDENTIALS",
                    "value": "/path/to/gcp_credentials.json"
                }
            ],
            "secrets": [
-- snip --
                {
                    "name": "GOOGLE_APPLICATION_CREDENTIALS_DATA",
                    "valueFrom": "<パラメータストアのパラメータ名>"
                }
            ],
-- snip --
        },
        {
            "name": "imitate-ec2-metadata-url",
            "image": "<作成したメタデータ変換WebアプリのレポジトリURL>",
            "environment": [
                {
                    "name": "PORT",
                    "value": "8080"
                }
            ],
-- snip --
        }
    ],
-- snip --
}

これでデプロイします。

以上で、PR上でAtlantisからGCPにapplyできるようになります。

1つのAtlantisで複数クラウドを管理したい方の参考になれば!

補足:複数のGCPプロジェクトを対象とする場合

複数のGCPプロジェクトを対象とする場合は、上記で作成したサービスアカウント(のメールアドレス)に対して、各プロジェクトのIAMで権限付与すればOKです。

AWS Fargateで稼働するAtlantisから、GCPリソースを構築する - 失敗編

AWS FargateでAtlantisを動かして、AWSリソースに対するTerraformのPlanのレビューやApplyをPR上で行なっていたのですが、GCPリソースに対しても必要になってきたので、その時に行った作業のメモ。

2022.12.23: 下記の方法は、起動直後は動くが、AWS_SESSION_TOKENの有効期限が切れると動かなくなる。 完全にうっかりしてました。 現在、他の方法について検証中。 リベンジ編は果たしてあるか!?

前提

前提としては、GCPの鍵ファイルを生成せずにやりたいというのがあります。 鍵ファイルの管理・ローテーションは煩わしいし、穴になる可能性もあるので。

タスクロール

Fargate上のタスクとしてAtlantis を動かしていおり、すでにタスクロールがあるのでこのタスクロールを利用します。

GCPGoogle Cloud)側

GCP側では、サービスアカウント、Workload Identity Pool の作成を行なっていきます。 ドキュメントとしては「Workload Identity 連携の構成」のところ。

必要なリソースをTerraformで書くと以下のような感じに。 aws_account_idaws_task_role_name は適宜、自分の環境のものを設定する感じです。

locals {
  aws_account_id     = "000000000000"
  aws_task_role_name = "ecs_task_role"
}

####
# Atlantis が利用するサービスアカウント

resource "google_service_account" "atlantis" {
  account_id  = "atlantis"
  description = "Service Account for Atlantis (Managed by Terraform)"
}

resource "google_project_iam_member" "atlantis" {
  project = var.gcp_project
  role    = "roles/editor"
  member  = "serviceAccount:${google_service_account.atlantis.email}"
}

####
# AWS 連携のための Workload Identity

resource "google_iam_workload_identity_pool" "atlantis" {
  workload_identity_pool_id = "atlantis"
  display_name              = "atlantis"
  description               = "Pool for Atlantis (Managed by Terraform)"
}

resource "google_iam_workload_identity_pool_provider" "atlantis" {
  workload_identity_pool_provider_id = "aws-atlantis"
  workload_identity_pool_id          = google_iam_workload_identity_pool.atlantis.workload_identity_pool_id

  aws {
    account_id = local.aws_account_id
  }

  attribute_mapping = {
    "google.subject"     = "assertion.arn"
    "attribute.aws_role" = "assertion.arn.contains('assumed-role') ? assertion.arn.extract('{account_arn}assumed-role/') + 'assumed-role/' + assertion.arn.extract('assumed-role/{role_name}/') : assertion.arn"
  }

  attribute_condition = "attribute.aws_role=='arn:aws:sts::${local.aws_account_id}:assumed-role/${local.aws_task_role_name}'"
}

####
# Workload Identity とサービスアカウントの関連付け

resource "google_service_account_iam_binding" "atlantis" {
  service_account_id = google_service_account.atlantis.id
  role               = "roles/iam.workloadIdentityUser"

  members = [
    "principalSet://iam.googleapis.com/${google_iam_workload_identity_pool.atlantis.name}/attribute.aws_role/arn:aws:sts::${local.aws_account_id}:assumed-role/${local.aws_task_role_name}"
  ]
}

これをapplyしたら、構成ファイルを取得します。 プール詳細画面の右側にあるペインで、接続済みサービスアカウント に行けば、ダウンロードできます。

AWS

パラメータストアの準備

構成ファイルの場所は、環境変数 GOOGLE_APPLICATION_CREDENTIALS で指定するのですが、GCPのシークレットのようにファイルとしてみせるような設定ができないので、

qiita.com

のやり方を真似る感じにします。

私の場合は、パラメータストアに構成ファイルのデータを保存し、それを GOOGLE_APPLICATION_CREDENTIALS_DATA 環境変数で取得できるようにして、 GOOGLE_APPLICATION_CREDENTIALS 環境変数の指すファイルに保存しています。

なお、保存するデータですが、

blog.studysapuri.jp

にあるように、メタデータURLがECSタスクの場合は異なるので、credential_source の url, region_url を消して登録しています。

カスタム Atlantis イメージの作成

構成ファイルの環境変数からのファイル化と、ECSタスクのメタデータURLからAWSクレデンシャルを取得するために、カスタム Atlantis Dockerイメージを作成します

こんな wrapper.sh

#!/usr/bin/dumb-init /bin/sh
set -e

echo $GOOGLE_APPLICATION_CREDENTIALS_DATA | jq . > $GOOGLE_APPLICATION_CREDENTIALS

token_file=$(mktemp)
# https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html
curl -sS "http://169.254.170.2$AWS_CONTAINER_CREDENTIALS_RELATIVE_URI" > "$token_file"
AWS_ACCESS_KEY_ID=$(jq -r ".AccessKeyId" "$token_file")
AWS_SECRET_ACCESS_KEY=$(jq -r ".SecretAccessKey" "$token_file")
AWS_SESSION_TOKEN=$(jq -r ".Token" "$token_file")
rm "$token_file"

export AWS_ACCESS_KEY_ID
export AWS_SECRET_ACCESS_KEY
export AWS_SESSION_TOKEN

exec docker-entrypoint.sh "$@"

を用意して、Docerfile が以下のような感じにしています。

FROM ghcr.io/runatlantis/atlantis:latest

RUN apk update && apk add jq

COPY wrapper.sh /usr/local/bin/wrapper.sh

ENTRYPOINT ["wrapper.sh"]
CMD ["server"]

なお、wrapper.sh のところで、jq を通して $GOOGLE_APPLICATION_CREDENTIALS に書き出していますが、これは必須ではなくて、パラメータストアに保存したデータが改行情報を失っているので、どうせ jq 入れていることもあり綺麗にしておこう、くらいな感じです。

DockerビルドしたイメージをECRレポジトリに登録しておきます。

タスク定義の更新

あとは、タスク定義において * GOOGLE_APPLICATION_CREDENTIALS * GOOGLE_APPLICATION_CREDENTIALS_DATA

環境変数を設定し(以下のような感じ)

イメージURLを登録したECRリポジトリのものに変更した上で、デプロイすればOK。

以上で、PR上でAtlantisからGCPにapplyできるようになります。

1つのAtlantisで複数クラウドを管理したい方の参考になれば!

補足:複数のGCPプロジェクトを対象とする場合

複数のGCPプロジェクトを対象とする場合は、上記で作成したサービスアカウント(のメールアドレス)に対して、各プロジェクトのIAMで権限付与すればOKです。

Terraformを使ってLooker Studio 用に Google Cloud サービス アカウントを設定する

基本的に、

support.google.com

の内容を Terraform で記述するとどうなるか、という記事です。

Looker Studio 用サービスアカウント

サービス アカウントを使ってデータにアクセスできるようにする、という点では以下のような Terraform になる。

resource "google_service_account" "looker-studio" {
  account_id  = "demo-lookerstudio"
  description = "Use for Looker Studio access to BigQuery (Managed by Terraform)"
}

resource "google_service_account_iam_binding" "looker-studio" {
  service_account_id = google_service_account.looker-studio.id
  role               = "roles/iam.serviceAccountTokenCreator"

  members = [
    "serviceAccount:service-XXX-888888888888@gcp-sa-datastudio.iam.gserviceaccount.com"
  ]
}

resource "google_project_iam_member" "looker-studio-bigquery-jobuser" {
  project = var.gcp_project
  role    = "roles/bigquery.jobUser"
  member  = "serviceAccount:${google_service_account.looker-studio.email}"
}

resource "google_bigquery_dataset_iam_member" "looker-studio" {
  dataset_id = "ZZZZZZ"
  role       = "roles/bigquery.dataViewer"
  member     = "serviceAccount:${google_service_account.looker-studio.email}"
}

serviceAccount:service-XXX-888888888888@gcp-sa-datastudio.iam.gserviceaccount.com は、Looker Studio サービスエージェントのメールアドレスとなります。 これは、本家ヘルプにもあるように Looker Studio サービス エージェントのヘルプページ で表示されるサービスエージェントのメールアドレスからコピーしてくるもの。

また、ここでは BigQuery のデータセット ZZZZZZ にアクセスできるように設定しています。最後の google_bigquery_dataset_iam_member リソースのところ。 ここは適宜、テーブルへのアクセスに絞るならそれに該当するように変更することになる。

ここまでの設定で、データソースのオーナーがそもそも強い権限を持っていれば、データの認証情報 を作成したサービスアカウントに変更可能である。 変更後は、そのデータソースを利用するレポートを開くと、BigQueryへのアクセスはサービスアカウントによって行われる。

ユーザロールを付与する

データソースのオーナーが一般ユーザ的な権限であれば、データの認証情報 をサービスアカウントに変更できない。

例えば

locals {
  test_user_email = "test@example.com"
}

resource "google_project_iam_member" "test-test-user" {
  project = var.gcp_project
  role    = "roles/bigquery.jobUser"
  member  = "user:${local.test_user_email}"
}

resource "google_bigquery_dataset_iam_member" "test-test-user" {
  dataset_id = "ZZZZZZ"
  role       = "roles/bigquery.dataViewer"
  member     = "user:${local.test_user_email}"
}

のようなユーザは、該当BigQueryのデータセット ZZZZZZ にアクセスできてクエリも実行できるので、これをデータソースとして設定できる。 データソースの画面の データの認証情報 でユーザ名(下記画面参照) をクリックして、表示されるダイアログで、サービスアカウント認証情報 に上記で作成したサービスアカウントのメールアドレス入力して更新を実行しようとしても、以下のようにエラーとなる。

該当ユーザがサービスアカウントを利用できるように以下の設定も追加する。

resource "google_service_account_iam_member" "test-test-user" {
  service_account_id = google_service_account.looker-studio.id
  role               = "roles/iam.serviceAccountUser"
  member             = "user:${local.test_user_email}"
}

この設定の追加後は、サービスアカウントのメールアドレスを入力して更新 すると成功する。 これで、これ以降はこのデータソースへのアクセスはサービスアカウントによって行われる。

めでたしめでたし!

Slack Bolt for Python でワークフローステップ実行でLazyリスナー関数を利用する方法

Slack Bolt for Pythonを利用して、ワークフローステップを提供するカスタムアプリを AWS Lambda 上で動かす時にはこう書くと良いよ、という記事です。

試していったこと

Step0:ローカルでSocket Modeで試す。

全体のコードは割愛しますが、Socket Modeの場合は以下のようなコードで特に問題は起きません。

app = App(
    token=token,
    signing_secret=signing_secret,
    process_before_response=False,
)


# WorkflowStep 定義

def edit(ack, step, configure, logger):
    ack()
    logger.info(step)

    blocks = []
    configure(blocks=blocks)

def save(ack, body, view, update, logger):
    ack()
    logger.info(body)

    update(inputs={}, outputs=[])

def execute(body, client, step, complete, fail, logger):
    logger.info(body)
    complete(outputs={})


# WorkflowStep 登録

ws = WorkflowStep(
    callback_id="sample-step",
    edit=edit,
    save=save,
    execute=execute,
)
app.step(ws)

Step1: AWS Lambda で動かす - process_before_response=True

ローカルで動いたので、では Lambda で動かしましょうということで、process_before_response は True にしてデプロイします。

app = App(
    token=token,
    signing_secret=signing_secret,
    process_before_response=True,
)

もちろん、slack_bolt.adapter.aws_lambda.SlackRequestHandler を使うようにしていますが、上記以外の箇所のコードに変更はありません。

実行すると(executeが呼ばれる)、complete の呼出結果として

{
    "ok": false,
    "error": "trigger_exchanged"
}

が返って来ているログが出ていました。

エラーの内容は https://api.slack.com/methods/workflows.stepCompleted にあるように

Error returned when the provided workflow_step_execute_id has already been used.

ということとで、workflow_step_execute_id はもう使用済み、という感じのようです。

よくよくログ見ると、

INFO:slack_bolt.workflows.step.step:execute
INFO:slack_bolt.workflows.step.step:execute
ERROR:slack_bolt.App:Failed

みたいな流れで、最初の execute では complete は成功しているようですが("ok": true になっている)、再度リクエストが来たためにこの状況になっているようです。 なお、ERRORになっているので、何度か同じリクエストがやってきます。

Step2:AWS Lambda で動かす - process_before_response=False

ちょっと、よくわからないけれど、Socket Mode の時には process_before_response=False であったので、これで試してみます。

app = App(
    token=token,
    signing_secret=signing_secret,
    process_before_response=False,
)

def execute(body, client, step, complete, fail, logger):
    logger.info(body)

    time.sleep(1)
    logger.info("sleeped")

    complete(outputs={})

sleep とログで、なんらかの処理を模倣しておきます。

これをデプロイして実行すると、

logger.info("sleeped")

に該当するログは出ません。

一方で

DEBUG:slack_bolt.App:Responding with status: 200 body: ""

というログは出ているので、ack は行われている模様。

結局、https://slack.dev/bolt-python/ja-jp/concepts#lazy-listeners で書かれているように、HTTP レスポンスを返したあとにスレッドやプロセスの実行を続けることができない 状況になっている感じに見える。

明示的に ack していないから、このあたりでフレームワークが ack している雰囲気を感じる。

Step3: AWS Lambda で動かす - 明示的に ack してみる

じゃあ、ack を complete の後に呼び出して見ようと。

app = App(
    token=token,
    signing_secret=signing_secret,
    process_before_response=False,
)

def execute(ack, body, client, step, complete, fail, logger):
    logger.info(body)

    time.sleep(1)
    logger.info("sleeped")

    complete(outputs={})

    ack()

こんな感じですね。

が、結果は先ほど同じで、sleeped のログは出ない。

https://slack.dev/bolt-python/ja-jp/concepts#executing-steps のサンプル見ても、execute で ack してないから、フレームワーク側でよしなにしているのだろうなぁという想像もできる。

Lazyリスナー関数を利用するには??

ここまでの状況から、

  • ack を complete 前に呼び出しておくと良さそう(Socket Modeの挙動と同じになるはず)
  • Lambda だと単純に ack を先に呼び出すと, 実際の処理ができない
  • なので、Lazyリスナー関数を利用したい(実態の遅延呼び出しをしたい)

という感じに。

が、ぱっと見、WorkflowStep の execute に渡す関数を、遅延呼び出しする方法がない。 ということで、ここにきてようやくフレームワークのソースを検索。。。

WorkflowStep は https://github.com/slackapi/bolt-python/blob/v1.15.3/slack_bolt/workflows/step/step.py#L302 で定義されている。
ここ見ていくと、execute に渡した値は https://github.com/slackapi/bolt-python/blob/v1.15.3/slack_bolt/workflows/step/step.py#L338-L344 のように

        self.execute = self.build_listener(
            callback_id=callback_id,
            app_name=app_name,
            listener_or_functions=execute,
            name="execute",
            base_logger=base_logger,
        )

build_listener 関数に渡されている。この時に listener_or_functions と複数になっているのに気づく。

さらに見ていく。 listener_or_functions が List の場合は、https://github.com/slackapi/bolt-python/blob/v1.15.3/slack_bolt/workflows/step/step.py#L372 の分岐に入っていく。
そこから、execute には関数の配列を渡せることがわかった。

さらに https://github.com/slackapi/bolt-python/blob/v1.15.3/slack_bolt/workflows/step/step.py#L392

            ack_function = functions.pop(0)

リストの先頭は、ack 関数になり、https://github.com/slackapi/bolt-python/blob/v1.15.3/slack_bolt/workflows/step/step.py#L398

                lazy_functions=functions,

残りの関数が Lazyリスナー関数になるっぽく見えることがわかった。

ちなみに、その次の行

                auto_acknowledgement=name == "execute",

というのがあり、これから execute の場合は ack は勝手に実行されるんだなということも見えてきた。 この auto_acknowledgement については https://github.com/slackapi/bolt-python/blob/v1.15.3/slack_bolt/listener/thread_runner.py#L48 を見ていくと良さそう。

Step4:Lazyリスナー関数対応

ということで、以下のようなコードにする。

app = App(
    token=token,
    signing_secret=signing_secret,
    process_before_response=True,
)

def execute(body, client, step, complete, fail, logger):
    logger.info(body)

    time.sleep(1)
    logger.info("sleeped")
    complete(outputs={})

def _ack(ack, logger):
    logger.info("ack")
    ack()

ws = WorkflowStep(
    callback_id="sample-step",
    edit=edit,
    save=save,
    execute=[_ack, execute],
)

これで、trigger_exchanged のエラーも発生せず、期待通りに動いた。

サンプルコード

上記の方法を取り入れたサンプルを以下に置きました。 github.com

サンプルでは、ベタに書いていくよりは整理がつくので、ワークフローステップを別ファイルにクラス化し分離しています。

https://github.com/kikumoto/bolt-workflowstep-sample/blob/main/app/workflowstep.py

class CustomWorkflowStep:

    def register(self, app, callback_id):
        ws = WorkflowStep(
            callback_id=callback_id,
            edit=self.edit,
            save=self.save,
            execute=[self.ack, self.execute],
        )
        app.step(ws)

本筋ではないですが、サンプルコードについて少し解説

このワークフローステップは、設定したメッセージを postEphemeral で投稿するものです。
Slack の Workflow は、入力値を保持してくれるので、

https://github.com/kikumoto/bolt-workflowstep-sample/blob/main/app/workflowstep.py#L32

"initial_value": step.get("inputs", {}).get("message", {}).get("value", ""),

な感じで書いておくと、最初の設定時はデフォルト値(ここでは空文字)、以降は入力データが表示されるようになります。

設定したメッセージを mrkdwn 表示するには

https://github.com/kikumoto/bolt-workflowstep-sample/blob/main/app/workflowstep.py#L103

"text": html.unescape(inputs["message"]["value"]),

のように html unescape する必要があります。

まとめ

Slack Bolt for Pythonを利用して、ワークフローステップを提供するカスタムアプリを AWS Lambda 上で動かす時には、execute では Lazy リスナー関数を利用する必要があります。

ws = WorkflowStep(
    callback_id="sample-step",
    edit=edit,
    save=save,
    execute=[_ack, execute],
)

のように、execute に ack を実行するための関数と、実処理をする関数を List にして渡せばOKです。

詳しくは https://github.com/kikumoto/bolt-workflowstep-sample を参考にしてもらえると良いです。

以上。

Ractorに入門してみた - Consumers/Producer はこんな感じ?編 -

以下での背景と失敗を踏まえて、元々やりたいことを書いてみた、という内容の記事になります。

kikumoto.hatenablog.com

やりたい事

  • ある処理をするためのデータが多数ある
  • それを並列に処理したい。
  • 並列数は固定でOK。
  • 結果は随時処理したいが、処理つの都合で1カ所でやりたい

みたいな感じです。

github.com

Worker pool のサンプルだと

(1..N).each{|i|
  pipe << i
}

で、全てのデータを渡してから、最後に結果をごそっと受け取るという感じで微妙にニーズと一致しませんでした。(結果を随時受け取っていきたい)

実装

そこで、実際のデータとか実処理の内容はのぞいて、並列処理部分はこんな感じ?というのを書いてみました。

def main()
  # 同時実行数
  c = 2

  producer = Ractor.new Ractor.current, c do |parent, c|
    puts "start producer"

    get_data.each do |d|
      Ractor.yield Ractor.make_shareable(d, copy: true)
      # sleep 1
    end

    # consumer に終了通知
    c.times do
      Ractor.yield :term
    end

    parent.send :producer_finished
  end

  consumers = (1..c).map do |i|
    Ractor.new producer, i do |producer, i|
      puts "start consumer #{i}"

      loop do
        d = producer.take
        break if d == :term

        puts "consumer_#{i}: #{d['id']}"
        Ractor.yield d['value']
      end

      # main Ractor への終了報告
      Ractor.yield :consumer_finished
    end
  end

  # consumer からの結果受け取り&終了待ち
  until consumers.empty?
    r, obj = Ractor.select(*consumers)
    if obj == :consumer_finished
      consumers.delete r
      next
    end
    puts "message: #{obj}"
  end

  puts "wait producer"
  Ractor.receive
end

def get_data
  [
    { 'id' => 1, 'value' => 'aaa'},
    { 'id' => 2, 'value' => 'bbb'},
    { 'id' => 3, 'value' => 'ccc'},
    { 'id' => 4, 'value' => 'ddd'},
  ]
end

main

一応、これで期待通りっぽく動いているのだけど、果たして。。。
詳しい人に教えて欲しい〜。

Ractorに入門してみた - 基本的なところでハマった編 -

背景

ruby で書き始めたちょっとしたツールで、処理する対象量の都合で並列実行したいな、となり、できればやはり複数 CPU 使いたいなということで、Ruby って Process 以外で今って何かできるんだっけと見たら Ractor というのがあるんですね。

Ractor 自体は

techlife.cookpad.com

などなど見てもらうのが良いと思います。なんせ、まだ私はわからないことだらけなので。

で experimental ということですが、作ろとしているツールは1回ぽっきり的なので、それなら勉強がてら(最近この手のもので新しいことを使う機会が少ないこともあり)使ってみようと思い手を付けてみました。

この記事は、やってみて、適当な理解なせいでしばらくハマった内容を書いたものです。 ちゃんと考えればわかるでしょ、的な自戒な意味を込めた記事。

なお、rubyのバージョンは以下。

$ ruby --version
ruby 3.1.0p0 (2021-12-25 revision fb4df44d16) [x86_64-darwin19]

間違い実装

Consumers - Producer 的な感じなものを実装したくて、プロタイピング的になんとなくで書いてみました。(dame.rb とします)

def main
  producer = Ractor.new do
    get_data.each do |d|
      Ractor.yield Ractor.make_shareable(d, copy: true)
    end
    Ractor.yield :term
  end

  consumer = Ractor.new producer do |producer|
    loop do
      d = producer.take
      break if d == :term
      puts "consumer: #{d['value']}"
    end
  end

  # producer 完了待ち
  producer.take

  # consumer 完了待ち
  consumer.take
end

def get_data
  [
    { 'id' => 1, 'value' => 'aaa'},
    { 'id' => 2, 'value' => 'bbb'},
    { 'id' => 3, 'value' => 'ccc'},
    { 'id' => 4, 'value' => 'ddd'},
  ]
end

main
  • producer からデータ流して、
  • consumer で受け取って何か処理
  • 後は終了待ち

みたいなやつです。

これ実行すると

$ ruby dame.rb
<internal:ractor>:267: warning: Ractor is experimental, and the behavior may change in future versions of Ruby! Also there are many implementation issues.
consumer: bbb
consumer: ccc
consumer: ddd

みたいな感じで、一個データが consumer に渡っていなんですよ。 これにしばらく悩みました。。。

原因は

  # producer 完了待ち
  producer.take

これ。

当然ですよね。。。
producer.take するってことは producerRactor.yield したやつを受け取るので、ここに1つ producer から流されたデータが渡ってきていた、というだけ。

終了待ちは take だからという適当な理解で書いてしまった、というオチです。

動くようにしたもの

動くようにするには、この実装だと最後の待ちの順番を入れ替えるだけ。

  # consumer 完了待ち
  consumer.take

  # producer 完了待ち
  producer.take

これだと期待通り。

$ ruby ok.rb
<internal:ractor>:267: warning: Ractor is experimental, and the behavior may change in future versions of Ruby! Also there are many implementation issues.
consumer: aaa
consumer: bbb
consumer: ccc
consumer: ddd

さて、動いていはいるけど、これで本当に良いのか全く自信なし。

:term みたいなメッセージ投げるのが正しいのだろうか、というのも。

make_shareable のところはこの記事的なところだと copy: true 必要はないけど、今後予定のことを考えてそうしてはいます。

まだまだ理解に乏しいけど、実装しながら学んでいくことにします!!
(初めから〇〇言語で書けば、というのはあるが...それは置いとく)

iOSDC Japan 2021のコアスタッフやってきました

もうブログはほぼ放棄ぎみなのですが、iOSDC Japan 2021が終わって書く気分になったので書きます。

感謝

今年も大成功にiOSDCが終わったと思います。
スポンサー、スピーカー、参加者、スタッフ(4Sというワードが爆誕してましたね)の皆さまのおかけです。 とても楽しい 2.5 Days でした!!

参加形態

今年もコアスタッフとして参加させていただきました。
スタッフ自体は1回目からさせていただいており、もはや私にとっては一年で一番大事なタスクになっていますね。
普段の通常の仕事よりは完全に優先事項。内緒です。。。

事前の担当

今年は、スピーカー、サポーター向けのノベルティの1つタンブラーを担当しました。
スタッフの Slack でノベルティ案の会話があったときに、ポロッと、「タンブラーとか?」みたいな発言をしたのがきっかけでした。

デザインはデザイナーの方にしていただきましたが、ベースとするタンブラーのサンプル集めから、最終発注までは2か月くらいかかった力作です!! iOSDC Japan 2021のロゴが、タンブラーと蓋にレーザー彫刻で刻印されています。

自分でも大満足な仕上がりになりました。
是非、スピーカー、サポーターの皆さまに長くご愛用いただければ嬉しいです。
おそらく毎年つくるような定番系のノベルティではないので、今後数年は作られないような気がしますし。

f:id:kikumoto:20210908174335j:plain f:id:kikumoto:20210920185337j:plain

当時の担当

当日はリモートからの参加で、Ask the Speaker の方を担当させていただきました。
拙い司会となりましたが、スピーカー、参加者の皆様のおかげもあり、なんとか無事に終えることができたと思います。 ありがとうございました!

来年にむけて

この2年はオンラインでの開催を通じて、以前から参加いただいている方にはオンラインの良さを認識しつつ、オフライン開催の懐かしさも感じ始めたころではないでしょうか?
iOSDCの良いとこの1つは、常にチャレンジをしているカンファレンスだと思っています。
来年はオフライン開催も見えてくる気配もあるので、きっとオン・オフをうまくミックスしたカンファレンスへのチャレンジとなるのではと思っています(実行委員長次第ではある)。
そして、自分もまたそこになにか貢献したい気持ちはあります。
しかし、オン・オフのミックスとなるとスタッフとしてやることは今まで以上に多いと思うので、是非来年は4Sの1つスタッフをしてみようかな、と少しでも思ったあなた!、是非時期がきましたらスタッフへの参加をお願いします!!!

それでは、また来年 iOSDC という同窓会でお会いしましょう!!!