ActiveJob and Sidekiq under the hood

Matera
Apr 5, 2023


Rails is an amazing framework that brings its own magic to many topics, and asynchronous job handling is one of them.

As developers, we like to handle tasks asynchronously whenever we can to improve the user experience. To do so, we can rely on the tools offered by job enqueuing gems, of which there are many.

At Matera, we opted for Sidekiq, a well-known job enqueuer that has already proven its worth. But what's the point of having Rails' ActiveJob if async actions are already handled by a Ruby gem?

Let's take a look at the async stack to understand what's going on at a high level.

How it works overall

Sidekiq is a background process that executes async jobs stored in queues. The jobs themselves are stored in Redis, a NoSQL database. For each queue, Sidekiq takes the jobs off in the order they were enqueued and executes them, which can happen a few seconds or more after a job has been enqueued.

As a company, you're using a specific job enqueuer now, but in the future you might want to switch to another one, for business or technical reasons. Migrating your codebase to the new enqueuer's logic can be a huge pain. That's where ActiveJob comes into play!

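Thanks to ActiveJob, you can change your job enqueuer whenever you want: ActiveJob exposes one common interface for all the enqueuers it supports, so switching mostly comes down to changing a configuration setting rather than rewriting your jobs.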
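To make the "one interface, many backends" idea concrete, here is a toy sketch in plain Ruby. It is not ActiveJob's code: InlineAdapter, InMemoryAdapter, ToyConfig and GreetJob are hypothetical names. The only point is that every adapter answers the same #enqueue(job) call, so the job class stays unchanged when the configured adapter changes.

```ruby
# Toy illustration of the adapter idea behind ActiveJob (NOT ActiveJob's code).
# Each hypothetical adapter exposes the same #enqueue(job) method, so job classes
# never depend on a specific backend.

class InlineAdapter
  # Runs the job right away, in the calling thread.
  def enqueue(job)
    job.perform
  end
end

class InMemoryAdapter
  attr_reader :jobs

  def initialize
    @jobs = []
  end

  # Keeps the job for later instead of running it now.
  def enqueue(job)
    @jobs << job
    nil
  end
end

# One shared setting, playing the role of config.active_job.queue_adapter.
module ToyConfig
  class << self
    attr_accessor :queue_adapter
  end
end

class GreetJob
  def self.perform_later(name)
    ToyConfig.queue_adapter.enqueue(new(name))
  end

  def initialize(name)
    @name = name
  end

  def perform
    "Hello, #{@name}!"
  end
end

ToyConfig.queue_adapter = InMemoryAdapter.new
GreetJob.perform_later("Matera") # stored for later, returns nil
ToyConfig.queue_adapter = InlineAdapter.new
GreetJob.perform_later("Matera") # runs immediately, returns "Hello, Matera!"
```

Swapping the backend only touched ToyConfig; GreetJob itself never changed, which is the property ActiveJob gives you at a much larger scale.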

The schema below recaps the big picture of what’s happening.

Let’s dive in!

Now that we have the big picture, let's take a look at the code! I mentioned Rails magic earlier, but how exactly do we use it? Very easily, in fact. To run a task asynchronously in Rails, you just need to create a class inheriting from ApplicationJob with its own #perform instance method, as below:

class MyJob < ApplicationJob
  def perform(*args)
    # your logic here
  end
end

Then, to enqueue the job for later execution, you just call the ::perform_later class method on your job class with the arguments:

MyJob.perform_later(*args)

You may have noticed that we call a class method here whose name differs from the method we defined earlier, which, on top of that, is an instance method! What's going on? It's time to dig into the ActiveJob source code!

ActiveJob does it for you

You can browse a gem's source code on your machine with a couple of shell commands, run from your Rails app's directory:

$ bundle show activejob
/{your_computer_path}/.rvm/gems/ruby-3.1.3/gems/activejob-7.0.4.2
$ cd /{your_computer_path}/.rvm/gems/ruby-3.1.3/gems/activejob-7.0.4.2

You can find the ::perform_later method in the ActiveJob::Enqueuing::ClassMethods module.

def perform_later(...)
  job = job_or_instantiate(...)
  enqueue_result = job.enqueue

  yield job if block_given?

  enqueue_result
end

If we follow the flow, we see that the job.enqueue line calls the #enqueue method of the queue adapter, passing it the job (or #enqueue_at, for jobs scheduled with the wait or wait_until options). The job variable here is a new instance of our MyJob class, initialized with the job arguments. The queue adapter, the Sidekiq one in our case, is set in the config/application.rb file of our Rails app:

config.active_job.queue_adapter = :sidekiq
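The hop from the ::perform_later class method to our #perform instance method can be modeled in a few lines. This is a simplified sketch, not ActiveJob's source: RecordingAdapter and SketchJob are hypothetical names, and the real code adds callbacks, scheduling, and error handling. It shows the same shape as the code above: the class method builds an instance holding the arguments, the instance hands itself to the adapter, and #perform only runs when the backend later executes the job.

```ruby
require "securerandom"

# Simplified model of the hop from ::perform_later to #perform (NOT ActiveJob's code).
# RecordingAdapter and SketchJob are hypothetical names used for illustration.
class RecordingAdapter
  attr_reader :enqueued

  def initialize
    @enqueued = []
  end

  def enqueue(job)
    @enqueued << job
    job.job_id
  end
end

class SketchJob
  class << self
    attr_accessor :queue_adapter # shared by all job classes via SketchJob

    # Class method: build an instance holding the arguments, then enqueue it.
    def perform_later(*args)
      job = new(*args)
      result = job.enqueue
      yield job if block_given?
      result
    end
  end

  attr_reader :arguments, :job_id

  def initialize(*arguments)
    @arguments = arguments
    @job_id = SecureRandom.uuid # ActiveJob also identifies jobs with a UUID
  end

  # Instance method: hand the job itself to the configured adapter.
  def enqueue
    SketchJob.queue_adapter.enqueue(self)
  end

  # Called later by the backend: unpack the stored arguments into #perform.
  def perform_now
    perform(*arguments)
  end
end

class MyJob < SketchJob
  def perform(a, b)
    a + b
  end
end

adapter = RecordingAdapter.new
SketchJob.queue_adapter = adapter
MyJob.perform_later(2, 3)          # nothing runs yet; the job sits in the adapter
adapter.enqueued.first.perform_now # => 5
```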

ActiveJob has one adapter class per supported enqueuer in the ActiveJob::QueueAdapters module; in our case, it is the SidekiqAdapter class. Below is the SidekiqAdapter#enqueue method called above.

def enqueue(job)
  job.provider_job_id = Sidekiq::Client.push \
    "class"   => JobWrapper,
    "wrapped" => job.class,
    "queue"   => job.queue_name,
    "args"    => [ job.serialize ]
end

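You can see that ActiveJob passes the ball to the Sidekiq gem here by calling Sidekiq::Client.push. Note that the "class" Sidekiq receives is ActiveJob's own JobWrapper, while our job class travels in the "wrapped" key.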

Before getting into Sidekiq's inner logic, let's look at the last key we pass to the Sidekiq client here: "args" => [job.serialize].

Following the code, you'll see that this method is defined in the ActiveJob::Core module and that it eventually calls ::serialize on the ActiveJob::Arguments module, which maps over the job arguments and serializes each of them. Most simple arguments (strings, numbers, booleans, nil) come out unchanged. We will focus on one use case here: passing a model instance as a job argument.

Sidekiq stores the job's info in Redis, a NoSQL database that knows nothing about our models and database tables and does not manage any relations between them. Rails therefore needs a way to find your object again when Sidekiq hands the job back for execution (we'll see how Sidekiq does this shortly). You can pass the object directly, and ActiveJob serializes it for you. How does that work?

Serializing an object

To serialize your object so that it can be found again later, ActiveJob uses the GlobalID gem and its #to_global_id method. Let's say you have an instance of MyClass with the id 7; you'll get the following:

global_id = my_object.to_global_id
=> #<GlobalID:0x000000010a7ce880 @uri=#<URI::GID gid://application/MyClass/7>>
global_id.to_s
=> "gid://application/MyClass/7"

You can then easily find your object again:

GlobalID::Locator.locate("gid://application/MyClass/7")
=> #<MyClass:0x00000001086bb350 id: 7, ... >

ActiveJob passes the object argument to the queue adapter as follows:

{ "_aj_globalid" => "gid://application/MyClass/7" }
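The argument mapping described above (ActiveJob::Arguments.serialize walking over each argument) can be approximated in plain Ruby. This is a simplified sketch, not the real implementation: actual ActiveJob also handles symbols, dates and custom serializers, records which hash keys were symbols (the _aj_symbol_keys and _aj_ruby2_keywords markers), and raises ActiveJob::SerializationError rather than ArgumentError. AppUser is a hypothetical Struct standing in for an Active Record model.

```ruby
require "json"

# Toy version of the idea behind ActiveJob::Arguments.serialize (NOT its real code):
# simple values pass through, objects that know their Global ID become a tagged hash,
# and arrays/hashes are serialized recursively so the result fits in JSON.
GLOBALID_KEY = "_aj_globalid"

def serialize_argument(arg)
  case arg
  when nil, true, false, String, Integer, Float
    arg
  when Array
    arg.map { |item| serialize_argument(item) }
  when Hash
    arg.to_h { |key, value| [key.to_s, serialize_argument(value)] }
  else
    raise ArgumentError, "Unsupported argument: #{arg.class}" unless arg.respond_to?(:to_global_id)

    { GLOBALID_KEY => arg.to_global_id.to_s }
  end
end

# Hypothetical model standing in for an Active Record object.
AppUser = Struct.new(:id) do
  def to_global_id
    "gid://api-core/AppUser/#{id}"
  end
end

serialized = [AppUser.new(128526), { a: 5, b: "test_3" }].map { |a| serialize_argument(a) }
JSON.generate(serialized)
# => '[{"_aj_globalid":"gid://api-core/AppUser/128526"},{"a":5,"b":"test_3"}]'
```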

At the end of the flow, we'll see how ActiveJob finds the object again, when Sidekiq hands the job back for execution after taking it off the queue stored in Redis. For now, let's see what Sidekiq::Client does once ActiveJob has passed it the job data.

Sidekiq’s enqueuing

After a few method calls (#push, #raw_push, and so on), Sidekiq::Client.push eventually calls the #atomic_push method on a Sidekiq client instance. #atomic_push stores the job in Redis (using a Redis client provided by the redis Ruby gem).
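To make the Redis side concrete, here is an in-memory imitation of the few list and set commands involved. Assumptions: FakeRedis and this atomic_push are hypothetical simplifications, not Sidekiq code. To our understanding, Sidekiq's real #atomic_push registers the queue name in the "queues" set and LPUSHes the JSON payload onto the "queue:<name>" list (scheduled jobs go to a "schedule" sorted set instead), while its basic fetcher takes jobs from the other end of the list with BRPOP. Pushing at one end and popping at the other is what makes each queue first-in, first-out.

```ruby
require "json"
require "set"

# In-memory imitation of a few Redis commands. This is NOT a Redis client;
# it only mimics SADD, LPUSH, RPOP and LRANGE on plain Ruby objects.
class FakeRedis
  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
    @sets = Hash.new { |hash, key| hash[key] = Set.new }
  end

  def sadd(key, member)
    @sets[key].add(member)
  end

  def smembers(key)
    @sets[key].to_a
  end

  # LPUSH inserts at the head of the list and returns the new length.
  def lpush(key, value)
    @lists[key].unshift(value)
    @lists[key].size
  end

  # RPOP removes from the tail, so LPUSH + RPOP behaves as a FIFO queue.
  def rpop(key)
    @lists[key].pop
  end

  # LRANGE key 0 -1 returns the whole list; negative indexes count from the end.
  def lrange(key, start, stop)
    @lists[key][start..stop] || []
  end
end

# Hypothetical simplification of what #atomic_push does for a job to run now:
# register the queue name, then push the JSON payload onto the queue's list.
def atomic_push(redis, payload)
  queue = payload.fetch("queue")
  redis.sadd("queues", queue)
  redis.lpush("queue:#{queue}", JSON.generate(payload))
end

redis = FakeRedis.new
atomic_push(redis, { "queue" => "my_queue", "class" => "JobWrapper", "args" => [1] })
atomic_push(redis, { "queue" => "my_queue", "class" => "JobWrapper", "args" => [2] })
redis.lrange("queue:my_queue", 0, -1).size # => 2
```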

If you're curious, you can look at the jobs in Redis yourself before they are dequeued and executed:

Redis organizes data into independent databases, each referenced by an index. The goal of this article is not to turn you into a Redis expert. To follow the next few lines, just keep in mind that Redis stores data as key-value pairs, where values can have different types (strings, lists, sets, and so on). In our case, each Sidekiq queue is a Redis list, whose content you can read with the LRANGE command.

You can open the Redis command line by running redis-cli in your terminal. Let's say you called MyJob.perform_later(app_user, a:, b:) a few times (with different values for the a and b arguments each time, and the same app_user every time) for a queue named my_queue. Below is how we can access the queue data, after selecting the database our Sidekiq instance uses (12 in our setup).

> select 12
OK
> keys *
1) "{given_repo_name}:schedule"
2) "{given_repo_name}:queues"
3) "queue:my_queue"
4) "{given_repo_name}:stat:failed"
5) "{given_repo_name}:stat:failed:2023-02-16"
6) "{given_repo_name}:processes"
7) "queues"
8) "{given_repo_name}:stat:processed:2023-02-16"
9) "{given_repo_name}:stat:processed"
> lrange queue:my_queue 0 -1
1) "{... ,\"queue\":\"my_queue\", ..., \"args\":[{\"job_class\":\"MyJob\",
\"arguments\":[{\"_aj_globalid\":\"gid://api-core/AppUser/128526\"},
{\"a\":5,\"b\":\"test_3\"}], ...}"
2) "{... ,\"queue\":\"my_queue\", ..., \"args\":[{\"job_class\":\"MyJob\",
\"arguments\":[{\"_aj_globalid\":\"gid://api-core/AppUser/128526\"},
{\"a\":5,\"b\":\"test_3\"}], ...}"
3) "{... ,\"queue\":\"my_queue\", ..., \"args\":[{\"job_class\":\"MyJob\",
\"arguments\":[{\"_aj_globalid\":\"gid://api-core/AppUser/128526\"},
{\"a\":5,\"b\":\"test_3\"}], ...}"

We trimmed a lot of the data stored in the jobs' strings to keep the output above readable; the actual entries contain more.

You can see that the jobs are stored as JSON strings. You can also spot the serialized object:

{\"_aj_globalid\":\"gid://api-core/AppUser/128526\"}
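If you prefer inspecting entries from Ruby (in a console, for instance) rather than with redis-cli, the standard json library is enough to decode them. Note that redis-cli escapes the quotes when it displays a value; the stored string itself is plain JSON. The entry below is a hand-trimmed example reusing the values from the output above, not a complete Sidekiq payload.

```ruby
require "json"

# An abridged queue entry shaped like the LRANGE output above (trimmed by hand,
# so it is not a complete Sidekiq payload).
raw = '{"queue":"my_queue",' \
      '"class":"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper",' \
      '"wrapped":"MyJob",' \
      '"args":[{"job_class":"MyJob","arguments":' \
      '[{"_aj_globalid":"gid://api-core/AppUser/128526"},{"a":5,"b":"test_3"}]}]}'

payload = JSON.parse(raw)
job_data = payload["args"].first # the hash produced by job.serialize
arguments = job_data["arguments"]

# Serialized records are the hashes carrying an "_aj_globalid" key.
global_ids = arguments.filter_map { |arg| arg["_aj_globalid"] if arg.is_a?(Hash) }
# => ["gid://api-core/AppUser/128526"]
```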

So far, we have stored our jobs in queues synchronously. But the whole point of jobs is to execute asynchronously. How does that happen?

Dequeuing and executing jobs

To execute jobs with Sidekiq, you need to run the sidekiq command in a terminal. This command launches a multithreaded background process (you can set the number of threads, called the concurrency, in Sidekiq's config file or with the -c option on the command line), which means that several jobs can run at the same time.

For instance, our three jobs above could run at the same time if the process has three or more threads. But how does the sidekiq gem retrieve and execute jobs in each thread?

At some point, the sidekiq command instantiates a Sidekiq::Manager with your options. Let's say you defined a concurrency (i.e. number of threads) of 5: the manager then instantiates five Sidekiq::Processor objects and calls #start on each of them. Sidekiq::Processor#start opens a new thread for its processor instance and calls #run on that instance inside the thread.

That's when the fun begins! Until we shut down the sidekiq process, each processor runs an infinite loop made of the following steps:

  • First, the processor checks whether there is a job in the queue;
  • Second, it fetches the job data from the queue, which removes it from Redis;
  • Third, it processes the job (we'll focus on this step in more detail).
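The manager/processor structure and the loop above can be modeled with plain Ruby threads. This is a toy under stated assumptions, not Sidekiq's code: ToyProcessor and run_manager are hypothetical names, a Thread::Queue stands in for the Redis list (Sidekiq really polls Redis with BRPOP and a timeout), jobs are plain lambdas, and :stop is an invented shutdown signal.

```ruby
# Toy model of Sidekiq's manager/processor split (NOT Sidekiq's code): the "manager"
# starts one thread per unit of concurrency, and each thread loops over
# fetch -> process until it receives the hypothetical :stop signal.
class ToyProcessor
  def initialize(queue, results)
    @queue = queue
    @results = results
  end

  def start
    @thread = Thread.new { run }
  end

  def join
    @thread.join
  end

  private

  def run
    loop do
      job = @queue.pop          # blocks until a job is available (like BRPOP)
      break if job == :stop
      @results << process(job)  # Thread::Queue is thread-safe
    end
  end

  def process(job)
    job.call
  end
end

def run_manager(jobs, concurrency: 5)
  queue = Thread::Queue.new
  results = Thread::Queue.new
  processors = Array.new(concurrency) { ToyProcessor.new(queue, results) }
  processors.each(&:start)
  jobs.each { |job| queue << job }
  concurrency.times { queue << :stop } # one stop signal per processor, queued after all jobs
  processors.each(&:join)
  Array.new(results.size) { results.pop }
end

squares = run_manager((1..10).map { |n| -> { n * n } }, concurrency: 3)
```

Because the stop signals are queued behind every job, each thread drains real work first and then exits on exactly one signal. The order of the results, however, depends on thread scheduling, just as jobs on several Sidekiq threads can finish in any order.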

To process the job, Sidekiq parses the JSON string fetched from Redis into a hash; for instance, it will load something like the following:

jobstr = {
  "retry" => true,
  "queue" => "matera_development_default",
  "class" => "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper",
  "wrapped" => "MyJob",
  "args" => [
    {
      "job_class" => "MyJob",
      "job_id" => "c89b798e-7710-4fc5-81d9-4565e0b15374",
      "provider_job_id" => nil,
      "queue_name" => "matera_development_default",
      "priority" => nil,
      "arguments" => [
        {"_aj_globalid" => "gid://api-core/AppUser/128526"},
        {"a" => 5, "b" => "test_3", "_aj_ruby2_keywords" => ["a", "b"]}
      ],
      "executions" => 0,
      "exception_executions" => {},
      "locale" => "fr",
      "timezone" => "Europe/Paris",
      "enqueued_at" => "2023-03-28T11:07:40Z"
    }
  ],
  "jid" => "f77e55a46c2d1491556a74a1",
  "created_at" => 1680001660.2047172,
  "enqueued_at" => 1680001660.2049541
}

Then, Sidekiq resolves the class named in the “class” key, creates an instance of it without arguments, and calls #perform on that instance, passing the elements of the “args” array as arguments. In our case, this boils down to:

ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper.new.perform(*jobstr["args"])

Sidekiq has just handed the job back to the ActiveJob gem: JobWrapper#perform passes the job data to ActiveJob::Base.execute, which runs the job. To do so, ActiveJob creates a new instance of our job class from the “job_class” key and fills it with the rest of the hash (job id, queue name, arguments, and so on). The arguments are then deserialized where needed: in our case, our AppUser is loaded back thanks to GlobalID (remember it?). Finally, the #perform method is called on our job instance, the very #perform method we define every time we create a new job class!
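ActiveJob's side of this last step can be sketched in plain Ruby. It is loosely modeled on what ActiveJob::Base.execute does but heavily simplified: RECORDS is a hypothetical in-memory stand-in for the database (the real code goes through GlobalID::Locator and your models), and the keyword handling only approximates how ActiveJob uses the _aj_ruby2_keywords marker visible in the payload above.

```ruby
# Toy model of the last step (loosely inspired by ActiveJob::Base.execute, NOT its code):
# rebuild the job from "job_class", deserialize its arguments, then call #perform.
AppUser = Struct.new(:id)

# Hypothetical stand-in for the database, keyed by [model name, id].
RECORDS = { ["AppUser", 128526] => AppUser.new(128526) }.freeze

def deserialize_argument(arg)
  case arg
  when Array
    arg.map { |item| deserialize_argument(item) }
  when Hash
    if arg.key?("_aj_globalid")
      # "gid://api-core/AppUser/128526" -> ["AppUser", "128526"]
      model_name, id = arg["_aj_globalid"].split("/").last(2)
      RECORDS.fetch([model_name, Integer(id)])
    else
      arg.to_h { |key, value| [key, deserialize_argument(value)] }
    end
  else
    arg
  end
end

class MyJob
  def perform(app_user, a:, b:)
    "user #{app_user.id}: a=#{a}, b=#{b}"
  end
end

def execute(job_data)
  job = Object.const_get(job_data.fetch("job_class")).new
  args = deserialize_argument(job_data.fetch("arguments"))
  kwargs = {}
  # A trailing hash tagged with "_aj_ruby2_keywords" is turned back into keyword arguments.
  if args.last.is_a?(Hash) && args.last.key?("_aj_ruby2_keywords")
    hash = args.pop
    names = hash.delete("_aj_ruby2_keywords")
    kwargs = names.to_h { |name| [name.to_sym, hash.fetch(name)] }
  end
  job.perform(*args, **kwargs)
end

job_data = {
  "job_class" => "MyJob",
  "arguments" => [
    { "_aj_globalid" => "gid://api-core/AppUser/128526" },
    { "a" => 5, "b" => "test_3", "_aj_ruby2_keywords" => ["a", "b"] }
  ]
}
execute(job_data) # => "user 128526: a=5, b=test_3"
```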

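If the job raises an error, Sidekiq retries it according to the “retry” option, which can be true, false, or a maximum number of retries (25 by default when set to true). By default, jobs waiting for a retry are not put in a dedicated queue per original queue: Sidekiq keeps them in a single “retry” sorted set in Redis, ordered by the time of their next attempt, and pushes each one back to its original queue (or to the queue given by the retry_queue option, if set) when that time comes. Once all retries are exhausted, the job ends up in the “dead” set.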
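How long a failed job waits before its next attempt follows an exponential backoff. The sketch below uses the formula given in Sidekiq's "Error Handling" documentation, (retry_count ** 4) + 15 + (rand(10) * (retry_count + 1)) seconds; treat it as an assumption to check against the Sidekiq version you run, since defaults and custom sidekiq_retry_in blocks can change it. retry_delay and BASE_DELAYS are hypothetical names.

```ruby
# Sketch of Sidekiq's default retry delay in seconds, based on the formula in its
# "Error Handling" documentation. Versions and custom retry settings may differ.
def retry_delay(retry_count, random: Random.new)
  (retry_count**4) + 15 + (random.rand(10) * (retry_count + 1))
end

# Delays without the random jitter for the first five attempts.
BASE_DELAYS = (0..4).map { |count| (count**4) + 15 } # => [15, 16, 31, 96, 271]
```

The quartic term is what spreads 25 retries over roughly three weeks, while the random part keeps many jobs that failed together from retrying at the exact same moment.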

In the meantime, each Sidekiq::Processor keeps fetching new jobs from Redis and handing them to ActiveJob until we tell it to stop.

This was an overview of how ActiveJob and Sidekiq work together, at a deeper level than usual. I hope you enjoyed this article!
