Skip to content
Snippets Groups Projects
Commit 68da572d authored by Seok Won's avatar Seok Won
Browse files

Create Slack Kafka Consumer example

modified version of an official confluent example.

topic id: SLACK-KAFKA

this will automatically posts a message to your specified channel in Slack, if someone leaves a bad review

(for now, you can just send a json data to test from CLI, see the README file.)
parent 3066855b
No related branches found
No related tags found
No related merge requests found
......@@ -34,3 +34,44 @@ WIN10@DESKTOP:~$ pip install confluent-kafka
```
## Slack Consumer Usage
Modified version of [official Confluent example](https://github.com/confluentinc/infoq-kafka-ksql)
[Get Slack API](https://api.slack.com/)
Add "chat:write" scope to both user and bot.
Copy Bot User OAuth Access Token from OAuth & Permissions section.
```console
/INVITE @BOTNAME
```
to your channel if you see an error, "** FAILED: not_in_channel"
Using CLI or producers, send a data to your kafka topic.
```console
kafka-console-producer --broker-list localhost:9092 --topic SLACK-KAFKA
> {"CLUB_STATUS":"platinum","EMAIL":"ikr@kakao.com","STARS":1,"MESSAGE":"Exceeded all my expectations!"}
```
Whenever you send a data to kafka,
this consumer consumes email and message from the user,
and posts "EMAIL just left a bad review" to your slack channel.
```console
Result
Sending message "`ikr@kakao.com` just left a bad review :disappointed:
> Exceeded all my expectations!
_Please contact them immediately and see if we can fix the issue *right here, right now*_" to channel kafka
```
<div align="center">
<p>
<img width="480" src="https://github.com/Alfex4936/kafka-Studies/blob/main/img/slack.png">
</p>
</div>
# @rmoff / 20 Jul 2018
from slack import WebClient
from slack.errors import SlackApiError
from confluent_kafka import Consumer, KafkaError
import json
import time
# Bot User OAuth Access Token
# Scope = chat:write
token = ""
sc = WebClient(token)
# Set 'auto.offset.reset': 'smallest' if you want to consume all messages
# from the beginning of the topic
settings = {
"bootstrap.servers": "localhost:9092",
"group.id": "python_kafka_notify.py",
"default.topic.config": {"auto.offset.reset": "largest"},
}
c = Consumer(settings)
# Topic = "SLACK-KAFKA"
c.subscribe(["SLACK-KAFKA"])
try:
while True:
msg = c.poll(0.1)
time.sleep(5)
if msg is None:
continue
elif not msg.error():
print("Received message: {0}".format(msg.value()))
if msg.value() is None:
continue
try:
app_msg = json.loads(msg.value().decode())
except:
app_msg = json.loads(msg.value())
try:
email = app_msg["EMAIL"]
message = app_msg["MESSAGE"]
channel = "kafka"
text = (
"`%s` just left a bad review :disappointed:\n> %s\n\n_Please contact them immediately and see if we can fix the issue *right here, right now*_"
% (email, message)
)
print('\nSending message "%s" to channel %s' % (text, channel))
except SlackApiError as e:
print("Failed to get channel/text from message.")
print(e.response["error"])
channel = "general"
text = msg.value()
try:
sc_response = sc.chat_postMessage(
channel=channel,
text=text,
username="KSQL Notifications",
icon_emoji=":rocket:",
)
except SlackApiError as e:
assert e.response["ok"] is False
print("\t** FAILED: %s" % e.response["error"])
elif msg.error().code() == KafkaError._PARTITION_EOF:
print(
"End of partition reached {0}/{1}".format(msg.topic(), msg.partition())
)
else:
print("Error occured: {0}".format(msg.error().str()))
except Exception as e:
print(type(e))
print(dir(e))
finally:
c.close()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment