diff --git a/python/README.md b/python/README.md index e540dc5018c43664726cd7d881a9a9175beb03ed..0a665e4347930b39966873a1cc5704a5c621ab8c 100644 --- a/python/README.md +++ b/python/README.md @@ -34,3 +34,44 @@ WIN10@DESKTOP:~$ pip install confluent-kafka ``` +## Slack Consumer Usage + +Modified version of [official Confluent example](https://github.com/confluentinc/infoq-kafka-ksql) + +[Get Slack API](https://api.slack.com/) + +Add "chat:write" scope to both user and bot. + +Copy Bot User OAuth Access Token from OAuth & Permissions section. + +```console +/INVITE @BOTNAME +``` + +to your channel if you see an error, "** FAILED: not_in_channel" + +Using CLI or producers, send a data to your kafka topic. + +```console +kafka-console-producer --broker-list localhost:9092 --topic SLACK-KAFKA +> {"CLUB_STATUS":"platinum","EMAIL":"ikr@kakao.com","STARS":1,"MESSAGE":"Exceeded all my expectations!"} +``` + +Whenever you send a data to kafka, +this consumer consumes email and message from the user, +and posts "EMAIL just left a bad review" to your slack channel. + +```console +Result + +Sending message "`ikr@kakao.com` just left a bad review :disappointed: +> Exceeded all my expectations! + +_Please contact them immediately and see if we can fix the issue *right here, right now*_" to channel kafka +``` + +<div align="center"> +<p> + <img width="480" src="https://github.com/Alfex4936/kafka-Studies/blob/main/img/slack.png"> +</p> +</div> diff --git a/python/src/SlackKafkaConsumer.py b/python/src/SlackKafkaConsumer.py new file mode 100644 index 0000000000000000000000000000000000000000..d6c3524ba8a2838e07663e695de4a4bf17d149df --- /dev/null +++ b/python/src/SlackKafkaConsumer.py @@ -0,0 +1,80 @@ +# @rmoff / 20 Jul 2018 +from slack import WebClient +from slack.errors import SlackApiError + +from confluent_kafka import Consumer, KafkaError +import json +import time + +# Bot User OAuth Access Token +# Scope = chat:write +token = "" + +sc = WebClient(token) + +# Set 'auto.offset.reset': 'smallest' if you want to consume all messages +# from the beginning of the topic +settings = { + "bootstrap.servers": "localhost:9092", + "group.id": "python_kafka_notify.py", + "default.topic.config": {"auto.offset.reset": "largest"}, +} +c = Consumer(settings) + +# Topic = "SLACK-KAFKA" +c.subscribe(["SLACK-KAFKA"]) + +try: + while True: + msg = c.poll(0.1) + time.sleep(5) + if msg is None: + continue + elif not msg.error(): + print("Received message: {0}".format(msg.value())) + if msg.value() is None: + continue + + try: + app_msg = json.loads(msg.value().decode()) + except: + app_msg = json.loads(msg.value()) + + try: + email = app_msg["EMAIL"] + message = app_msg["MESSAGE"] + channel = "kafka" + text = ( + "`%s` just left a bad review :disappointed:\n> %s\n\n_Please contact them immediately and see if we can fix the issue *right here, right now*_" + % (email, message) + ) + print('\nSending message "%s" to channel %s' % (text, channel)) + except SlackApiError as e: + print("Failed to get channel/text from message.") + print(e.response["error"]) + channel = "general" + text = msg.value() + + try: + sc_response = sc.chat_postMessage( + channel=channel, + text=text, + username="KSQL Notifications", + icon_emoji=":rocket:", + ) + except SlackApiError as e: + assert e.response["ok"] is False + print("\t** FAILED: %s" % e.response["error"]) + elif msg.error().code() == KafkaError._PARTITION_EOF: + print( + "End of partition reached {0}/{1}".format(msg.topic(), msg.partition()) + ) + else: + print("Error occured: {0}".format(msg.error().str())) + +except Exception as e: + print(type(e)) + print(dir(e)) + +finally: + c.close()