愛と勇気と缶ビール

ふしぎとぼくらはなにをしたらよいか

gRPCサーバをAWSのNLBでロードバランスする場合のtips

※ この記事に書いてあるのは「ベストな方法」ではないです。時間に制約がある中でとりあえずこうやって問題解決した、今のところは良さげ、という記事です。

gRPCサーバのロードバランスを行う方法については色々な方法があって、それをくだくだ書くことはしない。リンク先を参照されたし。

grpc.io

hakobe932.hatenablog.com

まー要は

  1. クライアント側でやる
  2. Proxy挟んでそいつにやらせる
  3. L4でロードバランス
  4. L7でロードバランス

のどれかを選ぶことになる。サービスがコンテナベースで動いているなら、EnvoyとかのSideCarにやらせるのが一番センスいい気がしている。

で、ちょっとそのためだけに実環境をコンテナ化するわけにもいかんし、Proxyサーバを別に運用したくなんてないし、そしてAWSのL7ロードバランサであるALBはバックエンドのHTTP/2通信に対応していないクソである。どうするか。色々考えた末にNLBを使ってL4でロードバランスすることにした。

この方法には問題があって、要はL4だとメッセージ単位でなくコネクション単位でしかロードバランスしないので「あんまりロードバランスできていない」感じになってしまう。まあそこは妥協しよう。

で、やっていったところ、開発環境で問題が起こった。

blog.manabusakai.com

上記リンク先にあるように、NLBは勝手にtimeoutしてコネクションを切る。最大3600秒か。これが起こるとクライアント側が通信しようとした時点でサーバ側からRSTが送りつけられ、gRPC的にはUNAVAILABLEエラーが発生する。困ったことにこれが発生するとコネクションが腐ったような状態になってしまい、gRPCサーバを一度再起動しないと直らない。困ったな、困ったな。

だったらそもそもkeepaliveせずに都度接続すりゃいいんじゃないか?と思って色々試したが、gRPCクライアントのコネクション管理は内部的にChannelと呼ばれる何かによって行われており、Rubyレベルでオブジェクトを破棄してもTCPコネクションを閉じてはくれない。そして明示的に閉じるインタフェースもない。困ったな、困ったな。どうやらgRPCではユーザ側でTCPコネクションの状態を制御するもんじゃないらしい。全ては俺たち (Channel) に任せろ、お前らは手を出すな。そういう思想らしい。

それならkeepaliveの上でheartbeatをちゃんと打つように設定するか、と思ってクライアント側やサーバ側の設定値をいろいろいじっても、heartbeat送ってる感全然なし。俺はtsharkとにらめっこ。なんだこりゃ!設定が反映されないぞ。困ったな、困ったな。こんなことやってるとリリースに間に合わんぞ。

どーも調べると、クライアント側にも設定自体はできるが意味ないオプションがあったりするようだ。それで色々いじっているうち、「サーバ側から一定時間でコネクションを切る」というオプションはちゃんと動くことがわかった。

grpc.max_connection_age_ms

これだ。

これを設定すると、一定時間以上接続の続いているコネクション(ストリーム?)にはHTTP/2のGOAWAYフレームが送出されるようになる。なお、クライアント側では切断時にGOAWAYエラーが発生するようになった。

ちなみにgRPCの公式な仕様として、「サーバ側から明示的にコネクション切る際にはGOAWAYを送る」ということになっている。らしい。

grpc/PROTOCOL-HTTP2.md at master · grpc/grpc · GitHub

なので、この設定を入れることによって

「3600秒で勝手にNLBがコネクションを切り、クライアント側はUNAVAILABLEエラーを受け取る。以降、コネクションが腐ったような状態になる」

から

「一定時間経過で明示的にGOAWAYフレームが送られ、クライアント側はGOAWAYエラーを受け取る」

に変わったことになる。微妙な違いのように見えるが、事態をハンドリングできているだけ後者の方がだいぶマシである。

で、このままだと新規リクエストを送る際にGOAWAYエラーが発生してしまうことがある(明示的にGOAWAY送る設定にしたんだから当たり前だけど)。カバーするためにクライアント側でGOAWAYの時だけリトライを行うようにして解決しました。ちゃんちゃん。

あ、ちなみにクライアント側の言語はRubyでした。

CベースのgRPCサーバをgraceful restartする、たった一つの冴えたやり方

たった一つかどうかは知らんけど、「まあとりあえずこれでいいや」みたいな方法。

「CベースのgRPCサーバ」とは何ぞやっていうと、gRPCの実装はC言語ベースの実装・golang実装・Java実装に分かれているっぽくて、いわゆるRubyとかPythonとかのLLにおけるgRPC Serverは「各言語のライブラリ => 各言語のC言語バインディング => Cで書かれたコア」という構成になっているっぽい。早い話が、goとJava以外ではCベースの実装を使っていることになる。

で、ある程度真面目にRuby / gRPCで書かれたAPIサーバを運用しようとするにあたって「これどうやってgraceful restartしようかな」と思ったわけですね。

今まで僕がgraceful restartをやる方法といえば、

  1. UnicornとかNginxとか、サーバアプリケーション自体がgraceful restartを実装しているのでそれに乗っかる
  2. H2OとかRhebokとか、server-starter経由でgraceful restartできるサーバアプリケーションを使う

のどっちかしかやってこなかったわけですが、gRPC Serverは少なくとも1ではなさそうであり、2のためにコードに手を入れるのもなんだかな、という感じだったので「どうしたもんかな〜」と思いながらgRPCのCのソースを眺めいていたわけです。

