Commit 047cbae2 authored by Seok Won

Create Consumer with a thread

+ Python config updates for consumers

+ README updates
parent 3886280c
@@ -13,12 +13,17 @@
<table>
<tr><td width=40% valign=top>
* Kafka (Python)
  * [Simple Producer](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/ProducerDemo.py)
  * [Producer with callback](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/ProducerDemoCallBack.py)
* Kafka (Java)
  * [Simple Producer](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemo.java)
  * [Producer with callback](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemoCallBack.java)
  * [Producer with key](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemoWithKey.java)
  * [Simple Consumer](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson2/ConsumerDemo.java)
  * [Consumer with a thread](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson2/ConsumerDemoThread.java)
</td></tr>
</table>
@@ -2,3 +2,17 @@ class Config:
    MY_SERVER = "localhost:9092"
    TOPIC_ID = "first-topic"
    GROUP_ID = "group-one"
    CLIENT_ID = "client-1"
    SESSION_TIMEOUT_MS = 6000
    OFFSET_RESET = "smallest"
    # Consumer
    SETTINGS = {
        "bootstrap.servers": MY_SERVER,
        "group.id": GROUP_ID,
        "client.id": CLIENT_ID,
        "enable.auto.commit": True,
        "session.timeout.ms": SESSION_TIMEOUT_MS,
        "default.topic.config": {"auto.offset.reset": OFFSET_RESET},
    }
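
The commit message mentions Python config updates for consumers, but no Python consumer script appears in this diff. Below is a minimal sketch of how SETTINGS might be used, assuming the confluent-kafka client (whose option names the SETTINGS keys follow); the `config` import path and the polling loop are illustrative assumptions, not code from this repository.

from confluent_kafka import Consumer

from config import Config  # assumed module path for the Config class above

consumer = Consumer(Config.SETTINGS)
consumer.subscribe([Config.TOPIC_ID])

try:
    while True:
        msg = consumer.poll(1.0)  # wait up to 1 second for a record
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        print(f"Key: {msg.key()}, Value: {msg.value().decode('utf-8')}")
        print(f"Partition: {msg.partition()}, Offset: {msg.offset()}")
except KeyboardInterrupt:
    pass
finally:
    consumer.close()  # leave the consumer group cleanly
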
package csw.kafka.study.lesson2;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class ConsumerDemoThread {
    public static void main(String[] args) {
        new ConsumerDemoThread().run();
    }

    private ConsumerDemoThread() {}

    private void run() {
        final Logger logger = LoggerFactory.getLogger(ConsumerDemoThread.class.getName());

        final String myServer = "localhost:9092";
        final String groupId = "group-one";
        final String topic = "first-topic";

        // Latch for multi-threading
        final CountDownLatch latch = new CountDownLatch(1);

        // Create the consumer runnable
        logger.info("Creating the consumer thread.");
        Runnable myConsumerRunnable = new ConsumerRunnable(
                myServer,
                groupId,
                topic,
                latch
        );

        // Start the thread
        Thread myThread = new Thread(myConsumerRunnable);
        myThread.start();

        // Shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> { // lambda JDK 8+
            logger.info("Caught shutdown hook.");
            ((ConsumerRunnable) myConsumerRunnable).shutdown();
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            logger.info("Application has exited.");
        }));

        try {
            latch.await();
        } catch (InterruptedException e) {
            logger.error("Application got interrupted:", e);
        } finally {
            logger.info("Application is closing.");
        }
    }

    public class ConsumerRunnable implements Runnable {
        private final CountDownLatch latch;
        private final KafkaConsumer<String, String> consumer;
        private final Logger logger = LoggerFactory.getLogger(ConsumerRunnable.class.getName());

        public ConsumerRunnable(String bootstrapServer,
                                String groupId,
                                String topic,
                                CountDownLatch latch) {
            this.latch = latch;

            // Kafka consumer configuration
            Properties properties = new Properties();
            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
            properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest, latest, none

            // Create the Kafka consumer
            consumer = new KafkaConsumer<>(properties);

            // Subscribe to the topic
            consumer.subscribe(Collections.singleton(topic));
            // consumer.subscribe(Arrays.asList(topic));
        }

        @Override
        public void run() {
            // Poll for new data
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        logger.info(String.format("Key: %s, Value: %s", record.key(), record.value()));
                        logger.info(String.format("Partition: %s, Offset: %s", record.partition(), record.offset()));
                    }
                }
            } catch (WakeupException e) {
                logger.info("Received shutdown signal!");
            } finally {
                consumer.close();
                // Let the main code exit
                latch.countDown();
            }
        }

        public void shutdown() {
            consumer.wakeup();
        }
    }
}