diff --git a/python/src/AjouSlackProducer.py b/python/src/AjouSlackProducer.py index e55943bd984e4c272b263a7d712510ea0b97c436..d1d09bc7919ff6da321d9d699e2a71df237a1c34 100644 --- a/python/src/AjouSlackProducer.py +++ b/python/src/AjouSlackProducer.py @@ -52,6 +52,8 @@ BASE_DIR = os.path.dirname(os.path.abspath(__file__)) JSON_PATH = os.path.join(BASE_DIR, "already_read.json") LENGTH = 10 PRODUCED = 0 +DUMP = lambda x: json.dumps(x) +LOAD = lambda x: json.load(x) # Bot User OAuth Access Token # used scopes: channels:history, channels:read, chat:write, im:history, mpim:history, users:read @@ -63,11 +65,11 @@ if not Path(JSON_PATH).is_file(): # 파일 없으면 기본 형식 만듬 base_data = {"POSTS": {}, "LAST_PARSED": "1972-12-01 07:00:00.000000"} with open(JSON_PATH, "a+") as f: - f.write(json.dumps(base_data)) + f.write(DUMP(base_data)) # json read with open(JSON_PATH, "r+") as f_read: - read = json.load(f_read) + read = LOAD(f_read) # Set last parsed time to rest 1 hour well LAST_PARSED = datetime.datetime.strptime(read["LAST_PARSED"], "%Y-%m-%d %H:%M:%S.%f") @@ -119,9 +121,7 @@ try: PRODUCED += 1 p.produce( - Config.AJOU_TOPIC_ID, - value=json.dumps(data[postId]), - callback=acked, + Config.AJOU_TOPIC_ID, value=DUMP(data[postId]), callback=acked, ) p.poll(0.5) # 데이터 Kafka에게 전송 else: @@ -136,9 +136,9 @@ try: print("\t** FAILED: %s" % e.response["error"]) with open(JSON_PATH, "w+") as f: - f.write(json.dumps(read)) + f.write(DUMP(read)) with open(JSON_PATH, "r+") as f: - read = json.load(f) + read = LOAD(f) print("Resting 1 hour...") diff --git a/python/src/SlackKafkaConsumer.py b/python/src/SlackKafkaConsumer.py index 74981dfa8a12bd7273192b041924b91e0086ac2c..5f63b16f194d90c0a0597b51c24111f4caf643e8 100644 --- a/python/src/SlackKafkaConsumer.py +++ b/python/src/SlackKafkaConsumer.py @@ -27,6 +27,7 @@ c = Consumer(settings) # Topic = "SLACK-KAFKA" c.subscribe(["SLACK-KAFKA"]) +# TODO: Make bolts with Apache Storm try: while True: msg = c.poll(0.1) # read data diff --git a/python/src/fausts/hello_world.py b/python/src/fausts/hello_world.py new file mode 100644 index 0000000000000000000000000000000000000000..ac05dfe5fda98a7d12025cc3f666b80a169396e4 --- /dev/null +++ b/python/src/fausts/hello_world.py @@ -0,0 +1,11 @@ +import faust + +app = faust.App("hello-world", broker="kafka://localhost:9092", value_serializer="raw",) + +greetings_topic = app.topic("first-topic") + + +@app.agent(greetings_topic) +async def greet(greetings): + async for greeting in greetings: + print(greeting)