ActiveJob から見るシリアライズとデシリアライズ
皆さん、こんにちは。プルリクで間違った指摘をして大反省中 の osada です。
プルリクで間違った指摘をして大反省中です。 下記のコードで、何を指摘したか、お分かりになるでしょうか?
class NotificationJob < ActiveJob::Base queue_as :default def perform(notifiable, user) notifiable.notify(user) end end
こんなことを言ってしまったのです。
オブジェクトを丸ごとシリアライズすると、redis の容量を圧迫し、 シリアライズ・デシリアライズにも時間が掛かるので、 クラス名とidを渡して、job の中で取り出して使って下さい。
この発言には2つ、間違いがありました。
- 1つ目は、
オブジェクトが丸ごとシリアライズされると思っていた
こと。 - 2つ目は、
ActiveJob は クラス名と id を渡す必要がない
ことです。
今回は、ActiveJob では、インスタンスをそのまま渡してもよくなったこと、 そして、GlobalID がどのように使われているかを共有します。
話の流れとしては、下記の3つのシリアライズ方法の比較を行います。
Python memcache はインスタンスごとバイナリダンプ
python
の memcacheclient
は、インスタンスを丸ごとダンプします。
In [1]: class A(object): ...: def __init__(self, a): ...: self.a = a ...: In [2]: a = A(1) In [3]: import memcache In [5]: mc = memcache.Client(['127.0.0.1:11211']) In [7]: mc.set("a", a) Out[7]: True In [8]: mc.get("a") Out[8]: <__main__.A at 0x103efbc10> In [9]: a1 = mc.get("a") In [10]: a1.a Out[10]: 1 In [13]: a == a1 Out[13]: False In [14]: a.__class__ == a1.__class__ Out[14]: True
mc.get
で取り出した時に、Aというクラスのインスタンスとして取り出していることがお分かりになるでしょうか。
インスタンスとしては別ものですが、クラスは同じです。
memcache の set では、
オブジェクトをバイナリダンプするpickle
というモジュールが動いています。
In [6]: mc._val_to_store_info(a, 0) Out[6]: (1, 89, "ccopy_reg\n_reconstructor\np1\n(c__main__\nA\np2\nc__builtin__\nobject\np3\nNtRp4\n(dp5\nS'a'\nI1\nsb.")
12.1. pickle — Python オブジェクトの直列化 — Python 3.4.2 ドキュメント
謎の文字列が格納されていますが、なんとなく読み取れるような? ruby でも marshal を使えば、同じことができると思います。
そしてこれが、一つ目の間違いである、インスタンスを丸ごとダンプしている
と考えた理由でした。
きっと rails も、バイナリダンプしているのだろう、と思い込んでしまったのです。
Resque のシリアライズ
では、Rescue はどうしているでしょうか?
[31] pry(main)> class A [31] pry(main)* def initialize(a) [31] pry(main)* @a = a [31] pry(main)* end [31] pry(main)* end :initialize [32] pry(main)> a = A.new 1 #<A:0x007fa192282b80 @a=1>
A
というクラスを定義し、インスタンス化します。
[33] pry(main)> class Q [33] pry(main)* @queue = :q [33] pry(main)* def self.perform(a) [33] pry(main)* p a [33] pry(main)* end [33] pry(main)* end :perform [34] pry(main)> Resque.enqueue(Q, a) true [35] pry(main)> Resque.keys [ [0] "queue:q", [1] "queues" ] [38] pry(main)> Resque.redis.lrange "queue:q", 0, -1 [ [0] "{\"class\":\"Q\",\"args\":[{\"a\":1}]}" ]
それをQ
というジョブを作成し、Resque に渡します。
このとき、ジョブQ
と、インスタンスa
が redis
に積まれますが、
このとき引数のa
が何のクラスだったのか、という情報は消えるようです。
(resque 1.23 です)
redis に積まれた情報だけでは、復元することができません。
なぜかといえば、そもそもインスタンスをダンプしない方針のようです。
If your jobs were run against marshaled objects, they could potentially be operating on a stale record with out-of-date information.
バイナリダンプしてキューに積んだら、 データが更新されてた時に、古いデータのままで使っちゃうかもしれないでしょ?
という感じでしょうか?
Resque では、クラスとidを渡して、キューの中で取り出すのが標準です。
さてこれが、2つ目の間違い、クラス名とidを渡して、jobの中で呼び出す
という指摘をした原因です。
ActiveJob
も同様だろう、と思い込んでしまったのです。
ActiveJob のシリアライズ
ActiveJobでは、GlobalID
という機能が入っており、明示的なシリアライズが不要です。
マニュアルに明確に記載がありましたので、確認不足すぎて大反省でした。
Active Job の基礎 — Rails ガイド
バイナリダンプでもないけれども、インスタンスをそのまま渡すことができる、 一体これがどのように動いているのでしょうか?
シリアライズの処理は、activejob-4.2.0/lib/active_job/argument.rb
にあります。
def serialize_argument(argument) case argument when *TYPE_WHITELIST argument when GlobalID::Identification { GLOBALID_KEY => argument.to_global_id.to_s } when Array argument.map { |arg| serialize_argument(arg) } when Hash argument.each_with_object({}) do |(key, value), hash| hash[serialize_hash_key(key)] = serialize_argument(value) end else raise SerializationError.new("Unsupported argument type: #{argument.class.name}") end end
引数を一つずつ処理しながら、
- シリアライズできるもの(
TYPE_WHITELIST
) GlobalID::Identification
であるもの- 処理できないもの
に分けています(ArrayとHashに対して再帰するのは、引数の処理として勉強になりますね)。
GlobalID::Identification
そしてそのメソッドである to_global_id.to_s
というのが ActiveRecord を一意に成り立たせているものです。
[1] pry(main)> user = User.first User Load (0.5ms) SELECT "users".* FROM "users" ORDER BY "users"."id" ASC LIMIT 1 => #<User:0x007fd1449c3b88 id: 1, name: "aa", created_at: Sat, 28 Feb 2015 13:12:38 UTC +00:00, updated_at: Sat, 28 Feb 2015 13:12:38 UTC +00:00> [2] pry(main)> user.to_global_id.to_s => "gid://test01/User/1" [3] pry(main)> user = User.new => #<User:0x007fd145d4feb8 id: nil, name: nil, created_at: nil, updated_at: nil> [4] pry(main)> user.to_global_id.to_s URI::InvalidURIError: Expected a URI like gid://app/Person/1234: #<URI::Generic gid://test01/User/>
アプリ名、クラス名、id
で一意になる文字列を生成しています。
id が無ければ、エラーになります。
そんなわけですから、マニュアルには、ActiveModel にミックスイン、とありますが、
Rails 4.2.0
では、GlobalID
というライブラリに変わっており、
ActiveRecord にミックスイン になっていますので、気をつけましょう。
上のコードは、ActiveModel::GlobalIdentificationをミックスインするすべてのクラスで動作します。 このモジュールはActive Modelクラスにデフォルトでミックスインされます。 http://railsguides.jp/active_job_basics.html#globalid
この GlobalID のおかげで、ActiveRecord のインスタンスを渡したとき、 global_id の文字列に変換されて積まれる、ということです。
手動でやっていたことを、ライブラリとして組み込んでしまった感じでしょうか。 また、クラス名と、idを渡すのではなく、1つの gid として渡すのも、センスが良いですね。
ActiveJob の デシリアライズ
デシリアライズを見てみましょう。
GlobalID::Locator.locate(argument)
を使って、
globalid かどうかを確認しています。
def deserialize_argument(argument) case argument when String GlobalID::Locator.locate(argument) || argument when *TYPE_WHITELIST argument when Array argument.map { |arg| deserialize_argument(arg) } when Hash if serialized_global_id?(argument) deserialize_global_id argument else deserialize_hash argument end else raise ArgumentError, "Can only deserialize primitive arguments: #{argument.inspect}" end end
この locate
は、下記の手順でデータを取り出します。
string
が 妥当なgid
かどうか確認gid
があればlocator_for
で、適切なfinder
を取得finder
を使って、データを取得
この finder
は、標準で、ActiveRecordFinder
が用意されています。
よって、実際は、ActiveRecord.find
を使って取り出します。
def locate(gid, options = {}) if gid = GlobalID.parse(gid) locator_for(gid).locate gid if find_allowed?(gid.model_class, options[:only]) end end def locator_for(gid) @locators.fetch(normalize_app(gid.app)) { default_locator } end @locators = {} class ActiveRecordFinder def locate(gid) gid.model_class.find gid.model_id end end mattr_reader(:default_locator) { ActiveRecordFinder.new }
よって、ActiveRecord 以外でも、自前で locator を用意すれば、使うことが可能です。
# Tie a locator to an app. # Useful when different apps collaborate and reference each others' Global IDs. # # The locator can be either a block or a class. # # Using a block: # # GlobalID::Locator.use :foo do |gid| # FooRemote.const_get(gid.model_name).find(gid.model_id) # end # # Using a class: # # GlobalID::Locator.use :bar, BarLocator.new # # class BarLocator # def locate(gid) # @search_client.search name: gid.model_name, id: gid.model_id # end # end
ただ、実装としては、gid.model_class.find gid.model_id
だけですので、
実装したいクラスに、find
メソッドと id
メソッド を用意した方が楽な気がしますね。
まとめ
今回のまとめです。
- キューのためにデータをシリアライズするときは、データのズレを考慮し、バイナリダンプしない
- 永続化されたモデルは、グローバルIDを発行することで、クラスとidを使うよりも、シンプルに扱える
- プルリクを見る時は、謙虚に。しかし臆すること無く。
なお、4.2.0
の時点で、ActiveJob に キーワード引数を使うことはできませんが、
4.2.1
には入るようです。
しばらくは、キーワード引数を使わない実装で、頑張りましょう。
ActiveJob should support passing of keyword arguments to perform method · Issue #18741 · rails/rails
参考
TYPE_WHITELIST はプリミティブ型です
TYPE_WHITELIST = [ NilClass, Fixnum, Float, String, TrueClass, FalseClass, Bignum ]
Resque のダンプは MultiJSON
# Given a Ruby object, returns a string suitable for storage in a # queue. def encode(object) if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load) MultiJson.dump object else MultiJson.encode object end end [10] pry(main)> MultiJson.dump(a) "{\"a\":1}" [11] pry(main)> a1 = MultiJson.load(MultiJson.dump(a)) { "a" => 1 } [12] pry(main)> a.class == a1.class false
ActiveJob は自前クラスは入れられない
[1] pry(main)> class AJob < ActiveJob::Base [1] pry(main)* queue_as :default [1] pry(main)* def perform(a) [1] pry(main)* p a [1] pry(main)* end [1] pry(main)* end :perform [2] pry(main)> class A [2] pry(main)* def initialize(a) [2] pry(main)* @a = a [2] pry(main)* end [2] pry(main)* end :initialize [3] pry(main)> a = A.new 1 #<A:0x007ff4d41b0a50 @a=1> [4] pry(main)> AJob.perform_later(a) Enqueued AJob (Job ID: 71481807-dae8-460f-a050-655ab0394bd5) to Resque(default) with arguments: #<A:0x007ff4d41b0a50 @a=1> ActiveJob::SerializationError: Unsupported argument type: A [5] pry(main)> AJob.perform_later({a: 1}) Enqueued AJob (Job ID: f9cf4409-f749-4714-81bf-aa309f400b60) to Resque(default) with arguments: {:a=>1} #<AJob:0x007ff4d2c60128 @arguments=[{:a=>1}], @job_id="f9cf4409-f749-4714-81bf-aa309f400b60", @queue_name="default"> [40] pry(main)> user = User.first User Load (0.5ms) SELECT "users".* FROM "users" ORDER BY "users"."id" ASC LIMIT 1 => #<User:0x007fd142924738 id: 1, name: "aa", created_at: Sat, 28 Feb 2015 13:12:38 UTC +00:00, updated_at: Sat, 28 Feb 2015 13:12:38 UTC +00:00> [41] pry(main)> AJob.perform_later(user) Enqueued AJob (Job ID: 016c46b7-4abe-4cae-9605-84c29572faab) to Inline(default) with arguments: gid://test01/User/1 User Load (1.2ms) SELECT "users".* FROM "users" WHERE "users"."id" = ? LIMIT 1 [["id", 1]] Performing AJob from Inline(default) with arguments: gid://test01/User/1 #<User id: 1, name: "aa", created_at: "2015-02-28 13:12:38", updated_at: "2015-02-28 13:12:38"> Performed AJob from Inline(default) in 0.47ms => #<AJob:0x007fd144f6a6b8 @arguments=[#<User:0x007fd142924738 id: 1, name: "aa", created_at: Sat, 28 Feb 2015 13:12:38 UTC +00:00, updated_at: Sat, 28 Feb 2015 13:12:38 UTC +00:00>], @job_id="016c46b7-4abe-4cae-9605-84c29572faab", @queue_name="default">
参考文献
- Active Job の基礎 — Rails ガイド
- Rails - Active Jobについて - Qiita
- resque/resque at 1-x-stable
- Resqueで色々やって、Redisに何が格納されているのか調べてみた - きたけーの朝は早いブログ
- Redis に保存されてる値を見ようと思った時に覚えておきたい redis コマンド | そんなこと覚えてない
- リスト型 — redis 2.0.3 documentation
- rails/globalid
- rails/activemodel-globalid
- python-memcached/memcache.py at master · linsomniac/python-memcached
- 12.1. pickle — Python オブジェクトの直列化 — Python 3.4.2 ドキュメント