In summary -- their RabbitMQ consumer library and config are broken in that their consumers are fetching additional messages when they shouldn't. I've never seen this in years of dealing with RabbitMQ. This caused a cascading failure: consumers were, rightfully, unable to grab messages when only one of the messages was manually ack'ed. Fixing this one fetch issue with their consumer would have fixed the entire problem. Switching to pg probably caused them to rewrite their message fetching code, which probably fixed the underlying issue.
It ultimately doesn't matter because of the low volume they're dealing with, but gang, "just slap a queue on it" gets you the same results as "just slap a cache on it" if you don't understand the tool you're working with. If they knew that some jobs would take hours and some jobs would take seconds, why would you not immediately spin up four queues? Two for the short jobs (one acting as a DLQ), and two for the long jobs (again, one acting as a DLQ). Your DLQ queues have a low TTL, and on expiration those messages get placed back onto the tail of the original queues. Any failure by your consumer, and that message gets dropped onto the DLQ, and your overall throughput is determined by the number * velocity of your consumers, not by your queue architecture.
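For the curious, the TTL-plus-dead-letter wiring I'm describing is just plain queue arguments. A rough sketch in Python with pika (broker address, queue/exchange names, and the 30s TTL are all made up for illustration, not anyone's actual setup):

import pika

# Sketch of the short-job pair; the long-job pair is declared the same way.
conn = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
ch = conn.channel()

ch.exchange_declare(exchange="jobs", exchange_type="direct", durable=True)

# Work queue: rejected/failed messages get dead-lettered onto the DLQ.
ch.queue_declare(
    queue="short_jobs",
    durable=True,
    arguments={
        "x-dead-letter-exchange": "jobs",
        "x-dead-letter-routing-key": "short_jobs_dlq",
    },
)

# DLQ: low TTL, and on expiration messages are dead-lettered back onto
# the tail of the original queue (i.e. a delayed retry).
ch.queue_declare(
    queue="short_jobs_dlq",
    durable=True,
    arguments={
        "x-message-ttl": 30000,
        "x-dead-letter-exchange": "jobs",
        "x-dead-letter-routing-key": "short_jobs",
    },
)

ch.queue_bind(queue="short_jobs", exchange="jobs", routing_key="short_jobs")
ch.queue_bind(queue="short_jobs_dlq", exchange="jobs", routing_key="short_jobs_dlq")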
This pg queue will last a very long time for them. Great! They're willing to give up the easy fanout architecture for simplicity, which again at their volume, sure, that's a valid trade. At higher volumes, they should go back to the drawing board.
> If they knew that some jobs would take hours and some jobs would take seconds, why would you not immediately spin up four queues? Two for the short jobs (one acting as a DLQ), and two for the long jobs (again, one acting as a DLQ). Your DLQ queues have a low TTL, and on expiration those messages get placed back onto the tail of the original queues.
Here is why I would not recommend that.
Do that and you have to rewrite your system around predictions of how long each job will take, deal with 4 sources of failure, and have more complicated code. All this to maintain the complication of an ultimately unneeded queueing system. You call it an "easy fanout architecture for simplicity." I call it "an entirely unnecessary complication that they have no business having to deal with."
If they get to a volume where they should go back to the drawing board, then they can worry about it then. This would be a good problem to have. But there is no need to worry about it now.
Agreed. I previously worked for a well known service provider that relied on a single SQL database for queueing hundreds of jobs per second across all regions. The system was well understood by everyone at the company and the company had a surprisingly high market cap. You can get very far with the simple thing.
> In summary -- their RabbitMQ consumer library and config is broken in that their consumers are fetching additional messages when they shouldn't. I've never seen this in years of dealing with RabbitMQ.
What do you mean by "broken"? Are you implying that the behavior they're describing is not the way the consumer library is supposed to work? They linked to RabbitMQ's documentation basically saying that's exactly how it works. Also, where do you get the sense that they've misconfigured it? You've made these statements, but have not exactly enlightened us as to how one should set things up to have consumers handle exactly one job at a time. That was their only problem. (Edit: an answer by @whakim https://news.ycombinator.com/item?id=35530108 sheds more light on this.)
The rest of your answer sanctimoniously presumes that they don't know how to use the tool, but your own proposed solution is moot, as it seems to address a different problem, not the one that they have (1 job per consumer max).
The architectural error was to have messages acknowledged at the end of the long processing, probably to avoid having to handle messages produced by the worker, instead of having two messages with a quick ack: one message at the start, one message at the end from the worker.
RabbitMQ's job is to handle message transfer; if you tie business logic (job completion) to its state, you have a problem.
So basically, producer and consumer MUST have their own storage for tracking processes, typically by ID, with RabbitMQ being the message handler for syncing state.
"two messages with quick ack, one message at start, one message at end from worker."
How do you deal with the worker getting killed in that scenario? If you can't rely on the queue's job state, then you need another whole set of code somewhere to handle timeouts & retries, don't you?
A worker (or at least the consumer receiving the messages and spawning sub-workers), when killed, should restart at some point, inspect the unfinished job ids (the ones stored just before ack'ing receipt), and either resume the work or notify failure.
Answering is the worker's sole responsibility, and no viable implementation of an app can reliably substitute for that.
As for timeouts, they are necessary when waiting for answers from services that are not under your control, e.g., you cannot assume they will answer you in all cases. Under your control, you have to keep yourself in a situation where a worker response is guaranteed, success or failure.
I feel like this would reimplement a lot of the machinery that's built into the job queue (atomic state locking to take ownership of the job, with a deadlock-prevention mechanism to make sure a killed process doesn't keep that job locked, and timeouts).
At this point, why not throw the whole job queue away and simply use that system?
I wish you wouldn't speak so authoritatively because much of this is just one way (and hardly the only way) to implement such a system. If you read the RabbitMQ docs (see https://www.rabbitmq.com/confirms.html) you'll see that ACK'ing the messages after processing is explicitly described as a way to handle consumer failures.
Depends on your definition of processing. In the case I describe, the processing is receiving and recording the job id sent by the producer. The ack should be sent ASAP so that all non-business issues with message transmission are handled by the messaging logic.
Having the job processing itself be independent of the messaging solution seems like a good way to go, as the actual messaging implementation might have to change. I really think you don't want your business logic to ossify around a messaging implementation detail.
I did not pretend that it was the only solution, merely something I know would be reliable, an implicit «one way to do it».
Again though, RabbitMQ and many AMQP consumers are explicitly designed so that you don't have to do this if you don't want to. If you have long idempotent tasks, it is both possible (and, in many cases, recommended!) to set QoS (prefetch) to 1 and ACK only after the long task has completed. This means you don't have to write any logic to deal with worker crashes/disconnects/etc. because this is a fully anticipated and supported workflow. There are certainly downsides to this approach, but not having to worry about tracking job ids is much simpler.
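To make that concrete, here's a minimal pika sketch of that workflow (the queue name and the task function are placeholders, not anything from the article):

import pika

def run_long_idempotent_task(body):
    ...  # placeholder for the hours-long work

def handle(ch, method, properties, body):
    run_long_idempotent_task(body)
    # Ack only once the work is done; if this process dies first, the broker
    # redelivers the message to another consumer.
    ch.basic_ack(delivery_tag=method.delivery_tag)

conn = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
ch = conn.channel()
ch.queue_declare(queue="long_tasks", durable=True)

# At most one unacknowledged delivery per consumer, so no second message is
# prefetched while the current one is still being processed.
ch.basic_qos(prefetch_count=1)
ch.basic_consume(queue="long_tasks", on_message_callback=handle, auto_ack=False)
ch.start_consuming()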
It's not only simpler, it's just way more reliable for high volumes of messages with a large number of workers and short processing.
I really was focusing on low volume, long jobs, where RabbitMQ is overkill and worker respawns do not need proper dequeuing.
One aspect to keep in mind is also the kind of use case: when the job is just a subtask and the final result is stored, on completion it produces a message to be handled for continuation; you get direct id tracking without further additional logic, and a simple ack is not enough.
For example, when sending emails, the job does not create a further message, and no shared mutable state occurs. The scenario with an ack at the end of processing covers all cases.
That seems like a rather uncommon definition of processing. Certainly not what people mean when they generally talk about processing a message.
You’re right that if the business processing is a long, heavyweight operation then it should be decoupled from the message queue and failure handled internally, in which case your definition does make sense as an intermediary hand-off step. Just in the general case, I think most people wouldn’t consider hand-off alone to be processing.
> I've never seen this in years of dealing with RabbitMQ.
Did you do long running jobs like they did? It's a stereotype, but I don't think they used the technology correctly here -- you're not supposed to hold onto messages for hours before acknowledging. They should have used RabbitMQ just to kick off the job, immediately ACKing the request, and job tracking/completion handled inside... a database.
The short answer is "yes" but the questions that you should be asking are: A) How long am I willing to block the queue for additional consumers, B) How committed am I to getting close to exactly-once processing, and C) how tolerant of consumer failure should I be? Depending on the answer to those three questions is what drives your queue architecture. Note that this has nothing to do with time spent processing messages or "long running jobs".
Assume that your producers will be able to spike and generate messages faster than your consumers can process them. This is normal! This is why you have a queue in the first place! If your jobs take 5 seconds or 5 hours, your strategy is influenced by the answers to those three questions. For example -- if you're willing to drop a message if a consumer gets power-cycled, then yeah, you'd immediately ack the request and put it back onto a dead letter queue if your consumer runs into an exception. Alternatively, if you're unwilling to block and you want to be very tolerant of consumer failure, you'd fan out your queues and have your consumers checking multiple queues in parallel. Etc etc etc, you get the drift.
Keep in mind also that this isn't specific to RabbitMQ! You'd want to answer the same questions if you were using SQS, or if you were using Kafka, or if you were using 0mq, or if you were using Redis queues, or if you were using pg queues.
If I were designing a system with tasks that took that long, I'd probably want to find a way to checkpoint along the way and break the task down into smaller steps. I'd hate to lose 11 hours and 59 seconds of work if the task fails near the end. You can then use multiple queues to pass the task along between steps.
I’ve used RabbitMQ to do long running jobs. Jobs that take hours and hours to complete. Occasionally even 1-2 days.
It did take some configuring to get it working, between acking appropriately and the prefetch (QoS perhaps? Can’t remember, don’t have it in front of me), but we were able to make it work. It was pretty straightforward; it never even crossed my mind that this isn’t a correct use case for RMQ.
I do something like this in some projects of mine.
RabbitMQ is completely happy to do this. On the other hand, literally every python client library would fall over and/or wedge forever periodically. Celery in particular was a flaming pile of garbage (at least circa 2018).
I wound up vendoring the best of the bunch (amqpstorm, fyi), and adding a bunch of additional logic to handle all the various corner cases. That, plus a bunch of careful thought about how the queues are structured, and I've been pushing millions of messages a day without issue.
I wasn't able to get a lot of my changes upstream due to the extent I had to alter the assumptions the library made about control flow.
You do not need a database. It is trivial (and correct) to create a ~'<x>-status' topic. In the forward arc you are reliably propagating job requests (acked). In the backflow, the processing status of the job is posted for anyone interested. You can even propagate retries, etc. It is an MQ, and RabbitMQ shines in defining complex dispatch topologies.
Yeah but they already have a database, so it's not like they're adding a database to the system. And (as the article says) the database already contains state, so it makes way more sense to remove RMQ and hold all the state in the database (like they did).
Because a queuing system offers a different thing than a (relational) database.
You can build a queuing system with a database, but you have to do that. Some of the features and constraints of the database might even make your life harder than it has to be.
Instead, view it like this: there is a need for a queuing system and a job system. Either or both can be implemented using a database for certain concerns, but it can also be a custom implementation. It's not a great idea to mix the two things unless the operational and infrastructure costs and complexity outweigh the benefits of a clear separation.
There are libraries that implement queueing on top of databases that require very little setup by the user. For example https://github.com/timgit/pg-boss
Right. However, please don't forget that there are very often inherent limitations regarding scaling or availability, and also that many fully fledged message queues come with a lot of perks like access management and administration/debugging tooling and interfaces.
I'm not saying that libraries like pg-boss and co. cannot sometimes replace a full queue implementation. But the tradeoffs need to be clear.
It's all a matter of how much throughput you need. A queuing system can handle, in the same hardware, orders of magnitude more than a traditional SQL database that writes rows to disk in a page-oriented fashion.
If your load is, say, a few hundred writes/second, stick with the database only, and it will be much simpler.
No, because you only need to read and write ids and maybe timestamps to your db, both of which are trivially indexed, rather than the whole blob of your message payload.
In many cases, the message payload is (or should be) an ID anyway. It's seldom desirable for the message payload to include a copy of an external source of truth, because it can cause sync issues. There are exceptions, of course.
It may be a misconfiguration but I’m fairly sure you couldn’t change this behaviour in the past. Each worker would take a job in advance and you could not prevent it (I might be misremembering but I think I checked the source at the time).
In my experience, RabbitMQ isn’t a good fit for long running tasks. This was 10 years ago. But honestly, if you have a small number of long running tasks, Postgres is probably a better fit. You get transactional control and you remove a load of complexity from the system.
I don't think this behavior has changed significantly. The key issue is that they seem to have correctly identified that they wanted to prefetch a single task, but they didn't recognize that this setting is the count of un-ACK'ed tasks. If you ACK upon receipt (as most consumers do by default), then you're really prefetching two tasks - one that's being processed, and one that's waiting to be processed. If you ACK late, you get the behavior that TFA seems to want. I've seen this misconfiguration a number of times.
I'll add another to the anecdata. We saw this issue with RabbitMQ. We replaced it with SQS at the time but we're currently rebuilding it all on SELECT FOR UPDATE.
Our problem was that when a consumer hung on a poison pilled message, the prefetched messages would not be released. We fixed the hanging, but hit a similar issue, and then we fixed that, etc.
We moved to SQS for other reasons (the primary being that we sometimes saturated a single erlang process per rabbit queue), but moving to the SQS visibility timeout model has in general been easier to reason about and has been a better operations experience.
However, we've found that all the jobs are in postgres anyway, and being able to index into our job queue and remove jobs is really useful. We started storing job metadata (including "don't process this job") in postgres and checking it at the start of all our queue workers and we've decided that our lives would be simpler if it was all in postgres.
It's still an experiment on our part, but we've seen a lot of strong stories around it and think it's worth trying out.
What exactly do you mean by "take a job in advance"? I have certainly set the prefetch limit to 1 on my queues, which I believe prevents them from taking a job while they are running one. One of our production applications runs 4+ hour jobs in a RabbitMQ queue without blocking and has done so for 5+ years.
You do want to make sure to set the VM high water mark below 50% of RAM as the GC phase can double the used memory. If high water mark is set too high, the box will thrash swap really badly during GC and hang entirely.
The linked docs in the article also suffer from the “wall of text” style that almost all software docs suffer from. If that’s what they had to go by to work out this behaviour, I wouldn’t be surprised they missed it. Most of the time when I look at docs, none of them are laid out to build understanding. It’s just a description of the system.
Your last comment is the key: they had an issue and not the scale, so a simpler approach works. But then I imagine that this company, which is a new company and growing, will have a future blog post about switching from the pg queue to something that fits their scale...
Also, when they have too many jobs for their one table, partition the table by customer; when that's still somehow too big, shard the table across a couple DB instances. Toss in some beefy machines that can keep the tables in memory and I suspect you'd have a LOOOONG way to go before you ever really needed to get off of Postgres.
In my experience, the benefits of a SQL table for a problem like this are real big - easier to see what's in the queue, manipulate the queue, resolve head-of-queue blocking problems, etc.
Not exactly. For performance (I guess) each worker fetches an extra job while it's working on the current job. If the current job happens to be very long, then the extra job it fetched will be stuck waiting for a long time.
Your multiple queue solution might work, but it is most efficient to have just one queue with a pool of workers where each worker doesn't pop a job unless it's ready to process it immediately. In my experience, this is the optimal solution.
I think this is definitely optimal if your jobs take a long time (and if you add in worker autoscaling of some type, you can deal with quick jobs reasonably well too). But if you have a very large number of quick jobs, setting a high prefetch limit can increase throughput by a tremendous amount. It's in this context that multiple queues makes a lot of sense; you can optimize some workers to read from one queue and others to read from another.
Yeah, my first thought was curiosity about their volume needs. DB based queues are fine if you don't need more than a few messages a second of transport. For that matter, I've found Azure's Storage Queues probably the simplest and most reliable easy button for queues that don't need a lot of complexity. Once you need more than that, it gets... complicated.
Also, sharing queues for multiple types of jobs just feels like frustration waiting to happen.
> DB based queues are fine if you don't need more than a few messages a second of transport
I'd estimate more like dozens to hundreds per second should be pretty doable, depending on payload, even on a small DB. More if you can logically partition the events. Have implemented such a queue and haven't gotten close to bottlenecking on it.
Their solution does not preclude fanout. Fetching work items from such a work queue by multiple nodes/servers should be no problem. One solution that also would be good for monitoring would be to have a state column, and a "handled by node" column. So, in a transaction, find a row that is not taken, take it by setting the state to PROCESSING, and set the handled_by_node to the nodename. Shove in a timestamp for good measure. When it is done, set the state to DONE, or delete the row.
Monitor by having some health check evaluating that no row stays in PROCESSING for too long, and that no row stays NOT_STARTED for too long, etc. Introspect by making a nice little HTML screen that shows this work queue and its states.
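A minimal version of that claim step with psycopg2, against a hypothetical work_queue table with id, payload, state, handled_by_node, and started_at columns (I've added SKIP LOCKED so concurrent nodes don't queue up behind each other's row locks):

import socket
import psycopg2

def claim_next_job(conn):
    """Atomically take one NOT_STARTED row and mark it PROCESSING."""
    with conn, conn.cursor() as cur:  # commits on success, rolls back on error
        cur.execute(
            """
            UPDATE work_queue
               SET state = 'PROCESSING',
                   handled_by_node = %s,
                   started_at = now()
             WHERE id = (SELECT id
                           FROM work_queue
                          WHERE state = 'NOT_STARTED'
                          ORDER BY id
                          LIMIT 1
                            FOR UPDATE SKIP LOCKED)
            RETURNING id, payload
            """,
            (socket.gethostname(),),
        )
        return cur.fetchone()  # None if nothing is waiting

The worker then sets DONE (or deletes the row) when it finishes, and the health check for stuck rows just looks for old started_at timestamps in PROCESSING.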
This! In all my years of working with RabbitMQ it's been super solid! Throwing all that out in exchange for a db queue honestly doesn't seem smart (with respect).
I'm at a point where I built a low volume queue in MySQL and need to rip it out and replace it with something that does 100+ QPS, exactly once dispatch / single worker processing, job priority level, job topics, sampling from the queue without dequeuing, and limited on failure retry.
I can probably bolt some of these properties onto a queue that doesn't support all the features I need.
What about messages that uncover a bug in the consumer/message format? After X failed attempts, the data shouldn’t be enqueued anymore, but rather logged somewhere for engineers to inspect.
> To make all of this run smoothly, we enqueue and dequeue thousands of jobs every day.
If your needs aren't that expensive, and you don't anticipate growing a ton, then it's probably a smart technical decision to minimize your operational stack. Assuming 10k jobs a day, that's roughly 7 jobs per minute. Even the most unoptimized database should be able to handle this.
Years of being bullshitted have taught me to instantly distrust anyone who is telling me about how many things they do per day. Jobs or customers per day is something to tell your banker, or investors. For tech people it’s per second, per minute, maybe per hour; anything else is self-aggrandizement.
A million requests a day sounds really impressive, but it’s 12req/s which is not a lot. I had a project that needed 100 req/s ages ago. That was considered a reasonably complex problem but not world class, and only because C10k was an open problem. Now you could do that with a single 8xlarge. You don’t even need a cluster.
10k tasks a day is 7 per minute. You could do that with Jenkins.
the other thing is averaging over days says nothing about spikes in the rate - I imagine very few systems see more-or-less constant traffic over the course of an entire day
I worked on a SaaS CMS. A major consumer PC manufacturer was a customer and their "Welcome to your PC" app would pull their news feed from our system. People would turn on their PC and they'd pull a little JSON. 100s of Thousands of request per day, no big deal. The manufacturer decided that they wanted the updates to show up for everybody at the same time on the hour. Serving 100k req/s for a minute is a much different problem to solve.
Pretty common in industrial control. Your sensors take measurements at a constant rate all day, every day. Sometimes at a pretty high rate, e.g. IEC 61850 is at 4 kHz.
Anything other than operations per second raises my suspicions--a minute is a huge duration for most computing, the speaker is just angling to multiply the measure by 60 for whatever reason.
I snorted at the Jenkins suggestion… and you’re right… spooling an agent, all that nonsense… lol. I imagine telling “the business” that your customer requests are handled timely by a Jenkins job queue.
"It's not a lot" has basically been my career up until now, at least the bits where I interacted with a back-end; in the few cases where I witnessed a new back-end being set up, they over-engineered it without fully understanding the actual problem or needs; the engineering was often wishful thinking as well, as in, they wished they had the problems that their proposed solution would fix. Cargo culting, in other words.
> Even the most unoptimized database should be able to handle this.
Anybody had any success running a queue on top of... sqlite?
With the way the sqlite file locking mechanisms work, are you basically guaranteed really low concurrency? You can have lots of readers but not really a lot of writers, and in order to pop a job off of the queue you need to have a process spinning waiting for work, move its status from "to do" to "in progress" and then "done" or "error", which is sort of "write" heavy?
> An EXCLUSIVE lock is needed in order to write to the database file. Only one EXCLUSIVE lock is allowed on the file and no other locks of any kind are allowed to coexist with an EXCLUSIVE lock. In order to maximize concurrency, SQLite works to minimize the amount of time that EXCLUSIVE locks are held.
> You can avoid locks when reading, if you set database journal mode to Write-Ahead Logging (see: http://www.sqlite.org/wal.html).
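If anyone wants to experiment, a toy version with Python's sqlite3 (schema invented for the example): WAL for concurrent readers, and BEGIN IMMEDIATE so exactly one worker claims a given row:

import sqlite3

def open_queue(path="queue.db"):
    conn = sqlite3.connect(path, isolation_level=None)  # manage transactions explicitly
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute(
        "CREATE TABLE IF NOT EXISTS jobs ("
        " id INTEGER PRIMARY KEY, payload TEXT, status TEXT NOT NULL DEFAULT 'todo')"
    )
    return conn

def pop_job(conn):
    """Claim the oldest 'todo' job; returns (id, payload) or None."""
    conn.execute("BEGIN IMMEDIATE")  # take the write lock up front
    row = conn.execute(
        "SELECT id, payload FROM jobs WHERE status = 'todo' ORDER BY id LIMIT 1"
    ).fetchone()
    if row is None:
        conn.execute("ROLLBACK")
        return None
    conn.execute("UPDATE jobs SET status = 'in_progress' WHERE id = ?", (row[0],))
    conn.execute("COMMIT")
    return row

Writers do serialize on the single write lock, which is exactly the low write concurrency being discussed, but for modest rates the lock is held so briefly that it's unlikely to be the bottleneck.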
I processed ~3m messages a day on Sqlite using a pair of ca 2006 era Xeons on spinning rust (for redundancy; each could easily handle the load by itself). The queue processor was written in Ruby and ran on the (very slow) 1.8.x series (even then it used about 10% of a single core).
On modern hardware you should be able to trivially handle more than that.
Does this mean you are processing messages on only one machine, since it's Sqlite? Depending on what you are processing, that could take longer than the queue/dequeue.
The queue was managed by a server written in Ruby that spoke Stomp [1], so while for the biggest queue most of the processing did in fact happen on the same machine, that was just because it happened to fit.
SQLite is missing some features like `SELECT FOR UPDATE`, but you can work around some issues with a few extra queries. I wrote litequeue[0] with this specific purpose. I haven't been able to use it a lot, so I don't have real-world numbers of how it scales, but the scaling limits depend on how fast you can insert into the database.
I don’t know how many messages per second it does but for a podcast crawling side project I have processed hundreds of millions of messages through this little Python wrapper around SQLite. Zero problems. It just keeps running happily.
Why use something as complicated as SQLite? You can use a plain old set of directories and files as a queue, with sane, portable, exclusive, atomic locks, on any filesystem, with concurrent readers/writers. That's how we ran mail servers that handled millions (now billions) of messages a day, 20+ years ago.
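The core trick, roughly: publish by writing a temp file and rename()ing it into the spool, claim by rename()ing it out; since rename is atomic, exactly one worker wins. A sketch in Python (directory layout invented, and nothing here handles retries or cleanup):

import os
import uuid

SPOOL, CLAIMED = "queue/new", "queue/claimed"

def enqueue(data: bytes) -> None:
    tmp = os.path.join(SPOOL, ".tmp." + uuid.uuid4().hex)
    with open(tmp, "wb") as f:
        f.write(data)
        f.flush()
        os.fsync(f.fileno())
    os.rename(tmp, os.path.join(SPOOL, uuid.uuid4().hex))  # atomic publish

def claim_one(worker_id: str):
    """Move one message into CLAIMED; returns its new path or None."""
    for name in sorted(os.listdir(SPOOL)):
        if name.startswith("."):
            continue  # still being written
        try:
            dst = os.path.join(CLAIMED, worker_id + "." + name)
            os.rename(os.path.join(SPOOL, name), dst)  # atomic claim
        except FileNotFoundError:
            continue  # another worker got there first
        return dst
    return None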
Sorry, I don't follow? SQLite also has durability problems on power loss. You can use exactly the same method that SQLite uses to flush data to disk. There is no need for a WAL because the queue operations are atomic, using rename() on POSIX filesystems (including NFS). There's no rollback of course, but you can recover items that are stalled in the queue. With a journaled filesystem, fdatasync() after each atomic operation, and battery-backed RAID, I can't imagine it getting more durable [in comparison to SQLite].
Interestingly, journaled rollback mode in SQLite is probably worse durability than the file/directory queue method. (https://www.sqlite.org/lockingv3.html#how_to_corrupt) (https://www.sqlite.org/atomiccommit.html#sect_9_0) It depends on the same system-level guarantees about the filesystem and hardware, except it also depends on POSIX advisory locks, which are fallible and implementation-specific; while the file/directory queue solely depends on rename() being atomic, which it always is.
> Sorry, I don't follow? SQLite also has durability problems on power loss.
No, it doesn't. Once COMMIT returns, your data is durable.
> You can use exactly the same method that SQLite uses to flush data to disk.
Good luck with that! It took them about ten years to get it right, and the SQLite people are world class. (You can't just copy their code wholesale, since they have one file to worry about and your scheme has tons.)
> With a journaled filesystem, fdatasync() after each atomic operation, and battery-backed RAID, I can't imagine it getting more durable [in comparison to SQLite].
The problem is lack of imagination. :-) See https://danluu.com/deconstruct-files/ (and the linked papers) for an overview of all the things that can go wrong. In particular, if you only ever fdatasync(), your files are not even guaranteed to show up after a power loss.
That is, without the custom mail server you describe. You can feed every incoming message to a custom command with ".qmail", or forward with ".forward", so we used that for "push" delivery of messages and plain old POP3 for pull. E-mail really does have everything you need to build complex routing topologies.
We ran a mail provider, and so had a heavily customised Qmail setup, and when we needed a queue we figured we might as well use that. Meant we could trivially do things like debugging by cc:'ing messages to a mailbox someone connected a mail client to, for example.
My ISP days were not dissimilar, and had we a TARDIS it might be fun to go back to when both setups were still there and compare notes.
However, I'd hope the SOAP part made it clear I was exaggerating a little for effect :)
... also because the idea of the custom server meaning you could have the main system's qmail handle spaced retries based on SMTP status codes amused me.
I mean SOAP fit right in with the zeitgeist of that era... I seem to remember we were sending XML (shudder), as we had quite a lot of other stuff using XML, but nothing as verbose as SOAP.
Our main CGI (yes...) written in C++ (I can hear you squirm from here) was also loosely inspired by CORBA (stop that screaming) in terms of "sort-of" exposing objects via router that automatically mapped URLs to objects, and which used XML for persistence and an XML based rendering pipeline (not that different from React components, actually, except all C++ and server side).
Yeah, I had quite a bit of XML flying around, plus the inevitable "all our vendors want to communicate via CSV over FTP", and some customer facing perl CGIs plus a daemon or two I'd written that usually spoke XML over TCP sockets.
Plus lots of NFS for the shared filestore that the qmail SMTP nodes and the courier/qmail-pop3d mail receipt nodes mounted.
Plus ... yeah, you can see why I thought we'd not find each others' setups -too- surprising.
So, no, not going to squirm, because I mean, yes, I know, but it all (mostly) worked and the customers weren't unusually unhappier with us than they are with any provider ;)
Yeah, I'm super confused with this. Getting a few thousand per second seems relatively trivial, on an Arduino. Maybe there's something I'm missing here, or is this the abstractions that software lives at these days?
This limits simultaneous writes to the maximum number of open file handles supported by the OS. I don’t know what that is, but I don’t see how it can compare to a multiple multiplexed TCP/IP sockets.
When you’re writing billions of messages per day, I don’t see how a file system scales.
On Linux, /proc/sys/fs/file-max has the maximum number of simultaneous open filehandles supported by the kernel. On my laptop this is about 1.6 million
But also keep in mind every executable has at minimum its own executable as an open file. Picking a random python process I currently have running, lsof reports it has 41 open *.so files.
Yes, but it's also highly unlikely that if you're trying to push transactions per second into that kind of range that you'd be doing it with individual processes per transaction. You'd also be likely to be hitting IO limits long before the number of file descriptors is becoming the issue.
Yep, I’ve used this approach for file transfers / messaging between two systems. Primitive, but get the lock-process-unlock-move sequence working and it works for that kind of use case.
The p3rl.org/Minion job queue has a sqlite backend and I'm aware of it handling what seemed to me a fairly acceptable amount of throughput and the source to https://metacpan.org/pod/Minion::Backend::SQLite looks pretty comfortable to me.
I've not run it myself in production but I would definitely have heard if it went wrong, I think OpenSuSe's OpenQA thing does build worker boxes that way (though of course they'll have fairly slow jobs so it may just be the write throughput doesn't matter).
This being HN, I'd like to point out you can steal the SQL from the source for your own uses if you want to, this just happens to be the example full implementation I'm familiar with.
Not to be coy, but it only is if it is. For this application I'd agree, but there are plenty of apps that want queues for in-memory use, and if the thing that holds the memory for the queue dies, chances are the whole ecosystem has.
I once saw WAL turned off to make a sqlite queue perform better.
But that was ... gremlins. More than one person tried to figure out wtf was going on, and eventually it was better business-wise to declare 'gremlins'; everybody involved in the incident has been annoyed about not figuring it out since.
A simple implementation of a queue in SQL will need to acquire an exclusive lock on the table anyway. Although it's not necessary to use locking at the level of SQL itself: https://news.ycombinator.com/item?id=27482402
For a client-server RDBMS with writes over the network, sure. For an SQLite database with writes to the local filesystem and an order of magnitude better write performance, not so much.
We were comfortably supporting millions of jobs per day as a Postgres queue (using select for update skip locked semantics) at a previous role.
Scaled much, much further than I would’ve guessed at the time when I called it a short-term solution :) — now I have much more confidence in Postgres ;)
> We were comfortably supporting millions of jobs per day as a Postgres queue (using select for update skip locked semantics) at a previous role.
That's very refreshing to hear. In a previous role I was in a similar situation to yours, but I pushed for RabbitMQ instead of Postgres due to scaling concerns, with hypothetical ceilings smaller than the ones you faced. My team had to make a call without having hard numbers to support any decision and no time to put together a proof of concept. The design pressures were the simplicity of Postgres vs paying with complexity for the assurance of a working message broker. In the end I pushed for the most conservative approach and we went with RabbitMQ, because I didn't want to be the one having to explain why we had problems getting an RDBMS to act as a message broker when we could get a real message broker for free with a docker pull.
I was always left wondering if that was the right call, and apparently it wasn't, because RabbitMQ also put up a fight.
If there were articles out there showcasing case studies of real world applications of implementing message brokers over RDBMS then people like me would have an easier time pushing for saner choices.
Those don't have money to fund studies about industry best practices. So you don't get many.
Almost everything you see on how to use a DBMS is either an amateur blog or one of those studies. The former is usually dismissed in any organization with more than one layer of management.
> Those don't have money to fund studies about industry best practices. So you don't get many.
Your comment reads like a strawman. I didn't need "studies". It was good enough if there was a guy with a blog saying "I used Postgres as a message broker like this and I got these numbers", with a GitLab project page providing the public with the setup and benchmark code.
Just out of curiosity (as someone who hasn't done a lot of this kind of operational stuff) how does this approach to queueing with Postgres degrade as scale increases? Is it just that your job throughput starts to hit a ceiling?
Throughput is less of an issue than queue size: Postgres can handle a truly incredible amount of throughput as long as the jobs table is small enough that it can safely remain in memory for every operation. We can handle 800k jobs/hr with Postgres, but if you have more than 5k or 10k jobs in the table at any given time, you're in dangerous territory. It's a different way of thinking about queue design than some other systems, but it's definitely worth it if you're interested in the benefits Postgres can bring (atomicity, reliability, etc.)
With Postgres, you also need to worry a lot about tombstoning and your ability to keep up with the vacuums necessary to deal with highly mutable data. This can depend a lot on what else is going on with the database and whether you have more than one index on the table.
One strategy for mitigating vacuum costs would be to adopt an append-only strategy and partition the table. Then you can just drop partitions and avoid the vacuum costs.
Really depends on the needs but this can unlock some very impressive and sustainable throughputs.
That's the original problem, but then there are the secondary effects. Some of the people who made decision on that basis write blog posts about what they did, and then those blog posts end up on StackOverflow etc, and eventually it just becomes "this is how we do it by default" orthodoxy without much conscious reasoning involved - it's just a safe bet to do what works for everybody else even if it's not optimal.
My hobby project does ~1.5M jobs per day enqueued into Postgres, no sweat. I use https://github.com/bensheldon/good_job which uses PG's LISTEN/NOTIFY to lower worker poll latency.
Briefly, it spins up a background thread with a dedicated database connection and makes a blocking Postgres LISTEN query until results are returned, and then it forwards the result to other subscribing objects.
I can't speak for how they do it, but when your worker polls the table and finds no rows, you will sleep. While sleeping, you can also LISTEN on a channel (and if you get a message, you abort your sleep).
Then, whenever you write a new job to the queue, you also do a NOTIFY on the same channel.
This lets you keep latency low while still polling relatively infrequently.
NOTIFY is actually transactional which makes this approach even better (the LISTENer won't be notified until the NOTIFY transaction commits)
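A bare-bones version of that poll-plus-LISTEN loop with psycopg2 (channel name, DSN, and poll interval are placeholders):

import select
import psycopg2
import psycopg2.extensions

conn = psycopg2.connect("dbname=app")
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
conn.cursor().execute("LISTEN new_job")

def wait_for_work(timeout=30):
    # Block until either a NOTIFY arrives or the poll interval elapses;
    # either way, the caller then polls the jobs table as usual.
    if select.select([conn], [], [], timeout) != ([], [], []):
        conn.poll()
        conn.notifies.clear()  # drain; the poll query finds the actual rows

# Producers just run NOTIFY new_job in the same transaction that INSERTs the job.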
A few million a day is a few dozen per second; we currently have a service running this order of magnitude of jobs with a SELECT/SKIP LOCKED pattern and no issue at all on a medium AWS box.
In other SQL databases an 'in memory' table could be a candidate. It looks like Postgres only has session specific temporary tables, but does have an UNLOGGED https://www.postgresql.org/docs/13/sql-createtable.html table type which has desirable properties for temporary data that must be shared.
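For reference, UNLOGGED is just a flag on the table; a sketch (names invented), with the caveat that an unlogged table is truncated after a crash, so it only fits work you can afford to lose or regenerate:

import psycopg2

conn = psycopg2.connect("dbname=app")  # DSN is a placeholder
with conn, conn.cursor() as cur:
    cur.execute("""
        CREATE UNLOGGED TABLE IF NOT EXISTS job_queue (
            id         bigserial PRIMARY KEY,
            payload    jsonb NOT NULL,
            created_at timestamptz NOT NULL DEFAULT now()
        )
    """)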
a well-tuned bare metal box in a master-slave config should easily handle (being conservative here) 10k/s... I assume a purpose-built box could handle 100k/s without breaking a sweat
I've used Postgres to handle 60M jobs per second (using FOR UPDATE SKIP LOCKED) in production, for two years, on a single dual core 8GB GCE VM. Postgres goes far.
> Assuming 10k/jobs a day, that's roughly 7 jobs per minute.
I've seen systems at that scale where that's roughly true. But I've also seen systems where those jobs come in a daily batch, at a point in time of day, and then nothing until the next day's batch.
Yep, even websites can be highly non-periodic. I used to run the web services for the 50 or so magazines that Risk-Waters had at the time, and around lunch time was a massive peak of traffic, easily 100x our slower times.
Postgres can handle 10k batch inserts in seconds on even commodity hardware. If not done in batches, you should still get >100 inserts/second with a few indexes thrown in there.
The best thing about using PostgreSQL for a queue is that you can benefit from transactions: only queue a job if the related data is 100% guaranteed to have been written to the database, in such a way that it's not possible for the queue entry not to be written.
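That guarantee falls straight out of doing both writes in one transaction; a sketch with invented table names:

import psycopg2

def create_order_with_job(conn, order_data):
    # Both rows commit together or not at all: the job row can never exist
    # without the order row it refers to.
    with conn, conn.cursor() as cur:
        cur.execute(
            "INSERT INTO orders (data) VALUES (%s) RETURNING id",
            (order_data,),
        )
        order_id = cur.fetchone()[0]
        cur.execute(
            "INSERT INTO jobs (kind, order_id) VALUES ('process_order', %s)",
            (order_id,),
        )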
On a quick read this seems like another name for Change Data Capture. In general the pattern works better if you can integrate it with the database's transaction log, so then you can't accidentally forget to publish something.
CDC is one of the mechanisms you can use to implement this if the volume of message is high, but the idea is to decouple your business transactions from sending out notifications and do the latter asynchronously.
Also you can benefit from being able to use all of SQL to manage the queue.
I built a system ages ago that had modest queue needs.. maybe 100 jobs a day. It involved syncing changes in the local database with external devices. Many changes would ultimately update the same device, and making the fewest number of updates was important.
The system used an extremely simple schema: A table with something like [job_id, device, start_after, time_started, time_finished]
When queueing a job for $device, do an upsert to either insert a new record, or bump up the start_after of a not yet started job to now+5 minutes. When looking for a job to run, ignore anything with a start_after in the future.
As edits were made, it would create a single job for each device that would run 5 minutes after the last change was made.
I know a lot of queueing systems have the concept of a delayed job, but I haven't come across any that had the concept of delayed jobs+dedup/coalescence.
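The upsert from the comment above, roughly, assuming a partial unique index on device for not-yet-started rows (the table name and the psycopg2 wrapper are just for illustration):

import psycopg2

# Assumed once, up front:
#   CREATE UNIQUE INDEX ON device_jobs (device) WHERE time_started IS NULL;

def enqueue_device_sync(conn, device):
    """Insert a new job, or push back the start_after of the pending one."""
    with conn, conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO device_jobs (device, start_after)
            VALUES (%s, now() + interval '5 minutes')
            ON CONFLICT (device) WHERE time_started IS NULL
            DO UPDATE SET start_after = excluded.start_after
            """,
            (device,),
        )

Workers then simply skip rows whose start_after is still in the future, exactly as described.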
I also used a similar pattern in reverse to process incoming messages that had to be partially processed in-order: one simple worker that dumped the queue to a Postgres table at full speed and N workers that used a slightly more complicated SELECT ... FOR UPDATE SKIP LOCKED query than usual to pick up the next message that could be safely processed. Again, using a single database made transactions very simple: only the message-dumping worker had to worry about processing messages exactly once (which it did via being conservative with acking and utilizing INSERT ... ON CONFLICT DO NOTHING), every other worker could just implicitly rely on database transactions.
This is so important if you want to avoid incredibly gnarly race conditions. In particular for us: jobs being run even before the transaction has been fully committed to the database.
We utilise a decorator for our job addition to external queues, such that the function that does the addition gets attached to Django's "on transaction commit" signal and thus don't actually get run until the outer database transaction for that request has been committed.
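Roughly the shape of it, with Django's transaction.on_commit (simplified; the queue client here is a stand-in, not our actual code):

from functools import wraps
from django.db import transaction

def enqueue_after_commit(fn):
    """Defer the real enqueue until the surrounding transaction commits."""
    @wraps(fn)
    def wrapper(*args, **kwargs):
        transaction.on_commit(lambda: fn(*args, **kwargs))
    return wrapper

@enqueue_after_commit
def enqueue_send_email(queue_client, user_id):
    # queue_client is whatever external queue client is in use (a stand-in here)
    queue_client.publish("emails", {"user_id": user_id})

If there's no transaction open, on_commit runs the callback immediately, so the decorator is safe to use outside a request as well.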
That is the simple, but unreliable way to fix the issue. If your python process crashes or is killed between sending the commit and enqueueing the job, the job will never be enqueued.
A possible solution to this is to use a "transactional outbox" pattern, but that has many of the same drawbacks of using postgres as a queue.
The original messaging middleware on which RabbitMQ (and other AMQP-capable message brokers) are based is IBM's WebSphere MQ fka MQseries. MQ can be used in a transactional fashion depending on whether messages are stored or can be timed out, and other QoS. Within CICS, MQ can also participate in distributed transactions along with database transactions, and this would be a typical way to use transactional message processing. And X/Open distributed transactions were one of the earliest open system/Unix standards from around 1983, so have been a well understood architecture outside mainframes as well.
That it's beneficial to use Postgres messaging (or Oracle AQ or whatever) for its transactional semantics is kind of accidental and a consequence of folks not wanting to bother with dtx. Even though databases are accessed via networks, truly scalable work distribution can't be achieved using SQL, much less with SQLite. Or in other words, if you're using messaging queues in databases, you could use tables and row locks directly just as well.
Here are a couple of tips if you want to use postgres queues:
- You probably want FOR NO KEY UPDATE instead of FOR UPDATE so you don't block inserts into tables that have a foreign key relationship with the job table. [1]
- If you need to process messages in order, you don't want SKIP LOCKED. Also, make sure you have an ORDER BY clause.
My main use-case for queues is syncing resources in our database to QuickBooks. The overall structure looks like:
BEGIN; -- start a transaction
SELECT job.job_id, rm.data
FROM qbo.transmit_job job
JOIN resource_mutation rm USING (tenant_id, resource_mutation_id)
WHERE job.state = 'pending'
ORDER BY job.create_time
LIMIT 1 FOR NO KEY UPDATE OF job NOWAIT;
-- External API call to QuickBooks.
-- If successful:
UPDATE qbo.transmit_job
SET state = 'transmitted'
WHERE job_id = $1;
COMMIT;
This code will serialize access to the transmit_job table. A more clever approach would be to serialize access by tenant_id. I haven't figured out how to do that yet (probably lock on a tenant ID first, then lock on the job ID).
Somewhat annoyingly, Postgres will log an error if another worker holds the row lock (since we're not using SKIP LOCKED). It won't block because of NOWAIT.
CrunchyData also has a good overview of Postgres queues: [2]
Not doing SKIP LOCKED will make it basically single threaded, no? I’m of the opinion that you should just use Temporal if you don’t need inter-job order guarantees
I explicitly want single-threaded execution within a tenant. After writing the original post, I figured out how to parallelize across tenants. The trick is to limit the query to only look at the next pending job for each tenant, which then allows for using SKIP LOCKED to process tenants in parallel:
SELECT job.job_id, rm.data
FROM qbo.transmit_job job
JOIN resource_mutation rm USING (tenant_id, resource_mutation_id)
WHERE job.state = 'pending'
AND job.job_id = (
SELECT qj2.job_id
FROM qbo.transmit_job qj2
WHERE job.tenant_id = qj2.tenant_id
AND qj2.state = 'pending'
ORDER BY qj2.create_time
LIMIT 1
)
ORDER BY job.create_time
LIMIT 1 FOR NO KEY UPDATE OF job SKIP LOCKED
> you should just use Temporal
Do you mean the Airflow-esque DAG runner? I prefer Postgres because it's one less moving part, I like transactional guarantees, my volume is tiny, and I can tweak the queue logic like above with a simple predicate change.
SKIP LOCKED's main utility is to provide work-queue capability to PG. Even the PG documentation refers to SKIP LOCKED as a main driver for work queues.
Postgres is probably the best solution for every type of data store for 95-99% of projects. The operational complexity of maintaining other attached resources far exceed the benefit they realise over just using Postgres.
You don’t need a queue, a database, a blob store, and a cache. You just need Postgres for all of these use cases. Once your project scales past what Postgres can handle along one of these dimensions, replace it (but most of the time this will never happen)
We collect messages from tens of thousands of devices and use RabbitMQ specifically because it is uncoupled from the Postgres databases. If the shit hits the fan and a database needs to be taken offline the messages can pool up in RabbitMQ until we are in a state where things can be processed again.
Still trivial to get that benefit with just a separate postgres instance for your queue, then you have the (very large IMO) benefit of simplifying your overall tech stack and having fewer separate components you have to have knowledge for, keep track of version updates for, etc.
In my experience, a persistent message queue is just a poor secondary database.
If anything, I prefer to use ZeroMQ and make sure everything can recover from an outage and settle eventually.
To ingest large inputs, I would just use short append-only files and maybe send them over to the other node over ZeroMQ to get a little bit more reliability, but rarely is such high-volume data that critical.
There is no such thing as a free lunch when talking about distributed fault-tolerant systems, and simplicity usually fares rather well.
Sure, if you're at the "it's not worth maintaining two dbs or paying for any managed offering" stage then you should still use the pattern but implement the work queue in your primary db. It'll be slower and you'll have to diy a little if there aren't ready made libs to do it but you still get all the major benefits.
You act like the "I can persist data" is the part that matters. It's the fact that I can from my pool of app servers post a unit of work to be done and be sure it will happen even if the app server gets struck down. It's the architecture of offloading work from your frontend whenever possible to work that can be done at your leisure.
Use whatever you like to actually implement the queue, Postgres wouldn't be my first or second choice but it's fine, I've used it for small one-off projects.
And if you store your work inbox in a relational db then you invented a queueing system. The point is that queues can ingest messages much much faster and cheaper than a db, route messages based on priority and globally tune the speed of workers to keep your db from getting overwhelmed or use idle time.
If you push a change to your frontend server that does all the work itself and it breaks, you have a service interruption. With that same system based on queues, it's just a queue backup and you have breathing room.
If you're doing a longer operation and your app server crashes, that work is lost and the client has to sit there and wait; in a queue system you can return almost immediately, and if there's a crash the work gets picked up by another worker.
Having a return queue and websockets let you give feedback to the client js in a way that is impervious to network interruptions and refreshes, once you reconnect it can catch up.
These aren't theoretical, this architecture has saved my ass on more occasions than I can count.
I believe there are a lot of assumptions baked into your argument that are going undiscussed. Foremost being that queuing all requests is even desirable. For certain workloads that may be the case, like ingesting event data, but for others queuing the requests doesn't make the service any more available than not queuing the requests.
I'm a bit late to the party (also, why is everyone standing in a circle with their pants down) but, does no one care about high availability with zero data loss failover, and zero downtime deployments anymore?
In one instance I did that by storing the file on a fault tolerant NAS. Basically I outsourced the issue to NetApp. The NAS was already there to match other requirements so why not lean on IT.
This really depends on the size of your blobs. Having a small jsonb payload will not be a problem, storing 500MB blobs in each row is probably not ideal if you frequently update the rows and they have to be re-written.
While it makes sense to use postgres for a queue where latency isn't a big issue, I've always thought that the latency needs of many kinds of caches are such that postgres wouldn't suffice, and that's why people use (say) redis or memcached.
But do you think postgres latency is actually going to be fine for many things people use redis or memcached for?
As the maintainer of a rabbitmq client library (not the golang one mentioned in the article), the bit about dealing with reconnections really rang true. Something about the AMQP protocol seems to make library authors just... avoid dealing with it, forcing the work onto users, or wrapper libraries. It's a real frustration across languages: golang, python, JS, etc. Retry/reconnect is built in to HTTP libraries, and database drivers. Why don't more authors consider this a core component of a RabbitMQ client?
AMQP (the protocol) basically requires this kind of behavior. Whenever anything "bad" happens, the state of the session becomes unknown (was the last message actually sent? was the last ack actually received? etc), and the only way to correct it is to kill the connection, create a new session, and start over.
Fixing that really needs to be at the protocol level, like a way to re-establish a previous session, or rollback the session state, or something. It's definitely hard mode for library authors to fix this in any kind of transparent way.
The qpid client libraries supported automatic transparent reconnection attempts, but in the end I usually had to disable them in order to add logic for what to do after reconnecting. IE, I needed to know the connection was lost in order to handle it anyways.
Nats has all of these features built in, and is a small standalone binary with optional persistence. I still don’t understand why it’s not more popular.
This also occurs when dealing with channel-level protocol exceptions, so this behavior is doubly important to get right. I think one of the hard parts here is that the client needs to be aware of these events in order to ensure that application level consistency requirements are being kept. The other part is that most of the client libraries I have seen are very imperative. It's much easier to handle retries at the library level when the client has specified what structures need to be declared again during a reconnect/channel-recreation.
We've had RabbitMQ as part of our stack at my day job since time began, I think it's great software overall but boy are the client libraries a challenge.
We've built a generalised abstraction around first Pika and then pyamqp (because Pika had some odd issues, I forget the details of which) and while pyamqp seems better, it's still not without its odd warts.
We ended up needing to develop a watchdog to wrap invocation of amqp.Connection.drain_events(timeout: int) because despite using the timeout, that call would very occasionally inexplicably block forever (with the only way to break it free being to call amqp.Connector.collect()).
My other data point was a time I built something to slice off a copy of production data for testing purposes (from instances of the system above) using Benthos (pretty cool software tbh, Go underneath), but it would inexplicably just stop consuming messages and I had no idea why (so I just went back to our gross but proven Python abstraction to achieve the same).
In many scenarios a DB/SQL-backed queue is far superior to the fancy queue solutions such as RabbitMQ because it gives you instantaneous granular control over 'your queue' (since it is the result set of your query to reserve the next job).
Historically people like to point out the common locking issues etc. with SQL, but in modern databases you have a good number of tools to deal with that ("select for update nowait").
If you think about it a queue is just a performance optimisation (it helps you get the 'next' item in a cheap way, that's it).
So you can get away with "just a db" for a long time and just query the DB to get the next job (with some 'reservations' to avoid duplicate processing).
At some point you may overload the DB if you have too many workers asking the DB for the next job. At that point you can add a queue to relieve that pressure.
This way you can keep a super dynamic process by periodically selecting 'next 50 things to do' and injecting those job IDs in the queue.
This gives you the best of both worlds because you can maintain granular control of the process by not having large queues (you drip feed from DB to queue in small batches) and the DB is not overly burdened.
+1 to this. I'm just as wary to recommend using a DB as a queue as the next person, but it is a very common pattern at my company.
DB queues allow easy prioritization, blocking, canceling, and other dynamic queued job controls that basic FIFO queues do not. These are all things that add contention to the queue operations. Keep your queues as dumb as you possibly can and go with FIFO if you can get away with it, but DB queues aren't the worst design choice.
> One of our team members has gotten into the habit of pointing out that “you can do this in Postgres” whenever we do some kind of system design or talk about implementing a new feature. So much so that it’s kind of become a meme at the company.
We do just about everything with one or more Postgres databases. We have workers that query the db for tasks, do the work, and update the db. Portals that are the read-only view of the work being performed, and it's pretty amazing how far we've gotten with just Postgres and no real tuning on our end. There's been a couple scenarios where query time was excessive and we solved by learning a bit more about how Postgres worked and how to redefine our data model. It seems to be the swiss army knife that allows you to excel at most general cases, and if you need to do something very specific, well at that point you probably need a different type of database.
I find it funny how sometimes there are two sides to the same coin, and articles like these rarely talk about engineering tradeoffs. Just one solution good, other solution bad. I think it is a mistake for a technical discussion to not talk in terms of tradeoffs.
Obviously it makes sense to not use complex tech when simple tech works, especially at companies with a lower traffic volume. That is just practical engineering.
The inverse, however, can also be true. At super high volumes you run into issues really quickly. I just got off a 3 hour site-wide outage caused by the database being unable to keep up with unprecedented queue load; the db system basically ground to a halt. The proposed solution is actually to move off the dedicated db queue to SQS.
This was a system that had run well for about 10 years. Granted, the queue volume was unprecedented, but sometimes a scaling ceiling is hit, and it is hit faster than you might expect from all these comments saying to always use a db, even with all the proper indexing and optimizations.
We've inadvertently "load tested" our distributed locking / queue impl on postgres in production, and so I know that it can handle hundreds of thousands of "what should I run / try to take lock on task" queries per minute, with a schema designed to avoid bloat/vacuuming, tuned indices, and reasonably beefy hardware.
RabbitMQ may have been overkill for the need, but it's also clear that there was an implementation bug which was missed.
DB queues are simple to implement, so given the volume it's one reasonable way to work around an MQ client issue.
Personally, and I mean personally, I have found messaging platforms to be full of complexity, fluff, and non-standard "standards"; it's just a lot of baggage and, in the case of messaging, a lot of bugs.
I have seen Kafka deployed and ripped out a year later, and countless bugs in client implementations due to developer misunderstanding, poor documentation, and unnecessary complexity.
For this reason, I refer to event driven systems as "expert systems" to be avoided. But in your life "there will be queues"
I wrote a message queue in Python called StarQueue.
It’s meant to be a simpler reimagining of Amazon SQS.
It has an HTTP API and behaves mostly like SQS.
I wrote it to support Postgres, Microsoft’s SQL server and also MySQL because they all support SKIP LOCKED.
At some point I turned it into a hosted service and only maintained the Postgres implementation though the MySQL and SQL server code is still in there.
After that I wanted to write the world's fastest message queue, so I implemented an HTTP message queue in Rust. It maxed out the disk at about 50,000 messages a second, as I vaguely recall, so I switched to memory only, and on the biggest EC2 instance I could run it on it did about 7 million messages a second. That was just a crappy prototype so I never released the code.
After that I wanted to make the simplest possible message queue, and discovered that atomic renames on Linux are the basis of a perfectly acceptable, purely file-system-based message queue. I never packaged it as a standalone message queue, but I wrote something close enough: an SMTP buffer called Arnie. It's only about 100 lines of Python. https://github.com/bootrino/arniesmtpbufferserver
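Not Arnie's actual code, but the rename trick it relies on looks roughly like this (directory names are made up, and both directories must live on the same filesystem for rename() to be atomic):

import os
import uuid

SPOOL, CLAIMED = "spool", "claimed"  # both must already exist, on the same filesystem

def enqueue(payload):
    # write under a temp name first, then rename: readers never see half-written files
    tmp = os.path.join(SPOOL, ".tmp-" + uuid.uuid4().hex)
    with open(tmp, "wb") as f:
        f.write(payload)
    os.rename(tmp, os.path.join(SPOOL, uuid.uuid4().hex + ".msg"))

def claim_one():
    # whichever worker's rename succeeds owns the message; the losers just move on
    for name in sorted(os.listdir(SPOOL)):
        if name.endswith(".msg"):
            try:
                os.rename(os.path.join(SPOOL, name), os.path.join(CLAIMED, name))
                return os.path.join(CLAIMED, name)
            except FileNotFoundError:
                continue
    return None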
What does the prefetch value for RabbitMQ mean?
> The value defines the max number of unacknowledged deliveries that are permitted on a channel.
From the Article:
> Turns out each RabbitMQ consumer was prefetching the next message (job) when it picked up the current one.
that's a prefetch count of 2.
The first message is unacknowledged, and if you have a prefetch count of 1, you'll only get 1 message because you've set the maximum number of unacknowledged messages to 1.
So, I'm curious what the actual issue is. I'm sure someone checked things, and I'm sure they saw something, but this isn't right.
tl;dr: a prefetch count of 1 gets exactly one message; it doesn't get one message and then a second.
Note: I didn't test this, so there could be some weird issue, or the documentation is wrong, but I've never seen this as an issue in all the years I've used RabbitMQ.
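For reference, setting a prefetch of 1 with the Python pika client is a single basic_qos call; a minimal consumer sketch (queue name and handler are made up):

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
ch = conn.channel()
ch.queue_declare(queue="jobs", durable=True)
ch.basic_qos(prefetch_count=1)  # at most one unacknowledged delivery per consumer

def handle(ch, method, properties, body):
    do_work(body)                                   # hypothetical job handler
    ch.basic_ack(delivery_tag=method.delivery_tag)  # ack only after the work is done

ch.basic_consume(queue="jobs", on_message_callback=handle)
ch.start_consuming()

With that in place, a worker busy on a long job simply isn't offered a second message until it acks the first.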
That's my thinking as well. Seems like they're not using the tool correctly and didn't read the documentation. Oh well, let's switch to Postgres because "reasons". And now to get the features of a queuing system, you have to build it yourself. Little bit of Not Invented Here syndrome it sounds like.
I mean, at some point you do have to write code. Either
1.) You (hopefully) know a bit about how your DB works, what the workload is, what your tasks are. You also (hopefully) know a bit about SQL and Postgres. So you learn a bit more and build upon that knowledge, and implement the queue there (which comes with other benefits).
2.) You learn about a new package, deal with how to set that up, and how to integrate it with your existing database (including how tasks get translated between the queue and your existing DB). This also increases your maintenance and deployment burden, and now developers need to know not only about your DB, but the queueing package as well.
There are certainly cases where #2 makes sense. But writing off #1 as NIH often leads to horrifically over-engineered software stacks, when 10s/few hundred lines of code will suffice.
But, as a sibling comment mentioned, they could probably have gotten away with a basic get instead.
As documentation writers (and often that should be read "as developers") it is our task to make our users fall into the success pit - and stay there.
Unfortunately, by not considering this, thousands of hours get lost every year and lots of potential users leave because they were somehow (Google, random blogs, our documentation) led into the hard path.
My favourite example is how, for years, if you tried to find out how to use an image in a Java application you would end up with the documentation for an abstract class 2dgraphics or something, while the easy way was to use an Icon.
"Once the number reaches the configured count, RabbitMQ will stop delivering more messages on the channel unless at least one of the outstanding ones is acknowledged"
The article unintentionally communicated more about the engineering team than the subject.
btw, publisher confirms used in conjunction with prefetch setting can allow for flow control within a very well behaved band.
People run into issues with Rabbit for two reasons. You noted one (they are documentation averse), and number two is mistaking a message queue for a distributed log. Rabbit does -not- like holding on to messages. Performance will crater if you treat it as a lake. It is a river.
I'm wondering if they made the mistake of acknowledging the message before the processing was done. From the article it sounds like their jobs take a long time to run, so they may have acked the message to stop RabbitMQ from delivering it to another worker for retry, but IIRC there is a setting that allows you to extend the "lease" time on a message before retry.
This is interesting because I’ve seen a queue that was implemented in Postgres that had performance problems before: the job which wrote new work to the queue table would have DB contention with the queue marking the rows as processed. I wonder if they have the same problem but the scale is such that it doesn’t matter or if they’re marking the rows as processed in a way that doesn’t interfere with rows being added.
They claim "thousands of jobs every day", so the volume sounds very manageable. In a past job, I used postgres to handle millions of jobs/day without too much hassle.
They also say that some jobs take hours and that they use 'SELECT ... FOR UPDATE' row locks for the duration of the job being processed. That strongly implies a small volume to me, as you'd otherwise need many active connections (which are expensive in Postgres!) or some co-ordinator process that handles the locking for multiple rows using a single connection (but it sounds like their workers have direct connections).
I think the 'select for update' query is used by a worker to fetch jobs ready for pickup, then update the status to something like 'processing' and the lock is released. The article doesn't mention holding the lock for the entire duration of the job.
I wish they actually wrote about their exact implementation. Article is kinda light on any content without that. I suspect you are right, I have implemented this kinda thing in a similar way.
It's a bit of work but allows for fine grained retry logic. Some jobs should not be retried but escalated, others might need a retry in one hour, others can be retried at a lower priority. I prefer a flatter stack and total control over the queue. That being said, I have RabbitMQ running in a production project that I haven't touched in years, it's rock solid and I'm pretty sure these guys had it misconfigured.
> To make all of this run smoothly, we enqueue and dequeue thousands of jobs every day.
The scale isn't large enough for this to at all be a worry. The biggest worry here I imagine is ensuring that a job isn't processed by multiple workers, which they solve with features built into Postgres.
Usually I caution against using a database as a queue, but in this case it removes a piece of the architecture that they have to manage and they're clearly more comfortable with SQL than RabbitMQ so it sounds like a good call.
It is easy to avoid multiple workers processing the same task: `delete from task where id = (select id from task for update skip locked limit 1) returning *;`
(not sure why this comment was dead, I vouched for it)
There are a lot of ways to implement a queue in an RDBMS and a lot of those ways are naive to locking behavior. That said, with PostgreSQL specifically, there are some techniques that result in an efficient queue without locking problems. The article doesn't really talk about their implementation so we can't know what they did, but one open source example is Que[1]. Que uses a combination of advisory locking rather than row-level locks and notification channels to great effect, as you can read in the README.
yep, i had precisely this issue in a previous job, where i tried to build a hacky distributed queue on top of postgres. almost certainly my inexperience with databases rather than the volume of jobs, but i figured i was being shortsighted trying to roll my own and replaced it with rabbitmq (which we had a hell of a time administering, but that's a different story)
Sounds like a poorly written AMQP client of which there are many. Either you go bare bones and write wrappers to implement basic functionality or find a fully fleshed out opinionated client. If you can get away with using PostgreSQL go for it.
Using a DB as an event queue opens up many options not easily possible with traditional queues. You can dedupe your events by upserting. You can easily implement dynamic priority adjustment to adjust processing order. Dedupe and priority adjustment feels like an operational superpower.
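As a sketch of the dedupe idea in Postgres (table and column names are hypothetical; dedupe_key needs a unique constraint):

import psycopg2

UPSERT_SQL = """
INSERT INTO events (dedupe_key, payload, priority)
VALUES (%s, %s, %s)
ON CONFLICT (dedupe_key) DO UPDATE
    SET priority = GREATEST(events.priority, EXCLUDED.priority);
"""

def enqueue(conn, dedupe_key, payload, priority=0):
    # re-enqueueing the same logical event bumps its priority instead of creating a duplicate
    with conn, conn.cursor() as cur:
        cur.execute(UPSERT_SQL, (dedupe_key, payload, priority))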
I've had a very good experience with pg queuing. I didn't even know `skip locked` was a pg clause. That would have... made the experience even better!
I am afraid I've moved to a default three-way architecture:
- backend autoscaling stateless server
- postgres database for small data
- blobstore for large data
it's not that other systems are bad. It's just that those 3 components get you off the ground flying, and if you're struggling to scale past that, you're already doing enormous volume or have some really interesting data patterns (geospatial or timeseries, perhaps).
Quite correct. Made an error. Carryover from a prior job where we mucked with weather data... I was thinking more along the lines of raster geo datasets like sat images, etc. Each pixel represents one geo location, with a carrying load of metadata, then a timeseries of that snapshot & md, so timeseries-raster-geomapped data basically.
I don't remember anymore what that general kind of datatype is called, sadly.
I've done something like this and opted to use advisory locks instead of row locks thinking that I'd increase performance by avoiding an actual lock.
I'm curious to hear what the team thinks the pros/cons of a row vs advisory lock are and if there really are any performance implications. I'm also curious what they do with job/task records once they're complete (e.g., do they leave them in that table? Is there some table where they get archived? Do they just get deleted?)
Advisory locks are purely in-memory locks, while row locks might ultimately hit disk.
The memory space reserved for locks is finite, so if you were to have workers claim too many queue items simultaneously, you might get "out of memory for locks" errors all over the place.
> Both advisory locks and regular locks are stored in a shared memory pool whose size is defined by the configuration variables max_locks_per_transaction and max_connections. Care must be taken not to exhaust this memory or the server will be unable to grant any locks at all. This imposes an upper limit on the number of advisory locks grantable by the server, typically in the tens to hundreds of thousands depending on how the server is configured.
> What did you do to avoid implicit locking, and what sort of isolation level were you using?
I avoided implicit locking by manually handling transactions. The query that acquired the lock was a separate transaction from the query that figured out which jobs were eligible.
> Without more information about your setup, the advisory locking sounds like dead weight.
Can you expand on this? Implementation-wise, my understanding is that both solutions require a query to acquire the lock or fast-fail, so the advisory lock acquisition query is almost identical SQL to the row-lock solution. I'm not sure where the dead weight is.
And if that's accurate, I'm failing to see how an advisory lock would leave the table unblocked for any amount of time greater than row-level locks would.
The point of the explicit row-level locking is to allow a worker to query for fresh rows without fetching any records that are already in-progress (i.e. it avoids what would otherwise be a race condition between the procedural SELECT and UPDATE components of parallel workers), so if you've already queried a list of eligible jobs, and then have your workers take advisory locks, what are those locks synchronizing access to?
In my solution, the id I was passing to pg_try_advisory_lock was the id of the record that was being processed, which would allow several threads to acquire jobs in parallel.
The second difference is that my solution filters the jobs table against the pg_locks view and excludes records where the lock ids overlap and the lock type is an advisory lock. Something like:
SELECT j.*
FROM jobs j
WHERE j.id NOT IN (
  SELECT (l.classid::bigint << 32) | l.objid::bigint
  FROM pg_locks l
  WHERE l.locktype = 'advisory'
)
LIMIT 1;
The weird expression in the middle comes from the fact that Postgres takes the id you pass to get an advisory lock and splits it across two columns in pg_locks, forcing the user to put them back together if they want the original id. See https://www.postgresql.org/docs/current/view-pg-locks.html.
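For completeness, the acquisition side of a scheme like that might look roughly like this (a sketch, not the commenter's code; session-level advisory locks live as long as the connection that took them):

import psycopg2

conn = psycopg2.connect("dbname=jobs")  # hypothetical DSN
conn.autocommit = True
cur = conn.cursor()

def try_claim(job_id):
    # non-blocking: returns True if this session now holds the advisory lock for job_id
    cur.execute("SELECT pg_try_advisory_lock(%s)", (job_id,))
    return cur.fetchone()[0]

def release(job_id):
    cur.execute("SELECT pg_advisory_unlock(%s)", (job_id,))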
Would be nice if a little more detail were added in order to give anyone looking to do the same more heads-up to watch out for potential trouble spots. I take it the workers are polling to fetch the next job which requires a row lock which in turn requires a transaction yeah? How tight is this loop? What's the sleep time per thread/goroutine? At what point does Postgres go, sorry not doing that? Or is there an alternative to polling and if so, what? :)
Were Prequel using RabbitMQ to stay cloud platform agnostic when spinning up new environments? Always wondered how companies that offer managed services on the customer's cloud like this manage infrastructure in this regard. Do you maintain an environment on each cloud platform with a relatively standard configuration, or do you have a central cluster hosted in one cloud provider which the other deployments phone home to?
Low effort post on my part, but I sure won't be buying from or taking any advice from this company that publicly advertises this kind of mess in such a "proud" manner. Not only that, but it's like the SaaS equivalent of the recipe blog post meme.
This is what you get when you hire using leet code exercises and dump all your design thinking to "Senior" developers that have 1-3 years under their belt.
I've also used PostgreSQL as a queue but I worry about operational implications. Ideally you want clients to dequeue an item, but put it back in the queue (rollback transaction) if they crash while processing the item. But processing is a long-running task, which means that you need to keep the database connection open while processing. Which means that your number of database connections must scale along with the number of queue workers. And I've understood that scaling database connections can be problematic.
Another problem is that INSERT followed by SELECT FOR UPDATE followed by UPDATE and DELETE results in a lot of garbage pages that need to be vacuumed. And managing vacuuming well is also an annoying issue...
I've generally seen short(ish) leases as the solution to this problem. The queue has owner and lease-expiration columns; workers set the lease to NOW()+N when claiming work, and when selecting work they pick up anything whose lease has expired or that has no lease.
This assumes the processing is idempotent in the rest of the system and is only committed transactionally when it's done. Some workers might do wasted work, but you can tune the expiration future time for throughput or latency.
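A minimal version of that lease-claim query, assuming hypothetical status/owner/lease_expires columns:

import psycopg2

CLAIM_SQL = """
UPDATE jobs
SET status = 'processing',
    owner = %s,
    lease_expires = now() + make_interval(secs => %s)
WHERE id = (
    SELECT id FROM jobs
    WHERE status = 'pending'
       OR (status = 'processing' AND lease_expires < now())
    ORDER BY created_at
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
RETURNING id, payload;
"""

def claim(conn, worker_id, lease_seconds=300):
    # a crashed worker's job becomes claimable again once its lease expires
    with conn, conn.cursor() as cur:
        cur.execute(CLAIM_SQL, (worker_id, lease_seconds))
        return cur.fetchone()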
As much as I detest MongoDB's immaturity in many respects, I found a lot of features that actually make life easier when you design pretty large scale applications (mine was typically doing 2GB/s of data out of the database, which I like to think is pretty large).
One feature I like is change event stream which you can subscribe to. It is pretty fast and reliable and for good reason -- the same mechanism is used to replicate MongoDB nodes.
I found you can use it as a handy notification / queueing mechanism (more like Kafka topics than RabbitMQ). I would not recommend it as any kind of interface between components, but within an application, for its internal workings, I think it is a pretty viable option.
Funny enough, we designed one subsystem to use RabbitMQ to enforce linear committed records into mongodb to avoid indices. I.e. the routes in rabbitMQ would ensure a GUID tagged record was spatially localized with other user data on the same host (the inter-host shovel traffic is minimized).
Depends on the use-case, but the original article smells like FUD. This is because the connection C so lib allows you to select how the envelopes are bound/ack'ed on the queue/dead-letter-route in the AMQP client-consumer (you don't usually camp on the connection). Also, the expected runtime constraint should always be included when designing a job-queue regardless of the underlying method (again, expiry default routing is built into rabbitMQ)...
MongoDB's change stream is accidentally very simple to use. You just call the database and get continuous stream of documents that you are interested in from the database. If you need to restart, you can restart processing from the chosen point. It is not a global WAL or anything like that, it is just a stream of documents with some metadata.
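In pymongo that ends up being just a watch() loop; a sketch with made-up database/collection names (change streams require a replica set):

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/?replicaSet=rs0")
coll = client.mydb.notifications

resume_token = None  # persist this somewhere if you want to survive restarts
with coll.watch(full_document="updateLookup") as stream:
    for change in stream:
        resume_token = stream.resume_token  # only valid while it stays inside the oplog window
        handle(change)                      # hypothetical handler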
> If you need to restart, you can restart processing from the chosen point
One caveat to this is that you can only start from wherever the beginning of your oplog window is. So for large deployments and/or situations where your oplog ondisk size simply isn't tuned properly, you're SOL unless you build a separate mechanism for catching up.
Yep, absolutely. But the side effect I am referring to (and probably wasn't clear enough about) is that the oplog is globally shared across the replica set. So even if your queue collection tops out at like 10k documents max, if you have another collection in the same deployment thats getting 10mm docs/min, your queue window is also gonna be artificially limited.
Putting the queue in its own deployment is a good insulation against this (assuming you don't need to use aggregate() with the queue across collections obviously).
I do agree, but listen... this is supposed to be handy solution. You know, my app already uses MongoDB, why do I need another component if I can run my notifications with a collection?
Also, I am a firm believer that you should not put actual data through notifications. Notifications are meant to wake other systems up, not carry gigabytes of data. You can put your data into some other storage and notify: "Hey, here is data of 10k new clients that needs to be processed. Cheers!"
The message is meant to ensure correct processing flow (message has been received, processed, if it fails somebody else will process it, etc.), but it does not have to carry all the data.
I have fixed at least one platform that "reached limits of Kafka" (their words not mine) and "was looking for expert help" to manage the problem.
My solution? I got the publishing component to upload the data as compressed JSON to S3 and post a notification with some metadata and a link to the JSON, and the client to fetch and parse the JSON. Bam, suddenly everything works fine, no bottlenecks anymore. For the cost of maybe three pages of code.
There are few situations where you absolutely need to track so many individual objects that you have to start worrying whether hard drives come large enough. And I have managed some pretty large systems.
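That "link plus metadata" fix is basically the claim-check pattern; a rough sketch with boto3 (bucket name and the notification transport are placeholders):

import gzip
import json
import uuid
import boto3

s3 = boto3.client("s3")
BUCKET = "my-ingest-bucket"  # hypothetical bucket

def stage_batch(records):
    # the heavy payload goes to S3; only a tiny pointer message goes through the broker
    key = "batches/" + uuid.uuid4().hex + ".json.gz"
    s3.put_object(Bucket=BUCKET, Key=key,
                  Body=gzip.compress(json.dumps(records).encode("utf-8")))
    return {"type": "batch_ready", "count": len(records), "s3_key": key}

# send the returned dict through whatever bus you already have (Kafka, RabbitMQ, ...)
# and have consumers fetch and decompress the object themselves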
> I do agree, but listen... this is supposed to be handy solution. You know, my app already uses MongoDB, why do I need another component if I can run my notifications with a collection?
We're in agreement, I think we may be talking past each other. I use mongo for the exact use case you're describing (messages as signals, not payloads of data).
I'm just sharing a footgun for others that may be reading that bit me fairly recently in a 13TB replica set dealing with 40mm docs/min ingress.
(Its a high resolution RF telemetry service, but the queue mechanism is only a minor portion of it which never gets larger than maybe 50-100 MB. Its oplog window got starved because of the unrelated ingress.)
You have a single mongo cluster that's writing 40M docs a minute? Can you explain how? I don't think I've ever seen a benchmark for any DB that's gotten above 30k writes/sec.
Sorry for the late reply here, just noticed this. You're correct that figure was wrong, that metric was supposed to be per day, not per minute. Its actually closer to 47mm per day now, so roughly 33k docs/min.
> I don't think I've ever seen a benchmark for any DB that's gotten above 30k writes/sec
Mongo's own published benchmarks note that a balanced YCSB workload of 50/50 read/write can hit 160k ops/sec on dual 12-core Xeon-Westmere w/ 96GB RAM [1].
Notably that figure was optimized for throughput and the journal is not flushed to disk regularly (all data would be lost from last wiredtiger checkpoint in the event of a failure). Even in the durability optimized scenario though, mongo still hit 31k ops/sec.
Moving beyond just MongoDB though, Cockroach has seen 118k inserts/sec OLTP workload [2].
> We maintain things like queue ordering by adding an ORDER BY clause in the query that consumers use to read from it (groundbreaking, we know).
The dude's being a bit too self-deprecating with that sarcastic quip.
But there's something valuable buried there: if it is possible to solve a problem efficiently without introducing novel mechanisms and concepts, it is highly desirable to do so.
Don't reinvent the wheel unless you need to.
> You could set the prefetch count to 1, which meant every worker will prefetch at most 1 message. Or you could set it to 0, which meant they will each prefetch an infinite number of messages.
O what the actual fuck?
I'm really hoping he's right about holding it wrong, because otherwise ???
I’d venture to guess that the median RabbitMQ-using app in production could not easily be replaced with postgres though. The main reasons this one could are very low volume and not really using any of RMQ’s features.
I love postgres! But RMQ and co fulfill a different need.
and the high end of unreasonable hardware can get you reaaaaaallly far, which in the age of cloud computing is something people forget about and try to scale horizontally when not strictly necessary
Not when a queue is involved. IME trying to replicate something like beanstalkd (https://beanstalkd.github.io/) in postgres is asking for trouble for anything but trivial workloads.
If you're measuring throughput in jobs/s, use a real work queue.
I think you are dramatically underestimating the speed of Postgres here if you think it breaks down at more than 1 job/s. That's nowhere close to the point where you'd run into trouble with Postgres.
Have you used postgres for a work queue before? I have, and it did not scale even for smaller workloads. It's possible there are new primitives that make it more efficient, but back in 2014/2015 it didn't work.
And yes, the point is then to use a database to hold the work, dispatching from that. In the Mats3 context I describe, the reason is to pace the dispatching to a queue (!), but for them, it should be to just run the work from that table. Also, the introspection/monitoring argument presented should be relevant for them.
That a message queue library fetches more messages than the one it is working on is totally normal: ActiveMQ per default uses a prefetch of 1000 for queues, and Short.MAX_VALUE-1 (!) for topics. The broker will backfill you when you've depleted half of that. This is obviously to gain speed, so that once you've finished with one message, you already have another available, not needing to go back to the broker with both network and processing latencies:
https://activemq.apache.org/what-is-the-prefetch-limit-for
In summary, I feel that the use case they have, "thousands of jobs per day", which is extremely little for a message queue, where many of these jobs are hours-long, is .. well .. not optimal use case for a MQ. It is the wrong tool for the job, and just adds complexity.
While some argue that RabbitMQ was misconfigured, I agree with the argument that reducing tech stack complexity is beneficial if a technology does not offer advantages that cannot be achieved with the basic tech stack.
When programming my side hustle I also had the requirement of a SIMPLE queue. I didn't want to introduce AWS SQS or RabbitMQ, so I wrote a few C# classes which were dequeueing from a MongoDB collection. It works pretty well. It basically leverages the atomic MongoDB operation "findAndModify": dequeueing finds only messages in status "enqueued" and, in the same operation, sets the status to "processing", so you can ensure that only one reader processes the message. (https://www.mongodb.com/docs/manual/reference/method/db.coll...)
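The commenter's classes are C#, but the same pattern in Python/pymongo is a one-call dequeue (db, collection and field names are made up here):

from datetime import datetime, timezone
from pymongo import MongoClient, ReturnDocument

coll = MongoClient().myapp.queue

def dequeue(worker_id):
    # findAndModify is atomic: only one reader can flip a given message
    # from "enqueued" to "processing"
    return coll.find_one_and_update(
        {"status": "enqueued"},
        {"$set": {"status": "processing",
                  "worker": worker_id,
                  "started_at": datetime.now(timezone.utc)}},
        sort=[("created_at", 1)],
        return_document=ReturnDocument.AFTER,
    )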
- clear case of misconfigured instance (prefetch) and a bug somewhere in the stack (reconnection issue)
- prefetch behavior sounds like they receive a message, ack it then process it
- i wouldn't recommend ack before processing, because you become responsible for tracking and verifying if the worker ran to completion or not.
- work then ack is the way. the other way around ignores key job processing benefits like rabbitmq automagically requeueing messages when a worker crashes and failure related logic like deadletter queues.
- the trick i've started leaning on with rabbitmq is giving each worker their own instance queue (i call it their mailbox).
- when a worker starts a job, it writes the job id, start time and the worker's mailbox to a db. any system can now look up the "running job" in the database, know how long it has been running, and can even talk to the worker using its mailbox to inquire whether it is running and whether that job state in the db is accurate. (a rough sketch of this follows below.)
- happy the writer and team found what works for them. ultimately, what you understand best will serve you better, so they made a good choice to lean on their strengths (postgres).
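A rough sketch of that mailbox idea (not the commenter's code; queue naming, the table and its columns are all invented for illustration):

import socket
import uuid
import pika
import psycopg2

mq = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
ch = mq.channel()

# each worker declares its own exclusive "mailbox" queue, which disappears when it disconnects
worker_id = socket.gethostname() + "-" + uuid.uuid4().hex[:8]
mailbox = "mailbox." + worker_id
ch.queue_declare(queue=mailbox, exclusive=True)

db = psycopg2.connect("dbname=jobs")  # hypothetical DSN

def record_start(job_id):
    # register the running job so anything else can find it and reach this worker
    with db, db.cursor() as cur:
        cur.execute(
            "INSERT INTO running_jobs (job_id, worker_mailbox, started_at) "
            "VALUES (%s, %s, now())",
            (job_id, mailbox),
        )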
I don't agree with their assessment, but also not really interested in putting effort into debunking it.
Just occurred to me there's a different possibility:
The extra connection bug mentioned could be from a rogue thread/process/code-path connecting to the queue and fetching an item, but somehow not doing the work.
Verifying this likely requires access to the codebase, so treat this as little more than idle speculation.
from what i gather, it looks like they saw the 'q' in rabbitmq and thought 'ha, a queue we could use.' totally ignoring the 'm' part. the blog post doesn't say much but it's obvious they were not sending 'messages'. a message is a specific thing, it contains information that is meaningful to the recipient[0]. or perhaps rabbitmq has allowed itself to be drawn into all sorts of use cases (as a result of competition with kafka)? a message should be very small, immediately ack-ed or rejected (i.e. before any processing begins). that's why rabbitmq assumes it can run entirely in memory, because messages are not expected to stay in queues for long (i.e. during processing by recipients).
I’ve been doing this for a long time. Back then I thought I was just being lazy, not wanting to maintain another component for a low volume of events, but over time, I saw the elegance of reducing the number of components in the architecture. Today, a quarter of a billion dollar/yr business runs on top of that queue, which just works.
> And we guarantee that jobs won’t be picked up by more than one worker through simple read/write row-level locks. The new system is actually kind of absurdly simple when you look at it. And that’s a good thing. It’s also behaved flawlessly so far.
Wouldn't this lead to contention issues when a lot of workers are involved?
My understanding is that a naive implementation essentially serializes access to the queue table. So it works, but no matter how many requests you make in parallel, only one will be served at a time (unless you have a random component in the query).
With SKIP LOCKED you can resolve this easily, as long as you know about the feature. But almost every tutorial and description of implementing a job queue in Postgres mentions this now.
Postgres is super cool and comes with batteries for almost any situation you can throw at it. Low throughput scenarios are a great match. In high throughput cases, you might find yourself not needing all the extra guarantees that Postgres gives you, and at the same time you might need other capabilities that Postgres was not designed to handle, or at least not without a performance hit.
Like everything else in life, it's always a tradeoff. Know your workload, the tradeoffs your tools are making, and make sure to mix and match appropriately.
In the case of Prequel, it seems they possibly have a low throughput situation at hands, i.e. in the case of periodic syncs the time spent queuing the instruction <<< the time needed to execute it. Postgres is great in this case.
I think it's a reasonable option, and the webapp.io people scaled it out pretty high. At Zapier we utilize RabbitMQ heavily and I cannot imagine scaling the amount of tasks we handle each day on postgres.
This article is mostly lessons learned and RabbitMQ misconfigurations, though. Which is probably a good reason to simplify if you don't need its power. No reason to have to learn how to configure it if postgres is good enough.
A lot of companies would be better off if they had just used a single big database instance with some read replicas instead of all the distributed cloud blahblahblah that 99.9% of even tech companies will never need.
Thousands a day? Really? Even if it were hundreds of thousands a day it would make more sense to use a managed Pub Sub service and save yourself the hassle (assuming modest throughput).
Yeah, I'd do the opposite of what they ended up doing. Start with Postgres, which will handle their thousands-per-day no sweat. If they scaled to > 100 millions/day, then start investigating a dedicated message bus / queue system if an optimized PG solution starts to hit its limits.
Yeah, it seems like a more natural evolution into specialization and scale, rather than a step "backwards" to PG, which I suspect will be the subject of a future blog post about replacing their pg queue with another solution...
We use ActiveMQ (classic) because of the incredible console. Combine that with hawt.io and you get some extra functionality not included in the normal console.
I'm always surprised, even with the older version of ActiveMQ, what kind of throughput you can get, on modest hardware. A 1gb kvm with 1 cpu easily pushes 5000 msgs/second across a couple hundred topics and queues. Quite impressive and more than we need for our use case. ActiveMQ Artemis is supposed to scale even farther out.
Same. Ran a perf test recently. With two 1 core brokers I got 2000 rps with persistence and peaked at 12000 rps with non-persistence.
We’ve also had similar issues to OP's, except fixing it just came down to configuring the Java client to have 0 prefetch so that long jobs don’t block other msgs from being processed by other clients, plus using separate queues for widely different workloads.
It seems that OP's company doesn't really know anything about the job workload beforehand, as the jobs are created by their customers. Having different queues for short/long workloads might be impossible.
For small stuff, rabbitmq and celery are hideously heavy to use. I had issues with celery: for scheduled bg tasks, you could not execute further async requests inside the tasks, which was mind-blowingly useless. This was years ago.
Nowadays for small stuff I just create a simple script that uses asyncio, I can async request stuff and run_forever etc and handle it all as a docker service.
One thing worth pointing out: the approach described in TFA changes a PUSH architecture to PULL.
So now you have to deal with deciding how tight your polling loop is, and with reads that are happening regardless of whether you have messages waiting to be processed or not, expending both CPU and requests, which may matter if you are billed accordingly.
Not in any way knocking it, just pointing out some trade-offs.
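To make the pull side concrete, a worker loop with a simple idle backoff might look like this (table name and handler are placeholders; LISTEN/NOTIFY can replace the sleep if you want to avoid empty polls entirely):

import time
import psycopg2

def run_worker(dsn, idle_sleep=2.0):
    conn = psycopg2.connect(dsn)
    while True:
        with conn, conn.cursor() as cur:  # one transaction per claim attempt
            cur.execute(
                "DELETE FROM task WHERE id = ("
                "  SELECT id FROM task ORDER BY id LIMIT 1 FOR UPDATE SKIP LOCKED"
                ") RETURNING *"
            )
            row = cur.fetchone()
        if row is None:
            time.sleep(idle_sleep)  # back off only when there was nothing to do
            continue
        process(row)  # hypothetical job handler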
Also had a similar experience using RabbitMQ with Django+Celery. Extremely complicated and workers/queues would just stop for no reason.
Moved to Python-RQ [1] + Redis and been rock solid for years now. Redis is also great for locking to ensure only one instance of a job/task can run at a time.
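The Redis locking mentioned here is usually just SET NX EX plus a token check on release; a sketch with redis-py:

import uuid
import redis

r = redis.Redis()

def try_lock(name, ttl_seconds=60):
    # only succeeds if the key doesn't exist yet; the TTL keeps crashed owners from blocking forever
    token = uuid.uuid4().hex
    return token if r.set("lock:" + name, token, nx=True, ex=ttl_seconds) else None

def unlock(name, token):
    # compare-and-delete in Lua so we never drop a lock someone else has since re-acquired
    script = ("if redis.call('get', KEYS[1]) == ARGV[1] "
              "then return redis.call('del', KEYS[1]) else return 0 end")
    r.eval(script, 1, "lock:" + name, token)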
We have a mix of Agenda jobs and RabbitMQ. I know there are more complex use-cases, like fan-out, but in reality the RabbitMQ client keeps silently disconnecting in the stack we're using (js). Someone has to go in and restart pods (k8s).
All the stuff on Agenda works perfectly all the time. (which is basically using mongo's find and update)
> One of our team members has gotten into the habit of pointing out that “you can do this in Postgres”
Actually, using Postgres stored procedures they can do anything in Postgres. I am quite sure they can rewrite their entire product using only stored procedures. Doesn't mean they really want to do that, of course.
A SQL server won't call back clients [or the application server] to inform them that new data is available. So how do you poll it? A query in a periodic loop? Some other way? How well does that scale, and how much load does it create on the server?
I love postgresql. It's a great database. However, this blog post is by people who are not quite experienced enough with message processing systems to understand that the problem wasn't RabbitMQ but how they used it.
Another middle ground is AWS Batch. If you don’t need like complicated/rules based on the outcome of the run etc it’s simpler, especially if you’re already used to doing ECS tasks.
It's important not to gloss over what your actual use-case is. Don't just pick some tech because "it seems simpler". Who gives a crap about simplicity if it doesn't meet your needs? List your exact needs and how each solution is going to meet them, and then pick the simplest solution that meets your needs.
If you ever get into a case where "we don't think we're using it right", then you didn't understand it when you implemented it. That is a much bigger problem to understand and prevent in the future than the problem of picking a tool.
In the very few occasions that I've seen a queue backed by a Postgres table, when a job was taken, its row in the database was locked for the entire processing. If the job was finished, a status column was updated so the job won't be taken again in the future. If it wasn't, maybe because the consumer died, the transaction would eventually be rolled back, leaving the row unlocked for another consumer to take it. But the author may have implemented this differently.
That's a good approach if the worker is connected to the database.
If an external process is responsible for marking a job as done, you could add a timestamp column that will act as a timeout. The column will be updated before the job is given to the worker.
Not the author, but I've used PG like this in the past. My criteria for selecting a job was (1) the job was not locked and (2) was not in a terminal state. If a job was in the "processing" state and the worker died, that lock would be free and that job would be eligible to get picked up since it's not in a terminal state (e.g., done or failed). This can be misleading at times because a job will be marked as processing even though it's not.
I don't doubt that a custom backfill management system written on RabbitMQ could, in certain cases, be rewritten on Postgres for equivalent if not better results.
My question would be why you're in the business of writing a job manager in the first place.