読者です 読者をやめる 読者になる 読者になる

ActiveJob から見るシリアライズとデシリアライズ

皆さん、こんにちは。プルリクで間違った指摘をして大反省中 の osada です。

プルリクで間違った指摘をして大反省中です。 下記のコードで、何を指摘したか、お分かりになるでしょうか?

class NotificationJob < ActiveJob::Base
  queue_as :default

  def perform(notifiable, user)
    notifiable.notify(user)
  end
end

こんなことを言ってしまったのです。

オブジェクトを丸ごとシリアライズすると、redis の容量を圧迫し、 シリアライズ・デシリアライズにも時間が掛かるので、 クラス名とidを渡して、job の中で取り出して使って下さい。

この発言には2つ、間違いがありました。

  • 1つ目は、オブジェクトが丸ごとシリアライズされると思っていたこと。
  • 2つ目は、ActiveJob は クラス名と id を渡す必要がないことです。

今回は、ActiveJob では、インスタンスをそのまま渡してもよくなったこと、 そして、GlobalID がどのように使われているかを共有します。

話の流れとしては、下記の3つのシリアライズ方法の比較を行います。

  1. python memcache の シリアライズ
  2. resque のシリアライズ
  3. activejob のシリアライズ

Python memcache はインスタンスごとバイナリダンプ

pythonmemcacheclient は、インスタンスを丸ごとダンプします。

In [1]: class A(object):
   ...:     def __init__(self, a):
   ...:         self.a = a
   ...:

In [2]: a = A(1)

In [3]: import memcache

In [5]: mc = memcache.Client(['127.0.0.1:11211'])

In [7]: mc.set("a", a)
Out[7]: True

In [8]: mc.get("a")
Out[8]: <__main__.A at 0x103efbc10>

In [9]: a1 = mc.get("a")

In [10]: a1.a
Out[10]: 1

In [13]: a == a1
Out[13]: False

In [14]: a.__class__ == a1.__class__
Out[14]: True

mc.getで取り出した時に、Aというクラスのインスタンスとして取り出していることがお分かりになるでしょうか。 インスタンスとしては別ものですが、クラスは同じです。

memcache の set では、 オブジェクトをバイナリダンプするpickle というモジュールが動いています。

In [6]: mc._val_to_store_info(a, 0)
Out[6]:
(1,
 89,
 "ccopy_reg\n_reconstructor\np1\n(c__main__\nA\np2\nc__builtin__\nobject\np3\nNtRp4\n(dp5\nS'a'\nI1\nsb.")

12.1. pickle — Python オブジェクトの直列化 — Python 3.4.2 ドキュメント

謎の文字列が格納されていますが、なんとなく読み取れるような? ruby でも marshal を使えば、同じことができると思います。

そしてこれが、一つ目の間違いである、インスタンスを丸ごとダンプしていると考えた理由でした。 きっと rails も、バイナリダンプしているのだろう、と思い込んでしまったのです。

Resque のシリアライズ

では、Rescue はどうしているでしょうか?

[31] pry(main)> class A
[31] pry(main)*   def initialize(a)
[31] pry(main)*     @a = a
[31] pry(main)*   end
[31] pry(main)* end
:initialize
[32] pry(main)> a = A.new 1
#<A:0x007fa192282b80 @a=1>

Aというクラスを定義し、インスタンス化します。

[33] pry(main)> class Q
[33] pry(main)*   @queue = :q
[33] pry(main)*   def self.perform(a)
[33] pry(main)*     p a
[33] pry(main)*   end
[33] pry(main)* end
:perform
[34] pry(main)> Resque.enqueue(Q, a)
true
[35] pry(main)> Resque.keys
[
    [0] "queue:q",
    [1] "queues"
]
[38] pry(main)> Resque.redis.lrange "queue:q", 0, -1
[
    [0] "{\"class\":\"Q\",\"args\":[{\"a\":1}]}"
]

それをQというジョブを作成し、Resque に渡します。 このとき、ジョブQ と、インスタンスaredis に積まれますが、 このとき引数のa が何のクラスだったのか、という情報は消えるようです。 (resque 1.23 です)

redis に積まれた情報だけでは、復元することができません。

なぜかといえば、そもそもインスタンスをダンプしない方針のようです。

If your jobs were run against marshaled objects, they could potentially be operating on a stale record with out-of-date information.

バイナリダンプしてキューに積んだら、 データが更新されてた時に、古いデータのままで使っちゃうかもしれないでしょ?

という感じでしょうか?

resque/resque at 1-x-stable

Resque では、クラスとidを渡して、キューの中で取り出すのが標準です。

