From 9b667d5a275f5be73c5dc3e27b5f3da5ad90e8b7 Mon Sep 17 00:00:00 2001 From: Seok Won <alfex4936@gmail.com> Date: Mon, 30 Nov 2020 20:26:52 +0900 Subject: [PATCH] Update TwitterProducer to send data to kafka Create a topic named "twitter-tweets" with below command, kafka-topics --zookeeper localhost:2181 --create --topic twitter-tweets --partitions 6 --replication-factor 1 and run this application. --- .../kafka/study/twitter/TwitterProducer.java | 35 ++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/src/main/java/csw/kafka/study/twitter/TwitterProducer.java b/src/main/java/csw/kafka/study/twitter/TwitterProducer.java index e5a7798..533abd9 100644 --- a/src/main/java/csw/kafka/study/twitter/TwitterProducer.java +++ b/src/main/java/csw/kafka/study/twitter/TwitterProducer.java @@ -11,10 +11,14 @@ import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint; import com.twitter.hbc.core.processor.StringDelimitedProcessor; import com.twitter.hbc.httpclient.auth.Authentication; import com.twitter.hbc.httpclient.auth.OAuth1; +import org.apache.kafka.clients.producer.*; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; +import java.util.Properties; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -39,7 +43,15 @@ public class TwitterProducer { client.connect(); // Kafka Producer - // Kafka에 트윗 전송하기 + KafkaProducer<String, String> producer = createKafkaProducer(); + + // 종료 hook + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + logger.info("Stopping application..."); + client.stop(); + producer.close(); + logger.info("Bye!"); + })); // on a different thread, or multiple different threads.... while (!client.isDone()) { @@ -52,11 +64,32 @@ public class TwitterProducer { } if (msg != null) { logger.info(msg); + // Kafka에 트윗 전송하기 + // kafka-topics --zookeeper localhost:2181 --create --topic twitter-tweets --partitions 6 --replication-factor 1 + producer.send(new ProducerRecord<>("twitter-tweets", null, msg), (recordMetadata, e) -> { + if (e != null) { + logger.info("Something bad happended", e); + } + }); } } logger.info("End of application."); } + private KafkaProducer<String, String> createKafkaProducer() { + String myServer = "localhost:9092"; + + // Kafka Producer 설정 + Properties properties = new Properties(); + properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, myServer); + properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + // Kafka는 데이터를 바이트로 보냄 (0,1) + + // Kafka Producer 만들기 + return new KafkaProducer<>(properties); + } + public Client createTwitterProducer(BlockingQueue<String> msgQueue) { // Declare the host you want to connect to, the endpoint, and authentication (basic auth or oauth) // Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST); -- GitLab