kikumotoのメモ帳

インフラ・ミドル周りを中心に、興味をもったことを適当な感じで。twitter : @takakiku

Slack Bolt for Python でワークフローステップ実行でLazyリスナー関数を利用する方法

Slack Bolt for Pythonを利用して、ワークフローステップを提供するカスタムアプリを AWS Lambda 上で動かす時にはこう書くと良いよ、という記事です。

試していったこと

Step0:ローカルでSocket Modeで試す。

全体のコードは割愛しますが、Socket Modeの場合は以下のようなコードで特に問題は起きません。

app = App(
    token=token,
    signing_secret=signing_secret,
    process_before_response=False,
)


# WorkflowStep 定義

def edit(ack, step, configure, logger):
    ack()
    logger.info(step)

    blocks = []
    configure(blocks=blocks)

def save(ack, body, view, update, logger):
    ack()
    logger.info(body)

    update(inputs={}, outputs=[])

def execute(body, client, step, complete, fail, logger):
    logger.info(body)
    complete(outputs={})


# WorkflowStep 登録

ws = WorkflowStep(
    callback_id="sample-step",
    edit=edit,
    save=save,
    execute=execute,
)
app.step(ws)

Step1: AWS Lambda で動かす - process_before_response=True

ローカルで動いたので、では Lambda で動かしましょうということで、process_before_response は True にしてデプロイします。

app = App(
    token=token,
    signing_secret=signing_secret,
    process_before_response=True,
)

もちろん、slack_bolt.adapter.aws_lambda.SlackRequestHandler を使うようにしていますが、上記以外の箇所のコードに変更はありません。

実行すると(executeが呼ばれる)、complete の呼出結果として

{
    "ok": false,
    "error": "trigger_exchanged"
}

が返って来ているログが出ていました。

エラーの内容は https://api.slack.com/methods/workflows.stepCompleted にあるように

Error returned when the provided workflow_step_execute_id has already been used.

ということとで、workflow_step_execute_id はもう使用済み、という感じのようです。

よくよくログ見ると、

INFO:slack_bolt.workflows.step.step:execute
INFO:slack_bolt.workflows.step.step:execute
ERROR:slack_bolt.App:Failed

みたいな流れで、最初の execute では complete は成功しているようですが("ok": true になっている)、再度リクエストが来たためにこの状況になっているようです。 なお、ERRORになっているので、何度か同じリクエストがやってきます。

Step2:AWS Lambda で動かす - process_before_response=False

ちょっと、よくわからないけれど、Socket Mode の時には process_before_response=False であったので、これで試してみます。

app = App(
    token=token,
    signing_secret=signing_secret,
    process_before_response=False,
)

def execute(body, client, step, complete, fail, logger):
    logger.info(body)

    time.sleep(1)
    logger.info("sleeped")

    complete(outputs={})

sleep とログで、なんらかの処理を模倣しておきます。

これをデプロイして実行すると、

logger.info("sleeped")

に該当するログは出ません。

一方で

DEBUG:slack_bolt.App:Responding with status: 200 body: ""

というログは出ているので、ack は行われている模様。

結局、https://slack.dev/bolt-python/ja-jp/concepts#lazy-listeners で書かれているように、HTTP レスポンスを返したあとにスレッドやプロセスの実行を続けることができない 状況になっている感じに見える。

明示的に ack していないから、このあたりでフレームワークが ack している雰囲気を感じる。

Step3: AWS Lambda で動かす - 明示的に ack してみる

じゃあ、ack を complete の後に呼び出して見ようと。

app = App(
    token=token,
    signing_secret=signing_secret,
    process_before_response=False,
)

def execute(ack, body, client, step, complete, fail, logger):
    logger.info(body)

    time.sleep(1)
    logger.info("sleeped")

    complete(outputs={})

    ack()

こんな感じですね。

が、結果は先ほど同じで、sleeped のログは出ない。

https://slack.dev/bolt-python/ja-jp/concepts#executing-steps のサンプル見ても、execute で ack してないから、フレームワーク側でよしなにしているのだろうなぁという想像もできる。

Lazyリスナー関数を利用するには??

ここまでの状況から、

  • ack を complete 前に呼び出しておくと良さそう(Socket Modeの挙動と同じになるはず)
  • Lambda だと単純に ack を先に呼び出すと, 実際の処理ができない
  • なので、Lazyリスナー関数を利用したい(実態の遅延呼び出しをしたい)

