From cebbd8325e19d10ca6b9f108b3e82e07eb46026d Mon Sep 17 00:00:00 2001
From: Seok Won <alfex4936@gmail.com>
Date: Mon, 30 Nov 2020 20:03:24 +0900
Subject: [PATCH] Create TwitterProducer with just TwitterAPI

Next we will build kafka producer to send tweets to kafka.

https://github.com/twitter/hbc
---
 pom.xml                                       |  6 ++
 .../kafka/study/twitter/TwitterProducer.java  | 85 +++++++++++++++++++
 2 files changed, 91 insertions(+)
 create mode 100644 src/main/java/csw/kafka/study/twitter/TwitterProducer.java

diff --git a/pom.xml b/pom.xml
index 12a2b88..e60481f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,6 +50,12 @@
             <version>2.11.3</version>
         </dependency>
 
+        <dependency>
+            <groupId>com.twitter</groupId>
+            <artifactId>hbc-core</artifactId> <!-- or hbc-twitter4j -->
+            <version>2.2.0</version> <!-- or whatever the latest version is -->
+        </dependency>
+
     </dependencies>
 
 
diff --git a/src/main/java/csw/kafka/study/twitter/TwitterProducer.java b/src/main/java/csw/kafka/study/twitter/TwitterProducer.java
new file mode 100644
index 0000000..e5a7798
--- /dev/null
+++ b/src/main/java/csw/kafka/study/twitter/TwitterProducer.java
@@ -0,0 +1,85 @@
+package csw.kafka.study.twitter;
+
+import com.google.common.collect.Lists;
+
+import com.twitter.hbc.ClientBuilder;
+import com.twitter.hbc.core.Client;
+import com.twitter.hbc.core.Constants;
+import com.twitter.hbc.core.Hosts;
+import com.twitter.hbc.core.HttpHosts;
+import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
+import com.twitter.hbc.core.processor.StringDelimitedProcessor;
+import com.twitter.hbc.httpclient.auth.Authentication;
+import com.twitter.hbc.httpclient.auth.OAuth1;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class TwitterProducer {
+    final Logger logger = LoggerFactory.getLogger(TwitterProducer.class.getName());
+
+    public TwitterProducer() {
+        // pass
+    }
+
+    public static void main(String[] args) {
+        new TwitterProducer().run();
+    }
+
+    public void run() {
+        logger.info("Setting up...");
+
+        // 트위터 클라이언트
+        BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>(1000);
+        Client client = createTwitterProducer(msgQueue);
+        client.connect();
+
+        // Kafka Producer
+        // Kafka에 트윗 전송하기
+
+        // on a different thread, or multiple different threads....
+        while (!client.isDone()) {
+            String msg = null;
+            try {
+                msg = msgQueue.poll(5, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+                client.stop();
+            }
+            if (msg != null) {
+                logger.info(msg);
+            }
+        }
+        logger.info("End of application.");
+    }
+
+    public Client createTwitterProducer(BlockingQueue<String> msgQueue) {
+        // Declare the host you want to connect to, the endpoint, and authentication (basic auth or oauth) //
+        Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
+        StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
+
+        List<String> terms = Lists.newArrayList("kafka");  // tweets about kafka
+        hosebirdEndpoint.trackTerms(terms);
+
+        // These secrets should be read from a config file
+        String consumerSecret = "";
+        String consumerKey = "";
+        String token = "";
+        String tokenSecret = "";
+
+        Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, tokenSecret);
+
+        ClientBuilder builder = new ClientBuilder()
+                .name("Hosebird-Client-01")  // optional: mainly for the logs
+                .hosts(hosebirdHosts)
+                .authentication(hosebirdAuth)
+                .endpoint(hosebirdEndpoint)
+                .processor(new StringDelimitedProcessor(msgQueue));
+
+        return builder.build();
+    }
+}
-- 
GitLab