さてこれが、2つ目の間違い、クラス名とidを渡して、jobの中で呼び出すという指摘をした原因です。 ActiveJob も同様だろう、と思い込んでしまったのです。

ActiveJob のシリアライズ

ActiveJobでは、GlobalID という機能が入っており、明示的なシリアライズが不要です。 マニュアルに明確に記載がありましたので、確認不足すぎて大反省でした。 Active Job の基礎 — Rails ガイド

バイナリダンプでもないけれども、インスタンスをそのまま渡すことができる、 一体これがどのように動いているのでしょうか?

シリアライズの処理は、activejob-4.2.0/lib/active_job/argument.rb にあります。

def serialize_argument(argument)
  case argument
  when *TYPE_WHITELIST
    argument
  when GlobalID::Identification
    { GLOBALID_KEY => argument.to_global_id.to_s }
  when Array
    argument.map { |arg| serialize_argument(arg) }
  when Hash
    argument.each_with_object({}) do |(key, value), hash|
      hash[serialize_hash_key(key)] = serialize_argument(value)
    end
  else
    raise SerializationError.new("Unsupported argument type: #{argument.class.name}")
  end
end

引数を一つずつ処理しながら、

  1. シリアライズできるもの(TYPE_WHITELIST)
  2. GlobalID::Identification であるもの
  3. 処理できないもの

に分けています(ArrayとHashに対して再帰するのは、引数の処理として勉強になりますね)。

GlobalID::Identification そしてそのメソッドである to_global_id.to_s というのが ActiveRecord を一意に成り立たせているものです。

[1] pry(main)> user = User.first
  User Load (0.5ms)  SELECT  "users".* FROM "users"  ORDER BY "users"."id" ASC LIMIT 1
=> #<User:0x007fd1449c3b88 id: 1, name: "aa", created_at: Sat, 28 Feb 2015 13:12:38 UTC +00:00, updated_at: Sat, 28 Feb 2015 13:12:38 UTC +00:00>
[2] pry(main)> user.to_global_id.to_s
=> "gid://test01/User/1"
[3] pry(main)> user = User.new
=> #<User:0x007fd145d4feb8 id: nil, name: nil, created_at: nil, updated_at: nil>
[4] pry(main)> user.to_global_id.to_s
URI::InvalidURIError: Expected a URI like gid://app/Person/1234: #<URI::Generic gid://test01/User/>

アプリ名、クラス名、id で一意になる文字列を生成しています。 id が無ければ、エラーになります。

そんなわけですから、マニュアルには、ActiveModel にミックスイン、とありますが、 Rails 4.2.0 では、GlobalID というライブラリに変わっており、 ActiveRecord にミックスイン になっていますので、気をつけましょう。

上のコードは、ActiveModel::GlobalIdentificationをミックスインするすべてのクラスで動作します。 このモジュールはActive Modelクラスにデフォルトでミックスインされます。 http://railsguides.jp/active_job_basics.html#globalid

この GlobalID のおかげで、ActiveRecordインスタンスを渡したとき、 global_id の文字列に変換されて積まれる、ということです。

手動でやっていたことを、ライブラリとして組み込んでしまった感じでしょうか。 また、クラス名と、idを渡すのではなく、1つの gid として渡すのも、センスが良いですね。

ActiveJob の デシリアライズ

シリアライズを見てみましょう。

GlobalID::Locator.locate(argument) を使って、 globalid かどうかを確認しています。

def deserialize_argument(argument)
  case argument
  when String
    GlobalID::Locator.locate(argument) || argument
  when *TYPE_WHITELIST
    argument
  when Array
    argument.map { |arg| deserialize_argument(arg) }
  when Hash
    if serialized_global_id?(argument)
      deserialize_global_id argument
    else
      deserialize_hash argument
    end
  else
    raise ArgumentError, "Can only deserialize primitive arguments: #{argument.inspect}"
  end
end

この locate は、下記の手順でデータを取り出します。

  1. string が 妥当なgid かどうか確認
  2. gid があれば locator_for で、適切な finder を取得
  3. finder を使って、データを取得

この finder は、標準で、ActiveRecordFinder が用意されています。 よって、実際は、ActiveRecord.find を使って取り出します。

def locate(gid, options = {})
  if gid = GlobalID.parse(gid)
    locator_for(gid).locate gid if find_allowed?(gid.model_class, options[:only])
  end
end

def locator_for(gid)
  @locators.fetch(normalize_app(gid.app)) { default_locator }
end

@locators = {}

class ActiveRecordFinder
  def locate(gid)
    gid.model_class.find gid.model_id
  end
end

mattr_reader(:default_locator) { ActiveRecordFinder.new }

よって、ActiveRecord 以外でも、自前で locator を用意すれば、使うことが可能です。

