diff --git a/pom.xml b/pom.xml index 12a2b88be57eda045c99e699ee8181b3ff40506d..e60481fc4aef134bf6ac376fce92db6245b08863 100644 --- a/pom.xml +++ b/pom.xml @@ -50,6 +50,12 @@ <version>2.11.3</version> </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>hbc-core</artifactId> <!-- or hbc-twitter4j --> + <version>2.2.0</version> <!-- or whatever the latest version is --> + </dependency> + </dependencies> diff --git a/src/main/java/csw/kafka/study/twitter/TwitterProducer.java b/src/main/java/csw/kafka/study/twitter/TwitterProducer.java new file mode 100644 index 0000000000000000000000000000000000000000..e5a7798ab45bd1aa0f5e0143ef97cc2df37d3c31 --- /dev/null +++ b/src/main/java/csw/kafka/study/twitter/TwitterProducer.java @@ -0,0 +1,85 @@ +package csw.kafka.study.twitter; + +import com.google.common.collect.Lists; + +import com.twitter.hbc.ClientBuilder; +import com.twitter.hbc.core.Client; +import com.twitter.hbc.core.Constants; +import com.twitter.hbc.core.Hosts; +import com.twitter.hbc.core.HttpHosts; +import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint; +import com.twitter.hbc.core.processor.StringDelimitedProcessor; +import com.twitter.hbc.httpclient.auth.Authentication; +import com.twitter.hbc.httpclient.auth.OAuth1; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +public class TwitterProducer { + final Logger logger = LoggerFactory.getLogger(TwitterProducer.class.getName()); + + public TwitterProducer() { + // pass + } + + public static void main(String[] args) { + new TwitterProducer().run(); + } + + public void run() { + logger.info("Setting up..."); + + // 트위터 클라이언트 + BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>(1000); + Client client = createTwitterProducer(msgQueue); + client.connect(); + + // Kafka Producer + // Kafka에 트윗 전송하기 + + // on a different thread, or multiple different threads.... + while (!client.isDone()) { + String msg = null; + try { + msg = msgQueue.poll(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + client.stop(); + } + if (msg != null) { + logger.info(msg); + } + } + logger.info("End of application."); + } + + public Client createTwitterProducer(BlockingQueue<String> msgQueue) { + // Declare the host you want to connect to, the endpoint, and authentication (basic auth or oauth) // + Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST); + StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint(); + + List<String> terms = Lists.newArrayList("kafka"); // tweets about kafka + hosebirdEndpoint.trackTerms(terms); + + // These secrets should be read from a config file + String consumerSecret = ""; + String consumerKey = ""; + String token = ""; + String tokenSecret = ""; + + Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, tokenSecret); + + ClientBuilder builder = new ClientBuilder() + .name("Hosebird-Client-01") // optional: mainly for the logs + .hosts(hosebirdHosts) + .authentication(hosebirdAuth) + .endpoint(hosebirdEndpoint) + .processor(new StringDelimitedProcessor(msgQueue)); + + return builder.build(); + } +}