Tag Archives: Hadoop

Spark in Flames – Profiling Spark Applications Using Flame Graphs

When your organization runs multiple jobs on a Spark cluster, resource utilization becomes a priority. Ideally, computations receive sufficient resources to complete in an acceptable time and release resources for other work.

In order to make sure applications do not waste any resources, we want to profile their threads to try and spot any problematic code. Common profiling methods are difficult to apply to a distributed application running on a cluster.

This post suggests an approach to profiling Spark applications. The form of thread profiling used is sampling: capturing stack traces and aggregating them into meaningful data, in this case displayed as Flame Graphs. Flame Graphs are interactive SVG files that show, in a clear and user-friendly way, what percentage of CPU time is spent running which methods (see http://www.brendangregg.com/flamegraphs.html and http://techblog.netflix.com/2015/07/java-in-flames.html). This solution is derived from the Hadoop job thread profiling solution described in a post by Ihor Bobak, which relies on Etsy’s statsd-jvm-profiler.

OVERVIEW

Since Spark applications run on a cluster, and computation is split up into different executors (on different machines), each running their own processes, profiling such an application is trickier than profiling a simple application running on a single JVM. We need to capture stack traces on each executor’s process and collect them to a single place in order to compute Flame Graphs for our application.

To achieve this, we specify the statsd-jvm-profiler Java agent in the executor processes to capture stack traces. We configure the Java agent to report to InfluxDB (a time-series database that centrally stores all stack traces with timestamps). Next, we run a script which dumps the stack traces from InfluxDB to text files, which we then feed to the Flame Graph SVG rendering script.
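
To make the aggregation step concrete, here is a minimal sketch of how sampled stack traces can be collapsed into the "folded" text format that flamegraph.pl reads (one `frame;frame;... count` line per distinct stack). The function name and the sample frames are hypothetical and only illustrate the idea; this is not the code of influxdb_dump.py.

```python
from collections import Counter


def fold_stacks(samples):
    """Aggregate sampled stack traces into folded-stack lines.

    Each sample is a list of frame names ordered from the root
    (outermost caller) to the leaf (method running at sample time).
    """
    counts = Counter(";".join(frames) for frames in samples)
    # One "frame;frame;... count" line per distinct stack.
    return [f"{stack} {n}" for stack, n in sorted(counts.items())]


# Hypothetical samples, as if collected from several executors.
samples = [
    ["Executor.run", "MyJob.process", "MyJob.inefficientMethod"],
    ["Executor.run", "MyJob.process", "MyJob.inefficientMethod"],
    ["Executor.run", "MyJob.process", "MyJob.parse"],
]

for line in fold_stacks(samples):
    print(line)
```

Identical stacks are merged and counted, so the width of a frame in the rendered graph is proportional to how often it was sampled.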

You can generate flame graphs for specific executors, or an aggregation of all executors.

As with other profiling methods, your application will incur slight performance overhead by running the profiling itself, so beware of constantly running your applications in production with the java agent.

 

Spark Application Profiling Overview

RESULT

Below is an example of the result of the solution described. What you see is a Flame Graph, aggregated over stack traces taken from all executors running a simple Spark application, which contains a problematic method named inefficientMethod in its code. By studying the Flame Graph we see that inefficientMethod appears in 41.29% of the sampled stack traces. The second column in the graph, in which we find inefficientMethod, is the application’s user code. The other columns are Spark native threads, which appear in most of the stack traces, so the focus of our profiling is on the column containing the user code. By focusing on that second column, we see that inefficientMethod takes up most of the user code’s CPU run time and that the other parts of the user code are just a tiny sliver reaching the top of the Flame Graph.

Spark in Flames_Image 2
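
A percentage like the one read off the graph can also be computed directly from the folded stack files. The sketch below assumes the folded `frame;frame;... count` format; the function name and the sample lines are hypothetical.

```python
def method_share(folded_lines, method):
    """Return the percentage of samples whose stack contains `method`."""
    total = 0
    hits = 0
    for line in folded_lines:
        # The sample count is the last space-separated field of each line.
        stack, _, count = line.rpartition(" ")
        n = int(count)
        total += n
        if method in stack.split(";"):
            hits += n
    return 100.0 * hits / total if total else 0.0


# Hypothetical folded stacks.
folded = [
    "Executor.run;MyJob.process;MyJob.inefficientMethod 3",
    "Executor.run;MyJob.process;MyJob.parse 1",
]
print(method_share(folded, "MyJob.inefficientMethod"))
```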

SETUP

1. Download and install InfluxDB 1.0.0

1.0.0 is currently in beta; however, 0.13 has concurrency bugs which cause it to fail (see https://github.com/influxdata/influxdb/issues/6235).

https://influxdata.com/downloads/#influxdb

Run it:

# linux
sudo service influxdb start

Access the DB using either the web UI (http://localhost:8083/) or shell:

# linux
/usr/bin/influx

Create a database to store the stack traces in:

CREATE DATABASE profiler

Create a user:

CREATE USER profiler WITH PASSWORD 'profiler' WITH ALL PRIVILEGES

2. Build/download statsd-jvm-profiler jar

https://github.com/etsy/statsd-jvm-profiler

Deploy the jar to the machines which are running the executor processes. One way to do this is to use spark-submit’s --jars option, which will deploy it to the executors.

--jars /path/to/statsd-jvm-profiler-2.1.0-jar-with-dependencies.jar

Specify the Java agent in your executor processes. This can be done, for example, by using spark-submit’s --conf option to set spark.executor.extraJavaOptions:

-javaagent:statsd-jvm-profiler-2.1.0-jar-with-dependencies.jar=server=<INFLUX_DB_HOST>,port=<INFLUX_DB_PORT>,reporter=InfluxDBReporter,database=<INFLUX_DB_DATABASE_NAME>,username=<INFLUX_DB_USERNAME>,password=<INFLUX_DB_PASSWORD>,prefix=<TAG_VALUE_1>.<TAG_VALUE_2>.….<TAG_VALUE_N>,tagMapping=<TAG_NAME_1>.<TAG_NAME_2>.….<TAG_NAME_N>
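
Because the agent string is long and easy to mistype, it can help to assemble it programmatically. The helper below is a hypothetical convenience (not part of statsd-jvm-profiler); it only joins the option names shown in the template above, and assumes the tag names and values are supplied as an ordered mapping.

```python
def javaagent_option(jar, server, port, database, username, password, tags):
    """Build the -javaagent option string.

    `tags` maps tag names to tag values, in the order they should appear
    in tagMapping and prefix respectively.
    """
    args = {
        "server": server,
        "port": port,
        "reporter": "InfluxDBReporter",
        "database": database,
        "username": username,
        "password": password,
        "prefix": ".".join(tags.values()),
        "tagMapping": ".".join(tags.keys()),
    }
    return f"-javaagent:{jar}=" + ",".join(f"{k}={v}" for k, v in args.items())


option = javaagent_option(
    jar="statsd-jvm-profiler-2.1.0-jar-with-dependencies.jar",
    server="influxdbhost.mycorporation.com",
    port=8086,
    database="profiler",
    username="profiler",
    password="profiler",
    tags={"namespace": "MyNamespace", "application": "MySparkApplication"},
)
print(option)
```

The resulting string is what goes after `spark.executor.extraJavaOptions=` in the spark-submit command.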

3. Download influxdb_dump.py

https://github.com/aviemzur/statsd-jvm-profiler/blob/master/visualization/influxdb_dump.py

Install all required python modules that influxdb_dump.py imports.

Run influxdb_dump.py to create text files with stack traces as input for flame graphs:

python influxdb_dump.py -o "<INFLUX_DB_HOST>" -u <INFLUX_DB_USERNAME> -p <INFLUX_DB_PASSWORD> -d <INFLUX_DB_DATABASE_NAME> -t <TAGS> -e <VALUES> -x "<OUTPUT_DIR>"

4. Download flamegraph.pl

https://github.com/brendangregg/FlameGraph/blob/master/flamegraph.pl

Generate flame graphs using the text files you dumped from DB:

flamegraph.pl <INPUT_FILES> > <OUTPUT_FILES>

Example

The following is an example of running and profiling a Spark application.

Submit the Spark application:

spark-submit \
--deploy-mode cluster \
--master yarn \
--class com.mycorporation.MySparkApplication \
--conf "spark.executor.extraJavaOptions=-javaagent:statsd-jvm-profiler-2.1.0-jar-with-dependencies.jar=server=influxdbhost.mycorporation.com,port=8086,reporter=InfluxDBReporter,database=profiler,username=profiler,password=profiler,prefix=MyNamespace.MySparkApplication,tagMapping=namespace.application" \
--name MySparkApplication \
--jars /path/to/profiling/statsd-jvm-profiler-2.1.0-jar-with-dependencies.jar \
MySparkApplication.jar

Dump stack traces from DB:

influxdb_dump.py -o "influxdbhost.mycorporation.com" -u profiler -p profiler -d profiler -t namespace.application -e MyNamespace.MySparkApplication -x "my_stack_traces"

Generate Flame Graph (In this case for all executors):

flamegraph.pl my_stack_traces/all_*.txt > my_flame_graph.svg

Deep Learning on Hadoop 2.0

The Data Science team in Boston is working on leveraging cutting-edge tools and algorithms to optimize business actions based on insights hidden in user data. Data science relies heavily on machine learning algorithms that can help us identify and exploit patterns in the data. Obtaining insights from Internet-scale data is a challenging undertaking; hence, being able to run the algorithms at scale is a crucial requirement. With the explosion of data and the accompanying thousand-machine clusters, we need to adapt the algorithms to operate in such distributed environments. Running machine learning algorithms in a general-purpose distributed computing environment poses its own set of challenges.

Here we discuss how we have implemented and deployed Deep Learning, a cutting edge machine-learning framework, in one of the Hadoop clusters. We provide the details on how the algorithm was adapted to run in a distributed setting. We also present results of running the algorithm on a standard dataset.

Deep Belief Networks

Deep Belief Networks (DBNs) are graphical models obtained by stacking and training Restricted Boltzmann Machines (RBMs) in a greedy, unsupervised manner [1]. DBNs are trained to extract a deep hierarchical representation of the training data by modeling the joint distribution between the observed vector x and the l hidden layers h^k as follows, where the distribution for each hidden layer is conditioned on a layer immediately preceding it [4]:

Equation 1: DBN Distribution
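
For reference, the joint distribution is commonly written as below. This is the form given in the tutorial cited as [4], with x = h^0; treating it as the content of Equation 1 is an assumption, since the original equation image is not reproduced here.

```latex
P(x, h^1, \ldots, h^l) = \left( \prod_{k=0}^{l-2} P(h^k \mid h^{k+1}) \right) P(h^{l-1}, h^l), \qquad x = h^0
```

Here P(h^k | h^{k+1}) is the conditional distribution between adjacent layers, and P(h^{l-1}, h^l) is the joint distribution of the top-level RBM.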

The relationship between the input layer and the hidden layers can be observed in the figure below. At a high level, the first layer is trained as an RBM that models the raw input x. An input is a sparse binary vector representing the data to be classified, e.g. a binary image of a digit. Subsequent layers are trained using the transformed data (sample or mean activations) from the previous layers as training examples. The number of layers can be determined empirically to obtain the best model performance, and DBNs support arbitrarily many layers.

Figure 1: DBN layers

The following code snippet shows the training that goes into an RBM. The input data supplied to the RBM is processed for a predefined number of epochs. In each epoch, the input data is divided into small batches, and the weights, activations, and deltas are computed for each layer:
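
As an illustration (not the authors' code), the loop can be sketched as follows, assuming a binary RBM trained with one-step contrastive divergence (CD-1). The function names, default hyperparameters, and toy data are all hypothetical, and the pure-Python lists are for readability rather than speed.

```python
import math
import random


def sigmoid(x):
    return 1.0 / (1.0 + math.exp(-x))


def sample(probs, rng):
    """Draw a binary vector from a list of Bernoulli probabilities."""
    return [1.0 if rng.random() < p else 0.0 for p in probs]


def train_rbm(data, n_hidden, epochs=5, batch_size=2, lr=0.1, seed=0):
    """Train a binary RBM with CD-1; returns (weights, visible_bias, hidden_bias)."""
    rng = random.Random(seed)
    n_visible = len(data[0])
    w = [[rng.gauss(0.0, 0.01) for _ in range(n_hidden)] for _ in range(n_visible)]
    vb = [0.0] * n_visible
    hb = [0.0] * n_hidden

    def hidden_probs(v):
        return [sigmoid(hb[j] + sum(v[i] * w[i][j] for i in range(n_visible)))
                for j in range(n_hidden)]

    def visible_probs(h):
        return [sigmoid(vb[i] + sum(h[j] * w[i][j] for j in range(n_hidden)))
                for i in range(n_visible)]

    for _ in range(epochs):
        for start in range(0, len(data), batch_size):
            batch = data[start:start + batch_size]
            # Accumulate the deltas over the mini-batch.
            dw = [[0.0] * n_hidden for _ in range(n_visible)]
            dvb = [0.0] * n_visible
            dhb = [0.0] * n_hidden
            for v0 in batch:
                h0 = hidden_probs(v0)                # positive phase
                v1 = visible_probs(sample(h0, rng))  # reconstruction
                h1 = hidden_probs(v1)                # negative phase
                for i in range(n_visible):
                    dvb[i] += v0[i] - v1[i]
                    for j in range(n_hidden):
                        dw[i][j] += v0[i] * h0[j] - v1[i] * h1[j]
                for j in range(n_hidden):
                    dhb[j] += h0[j] - h1[j]
            # Apply the averaged update once per batch.
            scale = lr / len(batch)
            for i in range(n_visible):
                vb[i] += scale * dvb[i]
                for j in range(n_hidden):
                    w[i][j] += scale * dw[i][j]
            for j in range(n_hidden):
                hb[j] += scale * dhb[j]
    return w, vb, hb


# Toy binary data: two repeated patterns.
data = [[1, 0, 1, 0], [1, 0, 1, 0], [0, 1, 0, 1], [0, 1, 0, 1]]
weights, visible_bias, hidden_bias = train_rbm(data, n_hidden=3)
```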

After all layers are trained, the parameters of the deep network are fine-tuned using a supervised training criterion. The supervised training criterion can, for instance, be framed as a classification problem, which then allows the deep network to be used as a classifier. More complex supervised criteria can be employed to provide interesting results such as scene interpretation, for instance explaining what objects are present in a picture.

Infrastructure

Deep learning has received considerable attention not only because it can deliver results superior to some other learning algorithms, but also because it can be run in a distributed setting, allowing processing of large-scale datasets. Deep networks can be parallelized at two major levels: the layer level and the data level [6]. For layer-level parallelization, many implementations use GPU arrays to compute layer activations in parallel and frequently synchronize them. However, this approach is not suitable for clusters where data can reside across multiple machines connected by a network, because of high network costs. Data-level parallelization, in which the training is parallelized over subsets of the data, is more suitable for these settings. Most of the data at PayPal is stored in Hadoop clusters; hence, being able to run the algorithms in those clusters is our priority. Dedicated cluster maintenance and support is also an important factor for us to consider. However, since deep learning is inherently iterative, a paradigm such as MapReduce is not well suited for running these algorithms. With the advent of Hadoop 2.0 and YARN-based resource management, we can write iterative applications because we can finely control the resources the application is using. We adapted IterativeReduce [7], a simple abstraction for writing iterative algorithms in Hadoop YARN, and were able to deploy it in one of the PayPal clusters running Hadoop 2.4.1.

Methodology

We implemented the core deep learning algorithm by Hinton, referenced in [2]. Since our requirement is to distribute the algorithm across a multi-machine cluster, we adapted the algorithm for such a setting. To distribute the algorithm across multiple machines, we followed the guidelines proposed by Grazia et al. [6]. A high-level summary of our implementation is given below:

  1. The master node initializes the weights of the RBM.
  2. The master node pushes the weights and the splits to the worker nodes.
  3. Each worker trains an RBM layer for 1 dataset epoch, i.e. one complete pass through the entire split, and sends the updated weights back to the master node.
  4. The master node averages the weights from all workers for a given epoch.
  5. Steps 3-4 are repeated for a predefined number of epochs (50 in our case).
  6. After step 5 is done, one layer is trained. The steps are repeated for subsequent RBM layers.
  7. After all layers are trained, the deep network is fine-tuned using error back-propagation.

The figure below describes a single dataset epoch (steps 3-4) while running the deep learning algorithm. We note that this paradigm can be leveraged to implement a host of machine learning algorithms that are iterative in nature.

Figure 2: Single dataset epoch for training
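
The parameter-averaging scheme listed above can be simulated on a single machine as a sanity check. In this sketch the workers run sequentially, weights are flat lists, and `toy_epoch` is a hypothetical stand-in for one pass of RBM training on a split; none of these names come from IterativeReduce.

```python
def average_weights(worker_weights):
    """Element-wise average of the weight vectors returned by the workers."""
    n = len(worker_weights)
    return [sum(ws) / n for ws in zip(*worker_weights)]


def train_layer_distributed(initial_weights, splits, train_one_epoch, epochs=50):
    """Simulate the master/worker loop for one layer (workers run sequentially)."""
    weights = list(initial_weights)  # master initializes the weights
    for _ in range(epochs):          # repeat for a predefined number of epochs
        # Master pushes the current weights; each worker trains on its split.
        updates = [train_one_epoch(list(weights), split) for split in splits]
        # Master averages the weights returned by all workers.
        weights = average_weights(updates)
    return weights


def toy_epoch(weights, split):
    """Toy stand-in for training: move each weight halfway to the split mean."""
    mean = sum(split) / len(split)
    return [w + 0.5 * (mean - w) for w in weights]


splits = [[1.0, 3.0], [5.0, 7.0]]  # data held by two hypothetical workers
result = train_layer_distributed([0.0, 0.0], splits, toy_epoch, epochs=50)
print(result)
```

With this toy update the averaged weights converge toward 4.0, the mean over both splits, which shows how averaging blends what each worker learned from its own portion of the data.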

 

The following code snippet shows the steps involved in training a DBN on a single machine. The dataset is first divided into multiple batches. Then multiple RBM layers are initialized and trained sequentially. After the RBMs are trained, they are passed through a fine-tune phase which uses error back-propagation.
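
As an illustration of this control flow (not the authors' code), the sketch below batches the data, trains the layers greedily, and then hands the stack to a fine-tune step. The `train_rbm`, `transform`, and `fine_tune` callables are hypothetical placeholders for the real training, layer-activation, and back-propagation routines.

```python
def make_batches(data, batch_size):
    """Split the dataset into consecutive mini-batches."""
    return [data[i:i + batch_size] for i in range(0, len(data), batch_size)]


def train_dbn(data, layer_sizes, batch_size, train_rbm, transform, fine_tune):
    """Greedy layer-wise pre-training followed by a fine-tune phase."""
    batches = make_batches(data, batch_size)
    layers = []
    for n_hidden in layer_sizes:
        # Train one RBM on the current representation of the data.
        params = train_rbm(batches, n_hidden)
        layers.append(params)
        # The trained layer's activations become the next layer's input.
        batches = [[transform(params, v) for v in batch] for batch in batches]
    return fine_tune(layers, data)


# Stub callables that only track layer shapes, to show the data flow.
def stub_train(batches, n_hidden):
    return (len(batches[0][0]), n_hidden)


def stub_transform(params, v):
    return [0.0] * params[1]


def stub_fine_tune(layers, data):
    return layers


data = [[1, 0, 1], [0, 1, 0], [1, 1, 1]]
print(train_dbn(data, [4, 2], 2, stub_train, stub_transform, stub_fine_tune))
```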

 

We adapted the IterativeReduce [7] implementation for much of the YARN “plumbing”. We did a major overhaul of the implementation to make it usable for our deep learning implementation. The IterativeReduce implementation was written for the Cloudera Hadoop distribution, and we re-platformed it to the standard Apache Hadoop distribution. We also rewrote the implementation to use the standard programming models described in [8]. In particular, we used the YarnClient API for communication between the client application and the ResourceManager. We also used AMRMClient and NMClient for the ApplicationMaster's interaction with the ResourceManager and the NodeManager, respectively.

We first submit an application to the YARN resource manager using the YarnClient API.

After the application is submitted, YARN resource manager launches the application master. The application master is responsible for allocating and releasing the worker containers as necessary. The application master uses AMRMClient to communicate with the resource manager.

The application master uses the NMClient API to run commands in the containers it received from the resource manager.

Once the application master launches the worker containers it requires, it sets up a port to communicate with the workers. For our deep learning implementation, we added the methods required for parameter initialization, layer-by-layer training, and fine-tune signaling to the original IterativeReduce interface. IterativeReduce uses Apache Avro IPC for Master-Worker communication.

The master and worker nodes go through the following series of steps for distributed training. The master sends the initial parameters to the workers, and then each worker trains its RBM on its portion of the data. After a worker is done training, it sends the results back to the master, which then combines the results. After the iterations are completed, the master completes the process by starting the back-propagation fine-tune phase.

Results

We evaluated the performance of the deep learning implementation using the MNIST handwritten digit recognition dataset [3]. The dataset contains manually labeled handwritten digits from 0 to 9. The training set consists of 60,000 images and the test set consists of 10,000 images.

In order to measure the performance, the DBN was first pre-trained and then fine-tuned on the 60,000 training images. The DBN was then evaluated on the 10,000 test images. No pre-processing was done on the images during training or evaluation. The error rate was obtained as the ratio of the number of misclassified images to the total number of images in the test set.
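
The error-rate definition above amounts to the following small computation. The function name and the toy labels are hypothetical.

```python
def error_rate(predictions, labels):
    """Fraction of examples whose predicted label differs from the true label."""
    if len(predictions) != len(labels):
        raise ValueError("predictions and labels must have the same length")
    wrong = sum(1 for p, y in zip(predictions, labels) if p != y)
    return wrong / len(labels)


# Toy example: one of four digits is misclassified.
print(error_rate([7, 2, 1, 0], [7, 2, 1, 4]))
```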

We achieved a best classification error rate of 1.66% when using RBMs with 500-500-2000 hidden units in a 10-node distributed cluster setting. The error rate is comparable with the error rate of 1.2% reported by the authors of the original algorithm (with 500-500-2000 hidden units) [2], and with some of the results with similar settings reported in [3]. We note that the original implementation was on a single machine, whereas our implementation is in a distributed setting. The parameter-averaging step contributes to a slight reduction in performance, although the benefit of distributing the algorithm over multiple machines far outweighs the reduction. The table below summarizes how the error rate varies with the number of hidden units in each layer while running on a 10-node cluster.

Table 1: MNIST performance evaluation

Further thoughts

We successfully deployed a distributed deep learning system, which we believe will prove useful in solving some of our machine learning problems. Furthermore, the iterative reduce abstraction can be leveraged to distribute any other suitable machine learning algorithm. Being able to utilize the general-purpose Hadoop cluster will prove highly beneficial for running scalable machine learning algorithms on large datasets. We note that there are several improvements we would like to make to the current framework, chiefly around reducing the network latency and having more advanced resource management. Additionally, we’d like to optimize the DBN framework so that we can minimize inter-node communication. With fine-grained control of cluster resources, the Hadoop YARN framework gives us the flexibility to do so.

References

[1] G. E. Hinton, S. Osindero, and Y. Teh. A fast learning algorithm for deep belief nets. Neural Computation, 18(7):1527–1554, 2006.

[2] G. E. Hinton and R. R. Salakhutdinov. Reducing the Dimensionality of Data with Neural Networks. Science, 313(5786):504–507, 2006.

[3] Y. LeCun, C. Cortes, C. J.C. Burges. The MNIST database of handwritten digits.

[4] Deep Learning Tutorial. LISA lab, University of Montreal

[5] G. E. Hinton. A Practical Guide to Training Restricted Boltzmann Machines. Lecture Notes in Computer Science Volume 7700: 599-619, 2012.

[6] M. Grazia, I. Stoianov, M. Zorzi. Parallelization of Deep Networks. ESANN, 2012

[7] IterativeReduce, https://github.com/jpatanooga/KnittingBoar/wiki/IterativeReduce

[8] Apache Hadoop YARN – Enabling Next Generation Data Applications, http://www.slideshare.net/hortonworks/apache-hadoop-yarn-enabling-nex

Building Data Science at Scale

As part of the Boston-based Engineering group, the Data Science team’s charter is to enable science-based personalization and recommendation for PayPal’s global users. As companies of all sizes are starting to leverage their data assets, data science has become indispensable in creating relevant user experience. Helping fulfill PayPal’s mission to build the Web’s most convenient payment solution, the team works with various internal partners and strives to deliver best-in-class data science.

Technology Overview

At the backend of the data science platform reside large-scale machine learning engines that continuously learn and predict from transactional, behavioral, and other datasets. An example of a question we might try to answer is: if someone just purchased a piece of software, does that increase their likelihood of purchasing electronics in the near future? Unsurprisingly, the answer lies in the huge amounts of transaction data, in that what people bought in the past is predictive of what they might consider buying next.

We leverage state-of-the-art machine learning technologies to make such predictions. Machine learning itself has been quickly evolving in recent years. New advances including large-scale matrix factorization, probabilistic behavioral models, and deep learning are no strangers to the team. To make things work at large scale, we leverage Apache Hadoop and its rich ecosystem of tools to process large amounts of data and build data pipelines that are part of the data science platform.

Tackling Data Science

Companies take different approaches in tackling data science. While some companies define a data scientist as someone who performs statistical modeling, we at PayPal Engineering have chosen to take a combined science & engineering approach. Our data scientists are “analytically-minded, statistically and mathematically sophisticated data engineers” [1]. There are of course more science-inclined, and more engineering-inclined individuals on the team, but there is much more of a blend of expertise than a marked distinction between these individuals. This approach to data science allows us to quickly iterate and operationalize high-performing predictive models at scale.

The Venn diagram below, which bears similarity to Conway’s diagram [2], displays the three cornerstones pivotal to the success of the team. Science, which entails machine learning, statistics and analytics, is the methodology by which we generate actionable predictions and insights from very large datasets. The Engineering component, which includes Apache Hadoop and Spark, makes it possible for us to do science and analytics at scale and deliver results with quick turnaround. Last but not least, I cannot overemphasize the importance of understanding the business for a data scientist. None of the best work in this area that I know of is done in isolation. It is through understanding the problem domain that a data scientist may come up with a better predictive model, among other results.

DS-VennDiagram

Scaling Data Science

There are multiple dimensions to scaling data science, including at a minimum the team, the technology infrastructure, and the operating process. While each of these is worth thoughtful discussion in its own right, I will focus on the operating process since it is critical to how value is delivered. A typical process for the team could break down into these steps:

  • Identify business use case;
  • Understand business logic and KPIs;
  • Identify and capture datasets;
  • Proof of concept and back-test;
  • Operationalize predictive models;
  • Measure lift, optimize and iterate.

Let me explain some of the key elements. First, we see scaling data science as an ongoing collaboration with Product and Business teams. Understanding product and business KPIs and using data science to optimize them is an essential ingredient of our day-to-day work. Second, we follow best practice in data science, whereby each predictive model is fully back-tested before operationalization. This practice helps ensure the effectiveness of the data science platform. Third, the most powerful predictive modeling often requires iterative measurement and optimization. As a concrete example, putting this process into practice along with PayPal Media Network, we were able to achieve excellent results based on:

  • Lookalike modeling: Merchants can reach consumers who look like the merchant’s best existing customers.
  • Purchase intent modeling: Merchants can engage consumers who have a propensity to spend within specific categories.

While it is challenging to crunch the data and create tangible value, it is interesting and rewarding work. I hope to discuss more details of all the fun things we do as data scientists in the future.

References:

[1] http://www.forbes.com/sites/danwoods/2011/10/11/emc-greenplums-steven-hillion-on-what-is-a-data-scientist

[2] http://drewconway.com/zia/2013/3/26/the-data-science-venn-diagram