Phil's BigData Recipes


4 Big Data Riddles: The Straggler, the Slacker, the Fatso, and the Heckler

This article discusses four bottlenecks in BigData applications and introduces a number of tools, some of which are new, for identifying and removing them. These bottlenecks could occur in any framework but a particular emphasis will be given to Apache Spark and PySpark.


The applications/riddles discussed below have something in common: They require around 10 minutes of wall clock time when a “local version” of them is run on a commodity notebook, and using more (or more powerful) processors or machines would not significantly reduce their run time. But there are also important differences: Each riddle contains a different kind of bottleneck that is responsible for the slowness, and each of these bottlenecks will be identified with a different approach. Some of these analytical tools are innovative or not widely used; references to source code are included in the second part of this article. The first section discusses the riddles in a “black box” manner:


The Fatso

The fatso occurs frequently in the BigData world. A symptom of running a local version of it is noise – the fans of my notebook are very active for almost 10 minutes, the entire application lifetime. Since we can rarely listen to machines in a cluster computing environment, we need a different approach to identify a fatso:

JVM Profile

The following code snippet (full version here) combines two information sources, the output of a JVM profiler and normal Spark logs, into a single visualization:

from typing import List

from plotly.graph_objs import Scatter, Figure
from plotly.offline import plot

# Assumed import path: ProfileParser and SparkLogParser live in this repository's analytics package.
from parsers import ProfileParser, SparkLogParser

profile_file = './data/ProfileFatso/CpuAndMemoryFatso.json.gz'  # Output from JVM profiler
profile_parser = ProfileParser(profile_file, normalize=True)
data_points: List[Scatter] = profile_parser.make_graph()

logfile = './data/ProfileFatso/JobFatso.log.gz'  # standard Spark logs
log_parser = SparkLogParser(logfile)
stage_interval_markers: Scatter = log_parser.extract_stage_markers()
data_points.append(stage_interval_markers)

layout = log_parser.extract_job_markers(700)
fig = Figure(data=data_points, layout=layout)
plot(fig, filename='fatso.html')

The interactive graph produced by running this script can be analyzed in its full glory here; a smaller snapshot is displayed below:

Fatso profile

Spark’s execution model consists of units at different “granularity levels”, and some of these are displayed above: Boundaries of Spark jobs are represented as vertical dashed lines, and the start and end points of Spark stages are displayed as transparent blue dots on the x-axis, which also show the full stage names/IDs. This scheduling information does not add much insight here since Fatso consists of only one Spark job which in turn consists of just a single Spark stage (comprised of three tasks) but, as shown below, knowing such time points can be very helpful when analyzing more complex applications.

For all graphs in this article, the x-axis shows the application run time as UNIX epoch time (milliseconds passed since 1 January 1970). The y-axis represents different normalized units for different metrics: For graph lines representing memory metrics such as total heap memory used (“heapMemoryTotalUsed”, ocher green line above), it represents gigabytes; for time measurements like MarkSweep GC collection time (“MarkSweepCollTime”, orange line above), data points on the y-axis represent milliseconds. More details can be found in this data structure, which can be changed or extended with new metrics from different profilers.
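As an illustration of what such a mapping could contain, a metric-to-unit dictionary might look like the sketch below; the metric names are taken from the graph above, but the exact structure and scaling factors are assumptions, not the actual contents of the repository’s data structure.

# Hypothetical sketch of a metric-to-unit/normalization mapping.
METRIC_UNITS = {
    'heapMemoryTotalUsed': {'unit': 'GB', 'scale': 1.0 / 1024 ** 3},  # bytes -> gigabytes
    'heapMemoryCommitted': {'unit': 'GB', 'scale': 1.0 / 1024 ** 3},
    'MarkSweepCollTime':   {'unit': 'ms', 'scale': 1.0},              # already milliseconds
    'MarkSweepCollCount':  {'unit': 'count', 'scale': 1.0},
    'ScavengeCollCount':   {'unit': 'count', 'scale': 1.0},
}

def normalize(metric: str, value: float) -> float:
    # Scale a raw profiler value into the unit plotted on the y-axis.
    return value * METRIC_UNITS[metric]['scale']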


One available metric, ScavengeCollCount, is absent from the snapshot above but present in the original. It counts minor garbage collection events and increases almost linearly up to 20000 during Fatso’s execution. In other words, the application ran for almost 10 minutes – from epoch 1550420474091 (= 17/02/2019 16:21:14) until epoch 1550421148780 (= 17/02/2019 16:32:28) – and more than 20000 minor garbage collection events and almost 70 major GC events (“MarkSweepCollCount”, green line) occurred.
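These epoch values are given in milliseconds, so they can be double-checked with a few lines of Python:

from datetime import datetime, timezone

start_ms, end_ms = 1550420474091, 1550421148780
print(datetime.fromtimestamp(start_ms / 1000, tz=timezone.utc))  # 2019-02-17 16:21:14.091000+00:00
print(datetime.fromtimestamp(end_ms / 1000, tz=timezone.utc))    # 2019-02-17 16:32:28.780000+00:00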


When the application was launched, no configuration parameters were manually set so the default Spark settings applied. This means that the maximum memory available to the program was 1GB. Having a closer look at the two heap memory metrics heapMemoryCommitted and heapMemoryTotalUsed reveals that both lines approach this 1GB ceiling near the end of the application.
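For comparison, that 1GB ceiling could be lifted by configuring the driver memory explicitly; a minimal sketch (not the original launch command) in PySpark terms:

# spark.driver.memory must be known before the driver JVM starts, so it is best set
# on spark-submit (--driver-memory 4g) or in spark-defaults.conf; the builder config
# below only takes effect when getOrCreate() launches a fresh JVM.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName('Fatso')
         .config('spark.driver.memory', '4g')     # lift the default 1 GB ceiling
         .config('spark.executor.memory', '4g')   # relevant when running on a cluster
         .getOrCreate())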


The intermediate conclusion that can be drawn from the discussion so far is that the application is very memory hungry and a lot of GC activity is going on, but the exact reason for this is still unclear. A second tool can help now:


JVM FlameGraph

The profiler also collected stacktraces which can be folded and transformed into flame graphs with the help of my fold_stacks.py script and this external script:

Phils-MacBook-Pro:analytics a$ python3 fold_stacks.py ./analytics/data/ProfileFatso/StacktraceFatso.json.gz  > Fatso.folded
Phils-MacBook-Pro:analytics a$ perl flamegraph.pl Fatso.folded > FatsoFlame.svg
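For reference, the intermediate .folded file uses the collapsed stack format that flamegraph.pl expects: one semicolon-separated call stack per line, followed by the number of samples in which that stack was observed. The two lines below are shortened illustrations, not verbatim output:

java.lang.Thread.run;...;profile.sparkjobs.JobFatso$.fatFunctionOuter;profile.sparkjobs.Helper$.fatFunctionInner 11343
java.lang.Thread.run;...;org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run 26898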

Opening FatsoFlame.svg in a browser shows the following; the full version, which is also searchable (top right corner), can be found here:

Fatso Flame Graph

A rule of thumb for the interpretation of flame graphs is: The more spiky the shape, the better. We see many plateaus above with native Spark/Java functions like sun.misc.Unsafe.park sitting on top (first plateau), or low-level functions from packages like io.netty (a third-party library that Spark depends on for network communication/IO) occurring near the top. The only functions in the picture that are defined by me are located in the center plateau; searching for the package name profile.sparkjobs in the top right corner will prove this claim. On top of these user-defined functions are native Java Array and String functions; a closer look at the definitions of fatFunctionOuter and fatFunctionInner would reveal that they create many String objects in an inefficient way, so we have identified the two Fatso methods that need to be optimized.
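The JobFatso source itself is not reproduced in this article, but the pattern that such a profile typically points at is the repeated construction of intermediate strings inside a hot loop. The following purely illustrative Python sketch (not the actual fatFunctionOuter/fatFunctionInner code) contrasts such a pattern with a leaner alternative:

# Hypothetical illustration of a string-heavy loop that produces a "fat" profile.
def fat_inner(n: int) -> str:
    result = ''
    for i in range(n):
        # Each += may allocate a new string and copy the old contents,
        # creating many short-lived objects and a lot of GC pressure.
        result += str(i) + ';'
    return result

def lean_inner(n: int) -> str:
    # A single join builds the final string in one pass.
    return ';'.join(str(i) for i in range(n)) + ';'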


Python/PySpark Profiles

What about Spark applications written in Python? I created several PySpark profilers that try to provide some of the functionality of Uber’s JVM profiler. Because of the architecture of PySpark, it might be beneficial to generate both Python and JVM profiles in order to get a good grasp of the overall resource usage. This can be accomplished for the Python edition of Fatso by using the following launch command (abbreviated, full command here):

~/spark-2.4.0-bin-hadoop2.7/bin/spark-submit  \
--conf spark.python.profile=true  \
--conf spark.driver.extraJavaOptions=-javaagent:/.../=sampleInterval=1000,metricInterval=100,reporter=...outputDir=... \
 ./spark_jobs/job_fatso.py  cpumemstack  /users/phil/phil_stopwatch/analytics/data/profile_fatso  >  Fatso_PySpark.log 

The --conf parameter in the third line is responsible for attaching the JVM profiler. The --conf parameter in the second line as well as the two script arguments in the last line are Python-specific and required for PySpark profiling: The cpumemstack argument will choose a PySpark profiler that captures both CPU/memory usage and stack traces. Providing a second script argument in the form of a directory path ensures that the profile records are written into separate output files instead of all being printed to the standard output.
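Under the hood, PySpark exposes a hook for custom profilers: a profiler class can be passed to the SparkContext and its results dumped to a directory. A minimal sketch of that mechanism, using the built-in BasicProfiler as a stand-in for the profiler selected by the cpumemstack argument above:

# Sketch of PySpark's profiler hook; BasicProfiler stands in for a custom profiler class.
from pyspark import SparkConf, SparkContext
from pyspark.profiler import BasicProfiler

conf = (SparkConf()
        .setMaster('local[3]')
        .setAppName('profiler-demo')
        .set('spark.python.profile', 'true'))
sc = SparkContext(conf=conf, profiler_cls=BasicProfiler)

sc.parallelize(range(100)).map(lambda x: x * x).count()

sc.dump_profiles('/tmp/profile_output')  # one profile file per RDD instead of printing to stdout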


Similar to its Scala cousin, the PySpark edition of Fatso completes in around 10 minutes on my MacBook and creates several JSON files in the specified output directory. The JVM profile could be visualized independently of the Python profile, but it might be more insightful to create a single combined graph from them. This can be accomplished easily and is shown in the second half of this script. The full combined graph is located here.

Fatso PySpark profile

The clever reader will already have a hunch about the high memory consumption and who is responsible for it: The garbage collection activity of the JVM, again represented by MarkSweepCollCount and ScavengeCollCount, is much lower here compared to the “pure” Spark run described in the previous paragraphs (more than 20000 minor GC events above versus fewer than 20 GC events now). The two inefficient fatso functions are now implemented in Python and therefore not managed by the JVM, leading to far lower JVM memory usage and far fewer GC events. A PySpark flame graph should confirm our hunch:

Phils-MacBook-Pro:analytics a$ python3 fold_stacks.py ./analytics/data/profile_fatso/s_8_stack.json  > FatsoPyspark.folded
Phils-MacBook-Pro:analytics a$ perl flamegraph.pl  FatsoPyspark.folded  > FatsoPySparkFlame.svg

Opening FatsoPySparkFlame.svg in a browser displays the following:

PySpark Fatso Flame Graph

And indeed, the two fatso methods sit on top of the stack for almost 90% of all measurements, burning most CPU cycles. It would be easy to create a combined JVM/Python flame graph by concatenating the respective stacktrace files. This would be of limited use here though, since the JVM flame graph would likely consist entirely of native Java/Spark functions over which a Python coder has no control. One scenario I can think of where merging JVM and PySpark stack traces might be especially useful is when Java code or libraries are registered and called from PySpark/Python code, which is getting easier and easier in newer versions of Spark. In the discussion of Slacker later on, I will present a combined stack trace of Python and Scala code.



The Straggler

The Straggler is deceiving: It appears as if all resources are fully utilized most of the time, and only closer analysis can reveal that this might be the case for only a small subset of the system or for a limited period of time. The following graph, created from this script, combines two CPU metrics with information about task and stage boundaries extracted from the standard logging output of a typical straggler run; the full-size graph can be investigated here.

Straggler profile

The associated application consisted of one Spark job, represented by the vertical dashed lines at the left and right. This single job was comprised of a single stage, shown as transparent blue dots on the x-axis that coincide with the job’s start and end points. But there were three tasks within that stage, so we can see three horizontal task lines. The naming scheme of this execution hierarchy is not arbitrary:

  • The stage name in the graph is 0.0@0 because it refers to a stage with the id 0.0 that belonged to the job with id 0. The first part of a stage or task name is a floating point number; this reflects the apparent naming convention in Spark logs that new attempts of failed tasks or stages are baptized with an incremented fraction part.
  • The task names are 0.0@0.0@0, 1.0@0.0@0, and 2.0@0.0@0 because three tasks were launched that were all members of stage 0.0@0, which in turn belonged to job 0 (the sketch below shows how such names can be decomposed).
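For illustration, decomposing such an identifier is straightforward; the helper below is hypothetical and not part of the parsing code in this repository:

# Hypothetical helper: split a task identifier like '2.0@0.0@0' into its parts.
def parse_task_name(name: str) -> dict:
    task_attempt, stage_attempt, job_id = name.split('@')
    return {'task': task_attempt, 'stage': stage_attempt, 'job': int(job_id)}

print(parse_task_name('2.0@0.0@0'))  # {'task': '2.0', 'stage': '0.0', 'job': 0}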

The three tasks have the same start time which almost coincides with the application’s invocation but very different end times: Tasks 1.0@0.0@0 and 2.0@0.0@0 finish within the first fifth of the application’s lifetime whereas task 0.0@0.0@0 stays alive for almost the entire application since its start and end points are located at the left and right borders of this graph. The orange and light blue lines visualize two CPU metrics (system cpu load and process cpu load) whose fluctuations correspond with the task activity: We can observe that the CPU load drops right after tasks 1.0@0.0@0 and 2.0@0.0@0 end. It stays at around 20% for 4/5 of the time, when only straggler task 0.0@0.0@0 is running.

Concurrency Profiles

When an application consists of more than just one stage with three tasks like Straggler, it might be more illuminating to calculate and represent the total number of tasks that were running at any point during the application’s lifetime. The “concurrency profile” of a BigData workload might look more like