diff --git a/kafkaStudies.iml b/kafkaStudies.iml
deleted file mode 100644
index 78b2cc53b203f0b97534bb1184cdc7b474339fb4..0000000000000000000000000000000000000000
--- a/kafkaStudies.iml
+++ /dev/null
@@ -1,2 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<module type="JAVA_MODULE" version="4" />
\ No newline at end of file
diff --git a/src/main/java/csw/kafka/study/twitter/TwitterProducer.java b/src/main/java/csw/kafka/study/twitter/TwitterProducer.java
index 533abd93b7c348d4d877740cd254939e81cc2c4b..f4dad3f52ce3a72e5f827b77f781dcf4cc069074 100644
--- a/src/main/java/csw/kafka/study/twitter/TwitterProducer.java
+++ b/src/main/java/csw/kafka/study/twitter/TwitterProducer.java
@@ -1,7 +1,6 @@
 package csw.kafka.study.twitter;
 
 import com.google.common.collect.Lists;
-
 import com.twitter.hbc.ClientBuilder;
 import com.twitter.hbc.core.Client;
 import com.twitter.hbc.core.Constants;
@@ -11,12 +10,14 @@ import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
 import com.twitter.hbc.core.processor.StringDelimitedProcessor;
 import com.twitter.hbc.httpclient.auth.Authentication;
 import com.twitter.hbc.httpclient.auth.OAuth1;
-import org.apache.kafka.clients.producer.*;
-import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.*;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
@@ -95,14 +96,31 @@ public class TwitterProducer {
         Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
         StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
 
-        List<String> terms = Lists.newArrayList("kafka"); // tweets about kafka
+        List<String> terms = Lists.newArrayList("kafka", "skorea"); // tweets about kafka and skorea
         hosebirdEndpoint.trackTerms(terms);
 
         // These secrets should be read from a config file
-        String consumerSecret = "";
-        String consumerKey = "";
-        String token = "";
-        String tokenSecret = "";
+        final File fileName = new File("src/main/java/csw/kafka/study/twitter/app.config");
+        final Properties prop = new Properties();
+        InputStream is = null;
+        try {
+            is = new FileInputStream(fileName.getAbsolutePath());
+        } catch (FileNotFoundException ex) {
+            logger.error("app.config not found: "
+                    + fileName.getAbsolutePath());
+            // do nothing
+        }
+
+        try {
+            prop.load(is);
+        } catch (IOException ex) {
+            logger.error("app.config load failed.");
+            // do nothing
+        }
+
+        String consumerSecret = prop.getProperty("consumerSecret");
+        String consumerKey = prop.getProperty("consumerKey");
+        String token = prop.getProperty("token");
+        String tokenSecret = prop.getProperty("tokenSecret");
 
         Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, tokenSecret);
 
diff --git a/src/main/java/csw/kafka/study/twitter/TwitterProducerOptimized.java b/src/main/java/csw/kafka/study/twitter/TwitterProducerOptimized.java
new file mode 100644
index 0000000000000000000000000000000000000000..61e499cd95a40990be48a275e3980b2ca64cde0a
--- /dev/null
+++ b/src/main/java/csw/kafka/study/twitter/TwitterProducerOptimized.java
@@ -0,0 +1,146 @@
+package csw.kafka.study.twitter;
+
+import com.google.common.collect.Lists;
+import com.twitter.hbc.ClientBuilder;
+import com.twitter.hbc.core.Client;
+import com.twitter.hbc.core.Constants;
+import com.twitter.hbc.core.Hosts;
+import com.twitter.hbc.core.HttpHosts;
+import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
+import com.twitter.hbc.core.processor.StringDelimitedProcessor;
+import com.twitter.hbc.httpclient.auth.Authentication;
+import com.twitter.hbc.httpclient.auth.OAuth1;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class TwitterProducerOptimized {
+    final Logger logger = LoggerFactory.getLogger(TwitterProducerOptimized.class.getName());
+
+    public TwitterProducerOptimized() {
+        // pass
+    }
+
+    public static void main(String[] args) {
+        new TwitterProducerOptimized().run();
+    }
+
+    public void run() {
+        logger.info("Setting up...");
+
+        // Twitter client
+        BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>(1000);
+        Client client = createTwitterProducer(msgQueue);
+        client.connect();
+
+        // Kafka producer
+        KafkaProducer<String, String> producer = createKafkaProducer();
+
+        // Shutdown hook
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            logger.info("Stopping application...");
+            client.stop();
+            producer.close();
+            logger.info("Bye!");
+        }));
+
+        // on a different thread, or multiple different threads...
+        while (!client.isDone()) {
+            String msg = null;
+            try {
+                msg = msgQueue.poll(5, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+                client.stop();
+            }
+            if (msg != null) {
+                logger.info(msg);
+                // Send the tweet to Kafka
+                // kafka-topics --zookeeper localhost:2181 --create --topic twitter-tweets --partitions 6 --replication-factor 1
+                producer.send(new ProducerRecord<>("twitter-tweets", null, msg), (recordMetadata, e) -> {
+                    if (e != null) {
+                        logger.error("Something bad happened", e);
+                    }
+                });
+            }
+        }
+        logger.info("End of application.");
+    }
+
+    private KafkaProducer<String, String> createKafkaProducer() {
+        String myServer = "localhost:9092";
+
+        // Kafka producer settings
+        Properties properties = new Properties();
+        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, myServer);
+        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+        // Safe producer settings
+        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
+        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
+        properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
+        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
+
+        // High-throughput producer settings
+        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
+        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
+        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024));
+
+        // Create the Kafka producer
+        return new KafkaProducer<>(properties);
+    }
+
+    public Client createTwitterProducer(BlockingQueue<String> msgQueue) {
+        // Declare the host you want to connect to, the endpoint, and authentication (basic auth or oauth) //
+        Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
+        StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
+
+        List<String> terms = Lists.newArrayList("kafka", "korea"); // tweets about kafka and korea
+        hosebirdEndpoint.trackTerms(terms);
+
+        // These secrets should be read from a config file
+        final File fileName = new File("src/main/java/csw/kafka/study/twitter/app.config");
+        final Properties prop = new Properties();
+        InputStream is = null;
+        try {
+            is = new FileInputStream(fileName.getAbsolutePath());
+        } catch (FileNotFoundException ex) {
+            logger.error("app.config not found: "
+                    + fileName.getAbsolutePath());
+            // do nothing
+        }
+
+        try {
+            prop.load(is);
+        } catch (IOException ex) {
+            logger.error("app.config load failed.");
+            // do nothing
+        }
+
+        String consumerSecret = prop.getProperty("consumerSecret");
+        String consumerKey = prop.getProperty("consumerKey");
+        String token = prop.getProperty("token");
+        String tokenSecret = prop.getProperty("tokenSecret");
+
+        Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, tokenSecret);
+
+        ClientBuilder builder = new ClientBuilder()
+                .name("Hosebird-Client-01") // optional: mainly for the logs
+                .hosts(hosebirdHosts)
+                .authentication(hosebirdAuth)
+                .endpoint(hosebirdEndpoint)
+                .processor(new StringDelimitedProcessor(msgQueue));
+
+        return builder.build();
+    }
+}
diff --git a/src/main/java/csw/kafka/study/twitter/app.config b/src/main/java/csw/kafka/study/twitter/app.config
new file mode 100644
index 0000000000000000000000000000000000000000..a261a7e19edb6aa1d9ccee2c6a1c8ae9802e4748
--- /dev/null
+++ b/src/main/java/csw/kafka/study/twitter/app.config
@@ -0,0 +1,4 @@
+consumerSecret =
+consumerKey =
+token =
+tokenSecret =
\ No newline at end of file
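
Note on the new credential loading: if app.config is absent, the FileNotFoundException branch leaves `is` null, and the subsequent `prop.load(is)` throws a NullPointerException instead of continuing with empty credentials. Since the same block now appears in both TwitterProducer and TwitterProducerOptimized, one option is to extract it into a small helper that uses try-with-resources. This is only a sketch; the class name TwitterCredentials and its load method are illustrative and not part of the diff above.

    package csw.kafka.study.twitter;

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Hypothetical helper, not part of this change.
    public class TwitterCredentials {
        private static final Logger logger = LoggerFactory.getLogger(TwitterCredentials.class.getName());

        // Loads the OAuth properties from the given path.
        // On any I/O error (including a missing file) it logs and returns an
        // empty Properties object, so getProperty(...) simply yields null.
        public static Properties load(String path) {
            Properties prop = new Properties();
            // try-with-resources closes the stream and avoids the null-stream case
            try (InputStream is = new FileInputStream(path)) {
                prop.load(is);
            } catch (IOException ex) {
                logger.error("Could not load Twitter credentials from " + path, ex);
            }
            return prop;
        }
    }

Both producers could then call `Properties prop = TwitterCredentials.load("src/main/java/csw/kafka/study/twitter/app.config");` and read the four keys as they do now.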