·6 min read

Serverless Kafka Real World Example: Content Moderation

Enes AkarEnes AkarCofounder @Upstash

In this post, we will write a very basic example which showcases content moderation for web applications using:

  • Next.js
  • AWS Lambda
  • Serverless Kafka (Upstash)
  • Serverless Redis (Upstash)
  • Sightengine content moderation API

You can check the Github repo for the full example.

Scenario

Our application is a simple web page where users can submit comments. We want to filter inappropriate content before displaying it publicly. Here a screenshot from the demo.

content-moderation

When a user submits a comment, first we send it to a Kafka topic. The produced message triggers an AWS Lambda function. The AWS Lambda function decides whether the comment is inappropriate using the Sightengine API. If the content is clean, it is sent to the Redis list comments, otherwise it is sent to rejected-comments.

content-moderation

In the above algorithm, the user does not immediately see the comment they just posted. A possible improvement would be to only show the posted comment to its author, so the user experience will be smoother.

The Web Application

This is a basic Next.js application which sends the submitted content to an AWS Lambda function. It also loads the accepted and rejected comments from Redis.

index.js
import { useEffect, useState } from "react";
import Head from "next/head";
 
import styles from "../styles/Home.module.css";
 
export default function Home() {
  const [data, setData] = useState([]);
  const [data2, setData2] = useState([]);
  const [loading, setLoading] = useState(false);
 
  useEffect(() => {
    loadComments();
  }, []);
 
  const sendComment = async (event) => {
    event.preventDefault();
 
    const res = await fetch("/api/submit", {
      body: JSON.stringify({
        comment: event.target.comment.value,
      }),
      headers: {
        "Content-Type": "application/json",
      },
      method: "POST",
    });
 
    const result = await res.json();
    console.log(result);
    event.target.reset();
    loadComments();
  };
 
  const loadComments = async () => {
    setLoading(true);
    setData([]);
    setData2([]);
    const res = await fetch("/api/load");
 
    const result = await res.json();
    setData(result.comments);
    setData2(result.censored);
    setLoading(false);
    console.log(result);
  };
 
  return (
    <div className={styles.container}>
      <Head>
        <title>Create Next App</title>
        <meta name="description" content="Generated by create next app" />
        <link rel="icon" href="/favicon.ico" />
      </Head>
 
      <main className={styles.main}>
        <h4 className={styles.title}>
          Content Moderation
          <br />
          with Upstash Kafka
        </h4>
 
        <p className={styles.description}>
          Check the <a href={"#"}>blog post </a> for details.
        </p>
 
        <div className={styles.grid}>
          <div className={styles.div1}>
            <form onSubmit={sendComment}>
              <textarea
                name="comment"
                type="text"
                className={styles.textarea}
              />
              <br />
              <button type="submit" className={styles.submitbutton}>
                Submit
              </button>
            </form>
          </div>
 
          <div className={styles.div2}>
            <h3>Accepted Comments</h3>
 
            {loading ? (
              <div className={styles.loader}></div>
            ) : (
              data.map((item) => <p className={styles.comment}>{item}</p>)
            )}
          </div>
 
          <div className={styles.div2}>
            <h3>Censored Comments</h3>
            {loading ? (
              <div className={styles.loader}></div>
            ) : (
              data2.map((item) => <p className={styles.censored}>{item}</p>)
            )}
          </div>
        </div>
      </main>
 
      <footer className={styles.footer}>
        <a
          href="https://upstash.com?utm_source=content-moderation-demo"
          target="_blank"
          rel="noopener noreferrer"
        >
          Powered by <span className={styles.logo}> Upstash</span>
        </a>
      </footer>
    </div>
  );
}
index.js
import { useEffect, useState } from "react";
import Head from "next/head";
 
import styles from "../styles/Home.module.css";
 
