Monitoring Storm Topologies: Metrics, Best Practices, and Real-World Examples

Storm is a distributed realtime computation system ideal for processing unbounded streams of data and serving as a vehicle for distributing computations. Here at RelateIQ, we use this framework heavily to process several thousand events per minute.

Distributing computation comes with a price. You have to monitor not just one machine, but many. This topic has come up multiple times on the storm mailing list, so I thought it would be good to put some ideas together and share some hands-on experience with how we monitor Storm topologies.

We can group the monitoring metrics into these categories:

  • Independent metrics from each particular topology
  • JVM metrics
  • Storm metrics (e.g., tuples per minute)
  • Miscellaneous (when using Kafka Spouts)

Independent Metrics from Each Particular Topology

Each topology is different and is going to have different metrics to publish. This is no different from monitoring any other web application. The key is how to ship application-specific metrics into a data collector for plotting and alerting. It’s worth mentioning some types of data collectors here (a minimal instrumentation sketch follows the list):

  • Yammer Metrics with a specific backend writer; the library ships reporters for backends such as Ganglia and Graphite.
  • Using Statsd directly is useful when you want to see aggregated data. Here, you would instrument your code to output specific metrics. Statsd aggregates the received data (normally every minute) and writes it to a backend such as Ganglia, Graphite, or another service. We have been using Datadog successfully in this capacity, and both the product and their support have been outstanding.
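
As a concrete illustration, here is a minimal sketch of what instrumenting a bolt with Yammer Metrics might look like. The bolt, topology name, and metric names are made up for this example; whichever reporter you enable (Statsd, Ganglia, Graphite, Datadog) is what actually ships the numbers to your collector.

import java.util.Map;
import java.util.concurrent.TimeUnit;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;

// Hypothetical bolt used only to show where the instrumentation goes.
public class EventCountingBolt extends BaseBasicBolt {

    // Meters are not serializable, so register them in prepare(), not in the constructor.
    private transient Meter processedEvents;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // Register an application-specific meter; the reporter configured for this
        // worker (Statsd, Ganglia, Graphite, ...) ships it to the collector.
        processedEvents = Metrics.newMeter(
                new MetricName("Topology-MyTopology", "EventCountingBolt", "ProcessedEvents"),
                "events", TimeUnit.MINUTES);
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // ... application logic ...
        processedEvents.mark();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This example bolt emits nothing downstream.
    }
}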

JVM Metrics

These metrics cover JVM-level information such as heap size, heap used, and GC time, among others. In the past, I tried to co-locate JMXTrans in each Storm box and use Chef to connect to the right ports. For example, I would set up two Storm workers (6700 and 6701) in a particular box and enable JMX for each of them as follows (via the storm.yaml file):

-Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.port=1%ID%

Note that 1%ID% will be replaced with 16700 and 16701, respectively. This predictability lets me find out how many workers there are in each box via Chef (the workers are Chef attributes), co-locate the JMXTrans service in the same box, and dynamically generate a JMXTrans config file for each worker. As you may have noticed, security is turned off in this example. I highly encourage you to turn security on for production environments.
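
For reference, these flags typically live in the worker JVM options (worker.childopts) in storm.yaml. A minimal sketch, assuming the default worker ports from above and a made-up heap size:

worker.childopts: "-Xmx768m -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.port=1%ID%"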

The code below shows how I tell Chef how many workers I have in each machine:

default[:storm][:workers] = [6700, 6701]

And this is an example of a JMXTrans config file (a Chef ERB template) that extracts data from the JVM and writes it to a Ganglia collector:

{
  "servers" : [ {
    "port" : "<%= @service_port%>",
    "host" : "127.0.0.1",
    "alias" : "<%= node.name%>:<%= node.name%>",
    "queries" : [ {
      "outputWriters" : [ {
        "@class" : "com.googlecode.jmxtrans.model.output.GangliaWriter",
        "settings" : {
          "groupName" : "jvmheapmemory",
          "port" : <%= node[:ganglia][:port]%>,
          "host" : "<%= node[:ganglia][:collector_host]%>"
        }
      } ],
      "obj" : "java.lang:type=Memory",
      "resultAlias": "<%= @service_name%>-heap",
      "attr" : [ "HeapMemoryUsage", "NonHeapMemoryUsage" ]
    }, {
      "outputWriters" : [ {
        "@class" : "com.googlecode.jmxtrans.model.output.GangliaWriter",
        "settings" : {
          "groupName" : "jvmcmsoldgen",
          "port" : <%= node[:ganglia][:port]%>,
          "host" : "<%= node[:ganglia][:collector_host]%>"
        }
      } ],
      "obj" : "java.lang:name=CMS Old Gen,type=MemoryPool",
      "resultAlias": "<%= @service_name%>-cmsoldgen",
      "attr" : [ "Usage" ]
    } ],
    "numQueryThreads" : 2
  } ]
}

Some Notes on This Approach

  • When a worker is down, JMXTrans errors out, so I had to set up a cron job that restarts the JMXTrans service every 30 minutes to make sure I didn’t lose metrics.
  • This approach is high maintenance, since it requires keeping the JMXTrans config and the Storm config in sync.

A second approach is to use Yammer Metrics’ JVM Instrumentation. For it to work, you have to use a reporting writer to write the metrics to a backend collector. I use Ganglia and Statsd. Here is an adapted version of a Statsd reporter in which we suppress specific metrics (histograms), since Statsd already constructs histograms for us. Yammer Metrics only uses gauges to publish the data, so suppressing histograms doesn’t seem to be a big deal.

The beauty of this approach is that you no longer have to keep two services’ configurations in sync. As soon as Storm launches a topology, it will start emitting JVM metrics.
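
The adapted Statsd reporter linked above is specific to our setup, but as a rough illustration of the wiring, here is a sketch using the stock Graphite reporter from the Yammer Metrics 2.x metrics-graphite module (which, as far as I recall, also emits basic VM metrics by default). The helper class, host, and port are made up; call it once per worker, e.g. from a spout’s open() or a bolt’s prepare():

import java.util.concurrent.TimeUnit;

import com.yammer.metrics.reporting.GraphiteReporter;

// Hypothetical helper; not part of the original post.
public final class Reporters {

    private static boolean started = false;

    private Reporters() {}

    // Call once per worker JVM; subsequent calls are no-ops.
    public static synchronized void startGraphiteReporter() {
        if (started) {
            return;
        }
        // Ship everything in the default Metrics registry (plus the reporter's
        // built-in VM metrics) to the collector once a minute.
        // "graphite.example.com" and 2003 are placeholders for your collector.
        GraphiteReporter.enable(1, TimeUnit.MINUTES, "graphite.example.com", 2003);
        started = true;
    }
}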

This is a screenshot plotted with Graphite, one of the backends to which Statsd writes.

Storm workers JVM heap

Storm Metrics

By “Storm Metrics”, I mean metrics from the Storm framework itself, such as tuples per minute, latency, capacity, and so on. At the time of this post, RelateIQ is using Storm 0.8.3, which has some support for attaching hooks to components (Spouts and Bolts). Storm calls those hooks with information about the tuples that are emitted, acked, and failed. This is an example of a hook that writes those metrics to Yammer Metrics meters, which are sent out to Statsd every minute (in our case).

import java.util.concurrent.TimeUnit;

import backtype.storm.hooks.info.EmitInfo;
import backtype.storm.hooks.info.SpoutAckInfo;
import backtype.storm.hooks.info.SpoutFailInfo;
import backtype.storm.task.TopologyContext;

import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;

// BaseMeteredHook (not shown) is our own adapter around Storm's task-hook
// interface; it calls init() once the hook is attached to a task.
public class MeteredSpoutHook extends BaseMeteredHook {

    private Meter emittedTuples;
    private Meter ackedTuples;
    private Meter failedTuples;

    public MeteredSpoutHook() {}

    @Override
    public void emit(EmitInfo emitInfo) {
        emittedTuples.mark();
    }

    @Override
    public void spoutAck(SpoutAckInfo spoutAckInfo) {
        ackedTuples.mark();
        // TODO Do something with: spoutAckInfo.completeLatencyMs
    }

    @Override
    public void spoutFail(SpoutFailInfo spoutFailInfo) {
        failedTuples.mark();
        // TODO Do something with: spoutFailInfo.failLatencyMs
    }

    @Override
    public void init(TopologyContext context, String topologyName, String label) {
        // One meter per metric, named so the topology and spout show up clearly in the collector.
        MetricName emittedTuplesMetricName = new MetricName("Topology-" + topologyName, "Spout_" + label, "EmittedTuples");
        emittedTuples = Metrics.newMeter(emittedTuplesMetricName, "tuples", TimeUnit.MINUTES);

        MetricName ackedTuplesMetricName = new MetricName("Topology-" + topologyName, "Spout_" + label, "AckedTuples");
        ackedTuples = Metrics.newMeter(ackedTuplesMetricName, "tuples", TimeUnit.MINUTES);

        MetricName failedTuplesMetricName = new MetricName("Topology-" + topologyName, "Spout_" + label, "FailedTuples");
        failedTuples = Metrics.newMeter(failedTuplesMetricName, "tuples", TimeUnit.MINUTES);
    }
}
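
Registering the hook is not shown above. Here is a minimal sketch, assuming BaseMeteredHook ultimately implements Storm’s ITaskHook so it can be attached from the spout’s open() via TopologyContext.addTaskHook; Storm also has a topology.auto.task.hooks config option if you want a hook on every component. The spout itself is made up for this example:

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;

public class InstrumentedSpout extends BaseRichSpout {

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // Attach the metering hook to this task; Storm will call it on emit/ack/fail.
        context.addTaskHook(new MeteredSpoutHook());
        // ... the rest of the spout's setup ...
    }

    @Override
    public void nextTuple() {
        // ... emit tuples ...
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // ... declare the spout's output fields ...
    }
}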

Miscellaneous

If you happen to be using Kafka spouts as one of your spout implementations, a project I found very useful is stormkafkamon (the original and a fork that fixes some bugs). It is intended to work in conjunction with the Kafka spout from storm-contrib. That spout stores a watermark in Zookeeper, and stormkafkamon reads the latest offsets from the Kafka broker and from Zookeeper, showing the delta in a nicely formatted way.

+--------------+----------+-----------+--------------+--------------+-------------+----------+--------------+-------+
|    Broker    |  Topic   | Partition |   Earliest   |    Latest    |    Depth    |   Spout  |   Current    | Delta |
+--------------+----------+-----------+--------------+--------------+-------------+----------+--------------+-------+
| kafka-broker | TOPIC_1  |     0     | 17314225754  | 18492471767  |  1178246013 | TOPIC_1  | 18492470390  |  1377 |
| kafka-broker | TOPIC_2  |     0     | 85228601970  | 89208988484  |  3980386514 | TOPIC_2  | 89208987752  |  732  |
| kafka-broker | TOPIC_3  |     0     | 457686650224 | 484187159862 | 26500509638 | TOPIC_3  | 484187157164 |  2698 |
+--------------+----------+-----------+--------------+--------------+-------------+----------+--------------+-------+

Number of brokers:       1
Number of partitions:    3
Total broker depth:      31659142165
Total delta:             4807

With some bash-scripting foo, you could add a cron job that runs it once a minute, parses the data, and sends it to Statsd. Cheers to my coworker Jón Grétarsson for writing this script!

If you’d like to add anything, or if you have specific questions or feedback, please leave a comment. I’d love to chat. Also, if you are interested in learning more about joining the RelateIQ team, please drop us a line from our jobs page.