Rails is an amazing framework that comes with its own magic on many topics. Asynchronous job handling is one of them.
As developers, we favor handling tasks asynchronously whenever we can to enhance the user experience. To do so, we can rely on the tools offered by job-enqueuing gems, and many exist.
At Matera, we opted for Sidekiq, a well-known job enqueuer that has already proven its worth. But what’s the point of having Rails’ ActiveJob if async actions are already handled by a Ruby gem?
Let’s take a look at the async stack to understand what’s going on at a high level.
The big picture
Sidekiq is a background process that stores async jobs in queues. For each queue, it unstacks the jobs and executes them one after the other; this can happen a few seconds or much longer after the job has been enqueued. Sidekiq stores the jobs in Redis, a NoSQL database.
As a company, you’re using a specific job enqueuer now, but in the future you might want to switch to another one, be it for business or technical reasons. Migrating your whole codebase to the new enqueuer’s logic can be a huge pain. That’s where ActiveJob comes into play! Thanks to ActiveJob, you can change your job enqueuer whenever you want.
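For instance, switching enqueuers is essentially a one-line configuration change. Here is a hypothetical illustration (:resque is one of the adapters ActiveJob ships with; you would of course also need to set up the corresponding gem):
# config/application.rb
config.active_job.queue_adapter = :sidekiq
# Hypothetically, switching to Resque later would only require:
# config.active_job.queue_adapter = :resque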
The diagram below recaps the big picture of what’s happening.
Let’s dive in!
Now that we’ve got the big picture, let’s take a look at the code! I mentioned Rails magic before, but how do we use this magic exactly? Very easily, in fact. When you want to run a task asynchronously in Rails, you just need to create a class inheriting from ApplicationJob, with its own #perform instance method, as below:
class MyJob < ApplicationJob
  def perform(*args)
    # your logic here
  end
end
Then, when you want to enqueue the job to be executed later, you just have to call the ::perform_later class method with the arguments.
MyJob.perform_later(*args)
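As a concrete, made-up illustration, a job sending a welcome email could look like the following. WelcomeEmailJob and WelcomeMailer are hypothetical names, while queue_as, perform_later and set(wait:) are standard ActiveJob APIs:
# app/jobs/welcome_email_job.rb (hypothetical example)
class WelcomeEmailJob < ApplicationJob
  queue_as :mailers # the queue this job will be pushed to

  def perform(user)
    # WelcomeMailer is a made-up mailer, used only for illustration
    WelcomeMailer.with(user: user).welcome.deliver_now
  end
end

# Enqueue the job right away...
WelcomeEmailJob.perform_later(user)
# ...or schedule it to run in ten minutes
WelcomeEmailJob.set(wait: 10.minutes).perform_later(user)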
You may have noticed that we call a class method here whose name differs from the previously defined method, which is, on top of that, an instance method! What’s going on here? It’s time to dig into the ActiveJob source code!
ActiveJob does it for you
You can open a gem’s source code on your machine with a couple of shell commands, starting from your Rails repo:
$ bundle show activejob
/{your_computer_path}/.rvm/gems/ruby-3.1.3/gems/activejob-7.0.4.2
$ cd /{your_computer_path}/.rvm/gems/ruby-3.1.3/gems/activejob-7.0.4.2
You can find the ::perform_later method in the ActiveJob::Enqueuing::ClassMethods module.
def perform_later(...)
  job = job_or_instantiate(...)
  enqueue_result = job.enqueue
  yield job if block_given?
  enqueue_result
end
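In other words, calling MyJob.perform_later(*args) roughly boils down to the following (a simplified sketch; job_or_instantiate simply wraps plain arguments into a job instance):
# Simplified sketch of what happens under the hood
job = MyJob.new(*args) # a job instance carrying the arguments
job.enqueue            # hands the job over to the configured queue adapter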
If we follow the flow, we see that the job.enqueue line calls the #enqueue method of the queue_adapter. The job variable here corresponds to a new instance of our MyJob class, initialized with the job arguments. The queue_adapter corresponds to Sidekiq here; it is defined in the config/application.rb file of our Rails repo.
config.active_job.queue_adapter = :sidekiq
ActiveJob has one class for each enqueuer in the ActiveJob::QueueAdapters module. In our case, it is the SidekiqAdapter class.
Below is the SidekiqAdapter#enqueue method that is called above.
def enqueue(job)
  job.provider_job_id = Sidekiq::Client.push \
    "class" => JobWrapper,
    "wrapped" => job.class,
    "queue" => job.queue_name,
    "args" => [ job.serialize ]
end
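For the record, scheduled jobs (the set(wait:) case) go through a sibling enqueue_at(job, timestamp) method on the same adapter, which builds the same payload plus an “at” timestamp. Roughly (a sketch, slightly simplified from the source):
def enqueue_at(job, timestamp)
  job.provider_job_id = Sidekiq::Client.push \
    "class" => JobWrapper,
    "wrapped" => job.class,
    "queue" => job.queue_name,
    "args" => [ job.serialize ],
    "at" => timestamp
end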
You can see that ActiveJob passes the ball to the Sidekiq gem here by calling Sidekiq::Client.push.
Before going into Sidekiq’s inner logic, let’s look at the last argument we pass to the Sidekiq client here: "args" => [job.serialize].
Following the code, you’ll see that this method is defined in the ActiveJob::Core module and that it eventually calls ::serialize on the ActiveJob::Arguments module, which maps over the job arguments and serializes each of them. Most arguments won’t change. We will focus on one use case here: the one where you pass a class instance as a job argument.
Sidekiq stores the job’s info in Redis, a NoSQL database that knows nothing about our models and database tables and doesn’t manage any relations between them. Rails therefore needs a way to find your object again when Sidekiq sends the job back to be executed (we’ll see how Sidekiq proceeds right after). You can pass the object directly, and ActiveJob serializes it for you. How does it work?
Serializing an object
To serialize your object so it can be found again later, ActiveJob uses the GlobalID gem and its #to_global_id method. Let’s say you have an object of the class MyClass with the id 7; you’ll get the following.
global_id = my_object.to_global_id
=> #<GlobalID:0x000000010a7ce880 @uri=#<URI::GID gid://application/MyClass/7>>
global_id.to_s
=> "gid://application/MyClass/7"
You can then easily get your object back.
GlobalID::Locator.locate("gid://application/MyClass/7")
=> #<MyClass:0x00000001086bb350 id: 7, ... >
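Note that this works out of the box for ActiveRecord models because they already include GlobalID::Identification. For a plain Ruby object you would have to opt in yourself; here is a minimal sketch (MyPoro is a made-up class), assuming the default locator, which calls .find(id) on the class:
class MyPoro
  include GlobalID::Identification

  attr_reader :id

  def initialize(id)
    @id = id
  end

  # The default GlobalID locator rebuilds the object with MyPoro.find(id)
  def self.find(id)
    new(id)
  end
end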
ActiveJob passes the object argument to the queue_adapter as follows:
{ _aj_globalid: "gid://application/MyClass/7" }
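You can check this for yourself in a Rails console with ActiveJob::Arguments.serialize; here app_user is assumed to be a persisted record with id 7, and the exact _aj_* metadata keys may vary depending on how the arguments are passed:
ActiveJob::Arguments.serialize([app_user, { a: 5, b: "test" }])
=> [{"_aj_globalid"=>"gid://application/AppUser/7"}, {"a"=>5, "b"=>"test", "_aj_symbol_keys"=>["a", "b"]}]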
We’ll see that ActiveJob finds the object again at the end of the flow, when Sidekiq passes the job back to be executed after unstacking it from the queue stored in Redis. Let’s now see what Sidekiq::Client does after ActiveJob passes it the job data.
Sidekiq’s enqueuing
After a few method calls (#push, #raw_push, and so on), the Sidekiq::Client.push method eventually calls #atomic_push on the Sidekiq client instance. This #atomic_push stores the job in Redis (using a Redis client provided by the redis Ruby gem).
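Leaving aside instrumentation and scheduled jobs, what #atomic_push does for an immediate job essentially boils down to two Redis commands. A rough sketch (not Sidekiq’s actual code), where redis is a client from the redis gem and payload is the job hash:
# Rough equivalent of what Sidekiq does when pushing a job (sketch)
redis.sadd("queues", "my_queue")               # keep track of the known queues
redis.lpush("queue:my_queue", payload.to_json) # push the serialized job onto the queue list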
If you’re interested, you can see the jobs in Redis for yourself before they are unstacked and executed.
Redis works with several independent databases, each referenced by an index. The goal of this article is not to turn you into a Redis expert; to understand the next few lines, just be aware that Redis stores data as key-value pairs in different formats. In our case, a given Sidekiq queue is a Redis list, and you can access its content with the LRANGE command.
The Redis command line is accessible by entering the redis-cli command in your terminal. Let’s say you called MyJob.perform_later(app_user, a:, b:) a few times (with different values for the a and b arguments each time, and the same app_user every time) for a queue named my_queue. See below how we can access the queue data.
> select 12
OK
> keys *
1) "{given_repo_name}:schedule"
2) "{given_repo_name}:queues"
3) "queue:my_queue"
4) "{given_repo_name}:stat:failed"
5) "{given_repo_name}:stat:failed:2023-02-16"
6) "{given_repo_name}:processes"
7) "queues"
8) "{given_repo_name}:stat:processed:2023-02-16"
9) "{given_repo_name}:stat:processed"
> lrange queue:my_queue 0 -1
1) "{... ,\"queue\":\"my_queue\", ..., \"args\":[{\"job_class\":\"MyJob\",
\"arguments\":[{\"_aj_globalid\":\"gid://api-core/AppUser/128526\"},
{\"a\":5,\"b\":\"test_3\"}], ...}"
2) "{... ,\"queue\":\"my_queue\", ..., \"args\":[{\"job_class\":\"MyJob\",
\"arguments\":[{\"_aj_globalid\":\"gid://api-core/AppUser/128526\"},
{\"a\":5,\"b\":\"test_3\"}], ...}"
2) "{... ,\"queue\":\"my_queue\", ..., \"args\":[{\"job_class\":\"MyJob\",
\"arguments\":[{\"_aj_globalid\":\"gid://api-core/AppUser/128526\"},
{\"a\":5,\"b\":\"test_3\"}], ...}"
We omitted a lot of the data stored in the jobs’ strings to keep the output above readable, but there is more.
You can see that the jobs are stored as strings in the JSON format. You can also see the serialized object:
{\"_aj_globalid\":\"gid://api-core/AppUser/128526\"}
So, we stored our jobs in queues in a synchronous way. But the whole point of jobs is to execute them asynchronously. How does this happen?
Unstacking and executing jobs
Using Sidekiq, you need to run the sidekiq command in a terminal to execute jobs. This command launches a multithreaded background process (you can specify how many threads you want in Sidekiq’s config file or directly in the CLI), which means that several jobs can be executed at the same time.
For instance, our three jobs above could be executed at the same time if the process has 3 or more threads. But how does the sidekiq gem handle retrieving and executing a job for a given thread?
At some point, the sidekiq command instantiates a Sidekiq::Manager with your options. Let’s say you defined a concurrency (i.e. a number of threads) of 5. The manager will then instantiate five Sidekiq::Processor instances and call #start on each of them. You’ll see that Sidekiq::Processor#start opens a thread for the given processor instance and calls #run on it.
That’s when the fun begins! Until we shut down the sidekiq process, each processor runs an infinite loop consisting of the following (a simplified sketch follows the list):
- First, the processor checks whether there is a job in the queue;
- Second, it fetches the job data from the queue and deletes it from Redis;
- Third, it processes the job (we’ll focus on this step in more detail).
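Stripped of error handling and all the concurrency plumbing, a processor’s life looks roughly like this (a sketch, not Sidekiq’s actual code; Sidekiq relies on a blocking BRPOP to wait for jobs, and process is a hypothetical helper standing for the instantiation step described below):
# Very simplified sketch of a Sidekiq processor loop (illustration only)
loop do
  # BRPOP blocks until a job is available on one of the watched queues,
  # and atomically removes it from the Redis list
  queue, jobstr = redis.brpop("queue:my_queue", timeout: 2)
  next if jobstr.nil?

  job_hash = JSON.parse(jobstr) # Sidekiq uses its own JSON helpers
  process(job_hash)             # instantiate the worker class and call #perform
end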
To process the job, Sidekiq loads the JSON from Redis as a hash; for instance, it will load something like the following.
jobstr =
  {
    "retry"=>true,
    "queue"=>"matera_development_default",
    "class"=>"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper",
    "wrapped"=>"MyJob",
    "args"=>
      [
        {
          "job_class"=>"MyJob",
          "job_id"=>"c89b798e-7710-4fc5-81d9-4565e0b15374",
          "provider_job_id"=>nil,
          "queue_name"=>"matera_development_default",
          "priority"=>nil,
          "arguments"=>
            [
              {"_aj_globalid"=>"gid://api-core/AppUser/128526"},
              {"a"=>5, "b"=>"test_3", "_aj_ruby2_keywords"=>["a", "b"]}
            ],
          "executions"=>0,
          "exception_executions"=>{},
          "locale"=>"fr",
          "timezone"=>"Europe/Paris",
          "enqueued_at"=>"2023-03-28T11:07:40Z"
        }
      ],
    "jid"=>"f77e55a46c2d1491556a74a1",
    "created_at"=>1680001660.2047172,
    "enqueued_at"=>1680001660.2049541
  }
Then, Sidekiq instantiates an object from the “class” key and calls #perform on that instance, passing the “args” key’s value as arguments. In our case, we’ll then have:
ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper.new.perform(*jobstr["args"])
Sidekiq has just passed the job back to the ActiveJob gem. The JobWrapper will execute the job. To do so, a new instance of our job is created using the “job_class” key; this MyJob instance is initialized with the hash in our “args” key. The arguments are then deserialized if needed: in our case, our AppUser is instantiated again thanks to GlobalID (remember this?). Eventually, the #perform method is called on our job instance, the same #perform method we define every time we create a new job class!
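For the curious, the JobWrapper itself is a tiny Sidekiq worker whose #perform simply hands the job data back to ActiveJob. It looks roughly like this (a slightly simplified sketch of the class defined inside the adapter):
class JobWrapper
  include Sidekiq::Worker

  def perform(job_data)
    # ActiveJob deserializes the job data (GlobalID lookups included)
    # and finally calls our own #perform method
    ActiveJob::Base.execute(job_data.merge("provider_job_id" => jid))
  end
end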
If the job fails, it will be enqueued again, depending on the retry option and on how many times we allow it to be retried. Jobs awaiting a new attempt are stored in a dedicated retry set in Redis.
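On the ActiveJob side, you can also shape this behavior per job class with retry_on and discard_on (standard ActiveJob APIs; TransientError is a made-up error class for the example):
class MyJob < ApplicationJob
  # Re-enqueue up to 5 times, waiting longer between attempts (made-up error class)
  retry_on TransientError, wait: :exponentially_longer, attempts: 5

  # Drop the job silently if the serialized record no longer exists
  discard_on ActiveJob::DeserializationError

  def perform(*args)
    # your logic here
  end
end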
In the meantime, each Sidekiq::Processor keeps fetching new jobs from Redis and passing them to ActiveJob until we tell it to stop.
This was an overview of how ActiveJob and Sidekiq work together, at a deeper level than usual. I hope you enjoyed this article!