
Gimel: PayPal’s Analytics Data Processing Platform


It is no secret that the volume, velocity, and variety of data are exploding. As a result, the case for moving workloads to the Hadoop big data ecosystem is becoming easier by the day.

With Hadoop and the big data world come the additional complexities of managing the semantics of reading from and writing to various data stores. SQL has not been the lingua franca of the analytics world so far. In this world, you need to deal with a lot of choices: compute engines like MapReduce, Spark, Flink, and Hive, and data stores like HDFS/Hive, Kafka, Elasticsearch, HBase, Aerospike, Cassandra, and Couchbase. The code required to read from and write to these data stores from different compute engines is fragile and cumbersome, and it differs significantly between data stores.

In the big data world, it is also extremely difficult to catalog datasets across various data stores in a way that makes it easy to add to the catalog or to search and find datasets.

Beyond the difficulty of finding the correct datasets, developers, analysts, and data scientists don’t want to deal with the code to read and write data; they want to focus their attention and time on dataset development, analysis, and exploration.

We built the Gimel Data Platform – the centralized metadata catalog PCatalog and the unified Data API – to address these challenges and help commoditize data access in the big data world.

What is the Gimel Data Platform?

At the core, the Gimel Data Platform consists of two components:

PCatalog: A centralized metadata catalog powered by Gimel Discovery Services, which registers datasets found across all the data stores in the company and makes them searchable through the PCatalog portal as well as programmatically through RESTful APIs.

Data API: A unified API to read from and write to data stores, including data streaming systems like Kafka. In addition to providing the API, the Gimel platform also enables SQL access to the API so developers, analysts and data scientists can now use Gimel SQL to access data stored on any data store. The Data API is built to work in batch mode or in streaming mode.


Gimel ecosystem

The Gimel Data Platform is tightly integrated with the Gimel Compute Platform, which offers a framework and a set of APIs to access the compute infrastructure — the Gimel Platform is accessed through the Gimel SDK and via Jupyter notebooks.

Additionally, the Gimel ecosystem, as seen from the image above, handles logging, monitoring, and alerting automatically so that there is a robust audit trail maintained for the user as well as for the operator.

By making SQL a first-class citizen, the Gimel Data Platform significantly reduces the time it takes to prototype and build analytic and data pipelines.

User Experience with Gimel

Regardless of the type of user or use case, the experience is simplified into at most 3 steps:

Find your datasets: Log into the PCatalog portal and search for your datasets.

Once you search for and find your dataset, you can take the logical name of the dataset for the next step.

PCatalog search

Access your datasets: Navigate to Jupyter notebooks & analyze data

Use the logical name of the dataset you found in the earlier step in your analysis.

Gimel query across data stores

Schedule and productionalize

Productionalize your analysis

In a nutshell, PCatalog and its services provide the required dataset name, which is a logical pointer to the physical object. The Gimel library, which requires only the name of the dataset, abstracts the logic to access any supported data store while providing the ability for the user to express logic in SQL.

Implementation Details

Let’s take a look at the following pieces:

PCatalog Portal

The PCatalog portal provides multiple views into the entire metadata catalog. These views, some of which are only visible to administrators, give a comprehensive view of all the physical properties of any dataset, including the data store it belongs to.

Dataset Overview

PCatalog – Dataset overview

Storage System Attributes (owned and set by Storage System Owner)

PCatalog – System Attributes

Physical Object Level Attributes (owned and set by Object Owner)

PCatalog – Object Attributes

Dataset Readiness for Consumption on various Hadoop Clusters

PCatalog – Dataset Deployment Status

A peek inside Gimel SQL

As a user, you may be reading from one dataset and writing to another, but the physical structure and location of the underlying objects are not your concern. We will look at two examples of reading from one data store and writing into another.

Reading from Kafka, persisting into HDFS/Hive in Batch mode

All of the processing logic can be expressed in SQL, powered by Gimel:

Kafka to HDFS/Hive

Reading from Kafka, streaming into HBase

As you can see below, even this streaming access is easily achieved through SQL:

Kafka to HBase

Regardless of the data store or whether it is in batch mode or streaming, you can express all the logic as SQL!

Anatomy of Data API (how does it go from SQL to native code execution)

Anatomy of Data API

Catalog provider modes

Under the hood, CatalogProvider provides an interface for the Gimel Library to get the dataset details from PCatalog Services. The type of the dataset is a critical configuration provided by CatalogProvider. A DataSet or DataStream Factory in turn returns a StorageDataSet. (For example, KafkaDataSet or ElasticDataSet.)

Thus, the read/write implementations within each of these StorageDataSets perform the core read/write operations.
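This flow can be illustrated with a small Python sketch. All class, function, and dataset names here are hypothetical, purely to show the catalog-lookup-then-factory pattern; Gimel's actual API is documented at gimel.io.

```python
# Illustrative sketch of the CatalogProvider -> factory -> StorageDataSet
# flow described above. All names here are hypothetical, not Gimel's API.

class KafkaDataSet:
    """Store-specific implementation holding the Kafka read/write logic."""
    def __init__(self, props):
        self.props = props
    def read(self):
        return f"reading Kafka topic {self.props['topic']}"

class ElasticDataSet:
    """Store-specific implementation holding the Elasticsearch logic."""
    def __init__(self, props):
        self.props = props
    def read(self):
        return f"reading Elasticsearch index {self.props['index']}"

# CatalogProvider: resolves a logical dataset name to its physical details,
# including the storage type -- the critical configuration noted above.
CATALOG = {
    "pcatalog.clicks_log": {"type": "kafka", "topic": "clicks"},
    "pcatalog.products": {"type": "elastic", "index": "products"},
}

FACTORY = {"kafka": KafkaDataSet, "elastic": ElasticDataSet}

def dataset(name):
    props = CATALOG[name]                 # CatalogProvider lookup
    return FACTORY[props["type"]](props)  # factory returns a StorageDataSet

print(dataset("pcatalog.clicks_log").read())  # -> reading Kafka topic clicks
```

The user only ever supplies the logical name; everything store-specific hides behind the factory, which is what lets the same SQL run against any supported store.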

CatalogProvider Modes

Gimel is Open Source

Gimel is now open source and ready for your contributions. Head over to gimel.io to share your input. For instructions on getting started on your local machine, go to try.gimel.io.

We are convinced that the Gimel Data Platform will speed up time to market, facilitate innovation, and enable the large population of analysts familiar with SQL to step into the big data world easily.

We cannot wait for you to try it!



Deepak Mohanakumar Chandramouli, Romit Mehta, Prabhu Kasinathan, Sid Anand

Deepak has over 13 years of experience in data engineering and 5 years of expertise building scalable data solutions in the big data space. He worked on building the Apache Spark-based foundational big data platform during the incubation of PayPal’s data lake, and has applied experience implementing Spark-based solutions across several types of NoSQL, key-value, messaging, document-based, and relational systems. Deepak has been leading the initiative to enable access to any type of storage on Spark via a unified Data API, SQL, tools, and services, thus simplifying analytics and large-scale computation-intensive applications. Github: Dee-Pac

Romit has been building data and analytics solutions for a wide variety of companies across the networking, semiconductor, telecom, security, and fintech industries for over 19 years. At PayPal he leads product management for the core big data and analytics platform products, which include a compute framework, a data platform, and a notebooks platform. In this role, Romit works to simplify application development on big data technologies like Spark and to improve analyst and data scientist agility and ease of accessing data spread across a multitude of data stores via friendly technologies like SQL and notebooks. Github: theromit

Prabhu Kasinathan is an engineering lead at PayPal, focusing on building a highly scalable and distributed analytics platform to handle petabyte-scale data. His team focuses on creating frameworks, APIs, productivity tools, and services for the big data platform to support multi-tenancy and large-scale computation-intensive applications. Github: prabhu1984

Sid Anand currently serves as PayPal’s Chief Data Engineer, focusing on ways to realize the value of data. Prior to joining PayPal, he held several positions including Agari’s Data Architect, a Technical Lead in Search @ LinkedIn, Netflix’s Cloud Data Architect, Etsy’s VP of Engineering, and several technical roles at eBay. Sid earned his BS and MS degrees in CS from Cornell University, where he focused on Distributed Systems. In his spare time, he is a maintainer/committer on Apache Airflow, a co-chair for QCon, and a frequent speaker at conferences. Github: r39132

Spark in Flames – Profiling Spark Applications Using Flame Graphs


When your organization runs multiple jobs on a Spark cluster, resource utilization becomes a priority. Ideally, computations receive sufficient resources to complete in an acceptable time and release resources for other work.

In order to make sure applications do not waste any resources, we want to profile their threads to try and spot any problematic code. Common profiling methods are difficult to apply to a distributed application running on a cluster.

This post suggests an approach to profiling Spark applications. The form of thread profiling used is sampling: capturing stack traces and aggregating them into meaningful data, in this case displayed as flame graphs. Flame graphs show what percentage of CPU time is spent running which methods, in a clear and user-friendly way, as interactive SVG files (see http://www.brendangregg.com/flamegraphs.html and http://techblog.netflix.com/2015/07/java-in-flames.html). This solution is a derivation of the Hadoop-jobs thread-profiling solution described in a post by Ihor Bobak, which relies on Etsy’s statsd-jvm-profiler.


Since Spark applications run on a cluster, and computation is split up into different executors (on different machines), each running their own processes, profiling such an application is trickier than profiling a simple application running on a single JVM. We need to capture stack traces on each executor’s process and collect them to a single place in order to compute Flame Graphs for our application.
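The aggregation step boils down to collapsing each sampled stack trace into a single semicolon-delimited line with a sample count, which is the "folded" format that flamegraph.pl consumes. A rough Python sketch of that aggregation, with entirely made-up sample data:

```python
from collections import Counter

def fold_stacks(samples):
    """Collapse sampled stack traces (root-first frame lists) into
    flamegraph.pl's folded format: 'frame1;frame2;... count'."""
    counts = Counter(";".join(stack) for stack in samples)
    # most_common() puts the hottest stacks first
    return [f"{stack} {n}" for stack, n in counts.most_common()]

# Hypothetical samples collected from the executors (root frame first):
samples = [
    ["Executor.run", "MyApp.process", "MyApp.inefficientMethod"],
    ["Executor.run", "MyApp.process", "MyApp.inefficientMethod"],
    ["Executor.run", "MyApp.process", "MyApp.serialize"],
]
for line in fold_stacks(samples):
    print(line)
```

The influxdb_dump.py script described below effectively produces text files of this shape, one line per distinct stack, which flamegraph.pl then renders as an SVG.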

To achieve this we specify the statsd-jvm-profiler java agent in the executor processes to capture stack traces. We configure the java agent to report to InfluxDB (a time-series database to centrally store all stack traces with timestamps). Next, we run a script which dumps the stack traces from InfluxDB to text files, which we then input to Flame Graphs SVG rendering script.

You can generate flame graphs for specific executors, or an aggregation of all executors.

As with other profiling methods, your application will incur a slight performance overhead from the profiling itself, so be wary of constantly running your applications in production with the Java agent.


Spark Application Profiling Overview


Below is an example of the result of the solution described. What you see is a flame graph, aggregated over stack traces taken from all executors running a simple Spark application, which contains a problematic method named inefficientMethod in its code. By studying the flame graph we see that the application spends 41.29% of its threads running this method. The second column in the graph, in which we find inefficientMethod, is the application’s user code. The other columns are Spark native threads which appear in most of the stack traces, so the focus of our profiling is on the column containing the user code. By focusing on that second column, we see that inefficientMethod takes up most of the user code’s CPU run time, while the other parts of the user code are just a tiny sliver reaching the top of the flame graph.



1. Download and install InfluxDB 1.0.0

1.0.0 is currently in beta; however, 0.13 has concurrency bugs which cause it to fail (see: https://github.com/influxdata/influxdb/issues/6235)


Run it:

# linux
sudo service influxdb start

Access the DB using either the web UI (http://localhost:8083/) or shell:

# linux
influx

Create a database to store the stack traces in (the spark-submit example below uses profiler as the database name):

CREATE DATABASE profiler


Create a user (the spark-submit example below uses profiler for both username and password):

CREATE USER profiler WITH PASSWORD 'profiler' WITH ALL PRIVILEGES


2. Build/download statsd-jvm-profiler jar


Deploy the jar to the machines which run the executor processes. One way to do this is to use spark-submit’s --jars option, which will ship it to the executors:

--jars /path/to/statsd-jvm-profiler-2.1.0-jar-with-dependencies.jar

Specify the Java agent in your executor processes. This can be done, for example, by using spark-submit’s --conf option, as shown in the full example below.


3. Download influxdb_dump.py


Install all of the required Python modules that influxdb_dump.py imports.

Run influxdb_dump.py to create text files with stack traces as input for flame graphs:


4. Download flamegraph.pl


Generate flame graphs using the text files you dumped from DB:

flamegraph.pl <INPUT_FILES> > <OUTPUT_FILES>


The following is an example of running and profiling a Spark application.

Submit spark application:

spark-submit \
--deploy-mode cluster \
--master yarn \
--class com.mycorporation.MySparkApplication \
--conf "spark.executor.extraJavaOptions=-javaagent:statsd-jvm-profiler-2.1.0-jar-with-dependencies.jar=server=influxdbhost.mycorporation.com,port=8086,reporter=InfluxDBReporter,database=profiler,username=profiler,password=profiler,prefix=MyNamespace.MySparkApplication,tagMapping=namespace.application" \
--name MySparkApplication \
--jars /path/to/profiling/statsd-jvm-profiler-2.1.0-jar-with-dependencies.jar \

Dump stack traces from DB:

influxdb_dump.py -o "influxdbhost.mycorporation.com" -u profiler -p profiler -d profiler -t namespace.application -e MyNamespace.MySparkApplication -x "my_stack_traces"

Generate Flame Graph (In this case for all executors):

flamegraph.pl my_stack_traces/all_*.txt > my_flame_graph.svg

Deep Learning on Hadoop 2.0


The Data Science team in Boston is working on leveraging cutting-edge tools and algorithms to optimize business actions based on insights hidden in user data. Data science heavily exploits machine learning algorithms that can help us identify and exploit patterns in the data. Obtaining insights from Internet-scale data is a challenging undertaking; hence, being able to run the algorithms at scale is a crucial requirement. With the explosion of data and the accompanying thousand-machine clusters, we need to adapt the algorithms to operate in such distributed environments. Running machine learning algorithms in a general-purpose distributed computing environment poses its own set of challenges.

Here we discuss how we have implemented and deployed Deep Learning, a cutting edge machine-learning framework, in one of the Hadoop clusters. We provide the details on how the algorithm was adapted to run in a distributed setting. We also present results of running the algorithm on a standard dataset.

Deep Belief Networks

Deep Belief Networks (DBN) are graphical models obtained by stacking and training Restricted Boltzmann Machines (RBM) in a greedy, unsupervised manner [1]. DBNs are trained to extract a deep hierarchical representation of the training data by modeling the joint distribution between the observed vector x and the ℓ hidden layers h^k as follows, where the distribution for each hidden layer is conditioned on the layer immediately preceding it [4]:

Equation 1: DBN Distribution
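Following the formulation in [4], and writing h^0 = x, the joint distribution in Equation 1 can be written as:

```latex
P(x, h^1, \ldots, h^\ell) =
  \left( \prod_{k=0}^{\ell-2} P\!\left(h^k \mid h^{k+1}\right) \right)
  P\!\left(h^{\ell-1}, h^\ell\right)
```

Each factor P(h^k | h^{k+1}) is the conditional distribution of one layer given the layer above it, and the final factor is the joint distribution of the top two layers, which form an RBM.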

The relationship between the input layer and the hidden layers can be observed in the figure below. At a high level, the first layer is trained as an RBM that models the raw input x. An input is a sparse binary vector representing the data to be classified, e.g., a binary image of a digit. Subsequent layers are trained using the transformed data (samples or mean activations) from the previous layers as training examples. The number of layers can be determined empirically to obtain the best model performance, and DBNs support arbitrarily many layers.

Figure 1: DBN layers

The following code snippet shows the training that goes into an RBM. The input data supplied to the RBM is trained for a predefined number of epochs; within each epoch, the input data is divided into small batches and the weights, activations, and deltas are computed for each layer:
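A minimal NumPy sketch of such a loop, using single-step (CD-1) contrastive divergence, is shown below. The hyperparameters and names are illustrative, bias terms and momentum are omitted for brevity, and this is a sketch of the technique rather than the original implementation:

```python
import numpy as np

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

def train_rbm(data, n_hidden, epochs=5, batch_size=10, lr=0.1, seed=0):
    """Train one RBM layer with CD-1: for each mini-batch, compute hidden
    activations (positive phase), reconstruct the visible layer, recompute
    hidden activations (negative phase), and update the weights with the
    difference of the two correlations."""
    rng = np.random.default_rng(seed)
    n_visible = data.shape[1]
    W = 0.01 * rng.standard_normal((n_visible, n_hidden))
    for _ in range(epochs):                        # predefined epochs
        for i in range(0, len(data), batch_size):  # small batches
            v0 = data[i:i + batch_size]
            h0 = sigmoid(v0 @ W)                   # positive-phase activations
            h0_sample = (rng.random(h0.shape) < h0).astype(float)
            v1 = sigmoid(h0_sample @ W.T)          # reconstruction
            h1 = sigmoid(v1 @ W)                   # negative-phase activations
            # delta = positive correlation minus negative correlation
            W += lr * (v0.T @ h0 - v1.T @ h1) / len(v0)
    return W
```

A production implementation would also track biases and follow the practical recommendations in Hinton's guide [5].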










After all layers are trained, the parameters of the deep network are fine-tuned using a supervised training criterion. This criterion can, for instance, be framed as a classification problem, which then allows using the deep network to solve classification tasks. More complex supervised criteria can be employed, which can provide interesting results such as scene interpretation, for instance explaining what objects are present in a picture.


Deep learning has received considerable attention, not only because it can deliver results superior to some other learning algorithms, but also because it can be run in a distributed setting, allowing the processing of large-scale datasets. Deep networks can be parallelized at two major levels: the layer level and the data level [6]. For layer-level parallelization, many implementations use GPU arrays to compute layer activations in parallel and frequently synchronize them. However, this approach is not suitable for clusters where data resides across multiple machines connected by a network, because of the high network costs. Data-level parallelization, in which the training is parallelized over subsets of the data, is more suitable for these settings. Most of the data at PayPal is stored in Hadoop clusters; hence, being able to run the algorithms in those clusters is our priority. Dedicated cluster maintenance and support is also an important factor for us to consider. However, since deep learning is inherently iterative in nature, a paradigm like MapReduce is not well suited for running these algorithms. With the advent of Hadoop 2.0 and YARN-based resource management, though, we can write iterative applications, as we can finely control the resources the application uses. We adapted IterativeReduce [7], a simple abstraction for writing iterative algorithms in Hadoop YARN, and were able to deploy it in one of the PayPal clusters running Hadoop 2.4.1.


We implemented the core deep learning algorithm by Hinton, referenced in [2]. Since we require the algorithm to run on a multi-machine cluster, we adapted it to such a setting, following the guidelines proposed by Grazia, et al. [6]. A high-level summary of our implementation is given below:

  1. The master node initializes the weights of the RBM.
  2. The master node pushes the weights and the splits to the worker nodes.
  3. Each worker trains an RBM layer for one dataset epoch, i.e. one complete pass through its entire split, and sends the updated weights back to the master node.
  4. The master node averages the weights from all workers for the given epoch.
  5. Steps 3 and 4 are repeated for a predefined number of epochs (50 in our case).
  6. Once step 5 completes, one layer is trained; the steps are then repeated for subsequent RBM layers.
  7. After all layers are trained, the deep network is fine-tuned using error back-propagation.
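The broadcast/train/average cycle above can be sketched in a few lines of Python. The worker-side trainer here is a stub (in the real system it performs one dataset epoch of RBM training on the worker's split), and all names are illustrative:

```python
import numpy as np

def worker_train(weights, split, lr=0.01):
    """Step 3 stub: one dataset epoch of training on this worker's split.
    The real worker runs contrastive divergence; this stand-in just applies
    a data-dependent update so the control flow is runnable."""
    grad = split.T @ np.ones((split.shape[0], weights.shape[1])) / len(split)
    return weights + lr * grad

def distributed_epoch(weights, splits):
    """Steps 3-4: every worker trains on its split, master averages."""
    worker_weights = [worker_train(weights, s) for s in splits]
    return sum(worker_weights) / len(worker_weights)

def train_layer(n_visible, n_hidden, splits, epochs=50):
    weights = np.zeros((n_visible, n_hidden))  # step 1: master initializes
    for _ in range(epochs):                    # step 5: repeat per epoch
        weights = distributed_epoch(weights, splits)  # steps 2-4
    return weights
```

Parameter averaging is what keeps network traffic down: only the weight matrices cross the wire once per epoch, never the training data itself.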

The figure below describes a single dataset epoch (steps 3 and 4) while running the deep learning algorithm. We note that this paradigm can be leveraged to implement a host of machine learning algorithms that are iterative in nature.

Figure 2: Single dataset epoch for training


The following code snippet shows the steps involved in training a DBN on a single machine. The dataset is first divided into multiple batches. Then multiple RBM layers are initialized and trained sequentially. After the RBMs are trained, they are passed through a fine-tuning phase which uses error back-propagation.
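That driver flow can be sketched as follows. The per-layer trainer is a stub standing in for the RBM training loop, and the names are illustrative rather than the actual implementation:

```python
import numpy as np

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

def train_rbm_layer(data, n_hidden, seed=0):
    """Stub: the real trainer runs contrastive divergence over batches
    and returns the learned weight matrix for this layer."""
    rng = np.random.default_rng(seed)
    return 0.01 * rng.standard_normal((data.shape[1], n_hidden))

def train_dbn(data, layer_sizes):
    """Greedy layer-wise pre-training: each RBM is trained on the
    activations produced by the layer below it."""
    weights, layer_input = [], data
    for n_hidden in layer_sizes:
        W = train_rbm_layer(layer_input, n_hidden)
        weights.append(W)
        layer_input = sigmoid(layer_input @ W)  # feed mean activations upward
    return weights

def fine_tune(weights, data, labels):
    """Placeholder for the supervised back-propagation pass."""
    return weights
```

The key structural point is that each layer consumes the previous layer's activations, which is exactly what makes layer-by-layer training "greedy".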












We adapted the IterativeReduce [7] implementation for much of the YARN “plumbing”. We did a major overhaul of the implementation to make it usable for our deep learning implementation. The IterativeReduce implementation was written for the Cloudera Hadoop distribution, which we re-platformed to the standard Apache Hadoop distribution. We also rewrote the implementation to use the standard programming models described in [8]. In particular, we used the YarnClient API for communication between the client application and the ResourceManager, the AMRMClient API for interaction between the ApplicationMaster and the ResourceManager, and the NMClient API for interaction between the ApplicationMaster and the NodeManagers.

We first submit an application to the YARN resource manager using the YarnClient API:






After the application is submitted, YARN resource manager launches the application master. The application master is responsible for allocating and releasing the worker containers as necessary. The application master uses AMRMClient to communicate with the resource manager.






The application master uses the NMClient API to run commands in the containers it received from the resource manager.





Once the application master launches the worker containers it requires, it sets up a port to communicate with the workers. For our deep learning implementation, we added the methods required for parameter initialization, layer-by-layer training, and fine-tune signaling to the original IterativeReduce interface. IterativeReduce uses Apache Avro IPC for Master-Worker communication.

The following code snippets show the series of steps involved in master-worker distributed training. The master sends the initial parameters to the workers, and each worker then trains its RBM on its portion of the data. After a worker is done training, it sends its results back to the master, which combines them. After the iterations are completed, the master finishes the process by starting the back-propagation fine-tuning phase.










We evaluated the performance of the deep learning implementation using the MNIST handwritten-digit recognition benchmark [3]. The dataset contains manually labeled handwritten digits ranging from 0 to 9. The training set consists of 60,000 images and the test set consists of 10,000 images.

In order to measure the performance, the DBN was first pre-trained and then fine-tuned on the 60,000 training images, and then evaluated on the 10,000 test images. No pre-processing was done on the images during training or evaluation. The error rate was obtained as the ratio between the number of misclassified images and the total number of images in the test set.
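As a quick sanity check on that definition, the 1.66% figure reported below corresponds to misclassifying 166 of the 10,000 test images:

```python
misclassified = 166          # images the DBN labeled incorrectly
total_test_images = 10_000   # size of the MNIST test set
error_rate = misclassified / total_test_images
print(f"{error_rate:.2%}")   # -> 1.66%
```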

We were able to achieve a best classification error rate of 1.66% using RBMs with 500-500-2000 hidden units and a 10-node distributed cluster. This is comparable with the error rate of 1.2% reported by the authors of the original algorithm (with 500-500-2000 hidden units) [2], and with some of the results with similar settings reported in [3]. We note that the original implementation ran on a single machine, while ours runs in a distributed setting. The parameter-averaging step contributes a slight reduction in accuracy, although the benefit of distributing the algorithm over multiple machines far outweighs it. The table below summarizes how the error rate varies with the number of hidden units in each layer while running on a 10-node cluster.

Table 1: MNIST performance evaluation

Further thoughts

We had a successful deployment of a distributed deep learning system, which we believe will prove useful in solving some of our machine learning problems. Furthermore, the IterativeReduce abstraction can be leveraged to distribute any other suitable machine learning algorithm, and being able to utilize the general-purpose Hadoop cluster will prove highly beneficial for running scalable machine learning algorithms on large datasets. We note that there are several improvements we would like to make to the current framework, chiefly around reducing network latency and adding more advanced resource management. Additionally, we would like to optimize the DBN framework to minimize inter-node communication. With fine-grained control of cluster resources, the Hadoop YARN framework gives us the flexibility to do so.


[1] G. E. Hinton, S. Osindero, and Y. Teh. A fast learning algorithm for deep belief nets. Neural Computations, 18(7):1527–1554, 2006.

[2] G. E. Hinton and R. R. Salakhutdinov. Reducing the Dimensionality of Data with Neural Networks. Science, 313(5786):504–507, 2006.

[3] Y. LeCun, C. Cortes, C. J.C. Burges. The MNIST database of handwritten digits.

[4] Deep Learning Tutorial. LISA lab, University of Montreal

[5] G. E. Hinton. A Practical Guide to Training Restricted Boltzmann Machines. Lecture Notes in Computer Science Volume 7700: 599-619, 2012.

[6] M. Grazia, I. Stoianov, M. Zorzi. Parallelization of Deep Networks. ESANN, 2012

[7] IterativeReduce, https://github.com/jpatanooga/KnittingBoar/wiki/IterativeReduce

[8] Apache Hadoop YARN – Enabling Next Generation Data Applications, http://www.slideshare.net/hortonworks/apache-hadoop-yarn-enabling-nex

Building Data Science at Scale


As part of the Boston-based Engineering group, the Data Science team’s charter is to enable science-based personalization and recommendation for PayPal’s global users. As companies of all sizes are starting to leverage their data assets, data science has become indispensable in creating relevant user experience. Helping fulfill PayPal’s mission to build the Web’s most convenient payment solution, the team works with various internal partners and strives to deliver best-in-class data science.

Technology Overview

At the backend of the data science platform reside large-scale machine learning engines that continuously learn and predict from transactional, behavioral, and other datasets. An example of a question we might try to answer is: if someone has just purchased a piece of software, does that increase their likelihood of purchasing electronics in the near future? It is no wonder that the answer lies in the huge amounts of transaction data: what people bought in the past is predictive of what they might consider buying next.

We leverage state-of-the-art machine learning technologies to make such predictions. Machine learning itself has been quickly evolving in recent years. New advances including large-scale matrix factorization, probabilistic behavioral models, and deep learning are no strangers to the team. To make things work at large scale, we leverage Apache Hadoop and its rich ecosystem of tools to process large amounts of data and build data pipelines that are part of the data science platform.

Tackling Data Science

Companies take different approaches in tackling data science. While some companies define a data scientist as someone who performs statistical modeling, we at PayPal Engineering have chosen to take a combined science & engineering approach. Our data scientists are “analytically-minded, statistically and mathematically sophisticated data engineers” [1]. There are of course more science-inclined, and more engineering-inclined individuals on the team, but there is much more of a blend of expertise than a marked distinction between these individuals. This approach to data science allows us to quickly iterate and operationalize high-performing predictive models at scale.

The Venn diagram below, which bears similarity to Conway’s diagram [2], displays the three cornerstones pivotal to the success of the team. Science, which entails machine learning, statistics, and analytics, is the methodology by which we generate actionable predictions and insights from very large datasets. The engineering component, which includes Apache Hadoop and Spark, makes it possible for us to do science and analytics at scale and deliver results with quick turnaround. Last but not least, I cannot overemphasize the importance, for a data scientist, of understanding the business. None of the best work in this area that I know of is done in isolation. It is through understanding the problem domain that a data scientist may come up with a better predictive model, among other results.


Scaling Data Science

There are multiple dimensions to scaling data science, which at a minimum involves the team, technology infrastructure, and the operating process. While each of these is worth thoughtful discussion in its own right, I will focus on the operating process since it is critical to how value is delivered. A typical process for the team could break down into these steps:

  • Identify business use case;
  • Understand business logic and KPIs;
  • Identify and capture datasets;
  • Proof of concept and back-test;
  • Operationalize predictive models;
  • Measure lift, optimize and iterate.

Let me explain some of the key elements. First, we see scaling data science as an ongoing collaboration with product and business teams. Understanding product and business KPIs, and using data science to optimize them, is an essential ingredient of our day-to-day. Second, we follow best practice in data science, whereby each predictive model is fully back-tested before operationalization. This practice guarantees the effectiveness of the data science platform. Third, the most powerful predictive modeling often requires iterative measurement and optimization. As a concrete example, putting this process into practice with the PayPal Media Network, we were able to achieve excellent results based on:

  • Lookalike modeling: Merchants can reach consumers who look like the merchant’s best existing customers.
  • Purchase intent modeling: Merchants can engage consumers who have a propensity to spend within specific categories.
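To make the first of these concrete, here is a toy sketch of lookalike scoring, entirely illustrative rather than PayPal's production model: represent each consumer as a feature vector, average the merchant's best existing customers into a centroid, and rank other consumers by cosine similarity to it.

```python
import numpy as np

def cosine(a, b):
    """Cosine similarity between two feature vectors."""
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))

def lookalike_scores(seed_customers, candidates):
    """Score each candidate by similarity to the seed-set centroid."""
    centroid = np.mean(seed_customers, axis=0)
    return [cosine(c, centroid) for c in candidates]

# Hypothetical 3-feature behavioral vectors:
seeds = np.array([[1.0, 0.9, 0.0],      # the merchant's best customers
                  [0.9, 1.0, 0.1]])
candidates = np.array([[1.0, 1.0, 0.0],  # behaves like the seed set
                       [0.0, 0.1, 1.0]]) # does not
scores = lookalike_scores(seeds, candidates)
```

A production lookalike model would of course use far richer features and a learned classifier rather than a single centroid, but the ranking idea is the same.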

While it is challenging to crunch the data and create tangible value, it is interesting and rewarding work. I hope to discuss more details of all the fun things we do as data scientists in the future.


[1] http://www.forbes.com/sites/danwoods/2011/10/11/emc-greenplums-steven-hillion-on-what-is-a-data-scientist

[2] http://drewconway.com/zia/2013/3/26/the-data-science-venn-diagram