Using Elasticsearch as the Primary Data Store in an ETL Pipeline (vlkan.com)
110 points by kn7 on Nov 30, 2018 | 41 comments

> The reason PostgreSQL results were omitted is no matter what kind of optimization we throw at it, the benchmark always took more than 2 hours, regardless of partitioning, whereas MongoDB and Elasticsearch took a couple of minutes.

This seems like something was done incorrectly; the comparison shouldn't be that drastic.

> Just one PostgreSQL 9.6.10 instance (shared_buffers = 128MB)

This looks way too low. The PostgreSQL docs say a good starting point for shared_buffers is 25% of the server's memory; in this case that would be 32GB.
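For illustration, common community starting points for a dedicated server with 128GB of RAM (my assumption, inferred from the 25%/32GB figure above) would look something like:

```ini
# Rules of thumb only; tune per workload (assumes a dedicated 128GB server)
shared_buffers = 32GB            # ~25% of RAM, per the PostgreSQL docs
effective_cache_size = 96GB      # ~75% of RAM; a planner hint, not an allocation
work_mem = 256MB                 # per sort/hash operation, so watch per-connection cost
maintenance_work_mem = 2GB       # speeds up index builds and VACUUM
```

These are generic starting points, not benchmark-specific tuning.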


They set out on a 4-year journey to improve their ETL, but didn't take one second to change a conservative global config default. I can barely take the rest of the article seriously after a blunder like that.

See my response above, it was indeed a typo from my side. I am sorry to hear that it spoiled the rest of the post for you.

It was indeed an incorrect snippet -- removed it. We do have a group of PostgreSQL experts within the company and we let them tune the database for the benchmark. Let me remind you, this was not a tune-once, run-once operation. We spent close to a month making sure that we were not missing anything obvious for each storage engine. But as I explained, nothing much worked in the case of PostgreSQL.

Either way, something still seems off. If MongoDB and Elasticsearch were 3-5x more performant I would still find that surprising. A > 10-20x difference really seems like a configuration or implementation issue.

I'm certain I'm missing something very obvious about Elasticsearch and other NoSQL data stores.

But in my brief experience with an Elasticsearch-backed web application I found it difficult to write integration tests for the portions of code that dealt with ES. Thanks to ES's "eventually consistent" nature, tests would fail intermittently because we'd be e.g. querying some data that was written to ES but hadn't been fully persisted yet. ES gives you some "flush the data to disk right now, please" commands but they're merely suggestions and cannot be relied upon.

Obviously, you want to stub/mock out as many of those actual physical database reads/writes as possible, but sometimes what you want to test is the ES queries themselves and I don't know what on earth the best practice is there.

Just to reiterate, this is an issue that any non-ACID datastore would experience. I'm not criticizing them, I'm just sort of wondering how people typically solve that...

I use `refresh=True` on my insert/update/delete, which forces the writes to complete. Then all the reads work as you would expect.

That still doesn't guarantee that a subsequent read will get the last state.

Direct from ES documentation on the `refresh` flag: "Refresh the relevant primary and replica shards (not the whole index) immediately after the operation occurs, so that the updated document appears in search results immediately."

Do you have other information about the refresh flag? Their documentation clearly states that forcing a refresh applies to the primary and replica shards, meaning that the document will be available for query directly after the call to refresh is made.

My experience was that this was not reliable. It was nearly always true, but not always, and tests would sporadically fail some small percentage of the time.

However, this was back in ~2015 and Elasticsearch 1.3 or something like this, which is of course a now-ancient version. Perhaps things are different now.

edit: Perhaps we were using the refresh command and not the refresh flag. It was a few years ago and I don't have access to the code any more, and my memory may be failing here. If the refresh flag works as advertised (enforces an index update and guarantees a consistent view of the data for the next query, which the command did not seem to) then that of course solves my initial problem W.R.T. writing tests.

Yes it does. I'm not sure what you think the refresh operation is if not that.

I've run countless integration tests with ES and never seen something fail due to refresh not working as advertised. If you have, what version of ES was it? Can you give some sample code that sporadically exhibits the problem?

We generally use index refresh in ITs (running ES in Docker) and it fails occasionally, which I believe is the case described here: "The (near) real-time capabilities depend on the index engine used." https://www.elastic.co/guide/en/elasticsearch/reference/curr...

That seems to be the issue then. The refresh flag should be passed in your insert/update/delete operations.

The refresh command can also be called (which is what you're doing) but this is a different operation and just triggers the index build with no guarantees that it finishes or is consistent with any particular data mutation.
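To make the distinction concrete, here's a toy in-memory model (illustration only, not real Elasticsearch code): writes land in a buffer and only become searchable after a refresh moves them into a searchable segment. The per-operation refresh flag publishes the write you just made; a standalone refresh call only publishes whatever happened to be buffered when it ran, saying nothing about writes that arrive afterwards.

```python
class ToyIndex:
    """Toy model of Lucene-style near-real-time search (illustration only)."""

    def __init__(self):
        self._buffer = []       # writes not yet visible to search
        self._searchable = []   # "segments" visible to search

    def index(self, doc, refresh=False):
        self._buffer.append(doc)
        if refresh:             # like the per-request refresh flag
            self.refresh()

    def refresh(self):          # like a standalone refresh command
        self._searchable.extend(self._buffer)
        self._buffer.clear()

    def search(self, term):
        return [d for d in self._searchable if term in d]


idx = ToyIndex()
idx.index("alpha")                       # buffered, not yet searchable
assert idx.search("alpha") == []
idx.index("beta", refresh=True)          # per-op refresh publishes the buffer
assert idx.search("alpha") == ["alpha"]  # earlier buffered write published too
idx.index("gamma")                       # written after that refresh: invisible
assert idx.search("gamma") == []
```

The last two lines are the failure mode under discussion: a refresh issued at time T says nothing about a write that lands at T+1.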

Did you read the previously posted documentation for the refresh flag?

ES comes in handy with very large sets of data. If your data fits on one node (or a handful), you're probably better served by a relational database.

When you try to scale something to large sizes AND want high availability, it's pretty much a given you'll be dealing with eventual consistency.

We use ES to ingest billions of records per day. For us, being able to immediately query a row that was just added is less important than being able to deal with the volume with relatively predictable performance.

It's not a given. Plenty of distributed databases support strong consistency.

ES is not meant to be an OLTP database. It's a search index, essentially a much nicer wrapper around Lucene, but the distributed part has always been weak. The last several years of updates have primarily been about fixing the home-grown replication and storage.

Elasticsearch has a "refresh" query flag to force immediate consistency: https://www.elastic.co/guide/en/elasticsearch/reference/curr...

That still doesn't guarantee that a subsequent read will get the last state. Welcome to the wonderful world of "eventual consistency".

The "refresh=wait_for" [1] index setting does guarantee that a subsequent read will get the data. It causes all shards to refresh:

    Refresh the relevant primary and replica shards
    (not the whole index) immediately after the operation
    occurs, so that the updated document appears in search
    results immediately
There's also the "wait_for_active_shards=<n>" parameter, which waits until n shard copies have acknowledged the write; that is about durability, not search visibility.

[1] https://www.elastic.co/guide/en/elasticsearch/reference/curr...

How so? That's the entire point of the feature. What else is there to update?

It guarantees that all replicas will report state in sync with each other on search, not that the last reported state is the actual current state of the index.

As the numerous comments here and the documentation state, the refresh flag on your insert/update will ensure that the changed data in that request is consistent for subsequent queries.

Where did you get the behavior you described? Are you sure you're not confusing this for the separate refresh command itself? That is not attached to any particular insert/update.

That is indeed the case and we have also been bitten by that. The most effective workaround we managed to find is to flush periodically (say, every 300ms) for a certain timeout period before reading from ES again for checks. Though even then, ITs still fail from time to time.

Well, with C# and LINQ with Mongo (you did say "other NoSQL" data stores) you can mock out the provider and test your LINQ queries by substituting the IMongoQueryable<T> with a List<T>. ElasticLinq is a thing, but I've heard mixed things about it.

If Elasticsearch is at the end of an ETL pipeline, does that mean that if Elasticsearch gets corrupted you can rebuild it by re-running the pipeline?

If so I wouldn't call this a "primary data store", since durability isn't critical.

The article says:

> After drafting many blueprints, we went for a Java service backed by Elasticsearch as the primary storage! This idea brought shivers to even the most senior Elasticsearch consultants hired

I'll shiver if Elasticsearch corruption irreversibly loses data, but if it can be rebuilt from another source I don't see any problems with it at all.
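That rebuild idea can be sketched as a fold over an event stream (hypothetical event shapes, not the article's actual pipeline): if every mutation is kept as an event in bulk storage, the search-index snapshot can always be reconstructed by replaying it.

```python
def rebuild_index(events):
    """Rebuild a search-index snapshot by replaying an event stream."""
    docs = {}
    for event in events:
        if event["op"] == "upsert":
            docs[event["id"]] = event["doc"]
        elif event["op"] == "delete":
            docs.pop(event["id"], None)
    return docs


events = [
    {"op": "upsert", "id": 1, "doc": {"title": "red shoes"}},
    {"op": "upsert", "id": 2, "doc": {"title": "blue hat"}},
    {"op": "delete", "id": 1},
    {"op": "upsert", "id": 2, "doc": {"title": "blue cap"}},
]
assert rebuild_index(events) == {2: {"title": "blue cap"}}
```

In that setup the event stream, not the index, is the point of truth.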


We’ve been running large Elasticsearch clusters as our primary search/analytics engine. While it’s overall very stable, stuff does occasionally happen that requires an index rebuild. We use HBase as our primary store and index via map/reduce or Spark Batch.

As much as I love Elasticsearch, I definitely wouldn’t be able to sleep at night knowing it was the primary datastore.

We store the real-time content stream in a separate bulk storage unit (e.g., BigQuery) with a certain retention window, but the ETL'ed documents are always on ES. Given that a plain event (i.e., not an ETL'ed document) is not of much value for search, I would not call the stream storage the primary storage. It just helps us rebuild the ETL state in case of an emergency.

> "Enough blaming the former engineer."

I was one of them. I don't work there anymore.

I believe it is not the actual situation at bol.com. If it is, I would be disappointed.

Last I remember, bol.com has a really good set of ops and dev tooling on Hadoop, HBase, Spark, Flink, etc. for scheduling, running jobs, and so on.

I don't know why they replicated data on both HBase and Elasticsearch. Having read the blog, I don't see how this fits the event sourcing pattern that bol.com was trying to implement, nor the idea of self-service BI that they envisioned.

Hey Debarsh! First, thanks for taking time to read such a lengthy post.

If I am not mistaken, the majority of the PL/SQL glue is owned by Gert, though you might recall better. Quite a bit of VCS history was lost while migrating from SVN to Git. ;-)

The reason we are "replicating" the entire data is to 1) determine the affected products and 2) re-execute the relevant configurations (facets, synonyms, etc.) while making retroactive changes. (For instance, say someone has changed the PL/SQL of the "leeftijd" facet.) Here, the storage is required to allow querying on every field, for (1), and on id, for (2). While id-based bulk querying is (almost) supported by every ETL source, querying on every field is not. Hence, we "replicate" the sources on our side to satisfy these needs. Actually, the entire point of the post was to explain this problem, but apparently it was not clear enough.

For your remarks on event sourcing and BI, I am a little bit puzzled. I will need some elaboration on these remarks. We do have event sourcing on our side (that is how we can replay in case of need) and BI is not really interested in ETL data. Maybe I misunderstood you?

I am also confused by how you relate scheduling/running PL/SQL jobs via Hadoop, Spark, Flink, etc. Did you see the link to Redwood Explorer I shared in the post?

I am not Debarsh, and I am not a data engineer, but isn't the purpose of ETL to transform data into a more accessible/palatable form for BI?

Bol has plenty of other ETL pipelines for BI. What I meant is that the data cooked for search is not (much) of interest to BI, yet. Though we do have other means to feed BI with search-relevant content.

In all fairness, you are right about the Oracle stuff ingrained in bol.com. I am not sure if I should go into detail, but the whole thing used to be like: maintain event states with a "versions" table, then run Hadoop/Spark jobs on them, and snapshot the latest computed state to Oracle so that they could run BI on it.

But I understand now what you actually mean. I wouldn't call it ETL, as ETL is more about prepping data for BI than cooking data for search.

Yeah, I remember they used to have Redwood for scheduling PL/SQL queries, but I think the majority of ETL jobs for BI were in Hadoop/Spark/Flink.

Having said all that, I think it is some quite neat and cool engineering work. I hope you guys are successful in implementing the solution.

> BI is not really interested in ETL data

Isn't ETL an intermediary in BI? I think I am a bit confused. To give some context, this is my understanding: you have all the services generating data, and you have ETL jobs that extract data from these services, transform it, and move it into a star or snowflake schema in an RDBMS, prepared for BI tools to query efficiently.

Any feedback on how this went? The event sourcing pattern, I mean.

ES is currently the main data store for zagat.com; it ends up being a sink from a data pipeline and is more or less used as a key-value store on the query side. It has worked OK for our current use case, but definitely came with some pain points. Primary-key fetches were way too slow for what we needed, especially with some in-memory joins happening, and we ended up sticking a cache in front of ES to satisfy our performance reqs.
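That cache-in-front pattern is essentially a read-through cache; a generic sketch (my own illustration, not zagat.com's actual code):

```python
class ReadThroughCache:
    """Serve repeated key lookups from memory, hitting the backend once per key."""

    def __init__(self, backend_get):
        self._backend_get = backend_get  # e.g. an ES get-by-id call
        self._cache = {}

    def get(self, key):
        if key not in self._cache:
            self._cache[key] = self._backend_get(key)
        return self._cache[key]


calls = []

def slow_backend_get(key):
    """Stand-in for a slow primary-key fetch against ES."""
    calls.append(key)
    return {"id": key}


cache = ReadThroughCache(slow_backend_get)
assert cache.get("a") == {"id": "a"}
assert cache.get("a") == {"id": "a"}  # second read served from cache
assert calls == ["a"]                 # backend hit only once
```

A real deployment would add TTLs or invalidation on writes, which is where the "symbiotic" messiness mentioned below tends to creep in.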

We had a tight deadline on implementation (3 months to extract from Google) and chose ES in order to satisfy a kv store as well as TF-IDF corpus search.

If it's a sink from a data pipeline then presumably it's not your primary point-of-truth data store because if Elasticsearch gets corrupted you can rebuild it from the rest of the pipeline?

Yep, it's a bit more symbiotic than that unfortunately, but in general most of the data can be restored from an upstream source.

Is elasticsearch really the primary data store if they can replay events to reconstruct the data in elasticsearch?

It sounds like the primary data store would be the stream of events.

... If your primary data store should be Elasticsearch. Nice write up of everything, though, for sure.

Would love to do/see similar benchmarks on Postgres using more performant setups. Like someone else said, the disparity is downright shocking.

Please see my reply to the "someone else" you mentioned. If there is anything else you think I might have missed in the benchmarks, I am all ears.

Yeah, I saw your reply. To be honest, I was mostly fishing for the possibility of finding a dataset with which I could perform the test myself.
