Skip to content
Snippets Groups Projects
Commit 539ce491 authored by Seok Won's avatar Seok Won
Browse files

Create Producer using key

At line 34, we added key parameter.

Whenever we use same key when sending a data, same key always use same partition.

i.e)
i = 0, key = "truck_1" -> partition_0
i = 1, key = "truck_1" -> partition_0
...
parent a624b66d
No related branches found
No related tags found
No related merge requests found
package csw.kafka.study.lesson1;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class ProducerDemoWithKey {
public static void main(String[] args) {
final Logger logger = LoggerFactory.getLogger(ProducerDemoWithKey.class);
String myServer = "localhost:9092";
// Kafka Producer 설정
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, myServer);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Kafka는 데이터를 바이트로 보냄 (0,1)
// Kafka Producer 만들기
final KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
for (int i = 0; i < 5; i++) {
String topic = "first-topic";
String value = "Hello " + i;
String key = "id_" + i;
// 같인 key는 항상 같은 파티션에 데이터가 주입된다.
logger.info("Key: " + key);
// Kafka Producer Record
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);
// Consumer한테 데이터 보내기 - 비동기
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
// 레코드가 잘 보내졌거나, 오류가 발생할 때마다 실행됨.
if (e == null) {
// 레코드 전송 성공
final String log = String.format("새 메타데이터%nTopic: %s%nPartition: %d%nOffset: %d%nTimestamp: %d",
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), recordMetadata.timestamp());
logger.info(log);
/* 예)
새 메타데이터
Topic: first-topic
Partition: 2
Offset: 13
Timestamp: 1606555474565
*/
} else {
logger.info(String.format("오류가 발생함: %s", e));
}
}
});
}
producer.flush();
producer.close();
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment