Skip to content
Snippets Groups Projects
Commit cfd8d2f8 authored by Seok Won's avatar Seok Won
Browse files

Create Slack Kafka Producer

this producer reads all messages from "#general", in every 5 seconds (this will cause ratelimited)

and if messages contain a word "bug", it will automatically sends "USERNAME" and "MESSAGE" to Kafka consumer.

And the consumer will leave a message in "#kafka" channel saying "USER found a bug ..."
parent a4508118
No related branches found
No related tags found
No related merge requests found
# @rmoff / 20 Jul 2018
import json
import time
from confluent_kafka import Consumer, KafkaError
from slack import WebClient
from slack.errors import SlackApiError
from confluent_kafka import Consumer, KafkaError
import json
import time
# Bot User OAuth Access Token
# Scope = chat:write
......@@ -16,7 +16,7 @@ sc = WebClient(token)
# from the beginning of the topic
settings = {
"bootstrap.servers": "localhost:9092",
"group.id": "python_kafka_notify.py",
"group.id": "kafka-notify",
"default.topic.config": {"auto.offset.reset": "largest"},
}
c = Consumer(settings)
......@@ -41,12 +41,12 @@ try:
app_msg = json.loads(msg.value())
try:
email = app_msg["EMAIL"]
message = app_msg["MESSAGE"]
user = app_msg["USER"]
message = app_msg["TEXT"]
channel = "kafka"
text = (
"`%s` just left a bad review :disappointed:\n> %s\n\n_Please contact them immediately and see if we can fix the issue *right here, right now*_"
% (email, message)
"`%s` found a bug :\n> %s\n\n_Please see if we can fix the issue *right here, right now*_"
% (user, message)
)
print('\nSending message "%s" to channel %s' % (text, channel))
except SlackApiError as e:
......@@ -56,12 +56,7 @@ try:
text = msg.value()
try:
sc_response = sc.chat_postMessage(
channel=channel,
text=text,
username="KSQL Notifications",
icon_emoji=":rocket:",
)
sc_response = sc.chat_postMessage(channel=channel, text=text,)
except SlackApiError as e:
assert e.response["ok"] is False
print("\t** FAILED: %s" % e.response["error"])
......
import json
import time
from config import Config
from confluent_kafka import Producer
from slack import WebClient
from slack.errors import SlackApiError
# Bot User OAuth Access Token
# used scopes: channels:history, channels:read, chat:write, im:history, mpim:history, users:read
token = ""
# Slack API 초기화
sc = WebClient(token)
# Kafka Producer 만들기 "localhost:9092"
settings = {"bootstrap.servers": Config.MY_SERVER}
p = Producer(settings)
def acked(err, msg): # callback
if err is not None:
print("Failed to deliver message: {0}: {1}".format(msg.value(), err.str()))
else:
print("Message produced: {0}".format(msg.value())) # binary
channel = "C01FVD0QD42" # 아래 sc.conversations_list로 id를 확인
# channel_name = "일반"
# try:
# sc_response = sc.conversations_list(channel=channel)
# # for channel in sc_response["channels"]:
# # print(channel["name"])
# # if channel["name"] == channel_name:
# # channel_id = channel["id"]
# except SlackApiError as e:
# assert e.response["ok"] is False
# print("\t** FAILED: %s" % e.response["error"])
posts = {} # 켤 때마다 중복 메시지 받음, 파일에 저장하는 형식으로 하면 더 나음.
# 매 5초마다 메시지를 계속 읽어옴.
# ratelimited 에러가 발생하면, 시간대를 늘려야 함.
try:
time.sleep(5)
while True:
try:
sc_response = sc.conversations_history(channel=channel)
for msg in sc_response["messages"]:
if msg["ts"] not in posts: # 없는 메시지
posts[msg["ts"]] = True
if "bug" in msg["text"].lower(): # bug를 포함한 글임
print("Someone posted a bug...")
name = sc.users_info(user=msg["user"])["user"][
"name"
] # user id를 name으로 변환
data = {"USER": name, "TEXT": msg["text"]}
# 데이터 Consumer에게 전송
p.produce(
Config.SLACK_TOPID_ID,
value=json.dumps(data),
callback=acked,
)
p.poll(0.5)
else:
# 파일에 저장할 수도
continue
except SlackApiError as e:
assert e.response["ok"] is False
print("\t** FAILED: %s" % e.response["error"])
except Exception as e:
print(type(e))
print(dir(e))
finally:
print("Exiting...")
p.flush(100)
class Config:
MY_SERVER = "localhost:9092"
TOPIC_ID = "first-topic"
SLACK_TOPID_ID = "SLACK-KAFKA"
GROUP_ID = "group-one"
CLIENT_ID = "client-1"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment