·7 min read

Sentiment Analysis on Reddit using OpenAI (ChatGPT) and Upstash Kafka

Fahreddin OzcanFahreddin OzcanSoftware Engineer @Upstash

In this blog post, we will build an analysis app that utilizes sentiment analysis to evaluate social media posts. The app will send notifications to Slack whenever a new post containing a specific keyword is published. To build this tool, we will use Upstash Kafka, the Reddit API and the OpenAI API.

Project Description

The Kafka producer will send search requests for a specific keyword to the Reddit API every hour. Whenever a new post becomes available, the producer will extract the data and send it to the Upstash Kafka cluster.

The cluster will then trigger the consumer, which will retrieve the messages sent by the producer. The consumer will send the data to the OpenAI API with an appropriate prompt that instructs ChatGPT to analyze the content of the post and convert it into a notification format. The result of this analysis will then be sent to a Slack channel as a notification, using the Slack API.

diagram

Getting Started

Creating Upstash Kafka Cluster

First of all, we'll set up the Upstash Kafka. Navigate to Upstash Console and login. In the Kafka section, click Create Cluster button. Fill in the name and region fields, then create your first topic. You are all set!

Reddit API Request

In order to send requests to Reddit API, we first have to sign in to Reddit.Then go to your app preferences and click the Create app or Create another app button. Fill out the form like so:

reddit-app

When the app is created, create a new python file named producer.py and copy/paste the CLIENT_ID and CLIENT_SECRET fields to this file from the Reddit app preferences.

For the Reddit API requests, we'll use praw library, which is Python Reddit API wrapper. It'll enable us the make the queries easier. Here's the example code for the requests:

CLIENT_ID = <YOUR_CLIENT_ID>
CLIENT_SECRET= <YOUR_CLIENT_SECRET>
USER_AGENT="social-media-analysis by "+ <YOUR_REDDIT_USERNAME>
 
reddit = praw.Reddit(client_id=CLIENT_ID,
                     client_secret=CLIENT_SECRET,
                     user_agent=USER_AGENT)
 
def get_reddit_posts(subreddit_query, query):
    posts = []
    subreddit = reddit.subreddit(subreddit_query)
    for submission in subreddit.search(query, limit=5):
        if submission.selftext == "":
            continue
        posts.append({
            "subreddit": subreddit_query,
            "query": query,
            "author": submission.author.name,
            "title": submission.title,
            "text": submission.selftext,
            "url": submission.url
        })
    return posts
CLIENT_ID = <YOUR_CLIENT_ID>
CLIENT_SECRET= <YOUR_CLIENT_SECRET>
USER_AGENT="social-media-analysis by "+ <YOUR_REDDIT_USERNAME>
 
reddit = praw.Reddit(client_id=CLIENT_ID,
                     client_secret=CLIENT_SECRET,
                     user_agent=USER_AGENT)
 
def get_reddit_posts(subreddit_query, query):
    posts = []
    subreddit = reddit.subreddit(subreddit_query)
    for submission in subreddit.search(query, limit=5):
        if submission.selftext == "":
            continue
        posts.append({
            "subreddit": subreddit_query,
            "query": query,
            "author": submission.author.name,
            "title": submission.title,
            "text": submission.selftext,
            "url": submission.url
        })
    return posts

With a given subreddit_query and query parameters, the get_reddit_posts function will return the recent posts with author, title, text and url information. With that, we're now able to make requests to Reddit API. Next, we'll send the post data to Upstash Kafka Cluster.

Upstash Kafka Producer

In order to use Kafka Producers, you have to install the kafka-python library. You can do that by typing in the command below in the terminal.

pip install kafka-python
pip install kafka-python

NOTE: Make sure that you've installed the kafka-python library, not the deprecated kafka library. That's a possible error one may face.

We have already created a cluster and a topic on Upstash Kafka. To proceed further, please go to the Upstash Console and navigate to the Connect to your cluster section. Once there, copy the code snippet from the Producer section and paste it into your producer.py file.

from kafka import KafkaProducer
 
