Skip to content
Snippets Groups Projects
Commit cebbd832 authored by Seok Won's avatar Seok Won
Browse files

Create TwitterProducer with just TwitterAPI

Next we will build kafka producer to send tweets to kafka.

https://github.com/twitter/hbc
parent 768e01ea
No related branches found
No related tags found
No related merge requests found
...@@ -50,6 +50,12 @@ ...@@ -50,6 +50,12 @@
<version>2.11.3</version> <version>2.11.3</version>
</dependency> </dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>hbc-core</artifactId> <!-- or hbc-twitter4j -->
<version>2.2.0</version> <!-- or whatever the latest version is -->
</dependency>
</dependencies> </dependencies>
......
package csw.kafka.study.twitter;
import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.Hosts;
import com.twitter.hbc.core.HttpHosts;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class TwitterProducer {
final Logger logger = LoggerFactory.getLogger(TwitterProducer.class.getName());
public TwitterProducer() {
// pass
}
public static void main(String[] args) {
new TwitterProducer().run();
}
public void run() {
logger.info("Setting up...");
// 트위터 클라이언트
BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>(1000);
Client client = createTwitterProducer(msgQueue);
client.connect();
// Kafka Producer
// Kafka에 트윗 전송하기
// on a different thread, or multiple different threads....
while (!client.isDone()) {
String msg = null;
try {
msg = msgQueue.poll(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
client.stop();
}
if (msg != null) {
logger.info(msg);
}
}
logger.info("End of application.");
}
public Client createTwitterProducer(BlockingQueue<String> msgQueue) {
// Declare the host you want to connect to, the endpoint, and authentication (basic auth or oauth) //
Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
List<String> terms = Lists.newArrayList("kafka"); // tweets about kafka
hosebirdEndpoint.trackTerms(terms);
// These secrets should be read from a config file
String consumerSecret = "";
String consumerKey = "";
String token = "";
String tokenSecret = "";
Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, tokenSecret);
ClientBuilder builder = new ClientBuilder()
.name("Hosebird-Client-01") // optional: mainly for the logs
.hosts(hosebirdHosts)
.authentication(hosebirdAuth)
.endpoint(hosebirdEndpoint)
.processor(new StringDelimitedProcessor(msgQueue));
return builder.build();
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment