diff --git a/README.md b/README.md index 0257d6d2ab8a4935471b6d26b2242900c7640f2b..376f70c0e392dd0a83273dbca7a5240a56ce163d 100644 --- a/README.md +++ b/README.md @@ -9,10 +9,23 @@ </div> +## 목차 + +<table> + <tr><td width=40% valign=top> + +* Kafka + * [Simple Producer](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemo.java) + * [Producer with callback](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemoCallBack.java) + * [Producer with key](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemoWithKey.java) + * [Simple Consumer](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson2/ConsumerDemo.java) +</td></tr> +</table> + ## 설치 [Kafka 다운로드](https://kafka.apache.org/downloads) -*Download Binary* +*Make sure to Download "Binary"* ## 실행 @@ -78,13 +91,3 @@ group-one first-topic 1 0 group-one first-topic 2 0 ``` - -## 목차 - -<table> - <tr><td width=40% valign=top> - -* Kafka - * [Simple Producer](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemo.java) -</td></tr> -</table> diff --git a/pom.xml b/pom.xml index 4ebf0a440097f934a05a8d1f2dacdc52a651969e..12a2b88be57eda045c99e699ee8181b3ff40506d 100644 --- a/pom.xml +++ b/pom.xml @@ -7,6 +7,18 @@ <groupId>csw.kafka</groupId> <artifactId>kafkaStudies</artifactId> <version>1.0</version> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>8</source> + <target>8</target> + </configuration> + </plugin> + </plugins> + </build> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependencies> diff --git a/src/main/java/csw/kafka/study/lesson2/ConsumerDemo.java b/src/main/java/csw/kafka/study/lesson2/ConsumerDemo.java new file mode 100644 index 0000000000000000000000000000000000000000..267d935a9c259ee196ea92ef36e91095be832549 --- /dev/null +++ b/src/main/java/csw/kafka/study/lesson2/ConsumerDemo.java @@ -0,0 +1,46 @@ +package csw.kafka.study.lesson2; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; + +public class ConsumerDemo { + public static void main(String[] args) { + final Logger logger = LoggerFactory.getLogger(ConsumerDemo.class.getName()); + final String myServer = "localhost:9092"; + final String groupId = "group-one"; + final String topic = "first-topic"; + + // Kafka Consumer 설정 + Properties properties = new Properties(); + properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, myServer); + properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); + properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest, latest, none + + // Kafka Consumer 만들기 + KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); + + // topic에 연결 + consumer.subscribe(Collections.singleton(topic)); + // consumer.subscribe(Arrays.asList(topic)); + + // Poll for new data + while (true) { + ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord<String, String> record : records) { + logger.info(String.format("Key: %s, Value: %s", record.key(), record.value())); + logger.info(String.format("Partition: %s, Offset: %s", record.partition(), record.offset())); + } + } + } +}