diff --git a/python/src/AjouSlackConsumer.py b/python/src/AjouSlackConsumer.py index a4a31369068a63a061f1505223876ceabf4b8017..b44842f4cc819060c5a85b6798d8c6398ea2e607 100644 --- a/python/src/AjouSlackConsumer.py +++ b/python/src/AjouSlackConsumer.py @@ -9,6 +9,7 @@ from slack.errors import SlackApiError # Bot User OAuth Access Token +# used scopes: channels:history, channels:read, chat:write, im:history, mpim:history, users:read token = os.environ["SLACK_BOT_TOKEN"] sc = WebClient(token) @@ -29,9 +30,8 @@ c.subscribe([Config.AJOU_TOPIC_ID]) try: while True: msg = c.poll(0.1) - time.sleep(5) + time.sleep(10) if msg is None: - time.sleep(10) continue elif not msg.error(): print("Received a message: {0}".format(msg.value())) @@ -49,7 +49,7 @@ try: href = app_msg["LINK"] writer = app_msg["WRITER"] - channel = "아주대" + channel = "아주대" # C01G2CR5MEE # TODO: 학사면 좀 더 중요하게? text = ":star: `%s` 새로운 공지!\n>%s: %s\n>링크: <%s|공지 확인하기>" % ( date, @@ -86,4 +86,5 @@ except KeyboardInterrupt: print("Pressed CTRL+C...") finally: + print("Closing...") c.close() diff --git a/python/src/AjouSlackProducer.py b/python/src/AjouSlackProducer.py index c1b242c48cef6b3d10ce296aac074bf41cfe6000..a0dad953e162aa7dbe3a17beb60c03d1e01cb963 100644 --- a/python/src/AjouSlackProducer.py +++ b/python/src/AjouSlackProducer.py @@ -2,15 +2,12 @@ import datetime import json import os import time -from contextlib import contextmanager from pathlib import Path import requests from bs4 import BeautifulSoup from config import Config from confluent_kafka import Producer -from slack import WebClient -from slack.errors import SlackApiError # Producer callback function def acked(err, msg): @@ -77,11 +74,7 @@ MAXIMUM_DAY = 7 # remove notices in json that were posted more than 7days ago DUMP = lambda x: json.dumps(x) LOAD = lambda x: json.load(x) -# Bot User OAuth Access Token -# used scopes: channels:history, channels:read, chat:write, im:history, mpim:history, users:read -token = os.environ["SLACK_BOT_TOKEN"] -read = None # 공지 Parser if not Path(JSON_PATH).is_file(): # 파일 없으면 기본 형식 만듬 base_data = {"POSTS": {}, "LAST_PARSED": "1972-12-01 07:00:00.000000"} @@ -89,23 +82,15 @@ if not Path(JSON_PATH).is_file(): # 파일 없으면 기본 형식 만듬 with open(JSON_PATH, "a+") as f: f.write(DUMP(base_data)) +read = None + # json read with open(JSON_PATH, "r+") as f_read: read = LOAD(f_read) read = checkOldness(read) -# Set last parsed time to rest 1 hour well -LAST_PARSED = datetime.datetime.strptime(read["LAST_PARSED"], "%Y-%m-%d %H:%M:%S.%f") - -# init parser -ids, posts, dates, writers = parser() - -# Slack API 초기화 -sc = WebClient(token) -channel = "C01G2CR5MEE" # 아주대 - -# Kafka Producer 만들기 "localhost:9092" +# Kafka Producer 만들기 settings = { "bootstrap.servers": Config.MY_SERVER, # Safe Producer settings @@ -125,49 +110,44 @@ try: read["LAST_PARSED"], "%Y-%m-%d %H:%M:%S.%f" ) - try: - now = datetime.datetime.now() - diff = (now - LAST_PARSED).seconds - print("Last parsing:", LAST_PARSED) - if diff / 3600 < 1: # 업데이트 후 1시간이 안 지났음, 대기 - print(f"Wait for {3600 - diff} seconds to sync new posts.") - time.sleep(3600 - diff) - - read["LAST_PARSED"] = now.strftime("%Y-%m-%d %H:%M:%S.%f") - - print("Trying to parse new posts...") - ids, posts, dates, writers = parser() # 다시 파싱 - for i in range(LENGTH): - postId = ids[i].text.strip() - postLink = posts[i].get("href") - postTitle = posts[i].text.strip() - # postTitle = posts[i].get("title") - postDate = dates[i].text.strip() - postWriter = writers[i].text - - data = makeJson(postId, postTitle, postDate, postLink, postWriter) - # {'10000': {'TITLE': '설문조사', 'DATE': '20.12.04', 'LINK': 'https', 'WRITER': '입학처'}} - - if postId not in read["POSTS"]: - print("Sending a new post...:", postId) - read["POSTS"].update(data) - - PRODUCED += 1 - p.produce( - Config.AJOU_TOPIC_ID, value=DUMP(data[postId]), callback=acked, - ) - p.poll(0.5) # 데이터 Kafka에게 전송 - else: - continue - if PRODUCED: - print(f"Sent {PRODUCED} posts...") + now = datetime.datetime.now() + diff = (now - LAST_PARSED).seconds + print("Last parsing:", LAST_PARSED) + if diff / 3600 < 1: # 업데이트 후 1시간이 안 지났음, 대기 + print(f"Wait for {3600 - diff} seconds to sync new posts.") + time.sleep(3600 - diff) + + read["LAST_PARSED"] = now.strftime("%Y-%m-%d %H:%M:%S.%f") + + print("Trying to parse new posts...") + ids, posts, dates, writers = parser() # 다시 파싱 + for i in range(LENGTH): + postId = ids[i].text.strip() + postLink = posts[i].get("href") + postTitle = posts[i].text.strip() + # postTitle = posts[i].get("title") + postDate = dates[i].text.strip() + postWriter = writers[i].text + + data = makeJson(postId, postTitle, postDate, postLink, postWriter) + # {'10000': {'TITLE': '설문조사', 'DATE': '20.12.04', 'LINK': 'https', 'WRITER': '입학처'}} + + if postId not in read["POSTS"]: + print("Sending a new post...:", postId) + read["POSTS"].update(data) + + PRODUCED += 1 + p.produce( + Config.AJOU_TOPIC_ID, value=DUMP(data[postId]), callback=acked, + ) + p.poll(1) # 데이터 Kafka에게 전송 else: - print("No new posts yet...") - print("Parsed:", datetime.datetime.now()) - - except SlackApiError as e: - assert e.response["ok"] is False - print("\t** FAILED: %s" % e.response["error"]) + continue + if PRODUCED: + print(f"Sent {PRODUCED} post(s)...") + else: + print("No new posts yet...") + print("Parsed:", datetime.datetime.now()) with open(JSON_PATH, "w+") as f: f.write(DUMP(read)) diff --git a/python/src/AjouSlackProducerMySQL.py b/python/src/AjouSlackProducerMySQL.py index 64ecb23ef745285c793c2ca5d62a24f7be48b645..d9cc7585f228deeb6466d39e16fa82a262c478d5 100644 --- a/python/src/AjouSlackProducerMySQL.py +++ b/python/src/AjouSlackProducerMySQL.py @@ -8,199 +8,207 @@ import requests from bs4 import BeautifulSoup from config import Config from confluent_kafka import Producer -from slack import WebClient -from slack.errors import SlackApiError -# Producer callback function -def acked(err, msg): - if err is not None: - print("Failed to deliver message: {0}: {1}".format(msg.value(), err.str())) - else: - print("Message produced: {0}...".format(msg.value())) - - -def checkOldness(db): - pass +class AjouParser: + """ + Ajou notices Parser using Slack API and Apache Kafka + + Methods + ------- + run(server=Config.VM_SERVER, channel="C01G2CR5MEE", database="ajou_notices") + + Usage + ----- + ajou = AjouParser(Kafka_server_ip, mysql_db_name) + ajou.run() + """ + + ADDRESS = "https://www.ajou.ac.kr/kr/ajou/notice.do" + LENGTH = 10 + MAXIMUM_DAY = 7 # remove notices in json that were posted more than 7days ago + + # MySQL commands + INSERT_COMMAND = ( + "INSERT INTO notices (id, title, date, link, writer) " + "VALUES (%s, %s, %s, %s, %s)" + ) + DUPLICATE_COMMAND = "SELECT EXISTS(SELECT * FROM notices WHERE id = %(id)s)" + UPDATE_COMMAND = "UPDATE notices SET date = %(date)s WHERE id = 1" + + __slots__ = ("db", "cursor", "settings", "producer") + + def __init__(self, server=Config.VM_SERVER, database="ajou_notices"): + print("Initializing...") + + # MySQL + self.db = mysql.connector.connect( + host="localhost", + user=os.environ["MYSQL_USER"], + password=os.environ["MYSQL_PASSWORD"], + database=database, + charset="utf8", + ) + self.cursor = self.db.cursor(buffered=True) + + self.settings = { # Producer settings + "bootstrap.servers": server, + "enable.idempotence": True, # Safe + "acks": "all", # Safe + "retries": 10000000, # Safe + "max.in.flight": 5, # High throughput + "compression.type": "lz4", # High throughput + "linger.ms": 5, # High throughput + } + self.producer = Producer(self.settings) + def run(self, period=3600): # period (second) + """Check notices from html per period and sends data to Kafka Consumer.""" + p = self.producer + db = self.db + cursor = self.cursor -def makeData(postId, postTitle, postDate, postLink, postWriter): - return { - postId: { - "TITLE": postTitle, - "DATE": postDate, - "LINK": postLink, - "WRITER": postWriter, - } - } - - -# Ajou notices parser -def parser(): - try: - req = requests.get(f"{ADDRESS}?mode=list&&articleLimit=10&article.offset=0") - req.raise_for_status() - except requests.exceptions.ConnectionError: - print("Seems like the server is down now.") - req.encoding = "utf-8" - html = req.text - soup = BeautifulSoup(html, "html.parser") - ids = soup.select("table > tbody > tr > td.b-num-box") - posts = soup.select("table > tbody > tr > td.b-td-left > div > a") - dates = soup.select("table > tbody > tr > td.b-td-left > div > div > span.b-date") - writers = soup.select( - "table > tbody > tr > td.b-td-left > div > div.b-m-con > span.b-writer" - ) - return ids, posts, dates, writers - - -ADDRESS = "https://www.ajou.ac.kr/kr/ajou/notice.do" -LENGTH = 10 -MAXIMUM_DAY = 7 # remove notices in json that were posted more than 7days ago -DUMP = lambda x: json.dumps(x) - -# MySQL commands -INSERT_COMMAND = ( - "INSERT INTO notices (id, title, date, link, writer) " "VALUES (%s, %s, %s, %s, %s)" -) -DUPLICATE_COMMAND = "SELECT EXISTS(SELECT * FROM notices WHERE id = %(id)s)" -UPDATE_COMMAND = "UPDATE notices SET date = %(date)s WHERE id = 1" - -# MySQL -db = mysql.connector.connect( - host="localhost", - user=os.environ["MYSQL_USER"], - password=os.environ["MYSQL_PASSWORD"], - database="ajou_notices", - charset="utf8", -) -cursor = db.cursor(buffered=True) - -# Bot User OAuth Access Token -# used scopes: channels:history, channels:read, chat:write, im:history, mpim:history, users:read -token = os.environ["SLACK_BOT_TOKEN"] - -# Getting LAST_PARSED from MySQL, where id = 1 -now = datetime.datetime.now() -now = now.strftime("%Y-%m-%d %H:%M:%S.%f") - -cursor.execute("SELECT date FROM notices WHERE id = 1") - -# Set last parsed time to rest 1 hour well -LAST_PARSED = cursor.fetchone()[0] -assert LAST_PARSED is not None, "LAST_PASRED is None." - -# init parser -ids, posts, dates, writers = parser() - -# Slack API 초기화 -sc = WebClient(token) -channel = "C01G2CR5MEE" # 아주대 - -# Kafka Producer 만들기 "localhost:9092" -settings = { - "bootstrap.servers": Config.VM_SERVER, - # Safe Producer settings - "enable.idempotence": True, - "acks": "all", - "retries": 10000000, - "max.in.flight": 5, - "compression.type": "lz4", - "linger.ms": 5, -} # "enable.idempotence": True, "retries": 5 -p = Producer(settings) - -try: - while True: # 1시간마다 무한 반복 - PRODUCED = 0 - - cursor.execute("SELECT date FROM notices WHERE id = 1") - LAST_PARSED = datetime.datetime.strptime( - cursor.fetchone()[0], "%Y-%m-%d %H:%M:%S.%f" - ) # db load date where id = 1 - assert LAST_PARSED is not None, "LAST_PASRED is None." try: - now = datetime.datetime.now() - diff = (now - LAST_PARSED).seconds + while True: # 1시간마다 무한 반복 + print() # Section + PRODUCED = 0 # How many messages did it send? - print("Last parsing:", LAST_PARSED) - if diff / 3600 < 1: # 업데이트 후 1시간이 안 지났음, 대기 - print(f"Wait for {3600 - diff} seconds to sync new posts.") - time.sleep(3600 - diff) + cursor.execute("SELECT date FROM notices WHERE id = 1") + LAST_PARSED = datetime.datetime.strptime( + cursor.fetchone()[0], "%Y-%m-%d %H:%M:%S.%f" + ) # db load date where id = 1 - print("Trying to parse new posts...") - ids, posts, dates, writers = parser() # 다시 파싱 + now = self.getTimeNow() + diff = (now - LAST_PARSED).seconds - # 파싱 오류가 없으면 업데이트 - cursor.execute( - UPDATE_COMMAND, {"date": now.strftime("%Y-%m-%d %H:%M:%S.%f")} - ) + print("Last parsed at", LAST_PARSED) + if (diff / period) < 1: # 업데이트 후 period시간이 안 지났음, 대기 + print(f"Wait for {period - diff} seconds to sync new posts.") + time.sleep(period - diff) - for i in range(LENGTH): - postId = ids[i].text.strip() + print("Trying to parse new posts...") + ids, posts, dates, writers = self.parser() # 다시 파싱 + assert ids is not None, f"Check your parser: {ids}." + # 파싱 오류가 없으면 업데이트 cursor.execute( - DUPLICATE_COMMAND, {"id": int(postId)} - ) # db duplication check - if cursor.fetchone()[0]: # (1, ) - continue # postId exists - - postLink = posts[i].get("href") - postTitle = posts[i].text.strip() - postDate = dates[i].text.strip() - postWriter = writers[i].text - - duplicate = "[" + postWriter + "]" - if duplicate in postTitle: # writer: [writer] title - postTitle = postTitle.replace( - duplicate, "" - ).strip() # -> writer: title - - dbData = ( - int(postId), - postTitle, - postDate, - ADDRESS + postLink, - postWriter, + self.UPDATE_COMMAND, {"date": now.strftime("%Y-%m-%d %H:%M:%S.%f")}, ) - kafkaData = makeData( - postId, postTitle, postDate, ADDRESS + postLink, postWriter - ) - - cursor.execute(INSERT_COMMAND, dbData) # db insert - print("Sending a new post...:", postId) - PRODUCED += 1 - p.produce( - Config.AJOU_TOPIC_ID, value=DUMP(kafkaData[postId]), callback=acked, + for i in range(self.LENGTH): + postId = ids[i].text.strip() + + cursor.execute( + self.DUPLICATE_COMMAND, {"id": int(postId)} + ) # db duplication check + if cursor.fetchone()[0]: # (1, ) + continue # postId exists + + postLink = self.ADDRESS + posts[i].get("href") + postTitle = posts[i].text.strip() + postDate = dates[i].text.strip() + postWriter = writers[i].text + + duplicate = "[" + postWriter + "]" + if duplicate in postTitle: # writer: [writer] title + postTitle = postTitle.replace( + duplicate, "" + ).strip() # -> writer: title + + dbData = ( + int(postId), + postTitle, + postDate, + postLink, + postWriter, + ) + kafkaData = self.makeData( + postId, postTitle, postDate, postLink, postWriter + ) + + cursor.execute(self.INSERT_COMMAND, dbData) # db insert + + print("\n>>> Sending a new post...:", postId) + PRODUCED += 1 + p.produce( + Config.AJOU_TOPIC_ID, + value=json.dumps(kafkaData[postId]), + callback=self.acked, + ) + p.poll(1) # 데이터 Kafka에게 전송, second + + if PRODUCED: + print(f"Sent {PRODUCED} post(s)...") + else: + print("\t** No new posts yet") + print("Parsed at", now) + + db.commit() # save data + print(f"Resting {period // 3600} hour...") + time.sleep(period) + + except Exception as e: # General exceptions + print(dir(e)) + except KeyboardInterrupt: + print("Pressed CTRL+C...") + finally: + print("\nExiting...") + cursor.close() + db.commit() + db.close() + p.flush(100) + + # Producer callback function + @staticmethod + def acked(err, msg): + if err is not None: + print( + "\t** Failed to deliver message: {0}: {1}".format( + msg.value(), err.str() ) - p.poll(1) # 데이터 Kafka에게 전송 - - if PRODUCED: - print(f"Sent {PRODUCED} posts...") - else: - print("No new posts yet...") - print("Parsed:", now) - - except SlackApiError as e: - assert e.response["ok"] is False - print("\t** FAILED: %s" % e.response["error"]) - - db.commit() # save data - print("Resting 1 hour...") - time.sleep(3600) - - -except Exception as e: - print(type(e)) - print(dir(e)) + ) + else: + print("Message produced correctly...") + + @staticmethod + def makeData(postId, postTitle, postDate, postLink, postWriter): + return { + postId: { + "TITLE": postTitle, + "DATE": postDate, + "LINK": postLink, + "WRITER": postWriter, + } + } -except KeyboardInterrupt: - print("Pressed CTRL+C...") + @staticmethod + def getTimeNow() -> datetime.datetime: + return datetime.datetime.now() -finally: - print("Exiting...") - db.commit() - cursor.close() - db.close() - p.flush(100) + # Ajou notices parser + def parser(self): + try: + req = requests.get( + f"{self.ADDRESS}?mode=list&&articleLimit=10&article.offset=0" + ) + req.raise_for_status() + except requests.exceptions.ConnectionError: + print("Seems like the server is down now.") + return None, None, None, None + req.encoding = "utf-8" + html = req.text + soup = BeautifulSoup(html, "html.parser") + ids = soup.select("table > tbody > tr > td.b-num-box") + posts = soup.select("table > tbody > tr > td.b-td-left > div > a") + dates = soup.select( + "table > tbody > tr > td.b-td-left > div > div > span.b-date" + ) + writers = soup.select( + "table > tbody > tr > td.b-td-left > div > div.b-m-con > span.b-writer" + ) + return ids, posts, dates, writers + + +if __name__ == "__main__": + ajou = AjouParser() + ajou.run()