
Stream Twitter to Serverless Kafka

Nihan Iman, Software Developer (Guest Author)

Upstash offers serverless Kafka topics. In this blog post, we are going to demonstrate how to publish to and read from a Kafka topic with only a few clicks using Upstash. The complete project code can be found here.

We are going to use an AWS Lambda function to periodically call the Twitter Search API. The function looks for tweets containing a keyword we choose and publishes them to Upstash's serverless Kafka. Once the tweets are stored in Kafka, you can process them with multiple consumers or copy them to a data store using a connector.

Creating the project

Let us start by creating a simple Maven project. We will use the Twittered Java library for Twitter API access, kafka-clients for Kafka, Log4j for logging, and aws-lambda-java-core for the AWS Lambda runtime. Here are our dependencies:

<dependency>
    <groupId>io.github.redouane59.twitter</groupId>
    <artifactId>twittered</artifactId>
    <version>2.16</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-slf4j-impl</artifactId>
    <version>2.17.1</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-lambda-java-core</artifactId>
    <version>1.2.1</version>
</dependency>

Create Twitter Developer Account

First, we need credentials for the Twitter API. Sign up for a developer account at the Twitter Developer Portal, sign in to the developer console, and give a name to your app. We will use the app's bearer token for authentication.

The TwitterProducer class is responsible for searching Twitter and publishing the tweets to Kafka.

public TwitterProducer(String keyword){
  bearerToken = System.getenv("BEARER_TOKEN");
  twitterClient = new TwitterClient(TwitterCredentials.builder()
          .bearerToken(bearerToken)
          .build());

  setRule(keyword);
}
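The setRule helper is not shown here; a minimal version that is consistent with the search call below would simply remember the keyword so it can be used as the query (this is a sketch, not necessarily what the original project does):

// Minimal sketch (assumption): store the keyword so run() can use it as the search query.
private String keyword;

private void setRule(String keyword) {
    this.keyword = keyword;
}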

We are going to schedule our Lambda function to run every 15 minutes, so we only search for tweets posted in the last 15 minutes.

public void run() {
    ...

    TweetList result = twitterClient.searchTweets(keyword, AdditionalParameters.builder()
            .startTime(ConverterHelper.minutesBeforeNow(15))
            .recursiveCall(false)
            .build());
    List<TweetV2.TweetData> tweetDataList = result.getData();
    ...
}

Kafka Producer

Now we want to publish these tweets to a Kafka topic. Sign up for the Upstash console here and create a Kafka cluster in Upstash; you can find instructions here. The Upstash Console already gives us the code to connect to the cluster, so the connection snippet below is copied straight from the console. We define the bootstrap servers, Kafka username, and Kafka password as environment variables.

private KafkaProducer<String, String> createKafkaProducer() {
    // Create producer properties
    // This block is provided in Upstash UI
    Properties props = new Properties();
    props.put("bootstrap.servers", kafkaBootstrapServers);
    props.put("sasl.mechanism", "SCRAM-SHA-256");
    props.put("security.protocol", "SASL_SSL");
    props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" +
                REPLACE + "\" password=\"" + REPLACE + "\";");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
    // Create producer
    return new KafkaProducer<String, String>(props);
}
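The bootstrap servers and the SCRAM credentials above come from environment variables; a minimal sketch of how they can be read, with the username and password substituted for the REPLACE placeholders (the variable names below are assumptions, use whatever names you configure on the Lambda function):

// Read the Upstash Kafka endpoint and credentials from environment variables.
// The variable names below are assumptions; match them to your Lambda configuration.
private final String kafkaBootstrapServers = System.getenv("BOOTSTRAP_SERVERS");
private final String kafkaUsername = System.getenv("KAFKA_USERNAME");
private final String kafkaPassword = System.getenv("KAFKA_PASSWORD");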

Feed the tweets into the Kafka producer.

public void run() {

    ...

    // Kafka Producer
    producer = createKafkaProducer();

    for (TweetV2.TweetData tweet : tweetDataList) {
        LOGGER.info("Tweet Data: " + tweet.getText());
        try {
            // Send synchronously so the record reaches the cluster before the invocation ends
            producer.send(new ProducerRecord<>("myTwitterTopic", null, tweet.getText())).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException("Failed to publish tweet", e);
        }
    }
}
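Every record above is sent synchronously, so nothing should be left buffered, but it is still good practice to flush and close the producer before the Lambda invocation ends so its network resources are released. A minimal sketch of how run() could finish:

// Flush anything still buffered and release the sockets and threads held by the producer.
producer.flush();
producer.close();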

Setting up with AWS Lambda

AWS can trigger user-defined functions based on external events or on a schedule. We will simply schedule our function to run every 15 minutes. We need to provide a class implementing AWS Lambda's RequestHandler interface; our TweetStreamApp class does exactly that. All we need to do is run our TwitterProducer in the handleRequest method.

public class TweetStreamApp implements RequestHandler<Object, String> {
    @Override
    public String handleRequest(Object input, Context context) {
        TwitterProducer twitterProducer = new TwitterProducer("morning");
        twitterProducer.run();
        return null;
    }
}
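Before deploying, you could smoke-test the producer locally with a small main method (a sketch, assuming the same environment variables are exported in your shell):

public static void main(String[] args) {
    // Runs one search-and-publish cycle outside of Lambda, using the local environment variables.
    new TwitterProducer("morning").run();
}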

We have all the parts that we need for our project. Let's build a jar archive containing all the dependencies (a fat jar, for example via the maven-shade-plugin or maven-assembly-plugin). Simply invoke mvn package.

We need an AWS account. Create an AWS Lambda function in the AWS console and upload the jar we just built, then apply a few configuration changes. Set the "Handler" parameter to tweetStream.TweetStreamApp::handleRequest.

Add the environment variables (the bearer token and the Kafka connection settings) under the Configuration section.

Configure the timeout as 30 seconds, the memory as 512 MB, and the ephemeral storage as 512 MB.

Add a schedule rule in Amazon EventBridge to run our function every 15 minutes (the schedule expression rate(15 minutes)).

Add an EventBridge trigger to our Lambda function and pick the schedule rule that we created in the previous step.

Our function will be triggered every 15 minutes now. We can see the logs in CloudWatch.

Let's check the Upstash console and verify that our Kafka topic is receiving messages.

Let's write a simple Kafka consumer to verify that we can consume these tweets.

public static void main(String[] args) throws Exception {
    ...
 
    Properties props = new Properties();
    props.put("bootstrap.servers", kafkaBootstrapServers);
    props.put("sasl.mechanism", "SCRAM-SHA-256");
    props.put("security.protocol", "SASL_SSL");
    props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" +
            REPLACE + "\" password=\"" + REPLACE + "\";");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("auto.offset.reset", "earliest");
    props.put("group.id", "myGroupName");
 
    try(Consumer<String, String> consumer = new KafkaConsumer<String, String>(props)) {
        Set<String> topics = new HashSet<>();
        topics.add("myTwitterTopic");
        consumer.subscribe(topics);
        do {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (Iterator<ConsumerRecord<String, String>> iter = records.iterator(); iter.hasNext(); ) {
                ConsumerRecord<String, String> record = iter.next();
                LOGGER.info(record.key() + ":" + record.value());
            }
        } while (true);
    }
}
 

It is working :)

Conclusion

We wrote a periodic job that searches Twitter and saves relevant tweets to Kafka, entirely serverless. Management of the Kafka cluster is handled by Upstash; we only copied the credentials from the Upstash Console and started publishing records.