Commit 42444e1b authored by Seok Won

Create Optimized Producer for Twitter

+ Create app.config to load Twitter API tokens

Safe + high-throughput producer settings:

compression.type = snappy
linger.ms = 20
batch.size = 32 * 1024
acks = all
enable.idempotence = true
retries = INT32_MAX (Integer.MAX_VALUE)
max.in.flight.requests.per.connection = 5
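For context, a minimal annotated sketch of what each of these settings buys, using the same ProducerConfig keys the producer in this commit sets (the SafeHighThroughputSettings class and producerTuning method names are hypothetical, for illustration only):

import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;

final class SafeHighThroughputSettings { // hypothetical name, not part of the commit
    static Properties producerTuning() {
        Properties props = new Properties();
        // "Safe": no duplicates, no reordering, no silent loss on transient failures
        props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");  // broker de-duplicates retried batches
        props.setProperty(ProducerConfig.ACKS_CONFIG, "all");                 // wait for all in-sync replicas
        props.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE)); // keep retrying transient errors
        props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); // ordering still guaranteed with idempotence for up to 5 in-flight requests
        // "High throughput": trade a little latency for bigger, compressed batches
        props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");  // cheap CPU, smaller payloads on the wire
        props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");             // wait up to 20 ms to fill a batch
        props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024)); // 32 KB batches
        return props;
    }
}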
parent 4086471e
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4" />
\ No newline at end of file
package csw.kafka.study.twitter;
import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
@@ -11,12 +10,14 @@ import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
-import org.apache.kafka.clients.producer.*;
-import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.*;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
@@ -95,14 +96,31 @@ public class TwitterProducer {
        Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
        StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
-        List<String> terms = Lists.newArrayList("kafka"); // tweets about kafka
+        List<String> terms = Lists.newArrayList("kafka", "skorea"); // tweets about kafka
        hosebirdEndpoint.trackTerms(terms);
        // These secrets should be read from a config file
-        String consumerSecret = "";
-        String consumerKey = "";
-        String token = "";
-        String tokenSecret = "";
+        final File fileName = new File("src/main/java/csw/kafka/study/twitter/app.config");
+        final Properties prop = new Properties();
+        InputStream is = null;
+        try {
+            is = new FileInputStream(fileName.getAbsolutePath());
+        } catch (FileNotFoundException ex) {
+            logger.error("app.config not found." + fileName.getAbsolutePath());
+            // do nothing
+        }
+        try {
+            prop.load(is);
+        } catch (IOException ex) {
+            logger.error("app.config load failed.");
+            // do nothing
+        }
+        String consumerSecret = prop.getProperty("consumerSecret");
+        String consumerKey = prop.getProperty("consumerKey");
+        String token = prop.getProperty("token");
+        String tokenSecret = prop.getProperty("tokenSecret");
        Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, tokenSecret);
...
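Side note on the credential loading added above: the InputStream is never closed, and prop.load(is) throws a NullPointerException if app.config is missing. A minimal alternative sketch using try-with-resources (the file path and key names come from this commit; the TwitterConfig class and loadTwitterProps helper names are hypothetical):

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

final class TwitterConfig { // hypothetical helper, for illustration only
    static Properties loadTwitterProps(String path) {
        Properties prop = new Properties();
        try (InputStream is = new FileInputStream(path)) {
            prop.load(is); // FileNotFoundException is an IOException, so a missing file is handled here too
        } catch (IOException ex) {
            throw new IllegalStateException("Could not load Twitter credentials from " + path, ex);
        }
        return prop;
    }
}

The caller would then read consumerKey, consumerSecret, token, and tokenSecret from the returned Properties, exactly as the commit does.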
package csw.kafka.study.twitter;
import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.Hosts;
import com.twitter.hbc.core.HttpHosts;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class TwitterProducerOptimized {

    final Logger logger = LoggerFactory.getLogger(TwitterProducerOptimized.class.getName());

    public TwitterProducerOptimized() {
        // pass
    }

    public static void main(String[] args) {
        new TwitterProducerOptimized().run();
    }

    public void run() {
        logger.info("Setting up...");

        // Twitter client
        BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>(1000);
        Client client = createTwitterProducer(msgQueue);
        client.connect();

        // Kafka producer
        KafkaProducer<String, String> producer = createKafkaProducer();

        // Shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("Stopping application...");
            client.stop();
            producer.close();
            logger.info("Bye!");
        }));

        // on a different thread, or multiple different threads....
        while (!client.isDone()) {
            String msg = null;
            try {
                msg = msgQueue.poll(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
                client.stop();
            }
            if (msg != null) {
                logger.info(msg);
                // Send the tweet to Kafka
                // kafka-topics --zookeeper localhost:2181 --create --topic twitter-tweets --partitions 6 --replication-factor 1
                producer.send(new ProducerRecord<>("twitter-tweets", null, msg), (recordMetadata, e) -> {
                    if (e != null) {
                        logger.error("Something bad happened", e);
                    }
                });
            }
        }
        logger.info("End of application.");
    }
    private KafkaProducer<String, String> createKafkaProducer() {
        String myServer = "localhost:9092";

        // Kafka producer settings
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, myServer);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Safe producer settings
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");

        // High-throughput producer settings
        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024));

        // Create the Kafka producer
        return new KafkaProducer<>(properties);
    }
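
    // Not part of this commit: a sketch for checking at runtime that the batching/compression settings pay off.
    // Assumes the standard producer metric names "batch-size-avg" and "compression-rate-avg"; verify against your client version.
    private void logBatchingMetrics(KafkaProducer<String, String> producer) {
        producer.metrics().forEach((name, metric) -> {
            if ("batch-size-avg".equals(name.name()) || "compression-rate-avg".equals(name.name())) {
                logger.info("{} = {}", name.name(), metric.metricValue());
            }
        });
    }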
    public Client createTwitterProducer(BlockingQueue<String> msgQueue) {
        // Declare the host you want to connect to, the endpoint, and authentication (basic auth or oauth)
        Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
        StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
        List<String> terms = Lists.newArrayList("kafka", "korea"); // tweets about kafka or korea
        hosebirdEndpoint.trackTerms(terms);

        // These secrets should be read from a config file
        final File fileName = new File("src/main/java/csw/kafka/study/twitter/app.config");
        final Properties prop = new Properties();
        InputStream is = null;
        try {
            is = new FileInputStream(fileName.getAbsolutePath());
        } catch (FileNotFoundException ex) {
            logger.error("app.config not found. " + fileName.getAbsolutePath());
            // do nothing
        }
        try {
            prop.load(is);
        } catch (IOException ex) {
            logger.error("app.config load failed.");
            // do nothing
        }
        String consumerSecret = prop.getProperty("consumerSecret");
        String consumerKey = prop.getProperty("consumerKey");
        String token = prop.getProperty("token");
        String tokenSecret = prop.getProperty("tokenSecret");
        Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, tokenSecret);

        ClientBuilder builder = new ClientBuilder()
                .name("Hosebird-Client-01") // optional: mainly for the logs
                .hosts(hosebirdHosts)
                .authentication(hosebirdAuth)
                .endpoint(hosebirdEndpoint)
                .processor(new StringDelimitedProcessor(msgQueue));

        return builder.build();
    }
}
consumerSecret =
consumerKey =
token =
tokenSecret =
\ No newline at end of file