Another neat hack is to use Postgres as a quick & dirty replacement for Hadoop/MapReduce if you have a job that has big (100T+) input data but small (~1G) output data. A lot of common tasks fall into this category: generating aggregate statistics from large log files, searching Common Crawl for relevant webpages, identifying abusive users or transactions, etc.
The architecture is to stick a list of your input shards in a Postgres table, have a state flag that goes PENDING->WORKING->FINISHED->(ERROR?), and then spin up a bunch of worker processes as EC2 spot instances that check for the next PENDING task, mark it as WORKING, pull it, process it, mark it as FINISHED, and repeat. They write their output back to the DB in a transaction; there's an assumption that aggregation can happen in-process and then get merged in a relatively cheap transaction. If the worker fails or gets pre-empted, it retries (or marks as ERROR) any shards it was previously working on.
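Concretely, the table and the claim query can be as simple as the sketch below (only the work_queue name is from the setup above; the column names are just illustrative):

    -- one row per input shard
    CREATE TABLE work_queue (
        shard_id    text PRIMARY KEY,                 -- e.g. an S3 path to the shard
        state       text NOT NULL DEFAULT 'PENDING',  -- PENDING/WORKING/FINISHED/ERROR
        machine_id  text,
        retries     int  NOT NULL DEFAULT 0,
        updated_at  timestamptz NOT NULL DEFAULT now()
    );

    -- claim the next shard; the outer state check makes the claim atomic,
    -- so if another worker got there first you get zero rows back and just retry
    UPDATE work_queue
       SET state = 'WORKING', machine_id = 'i-0abc123', updated_at = now()
     WHERE state = 'PENDING'
       AND shard_id = (SELECT shard_id FROM work_queue WHERE state = 'PENDING' LIMIT 1)
    RETURNING shard_id;

Marking FINISHED (or ERROR) is the mirror-image UPDATE, run in the same transaction as the merged output.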
Postgres basically functions as the MapReduce Master & Reducer, the worker functions as the Mapper and Combiner, and there's no need for a shuffle phase because output <<< input. Almost all the actual complexity in MapReduce/Hadoop is in the shuffle, so if you don't need that, the remaining stuff takes < 1 hour to implement and can be done without any frameworks.
If a worker fails or gets pre-empted, how does it retry anything? It's gone at that point, no?
Sounds like you'd end up with a bunch of dangling shards orphaned in WORKING state. And now you need timeouts and health checks and something to coordinate all that.
You've got your normal AWS orchestration working on top of this. If a spot instance is pre-empted, AWS will automatically restart it once the current spot price falls below your bid. One of the parameters you pass to the spot instance is a shell script to run when the instance starts; that script just launches your worker process, and in the worker's main() it checks for any previous WORKING shards with the same machine ID and marks them back as PENDING. You can (and probably want to) also have a retry count that gets incremented, so that if there's a problem with the data that results in a permanent failure, you give up, mark the shard as ERROR (logging the offending shard, record, and error message if possible), and move on to the next one.
For exceptions, crashes, and other unexpected errors, you just let it crash. In the shell script that starts the worker process, you wrap it in "while (1) { ... }", so it's just permanently restarting. Then the aforementioned retry check in main() handles the previous shard just as if the worker had been pre-empted or finished normally.
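The restart-time cleanup in main() boils down to a couple of statements along these lines (using the hypothetical columns from the sketch above):

    -- reclaim anything this machine left in WORKING, bumping the retry count
    UPDATE work_queue
       SET state = 'PENDING', retries = retries + 1, updated_at = now()
     WHERE state = 'WORKING' AND machine_id = 'i-0abc123';

    -- give up on shards that keep failing
    UPDATE work_queue
       SET state = 'ERROR', updated_at = now()
     WHERE state = 'PENDING' AND retries >= 3;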
There are a variety of low-tech ways to detect when the job as a whole has finished, ranging from manually shutting down the cluster, to a small cronjob that counts PENDING/WORKING shards and shuts the cluster down when there are none, to having your workers return a different exit status when there's no more work to do and shutting the box down in response. A neat side effect of this is that you get an easy status report of exactly which machine is working on which shard and how far along the job is, just by running "SELECT * FROM work_queue". Another neat side effect is that you can view partial results in the DB before the job as a whole completes, something you can't do with MapReduce, and potentially adjust your code and restart the job if you're getting bad data.
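Both the status report and the "are we done yet" check are one-liners, e.g.:

    -- who is working on what, and for how long
    SELECT shard_id, machine_id, now() - updated_at AS running_for
      FROM work_queue WHERE state = 'WORKING';

    -- shut the cluster down when this reaches zero
    SELECT count(*) FROM work_queue WHERE state IN ('PENDING', 'WORKING');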
I used to do something a little like this with Postgres advisory locks. The worker takes out a lock that signifies it's working on a task; if the worker fails or loses its connection in any way, the lock gets dropped and the transaction is rolled back.
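Roughly like this (the task id is hypothetical); the nice property is that session-level advisory locks disappear the moment the connection does:

    -- returns true if we now own task 42, false if some other worker holds it
    SELECT pg_try_advisory_lock(42);

    -- ... do the work; if this connection dies, the lock is released automatically ...

    -- release explicitly on success
    SELECT pg_advisory_unlock(42);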
SQL is pretty powerful. A last_updated column, plus a view that treats as claimable anything that's PENDING or that's been WORKING with now - last_updated > CUTOFF, does a decent job for you.
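Something like this, with the cutoff picked to be comfortably longer than a normal task (view name and cutoff are invented for illustration):

    CREATE VIEW claimable AS
    SELECT * FROM work_queue
     WHERE state = 'PENDING'
        OR (state = 'WORKING' AND now() - last_updated > interval '15 minutes');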
Yup, but they do so only once per shard (well, twice, once at the beginning and once at the end). If you've got a big job where each shard takes a few minutes to process and a hundred or so workers, the DB gets about 1 req/sec, which is well within the capabilities of Postgres.
We had an "interesting" bug where we inadvertently ended up polling postgres advisory locks around 6 million times per minute from a cluster of 200 servers. It added about 20% load to the database, enough to be anomalous but not enough to outright trip any alarms. The overall database performance was fine.
That works fine too. The advantage of Postgres is that even though the master is a single point of failure, all the state is persistent, so if Postgres goes down you can just bring it back up again. Also, ad-hoc query tools are a little better for Postgres, which gives you a really simple interface to inspect the status of the job and the partial results.
This architecture originally evolved from an Amazon-SQS based queue system, where I'd stick the jobs in SQS and pop them off. Then I realized that I needed the DB anyway for my reducer-substitute, and if I'm writing SQL/ORM code anyway it was simpler to use the DB to manage the queue than to use SQS, and it gave me some other nifty abilities like being able to record timing & size stats with each shard, being able to query progress while the job is going, and being able to manually restart shards after correcting the errors that led to their failure. If you're running dozens-to-hundreds of workers, the cost of a t2 RDS instance (that's all you need for 1 QPS) is a rounding error, and a lot cheaper than EMR would be.
All the replies to this and no mention of the workflow pattern! I guess Google went ahead and NIH'ed it too, to boot! Surely some of those people had programmed a workflow before, somewhere along the line.
I'm not going to say "I don't know why anybody is surprised," but I'll tell you that I'm surprised! This is good, you're all one of 10,000![1]
I'd call parent's a "basic" workflow, but Wikipedia seems to tag it as "state-based."[2] I'm glad someone pointed out that failed steps do not update the db unless you have good traps in the process itself. I'd probably suggest a housekeeping process that resets any errors after some amount of time so that they're run through again from the beginning. As long as the error is surfaced in the meantime, that is.
This was actually for a few of my post-Google projects. At Google we would just use MapReduce regardless of how inappropriate it was because of the difficulty of standing up an RDBMS on Google's cloud infrastructure, plus CPU cycles were basically free for engineers. Maybe it's different now that internal projects are encouraged to use Google Cloud; Cloud SQL + Compute Engine works just fine for this.
And this is definitely not a new pattern - if you wanna go back in time, this is how things often worked in the mainframe era. My point - and IIUIC the point of the article - is that people often reach for flavor-of-the-month heavyweight frameworks when they can easily adapt tools they're already using to solve the problem within hours.
(Another great example of this is the single-process in-RAM architecture used by PlentyOfFish, StackOverflow, Mailinator, and Hacker News when they were solo-developer projects, where you store all state in RAM and run the whole site on a single box, periodically dumping out checkpoints to disk in case the process crashes. This can scale to thousands of requests per second - that's millions of queries per day - and is a lot simpler to program than 3-tier architectures where everything runs through a data abstraction layer and you spend 90+% of the code shuffling data around.)
>people often reach for flavor-of-the-month heavyweight frameworks when they can easily adapt tools they're already using to solve the problem within hours.
Absolutely, which is why I think it's important to call concepts the same thing they called it in the olden times if you know it's an old thing!
It's not really, although you really want to do it with a data store that supports transactional semantics. Postgres, MySQL with InnoDB, Redis, Amazon SQS, and DynamoDB will all work. The first time I used this pattern was with Amazon SQS; the second time was MySQL with InnoDB tables. The article's about Postgres hacks, though, and Postgres is pretty commonly used for other parts of your system.
I probably would have gone with something like Kafka in this case, but it seems like Postgres serves them pretty well even if it's the "wrong" tool. If it works, fits your current needs, leads to faster development time, and isn't needlessly slow, then I say go for it.
Picking Postgres for alternative applications such as pub/sub services also simplifies both deployment and development. You'll already be using Postgres for persistence, so why waste time deploying another set of services and developing another set of clients just to get the same functionality?
I have some light experience with this. If you have the PostgreSQL hammer you sometimes think - I really should be using something else, but this hammer works with a lot of nails - and the use cases that postgresql works well enough on keep increasing.
I was pleased to see they are using `SELECT FOR UPDATE SKIP LOCKED`. That is what this 2nd Quadrant article recommends, which I think is required reading if you want to implement this yourself:
We use this kind of tooling a lot. Say you need to check 20k URLs and you want to rate limit them: add them to a Pg table (with state and result fields), and run a single-threaded worker that just takes a row (marks it as pending) and later updates it. With SELECT FOR UPDATE and SKIP LOCKED tricks you can scale it horizontally to the number of workers you need.
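The claim itself looks something like this (table and column names made up); the rate limit is just a not-before timestamp on the row:

    -- grab one URL that's due; SKIP LOCKED keeps concurrent workers off the same row
    UPDATE url_checks
       SET state = 'in_progress'
     WHERE id = (SELECT id FROM url_checks
                  WHERE state = 'queued' AND next_attempt_at <= now()
                  ORDER BY next_attempt_at
                  LIMIT 1
                    FOR UPDATE SKIP LOCKED)
    RETURNING id, url;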
I've also seen it used for software that sends mass mail (in our case around 100k/day); its state is a Postgres queue.
We also use Pg for transactional mail. We insert the mail into a table (there's a process that sends the queued rows). The really nice part is that the mail joins the DB transaction for free (all or nothing).
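i.e. the mail row rides along with whatever business change produced it, something like this (table names are made up):

    BEGIN;
    UPDATE orders SET status = 'confirmed' WHERE id = 42;
    -- the mail is only queued if the business change commits
    INSERT INTO outgoing_mail (recipient, subject, body)
    VALUES ('user@example.com', 'Order confirmed', '...');
    COMMIT;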
Excellent design hack. If anybody in the Node/TypeScript ecosystem is looking for this capability in a neat and supported library, it looks like the graphile folks have you covered:
https://pypi.org/project/pq/ looks similar, although it looks like they use `SKIP LOCKED` instead of advisory locks. I'm not sure what the tradeoff is.
I just remembered that Celery has a DB backend too, but I've not looked deeply; I can't see it doing anything particularly fancy in the implementation:
Anyone know of a good Java library for PostgreSQL-backed queues using the "SKIP LOCKED" technique? I've scoured GitHub but can't seem to find anything.
Great post!! At Rudder (open-source Segment), we used Postgres as our streaming layer in a similar fashion. It has been super stable in production, very easy to set up, and we easily get > 10K events/sec insertion performance.
The code is open-sourced here in case anyone wants to re-use it.
The caveat to this is in the first paragraph: all of the data is stored in memory. If each row is ~1kb and the table has some sort of TTL keeping its size bounded, you'll be good.
Yeah, but even without tuning, you can have hundreds of "sparse" queues with this pattern - you don't have to care about the frequency of insertions or deletions or the size of the rows.
I actually used redis pub/sub before switching to triggers - it is significantly easier to just:
    INSERT INTO JOBS

versus

    INSERT INTO JOBS
    redis-cli LPUSH job-queue job-id
It's also easier to configure: Redis has BRPOPLPUSH for acked queues, but it's much simpler to just query jobs with a SELECT statement against the database.
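The trigger that makes the bare INSERT enough is only a few lines (channel and table names are illustrative):

    CREATE FUNCTION notify_new_job() RETURNS trigger AS $$
    BEGIN
      PERFORM pg_notify('job_queue', NEW.id::text);
      RETURN NEW;
    END;
    $$ LANGUAGE plpgsql;

    CREATE TRIGGER jobs_notify
      AFTER INSERT ON jobs
      FOR EACH ROW EXECUTE PROCEDURE notify_new_job();

    -- each worker just runs: LISTEN job_queue;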
Internally, but if the change causing the work is in PostgreSQL, Redis would need to support 2-phase commit to commit the change and the work together atomically (and durably).
What if your listeners crashed/were down at the time of the `PUB` message? Does this mean the message falls into oblivion (since it will never receive a reply/ACK/get worked on)?
With this setup, if a worker crashes you get a connection timeout on the PostgreSQL side; the transaction rolls back and releases its row locks, and then the next SELECT ... SKIP LOCKED picks the job up.
If you want fast response with safety, LISTEN, and always do a poll on restart to pick up anything that's ready that you might have missed a notification for. If you want more coverage, do a poll every 10s - PostgreSQL can handle it - and you still get immediate response by listening for NOTIFY.
If you need fancier approaches you can keep a last_updated time on the job, and/or watch wait states in pg_stat_activity or similar - but that's all totally overkill I think; the simple approach gets pretty far (not an expert though, at all).
The pattern I follow is that workers immediately poll once when they start - this means that when your workers restart after crashing, they'll pick up any missing jobs.
The comment you are replying to wasn't addressing that scenario.
Typically you would implement visibility timeouts and other such stuff. Depending on the use case you could make specific optimizations or keep it generic and have SQS like semantics or something.
Because it's trivially easy. The dequeue operation updates the last_lease time; the query makes sure you don't dequeue something that was recently leased. It's far, far easier than setting up a separate job queue, especially if you are already using an RDBMS.
You can have 3 queues and 3 dispatchers, arrange some backpressure and a push-based producer-consumer model, and you will have 3 queues, 300 queries, and n polls per interval.
At the scale I operate at, I wouldn't consider this a viable option. What's the backpressure like on NOTIFY/LISTEN? (Docs mention a maximum backlog of 8GB on the message queue; is that configurable? Monitorable?) Tons of constant churn on a table means we have to worry about vacuuming, right? Now I have to monitor that too to make sure it's keeping up. Not to mention all the usual operational issues with running relational databases.
No thanks, I'll stick with GCP PubSub or AWS SQS, which are explicitly designed for this use case, and for which I have to set up no infrastructure.
At scale, the first problem you will generally hit is that each subscriber needs to hold open a connection to the database, chewing precious database resources (1 process, a little RAM, maintenance overhead). Works great on smaller systems though. Hard to state where the cutoff is given it is a function of hardware capacity.
Yeah, same. I'll use the best tool for the job - and for me, RabbitMQ is the best tool for a queue system. Simple to put up, scales really well, out-of-the-box libraries for all major languages, and it's absurdly reliable.
Hell, even its clustering system got a lot better!
- postgres doesn't maintain a queue for notify/listen, it's purely pubsub. I'm not quite sure how the backpressure could make it to the database
- A lot of use cases involve not dropping messages after they are processed (like CI jobs, in this example), so you don't have to vacuum the rows
- If you're comfortable with SQS there's no really big reason to switch, but it makes it so that your project can only run on amazon cloud servers, which is annoying.
> There is a queue that holds notifications that have been sent but not yet processed by all listening sessions. If this queue becomes full, transactions calling NOTIFY will fail at commit. The queue is quite large (8GB in a standard installation) and should be sufficiently sized for almost every use case.
My understanding of MVCC (correct me if I'm wrong), is every time you do an update, dead tuples are left behind that need to eventually be vacuumed. If vacuum isn't running or can't keep up, you'll run out of space or have a tx id wrap around.
Work queues are a simple paradigm and swapping out one queue for another is a trivial exercise that won't change the architecture of a system, and SQS or PubSub could be easily used, regardless of where the rest of the project is hosted. I'd rather pay that one-time cost than the ongoing cost of maintaining my own job queue service and infrastructure.
The LISTEN/NOTIFY queue has nothing to do with MVCC. The only reason it exists is that PostgreSQL synchronizes notification events with transaction boundaries (i.e. you get notifications only when you don't have an active transaction). Given that, the only case where you would care about the depth of the notification queue is when your application is exceptionally misbehaved, and in that case you will run into more critical problems (stale locks, MVCC bloat...) well before this begins to be an issue.
Please notice the usual wisdom of PostgreSQL developers "(8GB in a standard installation)" meaning they probably set up a config parameter that you can tweak if you ever reach this limit while scaling.
This is an often untold superpower of PostgreSQL when people argue that it supposedly "doesn't scale well": you often have a large array of optimizations available before your project really outgrows it.
Or I can use a hosted queue service and not have to worry about it at all. The less I have to worry/configure/optimize, the less room for mistakes, and the more operationally solid my system will be.
My point isn’t that Postgres can’t do it. It can. But it just requires so much more effort to get it right. And this article glosses over so many critically important details about doing these things at scale in production environments. Most of them are things I don’t have to worry about when using a real queue service.
> but it makes it so that your project can only run on amazon cloud servers
Why? Can't you use it as a standalone product that integrates into whatever service you want? The last time I used it was a few years ago for batch processing. I had Heroku instances that dispatched jobs directly to SQS. I'm doubtful but it's been a while since I've used the service so it might have changed.
> In the list above, I skipped things similar to pub/sub servers called "job queues" - they only let one "subscriber" watch for new "events" at a time, and keep a queue of unprocessed events:
If your job queue only allows one single worker (even per named queue), I'd argue it's a shit job queue.
Right, I spent some time earlier this year researching a number of job queue systems for a client and never came across one that had this limitation, which kind of makes me wonder what exactly the article is trying to claim.
Can someone share their experience with scaling pg's NOTIFY and LISTEN?
The use case I have in mind has a lot of logically separate queues. Is it better for each queue to have its own channel (so subscribers can listen to only the queue they need) or have all queues notify a global channel (and have subscribers filter for messages relevant to them). I am mainly confused about whether I need a dedicated db connection per LISTEN query and also how many channels is too much.
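For concreteness, the two variants I'm weighing look like this (channel names and payload format are just examples):

    -- option 1: one channel per logical queue
    NOTIFY queue_payments, '123';               -- subscribers: LISTEN queue_payments;

    -- option 2: one global channel, route on the payload
    SELECT pg_notify('jobs', 'payments:123');   -- subscribers: LISTEN jobs; then filter client-side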
Based on what I have read and some experimentation, I concluded that it will not scale very well beyond ~500 listener processes because there needs to be one persistent connection per listener. Furthermore, my understanding is using pgbouncer will not help since it needs to use session pooling in order to support LISTEN.
Hacks like this work at first, but long running transactions and postgres don't do well together.
After a few weeks of running on a multi-TB table, you'll find the dead tuples massively outnumber the live tuples, and database performance is dropping faster than the Vacuum can keep up. Vacuum is inherently single-threaded, while your processes making dead tuples as part of queries are multi-threaded, so it's obviously the vacuum that fails first if most queries are long running and touch most rows. Your statistics will get old because they're also generated by the vacuum process, making everything yet slower.
Even if you can live with gradually dropping performance, eventually the whole thing will fail when your txids wrap around and the whole database goes read-only.
Postgres generally has a fairly low maximum connection count. If you're running your own servers, you can adjust this, but in the cloud you may not be able to. For example, Google CloudSQL maxes out at 1000, Heroku at 500.
At that point, people usually start looking at connection pooling tools. Depending on how much work you need from the DB, connection pools can be a win. Anyone know how connection pooling works with listeners?
> Anyone know how connection pooling works with listeners?
They don't. LISTEN is per connection, and pgbouncer multiplexes many sessions onto a single one. The pooler holds the connection and has no way to propagate the notification back while still keeping the multiplexed sessions isolated.
So while Postgres makes for a pretty awesome database, and a pretty good queueing system, some folks may be seriously impacted by the max number of connections.
The reason for the low connection limit is that every connection is a separate process. There are also various structures in Postgres that are frequently traversed during various operations and that a large number of connections affects[1]. In my company we use Aurora, which supposedly also modified connection handling to use threads in order to support more connections. By default I think Aurora is set to 500 connections.
In our case we had about ~200 connections to the database. After we placed pgbouncer in front and reduced the number of connections to 60, our commit throughput doubled.
Can't say about listeners. But connection poolers introduced more problems than I cared to fix, so now I routinely use 3000 connections and have 9000 set up for peaks. It eats a little more RAM but it is more stable.
With today's hardware I argue you most likely do not need a pub/sub service. Considering the extra work needed in a distributed system, you could save a lot of time by keeping it simple.
This is where the Debezium connector for Postgres [1] comes in: it will retrieve change events from the TX log and push it to brokers such as Apache Kafka or Pulsar, or directly to your application. When not listening, any consumer will continue to read from where it left off before (applying "at least once" semantics).
I fail to see how Debezium would solve this; it's just as likely that Debezium isn't watching the WAL as it is that your minimal set of always-listening workers went down for some reason.
Debezium stores the WAL position it has read ("offset"). So while it's down, of course it won't be able to read any events. But once it's restarted, it will continue to read the WAL from the last offset. So no events will be missed also when the connector isn't running for some time (WAL segments are retained by the DB until acknowledged by a logical decoding client such as Debezium).
In my experience, you don’t. With tons of IOPS you need EBS or crazy expensive instances.
Instead you use a cloud like Google Cloud, where you can add NVMe SSDs to whatever instance type you need and configure custom RAM and CPU, instead of picking from the super expensive AWS instances with no configurable options and almost always the wrong resource allocation for your workload.
Source: I tested my infrastructure, which requires 60,000 IOPS, on both Google Cloud and AWS, and it was 1/4 the cost with higher performance on Google. Of note: this was a very high throughput streaming data application. YMMV for other applications.
I'll soon have to do a pub/sub for an application that's close to a multiplayer video game.
Most of the advice I've seen says that I'll probably want to code it myself, but I was wondering about the latency of that solution. I'll likely have a SQL store, and that would be a good argument to use Postgres...
For straight up pub/sub (without the job server), I tried NOTIFY/LISTEN in PostgreSQL, but hit some limitations.
We have switched to a Phoenix (Elixir) server which listens to the database, then clients subscribe via websockets. We're opensourcing it here: https://github.com/supabase/realtime
It's still in very early stages (although I am using it in production). Basically the Phoenix server listens to PostgreSQL's replication functionality and converts the byte stream into JSON, which it then broadcasts over websockets. This is great since you can scale the Phoenix servers without any additional load on your DB. Also it doesn't require wal2json (a Postgres extension). The beauty of listening to the replication functionality is that you can make changes to your database from anywhere - your api, directly in the DB, via a console etc - and you will still receive the changes via Phoenix.
Redis pub / sub has worked fantastically for me - reached 100k+ concurrent users with ~80 msg / sec, but benchmarked much higher. It was the easiest, fastest, and most reliable solution I found (about ~4 years ago)
Looking into it, it seems to have something like 2ms of latency which is good. Do you think it would be a good idea to use for exchanging, say, realtime position of players in a virtual world?
I'm curious if the same holds true if you drop in SQLite/MS SQL Server/MySQL.
I.e. is this good advice because Postgres in particular is a great implementation of sql, or because sql in general is good enough to solve this problem, or a mix of the two?
SQL Server has Service Broker (my prize winner for most undervalued and unknown feature in a mainstream software product) that does a tremendously good job of solving these types of problems. I can't say how well it scales, but it has done a reasonable job at every workload I've thrown at it.
IMO the reason why Service Broker is mostly unknown and undervalued is the name itself. It sounds like something horribly complicated, while in fact it's a relatively straightforward implementation of pub/sub.
I agree that PostgreSQL has some very nice extensions and if you know you are going to have multiple users (or multiple processes using it), SQLite can't compete.
But if you only have one process using the DB, then SQLite is immune to N+1 query problem since everything is in-process and there are no round-trips.
Well, yeah, but then there's stuff like enterprisedb which is postgresql + some proprietary stuff on top, and the company employs some of the postgresql core developers. So in some sense it's "PostgreSQL Enterprise Edition" in all but name.
As for mysql - I don't know why anyone wouldn't prefer mariadb - but I suppose it's conceivable mysql has features that makes it worth dealing with oracle licensing. But I doubt it.
The key features used here for PG seem to be SKIP LOCKED, plus the PG notification system, which informs all clients directly connected to the database that "this table has changed" so that they can restart their work.
The notification system comes with a cost: you can't scale up the number of connections to a PG database instance.
Therefore you need to go with pooling.
And for SKIP LOCKED, I just checked and mysql/maria do have it.
I did something very similar with MySQL since that was the DB we already had set up for data (not my choice). Basically any database that has atomic operations can do this. It's definitely way more convenient and cheaper to use your existing infra for pub/sub and only scale out to other services once performance becomes subpar. Although if you already have a messaging service up and running, it's probably better to use that.
I agree. I have prototyped this in the past, but our current pub/sub was not painful enough for us to go full steam ahead with PG.
However, my design was more bare bones. I was picking jobs by chaining CTE's that did the status update as they returned the first element of the queue.
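Roughly this shape (not the exact query; SKIP LOCKED is optional but is what's recommended elsewhere in the thread):

    WITH next_job AS (
        SELECT id FROM jobs
         WHERE status = 'queued'
         ORDER BY id
         LIMIT 1
           FOR UPDATE SKIP LOCKED
    )
    UPDATE jobs
       SET status = 'running'
      FROM next_job
     WHERE jobs.id = next_job.id
    RETURNING jobs.*;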
Typically (at least when I've implemented variations of this pattern) a batch job run on a schedule performing something like "expire stale/completed jobs past a threshold."
If you need the existence of completed jobs to compute future jobs, use a supplementary data structure that stores spans or other heuristics, updated in the same transaction as the job completion.
It makes the job-fetching more atomic, as the article rightly points out. If you take >1, you have to sort out the situation where 1 or more jobs fail, which adds an unreasonable amount of complexity.
On my first go I was taking batches, on the reasoning that it optimized for query performance. It was a complete disaster and I quickly realized that I was engaging in premature optimization.
Batching is out, stream-processing is in. If you design your job table correctly, PG will perform very well as an advanced stream processor.
Right, I was working mostly off one very well-written article on the topic, it was an easy search because there is really only one correct way to implement atomic job-handling with PG.