

Tag Archives: Storm


Preview of Kafka Streams

This week Jay Kreps announced the preview of Kafka Streams, one of the main features of the upcoming Apache Kafka 0.10.

Kafka joins the Stream Processing club

Kafka Streams is a library for building streaming applications that use Kafka topics as input and output. It plays in the same league as other stream processing systems such as Apache Storm, Apache Flink and, not surprisingly, Apache Samza, which also uses Kafka for stream input and output.

One of its main advantages is that if you are already using Apache Kafka and need real-time processing, you just add the library and you are good to go.
Other important features are stateful processing, windowing, and the ability to be deployed with your preferred solution: a simple command line, Mesos, YARN, or Kubernetes and Docker if you're a container party boy.

Streams and Tables

One of the key concepts in Kafka Streams is the support of KStream and KTable.
That isn't a new concept if you come from the Event Sourcing world: a KStream is the append-only event store, whose state is obtained by replaying the events from the beginning of time up to the last one, whereas a KTable is the snapshot, or projection, of the current state of the stream at a given point in time.
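The relationship between the two can be illustrated outside Kafka with a few lines of plain Java (a sketch; the Event and snapshot names are made up for illustration): replaying the whole stream of events yields the table.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StreamVsTable {

    // A tiny append-only "stream" of key/value events (names are hypothetical).
    static final class Event {
        final String key;
        final String value;
        Event(String key, String value) { this.key = key; this.value = value; }
    }

    // Replaying every event from the beginning of the stream yields the "table":
    // the latest value per key, i.e. a snapshot of the stream's current state.
    static Map<String, String> snapshot(List<Event> stream) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Event e : stream) {
            table.put(e.key, e.value); // a later event for the same key overwrites the earlier one
        }
        return table;
    }

    public static void main(String[] args) {
        List<Event> stream = List.of(
                new Event("alice", "london"),
                new Event("bob", "paris"),
                new Event("alice", "berlin")); // an update for key "alice"
        System.out.println(snapshot(stream)); // {alice=berlin, bob=paris}
    }
}
```

The stream keeps every event; the table only keeps the latest value per key, which is exactly the KStream/KTable distinction.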

Example: Twitter Hashtags Job


Kafka Streams: KStream and KTable


Show me the code!

You can find the complete example here:

KStream<String, JsonNode> source = builder.stream(stringDeserializer, jsonDeserializer, "streams-hashtag-input");

KTable<String, Long> counts = source
        .filter(new HashtagFilter())
        .flatMapValues(new HashtagSplitter())
        .map(new HashtagMapper())
        .countByKey(stringSerializer, longSerializer, stringDeserializer, longDeserializer, "Counts");

counts.to("streams-hashtag-count-output", stringSerializer, longSerializer);

For this example I've been using a simple TweetProducer that connects to the Twitter Streaming API and sends JSON events to a Kafka topic.
This topic is read as a KStream and then we begin the processing:

  1. Filter out the tweets without hashtags
  2. Apply flatMapValues (we are only interested in the values, not the keys) to split out the different hashtags in a tweet
  3. Apply a map to return a (hashtag, hashtag) key-value pair, as we want to aggregate by hashtag
  4. Aggregate the stream per key (the hashtag) and count the occurrences
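The four steps above can be sketched in plain Java, with no Kafka cluster needed (the sample tweets and the class name are made up for illustration):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class HashtagCount {

    // Mirrors the four steps: filter -> flatMapValues -> map -> countByKey.
    static Map<String, Long> countHashtags(List<String> tweets) {
        return tweets.stream()
                .filter(t -> t.contains("#"))                        // 1. drop tweets without hashtags
                .flatMap(t -> Arrays.stream(t.split("\\s+")))        // 2. split the tweet into words
                .filter(w -> w.startsWith("#"))                      //    and keep only the hashtags
                .map(String::toLowerCase)                            // 3. the hashtag becomes the key
                .collect(Collectors.groupingBy(Function.identity(),  // 4. aggregate and count per key
                        Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> tweets = List.of(
                "Loving #Kafka streams",
                "no hashtags here",
                "#kafka and #storm together");
        System.out.println(countHashtags(tweets)); // #kafka -> 2, #storm -> 1
    }
}
```

Kafka Streams applies the same pipeline, but continuously over an unbounded stream rather than over a finite list.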


Finally, we send the KTable to the output topic.


Sentiment analysis of tweets

In the previous post I presented an overview of the topology used to analyse Twitter streams with Kafka and Storm. Now it's time to cover the technical details of the Twitter topology.

Twitter Topology

The declaration of the Storm topology, using a KafkaSpout to read the tweets from a Kafka topic:

public class TwitterProcessorTopology extends BaseTopology {

    public TwitterProcessorTopology(String configFileLocation) throws Exception {
        super(configFileLocation);
    }

    private void configureKafkaSpout(TopologyBuilder topology) {
        BrokerHosts hosts = new ZkHosts(topologyConfig.getProperty(""));

        // (hosts, topic, zkRoot, spout id) -- the topic comes from the topology configuration
        SpoutConfig spoutConfig = new SpoutConfig(hosts,
                topologyConfig.getProperty(""), "", "twitterSpout");
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
        topology.setSpout("twitterSpout", kafkaSpout);
    }

    private void configureBolts(TopologyBuilder topology) {
        // filtering
        topology.setBolt("twitterFilter", new TwitterFilterBolt(), 4)
                .shuffleGrouping("twitterSpout");

        // sanitization
        topology.setBolt("textSanitization", new TextSanitizationBolt(), 4)
                .shuffleGrouping("twitterFilter");

        // sentiment analysis
        topology.setBolt("sentimentAnalysis", new SentimentAnalysisBolt(), 4)
                .shuffleGrouping("textSanitization");

        // persist tweets with analysis to Cassandra
        topology.setBolt("sentimentAnalysisToCassandra", new SentimentAnalysisToCassandraBolt(topologyConfig), 4)
                .shuffleGrouping("sentimentAnalysis");

        // divide sentiment by hashtag
        topology.setBolt("hashtagSplitter", new HashtagSplitterBolt(), 4)
                .shuffleGrouping("textSanitization");

        // count hashtag occurrences
        topology.setBolt("hashtagCounter", new HashtagCounterBolt(), 4)
                .fieldsGrouping("hashtagSplitter", new Fields("tweet_hashtag"));

        // rank the top hashtags
        topology.setBolt("topHashtag", new TopHashtagBolt())
                .globalGrouping("hashtagCounter");

        // persist top hashtags to Cassandra
        topology.setBolt("topHashtagToCassandra", new TopHashtagToCassandraBolt(topologyConfig), 4)
                .shuffleGrouping("topHashtag");
    }

    private void buildAndSubmit() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        configureKafkaSpout(builder);
        configureBolts(builder);

        Config config = new Config();

        // set producer properties
        Properties props = new Properties();
        props.put("", topologyConfig.getProperty(""));
        props.put("request.required.acks", "1");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);

        StormSubmitter.submitTopology("twitter-processor", config, builder.createTopology());
    }

    public static void main(String[] args) throws Exception {
        String configFileLocation = args[0];

        TwitterProcessorTopology topology = new TwitterProcessorTopology(configFileLocation);
        topology.buildAndSubmit();
    }
}
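The counting and ranking logic behind HashtagCounterBolt and TopHashtagBolt can be illustrated outside Storm with a small plain-Java sketch (the class and method names here are hypothetical; the real bolts receive and emit tuples, and the ranking fires on tick tuples rather than on demand):

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TopHashtags {

    private final Map<String, Long> counts = new HashMap<>();

    // What the counter bolt does for every incoming hashtag tuple: increment its counter.
    void count(String hashtag) {
        counts.merge(hashtag, 1L, Long::sum);
    }

    // What the ranking bolt does when a tick tuple arrives: sort by count and take the top n.
    List<String> top(int n) {
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder()))
                .limit(n)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        TopHashtags ranking = new TopHashtags();
        ranking.count("#kafka");
        ranking.count("#kafka");
        ranking.count("#storm");
        System.out.println(ranking.top(2)); // [#kafka, #storm]
    }
}
```

The fieldsGrouping on "tweet_hashtag" guarantees that all occurrences of the same hashtag reach the same counter instance, so per-instance maps like the one above stay consistent.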



Analysis of twitter streams with Kafka and Storm

Following my last post, I will present a real-time processing sample with Kafka and Storm using the Twitter Streaming API.


Twitter Streaming

The solution consists of the following:

  • twitter-kafka-producer: A very basic producer that reads tweets from the Twitter Streaming API and stores them in Kafka.
  • twitter-storm-topology: A Storm topology that reads tweets from Kafka and, after filtering and sanitization, processes the messages in parallel for:
    • Sentiment Analysis: Using a sentiment analysis algorithm to classify the tweet into a positive or negative feeling.
    • Top Hashtags: Calculates the top 20 hashtags using a sliding window.


Storm Topology

Twitter Topology

The Storm topology consists of the following elements:

  • Kafka Spout: The spout implementation to read messages from Kafka.
  • Filtering: Filters out all non-English tweets.
  • Sanitization: Text normalization in order to be processed properly by the sentiment analysis algorithm.
  • Sentiment Analysis: The algorithm that analyses the text of the tweet word by word, giving a value between -1 and 1.
  • Sentiment Analysis to Cassandra: Stores the tweets and their sentiment values in Cassandra.
  • Hashtag Splitter: Splits the different hashtags appearing in a tweet.
  • Hashtag Counter: Counts hashtag occurrences.
  • Top Hashtag: Ranks the top 20 hashtags over a sliding window (using the Tick Tuple feature of Storm).
  • Top Hashtag to Cassandra: Stores the top 20 hashtags in Cassandra.
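The word-by-word scoring used in the Sentiment Analysis step can be sketched in plain Java. The lexicon below is a made-up toy for illustration; real implementations look words up in a sentiment word list such as AFINN:

```java
import java.util.Map;

public class SentimentScorer {

    // A toy lexicon (hypothetical scores); real word lists are much larger.
    private static final Map<String, Double> LEXICON = Map.of(
            "love", 1.0, "great", 0.8, "good", 0.5,
            "bad", -0.5, "awful", -0.8, "hate", -1.0);

    // Averages the scores of the known words, yielding a value between -1 and 1.
    static double score(String text) {
        double sum = 0;
        int hits = 0;
        for (String word : text.toLowerCase().split("\\W+")) {
            Double s = LEXICON.get(word);
            if (s != null) {
                sum += s;
                hits++;
            }
        }
        return hits == 0 ? 0.0 : sum / hits;
    }

    public static void main(String[] args) {
        System.out.println(score("I love this great talk")); // 0.9
        System.out.println(score("awful traffic today"));    // -0.8
    }
}
```

Averaging keeps the result in [-1, 1] regardless of tweet length, which is what makes the scores comparable across tweets.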



In this post we have seen the benefits of using Apache Kafka and Apache Storm to ingest and process streams of data. In upcoming posts we will look at the implementation details and provide some analytical insight from the data stored in Cassandra.

The sample can be found on GitHub: