When your organization runs multiple jobs on a Spark cluster, resource utilization becomes a priority. Ideally, computations receive sufficient resources to complete in an acceptable time and release resources for other work.
To make sure applications do not waste resources, we want to profile their threads and spot problematic code. Common profiling methods, however, are difficult to apply to a distributed application running on a cluster.
This post suggests an approach to profiling Spark applications. The form of thread profiling used is sampling: capturing stack traces and aggregating them into meaningful data, in this case displayed as Flame Graphs. Flame Graphs show what percentage of CPU time is spent running which methods, in a clear and user-friendly way, as interactive SVG files (see http://www.brendangregg.com/flamegraphs.html and http://techblog.netflix.com/2015/07/java-in-flames.html). This solution is derived from the Hadoop jobs thread profiling solution described in a post by Ihor Bobak, which relies on Etsy's statsd-jvm-profiler.
Since Spark applications run on a cluster, and computation is split across different executors (on different machines), each running its own process, profiling such an application is trickier than profiling a simple application running on a single JVM. We need to capture stack traces in each executor's process and collect them in a single place in order to compute Flame Graphs for our application.
To achieve this, we specify the statsd-jvm-profiler Java agent in the executor processes to capture stack traces. We configure the Java agent to report to InfluxDB (a time-series database that centrally stores all stack traces with timestamps). Next, we run a script which dumps the stack traces from InfluxDB to text files, which we then feed to the Flame Graph SVG rendering script.
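To make the sampling idea concrete, here is a minimal sketch of the aggregation step. It assumes each sample is a list of frame names ordered from root to leaf, and it emits the "folded" text format that flamegraph.pl reads (semicolon-separated frames followed by a sample count). The function name and the sample data are hypothetical and are not part of statsd-jvm-profiler.

```python
from collections import Counter


def fold_stacks(samples):
    """Aggregate sampled stack traces into folded-stack lines.

    Each sample is a list of frame names ordered from root to leaf.
    Identical stacks are collapsed into one line with a sample count.
    """
    counts = Counter(";".join(frames) for frames in samples)
    return [f"{stack} {count}" for stack, count in sorted(counts.items())]


# Hypothetical samples, as a sampling profiler might capture them.
samples = [
    ["main", "run", "inefficientMethod"],
    ["main", "run", "inefficientMethod"],
    ["main", "run", "parse"],
]

folded = fold_stacks(samples)
for line in folded:
    print(line)
```

The more often a stack shows up among the samples, the wider its frames are drawn in the resulting Flame Graph.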
You can generate flame graphs for specific executors, or an aggregation of all executors.
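As a rough illustration of what aggregating over executors means, the sketch below sums sample counts for identical stacks coming from several executors. It assumes the per-executor data is already in the folded format that flamegraph.pl reads; the executor names, the data, and the `merge_folded` helper are hypothetical and do not describe how influxdb_dump.py is implemented.

```python
from collections import Counter


def merge_folded(per_executor):
    """Sum sample counts for identical stacks across all executors."""
    merged = Counter()
    for lines in per_executor.values():
        for line in lines:
            # The sample count is the last space-separated field.
            stack, _, count = line.rpartition(" ")
            merged[stack] += int(count)
    return [f"{stack} {count}" for stack, count in sorted(merged.items())]


# Hypothetical folded stacks, keyed by executor.
per_executor = {
    "executor-1": ["main;run;inefficientMethod 3", "main;run;parse 1"],
    "executor-2": ["main;run;inefficientMethod 2"],
}

merged = merge_folded(per_executor)
for line in merged:
    print(line)
```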
As with other profiling methods, your application will incur a slight performance overhead from the profiling itself, so beware of constantly running your applications in production with the Java agent.
Below is an example of the result of the solution described. What you see is a Flame Graph, aggregated over stack traces taken from all executors running a simple Spark application, which contains a problematic method named inefficientMethod in its code. By studying the Flame Graph we see that the application spends 41.29% of its sampled thread time running this method. The second column in the graph, in which we find inefficientMethod, is the application's user code. The other columns are Spark native threads, which appear in most of the stack traces, so the focus of our profiling is on the column containing the user code. By focusing on that second column, we see that inefficientMethod takes up most of the user code's CPU run time, and the other parts of the user code are just a tiny sliver reaching the top of the Flame Graph.
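A percentage like the one above can be approximated directly from the stack trace text files. The following is a minimal sketch, assuming the stacks are in the folded format that flamegraph.pl reads (semicolon-separated frames followed by a sample count). The frame names and counts are made up for illustration and are not taken from the application profiled above.

```python
def method_share(folded_lines, method):
    """Return the percentage of samples whose stack contains `method`."""
    total = 0
    matching = 0
    for line in folded_lines:
        # The sample count is the last space-separated field.
        stack, _, count = line.rpartition(" ")
        n = int(count)
        total += n
        if method in stack.split(";"):
            matching += n
    return 100.0 * matching / total if total else 0.0


# Hypothetical folded stacks.
folded_lines = [
    "Executor.run;Task.run;MyApp.inefficientMethod 5",
    "Executor.run;Task.run;MyApp.parse 1",
    "Reporter.run;Socket.read 4",
]

share = method_share(folded_lines, "MyApp.inefficientMethod")
print(f"{share:.2f}%")
```

Each line is counted at most once, so a method that appears several times in one stack (for example through recursion) does not inflate the result.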
1. Download and install InfluxDB 1.0.0
1.0.0 is currently in beta; however, 0.13 has concurrency bugs which cause it to fail (see: https://github.com/influxdata/influxdb/issues/6235)
sudo service influxdb start
Access the DB using either the web UI (http://localhost:8083/) or shell:
Create a database to store the stack traces in:
CREATE DATABASE profiler
Create a user:
CREATE USER profiler WITH PASSWORD 'profiler' WITH ALL PRIVILEGES
2. Build/download statsd-jvm-profiler jar
Deploy the jar to the machines which are running the executor processes. One way to do this is to use spark-submit's --jars option, which will deploy it to the executors.
Specify the Java agent in your executor processes. This can be done, for example, by using spark-submit's --conf option.
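The Java agent option is a long comma-separated string that is easy to mistype, so it can help to assemble it programmatically. Below is a small sketch that builds the value passed to --conf from a dictionary. The `build_agent_option` helper is hypothetical; the parameter names and values are the ones used in the example submission later in this post, so adjust them to your own setup.

```python
def build_agent_option(jar_name, params):
    """Build a -javaagent option of the form -javaagent:<jar>=k1=v1,k2=v2,..."""
    args = ",".join(f"{key}={value}" for key, value in params.items())
    return f"-javaagent:{jar_name}={args}"


# Values taken from the example in this post; replace them with your own.
params = {
    "server": "influxdbhost.mycorporation.com",
    "port": 8086,
    "reporter": "InfluxDBReporter",
    "database": "profiler",
    "username": "profiler",
    "password": "profiler",
    "prefix": "MyNamespace.MySparkApplication",
    "tagMapping": "namespace.application",
}

option = build_agent_option(
    "statsd-jvm-profiler-2.1.0-jar-with-dependencies.jar", params
)
conf = f"spark.executor.extraJavaOptions={option}"
print(conf)
```

The printed string is what goes after --conf, wrapped in straight double quotes on the command line.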
3. Download influxdb_dump.py
Install all required python modules that influxdb_dump.py imports.
Run influxdb_dump.py to create text files with stack traces as input for flame graphs:
python influxdb_dump.py -o "<INFLUX_DB_HOST>" -u <INFLUX_DB_USERNAME> -p <INFLUX_DB_PASSWORD> -d <INFLUX_DB_DATABASE_NAME> -t <TAGS> -e <VALUES> -x "<OUTPUT_DIR>"
4. Download flamegraph.pl
Generate flame graphs using the text files you dumped from DB:
flamegraph.pl <INPUT_FILES> > <OUTPUT_FILES>
The following is an example of running and profiling a Spark application.
Submit the Spark application:
spark-submit \
--deploy-mode cluster \
--master yarn \
--class com.mycorporation.MySparkApplication \
--conf "spark.executor.extraJavaOptions=-javaagent:statsd-jvm-profiler-2.1.0-jar-with-dependencies.jar=server=influxdbhost.mycorporation.com,port=8086,reporter=InfluxDBReporter,database=profiler,username=profiler,password=profiler,prefix=MyNamespace.MySparkApplication,tagMapping=namespace.application" \
--name MySparkApplication \
--jars /path/to/profiling/statsd-jvm-profiler-2.1.0-jar-with-dependencies.jar \
<PATH_TO_APPLICATION_JAR>
Dump stack traces from DB:
influxdb_dump.py -o "influxdbhost.mycorporation.com" -u profiler -p profiler -d profiler -t namespace.application -e MyNamespace.MySparkApplication -x "my_stack_traces"
Generate the Flame Graph (in this case for all executors):
flamegraph.pl my_stack_traces/all_*.txt > my_flame_graph.svg