From 047cbae27fe0a73cf81c982581eb59418e7338b6 Mon Sep 17 00:00:00 2001 From: Seok Won <alfex4936@gmail.com> Date: Mon, 30 Nov 2020 16:44:46 +0900 Subject: [PATCH] Create Consumer with a thread + python config updates for consumers + README updates --- README.md | 7 +- python/src/config.py | 14 +++ .../study/lesson2/ConsumerDemoThread.java | 118 ++++++++++++++++++ 3 files changed, 138 insertions(+), 1 deletion(-) create mode 100644 src/main/java/csw/kafka/study/lesson2/ConsumerDemoThread.java diff --git a/README.md b/README.md index e65f661..862dceb 100644 --- a/README.md +++ b/README.md @@ -13,12 +13,17 @@ <table> <tr><td width=40% valign=top> - + +* Kafka (Python) + * [Simple Producer](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/ProducerDemo.py) + * [Producer with callback](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/ProducerDemoCallBack.py) + * Kafka (Java) * [Simple Producer](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemo.java) * [Producer with callback](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemoCallBack.java) * [Producer with key](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemoWithKey.java) * [Simple Consumer](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson2/ConsumerDemo.java) + * [Consumer With a thread](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson2/ConsumerDemoThread.java) </td></tr> </table> diff --git a/python/src/config.py b/python/src/config.py index 89de0a6..48efc61 100644 --- a/python/src/config.py +++ b/python/src/config.py @@ -2,3 +2,17 @@ class Config: MY_SERVER = "localhost:9092" TOPIC_ID = "first-topic" GROUP_ID = "group-one" + + CLIENT_ID = "client-1" + SESSION_TIMEOUT_MS = 6000 + OFFSET_REST = "smallest" + + # Consumer + SETTINGS = { + "bootstrap.servers": MY_SERVER, + "group.id": GROUP_ID, + "client.id": CLIENT_ID, + "enable.auto.commit": True, + "session.timeout.ms": SESSION_TIMEOUT_MS, + "default.topic.config": {"auto.offset.reset": OFFSET_REST}, + } diff --git a/src/main/java/csw/kafka/study/lesson2/ConsumerDemoThread.java b/src/main/java/csw/kafka/study/lesson2/ConsumerDemoThread.java new file mode 100644 index 0000000..e674f25 --- /dev/null +++ b/src/main/java/csw/kafka/study/lesson2/ConsumerDemoThread.java @@ -0,0 +1,118 @@ +package csw.kafka.study.lesson2; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; + +public class ConsumerDemoThread { + public static void main(String[] args) { + new ConsumerDemoThread().run(); + } + + private ConsumerDemoThread() {} + + private void run() { + final Logger logger = LoggerFactory.getLogger(ConsumerDemoThread.class.getName()); + final String myServer = "localhost:9092"; + final String groupId = "group-one"; + final String topic = "first-topic"; + + // 멀티 쓰레드를 위한 latch + final CountDownLatch latch = new CountDownLatch(1); + + // Consumer runnable 만들기 + logger.info("Creating the consumer thread."); + + Runnable myConsumerRunnable = new ConsumerRunnable( + myServer, + groupId, + topic, + latch + ); + + // 쓰레드 시작 + Thread myThread = new Thread(myConsumerRunnable); + myThread.start(); + + // 종료 hook + Runtime.getRuntime().addShutdownHook(new Thread( () -> { // lambda JDK 8+ + logger.info("Caught shutdown hook."); + ((ConsumerRunnable) myConsumerRunnable).shutdown(); + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + logger.info("Application has exited."); + })); + try { + latch.await(); + } catch (InterruptedException e) { + logger.error("Application got interrupterd:", e); + } finally { + logger.info("Application is closing."); + } + } + + public class ConsumerRunnable implements Runnable { + private final CountDownLatch latch; + private final KafkaConsumer<String, String> consumer; + private final Logger logger = LoggerFactory.getLogger(ConsumerRunnable.class.getName()); + + public ConsumerRunnable(String bootstrapServer, + String groupId, + String topic, + CountDownLatch latch) { + this.latch = latch; + // Kafka Consumer 설정 + Properties properties = new Properties(); + properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); + properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); + properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest, latest, none + + // Kafka Consumer 만들기 + consumer = new KafkaConsumer<>(properties); + + // topic에 연결 + consumer.subscribe(Collections.singleton(topic)); + // consumer.subscribe(Arrays.asList(topic)); + + } + + @Override + public void run() { + // Poll for new data + try { + while (true) { + ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord<String, String> record : records) { + logger.info(String.format("Key: %s, Value: %s", record.key(), record.value())); + logger.info(String.format("Partition: %s, Offset: %s", record.partition(), record.offset())); + } + } + } catch (WakeupException e) { + logger.info("Received shutdown signal!"); + } finally { + consumer.close(); + // 메인 코드 종료 + latch.countDown(); + } + } + + public void shutdown() { + consumer.wakeup(); + } + } +} -- GitLab