Airflow 1.10.3 で webhook ベースの Slack integration を利用する
TL;DR: Airflow 1.10.3 の webhook ベースの Slack integration は致命的にバグっていて生きるのが辛い。
はじめに
Airflow には、DAG から Slack になにがしかの通知を飛ばす際の利用に適した機能として、Slack API ベースの実装である SlackOperator
/ SlackHook
と webhook ベースの SlackWebhookOperator
/ SlackWebhookHook
が用意されています。ここで「通知を飛ばす」という目的だけを考えれば、後者の webhook を用いた機能を利用するのが適しているのですが、Airflow 1.10.3 には webhook へのリクエスト送信できない致命的なバグ 1 が存在しており、なかなかお辛い状況になっています。
このエントリでは、そのバグを暫定的に修正する monkey patch と具体的な operator / hook の利用方法についてメモしています。
Airflow 1.10.3 におけるバグに monkey patch を当てる
まずは monkey patch から。
バグの具体的な内容は、SlackWebhookHook
の __init__()
メソッドにて親クラスである HttpHook
の __init__()
メソッド を呼び出す際に http_conn_id
を渡し損ねている、というものです。
というわけで、SlackWebhookHook
の __init__()
メソッドを以下ののように差し替えてしまえば良さそうです。
import airflow
from airflow.contrib.hooks.slack_webhook_hook import SlackWebhookHook
def apply_monkey_patch_if_needed():
if airflow.__version__ != '1.10.3':
return
def slack_webhook_hook_init(self, http_conn_id=None, webhook_token=None, message="", attachments=None, channel=None,
username=None, icon_emoji=None, link_names=False, proxy=None, *args, **kwargs):
# http_conn_id を親クラスにも渡すようにする
super(SlackWebhookHook, self).__init__(http_conn_id=http_conn_id, *args, **kwargs)
self.webhook_token = self._get_token(webhook_token, http_conn_id)
self.message = message
self.attachments = attachments
self.channel = channel
self.username = username
self.icon_emoji = icon_emoji
self.link_names = link_names
self.proxy = proxy
SlackWebhookHook.__init__ = slack_webhook_hook_init
このモジュールを必要に応じて import して apply_monkey_patch_if_needed()
を呼び出せば OK です。
SlackWebhookOperator / SlackWebhookHook を利用する
前準備
Webhook を利用する operator / hook を使う場合、まずは connection の設定が必要になります。 ここで Slack から払い出された webhook URL が https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX だったとして、以下のように設定します。
- Conn Id
- 任意の名前 (例えば
slack_webhook
など)
- 任意の名前 (例えば
- Conn Type
HTTP
- Host
https://hooks.slack.com
- Extra
{"webhook_token":"/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX"}
もしくは、Airflow の CLI を使うのであれば、以下のように connections
サブコマンドで -a
(add) します。
airflow connections -a \
--conn_id slack_webhook \
--conn_type http \
--conn_host "https://hooks.slack.com/services" \
--conn_extra '{"webhook_token":"/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX"}'
DAG における Slack 通知の実装
Airflow 1.10.3 においては、SlackWebhookOperator
は SlackWebhookHook
のラッパー相当の実装になっています。
従って、operator 側で指定できるパラメータも hook のそれに準じています。
API ドキュメントはこちら。
Operator / hook いずれにしても、インスタンス生成時にチャンネル (channel
パラメータ) やアイコン (icon_emoji
パラメータ)、ユーザ名 (username
パラメータ) などを指定することになります。
またメッセージにユーザへのメンションを含む場合は、link_names
パラメータに True
を指定する必要があるでしょう。
http_conn_id
パラメータでは、connections で設定した Slack webhook の conn_id
を指定します。
Slack に通知したいメッセージは、message
パラメータで指定することになるのですが、これは operator だけではなく hook も同じで仕様であることに注意が必要です。つまり、SlackWebhookHook
のインスタンスを事前に生成しておいて後からメッセージを指定する… といった使い方が できません。
サンプルコードの全体は GitHub 上 で見てもらうとして、具体的な実装は以下になります。
def slack_webhook_hook_demo():
slack = SlackWebhookHook(
http_conn_id='slack',
message='Hello, world! (SlackWebhookHook)',
username='SlackWebhookHook',
icon_emoji=':cat:',
link_names=True)
slack.execute()
with DAG('slack_demo',
default_args=default_args,
schedule_interval='*/1 * * * *',
catchup=False) as dag:
first = SlackWebhookOperator(
task_id='slack_webhook_operator_demo',
http_conn_id='slack',
message='*Hello, world!* (SlackWebhookOperator)\nhttps://airflow.apache.org/',
username='SlackWebhookOperator',
icon_emoji=':dog:',
link_names=True,
dag=dag)
second = PythonOperator(
task_id='slack_webhook_hook_demo',
python_callable=slack_webhook_hook_demo,
dag=dag)
first >> second
しかるべき WebhookConnections を設定してサンプルコードを実行すると、
といった通知が得られることでしょう。
なお SlackWebhookOperator
は message
パラメータであっても Jinja テンプレートは使えないことに注意が必要です。タスクの詳細な情報を Slack 通知したいということであれば、PythonOperator
と SlackWebhookHook
を使うのが無難でしょう。
まとめ
- Airflow 1.10.3 は致命的なバグがあって辛いよ
- Monkey patch キメれば一応使えるようにはなるよ
- Slack 通知に使う Webhook は connections に設定するよ
SlackWebhookOperator
とSlackWebhookHook
があるけど、タスクの詳細な情報を通知したいなら後者の hook を使うのがいいよ
-
1.10.4 で修正予定となっています。 ↩