Real-Time Kafka Streaming: Twitter Hashtags #what #is #happening ?

Hello, I’m back! I know, I know, my last blog post was exactly 3 years ago. But it’s never too late, right?

Streaming has always interested me, but I’ve never used it much. Today is the day! Let’s set up Kafka step by step and write a program that prints out the hashtags being mentioned on Twitter right now.


Heads-up: I’m a Mac user, and I’m only running this program locally. The goal is to understand the workflow of a Kafka streaming project, not to build a scalable solution for big data analysis.

Full source code: https://github.com/funjo/kafka_twitter_stream

Step 1. Install Kafka

I installed it through Homebrew. You can also download the source code if you prefer.

fangzhou$ brew install kafka

Step 2. Start the server

The Homebrew installation printed the following message. Apparently Kafka depends on a ZooKeeper server, so you have to start ZooKeeper first and then Kafka. I used the single command that brew suggested.

==> Caveats
==> zookeeper
To have launchd start zookeeper now and restart at login:
brew services start zookeeper
Or, if you don’t want/need a background service you can just run:
zkServer start
==> kafka
To have launchd start kafka now and restart at login:
brew services start kafka
Or, if you don’t want/need a background service you can just run:
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties

fangzhou$ zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties

Step 3. Create a topic

fangzhou$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

To make sure the topic was created successfully, list the existing topics.

fangzhou$ kafka-topics --list --zookeeper localhost:2181

To delete the existing topic:

fangzhou$ kafka-topics --zookeeper localhost:2181 --delete --topic test

Step 4. (Optional) Send some messages

This step is not essential to this project, since we will not be typing messages into Kafka ourselves (our program will forward whatever the Twitter streaming API delivers). But just for the fun of it, you can use Kafka's built-in command line clients to send some messages to the Kafka cluster. Here is what I did:

One catch: I had to add two lines to the properties file (/usr/local/etc/kafka/server.properties) to make sure the port is set correctly; otherwise the console producer gave me this error: {test=LEADER_NOT_AVAILABLE}.

port = 9092
advertised.host.name = localhost

After editing the properties file, you need to restart the Kafka server by repeating Step 2. Then the following should work:

fangzhou$ kafka-console-producer --broker-list localhost:9092 --topic test
>Hello
>It's me
>I was wondering if after all these years you'd like to meet
>:p

Open another shell tab:

fangzhou$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
Hello
It's me
I was wondering if after all these years you'd like to meet
:p

If you keep sending messages through the producer, the consumer will keep printing your input in real time. Fun, isn’t it? Finally we can talk to ourselves programmatically.

Step 5. Create a Java Maven project and set up the pom.xml file

XML doesn’t display properly on this site, so please check the file in my repo instead: https://github.com/funjo/kafka_twitter_stream/blob/master/pom.xml
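For orientation, here is a minimal sketch of what such a pom.xml might contain. The project coordinates and the version numbers are illustrative assumptions, not copied from the repo, so treat the file linked above as the authoritative one. The two dependencies cover the imports used in Step 6 (twitter4j-stream pulls in twitter4j-core transitively), and the compiler properties are set so that Java 8 syntax compiles.

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0">
  <modelVersion>4.0.0</modelVersion>

  <!-- Hypothetical coordinates, for illustration only -->
  <groupId>com.example</groupId>
  <artifactId>kafka-twitter-stream</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <!-- Kafka producer API (version is an assumption) -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.0.0</version>
    </dependency>
    <!-- Twitter streaming client (version is an assumption) -->
    <dependency>
      <groupId>org.twitter4j</groupId>
      <artifactId>twitter4j-stream</artifactId>
      <version>4.0.7</version>
    </dependency>
  </dependencies>
</project>
```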

Step 6. Create a class that uses the KafkaProducer class


import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import twitter4j.*;
import twitter4j.conf.ConfigurationBuilder;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * A Kafka Producer that gets tweets on certain keywords from twitter datasource and publishes to a kafka topic
 *
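 * Arguments: <twitter-consumer-key> <twitter-consumer-secret> <twitter-access-token> <twitter-access-token-secret> <topic-name> <twitter-search-keywords>
 *  - Twitter consumer key
 *  - Twitter consumer secret
 *  - Twitter access token
 *  - Twitter access token secret
 *  - The kafka topic to publish to
 *  - The keyword to filter tweets
 *  - Any number of additional keywords to filter tweets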
 *
 * Original source: https://stdatalabs.com/2016/09/spark-streaming-part-3-real-time/
 * Modified by: Fangzhou Cheng
 */

public class kafkaTwitterStream {
    public static void main(String[] args) throws Exception {
        // Bounded buffer between the Twitter listener thread and the Kafka sending loop
        final LinkedBlockingQueue<Status> queue = new LinkedBlockingQueue<>(1000);

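        // Four credentials, a topic name, and at least one keyword are required
        if (args.length < 6) {
            System.out.println(
                    "Usage: kafkaTwitterStream <twitter-consumer-key> <twitter-consumer-secret> <twitter-access-token> <twitter-access-token-secret> <topic-name> <twitter-search-keywords>");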
            return;
        }

        String consumerKey = args[0];
        String consumerSecret = args[1];
        String accessToken = args[2];
        String accessTokenSecret = args[3];
        String topicName = args[4];
        String[] keyWords = Arrays.copyOfRange(args, 5, args.length);

        // Set twitter oAuth tokens in the configuration
        ConfigurationBuilder cb = new ConfigurationBuilder();
        cb.setDebugEnabled(true).setOAuthConsumerKey(consumerKey).setOAuthConsumerSecret(consumerSecret)
                .setOAuthAccessToken(accessToken).setOAuthAccessTokenSecret(accessTokenSecret);

        // Create twitterstream using the configuration
        TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
        StatusListener listener = new StatusListener() {

            @Override
            public void onStatus(Status status) {
                queue.offer(status);
            }

            @Override
            public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
                System.out.println("Got a status deletion notice id:" + statusDeletionNotice.getStatusId());
            }

            @Override
            public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
                System.out.println("Got track limitation notice:" + numberOfLimitedStatuses);
            }

            @Override
            public void onScrubGeo(long userId, long upToStatusId) {
                System.out.println("Got scrub_geo event userId:" + userId + " upToStatusId:" + upToStatusId);
            }

            @Override
            public void onStallWarning(StallWarning warning) {
                System.out.println("Got stall warning:" + warning);
            }

            @Override
            public void onException(Exception ex) {
                ex.printStackTrace();
            }
        };
        twitterStream.addListener(listener);

        // Filter keywords
        FilterQuery query = new FilterQuery().track(keyWords);
        twitterStream.filter(query);

        // Add Kafka producer config settings
        // (the legacy "metadata.broker.list" key is not read by KafkaProducer; "bootstrap.servers" is)
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        int i = 0;

        // poll for new tweets in the queue. If new tweets are added, send them to the topic
        while (true) {
            Status ret = queue.poll();
            if (ret == null) {
                Thread.sleep(100);
            } else {
                // One record per hashtag; the record value is the full tweet text
                for (HashtagEntity hashtag : ret.getHashtagEntities()) {
                    System.out.println("Hashtag: " + hashtag.getText());
                    producer.send(new ProducerRecord<>(topicName, Integer.toString(i++), ret.getText()));
                }
            }
        }
    }
}
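
The listener and the sending loop in the class above talk to each other through a bounded LinkedBlockingQueue: onStatus() calls offer(), and the loop calls poll(). The stand-alone sketch below, which needs neither Twitter nor Kafka, shows two consequences of that design. The class and method names (QueueHandOffSketch, offerAll, drain) are hypothetical and exist only for this illustration, and plain strings stand in for tweets. First, offer() never blocks, so items are silently dropped once the queue is full. Second, a timed poll() is one possible alternative to the poll() plus Thread.sleep(100) pair.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Illustration of the listener-to-sender hand-off; strings stand in for tweets. */
class QueueHandOffSketch {

    /**
     * Offers every item to the queue the way onStatus() does and
     * returns how many were dropped because the queue was already full.
     */
    static int offerAll(LinkedBlockingQueue<String> queue, List<String> items) {
        int dropped = 0;
        for (String item : items) {
            // offer() returns false instead of blocking when the queue is at capacity
            if (!queue.offer(item)) {
                dropped++;
            }
        }
        return dropped;
    }

    /**
     * Drains the queue with a timed poll and stops once no item
     * arrives within the timeout.
     */
    static List<String> drain(LinkedBlockingQueue<String> queue, long timeoutMs) {
        List<String> received = new ArrayList<>();
        try {
            String next;
            while ((next = queue.poll(timeoutMs, TimeUnit.MILLISECONDS)) != null) {
                received.add(next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
        }
        return received;
    }

    public static void main(String[] args) {
        // Tiny capacity on purpose, so the third item does not fit
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        int dropped = offerAll(queue, Arrays.asList("tweet-1", "tweet-2", "tweet-3"));
        System.out.println("dropped: " + dropped);
        System.out.println("received: " + drain(queue, 100));
    }
}
```

Running it prints "dropped: 1" and then "received: [tweet-1, tweet-2]". For a local experiment, losing tweets when the buffer of 1000 overflows is probably acceptable; a program that must not lose data would need a different hand-off.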

Step 7. Compile and run the java application

If you don’t have Maven installed yet, install it first. I also got it from Homebrew by running `brew install maven`.

To run the application, you’ll also need your Twitter consumer key and secret, and your access token and secret. If you don’t have them, apply for a new app here: https://dev.twitter.com/apps/new. In my case, I had already registered an app for my other projects, so I’ll just reuse it ;).

fangzhou$ mvn compile
fangzhou$ mvn exec:java -Dexec.mainClass=kafkaTwitterStream -Dexec.args="twitter-consumer-key twitter-consumer-secret twitter-access-token twitter-access-token-secret topic-name twitter-search-keywords"
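
The program reads the arguments by position: indexes 0 to 3 are the four credentials, index 4 is the topic, and everything from index 5 onward is a keyword, so it needs at least six arguments. The sketch below mirrors that mapping in a small stand-alone class. StreamArgsSketch and its field names are hypothetical, and the sample values are placeholders rather than real credentials. It also makes one detail visible: as far as I know, exec.args is split on whitespace, so a search such as theresa may arrives as two separate keywords unless you quote it inside exec.args.

```java
import java.util.Arrays;

/** Illustration of how the positional arguments map to credentials, topic, and keywords. */
class StreamArgsSketch {
    final String consumerKey;
    final String consumerSecret;
    final String accessToken;
    final String accessTokenSecret;
    final String topicName;
    final String[] keywords;

    private StreamArgsSketch(String[] args) {
        consumerKey = args[0];
        consumerSecret = args[1];
        accessToken = args[2];
        accessTokenSecret = args[3];
        topicName = args[4];
        // Everything after the topic name is treated as a keyword
        keywords = Arrays.copyOfRange(args, 5, args.length);
    }

    /** Validates the argument count before any index is read. */
    static StreamArgsSketch parse(String[] args) {
        if (args.length < 6) {
            throw new IllegalArgumentException(
                    "expected 4 Twitter credentials, a topic name, and at least one keyword, got "
                            + args.length + " argument(s)");
        }
        return new StreamArgsSketch(args);
    }

    public static void main(String[] args) {
        // Placeholder values used when no arguments are supplied
        String[] sample = {"ck", "cs", "at", "ats", "test", "theresa", "may"};
        StreamArgsSketch parsed = parse(args.length > 0 ? args : sample);
        System.out.println("topic: " + parsed.topicName);
        System.out.println("keywords: " + Arrays.toString(parsed.keywords));
    }
}
```

With the sample values, it prints "topic: test" followed by "keywords: [theresa, may]".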

Step 8. Enjoy watching the live result

For example, this is roughly two seconds’ worth of results from Twitter on September 21, 2018 with the search keyword “theresa may”:

[Screenshot of the program output, taken 2018-09-21 at 18:28:52]
