Skip to content
Snippets Groups Projects
Commit 9b667d5a authored by Seok Won's avatar Seok Won
Browse files

Update TwitterProducer to send data to kafka

Create a topic named "twitter-tweets" with below command,

kafka-topics --zookeeper localhost:2181 --create --topic twitter-tweets --partitions 6 --replication-factor 1

and run this application.
parent cebbd832
No related branches found
No related tags found
No related merge requests found
......@@ -11,10 +11,14 @@ import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
......@@ -39,7 +43,15 @@ public class TwitterProducer {
client.connect();
// Kafka Producer
// Kafka에 트윗 전송하기
KafkaProducer<String, String> producer = createKafkaProducer();
// 종료 hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("Stopping application...");
client.stop();
producer.close();
logger.info("Bye!");
}));
// on a different thread, or multiple different threads....
while (!client.isDone()) {
......@@ -52,11 +64,32 @@ public class TwitterProducer {
}
if (msg != null) {
logger.info(msg);
// Kafka에 트윗 전송하기
// kafka-topics --zookeeper localhost:2181 --create --topic twitter-tweets --partitions 6 --replication-factor 1
producer.send(new ProducerRecord<>("twitter-tweets", null, msg), (recordMetadata, e) -> {
if (e != null) {
logger.info("Something bad happended", e);
}
});
}
}
logger.info("End of application.");
}
private KafkaProducer<String, String> createKafkaProducer() {
String myServer = "localhost:9092";
// Kafka Producer 설정
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, myServer);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Kafka는 데이터를 바이트로 보냄 (0,1)
// Kafka Producer 만들기
return new KafkaProducer<>(properties);
}
public Client createTwitterProducer(BlockingQueue<String> msgQueue) {
// Declare the host you want to connect to, the endpoint, and authentication (basic auth or oauth) //
Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment