Edit: Reading the article again, I think the most interesting part is the Task Runner, which presumably would allow me to have custom behaviours... but the kinds of things I can do are still really unclear.
Also does it go both ways?
Can I take data from S3 and plug it back into DynamoDB?
For us, a task is a predefined set of processes. An instance of a task is a stream, and an instance of a process is a process instance. Typically, a stream will have a few process instances, most of which are batch jobs sent to the batch farm. We also allow Jython scriptlets as process instances to do housekeeping, namely to instantiate multiple substreams and to define variables and the environment a job might require. The root-level task can have a version and a revision number.
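To make that concrete, here's a rough sketch of the hierarchy in plain Python. All the class and field names here are my own invention for this comment; the real thing lives in Oracle tables and is more involved:

```python
# Hypothetical sketch of the task/stream/process-instance hierarchy.
# Names are illustrative only; the real system keeps all of this in Oracle.
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class ProcessInstance:
    name: str
    kind: str                    # e.g. "batch" or "scriptlet"
    status: str = "CREATED"

@dataclass
class Stream:
    stream_id: int
    parent: Optional["Stream"] = None
    processes: List[ProcessInstance] = field(default_factory=list)
    substreams: List["Stream"] = field(default_factory=list)
    status: str = "RUNNING"

@dataclass
class Task:
    name: str
    version: int
    revision: int
    processes: List[str] = field(default_factory=list)  # the predefined set
```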
For instance, a top-level stream will have a process instance that instantiates 1000 substreams, each of which has batch jobs that can be sent out to the batch farm. Those complete, and each substream can do more work or exit based on its status and preconditions. The top-level stream will end in success, failure, termination, or cancellation depending on conditions. Any stream or process instance can be rolled back, but only when its parent streams are all in a final state, because the preconditions (onSuccess, onFailure, onDone, etc.) typically apply across all streams and process instances, and rolling back mid-flight would invite race conditions and data corruption.
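The rollback rule is essentially a walk up the parent chain. A minimal sketch, again with made-up names and building on the toy classes above (the real precondition machinery is richer than this):

```python
# Hypothetical rollback guard: a stream may only be rolled back once
# all of its ancestor streams have reached a final state.
FINAL_STATES = {"SUCCESS", "FAILURE", "TERMINATED", "CANCELLED"}

def can_roll_back(stream):
    parent = stream.parent
    while parent is not None:
        if parent.status not in FINAL_STATES:
            return False
        parent = parent.parent
    return True
```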
We use Oracle to do all our bookkeeping. Jython scriptlets are run on the main server (and are typically only used to control flow). We use email for batch system messaging. Our jobs are typically very long and take multiple hours to complete, especially the Monte Carlo jobs, and because of that email has scaled fine: the number of messages rarely exceeds 100/second, even when we're concurrently running 2000+ batch jobs. I'd like to move to 0mq for the slight performance benefit, but really, email has been fine.
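If we did swap email for 0mq, I'd expect it to look roughly like a PUSH/PULL pair. A sketch assuming pyzmq; the endpoint and message format are made up:

```python
# Hypothetical 0mq replacement for the email-based status messages,
# using pyzmq's PUSH/PULL pattern. Address and payload are invented.
import zmq

def report_status(endpoint, stream_id, status):
    """Called from a batch job when it finishes."""
    ctx = zmq.Context.instance()
    sock = ctx.socket(zmq.PUSH)
    sock.connect(endpoint)          # e.g. "tcp://bookkeeper:5557"
    sock.send_json({"stream_id": stream_id, "status": status})
    sock.close()

def collect_statuses(endpoint):
    """Runs on the main server; replaces polling the mail spool."""
    ctx = zmq.Context.instance()
    sock = ctx.socket(zmq.PULL)
    sock.bind(endpoint)
    while True:
        msg = sock.recv_json()
        yield msg["stream_id"], msg["status"]
```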
Most messages are logged directly to Oracle. Batch instances will typically have a log file, but that usually ends up on an NFS server or somewhere the user defines. Previous executions are archived. A user defines a log root, and each log file lands in a path derived from that root. This can sometimes lead to directories with a very large number of files.
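Purely to illustrate the failure mode (this layout is invented, not our actual scheme): a flat per-task log directory fills up fast once a task fans out to thousands of substreams.

```python
# Invented example layout, not our real scheme: a flat per-task log
# directory fills up fast when a task fans out to thousands of substreams.
import os

def log_path(log_root, task_name, stream_id, process_name):
    return os.path.join(log_root, task_name,
                        "%d.%s.log" % (stream_id, process_name))

# 1000 substreams x a handful of process instances each means thousands
# of files in <log_root>/<task_name>/ alone.
```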
We abstract out the batch farm and have custom daemons so we can dispatch jobs to various batch farms around the world.
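In spirit, the abstraction is just an interface the site daemons implement. A sketch with names I've made up for this comment (the LSF adapter is only an example):

```python
# Hypothetical sketch of the batch-farm abstraction: every site daemon
# implements the same interface, so the server doesn't care whether a
# job lands on LSF, Condor, or anything else.
from abc import ABC, abstractmethod

class BatchFarm(ABC):
    @abstractmethod
    def submit(self, command, environment):
        """Submit a job; return a farm-specific job id."""

    @abstractmethod
    def status(self, job_id):
        """Return e.g. 'QUEUED', 'RUNNING', 'DONE', or 'FAILED'."""

class LSFFarm(BatchFarm):
    """Example adapter only -- at a real site it would wrap bsub/bjobs."""
    def submit(self, command, environment):
        ...
    def status(self, job_id):
        ...
```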
So far, it's scaled pretty well. We have over 30 million streams and over 60 million process instances logged in our database. Some of the tasks are pretty complex tree structures due to the parent-child relationships and nesting we allow. This is one reason we've stuck with Oracle: its CONNECT BY statement makes hierarchical queries easy. In practice that's been okay, and it would continue to be okay if we had a good partitioning plan. We have plans to move away from Oracle and implement a database-agnostic solution based on temporary tables. Due to legacy requirements and uptime necessity (this software has been in development for the last 5 years), it's been hard to roll these fixes out in production. We plan to overhaul some of this when we get a new database server.
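For the curious, CONNECT BY lets you pull a whole subtree in one query. A sketch assuming the cx_Oracle driver and an invented table layout (the real schema is more involved):

```python
# Hypothetical sketch of the kind of hierarchical query CONNECT BY buys
# us. Table and column names are invented; assumes cx_Oracle.
import cx_Oracle

def fetch_stream_tree(conn, root_stream_id):
    sql = """
        SELECT stream_id, parent_id, status, LEVEL AS depth
          FROM process_stream
         START WITH stream_id = :root_id
       CONNECT BY PRIOR stream_id = parent_id
         ORDER SIBLINGS BY stream_id
    """
    cur = conn.cursor()
    try:
        cur.execute(sql, root_id=root_stream_id)
        return cur.fetchall()
    finally:
        cur.close()
```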
Storage is typically up to the users writing a task; they often use NFS or, more typically, xrootd (http://xrootd.slac.stanford.edu/), depending on requirements. Many will register the data with other software I develop, called datacatalog, which mainly does record keeping and metadata maintenance for the data. Access to it is typically done through Jython scriptlets.
I think what would work best in our case is an extra column in our stream table for what I'd call the "ancestor", i.e. the topmost stream and execution. Using that (together with an additional column marking whether an execution "isLatest"), we could quickly grab a subset of likely nodes in an execution branch and then process them in memory to construct the tree of actual nodes, roughly like the sketch below. It's all kind of complicated, because a third-level stream could be the latest execution under a second-level stream that is itself not the latest execution. A full closure table would be better and quicker, but that would require an extra table and a whole lotta entries, which would just be a bigger headache when it comes to locking a node for a transaction: now you have to lock one row in the main table and a lot more rows in the closure table for an update.
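A sketch of what I mean, with invented column names (the pruning comment is the subtle bit):

```python
# Hypothetical sketch of the "ancestor" scheme: fetch every row that
# shares one top-level ancestor, then stitch the actual tree together
# in memory. Column names are invented.
from collections import defaultdict

def build_tree(rows):
    """rows: (stream_id, parent_id, is_latest) tuples for everything
    sharing one top-level ancestor -- the cheap indexed fetch."""
    latest_children = defaultdict(list)
    for stream_id, parent_id, is_latest in rows:
        if is_latest:
            latest_children[parent_id].append(stream_id)

    def walk(node_id):
        # A stream only appears in the result if every ancestor on its
        # path is also a latest execution; a latest third-level stream
        # under a non-latest second-level stream gets pruned here.
        return {child: walk(child) for child in latest_children[node_id]}

    return walk(None)  # None marks the parent of the top-level stream
```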
I didn't design the system. It makes sense for science processing, especially when people want to reprocess data from a few years ago with some new calibration or the like. I know how I'd redesign it if I were starting from scratch, but even then, people often think of weird ways to use a system this flexible, and those can be a pain to work around.
BTW, is there a rule somewhere that all of us who get into grid and batch processing must write our own workflow management systems? We all seem to do it...