export default function Home() {
  const [data, setData] = useState([]);
  const [data2, setData2] = useState([]);
  const [loading, setLoading] = useState(false);
 
  useEffect(() => {
    loadComments();
  }, []);
 
  const sendComment = async (event) => {
    event.preventDefault();
 
    const res = await fetch("/api/submit", {
      body: JSON.stringify({
        comment: event.target.comment.value,
      }),
      headers: {
        "Content-Type": "application/json",
      },
      method: "POST",
    });
 
    const result = await res.json();
    console.log(result);
    event.target.reset();
    loadComments();
  };
 
  const loadComments = async () => {
    setLoading(true);
    setData([]);
    setData2([]);
    const res = await fetch("/api/load");
 
    const result = await res.json();
    setData(result.comments);
    setData2(result.censored);
    setLoading(false);
    console.log(result);
  };
 
  return (
    <div className={styles.container}>
      <Head>
        <title>Create Next App</title>
        <meta name="description" content="Generated by create next app" />
        <link rel="icon" href="/favicon.ico" />
      </Head>
 
      <main className={styles.main}>
        <h4 className={styles.title}>
          Content Moderation
          <br />
          with Upstash Kafka
        </h4>
 
        <p className={styles.description}>
          Check the <a href={"#"}>blog post </a> for details.
        </p>
 
        <div className={styles.grid}>
          <div className={styles.div1}>
            <form onSubmit={sendComment}>
              <textarea
                name="comment"
                type="text"
                className={styles.textarea}
              />
              <br />
              <button type="submit" className={styles.submitbutton}>
                Submit
              </button>
            </form>
          </div>
 
          <div className={styles.div2}>
            <h3>Accepted Comments</h3>
 
            {loading ? (
              <div className={styles.loader}></div>
            ) : (
              data.map((item) => <p className={styles.comment}>{item}</p>)
            )}
          </div>
 
          <div className={styles.div2}>
            <h3>Censored Comments</h3>
            {loading ? (
              <div className={styles.loader}></div>
            ) : (
              data2.map((item) => <p className={styles.censored}>{item}</p>)
            )}
          </div>
        </div>
      </main>
 
      <footer className={styles.footer}>
        <a
          href="https://upstash.com?utm_source=content-moderation-demo"
          target="_blank"
          rel="noopener noreferrer"
        >
          Powered by <span className={styles.logo}> Upstash</span>
        </a>
      </footer>
    </div>
  );
}

We need two API routes. submit.js to send the comment to Kafka and load.js to load the comments from Redis. Create a Redis database and Kafka cluster using the Upstash console, then replace the Redis and Kafka urls and credentials below.

Submit

submit.js
import { Kafka } from "@upstash/kafka";
 
const kafka = new Kafka({
  url: "REPLACE_HERE",
  username: "REPLACE_HERE",
  password: "REPLACE_HERE",
});
 
export default async function handler(req, res) {
  const p = kafka.producer();
  const l = await p.produce("comments", req.body.comment);
  console.log(l);
  res.status(200).json(l);
}
submit.js
import { Kafka } from "@upstash/kafka";
 
const kafka = new Kafka({
  url: "REPLACE_HERE",
  username: "REPLACE_HERE",
  password: "REPLACE_HERE",
});
 
export default async function handler(req, res) {
  const p = kafka.producer();
  const l = await p.produce("comments", req.body.comment);
  console.log(l);
  res.status(200).json(l);
}

Load

load.js
import { Redis } from "@upstash/redis";
 
const redis = new Redis({
  url: "REPLACE_HERE",
  token: "REPLACE_HERE",
});
 
export default async function handler(req, res) {
  const comments = await redis.lrange("comments", 0, 100);
  const censored = await redis.lrange("rejected-comments", 0, 100);
  const result = { comments: comments, censored: censored };
  res.status(200).json(result);
}
load.js
import { Redis } from "@upstash/redis";
 
const redis = new Redis({
  url: "REPLACE_HERE",
  token: "REPLACE_HERE",
});
 
export default async function handler(req, res) {
  const comments = await redis.lrange("comments", 0, 100);
  const censored = await redis.lrange("rejected-comments", 0, 100);
  const result = { comments: comments, censored: censored };
  res.status(200).json(result);
}

The AWS Lambda Function

A Kafka topic will trigger this AWS Lambda function with each new comment. The function will call Sightengine API and depending on the response it will either push the comment to the comments list or rejected-comments list in the Redis. Here the required steps:

  • Create a Serverless function (select Node.js runtime) using AWS SAM or Serverless Framework.
  • Set your Kafka topic as trigger for the Lambda function as explained here.
  • Create a Sightengine account and get API keys.

Here the function code:

const axios = require("axios");
const FormData = require("form-data");
const { Redis } = require("@upstash/redis");
 
const redis = new Redis({
  url: "REPLACE_HERE",
  token: "REPLACE_HERE",
});
 
module.exports.consume = async (event) => {
  if (!event.records) {
    return { response: "no kafka event" };
  }
 
  for (let messages of Object.values(event.records)) {
    for (let msg of messages) {
      let buff = Buffer.from(msg.value, "base64");
      let text = buff.toString("ascii");
 
      let res = await filterContent(text);
      if (res === "none") {
        await redis.lpush("comments", text);
      } else {
        await redis.lpush("rejected-comments", text);
      }
    }
  }
  return { response: "success" };
};
 
async function filterContent(text) {
  let data = new FormData();
  data.append("text", text);
  data.append("lang", "en");
  data.append("mode", "standard");
  data.append("api_user", "REPLACE_HERE");
  data.append("api_secret", "REPLACE_HERE");
 
  let res = await axios({
    url: "https://api.sightengine.com/1.0/text/check.json",
    method: "post",
    data: data,
    headers: data.getHeaders(),
  });
  if (res.data.profanity.matches.length > 0) return "profanity";
  else if (res.data.personal.matches.length > 0) return "personal";
  else return "none";
}
const axios = require("axios");
const FormData = require("form-data");
const { Redis } = require("@upstash/redis");
 
const redis = new Redis({
  url: "REPLACE_HERE",
  token: "REPLACE_HERE",
});
 
module.exports.consume = async (event) => {
  if (!event.records) {
    return { response: "no kafka event" };
  }
 
  for (let messages of Object.values(event.records)) {
    for (let msg of messages) {
      let buff = Buffer.from(msg.value, "base64");
      let text = buff.toString("ascii");
 
      let res = await filterContent(text);
      if (res === "none") {
        await redis.lpush("comments", text);
      } else {
        await redis.lpush("rejected-comments", text);
      }
    }
  }
  return { response: "success" };
};
 
async function filterContent(text) {
  let data = new FormData();
  data.append("text", text);
  data.append("lang", "en");
  data.append("mode", "standard");
  data.append("api_user", "REPLACE_HERE");
  data.append("api_secret", "REPLACE_HERE");
 
  let res = await axios({
    url: "https://api.sightengine.com/1.0/text/check.json",
    method: "post",
    data: data,
    headers: data.getHeaders(),
  });
  if (res.data.profanity.matches.length > 0) return "profanity";
  else if (res.data.personal.matches.length > 0) return "personal";
  else return "none";
}

Test & Run

If you deployed the AWS Lambda function and added Kafka as trigger, you are ready to test your system. Run npm run dev in the folder of your web application. Refresh the page 5 seconds after posting a comment. You should see the comment in the web site either accepted or censored. If it does not work check:

  • Check the logs of your AWS Lambda function. Ensure there is no error log.
  • Check if the messages are in the Kafka topic. You may use the Upstash REST API.
  • Check the lists in the Redis database.

Why did we use Kafka?

We could process the submitted comments directly in the backend. So why do we need Kafka? Kafka opens many possibilities for how you process the data for different resources. For example you can easily move the comments to your database or data warehouse using a Kafka connector. You can run some other processes where you make analysis on your users’ comments. You can set up alerting systems processing the comments in real time. So Kafka becomes the data hub of your system where you can connect different systems feeding from the same data.

Is AWS Lambda a must?

Not at all. We preferred AWS Lambda to process messages to make the application serverless. You can use your servers/instances/containers to process the Kafka messages.

Closing Words

In this post, I tried to show how easy it is to implement a complex system end to end using serverless tools. We are planning to publish similar examples, follow us at Twitter and Discord.