---
title: 'Quickstart: Receive events using Apache Storm - Azure Event Hubs'
description: 'Quickstart: This article provides information on how to receive events from Azure Event Hubs using Apache Storm.'
ms.topic: quickstart
ms.date: 09/28/2021
ms.devlang: java
ms.custom:
  - mode-other
  - sfi-ropc-nochange
---

# Quickstart: Receive events from Event Hubs using Apache Storm

[Apache Storm](https://storm.incubator.apache.org) is a distributed real-time computation system that simplifies reliable processing of unbounded streams of data. This quickstart shows how to use an Azure Event Hubs Storm spout to receive events from Event Hubs. With Apache Storm, you can split events across multiple processes hosted on different nodes. The Event Hubs spout simplifies event consumption in two ways: it transparently checkpoints its progress in Storm's ZooKeeper installation, so checkpoints persist, and it receives from Event Hubs partitions in parallel.

For more information about Event Hubs receive patterns, see the [Event Hubs overview][Event Hubs overview].

## Prerequisites

Before you start the quickstart, **create an Event Hubs namespace and an event hub**. Use the [Azure portal](https://portal.azure.com) to create a namespace of type Event Hubs, and obtain the credentials (a shared access policy name and key) that your application needs to communicate with the event hub. To create a namespace and an event hub, follow the procedure in [this article](event-hubs-create.md).

## Create project and add code

1. Use the following command to install the Event Hubs Storm spout package (`eventhubs-storm-spout-0.9-jar-with-dependencies.jar`) into your local Maven repository. This lets you add it as a reference in the Storm project in a later step.

   ```shell
   mvn install:install-file -Dfile=target\eventhubs-storm-spout-0.9-jar-with-dependencies.jar -DgroupId=com.microsoft.eventhubs -DartifactId=eventhubs-storm-spout -Dversion=0.9 -Dpackaging=jar
   ```

1. In Eclipse, create a new Maven project: select **File** > **New** > **Project**.

   ![File -> New -> Project][12]

1. Select **Use default Workspace location**, then select **Next**.

1. Select the **maven-archetype-quickstart** archetype, then select **Next**.

1. Enter a **GroupId** and **ArtifactId**, then select **Finish**.

1. In **pom.xml**, add the following dependencies to the `<dependencies>` node.

   ```xml
   <dependency>
       <groupId>org.apache.storm</groupId>
       <artifactId>storm-core</artifactId>
       <version>0.9.2-incubating</version>
       <scope>provided</scope>
   </dependency>
   <dependency>
       <groupId>com.microsoft.eventhubs</groupId>
       <artifactId>eventhubs-storm-spout</artifactId>
       <version>0.9</version>
   </dependency>
   <dependency>
       <groupId>com.netflix.curator</groupId>
       <artifactId>curator-framework</artifactId>
       <version>1.3.3</version>
       <exclusions>
           <exclusion>
               <groupId>log4j</groupId>
               <artifactId>log4j</artifactId>
           </exclusion>
           <exclusion>
               <groupId>org.slf4j</groupId>
               <artifactId>slf4j-log4j12</artifactId>
           </exclusion>
       </exclusions>
       <scope>provided</scope>
   </dependency>
   ```

1. In the **src/main/resources** folder (create it if it doesn't exist), create a file called **Config.properties** and copy the following content into it. Replace `{receive rule key}` and `{event hub name}` with your values, set `eventhubspout.namespace` to your namespace name, and set `eventhubspout.partitions.count` to the number of partitions in your event hub. `eventhubspout.username` is the name of a shared access policy with **Listen** rights (here, `ReceiveRule`), and `eventhubspout.password` is that policy's key.

   ```properties
   eventhubspout.username = ReceiveRule
   eventhubspout.password = {receive rule key}
   eventhubspout.namespace = ioteventhub-ns
   eventhubspout.entitypath = {event hub name}
   eventhubspout.partitions.count = 16

   # if not provided, will use storm's zookeeper settings
   # zookeeper.connectionstring=localhost:2181

   eventhubspout.checkpoint.interval = 10
   eventhub.receiver.credits = 10
   ```

   The `eventhub.receiver.credits` value determines how many events are batched before they're released to the Storm pipeline. For simplicity, this example sets it to 10. In production, it should usually be set higher; for example, 1024. The `eventhubspout.checkpoint.interval` value is the interval, in seconds, between checkpoints.

1. Create a new class called **LoggerBolt** with the following code:

   ```java
   import java.util.Map;

   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   import backtype.storm.task.OutputCollector;
   import backtype.storm.task.TopologyContext;
   import backtype.storm.topology.OutputFieldsDeclarer;
   import backtype.storm.topology.base.BaseRichBolt;
   import backtype.storm.tuple.Tuple;

   public class LoggerBolt extends BaseRichBolt {
       private OutputCollector collector;
       private static final Logger logger = LoggerFactory.getLogger(LoggerBolt.class);

       @Override
       public void execute(Tuple tuple) {
           // Read the first field of the tuple (the event payload emitted by the spout).
           String value = tuple.getString(0);
           logger.info("Tuple value: " + value);
           // Acknowledge the tuple so the spout knows it has been processed.
           collector.ack(tuple);
       }

       @Override
       public void prepare(Map map, TopologyContext context, OutputCollector collector) {
           this.collector = collector;
       }

       @Override
       public void declareOutputFields(OutputFieldsDeclarer declarer) {
           // This bolt is a sink, so it declares no output fields.
       }
   }
   ```

   This Storm bolt logs the content of each received event. You can extend it to store tuples in a storage service instead; the [HDInsight Storm with Event Hub example] uses the same approach to store data in Azure Storage and Power BI.

1. Create a class called **LogTopology** with the following code:

   ```java
   import java.io.FileReader;
   import java.util.Properties;

   import backtype.storm.Config;
   import backtype.storm.LocalCluster;
   import backtype.storm.StormSubmitter;
   import backtype.storm.generated.StormTopology;
   import backtype.storm.topology.TopologyBuilder;

   import com.microsoft.eventhubs.spout.EventHubSpout;
   import com.microsoft.eventhubs.spout.EventHubSpoutConfig;

   public class LogTopology {
       protected EventHubSpoutConfig spoutConfig;
       protected int numWorkers;

       protected void readEHConfig(String[] args) throws Exception {
           Properties properties = new Properties();
           if (args.length > 1) {
               // Load the configuration file passed as the second argument.
               properties.load(new FileReader(args[1]));
           } else {
               // Otherwise, load Config.properties from the classpath.
               properties.load(LogTopology.class.getClassLoader()
                       .getResourceAsStream("Config.properties"));
           }

           String username = properties.getProperty("eventhubspout.username");
           String password = properties.getProperty("eventhubspout.password");
           String namespaceName = properties.getProperty("eventhubspout.namespace");
           String entityPath = properties.getProperty("eventhubspout.entitypath");
           // Optional: when null, the spout uses Storm's ZooKeeper settings.
           String zkEndpointAddress = properties.getProperty("zookeeper.connectionstring");
           int partitionCount = Integer.parseInt(properties
                   .getProperty("eventhubspout.partitions.count"));
           int checkpointIntervalInSeconds = Integer.parseInt(properties
                   .getProperty("eventhubspout.checkpoint.interval"));
           // Prefetch count for each receiver.
           int receiverCredits = Integer.parseInt(properties
                   .getProperty("eventhub.receiver.credits"));

           System.out.println("Eventhub spout config: ");
           System.out.println("  partition count: " + partitionCount);
           System.out.println("  checkpoint interval: " + checkpointIntervalInSeconds);
           System.out.println("  receiver credits: " + receiverCredits);

           spoutConfig = new EventHubSpoutConfig(username, password, namespaceName,
                   entityPath, partitionCount, zkEndpointAddress,
                   checkpointIntervalInSeconds, receiverCredits);

           // Set the number of workers to be the same as the number of partitions.
           // The idea is to have a spout and a logger bolt co-exist in one
           // worker to avoid shuffling messages across workers in the Storm cluster.
           numWorkers = spoutConfig.getPartitionCount();

           if (args.length > 0) {
               // Set the topology name so that the sample Trident topology can use it
               // as the stream name.
               spoutConfig.setTopologyName(args[0]);
           }
       }

       protected StormTopology buildTopology() {
           TopologyBuilder topologyBuilder = new TopologyBuilder();

           // Create one spout task per event hub partition.
           EventHubSpout eventHubSpout = new EventHubSpout(spoutConfig);
           topologyBuilder.setSpout("EventHubsSpout", eventHubSpout,
                   spoutConfig.getPartitionCount())
                   .setNumTasks(spoutConfig.getPartitionCount());

           // Prefer sending tuples to a LoggerBolt task in the same worker.
           topologyBuilder.setBolt("LoggerBolt", new LoggerBolt(),
                   spoutConfig.getPartitionCount())
                   .localOrShuffleGrouping("EventHubsSpout")
                   .setNumTasks(spoutConfig.getPartitionCount());
           return topologyBuilder.createTopology();
       }

       protected void runScenario(String[] args) throws Exception {
           // Set to false to submit the topology to a Storm cluster instead.
           boolean runLocal = true;
           readEHConfig(args);
           StormTopology topology = buildTopology();
           Config config = new Config();
           config.setDebug(false);

           if (runLocal) {
               config.setMaxTaskParallelism(2);
               LocalCluster localCluster = new LocalCluster();
               localCluster.submitTopology("test", config, topology);
               // Let the local topology run for about 83 minutes, then shut it down.
               Thread.sleep(5000000);
               localCluster.shutdown();
           } else {
               config.setNumWorkers(numWorkers);
               // Cluster submission requires the topology name as the first argument.
               StormSubmitter.submitTopology(args[0], config, topology);
           }
       }

       public static void main(String[] args) throws Exception {
           LogTopology topology = new LogTopology();
           topology.runScenario(args);
       }
   }
   ```

   This class creates an Event Hubs spout configured from the properties in the configuration file. The example creates as many spout tasks as there are partitions in the event hub, to use the maximum parallelism that the event hub allows. As written, `runScenario` always runs the topology in a `LocalCluster` because `runLocal` is set to `true`; to submit to a Storm cluster, set it to `false` and pass the topology name as the first argument.
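The choice of one spout task per partition in `LogTopology` can be illustrated with a small, standalone sketch. It assumes a round-robin scheme in which spout task `i` of `n` reads partitions `i`, `i + n`, `i + 2n`, and so on. The `PartitionAssignment` class and its `partitionsForTask` method are hypothetical and aren't part of the `eventhubs-storm-spout` API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of eventhubs-storm-spout): illustrates an
// assumed round-robin mapping of event hub partitions to spout tasks.
class PartitionAssignment {

    // Returns the partition IDs read by spout task `taskIndex` (0-based)
    // when `totalTasks` tasks share `partitionCount` partitions.
    static List<String> partitionsForTask(int taskIndex, int totalTasks, int partitionCount) {
        if (totalTasks <= 0 || taskIndex < 0 || taskIndex >= totalTasks) {
            throw new IllegalArgumentException("invalid task index or task count");
        }
        List<String> partitions = new ArrayList<>();
        for (int p = taskIndex; p < partitionCount; p += totalTasks) {
            partitions.add(Integer.toString(p));
        }
        return partitions;
    }

    public static void main(String[] args) {
        int partitionCount = 16; // same as eventhubspout.partitions.count in Config.properties
        // One task per partition, as LogTopology configures it.
        System.out.println(partitionsForTask(3, 16, partitionCount)); // prints [3]
        // Four tasks for 16 partitions: each task reads four partitions.
        System.out.println(partitionsForTask(3, 4, partitionCount)); // prints [3, 7, 11, 15]
    }
}
```

Under this assumption, fewer tasks than partitions means some tasks read several partitions, and more tasks than partitions leaves the extra tasks with nothing to read, which is why matching the two gives the most parallelism.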
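To make the `eventhubspout.checkpoint.interval` setting and the ZooKeeper checkpointing described at the start of this quickstart more concrete, here's a minimal sketch of interval-based checkpointing for a single partition. It assumes a simplified model: events carry consecutive sequence numbers, the checkpoint is the position from which a restarted reader should resume (everything before it has been acknowledged), and the checkpoint is persisted at most once per interval. The `IntervalCheckpointer` class is hypothetical, a `Map` stands in for ZooKeeper, and this isn't the spout's actual implementation.

```java
import java.util.Map;
import java.util.TreeSet;

// Hypothetical sketch (not the eventhubs-storm-spout implementation) of
// interval-based checkpointing for one partition. A Map stands in for ZooKeeper.
class IntervalCheckpointer {
    private final String partitionId;
    private final long intervalMillis;
    private final Map<String, Long> store;                  // stand-in for ZooKeeper
    private final TreeSet<Long> pending = new TreeSet<>();  // emitted, not yet acked
    private long nextToEmit;                                // next sequence number to emit
    private long lastCheckpointMillis;

    IntervalCheckpointer(String partitionId, int intervalSeconds,
                         Map<String, Long> store, long nowMillis) {
        this.partitionId = partitionId;
        this.intervalMillis = intervalSeconds * 1000L;
        this.store = store;
        // Resume from the stored checkpoint, or from the start of the partition.
        Long saved = store.get(partitionId);
        this.nextToEmit = (saved == null) ? 0L : saved;
        this.lastCheckpointMillis = nowMillis;
    }

    // Emits the next event and tracks it until it's acknowledged.
    long emit() {
        long seq = nextToEmit++;
        pending.add(seq);
        return seq;
    }

    // Called when a bolt acknowledges the event.
    void ack(long seq) {
        pending.remove(seq);
    }

    // Every event before this sequence number has been acknowledged.
    long safeResumePoint() {
        return pending.isEmpty() ? nextToEmit : pending.first();
    }

    // Persists the checkpoint only if the interval has elapsed; returns true if it did.
    boolean maybeCheckpoint(long nowMillis) {
        if (nowMillis - lastCheckpointMillis < intervalMillis) {
            return false;
        }
        store.put(partitionId, safeResumePoint());
        lastCheckpointMillis = nowMillis;
        return true;
    }
}
```

With a 10-second interval, acknowledging events 0 and 2 but not 1 and then checkpointing stores 1, so a restarted reader replays event 1 instead of skipping it. A longer interval means fewer writes to ZooKeeper but more events replayed after a failure.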
## Next steps

You can learn more about Event Hubs by visiting the following links:

* [Event Hubs overview][Event Hubs overview]
* [Create an event hub](event-hubs-create.md)
* [Event Hubs FAQ](event-hubs-faq.yml)

<!-- Links -->
[Event Hubs overview]: ./event-hubs-about.md
[HDInsight Storm]: ../hdinsight/storm/apache-storm-overview.md
[HDInsight Storm with Event Hub example]: https://github.com/Azure-Samples/hdinsight-java-storm-eventhub

<!-- Images -->
[12]: ./media/event-hubs-get-started-receive-storm/create-storm1.png