SlackKafkaConsumer.py

    Seok Won authored
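    Every hour, fetch all notices and save them as JSON in the form {"TITLE": "title", "DATE": "posting date", "LINK": "HTTP address", "WRITER": "author"}.
    
    If the JSON file already exists and there are new notices, compare the fetched notices with the existing JSON and send only the new data to the Consumer. When the Consumer receives new data, it uses the Slack API to post the notice to the "#아주대" channel.
    
    The last parsing time is also recorded, so if the program is restarted after exiting, it does not parse again until one hour has passed.
    
    Result: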
    Last parsing: 1972-12-01 07:00:00
    Trying to parse new posts...
    Sending a new post...: 12179
    ...
    
    Last parsing: 2020-12-04 19:11:42.839219
    Trying to parse new posts...
    No new posts yet...
    Resting 1 hour...
    ...
    Last parsing: 2020-12-06 11:55:35.386262
    Wait for 3494 seconds to sync new posts.
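The hourly flow described in the commit message can be sketched as two small helpers. This is a minimal sketch, not the author's producer code: the function names `find_new_posts` and `seconds_until_next_parse` are hypothetical, and identifying a notice by its `LINK` field is an assumption (the real producer may key on a post number instead).

```python
from datetime import datetime, timedelta

PARSE_INTERVAL = timedelta(hours=1)  # interval stated in the commit message


def find_new_posts(saved, fetched):
    """Return the fetched notices whose LINK is not among the saved ones.

    Assumption: LINK uniquely identifies a notice.
    """
    seen_links = {post["LINK"] for post in saved}
    return [post for post in fetched if post["LINK"] not in seen_links]


def seconds_until_next_parse(last_parsed, now):
    """Whole seconds left before the next parse is due; 0 if already due."""
    remaining = PARSE_INTERVAL - (now - last_parsed)
    return max(0, int(remaining.total_seconds()))


if __name__ == "__main__":
    last = datetime(2020, 12, 6, 11, 55, 35)
    wait = seconds_until_next_parse(last, datetime.now())
    print("Wait for %d seconds to sync new posts." % wait)
```

Keeping the comparison and the timing check as pure functions means both can be checked without fetching any notices or running Kafka.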
    df21baf7
    SlackKafkaConsumer.py 2.20 KiB
    import json
    import os
    import time
    
    from confluent_kafka import Consumer, KafkaError
    from slack import WebClient
    from slack.errors import SlackApiError
    
    
    # Bot User OAuth Access Token
    # Scope = chat:write
    token = os.environ["SLACK_BOT_TOKEN"]
    
    sc = WebClient(token)
    
    # Set "auto.offset.reset": "earliest" if you want to consume all messages
    # from the beginning of the topic. ("smallest"/"largest" are legacy aliases
    # and "default.topic.config" is deprecated in confluent_kafka.)
    settings = {
        "bootstrap.servers": "localhost:9092",
        "group.id": "kafka-notify",
        "auto.offset.reset": "latest",
    }
    c = Consumer(settings)
    
    # Topic = "SLACK-KAFKA"
    c.subscribe(["SLACK-KAFKA"])
    
    try:
        while True:
            msg = c.poll(0.1)  # read data
            time.sleep(5)
            if msg is None:
                continue
            elif not msg.error():
                print("Received message: {0}".format(msg.value()))
                if msg.value() is None:
                    continue
    
                try:
                    app_msg = json.loads(msg.value().decode())
                except ValueError:
                    # Not valid UTF-8 JSON text; let json detect the encoding.
                    app_msg = json.loads(msg.value())
    
                try:
                    user = app_msg["USER"]
                    message = app_msg["TEXT"]
                    channel = "kafka"
                    text = (
                        "`%s` found a bug :\n> %s\n\n_Please see if we can fix the issue *right here, right now*_"
                        % (user, message)
                    )
                    print('\nSending message "%s" to channel %s' % (text, channel))
                except (KeyError, TypeError) as e:
                    # The payload lacks the USER/TEXT keys or is not a JSON object.
                    print("Failed to get channel/text from message.")
                    print(repr(e))
                    channel = "general"
                    text = msg.value().decode(errors="replace")
    
                try:
                    sc.chat_postMessage(channel=channel, text=text)
                except SlackApiError as e:
                    assert e.response["ok"] is False
                    print("\t** FAILED: %s" % e.response["error"])
            elif msg.error().code() == KafkaError._PARTITION_EOF:
                print(
                    "End of partition reached {0}/{1}".format(msg.topic(), msg.partition())
                )
            else:
                print("Error occurred: {0}".format(msg.error().str()))
    
    except Exception as e:
        print(type(e))
        print(e)
    
    finally:
        c.close()
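
The payload handling inside the loop (parse the JSON, build the Slack text, fall back to the "general" channel when the expected keys are missing) can be pulled into a pure function so it can be exercised without a broker or a Slack token. A minimal sketch under that assumption; `build_slack_message` is a hypothetical helper name that does not appear in the original file.

```python
import json


def build_slack_message(raw, default_channel="general"):
    """Turn a raw Kafka payload (bytes) into a (channel, text) pair."""
    try:
        app_msg = json.loads(raw)
        text = (
            "`%s` found a bug :\n> %s\n\n"
            "_Please see if we can fix the issue *right here, right now*_"
            % (app_msg["USER"], app_msg["TEXT"])
        )
        return "kafka", text
    except (ValueError, KeyError, TypeError):
        # Not JSON, or JSON without the USER/TEXT keys: forward the payload as-is.
        return default_channel, raw.decode("utf-8", errors="replace")


if __name__ == "__main__":
    channel, text = build_slack_message(b'{"USER": "kim", "TEXT": "boom"}')
    print('Sending message "%s" to channel %s' % (text, channel))
```

With this helper, the consumer loop would only need to call it on `msg.value()` and pass the result to `sc.chat_postMessage`.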