Commit 768e01ea authored by Seok Won

Create Consumer with Assign & Seek

In this example, we read "first-topic", partition 0, starting at offset 5 and stopping before offset 10 (five messages in total). Sample output:

[main] INFO csw.kafka.study.lesson2.ConsumerDemoAssignSeek - Key: null, Value: five
[main] INFO csw.kafka.study.lesson2.ConsumerDemoAssignSeek - Partition:  0, Offset: 5
[main] INFO csw.kafka.study.lesson2.ConsumerDemoAssignSeek - Key: null, Value: Hello World!
[main] INFO csw.kafka.study.lesson2.ConsumerDemoAssignSeek - Partition:  0, Offset: 6
[main] INFO csw.kafka.study.lesson2.ConsumerDemoAssignSeek - Key: id_1, Value: Hello 1
[main] INFO csw.kafka.study.lesson2.ConsumerDemoAssignSeek - Partition:  0, Offset: 7
[main] INFO csw.kafka.study.lesson2.ConsumerDemoAssignSeek - Key: id_3, Value: Hello 3
[main] INFO csw.kafka.study.lesson2.ConsumerDemoAssignSeek - Partition:  0, Offset: 8
[main] INFO csw.kafka.study.lesson2.ConsumerDemoAssignSeek - Key: key_1, Value: world
[main] INFO csw.kafka.study.lesson2.ConsumerDemoAssignSeek - Partition:  0, Offset: 9
[main] INFO csw.kafka.study.lesson2.ConsumerDemoAssignSeek - Exiting...
parent 047cbae2
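The consumer below does not subscribe() to the topic and join a consumer group; instead it uses assign() to attach itself directly to a single partition, and seek() to pick the exact starting offset, so no group coordination or rebalancing is involved. For comparison, a minimal group-managed variant (a sketch, not part of this commit) would replace the assign/seek calls with:

    consumer.subscribe(Collections.singletonList(topic));

With subscribe(), the broker decides which partitions the consumer reads and offsets are tracked per consumer group; with assign() and seek(), both choices are made explicitly in code.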
package csw.kafka.study.lesson2;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerDemoAssignSeek {
    public static void main(String[] args) {
        final Logger logger = LoggerFactory.getLogger(ConsumerDemoAssignSeek.class.getName());

        final String myServer = "localhost:9092";
        final String groupId = "group-one";
        final String topic = "first-topic";

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, myServer);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest, latest, none

        // Create the Kafka consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // Assign: attach the consumer directly to partition 0 of the topic
        TopicPartition partitionReadFrom = new TopicPartition(topic, 0);
        final long offsetReadFrom = 5L;
        consumer.assign(Collections.singletonList(partitionReadFrom));

        // Seek: start reading from offset 5
        consumer.seek(partitionReadFrom, offsetReadFrom);

        int msgToRead = 5;

        // Poll until the requested number of messages has been read
        while (msgToRead > 0) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                msgToRead -= 1;
                logger.info(String.format("Key: %s, Value: %s", record.key(), record.value()));
                logger.info(String.format("Partition: %s, Offset: %s", record.partition(), record.offset()));
                if (msgToRead == 0) {
                    break;
                }
            }
        }

        consumer.close(); // release network resources before exiting
        logger.info("Exiting...");
    }
}
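Besides seeking to a fixed offset, KafkaConsumer can also jump to either end of an assigned partition. A short sketch, reusing partitionReadFrom from above:

    // Jump to the earliest offset still retained in the partition
    consumer.seekToBeginning(Collections.singletonList(partitionReadFrom));

    // Or jump past the last record, so only newly produced messages are read
    consumer.seekToEnd(Collections.singletonList(partitionReadFrom));

Both calls are evaluated lazily: the new position takes effect on the next poll().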