diff --git a/img/slack_ajou.png b/img/slack_ajou.png new file mode 100644 index 0000000000000000000000000000000000000000..46307c275336b55279f9bfe61b26cad09d1a4af3 Binary files /dev/null and b/img/slack_ajou.png differ diff --git a/python/README.md b/python/README.md index d9e159b8ea383b8aa80d4e8f501ae255626e9b38..591b4bc2331f8148b8ffaf871bbb4a631b6503ff 100644 --- a/python/README.md +++ b/python/README.md @@ -34,6 +34,36 @@ WIN10@DESKTOP:~$ pip install confluent-kafka ``` +## Slack API with Ajou University notices parser + +[Get Slack API here](https://api.slack.com/) + +First, invite your bot to your channel. (In this example, we set it as "#아주대") + +The [producer](https://github.com/Alfex4936/kafka-Studies/tree/main/python/src/AjouSlackProducer.py) will notify the [consumer](https://github.com/Alfex4936/kafka-Studies/tree/main/python/src/AjouSlackConsumer.py) whenever there are new notices. + +The *producer* checks new notices per an hour, saves latest 10 notices to json file, +and sees if there is/are a new notice/s. + +If there is a new notice, it sends {"TITLE": "title", "DATE": "post date", "LINK": "http address", "WRITER": "writer"} to the consumer. + +The *consumer* checks new datas every 5 seconds, if it gets a new data, +it consumes the data and leave a comment like this below image. + +<div align="center"> +<p> + <img width="480" src="https://github.com/Alfex4936/kafka-Studies/blob/main/img/slack_ajou.png"> +</p> +</div> + +:b: Run the server first to see the results. + +```console +WIN10@DESKTOP:~$ python AjouSlackProducer.py + +WIN10@DESKTOP:~$ python AjouSlackConsumer.py +``` + ## Slack API Producer Usage [Get Slack API](https://api.slack.com/) diff --git a/python/src/AjouSlackConsumer.py b/python/src/AjouSlackConsumer.py new file mode 100644 index 0000000000000000000000000000000000000000..60f60d8543a4cce5ff6a2c36933081ea1ed236e7 --- /dev/null +++ b/python/src/AjouSlackConsumer.py @@ -0,0 +1,89 @@ +import json +import time + +import os + +from config import Config +from confluent_kafka import Consumer, KafkaError +from slack import WebClient +from slack.errors import SlackApiError + + +# Bot User OAuth Access Token +token = os.environ["SLACK_BOT_TOKEN"] + +sc = WebClient(token) + +# Set 'auto.offset.reset': 'smallest' if you want to consume all messages +# from the beginning of the topic +settings = { + "bootstrap.servers": Config.MY_SERVER, + "group.id": "ajou-notify", + "default.topic.config": {"auto.offset.reset": "largest"}, +} +c = Consumer(settings) + +# Topic = "AJOU-NOTIFY +c.subscribe([Config.AJOU_TOPIC_ID]) + +try: + while True: + msg = c.poll(0.1) + time.sleep(5) + if msg is None: + continue + elif not msg.error(): + print("Received a message: {0}".format(msg.value())) + if msg.value() is None: + continue + + try: + app_msg = json.loads(msg.value().decode()) + except: + app_msg = json.loads(msg.value()) + + try: + title = app_msg["TITLE"] + date = app_msg["DATE"] + href = app_msg["LINK"] + writer = app_msg["WRITER"] + + channel = "아주대" + # TODO: 학사면 좀 더 중요하게? + text = ":star: `%s` 새로운 공지!\n>%s: %s\n>링크: <%s|공지 확인하기>" % ( + date, + writer, + title, + href, + ) + print('\nSending message "%s" to channel %s' % (text, channel)) + except SlackApiError as e: + print("Failed to get channel/text from message.") + print(e.response["error"]) + channel = "kafka" + text = msg.value() + + try: + sc_response = sc.chat_postMessage( + channel=channel, text=text, as_user=True, username="아주대 공지 봇" + ) # as_user은 new slack app에서 작동 안 함 + + except SlackApiError as e: + assert e.response["ok"] is False + print("\t** FAILED: %s" % e.response["error"]) + elif msg.error().code() == KafkaError._PARTITION_EOF: + print( + "End of partition reached {0}/{1}".format(msg.topic(), msg.partition()) + ) + else: + print("Error occured: {0}".format(msg.error().str())) + +except Exception as e: + print(type(e)) + print(dir(e)) + +except KeyboardInterrupt: + print("Pressed CTRL+C...") + +finally: + c.close() diff --git a/python/src/AjouSlackProducer.py b/python/src/AjouSlackProducer.py new file mode 100644 index 0000000000000000000000000000000000000000..e55943bd984e4c272b263a7d712510ea0b97c436 --- /dev/null +++ b/python/src/AjouSlackProducer.py @@ -0,0 +1,157 @@ +import datetime +import json +import os +import time +from contextlib import contextmanager +from pathlib import Path + +import requests +from bs4 import BeautifulSoup +from config import Config +from confluent_kafka import Producer +from slack import WebClient +from slack.errors import SlackApiError + +# Producer callback function +def acked(err, msg): + if err is not None: + print("Failed to deliver message: {0}: {1}".format(msg.value(), err.str())) + else: + print("Message produced: {0}".format(msg.value())) # binary + + +# Make data into dictionary format +def makeJson(postId, postTitle, postDate, postLink, postWriter): + return { + postId: { + "TITLE": postTitle, + "DATE": postDate, + "LINK": ADDRESS + postLink, + "WRITER": postWriter, + } + } + + +# Ajou notices parser +def parser(): + req = requests.get(f"{ADDRESS}?mode=list&&articleLimit=10&article.offset=0") + req.encoding = "utf-8" + html = req.text + soup = BeautifulSoup(html, "html.parser") + ids = soup.select("table > tbody > tr > td.b-num-box") + posts = soup.select("table > tbody > tr > td.b-td-left > div > a") + dates = soup.select("table > tbody > tr > td.b-td-left > div > div > span.b-date") + writers = soup.select( + "table > tbody > tr > td.b-td-left > div > div.b-m-con > span.b-writer" + ) + return ids, posts, dates, writers + + +ADDRESS = "https://www.ajou.ac.kr/kr/ajou/notice.do" +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) +JSON_PATH = os.path.join(BASE_DIR, "already_read.json") +LENGTH = 10 +PRODUCED = 0 + +# Bot User OAuth Access Token +# used scopes: channels:history, channels:read, chat:write, im:history, mpim:history, users:read +token = os.environ["SLACK_BOT_TOKEN"] + +read = None +# 공지 Parser +if not Path(JSON_PATH).is_file(): # 파일 없으면 기본 형식 만듬 + base_data = {"POSTS": {}, "LAST_PARSED": "1972-12-01 07:00:00.000000"} + + with open(JSON_PATH, "a+") as f: + f.write(json.dumps(base_data)) + +# json read +with open(JSON_PATH, "r+") as f_read: + read = json.load(f_read) + +# Set last parsed time to rest 1 hour well +LAST_PARSED = datetime.datetime.strptime(read["LAST_PARSED"], "%Y-%m-%d %H:%M:%S.%f") + +# init parser +ids, posts, dates, writers = parser() + +# Slack API 초기화 +sc = WebClient(token) +channel = "C01G2CR5MEE" # 아주대 + +# Kafka Producer 만들기 "localhost:9092" +settings = {"bootstrap.servers": Config.MY_SERVER} +p = Producer(settings) + +try: + while True: # 1시간마다 무한 반복 + PRODUCED = 0 + LAST_PARSED = datetime.datetime.strptime( + read["LAST_PARSED"], "%Y-%m-%d %H:%M:%S.%f" + ) + + try: + now = datetime.datetime.now() + diff = (now - LAST_PARSED).seconds + print("Last parsing:", LAST_PARSED) + if diff / 3600 < 1: # 업데이트 후 1시간이 안 지났음, 대기 + print(f"Wait for {3600 - diff} seconds to sync new posts.") + time.sleep(3600 - diff) + + read["LAST_PARSED"] = now.strftime("%Y-%m-%d %H:%M:%S.%f") + + print("Trying to parse new posts...") + ids, posts, dates, writers = parser() # 다시 파싱 + for i in range(LENGTH): + postId = ids[i].text.strip() + postLink = posts[i].get("href") + postTitle = posts[i].text.strip() + # postTitle = posts[i].get("title") + postDate = dates[i].text.strip() + postWriter = writers[i].text + + data = makeJson(postId, postTitle, postDate, postLink, postWriter) + # {'10000': {'TITLE': '설문조사', 'DATE': '20.12.04', 'LINK': 'https', 'WRITER': '입학처'}} + + if postId not in read["POSTS"]: + print("Sending a new post...:", postId) + read["POSTS"].update(data) + + PRODUCED += 1 + p.produce( + Config.AJOU_TOPIC_ID, + value=json.dumps(data[postId]), + callback=acked, + ) + p.poll(0.5) # 데이터 Kafka에게 전송 + else: + continue + if PRODUCED: + print(f"Sent {PRODUCED} posts...") + else: + print("No new posts yet...") + + except SlackApiError as e: + assert e.response["ok"] is False + print("\t** FAILED: %s" % e.response["error"]) + + with open(JSON_PATH, "w+") as f: + f.write(json.dumps(read)) + with open(JSON_PATH, "r+") as f: + read = json.load(f) + + print("Resting 1 hour...") + + time.sleep(3600) + + +except Exception as e: + print(type(e)) + print(dir(e)) + +except KeyboardInterrupt: + print("Pressed CTRL+C...") + +finally: + print("Exiting...") + p.flush(100) diff --git a/python/src/SlackKafkaConsumer.py b/python/src/SlackKafkaConsumer.py index 61eb4827b9048fab627d046f07605d9e2630ebf4..74981dfa8a12bd7273192b041924b91e0086ac2c 100644 --- a/python/src/SlackKafkaConsumer.py +++ b/python/src/SlackKafkaConsumer.py @@ -1,6 +1,9 @@ import json import time + +import os + from confluent_kafka import Consumer, KafkaError from slack import WebClient from slack.errors import SlackApiError @@ -8,7 +11,7 @@ from slack.errors import SlackApiError # Bot User OAuth Access Token # Scope = chat:write -token = "" +token = os.environ["SLACK_BOT_TOKEN"] sc = WebClient(token) @@ -26,7 +29,7 @@ c.subscribe(["SLACK-KAFKA"]) try: while True: - msg = c.poll(0.1) + msg = c.poll(0.1) # read data time.sleep(5) if msg is None: continue diff --git a/python/src/SlackKafkaProducer.py b/python/src/SlackKafkaProducer.py index ddd438f2c823082c9c0890f6bc4271c120863586..8ec7f8601c90a59eb12a4b9b82f737e345c8609d 100644 --- a/python/src/SlackKafkaProducer.py +++ b/python/src/SlackKafkaProducer.py @@ -1,6 +1,8 @@ import json import time +import os + from config import Config from confluent_kafka import Producer from slack import WebClient @@ -9,7 +11,7 @@ from slack.errors import SlackApiError # Bot User OAuth Access Token # used scopes: channels:history, channels:read, chat:write, im:history, mpim:history, users:read -token = "" +token = os.environ["SLACK_BOT_TOKEN"] # Slack API 초기화 sc = WebClient(token) diff --git a/python/src/config.py b/python/src/config.py index 9df9e1012bda1efc162e286ba8541bef98fe3f94..f4648f55cf1142efee3ddd4efb2d6d3703908ade 100644 --- a/python/src/config.py +++ b/python/src/config.py @@ -2,6 +2,7 @@ class Config: MY_SERVER = "localhost:9092" TOPIC_ID = "first-topic" SLACK_TOPID_ID = "SLACK-KAFKA" + AJOU_TOPIC_ID = "AJOU-NOTIFY" GROUP_ID = "group-one" CLIENT_ID = "client-1"