From 3886280c193e6ca17e9c3e5b0d9aac43546f2e8c Mon Sep 17 00:00:00 2001 From: Seok Won <alfex4936@gmail.com> Date: Mon, 30 Nov 2020 12:18:29 +0900 Subject: [PATCH] Create Simple Producer in Python --- README.md | 4 +-- python/README.md | 36 +++++++++++++++++++ python/src/ProducerDemo.py | 8 +++++ python/src/ProducerDemoCallBack.py | 24 +++++++++++++ python/src/__init__.py | 0 python/src/config.py | 4 +++ .../study/lesson1/ProducerDemoWithKey.java | 2 +- 7 files changed, 75 insertions(+), 3 deletions(-) create mode 100644 python/README.md create mode 100644 python/src/ProducerDemo.py create mode 100644 python/src/ProducerDemoCallBack.py create mode 100644 python/src/__init__.py create mode 100644 python/src/config.py diff --git a/README.md b/README.md index 376f70c..e65f661 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ <p> <img width="480" src="https://www.andplus.com/hs-fs/hubfs/kafkalogo.jpg?&name=kafkalogo.jpg"> </p> -<h1>Apache Kafka 공부</h1> +<h1>Apache Kafka 공부 (Java, Python)</h1> <h5>v2.13-2.6.0</h5> [Apache Kafka](https://kafka.apache.org/) @@ -14,7 +14,7 @@ <table> <tr><td width=40% valign=top> -* Kafka +* Kafka (Java) * [Simple Producer](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemo.java) * [Producer with callback](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemoCallBack.java) * [Producer with key](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemoWithKey.java) diff --git a/python/README.md b/python/README.md new file mode 100644 index 0000000..e540dc5 --- /dev/null +++ b/python/README.md @@ -0,0 +1,36 @@ +<div align="center"> +<p> + <img width="480" src="https://www.andplus.com/hs-fs/hubfs/kafkalogo.jpg?&name=kafkalogo.jpg"> +</p> +<h1>Apache Kafka 공부 in Python</h1> + <h5>v2.13-2.6.0</h5> + +[Apache Kafka](https://kafka.apache.org/) + +</div> + +## 설치 +[Kafka 다운로드](https://kafka.apache.org/downloads) + +*Make sure to Download "Binary"* + +## 실행 + +zookeeper & kafka 서버 실행 + +```console + +WIN10@DESKTOP:~$ zookeeper-server-start config/zookeeper.properties + +WIN10@DESKTOP:~$ kafka-server-start config/server.properties + +``` + +Kafka 설치 + +```console + +WIN10@DESKTOP:~$ pip install confluent-kafka + +``` + diff --git a/python/src/ProducerDemo.py b/python/src/ProducerDemo.py new file mode 100644 index 0000000..9a8e8d6 --- /dev/null +++ b/python/src/ProducerDemo.py @@ -0,0 +1,8 @@ +from confluent_kafka import Producer +from config import Config + +p = Producer({"bootstrap.servers": Config.MY_SERVER}) +p.produce(Config.TOPIC_ID, key="key_1", value="Hello") +p.flush(100) + +# kafka-console-consumer --bootstrap-server localhost:9092 --topic first-topic diff --git a/python/src/ProducerDemoCallBack.py b/python/src/ProducerDemoCallBack.py new file mode 100644 index 0000000..b196cd5 --- /dev/null +++ b/python/src/ProducerDemoCallBack.py @@ -0,0 +1,24 @@ +from confluent_kafka import Producer +from config import Config + + +def acked(err, msg): + if err is not None: + print("Failed to deliver message: {0}: {1}".format(msg.value(), err.str())) + else: + print("Message produced: {0}".format(msg.value())) # binary + + +p = Producer({"bootstrap.servers": Config.MY_SERVER}) + +try: + for val in range(1, 5): + p.produce(Config.TOPIC_ID, "value #{0}".format(val), callback=acked) + p.poll(0.5) + +except KeyboardInterrupt: + pass + +p.flush(100) + +# kafka-console-consumer --bootstrap-server localhost:9092 --topic first-topic diff --git a/python/src/__init__.py b/python/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python/src/config.py b/python/src/config.py new file mode 100644 index 0000000..89de0a6 --- /dev/null +++ b/python/src/config.py @@ -0,0 +1,4 @@ +class Config: + MY_SERVER = "localhost:9092" + TOPIC_ID = "first-topic" + GROUP_ID = "group-one" diff --git a/src/main/java/csw/kafka/study/lesson1/ProducerDemoWithKey.java b/src/main/java/csw/kafka/study/lesson1/ProducerDemoWithKey.java index 1baedd1..8c8c198 100644 --- a/src/main/java/csw/kafka/study/lesson1/ProducerDemoWithKey.java +++ b/src/main/java/csw/kafka/study/lesson1/ProducerDemoWithKey.java @@ -31,7 +31,7 @@ public class ProducerDemoWithKey { logger.info("Key: " + key); // Kafka Producer Record - ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value); + ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value); // Consumer한테 데이터 보내기 - 비동기 producer.send(record, new Callback() { -- GitLab