Profiling Hadoop Jobs With Riemann

Factual processes nontrivial amounts of data. Our analyses may range over 10^11 records, reading hundreds of gigabytes to hundreds of terabytes of source data and intermediate representations.

At this scale, performance optimizations can save us significant time and money. We use VisualVM, jhat, and YourKit for memory and CPU profiling, and the excellent Criterium for microbenchmarks in Clojure. These tools are terrific for optimizing single-JVM processes, but they're much less effective for analyzing distributed systems like Hadoop jobs.

There are a few reasons why Hadoop is tough to instrument:

  1. It's a logistical mess to find the JVM that's executing your code. Hadoop spins up new JVMs for each mapper and reducer. Those VMs could be splayed across dozens, hundreds, or even thousands of nodes, or worse, EMR instances. Even then, a typical job is split into several stages, each with a mapper and a reducer executing different code paths. To cover all the phases in your job, you may have to look at dozens of distinct processes.
  2. Once you've found the VMs in question, you have to actually instrument them. Attaching YourKit to the process remotely works fairly well, but it's labor-intensive and may require firewall hijinks. Getting every process to emit a profiler dump is also possible, but then you have to combine that information somehow. Where do you store the files? Where do you copy them to for analysis? How do you name them?
  3. How do you find the dump that's actually meaningful? Long-tail operations can dominate your compute time, especially for reducers. 99% of your tasks may execute just fine, but there might be a few pathological inputs that take a hundred times longer than the others. It's easy enough to see when that happens in the Hadoop dashboard, but mapping that time back to a particular performance dump is harder. I wanted a tool that could tell me what a system was doing continuously.

All of these are complicated by frameworks like Cascading and Hive, which invert control, introduce unpredictable layers of serialization, automatically generate intermediate job phases, and so on. Your code may only be responsible for a fraction of the overall runtime, so wrapping your code in an instrumentation library may not reveal the true costs.

With these problems in mind, I’ve built a very simple distributed profiler for soft-real-time telemetry from hundreds to thousands of JVMs concurrently. It’s nowhere near as comprehensive in its analysis as, say, Yourkit, but it can tell you, across a distributed system, which functions are taking the most time, and what their dominant callers are.
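
If you want to embed it in your own process, the setup is minimal. Here's a sketch of what that looks like from Clojure; the namespace, function, and option names below are my recollection of the project's README rather than a verified interface, so treat them as assumptions and check the repo before copying:

; Rough sketch of starting the profiler inside a JVM. Names and options are
; assumptions based on the riemann-jvm-profiler README; consult the repo for
; the authoritative interface.
(require '[riemann.jvm-profiler :as profiler])

; Start sampling this JVM and ship results to a central Riemann server.
(profiler/start-global!
  {:host   "riemann.example.com"   ; your Riemann server (assumed hostname)
   :prefix "my hadoop job"         ; prepended to event service names
   :load   0.02})                  ; rough bound on CPU spent sampling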

It builds on the Riemann monitoring system, which receives events describing the state of each JVM, synthesizes a picture of the system as a whole, and streams that data to the browser. Having a Riemann server as a fixed point of contact means you can treat the Hadoop VMs as an amorphous, undifferentiated soup of compute resources; regardless of where the job executes, data about its execution all flows to the same place.

The profiler works by sampling the stacks of all running threads periodically. It uses Zach Tellman’s clj-radix library and Clojure’s atoms to build up a radix tree of each stacktrace, then computes the dominant functions in that particular JVM by self-time, chooses the maximally-contributing stacktrace for each of those functions, and sends the top candidates off to Riemann as a series of events.
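
For intuition, here's a stripped-down sketch of that sampling loop, using a plain map of counts in place of clj-radix and a fixed interval in place of the adaptive one. It's illustrative only, not the library's actual implementation:

; Minimal stack-sampling sketch: a nested atom-backed map of counts instead of
; a radix tree, and a fixed sampling interval instead of an adaptive one.
(defonce samples (atom {}))   ; stacktrace (vector of frames) -> sample count

(defn sample-all-threads!
  "Capture every live thread's stack and bump that trace's count."
  []
  (doseq [[_thread frames] (Thread/getAllStackTraces)]
    (when (pos? (alength frames))
      (let [trace (mapv str frames)]          ; innermost frame first
        (swap! samples update trace (fnil inc 0))))))

(defn start-sampler!
  "Sample every interval-ms milliseconds on a daemon thread."
  [interval-ms]
  (doto (Thread. (fn []
                   (while true
                     (sample-all-threads!)
                     (Thread/sleep interval-ms))))
    (.setDaemon true)
    (.start)))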

The results have been enlightening.

This riemann-dash view shows the total number of JVMs reporting in, plus the aggregate sample rate in Hz. Riemann-jvm-profiler adjusts its sampling interval to stay within a bounded CPU budget, so you can run it on production systems without significant impact. The middle pane shows the highest self-times for each function, in dimensionless units of seconds spent in that function per second. The bottom pane shows detailed information about one specific function and its exemplar stacktrace.
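
Under the hood these are ordinary Riemann events. A self-time sample sent by hand with the standard Clojure client might look roughly like this; the service naming scheme and description contents are illustrative assumptions, not the profiler's exact schema:

; Sketch of a profiler-style event, sent with riemann-clojure-client. The
; :service naming and :description layout are assumptions; the real profiler's
; schema may differ.
(require '[riemann.client :as r])

(def client (r/tcp-client {:host "riemann.example.com"}))

(r/send-event client
  {:service     "profiler my-hadoop-job fn carbonite.serializer/write-map"
   :metric      0.37        ; seconds of self-time per second, across samples
   :state       "ok"
   :ttl         60
   :tags        ["profiler"]
   :description "exemplar stacktrace goes here"})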

For instance, we found that one of our jobs parsed dates by first calling clj-time.format/parse on the string, then falling back to a custom date format. Since the bulk of the input data was in the custom format, this forced clj-time to run through 52 distinct parsers, letting each one fail, before the code ever tried the most likely case. Trying the custom format first cut the impact of date parsing to negligible levels.
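
The fix was simply to reorder the attempts: try the known custom format first, and only fall back to clj-time's format-guessing when that fails. A sketch, with a made-up pattern standing in for the real custom format:

; Try the dominant custom date format before clj-time's multi-format fallback.
; "yyyyMMdd HH:mm" is a made-up stand-in for whatever format dominates your data.
(require '[clj-time.format :as f])

(def custom-fmt (f/formatter "yyyyMMdd HH:mm"))

(defn parse-date
  "Parse with the likely custom format first; fall back to clj-time's
   built-in formatters only when that fails."
  [s]
  (try
    (f/parse custom-fmt s)
    (catch IllegalArgumentException _
      (f/parse s))))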

In almost every job I’ve profiled, serialization dominates. In fact, it might be safe to say that less than 10% of the compute time in our Hadoop jobs is actually doing real work. The majority is spent parsing serialized data structures and emitting new ones. Because serialization is handled by the Cascading framework, at a higher level than our analysis code, instrumenting our function calls didn’t reveal the real problem.

As an example, the Kryo serializer has a mechanism for reducing repeated information in records by keeping a cache of previously seen objects. That cache can be surprisingly expensive, even though it is never invoked by our code paths directly. We saw dominant traces like:

java.lang.System identityHashCode (System.java:-2)
com.esotericsoftware.kryo.util.IdentityObjectIntMap get (IdentityObjectIntMap.java:244)
com.esotericsoftware.kryo.util.MapReferenceResolver getWrittenId (MapReferenceResolver.java:28)
com.esotericsoftware.kryo.Kryo writeReferenceOrNull (Kryo.java:574)
com.esotericsoftware.kryo.Kryo writeClassAndObject (Kryo.java:552)
carbonite.serializer$write_map invoke (serializer.clj:69)
...

As it turns out, identityHashCode has been known to be slow for something like eight years, which might (?) have to do with the lack of registerNatives in identityHashCode vs. regular hashCode. Disabling the Kryo referenceResolver significantly boosted the performance of our long-tail reduce phase, which spent the bulk of its time checking its IdentityObjectIntMap to see if it had already emitted that object.
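
Turning reference tracking off is a one-line change wherever the Kryo instance gets configured; exactly where that happens depends on your serialization stack (carbonite, cascading.kryo, and so on). A minimal interop sketch, assuming you construct the Kryo instance yourself:

; Sketch: build a Kryo instance with reference tracking disabled, skipping the
; MapReferenceResolver/IdentityObjectIntMap bookkeeping entirely. Only safe if
; your records contain no shared or cyclic references.
(import 'com.esotericsoftware.kryo.Kryo)

(defn kryo-without-references
  ^Kryo []
  (doto (Kryo.)
    (.setReferences false)))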

And of course, our age-old friend, Clojure keywords, rears its ugly head once more. Clojure calls String.intern for every keyword read, regardless of whether or not the keyword is already cached. I wrote and tested a series of patches to Clojure’s keyword implementation to ameliorate these costs, yielding an overall 2-3x speedup in our JSON-heavy workloads. Those performance improvements translate into immediate speedups in our other data-heavy applications, just by bumping the Clojure dependency version.
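
If you want to gauge that cost on your own workload, Criterium (mentioned earlier) makes the measurement straightforward. A purely illustrative benchmark of repeatedly interning the same field name, as a JSON parser would:

; Sketch: measure keyword construction/interning cost with Criterium. Run it
; under different Clojure versions (or a patched build) to compare.
(require '[criterium.core :refer [quick-bench]])

(quick-bench (keyword "some_json_field_name"))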

It's important to note that most sampling profilers (YourKit, VisualVM, hprof, riemann-jvm-profiler, and so on) can only sample the VM at safepoints, which may be optimized out of hot methods. That sampling bias can skew the results of a profiling run towards cold methods. But I'd assert that some information is often better than no information, and we've been able to use YourKit, VisualVM, and riemann-jvm-profiler to make substantial improvements to our systems.

Want to try it out? Learn more about Riemann, then check out the JVM profiler repo on GitHub.

Kyle Kingsbury
Software Engineer, Factual
