Skip to content
Snippets Groups Projects
Commit 3886280c authored by Seok Won's avatar Seok Won
Browse files

Create Simple Producer in Python

parent c998f7bf
No related branches found
No related tags found
No related merge requests found
......@@ -2,7 +2,7 @@
<p>
<img width="480" src="https://www.andplus.com/hs-fs/hubfs/kafkalogo.jpg?&name=kafkalogo.jpg">
</p>
<h1>Apache Kafka 공부</h1>
<h1>Apache Kafka 공부 (Java, Python)</h1>
<h5>v2.13-2.6.0</h5>
[Apache Kafka](https://kafka.apache.org/)
......@@ -14,7 +14,7 @@
<table>
<tr><td width=40% valign=top>
* Kafka
* Kafka (Java)
* [Simple Producer](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemo.java)
* [Producer with callback](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemoCallBack.java)
* [Producer with key](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemoWithKey.java)
......
<div align="center">
<p>
<img width="480" src="https://www.andplus.com/hs-fs/hubfs/kafkalogo.jpg?&name=kafkalogo.jpg">
</p>
<h1>Apache Kafka 공부 in Python</h1>
<h5>v2.13-2.6.0</h5>
[Apache Kafka](https://kafka.apache.org/)
</div>
## 설치
[Kafka 다운로드](https://kafka.apache.org/downloads)
*Make sure to Download "Binary"*
## 실행
zookeeper & kafka 서버 실행
```console
WIN10@DESKTOP:~$ zookeeper-server-start config/zookeeper.properties
WIN10@DESKTOP:~$ kafka-server-start config/server.properties
```
Kafka 설치
```console
WIN10@DESKTOP:~$ pip install confluent-kafka
```
from confluent_kafka import Producer
from config import Config
p = Producer({"bootstrap.servers": Config.MY_SERVER})
p.produce(Config.TOPIC_ID, key="key_1", value="Hello")
p.flush(100)
# kafka-console-consumer --bootstrap-server localhost:9092 --topic first-topic
from confluent_kafka import Producer
from config import Config
def acked(err, msg):
if err is not None:
print("Failed to deliver message: {0}: {1}".format(msg.value(), err.str()))
else:
print("Message produced: {0}".format(msg.value())) # binary
p = Producer({"bootstrap.servers": Config.MY_SERVER})
try:
for val in range(1, 5):
p.produce(Config.TOPIC_ID, "value #{0}".format(val), callback=acked)
p.poll(0.5)
except KeyboardInterrupt:
pass
p.flush(100)
# kafka-console-consumer --bootstrap-server localhost:9092 --topic first-topic
class Config:
MY_SERVER = "localhost:9092"
TOPIC_ID = "first-topic"
GROUP_ID = "group-one"
......@@ -31,7 +31,7 @@ public class ProducerDemoWithKey {
logger.info("Key: " + key);
// Kafka Producer Record
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
// Consumer한테 데이터 보내기 - 비동기
producer.send(record, new Callback() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment