Skip to content
Snippets Groups Projects
Unverified Commit ea219736 authored by Seok Won's avatar Seok Won
Browse files

Java: Create Kafka Streams Twitter filter

Run StreamsFilterTweets.java and TwitterProducerOptimized.java

See result using:
kafka-console-consumer --bootstrap-server localhost:9092 --topic famous-tweets

Tweets that go into a topic named "twitter-tweets", will be filtered with a tweet that is posted by a user who has 10K followers and they will go into a topic named "famous-tweets"
parent ffb1afdf
No related branches found
No related tags found
No related merge requests found
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>kafkaStudies</artifactId>
<groupId>csw.kafka</groupId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>kafka-streams-filter-tweets</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.6.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package csw.kafka.streams;
import com.google.gson.JsonParser;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;
public class StreamsFilterTweets {
public static void main(String[] args) {
String myServer = "192.168.137.232:9093"; // My Hyper-V Server
// Kafka Streams 설정
Properties properties = new Properties();
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, myServer);
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-demo");
properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
// create a topology
StreamsBuilder streamsBuilder = new StreamsBuilder();
// input topic (bolt)
KStream<String, String> inputTopic = streamsBuilder.stream("twitter-tweets");
KStream<String, String> filteredStream = inputTopic.filter(
// filter for tweets that has a user of 10k followers
(k, jsonTweet) -> getFollwersFromTweet(jsonTweet) > 10000
);
filteredStream.to("famous-tweets"); // Send to a topic
// build the topology
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
// start our streams app
kafkaStreams.start();
// kafka-console-consumer --bootstrap-server localhost:9092 --topic famous-tweets
}
private static Integer getFollwersFromTweet(String tweetJson) {
// gson
try {
return JsonParser.parseString(tweetJson)
.getAsJsonObject()
.get("user")
.getAsJsonObject()
.get("followers_count")
.getAsInt();
} catch (NullPointerException e) {
// got exceptions
return 0;
}
}
}
......@@ -78,7 +78,7 @@ public class TwitterProducerOptimized {
}
private KafkaProducer<String, String> createKafkaProducer() {
String myServer = "localhost:9092";
String myServer = "192.168.137.232:9093"; // My Hyper-V Server
// Kafka Producer 설정
Properties properties = new Properties();
......@@ -106,7 +106,7 @@ public class TwitterProducerOptimized {
Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
List<String> terms = Lists.newArrayList("kafka", "korea"); // tweets about kafka
List<String> terms = Lists.newArrayList("kafka", "covid", "korea"); // tweets about kafka
hosebirdEndpoint.trackTerms(terms);
// These secrets should be read from a config file
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment