fluentd のバッファファイルを直接加工する

たとえば変なレコードが混じってしまったせいで何度リトライしてもバッファのフラッシュに失敗するようなときに、バッファファイル (buffer_type file で作られるやつ) を使っていれば、そのファイルをいじることで応急処置ができる。

バッファファイルは [tag, time, record] という三つ組の列を msgpack でシリアライズした形式になっていて、v0.12.x と v0.14.x で time の型が違う *1 けど、どちらのバージョンでも以下のようなコードでバッファファイルを読み書きできそう。 なお v0.12.x において Fluent::Engine.msgpack_factory が追加されたのは v0.12.17 から なので注意。

require 'fluent/engine'

in_path = '/path/to/some-buffer.q0123456789abcdef.log'
out_path = '/path/to/modified-buffer.log'

def modify(tag, time, record)
  # do something
  [tag, time, log]
end

File.open(in_path) do |fin|
  unpacker = Fluent::Engine.msgpack_factory.unpacker(fin)
  File.open(out_path, 'w') do |fout|
    packer = Fluent::Engine.msgpack_factory.packer(fout)
    unpacker.each do |triplet|
      packer.write(modify(*triplet))
    end
  end
end

一旦 fluentd を停止してバッファファイルが触られないようにしてから、上のようなスクリプトで変更を加えたバッファファイルを作成し、 mv /path/to/modified-buffer.log /path/to/some-buffer.q0123456789abcdef.log で上書きしてから fluentd を起動するとよさそう。