Tag Archives: Spark

From Big Data to Fast Data in Four Weeks or How Reactive Programming is Changing the World – Part 2


Part 2: Lambda Architecture meets reality

Part 1 can be found here.

Fast Data

Fast forward to December 2015. We had cross-data-center Kafka clusters, and Spark adoption was through the roof. All of this, however, was to fix our traditional batch platform. I’m not going to pretend we never thought about real-time stuff. We’d been gearing up toward the Lambda architecture all along, but in truth we were not working specifically for the sake of near real-time analytics.

The beauty of our current stack and skill set is that streaming just comes with it. All we needed to do was write a Spark Streaming app connected directly to our Kafka and feed the same Teradata repo with micro-batches.


Most of the stuff just worked out of the box, with some caveats.
Spark Streaming requires different thinking about resource allocation on Hadoop; dynamic memory management of the Spark cache helps a lot. Long-running streaming jobs also require a different approach to monitoring and SLA measurements. Our operational ecosystem needed some upgrades. For example, we had to create our own monitoring scripts to parse Spark logs to determine whether a job is still alive.
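The heart of such a liveness check can be sketched in a few lines of Scala. The names and the three-interval threshold here are illustrative, not our actual script (which gets its timestamps by parsing the Spark logs):

```scala
// Hypothetical liveness rule for a long-running Spark Streaming job:
// treat the job as stalled when the most recent batch-completion
// timestamp found in its logs is older than a few batch intervals.
object StreamingLiveness {
  def isAlive(lastBatchCompletedMs: Long,
              nowMs: Long,
              batchIntervalMs: Long,
              toleratedIntervals: Int = 3): Boolean =
    nowMs - lastBatchCompletedMs <= batchIntervalMs * toleratedIntervals
}
```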
The only real nasty surprise we got was from the downstream Tableau infrastructure that Product and Marketing people were using. The Tableau server could not refresh its dashboards as fast as we were updating the metrics in Teradata.

Lambda architecture in practice

Let’s take a closer look at our version of the Lambda architecture.

There are two main problems inherent in streaming analytical systems that our Lambda architecture implementation helps to overcome:

Upstream delays in the pipe

We need to remember that asynchronous near real-time analytical systems are pretty sensitive to traffic fluctuations. Conceptually, our pipe looks like a series of buffers spread out across network zones, data centers, and nodes. Some buffers push, some pull. In total, it is highly available and protected from traffic spikes. Still, fairly often communication between different buffers can be degraded or completely lost. We will not lose events; persistent buffers take care of that. The correctness of the metric calculation, however, will suffer because of the delays. A batch system, on the other hand, provides greater correctness, but we have to artificially delay metric calculation to account for potential delays in the upstream pipe.

We tell our internal customers that they can have “close enough” numbers almost immediately and exact numbers within hours.

We use Spark for both batch and streaming. Same code base. The streaming part runs on a micro-batch schedule (seconds or minutes); the batch part runs on a macro-batch schedule (hours, days). Correspondingly, the streaming code calculates short-duration metrics like 1-minute totals, and the batch code calculates longer-duration metrics like 1-hour or 1-day totals; both consume the same raw events.
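Sharing one code base means the core aggregation can be a single function fed different slices of the same raw events. Here is a minimal pure-Scala sketch of the idea (the event shape and names are made up for illustration; in the real jobs the equivalent logic runs over Spark datasets):

```scala
// The same totals function serves both paths: the streaming job feeds it
// one minute of events, the batch job feeds it an hour or a day.
case class Event(page: String, timestampMs: Long)

object ClickTotals {
  // Clicks per page within [windowStartMs, windowStartMs + windowLenMs).
  def totals(events: Seq[Event], windowStartMs: Long, windowLenMs: Long): Map[String, Int] =
    events
      .filter(e => e.timestampMs >= windowStartMs && e.timestampMs < windowStartMs + windowLenMs)
      .groupBy(_.page)
      .map { case (page, clicks) => page -> clicks.size }
}
```

Because both schedules consume the same raw events, a late-arriving event simply shows up in the next, larger window’s recalculation.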

The chained export process adds the latest available calculations into the SQL data store, regardless of which part produced the output.

Imagine some metric, say “number of clicks per page”. The micro-batch will pull new events from Kafka every minute and push totals to the SQL data store. A user might query last-hour totals – an aggregate of the last 60 minutes. At the same time, one or two nodes have been taken off-line due to some network issue. Some thousands of events might still be buffered (persisted) at the local node storage. After 30 minutes, the nodes are back on-line and transmit whatever events were left in the buffer. However, at the time when the user saw the “last hour” total, the late-coming events were not available. The batch, which is scheduled to run on the prior 4 hours of data, will include the delayed records and will re-calculate hourly totals correctly. Another batch, which is scheduled to run daily, will make sure that whatever might have been missed by the 4-hour batch is added, etc.

Interruptions or pauses in streaming calculations

Another common issue with near real-time calculations is the availability of the calculating process itself. What happens when our micro-batch goes down for whatever reason? (It could be some systemic Hadoop failure, a bug, or planned downtime.)

A Spark Streaming micro-batch is a long-running Yarn process. This means it will allocate a certain amount of resources and will keep those resources forever (or until it is killed). Keeping this in mind, the amount of resources allocated (the number and size of the containers as scheduled by Yarn) must be fine-tuned to fit the volume of the micro-batch pretty closely. Ideally, the duration of a single run should be very close to the micro-batch iteration time to minimize idle time between iterations.

In our case, we usually fine-tune the number and size of Spark executors by determining our peak volume and running the micro-batch experimentally for a number of days. Normally we also oversubscribe resources to allow for some fluctuations in traffic. Let’s say we’ve decided to run our micro-batch every minute. The Yarn queue in which our streaming job runs will have a max size that fits up to 2-3 minutes’ worth of traffic in one run, in case of a traffic spike or quick job restart. What this means is that our micro-batch will pull at most 3 minutes’ worth of traffic from Kafka in a single iteration and will spend no more than 1 minute processing it.

What happens when the micro-batch is down for an hour for cluster maintenance? When it starts after a long pause and tries to pull all the events since the last iteration, there will be 20 times more data to process than we have resources allocated in the queue (total delay of 60 min / max of 3 min expected for a single iteration = 20 times). If we try to process the whole backlog using our small queue, it will run forever and create even more backlog.
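The sizing arithmetic is worth writing down explicitly (a sketch with illustrative names; the real tuning is done against measured peak volume over several days):

```scala
// Rule-of-thumb capacity math for a micro-batch job on a fixed-size queue.
object MicroBatchSizing {
  // Cap on events pulled in one iteration: a few iterations' worth of peak
  // traffic, to absorb spikes and quick restarts.
  def maxPullPerIteration(peakEventsPerMin: Long, oversubscribedMinutes: Int): Long =
    peakEventsPerMin * oversubscribedMinutes

  // How many capped iterations it takes to replay a pause, e.g. 60 / 3 = 20.
  def backlogMultiple(pauseMin: Int, maxMinutesPerIteration: Int): Int =
    pauseMin / maxMinutesPerIteration
}
```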

Our approach for streaming calculations is to skip forward to the last 3 minutes after any interruption. This will create gaps in the 1-minute totals, and any rolled-up totals will in this case be off.
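Conceptually, the skip-forward policy is just a clamp on the starting position (a pure-Scala sketch; the real job applies this per Kafka partition, and the names are illustrative):

```scala
// After an interruption, resume at most `maxBacklog` records behind the
// head of the partition instead of replaying the whole gap.
object SkipForward {
  def startingOffset(committedOffset: Long, latestOffset: Long, maxBacklog: Long): Long =
    math.max(committedOffset, latestOffset - maxBacklog)
}
```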

For this scenario as well, macro-batches, totaling raw events independently, will create correct hourly/daily/weekly totals despite any gaps in the 1-minute totals.


Bottom line: combining streaming micro-batch calculations with traditional macro-batch calculations covers all user needs, from rough numbers for trending during the day to exact numbers for a weekly report.

What’s next?

There are still two reactive dominoes to tumble before we achieve the full reactive nirvana across our stack.

The most obvious step is to relocate user-facing dashboards to a nimbler analytical platform. We had already been using Druid for a while and developed a Spark–Druid connector. Now we can push calculation results to users at whatever velocity we need.

Familiarity with FP and Scala, and an understanding of Akka’s role in the success of Spark and Kafka, helped us to re-evaluate our event collector architecture as well. It is written using a traditional Java servlet container and uses lots of complicated multithreading code to consume multiple streams of events asynchronously.

The reactive approach (specifically Akka Streams) and Scala are exactly what we need to drastically simplify our Java code base and wring every last ounce of scalability from our hardware.

Thanks to our friends on the PayPal framework team, reactive programming is now becoming mainstream at PayPal: squbs – a new reactive way for PayPal to build applications.


Is my claim that reactive programming is changing the world too dramatic? The cost of scalability is a key factor to consider. Frameworks like Akka are democratizing one of the most complex, hardest-to-test-and-debug domains of software development. With a solid foundation, the Sparks and Kafkas of the world are evolving much faster than the prior generation of similar tools. This means more time is being spent on business-critical features than on fixing the (formerly inevitable) concurrency bugs.

Consider our case.

In the past, real-time analytic systems (like CEP – complex event processing) were an extreme luxury, literally costing millions of dollars in license fees just to set up. Companies would invest in those systems only for very important core business reasons. For PayPal or credit card companies, it would be fraud detection. Product roll-outs or marketing campaigns, however, would very rarely reach that threshold of importance.
Today we get the streaming platform practically for free as a package deal.

So yes! Reactive programming is changing the world.

From Big Data to Fast Data in Four Weeks or How Reactive Programming is Changing the World – Part 1


Part 1: Reactive Manifesto’s Invisible Hand

Let me first set up the context for my story. I’ve been with PayPal for five years. I’m an architect. I’m part of the team responsible for PayPal’s Tracking domain. Tracking is commonly and historically understood as the measurement of customer visits to web pages. With the customer’s permission, our platform collects all kinds of signals from PayPal web pages, mobile apps, and services, for a variety of reasons. Most prominent among them are measuring new product adoption, A/B testing, and fraud analysis.

We collect several terabytes of data on our Hadoop systems every day. This is not an extremely huge amount, but it is big enough to be one of the primary motivations for adopting Hadoop at PayPal four years ago.


This particular story starts in the middle of December 2015, a week or so before holidays. I was pulled into a meeting to discuss a couple of new projects.

At the time, PayPal was feverishly preparing to release a brand spanking new mobile wallet app. It had been in development for a long time and promised a complete change in the PayPal experience on smartphones. Lots of attention was being paid to this roll-out from all levels of leadership – basically, a Big Deal!

Measuring new product adoption happens to be one of the main raisons d’être of the tracking platform, so following roll-outs is nothing new for us. This time, however, we were ready to provide adoption metrics in near real time. By the launch date at the end of February, product managers were able to report the new app’s adoption progress “as of 30 min ago” instead of the customary “as of yesterday”.

This was our first full production Fast Data application. You might be wondering why the title says four weeks, when from mid-December to the end of February is ten weeks.

It did in fact take a small team – Sudhir R., Loga V., Shunmugavel C., Krishnaprasad K. – only four weeks to make it happen.

The first full-volume live test happened during the Super Bowl, when PayPal aired its ad during the game. And small-volume tests happened even earlier, in January, when the ad was first showcased on some news channels.

Just one year earlier there would have been no chance we could deliver something like that in four weeks’ time, but in December 2015 we were on a roll. We had a production-quality prototype running within two weeks, and a full-volume live test happened two weeks later. In another six weeks, PayPal’s largest release of the last several years was measured by this analytical workflow.

So what changed during a single year?
Allow me to get really nerdy for a second. On a fundamental level, what happened is that we embraced (at first without even realizing it) the Reactive Manifesto in two key components of our platform.

What has changed between December 2014 and December 2015

The original Tracking platform design followed the common industry standard from 4-5 years ago. We collected data in real time, buffered it in memory and small files, and finally uploaded consolidated files onto a Hadoop cluster. At this point a variety of MapReduce jobs would crunch the data and publish the “analyst friendly” version to a Teradata repository, from which our end users could query the data using SQL.

Sounds straightforward? It is anything but. There are several pitfalls and bottlenecks to consider.
First of all is data movement. We are a large company, with lots of services and lots of different types of processing distributed across multiple data centers. On top of all of this, we are also paranoid about the security of anything and everything.

We ended up keeping a dedicated Hadoop cluster in a live environment for the sole purpose of collecting data. Then we would copy files to our main analytical Hadoop cluster in a different data center.


It worked, but… All this waiting for big-enough files and scheduling of file transfers added up to hours. We tinkered with different tools, open source and commercial, looking for the Goldilocks solution to address all the constraints and challenges. At the end of the day, most of those tools failed to account for the reality on the ground.

Until, that is, we discovered Kafka. That was the first reactive domino to fall.

Kafka and moving data across data centers

There are three brilliant design decisions which made Kafka the indispensable tool it is today:

  • multi-datacenter topology out of the box;
  • short-circuiting internal data movement from network socket to OS file system cache and back to network socket;
  • sticking with strictly sequential I/O reads and writes.

It fits our needs almost perfectly:

  • Throughput? Moving data between datacenters? Check.
  • Can configure topology to satisfy our complicated environment? Flexible persisted buffer between real-time stream and Hadoop? Check.
  • Price-performance? High availability? Check.


Challenges? A few.
The only real trouble we’ve had with Kafka itself is the level of maturity of its producer API and MirrorMaker. There are lots of knobs and dials to figure out, and they change from release to release. Some producer API behaviors are finicky. We had to invest some time into making sure we were not losing data between our client and the Kafka broker when the broker goes down or the connection is lost.
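The knobs in question are producer settings of roughly this shape (illustrative values, not our production configuration, and the exact property names vary across Kafka releases):

```scala
import java.util.Properties

// Illustrative durability-oriented producer settings: require acknowledgment
// from all in-sync replicas and retry sends through transient broker failures.
val producerProps = new Properties()
producerProps.put("bootstrap.servers", "broker1:9092,broker2:9092") // hypothetical brokers
producerProps.put("acks", "all")                // a send is not done until replicated
producerProps.put("retries", "5")               // survive broker restarts / lost connections
producerProps.put("compression.type", "snappy") // compress once, at the producer
// These props would then be handed to a KafkaProducer instance.
```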

MirrorMaker requires some careful configuration to make sure we are not double-compressing events. It took us some experimentation in a live environment to get to the proper capacity for the MirrorMaker instances. Even with all the hiccups, considering we were using a pre-GA open-source tool, it was a pretty rewarding and successful endeavor.

Most of our troubles came not from Kafka. Our network infrastructure, and especially our firewall capacities, were not ready for the awesome throughput Kafka unleashed. We were able to move a very high volume of data at sub-second latency between geographically removed data centers, but every few hours the whole setup would slow down to a crawl.

It took us some time to understand that it was not a Kafka or MirrorMaker problem (as we naturally suspected at first). As it happened, we were the first team to attempt that kind of volume of streaming traffic between our data centers. Prevailing traffic patterns at the time were all bulk-oriented: file transfers, DB imports/exports, etc. Sustained real-time traffic immediately uncovered bottlenecks and temporary saturations which had been hidden before. A good learning experience for us and our network engineers.

Personally, I was very impressed with Kafka’s capabilities and naturally wanted to know how it was written. That led me to Scala. Long story short, I had to rediscover functional programming and, with it, a totally different way of thinking about concurrency. That led me to Akka and the actor model – hello, Reactive Programming!

From MapReduce to Spark

Another major challenge that plagued us at the time was MapReduce inefficiency, both in a physical sense and in a developer-productivity sense. There are, literally, books written on MapReduce optimization. We know why it is inefficient and how to deal with it – in theory.

In practice, however, developers tend to use MapReduce in the simplest way possible, splitting workflows into multiple digestible steps, saving intermediate results to HDFS, and incurring a horrifying amount of I/O in the process.


MapReduce is just too low-level and too dumb. Mixing complex business logic with low-level MapReduce optimization techniques is asking too much. In architectural lingo: the buildability of such a system is very problematic. Or in English: it takes way too long to code and test. The predictable result is tons of long-running, brittle, hard-to-manage, and inefficient workflows.

Plus, it is Java! Who in their right mind would use Java for analytics? Analytics is a synonym for SQL, right? We ended up writing convoluted pipes in MapReduce to clean up and reorganize data and then dumped it into Teradata so that somebody else could do agile analytical work using SQL, while we were super busy figuring out MapReduce. Pig and Hive helped to a degree, but only just.

We tried to explore alternatives like “SQL on Hadoop”, etc. Lots of cool tools, lots of vendors. The Hadoop ecosystem is rich but hard to navigate. None of them addressed our needs comprehensively. A Goldilocks solution for Big Data seemed out of reach.

Until, that is, we stumbled into Spark. And that was the second reactive domino to fall.

By then, it had become clear to us that using MapReduce was a dead end. But Spark? Switching to Scala? What about the existing, hard-earned skill set? People had just started to get comfortable with MapReduce, Pig, and Hive; it is very risky to switch. All valid arguments. Nevertheless, we jumped.

What makes Spark such a compelling system for big data?
The RDD (Resilient Distributed Dataset) concept addresses most of the MapReduce inefficiency issues, both physical and productivity-related.

Resilient – resiliency is key. The main reason behind MapReduce’s excessive I/O is the need to preserve intermediate data to protect against node crashes. Spark solves the same problem with minimal I/O by keeping a history of transformations (functions) instead of the actual transformed data chunks. Intermediate data can still be checkpointed to HDFS if the process is very long, and the developer can decide whether a checkpoint is needed case by case. Once a dataset is read into cache, it can stay there as long as needed or as long as the cache size allows; again, the developer can control the behavior case by case.
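The lineage idea is easy to illustrate in plain Scala (a conceptual toy, not Spark’s actual machinery): a dataset is the source plus the history of functions applied to it, so a lost result can always be recomputed.

```scala
// Toy lineage: keep the source and the chain of transformations instead of
// persisting intermediate data; recompute on demand after a "crash".
class Lineage[A](source: Seq[A], history: List[Seq[A] => Seq[A]] = Nil) {
  def transform(f: Seq[A] => Seq[A]): Lineage[A] =
    new Lineage(source, history :+ f)

  // Replay the whole history of transformations from the source.
  def recompute(): Seq[A] = history.foldLeft(source)((data, f) => f(data))
}
```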

Distributed Dataset – One thing that always bugged me in MapReduce was its inability to reason about my data as a dataset. Instead, you are forced to think in terms of a single key-value pair, small chunk, block, split, or file. Coming from SQL, it felt like going backwards 20 years. Spark solves this perfectly. A developer can now reason about their data as a Scala immutable collection (RDD API), a data frame (DataFrame API), or both interchangeably (Dataset API). This flexibility and abstraction from the distributed nature of the data leads to huge productivity gains for developers.

And finally, no more Map + Reduce shackles. You can chain whatever functions you need; Spark will optimize your logic into a DAG (directed acyclic graph – similar to how SQL is interpreted) and run it in one swoop, keeping data in memory as much as physically possible.
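The shape of the code is the same one you would write against plain Scala collections, which is exactly what makes the RDD API feel natural. A toy clicks-per-page chain (on a cluster, the same chain would run over an RDD):

```scala
// One chain of functions, no intermediate persistence between the steps.
val pages = Seq("home", "checkout", "home", "cart", "home")
val topPages = pages
  .map(p => (p, 1))                                // emit (page, 1)
  .groupBy(_._1)                                   // shuffle-like grouping
  .map { case (page, hits) => (page, hits.size) }  // count per page
  .toSeq
  .sortBy(-_._2)                                   // busiest pages first
```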


“Minor” detail – nobody knew Scala here. So to get all the efficiency and productivity benefits, we had to train people in Scala and do it quickly.

It is a bit more than just a language, though. Scala, at first glance, has enough similarity to Java that any Java coder can fake it. (Plus, however cumbersome, Java can be used as a Spark language, and so can Python.)

The real problem was to change the mindset of the Java MapReduce developer to a functional way of thinking. We needed a clean break from old Java/Python MR habits. What habits? Most of all, mutability: Java beans, mutable collections and variables, loops, etc. We were going to switch to Scala and do it right.

It started slowly. We designed a curriculum for our Java MapReduce developers. We started with a couple of two-hour sessions, one for functional programming/Scala basics and one for Spark basics. After some trial and error, the FP/Scala part was extended to a four-hour learning-by-example session, while Spark barely needed two hours, if that. As it happens, once you understand the functional side of Scala, especially immutable collections, collection functions, and lambda expressions, Spark comes naturally.

It snowballed very quickly; within a couple of months we had the majority of our MapReduce engineers re-trained and sworn off MapReduce forever. Spark was in.

Spark, naturally, is written in Scala and built on Akka – 2:0 for team Reactive.

Stay tuned for Part 2: “Lambda Architecture meets reality” coming soon

Part 2 Preview

  • Fast Data stack
  • Micro and Macro batches
  • Invisible hand keeps on waving


Spark in Flames – Profiling Spark Applications Using Flame Graphs


When your organization runs multiple jobs on a Spark cluster, resource utilization becomes a priority. Ideally, computations receive sufficient resources to complete in an acceptable time and release resources for other work.

In order to make sure applications do not waste any resources, we want to profile their threads to try and spot any problematic code. Common profiling methods are difficult to apply to a distributed application running on a cluster.

This post suggests an approach to profiling Spark applications. The form of thread profiling used is sampling – capturing stack traces and aggregating them into meaningful data, in this case displayed as flame graphs. Flame graphs show what percentage of CPU time is spent running which methods, in a clear and user-friendly way, as interactive SVG files (http://www.brendangregg.com/flamegraphs.html, http://techblog.netflix.com/2015/07/java-in-flames.html). This solution is a derivation of the Hadoop job thread-profiling solution described in a post by Ihor Bobak, which relies on Etsy’s statsd-jvm-profiler.


Since Spark applications run on a cluster, and computation is split up into different executors (on different machines), each running their own processes, profiling such an application is trickier than profiling a simple application running on a single JVM. We need to capture stack traces on each executor’s process and collect them to a single place in order to compute Flame Graphs for our application.

To achieve this, we specify the statsd-jvm-profiler Java agent in the executor processes to capture stack traces. We configure the Java agent to report to InfluxDB (a time-series database that centrally stores all stack traces with timestamps). Next, we run a script which dumps the stack traces from InfluxDB to text files, which we then feed to the flame graph SVG rendering script.

You can generate flame graphs for specific executors, or an aggregation of all executors.

As with other profiling methods, your application will incur slight performance overhead by running the profiling itself, so beware of constantly running your applications in production with the java agent.


Spark Application Profiling Overview


Below is an example of the result of the described solution. What you see is a flame graph, aggregated over stack traces taken from all executors running a simple Spark application, which contains a problematic method named inefficientMethod in its code. By studying the flame graph, we see that the application spends 41.29% of its threads running this method. The second column in the graph, in which we find inefficientMethod, is the application’s user code. The other columns are Spark native threads which appear in most of the stack traces, so the focus of our profiling is on the column containing the user code. By focusing on that second column, we see that inefficientMethod takes up most of the user code’s CPU run time, and the other parts of the user code are just a tiny sliver reaching the top of the flame graph.



1. Download and install InfluxDB 1.0.0

1.0.0 is currently in beta; however, 0.13 has concurrency bugs which cause it to fail (see: https://github.com/influxdata/influxdb/issues/6235).


Run it:

# linux
sudo service influxdb start

Access the DB using either the web UI (http://localhost:8083/) or the shell:

# linux
influx

Create a database to store the stack traces in (the spark-submit example below uses the database name profiler):

CREATE DATABASE profiler

Create a user matching the credentials passed to the profiler agent below:

CREATE USER profiler WITH PASSWORD 'profiler'


2. Build/download statsd-jvm-profiler jar


Deploy the jar to the machines which are running the executor processes. One way to do this is to use spark-submit’s --jars attribute, which will deploy it to the executors.

--jars /path/to/statsd-jvm-profiler-2.1.0-jar-with-dependencies.jar

Specify the Java agent in your executor processes. This can be done, for example, with spark-submit’s --conf attribute (the full command appears in the example below):

--conf "spark.executor.extraJavaOptions=-javaagent:statsd-jvm-profiler-2.1.0-jar-with-dependencies.jar=server=<INFLUXDB_HOST>,port=8086,reporter=InfluxDBReporter,database=profiler,username=profiler,password=profiler,prefix=<METRICS_PREFIX>,tagMapping=namespace.application"

3. Download influxdb_dump.py


Install all required python modules that influxdb_dump.py imports.

Run influxdb_dump.py to create text files with stack traces as input for flame graphs:


4. Download flamegraph.pl


Generate flame graphs using the text files you dumped from DB:

flamegraph.pl <INPUT_FILES> > <OUTPUT_FILES>


The following is an example of running and profiling a Spark application.

Submit spark application:

spark-submit \
--deploy-mode cluster \
--master yarn \
--class com.mycorporation.MySparkApplication \
--conf "spark.executor.extraJavaOptions=-javaagent:statsd-jvm-profiler-2.1.0-jar-with-dependencies.jar=server=influxdbhost.mycorporation.com,port=8086,reporter=InfluxDBReporter,database=profiler,username=profiler,password=profiler,prefix=MyNamespace.MySparkApplication,tagMapping=namespace.application" \
--name MySparkApplication \
--jars /path/to/profiling/statsd-jvm-profiler-2.1.0-jar-with-dependencies.jar \

Dump stack traces from DB:

influxdb_dump.py -o "influxdbhost.mycorporation.com" -u profiler -p profiler -d profiler -t namespace.application -e MyNamespace.MySparkApplication -x "my_stack_traces"

Generate Flame Graph (In this case for all executors):

flamegraph.pl my_stack_traces/all_*.txt > my_flame_graph.svg

Building Data Science at Scale


As part of the Boston-based Engineering group, the Data Science team’s charter is to enable science-based personalization and recommendation for PayPal’s global users. As companies of all sizes are starting to leverage their data assets, data science has become indispensable in creating relevant user experience. Helping fulfill PayPal’s mission to build the Web’s most convenient payment solution, the team works with various internal partners and strives to deliver best-in-class data science.

Technology Overview

At the backend of the data science platform reside large-scale machine learning engines that continuously learn and predict from transactional, behavioral, and other datasets. An example of a question we might try to answer is: if someone just purchased a piece of software, does it increase their likelihood of purchasing electronics in the near future? It is no wonder that the answer lies in the huge amounts of transaction data, in that what people bought in the past is predictive of what they might consider buying next.

We leverage state-of-the-art machine learning technologies to make such predictions. Machine learning itself has been quickly evolving in recent years. New advances including large-scale matrix factorization, probabilistic behavioral models, and deep learning are no strangers to the team. To make things work at large scale, we leverage Apache Hadoop and its rich ecosystem of tools to process large amounts of data and build data pipelines that are part of the data science platform.

Tackling Data Science

Companies take different approaches in tackling data science. While some companies define a data scientist as someone who performs statistical modeling, we at PayPal Engineering have chosen to take a combined science & engineering approach. Our data scientists are “analytically-minded, statistically and mathematically sophisticated data engineers” [1]. There are of course more science-inclined, and more engineering-inclined individuals on the team, but there is much more of a blend of expertise than a marked distinction between these individuals. This approach to data science allows us to quickly iterate and operationalize high-performing predictive models at scale.

The Venn diagram below, which bears similarity to Conway’s diagram [2], displays the three cornerstones pivotal to the success of the team. Science, which entails machine learning, statistics, and analytics, is the methodology by which we generate actionable predictions and insights from very large datasets. The Engineering component, which includes Apache Hadoop and Spark, makes it possible for us to do science and analytics at scale and deliver results with quick turnaround. Last but not least, I cannot overemphasize the importance of understanding the business for a data scientist. None of the best work in this area that I know of is done in isolation. It is through understanding the problem domain that a data scientist may come up with a better predictive model, among other results.


Scaling Data Science

There are multiple dimensions to scaling data science, which at a minimum involves the team, technology infrastructure, and the operating process. While each of these is worth thoughtful discussion in its own right, I will focus on the operating process since it is critical to how value is delivered. A typical process for the team could break down into these steps:

  • Identify business use case;
  • Understand business logic and KPIs;
  • Identify and capture datasets;
  • Proof of concept and back-test;
  • Operationalize predictive models;
  • Measure lift, optimize and iterate.

Let me explain some of the key elements. First, we see scaling data science as an ongoing collaboration with Product and Business teams. Understanding product and business KPIs and using data science to optimize the same is an essential ingredient of our day-to-day. Second, we follow the best practice in data science, whereby each predictive model is fully back-tested before operationalization. This practice guarantees the effectiveness of the data science platform. Third, the most powerful predictive modeling often requires iterative measurement and optimization. As a concrete example, putting this process into practice along with PayPal Media Network, we were able to achieve excellent results based on:

  • Lookalike modeling: Merchants can reach consumers who look like the merchant’s best existing customers.
  • Purchase intent modeling: Merchants can engage consumers who have a propensity to spend within specific categories.

While it is challenging to crunch the data and create tangible value, it is interesting and rewarding work. I hope to discuss more details of all the fun things we do as data scientists in the future.


[1] http://www.forbes.com/sites/danwoods/2011/10/11/emc-greenplums-steven-hillion-on-what-is-a-data-scientist

[2] http://drewconway.com/zia/2013/3/26/the-data-science-venn-diagram