
Trucking IoT on HDF

Introduction

This tutorial will cover the core concepts of Storm and the role it plays in an environment where real-time, low-latency and distributed data processing is important.

We will build a Storm topology from the ground up and demonstrate a full data pipeline, from Internet of Things (IoT) data ingestion from the edge, to data processing with Storm, to persisting this data and viewing it in a visualization web application.

Prerequisites

Outline

  • Trucking IoT Use Case – Discuss a real-world use case and understand the role Storm plays within it.
  • Running the Demo – Walk through the demo to gain a first understanding of the data pipeline.
  • Building a Storm Topology – Dive into Storm internals and build a topology from scratch.
  • Deploying the Topology – Package the Storm code and deploy onto your cluster.

Trucking IoT Use Case

The IoT Use Case

Imagine a trucking company that dispatches trucks across the country. The trucks are outfitted with sensors that collect data – data like the name of the driver, the route the truck is bound for, the speed of the truck, and even what event recently occurred (speeding, the truck weaving out of its lane, following too closely, etc.). Data like this is generated very often, say once per second, and is streamed back to the company’s servers.

Additionally, the company is also polling an internet service for information about traffic congestion on the different trucking routes.

The company needs a way to process both of these streams and join them so that data from each truck is paired with the most up-to-date congestion data for its route. Additionally, the company wants to run some analysis on the data to make sure trucks are traveling on time while also keeping cargo safe. Oh, and all of this needs to happen in real-time!

Why real-time? The trucking company benefits from having a system in place that ingests data from multiple sources, correlates these independent sources of data, runs analysis, and intelligently reports on important events as they happen – even suggesting actions the company can take to immediately improve the situation. This includes catching imminent truck breakdowns before they occur and analyzing driving habits to predict accidents before the driver gets into one!

Sounds like an important task – this is where Storm comes in.

Architectural Overview

At a high level, our data pipeline requirement looks like the following.

Architectural Overview

In the first section, continuous, real-time data from the sensors onboard each truck is streamed to the system and published to Kafka topics. A second, separate stream carrying traffic congestion information about trucking routes is also streamed into the system and stored in a Kafka topic.

The second section represents the biggest requirement. We need something capable of unpacking the compressed and serialized sensor data, merging independent streams together, performing aggregation and analytics, reserializing the transformed data, and sending streams back out for persistence and visualization. All of this should be done in real-time and in a distributed fashion while guaranteeing message processing and low latency.

For these critical tasks we use Apache Storm.

The third section represents the post-ETL process: in this case, visualizing the processed data with a web application.

What is Storm

Apache Storm is a free and open source data processing engine. It can process and act on massive volumes of data in real-time, performing virtually any type of operation or computation as data flows through its components.

Storm exposes a set of components for doing real-time computation. Just as MapReduce greatly eases the writing of parallel batch processing jobs, Storm’s components greatly ease the writing of parallel, real-time computation.

Storm can be used for processing messages and updating databases (stream processing), doing a continuous query on data streams and streaming the results into clients (continuous computation), parallelizing an intense query like a search query on the fly (distributed RPC), and more.

Benefits of Storm

  • Broad set of use cases: Storm’s small set of primitives satisfies a stunning number of use cases, from processing messages and updating databases, to running continuous queries and computation on data streams, to parallelizing traditionally resource-intensive jobs like search queries.
  • Scalable: Storm scales to handle massive numbers of messages per second. To scale a topology, one can add machines to the cluster or increase the number of parallel threads spawned by Storm.
  • Guarantee no data loss: Real-time systems must have strong guarantees about data being processed successfully and not allow data to be lost. Storm guarantees processing of every message.
  • Robust: It is an explicit goal of the Storm project to make the user experience of managing Storm clusters as painless as possible. This is in contrast to other systems that are difficult to manage and deploy, especially in secured environments.
  • Fault-tolerant: Storm makes sure that a computation can run forever, reassigning tasks as necessary if something in the system fails.
  • Development language agnostic: Storm jobs and components can be defined in any language, making Storm accessible to nearly any developer.

Next: Storm in Action

Now that we’ve got a high-level overview of what our use case looks like, let’s move on to seeing this use case and our solution in an actual running demo application.


Running the Demo

Introduction

Let’s walk through the demo and get an understanding for the data pipeline before we dive deeper into Storm internals.

Environment Setup

SSH into your HDF environment and download the corresponding demo project.

git clone https://github.com/orendain/trucking-iot-demo-2

Run the automated setup script to download and prepare necessary dependencies.

cd trucking-iot-demo-2
./scripts/setup.sh

Note: The script assumes that Kafka and Storm services are started within Ambari and that the Ambari login credentials are admin/admin.

Generate Sensor Data

The demo application leverages a very robust data simulator, which generates data of two types and publishes them to Kafka topics as CSV strings.

EnrichedTruckData: Data simulated by sensors onboard each truck. For the purposes of this demo, this data has been pre-enriched with data from a weather service.

1488767711734|26|1|Edgar Orendain|107|Springfield to Kansas City Via Columbia|38.95940879245423|-92.21923828125|65|Speeding|1|0|1|60

EnrichedTruckData fields

TrafficData: Data simulated from an online traffic service, which reports on traffic congestion on any particular trucking route.

1488767711734|107|60

TrafficData fields
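
To make the record layout concrete, here is a minimal sketch of how such a record could be modeled and parsed as a Scala case class. The field names are assumptions for illustration, not the project’s exact definitions; EnrichedTruckData follows the same pattern with more fields.

// Illustrative sketch only: field names are assumed, not taken from the project source
case class TrafficData(eventTime: Long, routeId: Int, congestionLevel: Int)

object TrafficData {
  // Parse a pipe-delimited record such as "1488767711734|107|60"
  def fromCSV(record: String): TrafficData = {
    val fields = record.split('|')
    TrafficData(fields(0).toLong, fields(1).toInt, fields(2).toInt)
  }
}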

Start the data generator by executing the appropriate script:

./scripts/run-simulator.sh

Deploy the Storm Topology

With simulated data now being pumped into Kafka topics, we power up Storm and process this data. In a separate terminal window, run the following command:

./scripts/deploy-topology.sh

Note: We’ll cover what exactly a “topology” is in the next section.

Here is a slightly more in-depth look at the steps Storm is taking in processing and transforming the two types of simulated data from above.

General Storm Process

Visualize the Processed Data

With the data now fully processed by Storm and published back into accessible Kafka topics, it’s time to visualize some of our handiwork. Start up the included reactive web application, which subscribes to the Kafka topic where the processed data is stored and renders the merged truck and traffic data points on a map.

./scripts/start-web-application.sh

Bring up the web application by accessing it through your browser at: sandbox.hortonworks.com:15500

Next: Building a Storm Topology

Now that we know how Storm fits into this data pipeline and what type of ETL work it is performing, let’s dive into the actual code and see exactly how it is built.


Building a Storm Topology

Introduction

We now know the role that Storm plays in this Trucking IoT system. Let’s dive into the code, dissect what it is doing, and learn how to build this topology.

Storm Components

Now that we have a general idea of the power of Storm, let’s look at its different components, our building blocks when defining a Storm process, and what they’re used for.

  • Tuple: A list of values. The main data structure in Storm.
  • Stream: An unbounded sequence of tuples.
  • Spout: A source of streams. Spouts will read tuples in from an external source and emit them into streams for the rest of the system to process.
  • Bolt: Processes the tuples from an input stream and produces an output stream of results. This process is also called stream transformation. Bolts can do filtering, run custom functions, aggregations, joins, database operations, and much more.
  • Topology: A network of spouts and bolts that are connected together by streams. In other words, the overall process for Storm to perform.

A Storm topology: spouts, streams and bolts

Environment Setup

We will be working with the trucking-iot-demo-1 project that you downloaded in previous sections. Feel free to download the project again on your local environment so you can open it with your favorite text editor or IDE.

git clone https://github.com/orendain/trucking-iot-demo-1

Alternatively, if you would prefer not to download the code, and simply follow along, you may view this project directly on GitHub.

Topology Build and Submit Overview

Look inside the KafkaToKafka.scala file and you’ll find a companion object with our standard entry point, main, and a KafkaToKafka class with a method named buildTopology, which handles the building of our Storm topology.

The primary purpose of our main method is to configure and build our topology and then submit it for deployment onto our cluster. Let’s take a closer look at what’s inside:

// Set up configuration for the Storm Topology
val stormConfig = new Config()
stormConfig.setDebug(config.getBoolean(Config.TOPOLOGY_DEBUG))
stormConfig.setMessageTimeoutSecs(config.getInt(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS))
stormConfig.setNumWorkers(config.getInt(Config.TOPOLOGY_WORKERS))

The org.apache.storm.Config class provides a convenient way to create a topology config by providing setter methods for all the configs that can be set. It also makes it easier to do things like add serializations.
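
For instance, custom classes passed between components can be registered for Kryo serialization through the same Config object (shown here purely as an illustration of the kind of setter available; the demo itself may not need this):

// Register application classes with Storm's Kryo serialization (illustrative)
stormConfig.registerSerialization(classOf[EnrichedTruckData])
stormConfig.registerSerialization(classOf[TrafficData])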

Following the creation of a Config instance, our main method continues with:

// Build and submit the Storm config and topology
val topology = new KafkaToKafka(config).buildTopology()
StormSubmitter.submitTopologyWithProgressBar("KafkaToKafka", stormConfig, topology)

Here, we invoke the buildTopology method of our class, which is responsible for building a StormTopology. With the topology that is returned, we use the StormSubmitter class to submit it to run on the Storm cluster.

Starting to Build a Storm Topology

Let’s dive into the buildTopology method to see exactly how to build a topology from the ground up.

// Builder to perform the construction of the topology.
implicit val builder = new TopologyBuilder()

// Configurations used for Kafka components
val zkHosts = new ZkHosts(config.getString(Config.STORM_ZOOKEEPER_SERVERS))
val zkRoot = config.getString(Config.STORM_ZOOKEEPER_ROOT)
val groupId = config.getString("kafka.group-id")

We start by creating an instance of TopologyBuilder, which exposes an easy-to-use Java API for putting together a topology. Next, we pull in some values from our configuration file (application.conf).
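
The config object here is most likely a Typesafe Config instance backed by application.conf. Under that assumption, a minimal sketch of how such values are loaded:

import com.typesafe.config.ConfigFactory

// Loads application.conf from the classpath; keys such as "kafka.group-id" resolve to entries in that file
val config = ConfigFactory.load()
val groupId = config.getString("kafka.group-id")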

Building a Kafka Spout

/*
 * Build a Kafka spout for ingesting enriched truck data
 */

// Name of the Kafka topic to connect to
val truckTopic = config.getString("kafka.truck-data.topic")

// Create a Spout configuration object and apply the scheme for the data that will come through this spout
val truckSpoutConfig = new SpoutConfig(zkHosts, truckTopic, zkRoot, groupId)
truckSpoutConfig.scheme = new SchemeAsMultiScheme(new BufferToStringScheme("EnrichedTruckData"))

// Force the spout to ignore where it left off during previous runs (for demo purposes)
truckSpoutConfig.ignoreZkOffsets = true

In order to build a KafkaSpout, we first need to decide what Kafka topic will be read from, where it exists, and exactly how we want to ingest that data in our topology. This is where a SpoutConfig comes in handy.

The SpoutConfig constructor takes in the ZooKeeper hosts for the Kafka cluster (“sandbox-hdf.hortonworks.com:6667” in our case), the name of the Kafka topic to subscribe to, the ZooKeeper root, and a group id (the Kafka consumer group this spout belongs to).

Note: This tutorial demonstrates using Storm 1.0.2. As of Storm 1.1.0, however, there are additions that make defining components even easier with even less boilerplate.

Our BufferToStringScheme class defines how the spout converts data from a Kafka topic into a Storm Tuple. To look at the inner workings of this class, check out BufferToStringScheme.scala. This class produces a Tuple where the first field is the type of the data (“EnrichedTruckData”) and the second is the record itself (a CSV string).
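
For reference, a scheme of this kind implements Storm’s Scheme interface. The following is a minimal sketch in the same spirit; it is illustrative only, not the project’s BufferToStringScheme source:

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.{List => JList}

import org.apache.storm.spout.Scheme
import org.apache.storm.tuple.{Fields, Values}

// Illustrative sketch of a Scheme that tags each Kafka record with a data type
class TypedCsvScheme(dataType: String) extends Scheme {

  // Decode the raw Kafka message bytes into a 2-field tuple: (dataType, csv record)
  override def deserialize(buffer: ByteBuffer): JList[AnyRef] =
    new Values(dataType, StandardCharsets.UTF_8.decode(buffer).toString)

  // These field names are what downstream bolts read via getStringByField
  override def getOutputFields(): Fields = new Fields("dataType", "data")
}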

Now that we have a SpoutConfig, we use it to build a KafkaSpout and place it in the topology.

// Create a spout with the specified configuration, and place it in the topology blueprint
builder.setSpout("enrichedTruckData", new KafkaSpout(truckSpoutConfig), 1)

Remember that builder refers to the TopologyBuilder. We’re creating a new KafkaSpout with a parallelism_hint of 1 (how many tasks, or instances, of the component to run on the cluster). We place the spout in the topology blueprint with the name “enrichedTruckData”.
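
The topology also needs a second spout for the traffic stream, since the bolt we build next subscribes to a spout named “trafficData”. Here is a sketch of the analogous setup, assuming a hypothetical “kafka.traffic-data.topic” config key:

// Analogous spout for traffic data (the config key name is an assumption)
val trafficTopic = config.getString("kafka.traffic-data.topic")

val trafficSpoutConfig = new SpoutConfig(zkHosts, trafficTopic, zkRoot, groupId)
trafficSpoutConfig.scheme = new SchemeAsMultiScheme(new BufferToStringScheme("TrafficData"))
trafficSpoutConfig.ignoreZkOffsets = true

builder.setSpout("trafficData", new KafkaSpout(trafficSpoutConfig), 1)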

Building a Custom Bolt

Excellent, we now have a way to ingest our CSV-delimited strings from Kafka topics and into our Storm topology. We now need a way to unpackage these strings into Java objects so we can more easily interact with them.

Let’s go ahead and build a custom Storm Bolt for this purpose. We’ll call it CSVStringToObjectBolt. But first, let’s see how this new custom bolt will fit into our topology blueprint.

builder.setBolt("unpackagedData", new CSVStringToObjectBolt(), 1)
  .shuffleGrouping("enrichedTruckData")
  .shuffleGrouping("trafficData")

We create a new CSVStringToObjectBolt bolt and tell Storm to assign only a single task for this bolt (i.e. create only 1 instance of this bolt in the cluster). We name it “unpackagedData”.

ShuffleGrouping shuffles data flowing in from the specified spouts evenly across all instances of the newly created bolt.

Let’s dig in and see how we create this bolt from scratch: check out the CSVStringToObjectBolt.scala file.

class CSVStringToObjectBolt extends BaseRichBolt {

Rather than creating a Storm bolt entirely from scratch, we leverage one of Storm’s base classes and simply extend BaseRichBolt. BaseRichBolt takes care of a lot of the lower-level implementation for us.

private var outputCollector: OutputCollector = _

override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
  outputCollector = collector
}

The prepare method provides the bolt with an OutputCollector that is used for emitting tuples from this bolt. Tuples can be emitted at any time from the bolt – in the prepare, execute, or cleanup methods, or even asynchronously in another thread. This prepare implementation simply saves the OutputCollector as an instance variable to be used later on in the execute method.

override def execute(tuple: Tuple): Unit = {
  // Convert each string into its proper case class instance (e.g. EnrichedTruckData or TrafficData)
  val (dataType, data) = tuple.getStringByField("dataType") match {
    case typ @ "EnrichedTruckData" => (typ, EnrichedTruckData.fromCSV(tuple.getStringByField("data")))
    case typ @ "TrafficData" => (typ, TrafficData.fromCSV(tuple.getStringByField("data")))
  }

  outputCollector.emit(new Values(dataType, data))
  outputCollector.ack(tuple)
}

The execute method receives a tuple from one of the bolt’s inputs. For each tuple that this bolt processes, the execute method is called.

We start by extracting the value of the tuple stored under the name “dataType”, which we know is either “EnrichedTruckData” or “TrafficData”. Depending on which it is, we call the fromCSV method of the appropriate object, which returns a JVM object based on this CSV string.

Next, we use the outputCollector to emit a Tuple onto this bolt’s outbound stream.
Finally, we ack (acknowledge) that the bolt has processed this tuple. This is part of Storm’s reliability API for guaranteeing no data loss.
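
As an aside, a bolt can also explicitly fail a tuple so that Storm replays it from the spout, for example when parsing throws. This is an illustrative variant, not the project’s code:

// Illustrative variant: ack on success, fail on error so the tuple can be replayed
override def execute(tuple: Tuple): Unit =
  try {
    // ... convert and emit as shown above ...
    outputCollector.ack(tuple)
  } catch {
    case _: Exception => outputCollector.fail(tuple)
  }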

The last method in this bolt is a short one:

override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = declarer.declare(new Fields("dataType", "data"))

The declareOutputFields method declares that this bolt emits 2-tuples with fields called “dataType” and “data”.

That’s it! We’ve just seen how to build a custom Storm bolt from scratch.

Building a Tumbling Windowed Bolt

Let’s get back to our KafkaToKafka class and look at what other components we’re adding downstream of the CSVStringToObjectBolt.

So now we have KafkaSpouts ingesting CSV strings from Kafka topics and a bolt that creates Java objects from these CSV strings. The next step in our process is to join these two types of Java objects into one.

/*
 * Build a windowed bolt for joining two types of Tuples into one
 */

val windowDuration = config.getInt(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)

val joinBolt = new TruckAndTrafficJoinBolt().withTumblingWindow(new BaseWindowedBolt.Duration(windowDuration, MILLISECONDS))

builder.setBolt("joinedData", joinBolt, 1).globalGrouping("unpackagedData")

Here, we create a tumbling windowed bolt using our custom TruckAndTrafficJoinBolt, which houses the logic for how to merge the different Tuples. This bolt processes both EnrichedTruckData and TrafficData and joins them to emit instances of EnrichedTruckAndTrafficData.

A tumbling window with a duration means the stream of incoming Tuples is partitioned into fixed, non-overlapping intervals based on the time they were processed. Think of a traffic light that lets through only the vehicles that arrive before it turns red. All tuples that arrived within the window are then processed at once by the TruckAndTrafficJoinBolt.
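
For orientation, a custom windowed bolt extends BaseWindowedBolt and implements an execute method that receives the entire window at once. The following is a minimal skeleton, illustrative only and not the actual TruckAndTrafficJoinBolt source:

import java.util

import org.apache.storm.task.{OutputCollector, TopologyContext}
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseWindowedBolt
import org.apache.storm.tuple.{Fields, Values}
import org.apache.storm.windowing.TupleWindow

import scala.collection.JavaConverters._

// Skeleton of a windowed bolt: all tuples that fell inside a window arrive in one execute call
class WindowedJoinSketchBolt extends BaseWindowedBolt {

  private var collector: OutputCollector = _

  override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit =
    this.collector = collector

  override def execute(window: TupleWindow): Unit = {
    // Pick out the truck records in this window; a real join would pair truck and traffic records by route
    val truckTuples = window.get().asScala.filter(_.getStringByField("dataType") == "EnrichedTruckData")

    // For illustration, simply forward the truck records; the real bolt emits joined truck-and-traffic data
    truckTuples.foreach(t => collector.emit(new Values(t.getStringByField("dataType"), t.getValueByField("data"))))
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit =
    declarer.declare(new Fields("dataType", "data"))
}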

We’ll take a look at how to build a custom windowed bolt in the next section.

Building a Sliding Windowed Bolt

Now that we have successfully joined data coming in from two streams, let’s perform some windowed analytics on this data.

// The size of the window, in number of Tuples.
val intervalCount = config.getInt(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)

val statsBolt = new DataWindowingBolt().withWindow(new BaseWindowedBolt.Count(intervalCount))

This creates a sliding windowed bolt using our custom DataWindowingBolt, which is responsible for reducing a list of recent Tuples (data) for a particular driver into a single data type. This data is used for machine learning.

A sliding windowed bolt with a tuple count as its length means we always process the last N tuples seen by the bolt. The window slides over by one, dropping the oldest tuple, each time a new tuple is processed.
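
Spelled out with an explicit sliding interval of one tuple, which should be equivalent to the single-argument call above:

// Window of the last `intervalCount` tuples, sliding forward one tuple at a time
val statsBoltExplicit = new DataWindowingBolt()
  .withWindow(new BaseWindowedBolt.Count(intervalCount), new BaseWindowedBolt.Count(1))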

builder.setBolt("windowedDriverStats", statsBolt, 1).fieldsGrouping("joinedData", new Fields("driverId"))

We then place this bolt in the topology blueprint, connected to the “joinedData” stream. FieldsGrouping partitions the stream of tuples by the fields specified: tuples with the same driverId always go to the same task, while tuples with different driverIds may go to different tasks. To ease the load on any single instance of this bolt, we could raise its parallelism hint so that several tasks share the work.

Building a Kafka Bolt

Before we push our Storm-processed data back out to Kafka, we want to serialize the Java objects we’ve been working with into string form.

builder.setBolt("serializedJoinedData", new ObjectToCSVStringBolt()).shuffleGrouping("joinedData")

builder.setBolt("serializedDriverStats", new ObjectToCSVStringBolt()).shuffleGrouping("windowedDriverStats")

These ObjectToCSVStringBolt bolts are the inverse of our previous custom bolt, CSVStringToObjectBolt: they expect tuples containing Java objects and emit a CSV string representation of them. Check out the source code if you’re interested in their inner workings.

Now, we have two streams emitting string data: “serializedJoinedData” which is the result of the two joined streams, and “serializedDriverStats”, which is the result of windowed analytics we performed.

Now, we build KafkaBolts to push data from these streams into Kafka topics. We start by defining some Kafka properties:

// Define properties to pass along to the KafkaBolt
val kafkaBoltProps = new Properties()
kafkaBoltProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getString("kafka.bootstrap-servers"))
kafkaBoltProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getString("kafka.key-serializer"))
kafkaBoltProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getString("kafka.value-serializer"))

Next, we build a KafkaBolt:

val truckingKafkaBolt = new KafkaBolt()
  .withTopicSelector(new DefaultTopicSelector(config.getString("kafka.joined-data.topic")))
  .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "data"))
  .withProducerProperties(kafkaBoltProps)

builder.setBolt("joinedDataToKafka", truckingKafkaBolt, 1).shuffleGrouping("serializedJoinedData")

withTopicSelector specifies the Kafka topic to drop entries into.

withTupleToKafkaMapper is passed an instance of FieldNameBasedTupleToKafkaMapper, which tells the bolt which Tuple fields hold the message key and value that get written to Kafka.

withProducerProperties takes in properties to set itself up with.
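
The driver-stats stream is pushed to Kafka in the same way with a second KafkaBolt. Here is a sketch of the analogous wiring, assuming a hypothetical “kafka.driver-stats.topic” config key:

// Analogous KafkaBolt for the windowed driver stats (topic config key is an assumption)
val statsKafkaBolt = new KafkaBolt()
  .withTopicSelector(new DefaultTopicSelector(config.getString("kafka.driver-stats.topic")))
  .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "data"))
  .withProducerProperties(kafkaBoltProps)

builder.setBolt("driverStatsToKafka", statsKafkaBolt, 1).shuffleGrouping("serializedDriverStats")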

Creating the Topology

Now that we have specified the entire Storm topology by adding components into our TopologyBuilder, we create an actual topology using the builder’s blueprint and return it.

// Now that the entire topology blueprint has been built, we create an actual topology from it
builder.createTopology()

Next: Deploying the Storm topology

Phew! We’ve now learned about how a Storm topology is developed. In the next section, we’ll package this project up into a portable JAR file and run a quick command that will deploy this code onto a cluster.


Deploying the Topology

Introduction

Now that we know how to develop a Storm topology, let’s go over how to package it up into a JAR file and deploy it onto a cluster.

Packaging a JAR

In a terminal, navigate to where the Storm project root is located and run:

sbt assembly

This produces an uber jar, housing your topology and all of its dependencies. The jar is saved to /trucking-iot-demo-2/target/scala-2.11/truckingIot-assembly-0.3.2.jar. This gets uploaded to the cluster for deployment to Storm.
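
The assembly task comes from the sbt-assembly plugin; a build like this one typically enables it in project/plugins.sbt along these lines (the plugin version shown is illustrative):

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")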

Note: Storm 1.1.0 enhances the way topologies are deployed, providing alternatives to using uber jars. Check out the Storm 1.1.0 release notes for more information.

Deploying to Storm

Note: If the jar from the previous section was built on your computer, you’ll have to copy it onto your cluster before running the next command.

On your cluster, run the following command:

storm jar trucking-iot-demo-2/target/scala-2.11/truckingIot-assembly-0.3.2.jar  com.orendainx.hortonworks.storm.topologies.KafkaToKafka

storm jar will submit the jar to the cluster. After uploading the jar, storm jar calls the main function of the class we specified (com.orendainx.hortonworks.storm.topologies.KafkaToKafka), which deploys the topology by way of the StormSubmitter class.

Summary

Congratulations! You now know about the role that Storm plays in a real-time data pipeline and how to create and deploy a topology from scratch.