custom-app-locator

# Tie a locator to an app.
# Useful when different apps collaborate and reference each others' Global IDs.
#
# The locator can be either a block or a class.
#
# Using a block:
#
#   GlobalID::Locator.use :foo do |gid|
#     FooRemote.const_get(gid.model_name).find(gid.model_id)
#   end
#
# Using a class:
#
#   GlobalID::Locator.use :bar, BarLocator.new
#
#   class BarLocator
#     def locate(gid)
#       @search_client.search name: gid.model_name, id: gid.model_id
#     end
#   end

ただ、実装としては、gid.model_class.find gid.model_id だけですので、 実装したいクラスに、find メソッドid メソッド を用意した方が楽な気がしますね。

まとめ

今回のまとめです。

  1. キューのためにデータをシリアライズするときは、データのズレを考慮し、バイナリダンプしない
  2. 永続化されたモデルは、グローバルIDを発行することで、クラスとidを使うよりも、シンプルに扱える
  3. プルリクを見る時は、謙虚に。しかし臆すること無く。

なお、4.2.0 の時点で、ActiveJob に キーワード引数を使うことはできませんが、 4.2.1 には入るようです。 しばらくは、キーワード引数を使わない実装で、頑張りましょう。 ActiveJob should support passing of keyword arguments to perform method · Issue #18741 · rails/rails

参考

TYPE_WHITELIST はプリミティブ型です

TYPE_WHITELIST = [ NilClass, Fixnum, Float, String, TrueClass, FalseClass, Bignum ]

Resque のダンプは MultiJSON

# Given a Ruby object, returns a string suitable for storage in a
# queue.
def encode(object)
  if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load)
    MultiJson.dump object
  else
    MultiJson.encode object
  end

end

[10] pry(main)> MultiJson.dump(a)
"{\"a\":1}"
[11] pry(main)> a1 = MultiJson.load(MultiJson.dump(a))
{
    "a" => 1
}
[12] pry(main)> a.class == a1.class
false

ActiveJob は自前クラスは入れられない

[1] pry(main)> class AJob < ActiveJob::Base
[1] pry(main)*   queue_as :default
[1] pry(main)*   def perform(a)
[1] pry(main)*     p a
[1] pry(main)*   end
[1] pry(main)* end
:perform
[2] pry(main)> class A
[2] pry(main)*   def initialize(a)
[2] pry(main)*     @a = a
[2] pry(main)*   end
[2] pry(main)* end
:initialize
[3] pry(main)> a = A.new 1
#<A:0x007ff4d41b0a50 @a=1>

[4] pry(main)> AJob.perform_later(a)
Enqueued AJob (Job ID: 71481807-dae8-460f-a050-655ab0394bd5) to Resque(default) with arguments: #<A:0x007ff4d41b0a50 @a=1>
ActiveJob::SerializationError: Unsupported argument type: A

[5] pry(main)> AJob.perform_later({a: 1})
Enqueued AJob (Job ID: f9cf4409-f749-4714-81bf-aa309f400b60) to Resque(default) with arguments: {:a=>1}
#<AJob:0x007ff4d2c60128 @arguments=[{:a=>1}], @job_id="f9cf4409-f749-4714-81bf-aa309f400b60", @queue_name="default">

[40] pry(main)> user = User.first
  User Load (0.5ms)  SELECT  "users".* FROM "users"  ORDER BY "users"."id" ASC LIMIT 1
=> #<User:0x007fd142924738 id: 1, name: "aa", created_at: Sat, 28 Feb 2015 13:12:38 UTC +00:00, updated_at: Sat, 28 Feb 2015 13:12:38 UTC +00:00>

[41] pry(main)> AJob.perform_later(user)
Enqueued AJob (Job ID: 016c46b7-4abe-4cae-9605-84c29572faab) to Inline(default) with arguments: gid://test01/User/1
  User Load (1.2ms)  SELECT  "users".* FROM "users" WHERE "users"."id" = ? LIMIT 1  [["id", 1]]
Performing AJob from Inline(default) with arguments: gid://test01/User/1
#<User id: 1, name: "aa", created_at: "2015-02-28 13:12:38", updated_at: "2015-02-28 13:12:38">
Performed AJob from Inline(default) in 0.47ms
=> #<AJob:0x007fd144f6a6b8
 @arguments=[#<User:0x007fd142924738 id: 1, name: "aa", created_at: Sat, 28 Feb 2015 13:12:38 UTC +00:00, updated_at: Sat, 28 Feb 2015 13:12:38 UTC +00:00>],
 @job_id="016c46b7-4abe-4cae-9605-84c29572faab",
 @queue_name="default">

参考文献