From 8ef04d594203c113665f86fb1f198c2aabf70422 Mon Sep 17 00:00:00 2001 From: Seok Won <alfex4936@gmail.com> Date: Sun, 6 Dec 2020 18:08:21 +0900 Subject: [PATCH] Create a Faust basic example pip install faust --- python/src/AjouSlackProducer.py | 14 +++++++------- python/src/SlackKafkaConsumer.py | 1 + python/src/fausts/hello_world.py | 11 +++++++++++ 3 files changed, 19 insertions(+), 7 deletions(-) create mode 100644 python/src/fausts/hello_world.py diff --git a/python/src/AjouSlackProducer.py b/python/src/AjouSlackProducer.py index e55943b..d1d09bc 100644 --- a/python/src/AjouSlackProducer.py +++ b/python/src/AjouSlackProducer.py @@ -52,6 +52,8 @@ BASE_DIR = os.path.dirname(os.path.abspath(__file__)) JSON_PATH = os.path.join(BASE_DIR, "already_read.json") LENGTH = 10 PRODUCED = 0 +DUMP = lambda x: json.dumps(x) +LOAD = lambda x: json.load(x) # Bot User OAuth Access Token # used scopes: channels:history, channels:read, chat:write, im:history, mpim:history, users:read @@ -63,11 +65,11 @@ if not Path(JSON_PATH).is_file(): # 파일 없으면 기본 형식 만듬 base_data = {"POSTS": {}, "LAST_PARSED": "1972-12-01 07:00:00.000000"} with open(JSON_PATH, "a+") as f: - f.write(json.dumps(base_data)) + f.write(DUMP(base_data)) # json read with open(JSON_PATH, "r+") as f_read: - read = json.load(f_read) + read = LOAD(f_read) # Set last parsed time to rest 1 hour well LAST_PARSED = datetime.datetime.strptime(read["LAST_PARSED"], "%Y-%m-%d %H:%M:%S.%f") @@ -119,9 +121,7 @@ try: PRODUCED += 1 p.produce( - Config.AJOU_TOPIC_ID, - value=json.dumps(data[postId]), - callback=acked, + Config.AJOU_TOPIC_ID, value=DUMP(data[postId]), callback=acked, ) p.poll(0.5) # 데이터 Kafka에게 전송 else: @@ -136,9 +136,9 @@ try: print("\t** FAILED: %s" % e.response["error"]) with open(JSON_PATH, "w+") as f: - f.write(json.dumps(read)) + f.write(DUMP(read)) with open(JSON_PATH, "r+") as f: - read = json.load(f) + read = LOAD(f) print("Resting 1 hour...") diff --git a/python/src/SlackKafkaConsumer.py b/python/src/SlackKafkaConsumer.py index 74981df..5f63b16 100644 --- a/python/src/SlackKafkaConsumer.py +++ b/python/src/SlackKafkaConsumer.py @@ -27,6 +27,7 @@ c = Consumer(settings) # Topic = "SLACK-KAFKA" c.subscribe(["SLACK-KAFKA"]) +# TODO: Make bolts with Apache Storm try: while True: msg = c.poll(0.1) # read data diff --git a/python/src/fausts/hello_world.py b/python/src/fausts/hello_world.py new file mode 100644 index 0000000..ac05dfe --- /dev/null +++ b/python/src/fausts/hello_world.py @@ -0,0 +1,11 @@ +import faust + +app = faust.App("hello-world", broker="kafka://localhost:9092", value_serializer="raw",) + +greetings_topic = app.topic("first-topic") + + +@app.agent(greetings_topic) +async def greet(greetings): + async for greeting in greetings: + print(greeting) -- GitLab