diff --git a/README.md b/README.md
index 0651f098f4dd038a86ae98b2df467a6a99c862af..e0e60200fd3a3a77cb0952b2be77491db379bf71 100644
--- a/README.md
+++ b/README.md
@@ -24,6 +24,8 @@
   * [Ajou notices Parsing Producer with Slack API](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/AjouSlackProducer.py)
   * [Ajou notices Parsing Producer with Slack API using MySQL](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/AjouSlackProducerMySQL.py)
   * [Ajou notices Parsing Consumer with Slack API](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/AjouSlackConsumer.py)
+  * [FastAvro Producer example](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/avro/AjouSlackProducerAvro.py)
+  * [FastAvro Consumer example](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/avro/AjouSlackConsumerAvro.py)
 * Kafka (Java)
   * [Simple Producer](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemo.java)
diff --git a/python/README.md b/python/README.md
index 10f1404fa4ddb5ed2dbdcfaaa65c6a64399e5e86..4b2e1fe2e1bfcd599e12265fc463ca3f573430ed 100644
--- a/python/README.md
+++ b/python/README.md
@@ -125,3 +125,33 @@ _Please contact them immediately and see if we can fix the issue *right here, ri
 <img width="480" src="https://github.com/Alfex4936/kafka-Studies/blob/main/img/slack.png">
 </p>
 </div>
+
+## FastAvro Producer / Consumer
+
+Install fastavro with pip:
+
+```console
+WIN10@DESKTOP:~$ pip install fastavro
+```
+
+Example schema to use:
+
+```python
+schema = {  # avsc
+    "namespace": "ajou.parser",
+    "name": "Notice",  # separate fields, but the full name will be namespace.name
+    "doc": "A notice parser from Ajou university.",
+    "type": "record",
+    "fields": [
+        {"name": "id", "type": "int"},
+        {"name": "title", "type": "string"},
+        {"name": "date", "type": "string"},
+        {"name": "link", "type": "string"},
+        {"name": "writer", "type": "string"},
+    ],
+}
+parsed_schema = parse_schema(schema)
+```
+
+See how a single record is produced with the Producer and consumed back with the Consumer in this [pytest](https://github.com/Alfex4936/kafka-Studies/blob/main/python/tests/test_avro.py#L110).
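+
+For reference, a minimal sketch of that round-trip without Kafka, mirroring `test_single_record` in the test file and assuming the `parsed_schema` defined above:
+
+```python
+from io import BytesIO
+
+from fastavro import schemaless_reader, schemaless_writer
+
+notice = {
+    "id": 10000,
+    "title": "[FastAVRO] test notice",
+    "date": "20.12.23",
+    "link": "https://somelink",
+    "writer": "alfex4936",
+}
+
+# Producer side: encode one record as raw bytes (no container header);
+# this is the `value` handed to Producer.produce()
+buf = BytesIO()
+schemaless_writer(buf, parsed_schema, notice)
+payload = buf.getvalue()
+
+# Consumer side: decode the bytes back into a dict with the same schema
+decoded = schemaless_reader(BytesIO(payload), parsed_schema)
+assert decoded == notice
+```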
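+
+The producer also keeps every parsed notice in a local Avro container file (`ajou.avro`). A rough sketch of that pattern, again assuming `parsed_schema` from above (see `test_write` / `test_read` in the same test file):
+
+```python
+from fastavro import reader, writer
+
+records = [
+    {"id": 10005, "title": "notice", "date": "2020-12-01",
+     "link": "https://www.ajou.ac.kr/kr/ajou/notice.do?mode=view&articleNo=10005",
+     "writer": "CSW"},
+]
+
+# Write a container file; the schema is stored in the file header
+with open("ajou.avro", "wb") as out:
+    writer(out, parsed_schema, records)
+
+# Read it back; each record comes out as a dict
+with open("ajou.avro", "rb") as fo:
+    for record in reader(fo):
+        print(record["id"], record["title"])
+```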
+
diff --git a/python/src/AjouSlackConsumer.py b/python/src/AjouSlackConsumer.py
index b44842f4cc819060c5a85b6798d8c6398ea2e607..972d6081d25494f3476741cfdd75ec3571f8ab78 100644
--- a/python/src/AjouSlackConsumer.py
+++ b/python/src/AjouSlackConsumer.py
@@ -8,6 +8,12 @@
 from slack import WebClient
 from slack.errors import SlackApiError
 
+
+def error_cb(error):
+    print(">>>", error)
+    if error.code() == KafkaError._ALL_BROKERS_DOWN:
+        print("SERVER DOWN")
+
 # Bot User OAuth Access Token
 # used scopes: channels:history, channels:read, chat:write, im:history, mpim:history, users:read
 token = os.environ["SLACK_BOT_TOKEN"]
@@ -20,6 +26,7 @@
     "bootstrap.servers": Config.VM_SERVER,
     "group.id": "ajou-notify",
     "default.topic.config": {"auto.offset.reset": "largest"},
+    "error_cb": error_cb,
     # "debug": "broker, cgrp",
 }
 c = Consumer(settings)
@@ -36,6 +43,7 @@
         elif not msg.error():
             print("Received a message: {0}".format(msg.value()))
             if msg.value() is None:
+                print("But the message value is empty.")
                 continue
 
             try:
diff --git a/python/src/AjouSlackProducer.py b/python/src/AjouSlackProducer.py
index f389f2bd2a9151de639bc10a0f7737ad6dd27d0e..2eec962b8ffaa49422a68fd3aad9e5fe39178436 100644
--- a/python/src/AjouSlackProducer.py
+++ b/python/src/AjouSlackProducer.py
@@ -1,6 +1,7 @@
 import datetime
 import json
 import os
+import ssl
 import time
 from pathlib import Path
 from urllib.error import HTTPError
diff --git a/python/src/AjouSlackProducerMySQL.py b/python/src/AjouSlackProducerMySQL.py
index 1ed9fbfe8804b3b1dd46654440a929d695f8b081..0ef4fd2feb8223110eaf8c84397b80caa8a1b328 100644
--- a/python/src/AjouSlackProducerMySQL.py
+++ b/python/src/AjouSlackProducerMySQL.py
@@ -150,6 +150,7 @@ class AjouParser:
                 time.sleep(period)
         except Exception as e:  # General exceptions
+            print(e)
             print(dir(e))
         except KeyboardInterrupt:
             print("Pressed CTRL+C...")
diff --git a/python/src/avro/AjouSlackConsumerAvro.py b/python/src/avro/AjouSlackConsumerAvro.py
new file mode 100644
index 0000000000000000000000000000000000000000..146b9e8006e604663ee4541ab073c6fd5c439ca9
--- /dev/null
+++ b/python/src/avro/AjouSlackConsumerAvro.py
@@ -0,0 +1,110 @@
+import os
+import time
+from io import BytesIO
+
+from confluent_kafka import Consumer, KafkaError, KafkaException
+from fastavro import parse_schema, schemaless_reader
+from slack import WebClient
+from slack.errors import SlackApiError
+
+
+schema = {  # avsc
+    "namespace": "ajou.parser",
+    "name": "Notice",  # separate fields, but the full name will be namespace.name
+    "doc": "A notice parser from Ajou university.",
+    "type": "record",
+    "fields": [
+        {"name": "id", "type": "int"},
+        {"name": "title", "type": "string"},
+        {"name": "date", "type": "string"},
+        {"name": "link", "type": "string"},
+        {"name": "writer", "type": "string"},
+    ],
+}
+parsed_schema = parse_schema(schema)
+
+
+# Bot User OAuth Access Token
+# used scopes: channels:history, channels:read, chat:write, im:history, mpim:history, users:read
+token = os.environ["SLACK_BOT_TOKEN"]
+
+sc = WebClient(token)
+
+# Set 'auto.offset.reset': 'smallest' if you want to consume all messages
+# from the beginning of the topic
+settings = {
+    "bootstrap.servers": os.environ["VM_SERVER"],
+    "group.id": "ajou-notify",
+    "default.topic.config": {"auto.offset.reset": "largest"},
+    # "value.deserializer": lambda v: json.loads(v.decode("utf-8")),
+    # "debug": "broker, cgrp",
+}
+c = Consumer(settings)
+
+# Topic: "AJOU-NOTIFY"
+c.subscribe(["AJOU-NOTIFY"])
+
+try:
+    while True:
+        # SIGINT can't be handled when polling, limit timeout to 1 second.
+        msg = c.poll(1.0)
+        if msg is None:
+            time.sleep(10)
+            continue
+        elif not msg.error():
+            print("Received a message: {0} bytes".format(len(msg)))
+            if msg.value() is None:
+                print("But the message value is empty.")
+                continue
+
+            # Consumer: read the raw message bytes
+            message = msg.value()
+            rb = BytesIO(message)
+
+            app_msg = schemaless_reader(rb, parsed_schema)  # read one record
+            try:
+                title = app_msg["title"]
+                date = app_msg["date"]
+                href = app_msg["link"]
+                writer = app_msg["writer"]
+
+                channel = "아주대"  # C01G2CR5MEE
+                text = ":star: `%s` 새로운 공지!\n>%s: %s\n>링크: <%s|공지 확인하기>" % (
+                    date,
+                    writer,
+                    title,
+                    href,
+                )
+                print('\nSending message "%s" to channel %s' % (text, channel))
+            except KeyError as e:  # the record is missing a field
+                print("Failed to get channel/text from message:", e)
+                channel = "kafka"
+                text = msg.value()
+
+            try:
+                sc_response = sc.chat_postMessage(
+                    channel=channel, text=text, as_user=True, username="아주대 공지 봇"
+                )  # as_user does not work with new Slack apps
+
+            except SlackApiError as e:
+                assert e.response["ok"] is False
+                print("\t** FAILED: %s" % e.response["error"])
+        elif msg.error().code() == KafkaError._PARTITION_EOF:
+            print(
+                "End of partition reached {0}/{1}".format(msg.topic(), msg.partition())
+            )
+        else:
+            raise KafkaException(msg.error())
+except Exception as e:
+    print(e)
+    print(dir(e))
+
+except KeyboardInterrupt:
+    print("Pressed CTRL+C...")
+
+finally:
+    print("Closing...")
+    c.close()
diff --git a/python/src/avro/AjouSlackProducerAvro.py b/python/src/avro/AjouSlackProducerAvro.py
new file mode 100644
index 0000000000000000000000000000000000000000..5f312b580d8cf1f2ef25dbbbcec4f5d36386738c
--- /dev/null
+++ b/python/src/avro/AjouSlackProducerAvro.py
@@ -0,0 +1,204 @@
+import datetime
+import os
+import ssl
+import time
+from io import BytesIO
+from urllib.error import HTTPError
+from urllib.request import urlopen
+
+from bs4 import BeautifulSoup
+from confluent_kafka import Producer
+from fastavro import parse_schema, reader, schemaless_writer, writer
+
+
+schema = {  # avsc
+    "namespace": "ajou.parser",
+    "name": "Notice",  # separate fields, but the full name will be namespace.name
+    "doc": "A notice parser from Ajou university.",
+    "type": "record",
+    "fields": [
+        {"name": "id", "type": "int"},
+        {"name": "title", "type": "string"},
+        {"name": "date", "type": "string"},
+        {"name": "link", "type": "string"},
+        {"name": "writer", "type": "string"},
+    ],
+}
+parsed_schema = parse_schema(schema)
+
+
+class AjouParserAVRO:
+    """
+    Ajou notices parser using the Slack API and Apache Kafka (AVRO).
+
+    The AVRO file is saved in the current directory.
+
+    Methods
+    -------
+    run(period=3600)
+
+    Usage
+    -----
+    ajou = AjouParserAVRO(Kafka_server_ip, avro_name)
+    ajou.run()
+    """
+
+    # HTML
+    ADDRESS = "https://www.ajou.ac.kr/kr/ajou/notice.do"
+    LENGTH = 10
+
+    # AVRO
+    BASE_DIR = os.path.dirname(os.path.abspath(__file__))
+
+    __slots__ = ("settings", "producer", "AVRO_PATH")
+
+    def __init__(self, server=os.environ["VM_SERVER"], avro_name="ajou.avro"):
+        print("Initializing...")
+
+        self.AVRO_PATH = os.path.join(
+            self.BASE_DIR, avro_name
+        )  # saves into the current dir
+
+        self.settings = {  # Producer settings
+            "bootstrap.servers": server,
+            "enable.idempotence": True,  # Safe
+            "acks": "all",  # Safe
+            "retries": 5,  # Safe
+            "max.in.flight": 5,  # High throughput
+            "compression.type": "snappy",  # High throughput
+            "linger.ms": 5,  # High throughput
+            "error_cb": self.error_cb,  # register the error callback defined below
+        }
+        self.producer = Producer(self.settings)
+
+    def run(self, period=3600):  # period in seconds
+        """Check notices from the html every `period` seconds and send new ones to the Kafka consumer."""
+        p = self.producer
+        processedIds = []  # already parsed
+
+        try:
+            with open(self.AVRO_PATH, "rb") as fo:
+                for record in reader(fo):
+                    processedIds.append(record["id"])  # add id to the already-parsed list
+        except FileNotFoundError:
+            pass  # first run: no avro file yet
+
+        try:
+            while True:  # repeat forever, once per period
+                records = []
+                print()  # Section
+                PRODUCED = 0  # how many messages were sent
+
+                # No checking of the last parsed date, always starts fresh
+                print("Trying to parse new posts...")
+                ids, posts, dates, writers = self.parser()  # parse again
+                assert ids is not None, f"Check your parser: {ids}."
+
+                for i in range(self.LENGTH):  # check all 10 notices
+                    postId = int(ids[i].text.strip())
+                    if postId in processedIds:
+                        continue
+                    processedIds.append(postId)
+                    postLink = self.ADDRESS + posts[i].get("href")
+                    postTitle = posts[i].text.strip()
+                    postDate = dates[i].text.strip()
+                    postWriter = writers[i].text
+
+                    # Removing a name duplication
+                    duplicate = "[" + postWriter + "]"
+                    if duplicate in postTitle:  # writer: [writer] title
+                        postTitle = postTitle.replace(
+                            duplicate, ""
+                        ).strip()  # -> writer: title
+
+                    data = self.makeData(
+                        postId, postTitle, postDate, postLink, postWriter
+                    )
+                    rb = BytesIO()
+                    schemaless_writer(rb, parsed_schema, data)  # write one record
+
+                    records.append(data)  # to write avro
+
+                    print("\n>>> Sending a new post...:", postId)
+                    PRODUCED += 1
+                    p.produce(
+                        "AJOU-NOTIFY", value=rb.getvalue(), callback=self.acked,
+                    )
+                    p.poll(1)  # serve delivery callbacks (timeout in seconds)
+
+                if PRODUCED:
+                    print(f"Sent {PRODUCED} post(s)...")
+
+                    with open(self.AVRO_PATH, "wb") as out:
+                        # write avro only when there are updates
+                        writer(out, parsed_schema, records)
+                else:
+                    print("\t** No new posts yet")
+                print("Parsed at", self.getTimeNow())
+
+                print(f"Resting {period // 3600} hour(s)...")
+                time.sleep(period)
+
+        except Exception as e:  # General exceptions
+            print(e)
+            print(dir(e))
+        except KeyboardInterrupt:
+            print("Pressed CTRL+C...")
+        finally:
+            print("\nExiting...")
+            p.flush(100)
+
+    @staticmethod
+    def error_cb(error):
+        print(">>>", error)
+
+    # Producer delivery callback
+    @staticmethod
+    def acked(err, msg):
+        if err is not None:
+            print(
+                "\t** Failed to deliver message: {0}: {1}".format(
+                    msg.value(), err.str()
+                )
+            )
+        else:
+            print("Message produced correctly...")
+
+    @staticmethod
+    def makeData(postId, postTitle, postDate, postLink, postWriter):
+        return {
+            "id": postId,
+            "title": postTitle,
+            "date": postDate,
+            "link": postLink,
+            "writer": postWriter,
+        }
+
+    @staticmethod
+    def getTimeNow() -> datetime.datetime:
+        return datetime.datetime.now()
+
+    # Ajou notices parser
+    def parser(self):
+        context = ssl._create_unverified_context()  # skip SSL certificate verification
+        try:
+            result = urlopen(
+                f"{self.ADDRESS}?mode=list&&articleLimit={self.LENGTH}&article.offset=0",
+                context=context,
+            )
+        except HTTPError:
+            print("Seems like the server is down now.")
+            return None, None, None, None
+        html = result.read()
+        soup = BeautifulSoup(html, "html.parser")
+        ids = soup.select("table > tbody > tr > td.b-num-box")
+        posts = soup.select("table > tbody > tr > td.b-td-left > div > a")
+        dates = soup.select(
+            "table > tbody > tr > td.b-td-left > div > div > span.b-date"
+        )
+        writers = soup.select(
+            "table > tbody > tr > td.b-td-left > div > div.b-m-con > span.b-writer"
+        )
+        return ids, posts, dates, writers
+
+
+if __name__ == "__main__":
+    ajou = AjouParserAVRO()
+    ajou.run()
diff --git a/python/src/avro/ajou.avro b/python/src/avro/ajou.avro
new file mode 100644
index 0000000000000000000000000000000000000000..5d3677301d88559a4bdc8c3a4904681bb592fc9e
Binary files /dev/null and b/python/src/avro/ajou.avro differ
diff --git a/python/tests/ajou.avro b/python/tests/ajou.avro
new file mode 100644
index 0000000000000000000000000000000000000000..8409bfd5440d150271ae2c04079a8267d63a43a8
Binary files /dev/null and b/python/tests/ajou.avro differ
diff --git a/python/tests/test_avro.py b/python/tests/test_avro.py
new file mode 100644
index 0000000000000000000000000000000000000000..7eae01180bcae2e025c31f8f37669bd098202a25
--- /dev/null
+++ b/python/tests/test_avro.py
@@ -0,0 +1,227 @@
+import datetime
+import os
+import ssl
+from io import BytesIO
+from urllib.error import HTTPError
+from urllib.request import urlopen
+
+from bs4 import BeautifulSoup
+from fastavro import (
+    parse_schema,
+    reader,
+    schemaless_reader,
+    schemaless_writer,
+    writer,
+)
+
+
+schema = {  # avsc
+    "namespace": "ajou.parser",
+    "name": "Notice",  # separate fields, but the full name will be namespace.name
+    "doc": "A notice parser from Ajou university.",
+    "type": "record",
+    "fields": [
+        {"name": "id", "type": "int"},
+        {"name": "title", "type": "string"},
+        {"name": "date", "type": "string"},
+        {"name": "link", "type": "string"},
+        {"name": "writer", "type": "string"},
+    ],
+}
+parsed_schema = parse_schema(schema)
+
+# 'records' can be an iterable (including a generator)
+records = [
+    {
+        "id": 10005,
+        "title": "대학교 공지 1",
+        "date": "2020-12-01",
+        "link": "https://www.ajou.ac.kr/kr/ajou/notice.do?mode=view&articleNo=10005",
+        "writer": "CSW",
+    },
+    {
+        "id": 10006,
+        "title": "대학교 공지 2",
+        "date": "2020-12-02",
+        "link": "https://www.ajou.ac.kr/kr/ajou/notice.do?mode=view&articleNo=10006",
+        "writer": "CSW",
+    },
+    {
+        "id": 10007,
+        "title": "대학교 공지 3",
+        "date": "2020-12-04",
+        "link": "https://www.ajou.ac.kr/kr/ajou/notice.do?mode=view&articleNo=10007",
+        "writer": "CSW",
+    },
+    {
+        "id": 10008,
+        "title": "대학교 공지 4",
+        "date": "2020-12-04",
+        "link": "https://www.ajou.ac.kr/kr/ajou/notice.do?mode=view&articleNo=10008",
+        "writer": "CSW",
+    },
+    {
+        "id": 10009,
+        "title": "대학교 공지 5",
+        "date": "2020-12-11",
+        "link": "https://www.ajou.ac.kr/kr/ajou/notice.do?mode=view&articleNo=10009",
+        "writer": "CSW",
+    },
+]
+
+BASE_DIR = os.path.dirname(os.path.abspath(__file__))
+AVRO_PATH = os.path.join(BASE_DIR, "ajou.avro")
+
+ADDRESS = "https://www.ajou.ac.kr/kr/ajou/notice.do"
+LENGTH = 10
+
+
+def makeData(postId, postTitle, postDate, postLink, postWriter):
+    return {
+        "id": postId,
+        "title": postTitle,
+        "date": postDate,
+        "link": postLink,
+        "writer": postWriter,
+    }
+
+
+def parser():
+    context = ssl._create_unverified_context()  # skip SSL certificate verification
+    try:
+        result = urlopen(
+            f"{ADDRESS}?mode=list&&articleLimit={LENGTH}&article.offset=0",
+            context=context,
+        )
+    except HTTPError:
+        print("Seems like the server is down now.")
+        return None, None, None, None
+    html = result.read()
+    soup = BeautifulSoup(html, "html.parser")
+    ids = soup.select("table > tbody > tr > td.b-num-box")
+    posts = soup.select("table > tbody > tr > td.b-td-left > div > a")
+    dates = soup.select("table > tbody > tr > td.b-td-left > div > div > span.b-date")
+    writers = soup.select(
+        "table > tbody > tr > td.b-td-left > div > div.b-m-con > span.b-writer"
+    )
+    return ids, posts, dates, writers
+
+
+def test_single_record():
+    # To send with a producer
+    message = {
+        "id": 10000,
+        "title": "[FastAVRO] 테스트 공지 제목",
+        "date": "20.12.23",
+        "link": "https://somelink",
+        "writer": "alfex4936",
+    }
+
+    # How the producer produces a single record
+    producer_rb = BytesIO()
+    schemaless_writer(producer_rb, parsed_schema, message)  # write one record
+    produced_data = producer_rb.getvalue()
+
+    # How the consumer reads a single record
+    consumer_rb = BytesIO(produced_data)
+    decoded = schemaless_reader(consumer_rb, parsed_schema)  # read one record
+    assert decoded == {
+        "id": 10000,
+        "title": "[FastAVRO] 테스트 공지 제목",
+        "date": "20.12.23",
+        "link": "https://somelink",
+        "writer": "alfex4936",
+    }
+
+    # {'id': 10000, 'title': '[FastAVRO] 테스트 공지 제목', 'date': '20.12.23', 'link': 'https://somelink', 'writer': 'alfex4936'}
+
+
+def test_write():
+    # Writing
+    with open(AVRO_PATH, "wb") as out:
+        writer(out, parsed_schema, records)
+
+    assert os.path.isfile(AVRO_PATH), "Writing the avro file went wrong."
+
+
+def test_read():
+    # Reading
+    with open(AVRO_PATH, "rb") as fo:
+        for record in reader(fo):
+            print(record)
+            assert isinstance(record["id"], int)
+
+
+def test_consumer():
+    msg = [
+        {
+            "id": 10000,
+            "title": "[\ud559\uc2b5\ubc95] \uc131\uacf5\ud558\ub294 \ud559\uc2b5\ub9ac\ub354\ub97c \uc704\ud55c \ud559\uc2b5\ub9ac\ub354\uc6cc\ud06c\uc20d \uc548\ub0b4",
+            "date": "20.12.07",
+            "link": "https://www.ajou.ac.kr/kr/ajou/notice.do?mode=view&articleNo=104863&article.offset=0&articleLimit=10",
+            "writer": "\uad50\uc218\ud559\uc2b5\uac1c\ubc1c\uc13c\ud130",
+        },
+        {
+            "id": 10001,
+            "title": "[\ud559\uc2b5\ubc95] \uc131\uacf5\ud558\ud558 \ud559\uc2b5\ub9ac\ub354\ub97c \uc704\ud55c \ud559\uc2b5\ub9ac\ub354\uc6cc\ud06c\uc20d \uc548\ub0b4",
+            "date": "20.12.08",
+            "link": "https://www.ajou.ac.kr/kr/ajou/notice.do?mode=view&articleNo=104863&article.offset=0&articleLimit=10",
+            "writer": "\uad50\uc218\ud559\uc2b5\uac1c\ubc1c\uc13c\ud130",
+        },
+    ]
+    newFile = BytesIO()
+    writer(newFile, parsed_schema, msg)
+    newFile.seek(0)
+    for record in reader(newFile):
+        print(record)
+
+
+def test_parse():
+    processedIds = []  # already parsed
+
+    with open(AVRO_PATH, "rb") as fo:
+        for record in reader(fo):
+            processedIds.append(record["id"])
+    print("ALREADY PARSED:", processedIds)
+
+    records = []
+    print()  # Section
+    PRODUCED = 0  # how many messages were sent
+
+    # No checking of the last parsed date, always starts fresh
+    print("Trying to parse new posts...")
+    ids, posts, dates, writers = parser()  # parse again
+    assert ids is not None, f"Check your parser: {ids}."
+
+    for i in range(LENGTH):
+        postId = int(ids[i].text.strip())
+        if postId in processedIds:  # already sent
+            continue
+        processedIds.append(postId)
+        postLink = ADDRESS + posts[i].get("href")
+        postTitle = posts[i].text.strip()
+        postDate = dates[i].text.strip()
+        postWriter = writers[i].text
+
+        # Removing a name duplication
+        duplicate = "[" + postWriter + "]"
+        if duplicate in postTitle:  # writer: [writer] title
+            postTitle = postTitle.replace(duplicate, "").strip()  # -> writer: title
+
+        data = makeData(postId, postTitle, postDate, postLink, postWriter)
+        records.append(data)
+
+        print("\n>>> Sending a new post...:", postId)
+        PRODUCED += 1
+
+    # the Producer would go here
+
+    if PRODUCED:
+        print(f"Sent {PRODUCED} post(s)...")
+        with open(AVRO_PATH, "wb") as out:
+            writer(out, parsed_schema, records)
+    else:
+        print("\t** No new posts yet")
+    print("Parsed at", datetime.datetime.now())