diff --git a/src/main/java/csw/kafka/study/twitter/TwitterProducer.java b/src/main/java/csw/kafka/study/twitter/TwitterProducer.java index e5a7798ab45bd1aa0f5e0143ef97cc2df37d3c31..533abd93b7c348d4d877740cd254939e81cc2c4b 100644 --- a/src/main/java/csw/kafka/study/twitter/TwitterProducer.java +++ b/src/main/java/csw/kafka/study/twitter/TwitterProducer.java @@ -11,10 +11,14 @@ import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint; import com.twitter.hbc.core.processor.StringDelimitedProcessor; import com.twitter.hbc.httpclient.auth.Authentication; import com.twitter.hbc.httpclient.auth.OAuth1; +import org.apache.kafka.clients.producer.*; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; +import java.util.Properties; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -39,7 +43,15 @@ public class TwitterProducer { client.connect(); // Kafka Producer - // Kafka에 트윗 전송하기 + KafkaProducer<String, String> producer = createKafkaProducer(); + + // 종료 hook + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + logger.info("Stopping application..."); + client.stop(); + producer.close(); + logger.info("Bye!"); + })); // on a different thread, or multiple different threads.... while (!client.isDone()) { @@ -52,11 +64,32 @@ public class TwitterProducer { } if (msg != null) { logger.info(msg); + // Kafka에 트윗 전송하기 + // kafka-topics --zookeeper localhost:2181 --create --topic twitter-tweets --partitions 6 --replication-factor 1 + producer.send(new ProducerRecord<>("twitter-tweets", null, msg), (recordMetadata, e) -> { + if (e != null) { + logger.info("Something bad happended", e); + } + }); } } logger.info("End of application."); } + private KafkaProducer<String, String> createKafkaProducer() { + String myServer = "localhost:9092"; + + // Kafka Producer 설정 + Properties properties = new Properties(); + properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, myServer); + properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + // Kafka는 데이터를 바이트로 보냄 (0,1) + + // Kafka Producer 만들기 + return new KafkaProducer<>(properties); + } + public Client createTwitterProducer(BlockingQueue<String> msgQueue) { // Declare the host you want to connect to, the endpoint, and authentication (basic auth or oauth) // Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);