TL;DR: Airflow 1.10.3 の webhook ベースの Slack integration は致命的にバグっていて生きるのが辛い。

はじめに

Airflow には、DAG から Slack になにがしかの通知を飛ばす際の利用に適した機能として、Slack API ベースの実装である SlackOperator / SlackHook と webhook ベースの SlackWebhookOperator / SlackWebhookHook が用意されています。ここで「通知を飛ばす」という目的だけを考えれば、後者の webhook を用いた機能を利用するのが適しているのですが、Airflow 1.10.3 には webhook へのリクエスト送信できない致命的なバグ 1 が存在しており、なかなかお辛い状況になっています。

このエントリでは、そのバグを暫定的に修正する monkey patch と具体的な operator / hook の利用方法についてメモしています。

Airflow 1.10.3 におけるバグに monkey patch を当てる

まずは monkey patch から。

バグの具体的な内容は、SlackWebhookHook__init__() メソッドにて親クラスである HttpHook__init__() メソッド を呼び出す際に http_conn_id を渡し損ねている、というものです。

というわけで、SlackWebhookHook__init__() メソッドを以下ののように差し替えてしまえば良さそうです。

import airflow

from airflow.contrib.hooks.slack_webhook_hook import SlackWebhookHook


def apply_monkey_patch_if_needed():
    if airflow.__version__ != '1.10.3':
        return

    def slack_webhook_hook_init(self, http_conn_id=None, webhook_token=None, message="", attachments=None, channel=None,
                                username=None, icon_emoji=None, link_names=False, proxy=None, *args, **kwargs):
        # http_conn_id を親クラスにも渡すようにする
        super(SlackWebhookHook, self).__init__(http_conn_id=http_conn_id, *args, **kwargs)
        self.webhook_token = self._get_token(webhook_token, http_conn_id)
        self.message = message
        self.attachments = attachments
        self.channel = channel
        self.username = username
        self.icon_emoji = icon_emoji
        self.link_names = link_names
        self.proxy = proxy

    SlackWebhookHook.__init__ = slack_webhook_hook_init

このモジュールを必要に応じて import して apply_monkey_patch_if_needed() を呼び出せば OK です。

SlackWebhookOperator / SlackWebhookHook を利用する

前準備

Webhook を利用する operator / hook を使う場合、まずは connection の設定が必要になります。 ここで Slack から払い出された webhook URL が https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX だったとして、以下のように設定します。

  • Conn Id
    • 任意の名前 (例えば slack_webhook など)
  • Conn Type
    • HTTP
  • Host
    • https://hooks.slack.com
  • Extra
    • {"webhook_token":"/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX"}

もしくは、Airflow の CLI を使うのであれば、以下のように connections サブコマンドで -a (add) します。

airflow connections -a \
  --conn_id slack_webhook \
  --conn_type http \
  --conn_host "https://hooks.slack.com/services" \
  --conn_extra '{"webhook_token":"/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX"}'

DAG における Slack 通知の実装

Airflow 1.10.3 においては、SlackWebhookOperatorSlackWebhookHook のラッパー相当の実装になっています。 従って、operator 側で指定できるパラメータも hook のそれに準じています。

API ドキュメントはこちら。

Operator / hook いずれにしても、インスタンス生成時にチャンネル (channel パラメータ) やアイコン (icon_emoji パラメータ)、ユーザ名 (username パラメータ) などを指定することになります。 またメッセージにユーザへのメンションを含む場合は、link_names パラメータに True を指定する必要があるでしょう。

http_conn_id パラメータでは、connections で設定した Slack webhook の conn_id を指定します。

Slack に通知したいメッセージは、message パラメータで指定することになるのですが、これは operator だけではなく hook も同じで仕様であることに注意が必要です。つまり、SlackWebhookHook のインスタンスを事前に生成しておいて後からメッセージを指定する… といった使い方が できません

サンプルコードの全体は GitHub 上 で見てもらうとして、具体的な実装は以下になります。

def slack_webhook_hook_demo():
    slack = SlackWebhookHook(
        http_conn_id='slack',
        message='Hello, world! (SlackWebhookHook)',
        username='SlackWebhookHook',
        icon_emoji=':cat:',
        link_names=True)
    slack.execute()

with DAG('slack_demo',
         default_args=default_args,
         schedule_interval='*/1 * * * *',
         catchup=False) as dag:
    first = SlackWebhookOperator(
        task_id='slack_webhook_operator_demo',
        http_conn_id='slack',
        message='*Hello, world!* (SlackWebhookOperator)\nhttps://airflow.apache.org/',
        username='SlackWebhookOperator',
        icon_emoji=':dog:',
        link_names=True,
        dag=dag)

    second = PythonOperator(
        task_id='slack_webhook_hook_demo',
        python_callable=slack_webhook_hook_demo,
        dag=dag)

    first >> second

しかるべき WebhookConnections を設定してサンプルコードを実行すると、

Slack での通知例

といった通知が得られることでしょう。

なお SlackWebhookOperatormessage パラメータであっても Jinja テンプレートは使えないことに注意が必要です。タスクの詳細な情報を Slack 通知したいということであれば、PythonOperatorSlackWebhookHook を使うのが無難でしょう。

まとめ

  • Airflow 1.10.3 は致命的なバグがあって辛いよ
    • Monkey patch キメれば一応使えるようにはなるよ
  • Slack 通知に使う Webhook は connections に設定するよ
  • SlackWebhookOperatorSlackWebhookHook があるけど、タスクの詳細な情報を通知したいなら後者の hook を使うのがいいよ

  1. 1.10.4 で修正予定となっています。