Four fails and a win at a big data stack for realtime analytics
Building a user-friendly app to analyze big data in real time (that is, keeping response times below 60 seconds) is a challenge. In the big data world, you’re either doing batch analytics, where nobody really cares about query time (most businesses), or you’re doing streaming (Uber, Facebook and kin), where query time is critical but data is only big in aggregate: each user only sees or uses a tiny bit, and there are batch jobs going on in the background.
I work on a web app that enables users who are not data scientists to construct and visualize complex analyses of geographic data in realtime. As our data, our user base, and the complexity of our offering have grown, we’ve had to modify the stack to meet performance criteria. Our problem seems like it should be a common one, but guidance is oddly slim on doing intensive read-only computation on large amounts of data within a user-friendly time frame.
A year ago, I was no expert in the big data space, which comes with enough of its own history of tools, jargon, and opinion that it’s difficult to know where to begin. I don’t have all the answers yet, but I’ve figured out enough to tentatively and humbly speed others down a path towards a stack that’s working for us.
Fail 1: Naive Postgres
First, I’d say don’t modify your stack until you have to. This is probably the point at which data should be considered big — when your existing infrastructure is struggling to cope. We operated happily doing everything in SQL for a long time. Postgres is an amazing database, and my experience with it has been that, with proper indexing and well constructed queries, tables of up to about 80 million rows can be aggregated and queried in realtime.
(Although I mentioned being in the geospatial arena, in this article I’m not talking about geographic computations such as with PostGIS. They are very computationally expensive, but we do much of that work during the initial ETL/data ingestion process.)
Unfortunately, it can be rather painful to carry out all the necessary DB maintenance, figure out the proper indexes, and write and rewrite queries. You also have to get the data stored just right, in neatly normalized rows and columns.
Fail 2: Denormalization
Or do you? In general it’s good to keep SQL data normalized and have a single source of truth, but complex and frequent table joins were killing performance. As we could no longer meet users’ needs, we had to move on from dumb queries and denormalize some of our bigger tables.
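A minimal sketch of that kind of denormalization, written in Pandas rather than SQL for brevity. The `visits` and `places` tables and their columns are hypothetical and are not taken from our schema; the point is only that the join happens once, at ETL time, instead of on every query.

```python
import pandas as pd

# Hypothetical normalized tables: one row per visit, plus a lookup table of places.
visits = pd.DataFrame({
    "visit_id": [1, 2, 3],
    "place_id": [10, 11, 10],
    "day_of_week": [1, 3, 5],
})
places = pd.DataFrame({
    "place_id": [10, 11],
    "zip_code": ["94105", "94106"],
    "category": ["coffee", "grocery"],
})

# Done once in the ETL step: copy the place attributes onto every visit row.
# validate="many_to_one" raises if place_id is not unique in the lookup table.
visits_denormalized = visits.merge(
    places, on="place_id", how="left", validate="many_to_one"
)

# At query time the filter runs against a single wide table, with no join.
coffee_visits = visits_denormalized[visits_denormalized["category"] == "coffee"]
print(coffee_visits[["visit_id", "zip_code"]])
```

The trade-off is that `zip_code` and `category` are now duplicated on every visit row, which is why the ETL step and data governance need more care.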
This needs us to do a bit more in the ETL step, and ensure good data governance, but it improved speed a lot. To offset the additional time whilst the team size remained constant, we moved from self-hosting Postgres to AWS Aurora Postgres. Performance didn’t change, but it did take away some database administration headaches.
Fail 3: Aurora
Except that it also created new performance headaches. One of the biggest features of our app is that it can score and cluster, according to user-specified criteria, tiny hexagon-shaped areas of cities. One city might consist of half a million hexes; for each hex the user might want to rate it on the median incomes and psychographic profiles of its nighttime dwellers and daytime workers, as well as how far it is from the nearest Starbucks. This requires a lot of data and computation.
Moving to Aurora cost us the ability to keep data in the memory cache of our own machine. As different users tried to score different cities, sequentially or, even worse, simultaneously, data was being pulled into our virtual machine (presumably from S3) and then ejected from it. This data load time, like some kind of page thrashing, vastly outweighed the computation time: the end result was that a city scoring which took 3 minutes when the data was available (itself not a fantastic result for something intended to be realtime) mushroomed to ten minutes or just timed out.
I haven’t been part of the big data world for long, but from what I can tell, the current “nobody got fired for buying IBM” choice is Apache Spark. I diligently set about creating a cluster of Spark machines to see if it could help.
Since we were now dealing with tables that had hundreds of millions of rows, and Postgres was creaking, partitioning that data and dividing computation over multiple nodes seemed to be the way forward if our app was to scale.
Minor Win 1: A Cluster
Making a cluster on AWS turned out to not be that hard thanks to Kubernetes and Kops.
At this point you’ll get a bunch of new AWS resources, including instances for the cluster master and the computation nodes. Docker containers can be deployed into those instances, and thanks to Kubernetes magic they will be able to communicate with each other, with other machines on the same VPC, and, if you choose, with the outside world via exposed load balancers.
With Helm, I was able to quickly get a bunch of Spark nodes up and running, together with some Hadoop nodes for their HDFS file storage.
Fail 4: Spark
I found Spark hard to administer, difficult to work with (it runs on the JVM and is designed to work with JVM applications, despite what the promise of PySpark may have you believe), and overall just not that fast.
I’m sure Spark can function in some people’s workflows for batch processing, and I’m aware it has streaming extensions too. However, it takes several minutes to spin up the machinery for even a simple job, rendering it useless for a realtime use case.
Additionally, HDFS was not fast and seems kind of irrelevant now that we have S3.
Besides, if you come from the Python ecosystem as I do, it’s impossible not to miss the fantastic array of tools we have at our disposal. If you’re that person and you’re starting or evolving a big data project today, I learned there’s a much better way.
Win: Distributed Pandas with Dask
Pandas is about as fast as it gets for working with numeric data. Array computations happen at native speed (thanks to Cython) and often leverage vector instructions (thanks to NumPy).
Despite this, I’d always thought Pandas was really for the ETL and analytic parts of working with data; more of a data science tool than actually a library that can be deployed into production. I was wrong:
“pandas has been used extensively in production in financial applications.”
Dask, a distributed computation library made by the same people behind Anaconda, partitions large dataframes over different nodes, schedules parallel computations on those nodes (building a graph and lazily deferring computation til the last minute, like Spark does), and gathers results and manages the distributed data. It’s integrated with and largely interoperable with Pandas, so a joy to work with.
(Not 100%: some things, such as indexing a dataframe on multiple columns, don’t make sense with a partitioned dataframe.)
And it’s fast: each node leverages Pandas and its Cythonized, vectorized code. Distributed computing isn’t free; it takes a bit of time and network latency for Dask to build the graph and schedule each node’s computations, as well as to combine/reduce data at the end. Basic Pandas would be faster, if it could fit the whole dataframe into memory — and it could, if we scaled up our base machine, but that’s not going to last forever: we’d still need to horizontally scale our platform at some point.
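The partition, compute, and combine pattern described above can be sketched with plain Pandas and NumPy. This is not how Dask is implemented; it is only an illustration, on made-up random data, of why the per-partition work is fast while the final reduce step and the coordination around it are the extra cost.

```python
import numpy as np
import pandas as pd

# Made-up data: 1,000 rows of two integer columns.
rng = np.random.default_rng(0)
df = pd.DataFrame({
    "col1": rng.integers(0, 100, size=1_000),
    "col2": rng.integers(0, 100, size=1_000),
})

# Split rows into four partitions, as a scheduler might across four workers.
n_partitions = 4
bounds = np.linspace(0, len(df), n_partitions + 1, dtype=int)
partitions = [df.iloc[start:stop] for start, stop in zip(bounds[:-1], bounds[1:])]

# Each "worker" runs ordinary vectorized Pandas on its own slice.
partial_maxima = [(p["col1"] + p["col2"]).max() for p in partitions]

# The reduce step: combine the small per-partition results into one answer.
distributed_max = max(partial_maxima)

# Same answer as doing it in one pass on a single machine.
single_machine_max = (df["col1"] + df["col2"]).max()
print(distributed_max, single_machine_max)
```

Here the reduce step is trivial, but on a real cluster the partial results also have to cross the network, which is where the latency mentioned above comes from.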
Coding with Dask is largely familiar. For example, here’s a replacement for the SQL
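SELECT MAX(col1 + col2) FROM table WHERE zip_code IN ('94105', '94106') AND day_of_week IN (1,2,3)

from dask.distributed import Client
import dask.dataframe as dd

# Connect to the Dask scheduler running in the cluster
c = Client('172.x.x.x:8786')

# Lazily read the Parquet data from S3, then keep it in the workers' memory
df = dd.read_parquet('s3://my-bucket/mydatafile.parquet')
df = c.persist(df)

# WHERE zip_code IN (...) AND day_of_week IN (...)
subset = df[df["zip_code"].isin(['94105', '94106']) & df["day_of_week"].isin([1, 2, 3])]

# SELECT MAX(col1 + col2), computed on the filtered rows only
total = subset["col1"] + subset["col2"]
print(total.max().compute())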
It was again fairly easy to deploy Dask into the Kubernetes cluster I’d made earlier: the Dask project publishes its own Helm charts. Our infrastructure now looks like this:
- Monolithic Node.js backend
- Which calls Python microservices (built with Flask and integrating a task queue with SQS)
- That connect to the cluster and instruct the nodes to
- Download their portions of data from S3 (which is very fast using pre-partitioned Parquet files, as reads from each node happen in parallel)
- Process their data with Dask/Pandas/NumPy
- And return a JSON response to the backend, which forwards it on to the client webapp
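The last two steps of that list can be sketched as a single function. This is not our actual service code: `handle_scoring_request`, the payload shape, and the `income` column are hypothetical, and a plain Pandas dataframe stands in for the Dask dataframe that would be loaded from S3 in the real flow.

```python
import json
import pandas as pd


def handle_scoring_request(payload: dict, data: pd.DataFrame) -> str:
    """Filter, aggregate, and serialize: the shape of one microservice call."""
    # Keep only the rows the user asked about.
    subset = data[data["zip_code"].isin(payload["zip_codes"])]
    # Aggregate with ordinary Pandas (a stand-in for the Dask computation).
    scores = subset.groupby("zip_code")["income"].median()
    # Convert NumPy scalars to plain floats so the result is JSON-serializable.
    return json.dumps({"median_income": {k: float(v) for k, v in scores.items()}})


# Illustrative data and request.
data = pd.DataFrame({
    "zip_code": ["94105", "94105", "94106", "94107"],
    "income": [50000, 70000, 65000, 90000],
})
response = handle_scoring_request({"zip_codes": ["94105", "94106"]}, data)
print(response)
```

In a Flask service, a function like this would sit behind a route or a queue consumer, and the JSON string is what the Node.js backend would forward on to the client.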
The analytics that took several minutes in Postgres, and often much longer if data needed to be read in from persistent storage, now return to the client in under 30 seconds.
If you have the money to spend, and especially if you are not still proving out product/market fit, go for a vendor solution. A fast, column-oriented in-memory SQL database would not have required much change to our process and code.
Otherwise, for processing big data in realtime as part of a SaaS, I do recommend looking to see if Dask could meet your needs: it’s fast, it scales horizontally, it lets you write code in the same way using the same libraries you’re used to, and it’s being used live in production today (*well, by us at least).
Obviously, I think Dask kills Spark, but the author of Dask has written a more nuanced comparison of the two, if you’re interested.