
From Big Data to Fast Data in Four Weeks or How Reactive Programming is Changing the World – Part 2


Part 2: Lambda Architecture meets reality

Part 1 can be found here.

Fast Data

Fast forward to December 2015. We have cross-data-center Kafka clusters, and Spark adoption is through the roof. All of this, however, was done to fix our traditional batch platform. I'm not going to pretend we never thought about real-time; we had been gearing up toward the Lambda architecture all along, but honestly we were not working specifically for the sake of near real-time analytics.

The beauty of our current stack and skill set is that streaming just comes with it. All we needed to do was write a Spark Streaming app connected directly to our Kafka cluster and feed the same Teradata repository with micro-batches.
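To give a flavor of how small that step was, here is a minimal sketch of such a job, assuming the spark-streaming-kafka-0-10 integration. The broker address, topic name, and the parsePageId/exportToSqlStore helpers are hypothetical stand-ins, not our production code.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object ClickStreamJob {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("fpti-click-stream")
    val ssc  = new StreamingContext(conf, Minutes(1))        // 1-minute micro-batches

    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "kafka-broker:9092",           // hypothetical broker
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "fpti-stream"
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("tracking-events"), kafkaParams))

    // Count clicks per page in each micro-batch, then hand the totals to the exporter.
    stream.map(record => (parsePageId(record.value), 1L))
          .reduceByKey(_ + _)
          .foreachRDD(rdd => exportToSqlStore(rdd))

    ssc.start()
    ssc.awaitTermination()
  }

  // Placeholders for brevity; the real versions parse our event JSON and write to the SQL store.
  def parsePageId(eventJson: String): String = ???
  def exportToSqlStore(totals: RDD[(String, Long)]): Unit = ???
}
```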

[Figure: fpti-spark-stream]

Most of it just worked out of the box, with some caveats.
Spark Streaming requires a different way of thinking about resource allocation on Hadoop; dynamic memory management for the Spark cache helps a lot. Long-running streaming jobs also require a different approach to monitoring and SLA measurement. Our operational ecosystem needed some upgrades. For example, we had to create our own monitoring scripts that parse Spark logs to determine whether a job is still alive (a sketch of an equivalent check follows below).
The only really nasty surprise we got was from the downstream Tableau infrastructure that Product and Marketing people were using: the Tableau server could not refresh its dashboards as fast as we were updating the metrics in Teradata.
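Our actual scripts parsed Spark driver logs; as an alternative illustration of the same liveness check, here is a minimal sketch that polls the standard YARN ResourceManager REST API. The ResourceManager host and application name are hypothetical, and the string match on the JSON response is deliberately crude.

```scala
import scala.io.Source

object StreamingJobLiveness {
  // Returns true if an application with the given name shows up as RUNNING on YARN.
  def isAlive(appName: String): Boolean = {
    val url    = "http://resourcemanager:8088/ws/v1/cluster/apps?states=RUNNING" // hypothetical RM host
    val body   = Source.fromURL(url).mkString        // JSON list of running YARN applications
    val needle = "\"name\":\"" + appName + "\""
    body.contains(needle)                            // crude check, good enough for an alert
  }

  def main(args: Array[String]): Unit =
    if (!isAlive("fpti-click-stream")) System.err.println("ALERT: streaming job is down")
}
```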

Lambda architecture in practice

Let's take a closer look at our version of the Lambda architecture.

There are two problem scenarios inherent in streaming analytical systems that our Lambda architecture implementation helps overcome:

Upstream delays in the pipe

We need to remember that asynchronous near real-time analytical systems are pretty sensitive to traffic fluctuations. Conceptually, our pipe looks like a series of buffers spread out across network zones, data centers, and nodes. Some buffers push, some pull. As a whole, it is highly available and protected from traffic spikes. Still, fairly often communication between different buffers can be degraded or lost completely. We will not lose events; the persistent buffers take care of that. The correctness of the metric calculations, however, will suffer because of the delays. A batch system, on the other hand, provides greater correctness, but we have to artificially delay metric calculation to account for potential delays in the upstream pipe.

We tell our internal customers that they can have “close enough” numbers almost immediately and exact numbers within hours.

We use Spark for both batch and streaming, with the same code base. The streaming part runs on a micro-batch schedule (seconds or minutes); the batch part runs on a macro-batch schedule (hours or days). Correspondingly, the streaming code calculates short-duration metrics like 1-minute totals, and the batch code calculates longer-duration metrics like 1-hour or 1-day totals. Both consume the same raw events.
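To make the "same code base" point concrete, here is a minimal sketch of the idea using Spark SQL: one aggregation function parameterized by window length, callable from both the streaming and the batch job. The RawEvent schema and field names are illustrative, not our actual event model.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{col, window}

// Illustrative event schema, not our real one.
case class RawEvent(pageId: String, eventTime: Timestamp)

object SharedAggregations {
  // Clicks per page per time window: "1 minute" for the streaming job,
  // "1 hour" or "1 day" for the batch job, over the same raw events.
  def clicksPerPage(events: Dataset[RawEvent], windowLength: String): DataFrame =
    events.groupBy(window(col("eventTime"), windowLength), col("pageId")).count()
}
```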

The chained export process adds the latest available calculations into the SQL data store, regardless of which part produced the output.

Imagine some metric, say "number of clicks per page". The micro-batch will pull new events from Kafka every minute and push totals to the SQL data store. A user might query the last hour's total, an aggregate of the last 60 minutes. At the same time, one or two nodes have been taken off-line due to some network issue, and thousands of events might still be buffered (persisted) in local node storage. After 30 minutes the nodes come back on-line and transmit whatever events were left in the buffer. However, at the time the user saw the "last hour" total, the late-arriving events were not available. The batch job, scheduled to run over the prior 4 hours of data, will include the delayed records and re-calculate the hourly totals correctly. Another batch, scheduled to run daily, will make sure that whatever the 4-hour batch might have missed is added, and so on.

Interruptions or pauses in streaming calculations

Another common issue with near real-time calculation is the availability of the calculating process itself. What happens when our micro-batch job goes down for whatever reason (a systemic Hadoop failure, a bug, or planned downtime, for example)?

A Spark Streaming micro-batch job is a long-running YARN process. This means it will allocate a certain amount of resources and keep those resources forever (or until it is killed). Keeping this in mind, the amount of resources allocated (the number and size of the containers scheduled by YARN) must be fine-tuned to fit the volume of the micro-batch pretty closely. Ideally, the duration of a single run should be very close to the micro-batch iteration time, to minimize idle time between iterations.

In our case, we usually fine-tune the number and size of Spark executors by determining our peak volume and running the micro-batch experimentally for a number of days. Normally we also oversubscribe resources to allow for some fluctuation in traffic. Let's say we've decided to run our micro-batch every minute. The YARN queue in which our streaming job runs will be sized to fit up to 2-3 minutes' worth of traffic in one run, in case of a traffic spike or a quick job restart. This means our micro-batch will pull at most 3 minutes' worth of traffic from Kafka in a single iteration and will spend no more than 1 minute processing it.
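One common way to enforce that kind of per-iteration cap, sketched here with assumed values rather than our production settings, is Spark's built-in back-pressure and per-partition rate limit:

```scala
import org.apache.spark.SparkConf

// Sized for peak traffic plus headroom; every value here is illustrative.
val conf = new SparkConf()
  .setAppName("fpti-click-stream")
  .set("spark.executor.instances", "12")
  .set("spark.executor.memory", "4g")
  .set("spark.streaming.backpressure.enabled", "true")       // adapt ingestion rate to processing speed
  .set("spark.streaming.kafka.maxRatePerPartition", "5000")  // cap records/sec pulled per Kafka partition
```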

What happens when the micro-batch is down for an hour of cluster maintenance? When it starts after the long pause and tries to pull all the events since the last iteration, there will be 20 times more data to process than we have resources allocated in the queue (a 60-minute delay divided by the 3 minutes expected for a single iteration = 20×). If we try to process the whole backlog using our small queue, it will run forever and create even more backlog.

Our approach for streaming calculations is to skip forward to the last 3 minutes after any interruption. This creates gaps in the 1-minute totals, and any rolled-up totals in that case will be off.

For this scenario as well, the macro-batches, totaling raw events independently, will produce correct hourly/daily/weekly totals despite any gaps in the 1-minute totals.

 

Bottom line: combining streaming micro-batch calculations with traditional macro-batch calculations covers the full range of user needs, from rough numbers for trending during the day to exact numbers for a weekly report.

What’s next?

There are still two reactive dominoes to tumble before we achieve the full reactive nirvana across our stack.

The most obvious step is to relocate user-facing dashboards to a nimbler analytical platform. We had already been using Druid for a while and developed a Spark-to-Druid connector. Now we can push calculation results to users at whatever velocity we need.

Familiarity with FP and Scala, and an understanding of Akka's role in the success of Spark and Kafka, helped us re-evaluate our event collector architecture as well. It is written on a traditional Java Servlet container and uses lots of complicated multithreading code to consume multiple streams of events asynchronously.

The reactive approach (specifically Akka Streams) and Scala are exactly what we need to drastically simplify our Java code base and to wring every last ounce of scalability from our hardware.
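To show the flavor of what we mean, here is a minimal, hypothetical sketch (assuming Akka 2.6+, with stand-ins for the real HTTP source and Kafka publisher) of consuming events with bounded concurrency and built-in back-pressure instead of hand-rolled thread pools:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

object CollectorSketch extends App {
  implicit val system: ActorSystem = ActorSystem("collector") // in Akka 2.6+ this also provides the materializer
  import system.dispatcher

  // Stand-in for the real event source (HTTP endpoints, etc.).
  val incomingEvents = Source(List("""{"page":"home"}""", """{"page":"wallet"}"""))

  // Stand-in for the real Kafka publisher.
  def publishToKafka(event: String): Future[Unit] = Future(println(s"published: $event"))

  incomingEvents
    .mapAsync(parallelism = 8)(publishToKafka)   // bounded concurrency with back-pressure built in
    .runWith(Sink.ignore)
    .onComplete(_ => system.terminate())
}
```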

Thanks to our friends on the PayPal framework team, reactive programming is now becoming mainstream at PayPal: SQUBS: a new reactive way for PayPal to build applications

Conclusion

Is my claim that reactive programming is changing the world too dramatic? Consider the cost of scalability. Frameworks like Akka are democratizing one of the most complex, hard-to-test, hard-to-debug domains of software development. With that solid foundation, the Sparks and Kafkas of the world are evolving much faster than the prior generation of similar tools. This means more time is spent on business-critical features and less on fixing the concurrency bugs that used to be inevitable.

Consider our case.

In the past, real-time analytic systems (like CEP, complex event processing) were an extreme luxury, literally costing millions of dollars in license fees just to set up. Companies would invest in those systems only for very important core business reasons; for PayPal or credit card companies that would be fraud detection. Product roll-outs or marketing campaigns, however, would very rarely reach that threshold of importance.
Today we get the streaming platform practically for free, as a package deal.

So yes! Reactive programming is changing the world.

From Big Data to Fast Data in Four Weeks or How Reactive Programming is Changing the World – Part 1


Part 1: Reactive Manifesto’s Invisible Hand

Let me first set up the context for my story. I've been with PayPal for five years. I'm an architect, part of the team responsible for the PayPal Tracking domain. Tracking is commonly and historically understood as the measurement of customer visits to web pages. With the customer's permission, our platform collects all kinds of signals from PayPal web pages, mobile apps, and services, for a variety of reasons. Most prominent among them are measuring new product adoption, A/B testing, and fraud analysis.

We collect several terabytes of data on our Hadoop systems every day. This is not an extraordinarily large amount, but it was big enough to be one of the primary motivations for adopting Hadoop at PayPal four years ago.

Story

This particular story starts in the middle of December 2015, a week or so before the holidays. I was pulled into a meeting to discuss a couple of new projects.

At the time, PayPal was feverishly preparing to release a brand-spanking-new mobile wallet app. It had been in development for a long time and promised a complete change in the PayPal experience on smartphones. Lots of attention was being paid to this roll-out at all levels of leadership. Basically, a Big Deal!

Measuring new product adoption happens to be one of the main raisons d'être of the tracking platform, so following roll-outs is nothing new for us. This time, however, we were ready to provide adoption metrics in near real time. By the launch date at the end of February, product managers were able to report the new app's adoption progress "as of 30 minutes ago" instead of the customary "as of yesterday".

This was our first full production Fast Data application. You might be wondering why the title says four weeks when mid-December to the end of February is ten weeks.

It did in fact take a small team – Sudhir R., Loga V., Shunmugavel C., Krishnaprasad K. – only four weeks to make it happen.

The first full-volume live test happened during the Super Bowl, when PayPal aired its ad during the game, and small-volume tests happened even earlier, in January, when the ad was first showcased on some news channels.

Just one year earlier there would have been no chance we could deliver something like that in four weeks, but in December 2015 we were on a roll. We had a production-quality prototype running within two weeks, and a full-volume live test happened two weeks later. In another six weeks, PayPal's largest release of the last several years was being measured by this analytical workflow.

So what changed during that single year?
Allow me to get really nerdy for a second. On a fundamental level, what happened is that we embraced (at first without even realizing it) the Reactive Manifesto in two key components of our platform.

What has changed between December 2014 and December 2015

The original Tracking platform design followed the common industry standard of 4-5 years ago. We collected data in real time, buffered it in memory and in small files, and finally uploaded consolidated files onto a Hadoop cluster. At that point a variety of MapReduce jobs would crunch the data and publish an "analyst-friendly" version to a Teradata repository, from which our end users could query the data using SQL.

Sounds straightforward? It is anything but. There are several pitfalls and bottlenecks to consider.
First of all, data movement. We are a large company, with lots of services and lots of different types of processing distributed across multiple data centers. On top of all of this, we are also paranoid about the security of anything and everything.

We ended up keeping a dedicated Hadoop cluster in the live environment for the sole purpose of collecting data. Then we would copy files to our main analytical Hadoop cluster in a different data center.

[Figure: fpti-etl]

It worked, but… All the waiting for big-enough files and the scheduling of file transfers added up to hours. We tinkered with different tools, open source and commercial, looking for the Goldilocks solution to address all the constraints and challenges. At the end of the day, most of those tools failed to account for the reality on the ground.

Until, that is, we discovered Kafka. That was the first reactive domino to fall.

Kafka and moving data across data centers

There are three brilliant design decisions that made Kafka the indispensable tool it is today:

  • multi-datacenter topology out of the box;
  • short-circuiting internal data movement from the network socket to the OS file-system cache and back to the network socket;
  • sticking with strictly sequential I/O for reads and writes.

It fits our needs almost perfectly:

  • Throughput? Moving data between datacenters? Check.
  • A configurable topology to satisfy our complicated environment? A flexible, persisted buffer between the real-time stream and Hadoop? Check.
  • Price-performance? High availability? Check.

[Figure: fpti-kafka]

Challenges? A few.
The only real trouble we had with Kafka itself was the maturity level of its producer API and of MirrorMaker. There are lots of knobs and dials to figure out, and they change from release to release. Some producer API behaviors are finicky. We had to invest some time into making sure we were not losing data between our client and the Kafka broker when the broker goes down or the connection is lost.
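The gist of that hardening, as a rough sketch rather than our actual client code (broker addresses, topic name, and the local-spill helper are hypothetical): acknowledge on all replicas, retry transient failures, and react to send errors in the callback.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

object ReliableSend {
  private val props = new Properties()
  props.put("bootstrap.servers", "broker1:9092,broker2:9092")  // hypothetical brokers
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("acks", "all")     // wait for the full in-sync replica set before calling a send successful
  props.put("retries", "5")    // retry transient broker failures instead of silently dropping events

  private val producer = new KafkaProducer[String, String](props)

  def send(eventJson: String): Unit =
    producer.send(new ProducerRecord[String, String]("tracking-events", eventJson), new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
        if (exception != null) spillToLocalBuffer(eventJson)   // hypothetical local fallback
    })

  private def spillToLocalBuffer(eventJson: String): Unit = () // placeholder for a real local buffer
}
```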

MirrorMaker requires some careful configuration to make sure we are not double-compressing events. It took us some experimentation in the live environment to get to the proper capacity for the MirrorMaker instances. Even with all the hiccups, considering we were using a pre-GA open-source tool, it was a pretty rewarding and successful endeavor.

Most of our troubles came not from Kafka. Our network infrastructure, and especially our firewall capacity, was not ready for the awesome throughput Kafka had unleashed. We were able to move a very high volume of data at sub-second latency between geographically remote data centers, but every so many hours the whole setup would slow down to a crawl.

It took us some time to understand that it was not a Kafka or MirrorMaker problem (as we naturally suspected at first). As it happened, we were the first team to attempt that kind of volume of streaming traffic between our data centers. The prevailing traffic patterns at the time were all bulk oriented: file transfers, DB imports/exports, and so on. Sustained real-time traffic immediately uncovered bottlenecks and temporary saturations that had been hidden before. A good learning experience for us and our network engineers.

Personally, I was very impressed with Kafka's capabilities and naturally wanted to know how it was written. That led me to Scala. Long story short, I had to rediscover functional programming and, with it, a totally different way of thinking about concurrency. That led me to Akka and the Actor model. Hello, Reactive Programming!

From MapReduce to Spark

Another major challenge that plagued us at the time was MapReduce inefficiency, in both the physical sense and the developer-productivity sense. There are, literally, books written on MapReduce optimization. We know why it is inefficient and how to deal with it, in theory.

In practice, however, developers tend to use MapReduce in the simplest way possible, splitting workflows into multiple digestible steps, saving intermediate results to HDFS, and incurring a horrifying amount of I/O in the process.

[Figure: fpti-mr]

MapReduce is just too low-level and too dumb. Mixing complex business logic with low-level MapReduce optimization techniques is asking too much. In architectural lingo: the buildability of such a system is very problematic. Or in English: it takes way too long to code and test. The predictable result is tons of long-running, brittle, hard-to-manage, and inefficient workflows.

Plus, it is Java! Who in their right mind would use Java for analytics? Analytics is a synonym for SQL, right? We ended up writing convoluted pipes in MapReduce to clean up and reorganize data, then dumping it to Teradata so somebody else could do agile analytical work using SQL while we were busy figuring out MapReduce. Pig and Hive helped to a degree, but only just.

We tried to explore alternatives like "SQL on Hadoop" and the rest. Lots of cool tools, lots of vendors. The Hadoop ecosystem is rich, but hard to navigate, and none of those tools addressed our needs comprehensively. The Goldilocks solution for Big Data seemed out of reach.

Until, that is, we stumbled into Spark. And that was the second reactive domino to fall.

By then, it had become clear to us that MapReduce was a dead end. But Spark? Switching to Scala? What about the existing, hard-earned skill set? People had just started to get comfortable with MapReduce, Pig, and Hive; it is very risky to switch. All valid arguments. Nevertheless, we jumped.

What makes Spark such a compelling system for big data?
The RDD (Resilient Distributed Dataset) concept addresses most of the MapReduce inefficiency issues, both physical and productivity-related.

Resilient – resiliency is key. The main reason behind MapReduce's excessive I/O is the need to preserve intermediate data to protect against node crashes. Spark solves the same problem with minimal I/O by keeping a history of transformations (functions) instead of the actual transformed data chunks. Intermediate data can still be checkpointed to HDFS if the process is very long, and the developer can decide whether a checkpoint is needed case by case. Once a dataset is read into cache, it can stay there as long as needed, or as long as the cache size allows; again, the developer controls the behavior case by case.
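A minimal sketch of that control (paths and names are hypothetical): cache a dataset for reuse, and checkpoint explicitly only when the lineage grows long enough that recomputation would be too costly.

```scala
import org.apache.spark.sql.SparkSession

object CacheAndCheckpoint {
  def main(args: Array[String]): Unit = {
    val sc = SparkSession.builder.appName("cache-and-checkpoint").getOrCreate().sparkContext
    sc.setCheckpointDir("hdfs:///tmp/checkpoints")       // hypothetical checkpoint location

    val events  = sc.textFile("hdfs:///data/raw/events") // hypothetical input
    val cleaned = events.filter(_.nonEmpty).map(_.toLowerCase)

    cleaned.cache()        // keep the RDD in memory for reuse by several downstream computations
    cleaned.checkpoint()   // developer's call: materialize to HDFS and truncate the lineage
    println(cleaned.count())
  }
}
```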

Distributed Dataset – one thing that always bugged me about MapReduce was its inability to let me reason about my data as a dataset. Instead, you are forced to think in terms of a single key-value pair, a small chunk, a block, a split, or a file. Coming from SQL, it felt like going backwards 20 years. Spark solves this perfectly. A developer can now reason about the data as a Scala immutable collection (RDD API), a data frame (DataFrame API), or both interchangeably (Dataset API). This flexibility, and the abstraction from the distributed nature of the data, leads to huge productivity gains for developers.
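A tiny illustration of that way of reasoning (the Click case class and the input path are hypothetical): the same data handled as a typed Dataset with collection-style operations, then viewed as a DataFrame when that is more convenient.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative record type, not our real schema.
case class Click(pageId: String, userId: String)

object DatasetViews {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("dataset-views").getOrCreate()
    import spark.implicits._

    val clicks = spark.read.parquet("hdfs:///data/clicks").as[Click]  // typed Dataset[Click]
    val perPage = clicks
      .filter(_.userId.nonEmpty)             // reason about rows like a Scala collection
      .groupByKey(_.pageId)
      .count()

    perPage.toDF("pageId", "clicks").show()  // switch to the DataFrame view when convenient
  }
}
```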

And finally, no more Map + Reduce shackles. You can chain whatever functions you need; Spark will optimize your logic into a DAG (directed acyclic graph, similar to how SQL is interpreted) and run it in one swoop, keeping data in memory as much as physically possible.
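For contrast with a multi-step MapReduce flow, here is a sketch (input path and record layout assumed) of chaining several transformations that Spark plans as a single DAG, with no intermediate HDFS writes between steps:

```scala
import org.apache.spark.sql.SparkSession

object TopPages {
  def main(args: Array[String]): Unit = {
    val sc = SparkSession.builder.appName("top-pages").getOrCreate().sparkContext

    val topPages = sc.textFile("hdfs:///data/raw/events")   // hypothetical input
      .map(_.split("\t"))                                   // assume tab-separated: pageId, userId, ...
      .filter(_.length >= 2)
      .map(fields => (fields(0), 1L))
      .reduceByKey(_ + _)                                   // the only shuffle in the whole chain
      .sortBy(_._2, ascending = false)
      .take(10)

    topPages.foreach(println)
  }
}
```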

[Figure: fpti-spark-batch]

"Minor" detail: nobody here knew Scala. So to get all the efficiency and productivity benefits, we had to train people in Scala, and do it quickly.

It is a bit more than just a language, though. Scala, at first glance, has enough similarity to Java that any Java coder can fake it. (Plus, however cumbersome, Java can be used as a Spark language, and so can Python.)

The real problem was to change the mindset of the Java MapReduce developer to a functional way of thinking. We needed a clean break from old Java/Python MR habits. What habits? Most of all, mutability: Java beans, mutable collections and variables, loops, and so on. We were going to switch to Scala and do it right.
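As a flavor of the habit change (a training-style toy example, not production code), here is the same aggregation written the old mutable way and the functional way we were moving to:

```scala
object MutabilityHabit {
  // The old habit: mutable state updated inside a loop.
  def totalClicksImperative(clicks: Seq[Long]): Long = {
    var total = 0L
    for (c <- clicks) total += c
    total
  }

  // The functional habit: immutable data and higher-order functions, no mutation.
  def totalClicksFunctional(clicks: Seq[Long]): Long =
    clicks.foldLeft(0L)(_ + _)

  def main(args: Array[String]): Unit = {
    val clicks = List(3L, 5L, 2L)
    println(totalClicksImperative(clicks))  // 10
    println(totalClicksFunctional(clicks))  // 10
  }
}
```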

It started slowly. We designed a curriculum for our Java MapReduce developers, beginning with a couple of two-hour sessions: one on functional programming and Scala basics, one on Spark basics. After some trial and error, the FP/Scala part was extended to a four-hour learn-by-example session, while Spark barely needed two hours, if that. As it happens, once you understand the functional side of Scala, especially immutable collections, collection functions, and lambda expressions, Spark comes naturally.

It snowballed very quickly; within a couple of months the majority of our MapReduce engineers had been re-trained and had sworn off MapReduce forever. Spark was in.

Spark, naturally, is written in Scala and Akka – 2:0 for team Reactive.

Stay tuned for Part 2: “Lambda Architecture meets reality” coming soon

Part 2 Preview

  • Fast Data stack
  • Micro and Macro batches
  • Invisible hand keeps on waving