そしたら、アレですよアレ。なんとCベースのgRPCサーバはデフォルトでSO_REUSEPORTが有効になってるんですね。やった!これや!ということで超手抜きなgraceful restartの実現方法がこちら

  1. 新サーバを立ち上げる (同じportをlistenしても死なない、なぜならREUSEPORTが効いてるからな!)
  2. 暫し待つ
  3. 旧サーバを殺す

Great.

ただしこの方法には明らかな問題があって、もし新サーバを立ち上げるのに失敗した場合は爆死してしまう。そこは新サーバの立ち上がりをちゃんと確認してから旧サーバを殺すようにすればいいだろう。

イマドキはgraceful restartなんてやらずにBlue / Greenなdeployを行うのが主流だったりするのかもだけど、まあやりたい人はいるだろうということで。

PostgreSQLのLISTEN/NOTIFYでJSONデータを飛ばす

PostgreSQLではJSON, JSONBデータ型のサポートがあります。MySQLも5.7からあるようです。

アプリに対してちょっとした通知を飛ばしたくなったのですが、ちょっとした通知のために別のミドルウェアを導入するのは明らかに筋が悪いのでPostgresのLISTEN/NOTIFYを使うことにしました。

PostgresのLISTEN/NOTIFYはtext型しか渡せないので、ここは構造化されたデータををJSONなどで渡したくなるのが人情というものです。

以下のようなTRIGGERと

DROP TRIGGER IF EXISTS my_table_inserted ON my_table;

CREATE TRIGGER my_table_inserted
    AFTER INSERT ON my_table
    FOR EACH ROW
    EXECUTE PROCEDURE on_my_table_inserted();

以下のようなFUNCTIONで人情を達成します。

CREATE OR REPLACE FUNCTION on_my_table_inserted() RETURNS trigger AS $$
BEGIN
    PERFORM pg_notify('my_table_inserted', json_build_object(
        'id', NEW.id,
        'data_1', NEW.data_1,
        'data_2', NEW.data_2
    )::text);
    RETURN NEW;
END;
$$ LANGUAGE 'plpgsql';

結局のところ送っているのはtext型なのですが、受け取る側でJSON decodeすれば問題ないです。

Pythonで非同期に通知を受け取るコードは別記事に書きます。

完全に余談ですが、現状のPipelineDBではTRIGGERとLISTEN/NOTIFYをこのように組み合わせてCONTINUOUS VIEWの更新通知を受け取ることは出来ません。 CONTINUOUS TRIGGERの実現方法が原因だと思うのですが、多分そのうち何とかなると思います。

PostgreSQLちょっといい話

最近、PostgreSQL(というかPipelineDB)をいじる機会があったのでメモ。

RDBMSにあるとうれしい、というかPostgresにあって「これ良いな」と思った機能を適当に列挙するの回。

CREATE DOMAIN

要はtypedefが出来る。

CREATE DOMAIN company_code smallint;

これで型の別名を定義して、table定義でこの別名を使うことにより

  • 開発中に型の定義を「やっぱこっちで」と切り替えるのが簡単になる
  • 本来同じ型の値を使うべきtable間でうっかり定義がずれてしまうことを抑止できる

などなどのメリットが得られる。地味な機能だけど割とうれしい。

Lateral Join

Reuse Calculations in the Same Query with Lateral Joins

くやしくはリンク先を読んでほしいのだけど、要はカラムの値を元にした計算結果を使って更に何らかの値を計算したいような場合に、まあまあ簡便な記法でクエリが書けるということ。 集計とかでうれしい。

RETURNING

MySQLだとlast_insert_idとかを使うしかないが、RETURNINGを使うとINSERTやUPDATEした後に任意の値を返せる。よい。

思ったよりネタがなかった。

Pythonで async def / def 両対応のデコレータを書く

タイトルの通り。

単純に以下の様なdecoratorを書くと、async defをラップできない。

from functools import wraps
from datetime import datetime

def timetrack(func):
    @wraps(func)
    def inner(self, *args, **kwargs):
        start = datetime.now()
        return_value = func(self, *args, **kwargs)
        end = datetime.now()
        self.logger.info('%s takes %s sec', func.__name__, (end - start).total_seconds())
        return return_value
    return inner

かといって次のように書くと、今度は普通のdefが勝手にcoroutine functionになってしまって困る。

from functools import wraps
from datetime import datetime

def timetrack(func):
    @wraps(func)
    async def inner(self, *args, **kwargs):
        start = datetime.now()
        return_value = await func(self, *args, **kwargs)
        end = datetime.now()
        self.logger.info('%s takes %s sec', func.__name__, (end - start).total_seconds())
        return return_value
    return inner

なので、asyncio.iscoroutinefunctionを使って以下のように分岐するといける。

from functools import wraps
from datetime import datetime

def timetrack(func):
    if asyncio.iscoroutinefunction(func):
        @wraps(func)
        async def async_inner(self, *args, **kwargs):
            start = datetime.now()
            return_value = await func(self, *args, **kwargs)
            end = datetime.now()
            self.logger.info('%s takes %s sec', func.__name__, (end - start).total_seconds())
            return return_value
        return async_inner
    else:
        @wraps(func)
        def inner(self, *args, **kwargs):
            start = datetime.now()
            return_value = func(self, *args, **kwargs)
            end = datetime.now()
            self.logger.info('%s takes %s sec', func.__name__, (end - start).total_seconds())
            return return_value
        return inner

一般化したユーティリティみたいのを書けないこともない気がするけど、今回そこまでのモチベーションはなし。