Skip to content
Snippets Groups Projects
Commit c998f7bf authored by Seok Won's avatar Seok Won
Browse files

Create Simple Consumer

it reads all datas in three partitions from a topic named "first-topic".

I sent datas in String, like "hello world".
parent 7264e5e5
No related branches found
No related tags found
No related merge requests found
......@@ -9,10 +9,23 @@
</div>
## 목차
<table>
<tr><td width=40% valign=top>
* Kafka
* [Simple Producer](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemo.java)
* [Producer with callback](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemoCallBack.java)
* [Producer with key](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemoWithKey.java)
* [Simple Consumer](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson2/ConsumerDemo.java)
</td></tr>
</table>
## 설치
[Kafka 다운로드](https://kafka.apache.org/downloads)
*Download Binary*
*Make sure to Download "Binary"*
## 실행
......@@ -78,13 +91,3 @@ group-one first-topic 1 0
group-one first-topic 2 0
```
## 목차
<table>
<tr><td width=40% valign=top>
* Kafka
* [Simple Producer](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemo.java)
</td></tr>
</table>
......@@ -7,6 +7,18 @@
<groupId>csw.kafka</groupId>
<artifactId>kafkaStudies</artifactId>
<version>1.0</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependencies>
......
package csw.kafka.study.lesson2;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class ConsumerDemo {
public static void main(String[] args) {
final Logger logger = LoggerFactory.getLogger(ConsumerDemo.class.getName());
final String myServer = "localhost:9092";
final String groupId = "group-one";
final String topic = "first-topic";
// Kafka Consumer 설정
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, myServer);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest, latest, none
// Kafka Consumer 만들기
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// topic에 연결
consumer.subscribe(Collections.singleton(topic));
// consumer.subscribe(Arrays.asList(topic));
// Poll for new data
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
logger.info(String.format("Key: %s, Value: %s", record.key(), record.value()));
logger.info(String.format("Partition: %s, Offset: %s", record.partition(), record.offset()));
}
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment