Python 2 環境で Airflow の PrestoHook を使おうとして TypeError が発生する問題を無理やりねじ伏せる

EOL が差し迫っている Python 2 を使っているのがそもそもアレじゃね? と言われたらぐうの音も出ない案件です。

はじめに

最近の Apache Airflow、具体的にはバージョン 1.10.2 以降を Python 2 環境で走らせつつ PrestoHook を使ってクエリを実行しようとすると、以下のように TypeError: Cannot mix str and non-str arguments の例外が発生してしまいます。

[2019-07-07 21:26:53,887] {models.py:1788} ERROR - Cannot mix str and non-str arguments
Traceback (most recent call last):
  File "/path/to/python2.7/site-packages/airflow/models.py", line 1657, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/path/to/python2.7/site-packages/airflow/operators/python_operator.py", line 103, in execute
    return_value = self.execute_callable()
  File "/path/to/python2.7/site-packages/airflow/operators/python_operator.py", line 108, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/path/to/airflow/dags/hoge.py", line 19, in hoge
    presto.get_records('select 1')
  File "/path/to/python2.7/site-packages/airflow/hooks/presto_hook.py", line 86, in get_records
    self._strip_sql(hql), parameters)
  File "/path/to/python2.7/site-packages/airflow/hooks/dbapi_hook.py", line 116, in get_records
    cur.execute(sql)
  File "/path/to/python2.7/site-packages/pyhive/presto.py", line 201, in execute
    '{}:{}'.format(self._host, self._port), '/v1/statement', None, None, None))
  File "/path/to/python2.7/site-packages/future/backports/urllib/parse.py", line 387, in urlunparse
    _coerce_args(*components))
  File "/path/to/python2.7/site-packages/future/backports/urllib/parse.py", line 115, in _coerce_args
    raise TypeError("Cannot mix str and non-str arguments")
TypeError: Cannot mix str and non-str arguments

この例外は Python 3 であれば発生しないので、Python 2 の利用を諦めて Python 3 に移行することが唯一の根本対処になり得るでしょう。しかし、長いこと運用している Airflow の環境は様々なオーナーによって作られた数多くの DAG が常時動いており、それらの DAG を一気にすべて Python 3 に移行するのはなかなかに困難なタスクと言えます。

そのため、いずれは Airflow の環境を Python 3 に移行することを目的に個別の DAG の Python 3 検証および対応を推し進めつつも、不本意ながら既存の Python 2 環境をだましだまし運用するといったダーティハックが求められることもあり得ます。

このブログエントリではまさにそのようなやり方で、PythonHook における TypeError 例外を無理やりねじ伏せる対処方法について説明しています。

TypeError: Cannot mix str and non-str arguments が発生する理由

さてまずは、Python 2 環境で PrestoHook を利用したときに TypeError が生じる問題についてその理由を探っていきましょう。

TypeError の例外を投げている箇所は、futures モジュール (バージョン 0.16.0) の backports/urllib/parse.py に実装がある _coerce_args() 関数 であり、この例外はその実装から明らかなように、関数の引数に unicode と str の値が混在している場合に発生します。

この _coerce_args 関数は、urlunparse() 関数から呼び出されて おり、この関数は与えられた URL を構成する要素から URL の文字列を生成します。

PyHive モジュールの presto.py にある Cursor#execute() メソッドでは、クエリを送信する先の URL を組み立てる際にこの urlunparse() 関数を呼び出して おり、具体的には以下のコードの抜粋にあるように、プロトコル (http や https などの URI のスキームのこと)、ホスト名とポート番号、パスを明示的に渡して URL を構築しようとしています。

        url = urlparse.urlunparse((
            self._protocol,
            '{}:{}'.format(self._host, self._port), '/v1/statement', None, None, None))

ここで urlunparse() 関数に渡している文字列のうち、プロトコルについては PrestoHook#get_conn() メソッドから渡されてきた 値を使っており、このプロトコルが unicode ではないために結果として TypeError が発生していた、というわけです。

ゆえに、PrestoHook が PyHive の presto.connect() 関数 に渡しているプロトコルを str から unicode に変えてあげることで、TypeError の発生を回避できることが期待できるようになります。

無理やりねじ伏せるコード

そういうわけで、この PrestoHook における TypeError 例外が発生する問題を回避するには、以下のようなモンキーパッチコードを書けば OK です。

import sys

import airflow
from airflow.hooks.presto_hook import PrestoHook


def patch_to_avoid_mix_str_and_nonstr_error():
    if sys.version_info.major != 2:
        return

    if airflow.__version__ not in ['1.10.2', '1.10.3']:
        return

    from requests.auth import HTTPBasicAuth
    from pyhive import presto

    def presto_hook_get_conn(self):
        db = self.get_connection(self.presto_conn_id)
        reqkwargs = None
        if db.password is not None:
            reqkwargs = {'auth': HTTPBasicAuth(db.login, db.password)}
        return presto.connect(
            host=db.host,
            port=db.port,
            username=db.login,
            source=db.extra_dejson.get('source', 'airflow'),
            # unicode に変換して protocol パラメータに指定する
            protocol=unicode(db.extra_dejson.get('protocol', 'http')),
            catalog=db.extra_dejson.get('catalog', 'hive'),
            requests_kwargs=reqkwargs,
            schema=db.schema)

    PrestoHook.get_conn = presto_hook_get_conn

このパッチの問題点は、利用している Airflow のバージョンに合わせて PrestoHook#get_conn() の実装を参考に書かなければならないことにあります。そのため、将来 1.10.4 以降 (1.11 や 2.x を含む) のバージョンがリリースされた場合にこのパッチが必ずしも通用するわけではないことにご注意ください。