producer = KafkaProducer(
  bootstrap_servers=[<YOUR_BOOTSTRAP_ENDPOINT>],
  sasl_mechanism='SCRAM-SHA-256',
  security_protocol='SASL_SSL',
  sasl_plain_username=<YOUR_SASL_USERNAME>,
  sasl_plain_password=<YOUR_SASL_PASSWORD>,
)
from kafka import KafkaProducer
 
producer = KafkaProducer(
  bootstrap_servers=[<YOUR_BOOTSTRAP_ENDPOINT>],
  sasl_mechanism='SCRAM-SHA-256',
  security_protocol='SASL_SSL',
  sasl_plain_username=<YOUR_SASL_USERNAME>,
  sasl_plain_password=<YOUR_SASL_PASSWORD>,
)

"Once you have completed the previous step, the only thing left to do is to make the requests and send them to the Kafka cluster. When sending a message, it is important to specify the topic. Copy the code snippet below and add it to the end of your producer.py file."

posts=get_reddit_posts("all", "upstash")
 
for post in posts:
    producer.send('upstash', bytes(str(post), 'utf-8'))
 
producer.close()
posts=get_reddit_posts("all", "upstash")
 
for post in posts:
    producer.send('upstash', bytes(str(post), 'utf-8'))
 
producer.close()

After you run the producer.py file, you can see the message analytics on Upstash Console and verify that the messages are actually produced and sent to the queue.

console analytics

Upstash Kafka Consumer

Creating a consumer client is similar to creating a producer client. First, create a consumer.py file in the same folder as producer.py. Then copy/paste the Consumer code snippet to consumer.py from Connect to your cluster section in the Upstash Console

consumer.py
from kafka import KafkaConsumer
 
consumer = KafkaConsumer(
  bootstrap_servers=[<YOUR_BOOTSTRAP_ENDPOINT>],
  sasl_mechanism='SCRAM-SHA-256',
  security_protocol='SASL_SSL',
  sasl_plain_username=<YOUR_SASL_USERNAME>,
  sasl_plain_password=<YOUR_SASL_PASSWORD>,
  group_id='$GROUP_NAME',
  auto_offset_reset='earliest',
)
consumer.py
from kafka import KafkaConsumer
 
consumer = KafkaConsumer(
  bootstrap_servers=[<YOUR_BOOTSTRAP_ENDPOINT>],
  sasl_mechanism='SCRAM-SHA-256',
  security_protocol='SASL_SSL',
  sasl_plain_username=<YOUR_SASL_USERNAME>,
  sasl_plain_password=<YOUR_SASL_PASSWORD>,
  group_id='$GROUP_NAME',
  auto_offset_reset='earliest',
)

After creating consumer client, we'll complete the methods for OpenAI request, and Slack notification request.

OpenAI API Request

First, go to OpenAI platform, create an application and get an API key after you sign in. You may need to update your payment information.

chatgpt api key

Before making the request to ChatGPT, we must decide on a clear and unambiguous prompt that conveys our purpose. As an example, I have provided a prompt below for you to use. Please copy and paste it into your consumer.py file

consumer.py
prompt="I want you to act as a notification bot that works with Slack API. I'll provide Reddit posts about a specific keyword and I expect you to apply sentiment analysis to the text and return it as a notification. The analysis should reflect the thoughts, problems, and feedbacks of the user in a concise comment length. Avoid using first-person or the word Notification: in the beginning. Instead, write the notification in an easy-to-read format, indicating that the post is about the topic or keyword mentioned.
Here's an example:
TechEnthusiast21 posted about Upstash, mentioning that it's a game changer for cloud databases. They were impressed with the ease of use and speed of the database, which led to a significant improvement in their application's performance. They highly recommend Upstash for anyone looking for a fast and reliable cloud database."
consumer.py
prompt="I want you to act as a notification bot that works with Slack API. I'll provide Reddit posts about a specific keyword and I expect you to apply sentiment analysis to the text and return it as a notification. The analysis should reflect the thoughts, problems, and feedbacks of the user in a concise comment length. Avoid using first-person or the word Notification: in the beginning. Instead, write the notification in an easy-to-read format, indicating that the post is about the topic or keyword mentioned.
Here's an example:
TechEnthusiast21 posted about Upstash, mentioning that it's a game changer for cloud databases. They were impressed with the ease of use and speed of the database, which led to a significant improvement in their application's performance. They highly recommend Upstash for anyone looking for a fast and reliable cloud database."

