diff --git a/README.md b/README.md index e65f661799529995d56606f27a33a30e5f90fa3b..862dceb68cef7b68fecfc879f2b9d3119fc22933 100644 --- a/README.md +++ b/README.md @@ -13,12 +13,17 @@ <table> <tr><td width=40% valign=top> - + +* Kafka (Python) + * [Simple Producer](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/ProducerDemo.py) + * [Producer with callback](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/ProducerDemoCallBack.py) + * Kafka (Java) * [Simple Producer](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemo.java) * [Producer with callback](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemoCallBack.java) * [Producer with key](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemoWithKey.java) * [Simple Consumer](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson2/ConsumerDemo.java) + * [Consumer With a thread](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson2/ConsumerDemoThread.java) </td></tr> </table> diff --git a/python/src/config.py b/python/src/config.py index 89de0a6ba483f7bfd3bdc8c5764a2cd70f0ff2de..48efc61672ae3815cb2d372a45ae0ee19ec917fb 100644 --- a/python/src/config.py +++ b/python/src/config.py @@ -2,3 +2,17 @@ class Config: MY_SERVER = "localhost:9092" TOPIC_ID = "first-topic" GROUP_ID = "group-one" + + CLIENT_ID = "client-1" + SESSION_TIMEOUT_MS = 6000 + OFFSET_REST = "smallest" + + # Consumer + SETTINGS = { + "bootstrap.servers": MY_SERVER, + "group.id": GROUP_ID, + "client.id": CLIENT_ID, + "enable.auto.commit": True, + "session.timeout.ms": SESSION_TIMEOUT_MS, + "default.topic.config": {"auto.offset.reset": OFFSET_REST}, + } diff --git a/src/main/java/csw/kafka/study/lesson2/ConsumerDemoThread.java b/src/main/java/csw/kafka/study/lesson2/ConsumerDemoThread.java new file mode 100644 index 0000000000000000000000000000000000000000..e674f25dcc250a6025ad0455e06482c261fbc642 --- /dev/null +++ b/src/main/java/csw/kafka/study/lesson2/ConsumerDemoThread.java @@ -0,0 +1,118 @@ +package csw.kafka.study.lesson2; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; + +public class ConsumerDemoThread { + public static void main(String[] args) { + new ConsumerDemoThread().run(); + } + + private ConsumerDemoThread() {} + + private void run() { + final Logger logger = LoggerFactory.getLogger(ConsumerDemoThread.class.getName()); + final String myServer = "localhost:9092"; + final String groupId = "group-one"; + final String topic = "first-topic"; + + // 멀티 쓰레드를 위한 latch + final CountDownLatch latch = new CountDownLatch(1); + + // Consumer runnable 만들기 + logger.info("Creating the consumer thread."); + + Runnable myConsumerRunnable = new ConsumerRunnable( + myServer, + groupId, + topic, + latch + ); + + // 쓰레드 시작 + Thread myThread = new Thread(myConsumerRunnable); + myThread.start(); + + // 종료 hook + Runtime.getRuntime().addShutdownHook(new Thread( () -> { // lambda JDK 8+ + logger.info("Caught shutdown hook."); + ((ConsumerRunnable) myConsumerRunnable).shutdown(); + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + logger.info("Application has exited."); + })); + try { + latch.await(); + } catch (InterruptedException e) { + logger.error("Application got interrupterd:", e); + } finally { + logger.info("Application is closing."); + } + } + + public class ConsumerRunnable implements Runnable { + private final CountDownLatch latch; + private final KafkaConsumer<String, String> consumer; + private final Logger logger = LoggerFactory.getLogger(ConsumerRunnable.class.getName()); + + public ConsumerRunnable(String bootstrapServer, + String groupId, + String topic, + CountDownLatch latch) { + this.latch = latch; + // Kafka Consumer 설정 + Properties properties = new Properties(); + properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); + properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); + properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest, latest, none + + // Kafka Consumer 만들기 + consumer = new KafkaConsumer<>(properties); + + // topic에 연결 + consumer.subscribe(Collections.singleton(topic)); + // consumer.subscribe(Arrays.asList(topic)); + + } + + @Override + public void run() { + // Poll for new data + try { + while (true) { + ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord<String, String> record : records) { + logger.info(String.format("Key: %s, Value: %s", record.key(), record.value())); + logger.info(String.format("Partition: %s, Offset: %s", record.partition(), record.offset())); + } + } + } catch (WakeupException e) { + logger.info("Received shutdown signal!"); + } finally { + consumer.close(); + // 메인 코드 종료 + latch.countDown(); + } + } + + public void shutdown() { + consumer.wakeup(); + } + } +}