という感じに。

が、ぱっと見、WorkflowStep の execute に渡す関数を、遅延呼び出しする方法がない。 ということで、ここにきてようやくフレームワークのソースを検索。。。

WorkflowStep は https://github.com/slackapi/bolt-python/blob/v1.15.3/slack_bolt/workflows/step/step.py#L302 で定義されている。
ここ見ていくと、execute に渡した値は https://github.com/slackapi/bolt-python/blob/v1.15.3/slack_bolt/workflows/step/step.py#L338-L344 のように

        self.execute = self.build_listener(
            callback_id=callback_id,
            app_name=app_name,
            listener_or_functions=execute,
            name="execute",
            base_logger=base_logger,
        )

build_listener 関数に渡されている。この時に listener_or_functions と複数になっているのに気づく。

さらに見ていく。 listener_or_functions が List の場合は、https://github.com/slackapi/bolt-python/blob/v1.15.3/slack_bolt/workflows/step/step.py#L372 の分岐に入っていく。
そこから、execute には関数の配列を渡せることがわかった。

さらに https://github.com/slackapi/bolt-python/blob/v1.15.3/slack_bolt/workflows/step/step.py#L392

            ack_function = functions.pop(0)

リストの先頭は、ack 関数になり、https://github.com/slackapi/bolt-python/blob/v1.15.3/slack_bolt/workflows/step/step.py#L398

                lazy_functions=functions,

残りの関数が Lazyリスナー関数になるっぽく見えることがわかった。

ちなみに、その次の行

                auto_acknowledgement=name == "execute",

というのがあり、これから execute の場合は ack は勝手に実行されるんだなということも見えてきた。 この auto_acknowledgement については https://github.com/slackapi/bolt-python/blob/v1.15.3/slack_bolt/listener/thread_runner.py#L48 を見ていくと良さそう。

Step4:Lazyリスナー関数対応

ということで、以下のようなコードにする。

app = App(
    token=token,
    signing_secret=signing_secret,
    process_before_response=True,
)

def execute(body, client, step, complete, fail, logger):
    logger.info(body)

    time.sleep(1)
    logger.info("sleeped")
    complete(outputs={})

def _ack(ack, logger):
    logger.info("ack")
    ack()

ws = WorkflowStep(
    callback_id="sample-step",
    edit=edit,
    save=save,
    execute=[_ack, execute],
)

これで、trigger_exchanged のエラーも発生せず、期待通りに動いた。

サンプルコード

上記の方法を取り入れたサンプルを以下に置きました。 github.com

サンプルでは、ベタに書いていくよりは整理がつくので、ワークフローステップを別ファイルにクラス化し分離しています。

https://github.com/kikumoto/bolt-workflowstep-sample/blob/main/app/workflowstep.py

class CustomWorkflowStep:

    def register(self, app, callback_id):
        ws = WorkflowStep(
            callback_id=callback_id,
            edit=self.edit,
            save=self.save,
            execute=[self.ack, self.execute],
        )
        app.step(ws)

本筋ではないですが、サンプルコードについて少し解説

このワークフローステップは、設定したメッセージを postEphemeral で投稿するものです。
Slack の Workflow は、入力値を保持してくれるので、

https://github.com/kikumoto/bolt-workflowstep-sample/blob/main/app/workflowstep.py#L32

"initial_value": step.get("inputs", {}).get("message", {}).get("value", ""),

な感じで書いておくと、最初の設定時はデフォルト値(ここでは空文字)、以降は入力データが表示されるようになります。

設定したメッセージを mrkdwn 表示するには

https://github.com/kikumoto/bolt-workflowstep-sample/blob/main/app/workflowstep.py#L103

"text": html.unescape(inputs["message"]["value"]),

のように html unescape する必要があります。

まとめ

Slack Bolt for Pythonを利用して、ワークフローステップを提供するカスタムアプリを AWS Lambda 上で動かす時には、execute では Lazy リスナー関数を利用する必要があります。

ws = WorkflowStep(
    callback_id="sample-step",
    edit=edit,
    save=save,
    execute=[_ack, execute],
)

のように、execute に ack を実行するための関数と、実処理をする関数を List にして渡せばOKです。

詳しくは https://github.com/kikumoto/bolt-workflowstep-sample を参考にしてもらえると良いです。

以上。