From c998f7bfab4827573f364d4fb3397e82c8764814 Mon Sep 17 00:00:00 2001
From: Seok Won <alfex4936@gmail.com>
Date: Sun, 29 Nov 2020 20:29:17 +0900
Subject: [PATCH] Create Simple Consumer

it reads all datas in three partitions from a topic named "first-topic".

I sent datas in String, like "hello world".
---
 README.md                                     | 25 +++++-----
 pom.xml                                       | 12 +++++
 .../csw/kafka/study/lesson2/ConsumerDemo.java | 46 +++++++++++++++++++
 3 files changed, 72 insertions(+), 11 deletions(-)
 create mode 100644 src/main/java/csw/kafka/study/lesson2/ConsumerDemo.java

diff --git a/README.md b/README.md
index 0257d6d..376f70c 100644
--- a/README.md
+++ b/README.md
@@ -9,10 +9,23 @@
 
 </div>
 
+## 목차
+
+<table>
+    <tr><td width=40% valign=top>
+        
+* Kafka
+    * [Simple Producer](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemo.java)
+    * [Producer with callback](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemoCallBack.java)
+    * [Producer with key](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemoWithKey.java)
+    * [Simple Consumer](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson2/ConsumerDemo.java)
+</td></tr>
+</table>
+
 ## 설치
 [Kafka 다운로드](https://kafka.apache.org/downloads)
 
-*Download Binary*
+*Make sure to Download "Binary"*
 
 ## 실행
 
@@ -78,13 +91,3 @@ group-one                      first-topic                    1          0
 group-one                      first-topic                    2          0
 
 ```
-
-## 목차
-
-<table>
-    <tr><td width=40% valign=top>
-        
-* Kafka
-    * [Simple Producer](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemo.java)
-</td></tr>
-</table>
diff --git a/pom.xml b/pom.xml
index 4ebf0a4..12a2b88 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,6 +7,18 @@
     <groupId>csw.kafka</groupId>
     <artifactId>kafkaStudies</artifactId>
     <version>1.0</version>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>8</source>
+                    <target>8</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
     <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
 
     <dependencies>
diff --git a/src/main/java/csw/kafka/study/lesson2/ConsumerDemo.java b/src/main/java/csw/kafka/study/lesson2/ConsumerDemo.java
new file mode 100644
index 0000000..267d935
--- /dev/null
+++ b/src/main/java/csw/kafka/study/lesson2/ConsumerDemo.java
@@ -0,0 +1,46 @@
+package csw.kafka.study.lesson2;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+
+public class ConsumerDemo {
+    public static void main(String[] args) {
+        final Logger logger = LoggerFactory.getLogger(ConsumerDemo.class.getName());
+        final String myServer = "localhost:9092";
+        final String groupId = "group-one";
+        final String topic = "first-topic";
+
+        // Kafka Consumer 설정
+        Properties properties = new Properties();
+        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, myServer);
+        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");  // earliest, latest, none
+
+        // Kafka Consumer 만들기
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
+
+        // topic에 연결
+        consumer.subscribe(Collections.singleton(topic));
+        // consumer.subscribe(Arrays.asList(topic));
+
+        // Poll for new data
+        while (true) {
+            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+            for (ConsumerRecord<String, String> record : records) {
+                logger.info(String.format("Key: %s, Value: %s", record.key(), record.value()));
+                logger.info(String.format("Partition:  %s, Offset: %s", record.partition(), record.offset()));
+            }
+        }
+    }
+}
-- 
GitLab