Recollect Engineering

Queue within a queue

This article is about queues. If you’re not familiar with queues, here’s a good introduction to them.

The Problem

Less than 36 hours after we launched our splash page, I noticed our queue wasn’t draining.

Sometimes queues don’t drain as fast as you would like them to, but since we’re running on Amazon EC2, we have a great contingency plan for this scenario: spin up more boxes. We can spin up a few High-CPU instances, put the worker code on them, drain the queue until it goes down to 0, and then shut down the boxes. It costs us less than a cup of coffee. If it kept happening, we might increase the number of permanent dedicated Offline Task boxes or even write a script that monitors the size of the queue and automatically spins up and shuts down High-CPU instances to handle the load. Unfortunately, the problem we were having was that the queue wasn’t draining at all.
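
As a rough illustration of the monitoring script idea, here is a minimal sketch that parses the reply to gearmand's text `status` admin command (one tab-separated line per function, ended by a lone `.`) and decides how many extra boxes to ask for. The function names, `JOBS_PER_BOX`, and `MAX_EXTRA_BOXES` are hypothetical values chosen for the example, not anything we actually run.

```javascript
// Parse the reply to gearmand's "status" admin command. Each line is
// FUNCTION \t TOTAL \t RUNNING \t AVAILABLE_WORKERS, and a lone "." ends it.
function parseStatus(text) {
    var functions = [];
    var lines = text.split('\n');
    for (var i = 0; i < lines.length; i++) {
        var line = lines[i].trim();
        if (line === '.') break;      // end-of-reply marker
        if (line === '') continue;    // skip blank lines
        var fields = line.split('\t');
        functions.push({
            name: fields[0],
            total: parseInt(fields[1], 10),
            running: parseInt(fields[2], 10),
            workers: parseInt(fields[3], 10)
        });
    }
    return functions;
}

// Hypothetical policy: one extra box per JOBS_PER_BOX waiting jobs, capped.
var JOBS_PER_BOX = 10000;   // assumption, tune to the workload
var MAX_EXTRA_BOXES = 5;    // assumption, a cost ceiling

function extraBoxesNeeded(functions) {
    var waiting = 0;
    for (var i = 0; i < functions.length; i++) {
        waiting += functions[i].total - functions[i].running;
    }
    return Math.min(MAX_EXTRA_BOXES, Math.floor(waiting / JOBS_PER_BOX));
}
```

A script like this would only help with a slow queue. It does nothing for a queue that has stopped draining altogether, which is the situation described next.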

We use Gearman at Recollect. I’ve seen a few queueing systems over the years, including two iterations of Flickr’s homegrown system and Leonard Lin’s system for Barack Obama’s 2008 campaign, which had a nice Amazon SQS fallback because apparently when you process contributions for a political campaign it’s a good idea to have a redundant queue in case your main queue goes down. I’d also rolled my own for Coca-Cola. It seems to me that there’s a point where every engineer wants to write their own blogging system and those engineers eventually graduate to wanting to write their own queueing system. This isn’t a bad thing; it’s good to know how the internals of such things work, but I no longer have that desire. I had experimented with Gearman in the past with good results, so we went with it.

I SSH’d into our Offline Task box and noticed that the gearmand process was consuming between 70% and 90% of the CPU. The workers were all running but not receiving any new tasks, even though there were over 60,000 tasks in the system. I checked the Gearman and worker logs and then ran strace on the processes. Then I shut down the workers, restarted Gearman, and started the workers up again. They started to process tasks and I monitored them for a few minutes. Sure enough, after a while the workers stopped receiving tasks and the gearmand process gradually took over the whole CPU. The logs and straces looked the same as before. Over the next few hours, I repeated this process a few times.

We use a persistent storage layer for Gearman so that if you shut down the daemon and start it again, it still has the same tasks in the system that it had before. I knew that if I deleted all the tasks and started it again, it would be fine, but then we would just be rolling the dice on when this would happen again.

The Diagnosis

At first I got side-tracked thinking this was the bug that I was encountering, so I dutifully upgraded Gearman to the newest version but had the same results. I also compared my Gearman worker code to similar implementations and watched most of this presentation at Etsy on the off chance that someone mentioned this issue. Finally I shut down the Gearman server and examined the tasks themselves and noticed an interesting characteristic: over 5,000 of them were tasks that had future timestamps.

Many of our offline tasks interact with the Twitter REST API and if you’ve done any work with that API, you know that Twitter rate-limits you to 350 OAuth calls per hour, per authed user. Once you reach your limit for the hour, you must throttle your requests to the Twitter API and wait out the remainder of the hour. Twitter even facilitates this by sending you headers with how many API calls you have remaining and if you have reached the limit, the Unix timestamp when you can resume hammering their API.
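
A minimal sketch of reading those headers, to make the throttling decision concrete. The header names `X-RateLimit-Remaining` and `X-RateLimit-Reset` are assumptions based on the v1 REST API, `nextAllowedRun` is a hypothetical helper, and the one-hour fallback is a guess for the case where the reset header is missing.

```javascript
// Decide the earliest Unix timestamp at which a task may call Twitter again.
// Node lower-cases incoming header names, hence the lower-case keys.
// Header names are an assumption based on the v1 REST API.
function nextAllowedRun(headers, now) {
    var remaining = parseInt(headers['x-ratelimit-remaining'], 10);
    var reset = parseInt(headers['x-ratelimit-reset'], 10);

    // Calls left (or no rate-limit information at all): run right away.
    if (isNaN(remaining) || remaining > 0) {
        return now;
    }

    // Out of calls but no reset time given: assume a full hourly window.
    if (isNaN(reset)) {
        return now + 3600;
    }

    // Out of calls: wait until the reset time Twitter reported.
    return Math.max(reset, now);
}
```

The returned value is a Unix timestamp in seconds, the same unit Twitter uses for the reset time.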

When we built Gearman into our system, I researched ways for it to handle this rate-limiting and found that recent versions of Gearman support a command called SUBMIT_JOB_EPOCH, which lets you specify that a job will not run before a certain timestamp. This sounded like exactly what we needed - when a task that worked with the Twitter API got a response from Twitter that we had been rate-limited, we could simply re-insert the same task into the system with SUBMIT_JOB_EPOCH using the same Unix timestamp that Twitter had provided for when to resume the API call. I found that neither the PECL extension nor the PEAR Net_Gearman library supported this new command, so I patched it into the PEAR library and we started using it in production. It worked under the small loads that we put upon it.
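
For a sense of what such a patch has to produce, here is a sketch of a SUBMIT_JOB_EPOCH request as laid out in the Gearman binary protocol: a 12-byte header (the magic bytes `\0REQ`, a big-endian packet type, a big-endian body size) followed by NUL-separated arguments. The type number 36 and the argument order come from my reading of the protocol document, so treat them as assumptions to verify. `buildSubmitJobEpoch` is a hypothetical name, and this is not the PHP patch itself.

```javascript
// Packet type for SUBMIT_JOB_EPOCH, as I read the Gearman protocol document.
var SUBMIT_JOB_EPOCH = 36;

// Build the raw request bytes: header + function name, unique ID, epoch
// (as a decimal string), each NUL-terminated, then the opaque workload.
function buildSubmitJobEpoch(functionName, uniqueId, epoch, workload) {
    var nul = Buffer.from([0]);
    var body = Buffer.concat([
        Buffer.from(functionName, 'utf8'), nul,
        Buffer.from(uniqueId, 'utf8'), nul,
        Buffer.from(String(epoch), 'ascii'), nul,
        Buffer.from(workload, 'utf8')
    ]);

    var header = Buffer.alloc(12);
    header.write('\0REQ', 0, 'ascii');          // request magic
    header.writeUInt32BE(SUBMIT_JOB_EPOCH, 4);  // packet type
    header.writeUInt32BE(body.length, 8);       // size of everything after the header

    return Buffer.concat([header, body]);
}
```

Writing the returned buffer to a socket connected to gearmand would submit the job. The sketch leaves out reading the server's reply.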

I decided to run an experiment. While the Gearman server was shut down, I modified the tasks in our persistent store to all run immediately and, when they were re-inserted due to rate-limiting, to not use SUBMIT_JOB_EPOCH but instead be re-inserted immediately for processing. This would mean that we were trying to run tasks that we knew could not be run because Twitter had already rate-limited them, but we could at least test the possibility that SUBMIT_JOB_EPOCH was inefficient and crashing gearmand. Sure enough, after I restarted the server, it started processing tasks and eventually drained all 60,000 tasks.

Now that I had identified that SUBMIT_JOB_EPOCH almost certainly had something to do with gearmand hanging, I did some more research on it and found its origin story. Looking over its code, I realized that the process was likely iterating through a linked list of 5,000+ future jobs over and over again, which might explain why the gearmand CPU skyrocketed. Technically I could just move the gearmand process to a beefier server, but logically we would just have the same problem once we reached 10,000 or 25,000 future jobs. No, as per Unix philosophy, we would just have to provide that functionality somewhere else.

Here’s the problem: we have a bunch of jobs, each with a timestamp that specifies when it needs to be run in the future. If we do the “Stupidest Thing That Works™”, then we insert these into a MySQL table and have a cronjob that retrieves and deletes rows whose timestamps are earlier than or equal to the current time and inserts them all into Gearman. At first glance, this seems appealing, but part of the reason we’re using Gearman in the first place is so we don’t have a crappy MySQL queue that has to be polled. Then I thought, we could do it in memory - we could have an array hashed by timestamp and then check if the first timestamp is earlier than the current time and if not, have the system sleep until that time. But then we have a problem: how do we handle inserting new tasks into the system if the system is sleeping? If we don’t sleep, do we poll? How often do we poll and if it’s too often, isn’t that exactly what Gearman is doing in the first place?
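
To make the in-memory idea concrete, here is a minimal sketch of jobs hashed by timestamp, with one method to pull out everything that is due and another to compute how long to sleep. `ScheduledJobs` and its method names are hypothetical, made up for this illustration.

```javascript
// Jobs grouped by the Unix timestamp at which they become runnable.
function ScheduledJobs() {
    this.byTimestamp = {};
}

ScheduledJobs.prototype.add = function (whenToRun, job) {
    if (!this.byTimestamp[whenToRun]) {
        this.byTimestamp[whenToRun] = [];
    }
    this.byTimestamp[whenToRun].push(job);
};

// Timestamps currently in the table, earliest first.
ScheduledJobs.prototype.sortedStamps = function () {
    return Object.keys(this.byTimestamp).map(Number).sort(function (a, b) {
        return a - b;
    });
};

// Remove and return every job whose timestamp is <= now, earliest first.
ScheduledJobs.prototype.takeDue = function (now) {
    var due = [];
    var stamps = this.sortedStamps();
    for (var i = 0; i < stamps.length && stamps[i] <= now; i++) {
        due = due.concat(this.byTimestamp[stamps[i]]);
        delete this.byTimestamp[stamps[i]];
    }
    return due;
};

// Seconds to sleep before the earliest remaining job, or null when empty.
ScheduledJobs.prototype.secondsUntilNext = function (now) {
    var stamps = this.sortedStamps();
    return stamps.length === 0 ? null : Math.max(0, stamps[0] - now);
};
```

The weak spot is exactly the one raised above: a process that sleeps for `secondsUntilNext` will not notice a job added in the meantime with an earlier timestamp, unless something wakes it up or it polls.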

Usually when I can’t figure out how to code something, I sleep on it and, without fail, the answer occurs to me halfway through my shower the next day. In this case, I had lunch instead and halfway through lunch I realized that I had been approaching it all wrong - why not just use a system with events already built into it?

The Code

60 lines of node.js flavored JavaScript later (newest version here):

var gearman = require("gearman"),
    gearman_client = gearman.createClient(4730, "localhost");

var http = require('http');

http.createServer(function (req, res) {

    req.setEncoding('utf8');

    var fullBody = '';

    req.on('data', function(chunk) {
      fullBody += chunk.toString();
    });

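    req.on('end', function() {

        // Parse before replying so a malformed body gets a 400 instead of
        // throwing and taking down the process along with every pending timer.
        var data;
        try {
            data = JSON.parse(fullBody);
        } catch (e) {
            res.writeHead(400, {'Content-Type': 'text/plain'});
            res.end('INVALID JSON\n');
            return;
        }

        res.writeHead(200, {'Content-Type': 'text/plain'});
        res.end('SUCCESS\n');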

        var recv_string = 'Received Job: ' + data.job.name + ' - ' + data.job.workload;

        var now = Math.round(new Date().getTime() / 1000);

        if ((data.when_to_run - now) <= 0) {

            console.log(recv_string + ' - executing now!');

            // Declare locally; a bare assignment would create a global.
            var job = data.job;

            gearman_client.submitJob(job.name, job.workload, {
                background: true,
                priority: 'normal',
                encoding: 'utf8',
                uniqid: data.uniqid
            });

        } else {

            console.log(recv_string + ' - executing in ' + (data.when_to_run - now) + ' seconds.');

            setTimeout(function(job) {

                console.log('Executing Job: ' + job.name + ' - ' + job.workload);

                gearman_client.submitJob(job.name, job.workload, {
                    background: true,
                    priority: 'normal',
                    encoding: 'utf8',
                    uniqid: data.uniqid
                });

            }, (data.when_to_run - now) * 1000, data.job);

        }

    });

}).listen(10101, "127.0.0.1");

In our worker code, I just make a POST request to this server with a JSON object containing the job and when the job should run. The server leverages setTimeout to delay the Gearman insert until the epoch specified. Since node.js is single-threaded, it is not guaranteed that the Gearman insert will happen exactly at the epoch, but this isn’t a requirement for our rate-limiting case - we just need it to happen some time after the epoch. We run it through forever to daemonize it and that’s about it - it took me longer to compile node.js on our EC2 instance than it did to write the program. Gearman continues doing what it does best, sending tasks to workers without blinking, and this does what it does best, firing events more or less when it’s supposed to fire them.
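
For illustration, here is a sketch of what that POST could look like from a node.js client. The field names (`job.name`, `job.workload`, `when_to_run`, `uniqid`) and the address are the ones the server above uses. `buildDelayedJob` and `postDelayedJob` are hypothetical helpers and not our actual worker code.

```javascript
var http = require('http');

// Serialize a job in the shape the server expects to parse.
function buildDelayedJob(name, workload, whenToRun, uniqid) {
    return JSON.stringify({
        job: { name: name, workload: workload },
        when_to_run: whenToRun,   // Unix timestamp in seconds
        uniqid: uniqid
    });
}

// POST the serialized job to the server listening on 127.0.0.1:10101.
// callback(err, statusCode) is called once the response arrives.
function postDelayedJob(body, callback) {
    var req = http.request({
        host: '127.0.0.1',
        port: 10101,
        method: 'POST',
        path: '/',
        headers: {
            'Content-Type': 'application/json',
            'Content-Length': Buffer.byteLength(body)
        }
    }, function (res) {
        res.resume();             // discard the response body
        callback(null, res.statusCode);
    });
    req.on('error', callback);
    req.end(body);
}
```

One caveat with the setTimeout approach: node.js cannot represent delays above 2^31 - 1 milliseconds (about 24.8 days) and fires such timers almost immediately, which is not a concern for hourly rate limits.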

Some Final Thoughts

Earlier in the post, I talked about how I didn’t have a desire to write a queue and then I did, which just goes to show you that engineers write queues even when they don’t want to write them - our brains are just wired that way.

Last but not least, I like to name our daemons and tools by describing what they do. This program delivers messages to the future. At the end of Back to the Future Part II, Doc Brown is living in 1885 and needs to send a message to Marty McFly in 1955, so he leaves a letter with Western Union. In this one case, Western Union did manage to successfully deliver a message to the future, so I named it westernunion. There’s also an episode of Quantum Leap where Sam needs to get a message to Ziggy to open a chamber door in 1999 but they’re in 1945, so they mail a letter to his dad’s lawyer. I think you will agree that westernunion sounded like a better delivery mechanism than dadslawyer.


Bertrand Fan, co-founder

    • #queue
    • #gearman
  • 1 year ago
This is the code blog describing the engineering efforts behind Recollect.