Finally, we'll create the analyze_post method which will apply sentiment analysis on the Reddit posts.

consumer.py
api_key=<YOUR_OPENAI_API_KEY>
 
def analyze_tweet(post):
    response = openai.ChatCompletion.create(
        api_key=api_key,
        model="gpt-3.5-turbo",
        messages=[{"role":"user","content": prompt+"\n"+\
        f"The subreddit: {post['subreddit']}\n\
        The query: {post['query']}\n\
        The authors:
  -  {post['author']}\n\
        The title: {post['title']}\n\
        The post text: {post['text']}\n"}])
    return response.choices[0].message.content
consumer.py
api_key=<YOUR_OPENAI_API_KEY>
 
def analyze_tweet(post):
    response = openai.ChatCompletion.create(
        api_key=api_key,
        model="gpt-3.5-turbo",
        messages=[{"role":"user","content": prompt+"\n"+\
        f"The subreddit: {post['subreddit']}\n\
        The query: {post['query']}\n\
        The authors:
  -  {post['author']}\n\
        The title: {post['title']}\n\
        The post text: {post['text']}\n"}])
    return response.choices[0].message.content

Here's a demonstration on how this prompt and its response would look like:

chatgpt prompt

Our OpenAI chat works just fine now, but we have to work on the Slack notification app before the final touch.

Slack API Request

To get started, go to the Slack API Console and navigate to the Your apps section. Click the Create New App button and provide an app name, then choose the workspace for your notification bot.

In the Slack app dashboard, navigate to the OAuth & Permissions section in the sidebar. Under the Scopes section, add the chat:write scope to your bot as a Bot Token Scope.

Now, you can add your bot to a channel in Slack by right-clicking on it from the sidebar. Your Slack bot is now ready to send notifications to the channel, but we first need to configure the backend.

To do this, open your consumer.py file and import the necessary libraries. Then, initialize the Slack client at the beginning of the file

from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
 
client = WebClient(token=<YOUR_SLACK_API_KEY>)
channel_id = <YOUR_CHANNEL_ID>
logger = logging.getLogger(__name__)
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
 
client = WebClient(token=<YOUR_SLACK_API_KEY>)
channel_id = <YOUR_CHANNEL_ID>
logger = logging.getLogger(__name__)

You may find your channel_id from Right click on the channel > View channel details.

Now that all parts of the consumer is ready, it's time to assemble them with a final touch.

Final Touch

Before finalizing the project, we need to complete several tasks. These include pulling messages from the Upstash cluster, sending them to OpenAI for sentiment analysis, and then sending the result of the analysis to Slack as a notification. We have already configured the necessary clients, and here's how we will perform these operations.

consumer.subscribe(['upstash'])
 
for message in consumer:
    data=message.value.decode('utf-8')
    post = ast.literal_eval(data)
    analysis=analyze_tweet(post)
 
    try:
        analysis+=f"\n\nHere's the link to the post: {post['url']}"
        result=client.chat_postMessage(channel=channel_id,text=analysis)
 
    except SlackApiError as e:
        logger.error(f"Error posting message: {e}")
 
consumer.close()
consumer.subscribe(['upstash'])
 
for message in consumer:
    data=message.value.decode('utf-8')
    post = ast.literal_eval(data)
    analysis=analyze_tweet(post)
 
    try:
        analysis+=f"\n\nHere's the link to the post: {post['url']}"
        result=client.chat_postMessage(channel=channel_id,text=analysis)
 
    except SlackApiError as e:
        logger.error(f"Error posting message: {e}")
 
consumer.close()

After running the producer.py file, run the consumer.py file and you'll see that notifications are being sent to Slack channel.

Last Words

In conclusion, this project has provided valuable experience in integrating Upstash Kafka with third-party APIs. By leveraging the power of OpenAI, we were able to perform sentiment analysis on incoming messages and send the results to Slack as notifications.

To further enhance the analysis results, you can experiment with different OpenAI prompts or try different approaches to text preprocessing. Additionally, you can explore other integrations with Upstash Kafka, such as connecting to other messaging platforms or data sources.

I hope you found this blog informative and useful for your own projects. Thank you for reading!