
Preview of Kafka Streams

This week Jay Kreps announced the preview of Kafka Streams, one of the main features of the upcoming Apache Kafka 0.10 release.

Kafka joins the Stream Processing club

Kafka Streams is a library for building streaming applications that use Kafka topics as input and output. It plays in the same league as other stream processing systems such as Apache Storm, Apache Flink and, not surprisingly, Apache Samza, which also uses Kafka for stream input and output.

One of its main advantages is that if you’re already using Apache Kafka and you need real-time processing, you just add the library and you’re good to go.
Other important features are stateful processing, windowing, and the ability to deploy with your preferred solution: a simple command line, Mesos, YARN, or Kubernetes and Docker if you’re a container party boy.

Streams and Tables

One of the key concepts in Kafka Streams is its support for KStream and KTable.
These concepts aren’t new if you come from the Event Sourcing world: a KStream is like an append-only event store, whose state is obtained by replaying the events from the beginning of time up to the last one, whereas a KTable is the snapshot, or projection, of the stream’s current state at a given point in time.
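
To make the duality concrete, here is a minimal plain-Java sketch (illustrative only, not Kafka Streams code) of how replaying the same sequence of events yields both views:

import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamTableDuality {
    public static void main(String[] args) {
        // The "KStream" view: every update is an immutable event (key, value).
        List<Map.Entry<String, Integer>> stream = new ArrayList<>();
        stream.add(new SimpleEntry<>("#kafka", 1));
        stream.add(new SimpleEntry<>("#storm", 1));
        stream.add(new SimpleEntry<>("#kafka", 2)); // a later event for the same key

        // The "KTable" view: replaying the stream and keeping the last value per key
        // yields the current snapshot, like a projection in Event Sourcing.
        Map<String, Integer> table = new HashMap<>();
        for (Map.Entry<String, Integer> event : stream) {
            table.put(event.getKey(), event.getValue());
        }

        System.out.println(table); // e.g. {#kafka=2, #storm=1}
    }
}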

Example: Twitter Hashtags Job

[Figure: Kafka Streams: KStream and KTable]

Show me the code!

You can find the complete example here: https://github.com/mserrate/kafka-streams-app

KStream<String, JsonNode> source = builder.stream(stringDeserializer, jsonDeserializer, "streams-hashtag-input");

KTable<String, Long> counts = source
        .filter(new HashtagFilter())
        .flatMapValues(new HashtagSplitter())
        .map(new HashtagMapper())
        .countByKey(stringSerializer, longSerializer, stringDeserializer, longDeserializer, "Counts");

counts.to("streams-hashtag-count-output", stringSerializer, longSerializer);

For this example I’ve been using a simple TweetProducer that connects to the Twitter Streaming API and sends JSON events to a Kafka topic.
This topic is read as a KStream, and then the processing begins:

  1. Filter out the tweets without hashtags
  2. Apply flatMapValues (we are interested only in the values, not the keys) to split each tweet into its hashtags
  3. Apply a map to return the hashtag as both key and value, since we want to aggregate by hashtag
  4. Aggregate the stream per key (the hashtag) and count the occurrences
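
The HashtagFilter, HashtagSplitter and HashtagMapper helpers are small implementations of the Predicate, ValueMapper and KeyValueMapper interfaces from the Streams API. A possible sketch, assuming the tweet JSON exposes its hashtags under entities.hashtags (the actual implementations live in the linked repository):

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueMapper;

import com.fasterxml.jackson.databind.JsonNode;

class HashtagFilter implements Predicate<String, JsonNode> {
    @Override
    public boolean test(String key, JsonNode tweet) {
        // keep only tweets that contain at least one hashtag
        return tweet.path("entities").path("hashtags").size() > 0;
    }
}

class HashtagSplitter implements ValueMapper<JsonNode, Iterable<String>> {
    @Override
    public Iterable<String> apply(JsonNode tweet) {
        // emit one value per hashtag found in the tweet
        List<String> hashtags = new ArrayList<>();
        for (JsonNode hashtag : tweet.path("entities").path("hashtags")) {
            hashtags.add(hashtag.path("text").asText());
        }
        return hashtags;
    }
}

class HashtagMapper implements KeyValueMapper<String, String, KeyValue<String, String>> {
    @Override
    public KeyValue<String, String> apply(String key, String hashtag) {
        // re-key by hashtag so countByKey aggregates per hashtag
        return new KeyValue<>(hashtag, hashtag);
    }
}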


Finally, we write the resulting KTable to the output topic.

Sentiment analysis of tweets

In the previous post I presented an overview of the topology used to analyse Twitter streams with Kafka and Storm. Now it’s time to cover the technical details of the Twitter topology.

Twitter Topology

The declaration of the Storm topology, using a KafkaSpout to read the tweets from a Kafka topic:

public class TwitterProcessorTopology extends BaseTopology {

    public TwitterProcessorTopology(String configFileLocation) throws Exception {
        super(configFileLocation);
    }

    private void configureKafkaSpout(TopologyBuilder topology) {
        BrokerHosts hosts = new ZkHosts(topologyConfig.getProperty("zookeeper.host"));

        SpoutConfig spoutConfig = new SpoutConfig(
                hosts,
                topologyConfig.getProperty("kafka.twitter.raw.topic"),
                topologyConfig.getProperty("kafka.zkRoot"),
                topologyConfig.getProperty("kafka.consumer.group"));
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
        topology.setSpout("twitterSpout", kafkaSpout);
    }

    private void configureBolts(TopologyBuilder topology) {
        // filtering
        topology.setBolt("twitterFilter", new TwitterFilterBolt(), 4)
                .shuffleGrouping("twitterSpout");

        // sanitization
        topology.setBolt("textSanitization", new TextSanitizationBolt(), 4)
                .shuffleGrouping("twitterFilter");

        // sentiment analysis
        topology.setBolt("sentimentAnalysis", new SentimentAnalysisBolt(), 4)
                .shuffleGrouping("textSanitization");

        // persist tweets with analysis to Cassandra
        topology.setBolt("sentimentAnalysisToCassandra", new SentimentAnalysisToCassandraBolt(topologyConfig), 4)
                .shuffleGrouping("sentimentAnalysis");

        // split out the hashtags appearing in a tweet
        topology.setBolt("hashtagSplitter", new HashtagSplitterBolt(), 4)
                .shuffleGrouping("textSanitization");

        // count hashtag occurrences
        topology.setBolt("hashtagCounter", new HashtagCounterBolt(), 4)
                .fieldsGrouping("hashtagSplitter", new Fields("tweet_hashtag"));

        topology.setBolt("topHashtag", new TopHashtagBolt())
                .globalGrouping("hashtagCounter");

        topology.setBolt("topHashtagToCassandra", new TopHashtagToCassandraBolt(topologyConfig), 4)
                .shuffleGrouping("topHashtag");
    }

    private void buildAndSubmit() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        configureKafkaSpout(builder);
        configureBolts(builder);

        Config config = new Config();

        //set producer properties
        Properties props = new Properties();
        props.put("metadata.broker.list", topologyConfig.getProperty("kafka.broker.list"));
        props.put("request.required.acks", "1");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);

        StormSubmitter.submitTopology("twitter-processor", config, builder.createTopology());
    }

    public static void main(String[] args) throws Exception {
        String configFileLocation = args[0];

        TwitterProcessorTopology topology = new TwitterProcessorTopology(configFileLocation);
        topology.buildAndSubmit();
    }
}
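
Note that the fieldsGrouping on "tweet_hashtag" means the HashtagSplitterBolt must declare that output field, so that every occurrence of a hashtag is routed to the same HashtagCounterBolt instance. A hypothetical sketch of such a bolt, with assumed input field names (the real implementation lives in the repository):

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class HashtagSplitterBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // "tweet_text" is an assumed field name emitted by the sanitization bolt
        String text = input.getStringByField("tweet_text");
        for (String word : text.split("\\s+")) {
            if (word.startsWith("#") && word.length() > 1) {
                // one tuple per hashtag, so the counter can group by value
                collector.emit(new Values(word.toLowerCase()));
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // must match the field name used in fieldsGrouping
        declarer.declare(new Fields("tweet_hashtag"));
    }
}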


Analysis of twitter streams with Kafka and Storm

Following my last post, I will present a real-time processing sample with Kafka and Storm using the Twitter Streaming API.

Overview

[Figure: Twitter streaming solution overview]

The solution consists of the following:

  • twitter-kafka-producer: A very basic producer that reads tweets from the Twitter Streaming API and stores them in Kafka.
  • twitter-storm-topology: A Storm topology that reads tweets from Kafka and, after applying filtering and sanitization, processes the messages in parallel for:
    • Sentiment Analysis: Using a sentiment analysis algorithm to classify each tweet as carrying a positive or negative feeling.
    • Top Hashtags: Calculating the top 20 hashtags over a sliding window.

Storm Topology

[Figure: Storm topology]

The Storm topology consists of the following elements:

  • Kafka Spout: The spout implementation that reads messages from Kafka.
  • Filtering: Filters out all non-English tweets.
  • Sanitization: Normalizes the text so the sentiment analysis algorithm can process it properly.
  • Sentiment Analysis: The algorithm that analyses the text of the tweet word by word, giving a value between -1 and 1.
  • Sentiment Analysis to Cassandra: Stores the tweets and their sentiment values in Cassandra.
  • Hashtag Splitter: Splits out the different hashtags appearing in a tweet.
  • Hashtag Counter: Counts hashtag occurrences.
  • Top Hashtag: Ranks the top 20 hashtags over a sliding window (using the Tick Tuple feature from Storm; see the sketch after this list).
  • Top Hashtag to Cassandra: Stores the top 20 hashtags in Cassandra.
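
A minimal sketch of the tick tuple pattern mentioned above, assuming a 10-second interval (the actual interval used in the topology may differ):

import java.util.HashMap;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.Constants;
import backtype.storm.tuple.Tuple;

// Inside a bolt such as TopHashtagBolt:

@Override
public Map<String, Object> getComponentConfiguration() {
    Map<String, Object> conf = new HashMap<>();
    // ask Storm to send this bolt a tick tuple every 10 seconds (assumed value)
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
    return conf;
}

private static boolean isTickTuple(Tuple tuple) {
    // tick tuples arrive from Storm's system component on the system tick stream
    return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
            && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
}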


Summary

In this post we have seen the benefits of using Apache Kafka and Apache Storm to ingest and process streams of data. In the next posts we will look at the implementation details and provide some analytical insight from the data stored in Cassandra.

The sample can be found on GitHub: https://github.com/mserrate/twitter-streaming-app

Big Data: streams and lambdas

I’ve been working for some years now in distributed systems and event-driven architectures, from the misunderstood SOA (or its refurbished version known as Microservices) to Event Sourcing.

Some of the event-related concepts found in these systems, like immutability, perpetuity and versioning, are valid for stream processing as well. Stream processing, along with batch processing, is what is often referred to as Big Data.

Big Data

When we think about Big Data, the first thing that comes to mind is Hadoop for batch processing. Although Hadoop has the capacity to process indecent amounts of data, it also comes with high-latency responses.

Although this latency won’t be a problem for a lot of use cases, it is one when we need real (or near-real) time feedback.

That’s where the Lambda Architecture (by Nathan Marz) comes in: it describes how to design a system where most of our data is processed by the batch layer but, while that process is running, we are still able to process the streams coming into our system:

[Figure: Lambda architecture]

We can say that:

Current View = Query(Batch View) + Query(Stream View)
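
As a toy illustration of that equation, a query over a counting view might merge the two layers like this (all names here are hypothetical):

import java.util.Map;

// Illustrative only: Current View = Query(Batch View) + Query(Stream View)
public class LambdaQuery {
    public static long currentCount(Map<String, Long> batchView,
                                    Map<String, Long> streamView,
                                    String key) {
        // batch view: complete but stale; stream view: only the events
        // that arrived since the last batch run
        return batchView.getOrDefault(key, 0L) + streamView.getOrDefault(key, 0L);
    }
}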

Batch Layer

The batch layer computes arbitrary views from the entire historical dataset. The obvious example of batch processing is Hadoop or, to be more precise, the distributed file system HDFS plus a processing tool like MapReduce or Pig.

The result of this process is stored in a database that supports batch writes (ElephantDB, HBase) but not random writes. That makes the database architecture extremely simple, removing features like online compaction or concurrency support.

Stream Layer

The stream layer processes events one by one, giving immediate feedback. Depending on the number of events or the required throughput, we may use different technologies: Spark Streaming (although it’s micro-batch, its latency may be sufficient for many use cases), Storm, Samza or Flink.

The result of this process is stored in a database that supports random writes; one option is Cassandra.

In the following posts I will present concrete examples with Docker images, using some of the technologies I’ve worked with: Kafka, Storm, Cassandra and Druid.

Speaking at DotNetSpain conference

Too long without posting… Anyway, a short post to mention that I will be speaking at DotNetSpain 2015 about Complex Event Processing, Immutability and Projections with EventStore.

So, if you are interested come and say hi!

Webcast about SOA, DDD & CQRS with NServiceBus

I will be giving a webcast (in Spanish) tomorrow about SOA, DDD and CQRS with NServiceBus. In this talk I will cover DDD strategic design, bounded contexts, and how to model domain logic through NServiceBus Sagas.

You can see the details in the following link:

NServiceBus: DRY with unobtrusive conventions

When working with NServiceBus in unobtrusive mode, you may often feel that you are repeating the same conventions over and over again on every endpoint.

The IWantToRunBeforeConfiguration interface is a great help in embracing the DRY principle.

Just define your implementation in an assembly referenced by all the endpoints:

public class UnobtrusiveConventions : IWantToRunBeforeConfiguration
{
    public void Init()
    {
        Configure.Instance
            .DefiningCommandsAs(t => t.Namespace != null
                && t.Namespace.EndsWith("Commands"))
            .DefiningEventsAs(t => t.Namespace != null
                && t.Namespace.EndsWith("Events"))
            .DefiningMessagesAs(t => t.Namespace != null
                && t.Namespace.EndsWith("Messages"));
    }
}

and NServiceBus will pick this class up automatically on each endpoint.

NServiceBus Training

I will be giving Udi Dahan‘s 4-day Enterprise Development with NServiceBus course in Spain at:

 

More info at: http://udidahan.com/2012/10/12/training-for-this-winter/

Cassandra on Azure CentOS VM

Having some fun with Cassandra lately, I wanted to figure out how to set up a working environment on the new Windows Azure VM roles, so I decided to give it a try and install a Cassandra cluster on CentOS.

Although it targets Ubuntu, the following article is a good guide that helped me configure a Linux cluster: https://www.windowsazure.com/en-us/manage/linux/other-resources/how-to-run-cassandra-with-linux/

We create the first VM, assigning a PEM certificate in order to get SSH access:

[Screenshot: Create VM]


UI Composition for Services

One of the most important concepts when applying either SOA or DDD is the definition of Services (or Bounded Contexts in the DDD lingo).

Each of these services is responsible for its own data and behavior, and could also own the UI components for that service.

Let’s see an example with the typical e-commerce domain:
