diff --git a/python/src/SlackKafkaConsumer.py b/python/src/SlackKafkaConsumer.py index d6c3524ba8a2838e07663e695de4a4bf17d149df..61eb4827b9048fab627d046f07605d9e2630ebf4 100644 --- a/python/src/SlackKafkaConsumer.py +++ b/python/src/SlackKafkaConsumer.py @@ -1,10 +1,10 @@ -# @rmoff / 20 Jul 2018 +import json +import time + +from confluent_kafka import Consumer, KafkaError from slack import WebClient from slack.errors import SlackApiError -from confluent_kafka import Consumer, KafkaError -import json -import time # Bot User OAuth Access Token # Scope = chat:write @@ -16,7 +16,7 @@ sc = WebClient(token) # from the beginning of the topic settings = { "bootstrap.servers": "localhost:9092", - "group.id": "python_kafka_notify.py", + "group.id": "kafka-notify", "default.topic.config": {"auto.offset.reset": "largest"}, } c = Consumer(settings) @@ -41,12 +41,12 @@ try: app_msg = json.loads(msg.value()) try: - email = app_msg["EMAIL"] - message = app_msg["MESSAGE"] + user = app_msg["USER"] + message = app_msg["TEXT"] channel = "kafka" text = ( - "`%s` just left a bad review :disappointed:\n> %s\n\n_Please contact them immediately and see if we can fix the issue *right here, right now*_" - % (email, message) + "`%s` found a bug :\n> %s\n\n_Please see if we can fix the issue *right here, right now*_" + % (user, message) ) print('\nSending message "%s" to channel %s' % (text, channel)) except SlackApiError as e: @@ -56,12 +56,7 @@ try: text = msg.value() try: - sc_response = sc.chat_postMessage( - channel=channel, - text=text, - username="KSQL Notifications", - icon_emoji=":rocket:", - ) + sc_response = sc.chat_postMessage(channel=channel, text=text,) except SlackApiError as e: assert e.response["ok"] is False print("\t** FAILED: %s" % e.response["error"]) diff --git a/python/src/SlackKafkaProducer.py b/python/src/SlackKafkaProducer.py new file mode 100644 index 0000000000000000000000000000000000000000..ddd438f2c823082c9c0890f6bc4271c120863586 --- /dev/null +++ b/python/src/SlackKafkaProducer.py @@ -0,0 +1,84 @@ +import json +import time + +from config import Config +from confluent_kafka import Producer +from slack import WebClient +from slack.errors import SlackApiError + + +# Bot User OAuth Access Token +# used scopes: channels:history, channels:read, chat:write, im:history, mpim:history, users:read +token = "" + +# Slack API 초기화 +sc = WebClient(token) + +# Kafka Producer 만들기 "localhost:9092" +settings = {"bootstrap.servers": Config.MY_SERVER} +p = Producer(settings) + + +def acked(err, msg): # callback + if err is not None: + print("Failed to deliver message: {0}: {1}".format(msg.value(), err.str())) + else: + print("Message produced: {0}".format(msg.value())) # binary + + +channel = "C01FVD0QD42" # 아래 sc.conversations_list로 id를 확인 + + +# channel_name = "일반" +# try: +# sc_response = sc.conversations_list(channel=channel) +# # for channel in sc_response["channels"]: +# # print(channel["name"]) +# # if channel["name"] == channel_name: +# # channel_id = channel["id"] +# except SlackApiError as e: +# assert e.response["ok"] is False +# print("\t** FAILED: %s" % e.response["error"]) + + +posts = {} # 켤 때마다 중복 메시지 받음, 파일에 저장하는 형식으로 하면 더 나음. + +# 매 5초마다 메시지를 계속 읽어옴. +# ratelimited 에러가 발생하면, 시간대를 늘려야 함. +try: + time.sleep(5) + while True: + try: + sc_response = sc.conversations_history(channel=channel) + for msg in sc_response["messages"]: + if msg["ts"] not in posts: # 없는 메시지 + posts[msg["ts"]] = True + if "bug" in msg["text"].lower(): # bug를 포함한 글임 + print("Someone posted a bug...") + name = sc.users_info(user=msg["user"])["user"][ + "name" + ] # user id를 name으로 변환 + data = {"USER": name, "TEXT": msg["text"]} + + # 데이터 Consumer에게 전송 + p.produce( + Config.SLACK_TOPID_ID, + value=json.dumps(data), + callback=acked, + ) + p.poll(0.5) + else: + # 파일에 저장할 수도 + continue + except SlackApiError as e: + assert e.response["ok"] is False + print("\t** FAILED: %s" % e.response["error"]) + + +except Exception as e: + print(type(e)) + print(dir(e)) + +finally: + print("Exiting...") + p.flush(100) diff --git a/python/src/config.py b/python/src/config.py index 48efc61672ae3815cb2d372a45ae0ee19ec917fb..9df9e1012bda1efc162e286ba8541bef98fe3f94 100644 --- a/python/src/config.py +++ b/python/src/config.py @@ -1,6 +1,7 @@ class Config: MY_SERVER = "localhost:9092" TOPIC_ID = "first-topic" + SLACK_TOPID_ID = "SLACK-KAFKA" GROUP_ID = "group-one" CLIENT_ID = "client-1"