Select Git revision
SlackKafkaConsumer.py
Seok Won authored
1시간마다 모든 공지를 불러 {"TITLE": "제목", "DATE": "올린 날", "LINK": "http 주소", "WRITER": "글쓴이"}를 json 형태로 저장한다. 이 json 파일이나 새로운 공지가 있으면 기존 json과 비교해서 새로운 데이터를 Consumer로 보내고, Consumer는 새로운 데이터를 받으면, Slack API를 이용해, "#아주대" 채널에 공지를 올려준다. 마지막 파싱 시간도 기록해 종료 후 다시 불러도 1시간이 지나지 않으면 파싱하지 않는다. 결과) Last parsing: 1972-12-01 07:00:00 Trying to parse new posts... Sending a new post...: 12179 ... Last parsing: 2020-12-04 19:11:42.839219 Trying to parse new posts... No new posts yet... Resting 1 hour... ... Last parsing: 2020-12-06 11:55:35.386262 Wait for 3494 seconds to sync new posts.
SlackKafkaConsumer.py 2.20 KiB
import json
import time
import os
from confluent_kafka import Consumer, KafkaError
from slack import WebClient
from slack.errors import SlackApiError
# Slack client, authenticated with the Bot User OAuth Access Token taken from
# the SLACK_BOT_TOKEN environment variable (required scope: chat:write).
# A missing variable raises KeyError here, before any Kafka connection is made.
token = os.environ["SLACK_BOT_TOKEN"]
sc = WebClient(token)

# Kafka consumer configuration.
# "auto.offset.reset" is given at the top level of the config dict: the
# "default.topic.config" sub-dict is deprecated in confluent-kafka, and
# "latest"/"earliest" are the current names for the legacy
# "largest"/"smallest" aliases.
# Set "auto.offset.reset": "earliest" if you want to consume all messages
# from the beginning of the topic.
settings = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "kafka-notify",
    "auto.offset.reset": "latest",
}
c = Consumer(settings)

# Topic = "SLACK-KAFKA"
c.subscribe(["SLACK-KAFKA"])
# Slack channel for well-formed bug reports, and the channel that receives the
# raw payload when a message cannot be interpreted.
BUG_CHANNEL = "kafka"
FALLBACK_CHANNEL = "general"

# Timeout passed to each poll() call, and the fixed pause taken after every poll.
POLL_TIMEOUT_SECONDS = 0.1
PAUSE_SECONDS = 5


def build_slack_message(raw):
    """Turn a raw Kafka message value into a ``(channel, text)`` pair.

    Args:
        raw: The message value, as ``bytes`` (or ``str``) holding a JSON
            object with ``"USER"`` and ``"TEXT"`` keys.

    Returns:
        ``(BUG_CHANNEL, formatted_report)`` when the payload is a JSON object
        containing both keys; otherwise ``(FALLBACK_CHANNEL, raw_as_text)`` so
        that a malformed message is still forwarded instead of stopping the
        consumer.
    """
    try:
        # json.loads accepts bytes directly, so no manual decode is needed.
        payload = json.loads(raw)
        user = payload["USER"]
        message = payload["TEXT"]
    except (ValueError, KeyError, TypeError):
        # ValueError: invalid JSON or undecodable bytes.
        # KeyError / TypeError: missing key, or the JSON is not an object.
        print("Failed to get channel/text from message.")
        if isinstance(raw, (bytes, bytearray)):
            fallback_text = raw.decode("utf-8", errors="replace")
        else:
            fallback_text = str(raw)
        return FALLBACK_CHANNEL, fallback_text

    text = (
        "`%s` found a bug :\n> %s\n\n_Please see if we can fix the issue *right here, right now*_"
        % (user, message)
    )
    return BUG_CHANNEL, text


def run_consumer(consumer, slack_client):
    """Poll Kafka forever and forward each message to Slack.

    Args:
        consumer: A subscribed Kafka consumer; it is closed when this
            function exits, whatever the reason.
        slack_client: Slack client used to post messages.

    Raises:
        Exception: Any unexpected error is reported on stdout and re-raised
            after the consumer has been closed, so the failure is visible to
            whoever launched the script.
    """
    try:
        while True:
            msg = consumer.poll(POLL_TIMEOUT_SECONDS)  # read data
            # The pause runs after every poll, whether or not a message
            # arrived. NOTE(review): presumably a crude throttle for Slack
            # posting - confirm before changing.
            time.sleep(PAUSE_SECONDS)
            if msg is None:
                continue

            error = msg.error()
            if not error:
                raw = msg.value()
                print("Received message: {0}".format(raw))
                if raw is None:
                    continue

                channel, text = build_slack_message(raw)
                print('\nSending message "%s" to channel %s' % (text, channel))
                try:
                    slack_client.chat_postMessage(channel=channel, text=text)
                except SlackApiError as e:
                    # A failed post is reported but does not stop the loop.
                    print("\t** FAILED: %s" % e.response["error"])
            elif error.code() == KafkaError._PARTITION_EOF:
                print(
                    "End of partition reached {0}/{1}".format(
                        msg.topic(), msg.partition()
                    )
                )
            else:
                print("Error occured: {0}".format(error.str()))
    except Exception as e:
        # Report the actual error, then let it propagate.
        print("Consumer stopped: {0}: {1}".format(type(e).__name__, e))
        raise
    finally:
        consumer.close()


if __name__ == "__main__":
    run_consumer(c, sc)