diff --git a/kafka-streams-filter-tweets/pom.xml b/kafka-streams-filter-tweets/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..bfec60f3c129b6fe599fd9a179091911e9158447 --- /dev/null +++ b/kafka-streams-filter-tweets/pom.xml @@ -0,0 +1,41 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>kafkaStudies</artifactId> + <groupId>csw.kafka</groupId> + <version>1.0</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>kafka-streams-filter-tweets</artifactId> + + <properties> + <maven.compiler.source>8</maven.compiler.source> + <maven.compiler.target>8</maven.compiler.target> + </properties> + + <dependencies> + <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams --> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-streams</artifactId> + <version>2.6.0</version> + </dependency> + <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.30</version> + </dependency> + <!-- https://mvnrepository.com/artifact/com.google.code.gson/gson --> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>2.8.6</version> + </dependency> + + </dependencies> + +</project> \ No newline at end of file diff --git a/kafka-streams-filter-tweets/src/main/java/csw/kafka/streams/StreamsFilterTweets.java b/kafka-streams-filter-tweets/src/main/java/csw/kafka/streams/StreamsFilterTweets.java new file mode 100644 index 0000000000000000000000000000000000000000..e5629377355805fee2a390102a727ed01cbe26d3 --- /dev/null +++ b/kafka-streams-filter-tweets/src/main/java/csw/kafka/streams/StreamsFilterTweets.java @@ -0,0 +1,57 @@ +package csw.kafka.streams; + +import com.google.gson.JsonParser; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; + +import java.util.Properties; + +public class StreamsFilterTweets { + public static void main(String[] args) { + String myServer = "192.168.137.232:9093"; // My Hyper-V Server + + // Kafka Streams 설정 + Properties properties = new Properties(); + properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, myServer); + properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-demo"); + properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); + properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); + + // create a topology + StreamsBuilder streamsBuilder = new StreamsBuilder(); + + // input topic (bolt) + KStream<String, String> inputTopic = streamsBuilder.stream("twitter-tweets"); + KStream<String, String> filteredStream = inputTopic.filter( + // filter for tweets that has a user of 10k followers + (k, jsonTweet) -> getFollwersFromTweet(jsonTweet) > 10000 + ); + filteredStream.to("famous-tweets"); // Send to a topic + + // build the topology + KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties); + + // start our streams app + kafkaStreams.start(); + + // kafka-console-consumer --bootstrap-server localhost:9092 --topic famous-tweets + } + + private static Integer getFollwersFromTweet(String tweetJson) { + // gson + try { + return JsonParser.parseString(tweetJson) + .getAsJsonObject() + .get("user") + .getAsJsonObject() + .get("followers_count") + .getAsInt(); + } catch (NullPointerException e) { + // got exceptions + return 0; + } + } +} diff --git a/src/main/java/csw/kafka/study/twitter/TwitterProducerOptimized.java b/src/main/java/csw/kafka/study/twitter/TwitterProducerOptimized.java index 61e499cd95a40990be48a275e3980b2ca64cde0a..5a1af3d30a971af2dc6d24bc3a99437a89d5986f 100644 --- a/src/main/java/csw/kafka/study/twitter/TwitterProducerOptimized.java +++ b/src/main/java/csw/kafka/study/twitter/TwitterProducerOptimized.java @@ -78,7 +78,7 @@ public class TwitterProducerOptimized { } private KafkaProducer<String, String> createKafkaProducer() { - String myServer = "localhost:9092"; + String myServer = "192.168.137.232:9093"; // My Hyper-V Server // Kafka Producer 설정 Properties properties = new Properties(); @@ -106,7 +106,7 @@ public class TwitterProducerOptimized { Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST); StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint(); - List<String> terms = Lists.newArrayList("kafka", "korea"); // tweets about kafka + List<String> terms = Lists.newArrayList("kafka", "covid", "korea"); // tweets about kafka hosebirdEndpoint.trackTerms(terms); // These secrets should be read from a config file