From 09a1c2d1850ca919889726d92d077dc286171f73 Mon Sep 17 00:00:00 2001 From: Seok Won <ikr@kakao.com> Date: Wed, 23 Dec 2020 16:40:47 +0900 Subject: [PATCH] Create FastAvro example Apache Avro with fastavro AjouSlackProducerAvro.py will produce a single record with schema AjouSlackConsumerAvro.py will consume it with given schema --- README.md | 2 + python/README.md | 30 +++ python/src/AjouSlackConsumer.py | 8 + python/src/AjouSlackProducer.py | 1 + python/src/AjouSlackProducerMySQL.py | 1 + python/src/avro/AjouSlackConsumerAvro.py | 110 +++++++++++ python/src/avro/AjouSlackProducerAvro.py | 204 ++++++++++++++++++++ python/src/avro/ajou.avro | Bin 0 -> 2416 bytes python/tests/ajou.avro | Bin 0 -> 882 bytes python/tests/test_avro.py | 227 +++++++++++++++++++++++ 10 files changed, 583 insertions(+) create mode 100644 python/src/avro/AjouSlackConsumerAvro.py create mode 100644 python/src/avro/AjouSlackProducerAvro.py create mode 100644 python/src/avro/ajou.avro create mode 100644 python/tests/ajou.avro create mode 100644 python/tests/test_avro.py diff --git a/README.md b/README.md index 0651f09..e0e6020 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,8 @@ * [Ajou notices Parsing Producer with Slack API](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/AjouSlackProducer.py) * [Ajou notices Parsing Producer with Slack API using MySQL](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/AjouSlackProducerMySQL.py) * [Ajou notices Parsing Consumer with Slack API](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/AjouSlackConsumer.py) + * [FastAvro Producer example](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/avro/AjouSlackProducerAvro.py) + * [FastAvro Consumer example](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/avro/AjouSlackConsumerAvro.py) * Kafka (Java) * [Simple Producer](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemo.java) diff --git a/python/README.md b/python/README.md index 10f1404..4b2e1fe 100644 --- a/python/README.md +++ b/python/README.md @@ -125,3 +125,33 @@ _Please contact them immediately and see if we can fix the issue *right here, ri <img width="480" src="https://github.com/Alfex4936/kafka-Studies/blob/main/img/slack.png"> </p> </div> + +## FastAvro Producer / Consumer + +Install fastavro with pip + +```console +WIN10@DESKTOP:~$ pip install fastavro +``` + +Example Schema to use +```python +schema = { # avsc + "namespace": "ajou.parser", + "name": "Notice", # seperated but will be namespace.name + "doc": "A notice parser from Ajou university.", + "type": "record", + "fields": [ + {"name": "id", "type": "int"}, + {"name": "title", "type": "string"}, + {"name": "date", "type": "string"}, + {"name": "link", "type": "string"}, + {"name": "writer", "type": "string"}, + ], +} +parsed_schema = parse_schema(schema) +``` + +How to produce single data with Producer & +consume it with Consumer at [pytest](https://github.com/Alfex4936/kafka-Studies/blob/main/python/tests/test_avro.py#L110) + diff --git a/python/src/AjouSlackConsumer.py b/python/src/AjouSlackConsumer.py index b44842f..972d608 100644 --- a/python/src/AjouSlackConsumer.py +++ b/python/src/AjouSlackConsumer.py @@ -8,6 +8,12 @@ from slack import WebClient from slack.errors import SlackApiError +def error_cb(error): + print(">>>", error) + if error == KafkaError._ALL_BROKERS_DOWN: + print("SERVER DOWN") + + # Bot User OAuth Access Token # used scopes: channels:history, channels:read, chat:write, im:history, mpim:history, users:read token = os.environ["SLACK_BOT_TOKEN"] @@ -20,6 +26,7 @@ settings = { "bootstrap.servers": Config.VM_SERVER, "group.id": "ajou-notify", "default.topic.config": {"auto.offset.reset": "largest"}, + "error_cb": error_cb, # "debug": "broker, cgrp", } c = Consumer(settings) @@ -36,6 +43,7 @@ try: elif not msg.error(): print("Received a message: {0}".format(msg.value())) if msg.value() is None: + print("But the message value is empty.") continue try: diff --git a/python/src/AjouSlackProducer.py b/python/src/AjouSlackProducer.py index f389f2b..2eec962 100644 --- a/python/src/AjouSlackProducer.py +++ b/python/src/AjouSlackProducer.py @@ -1,6 +1,7 @@ import datetime import json import os +import ssl import time from pathlib import Path from urllib.error import HTTPError diff --git a/python/src/AjouSlackProducerMySQL.py b/python/src/AjouSlackProducerMySQL.py index 1ed9fbf..0ef4fd2 100644 --- a/python/src/AjouSlackProducerMySQL.py +++ b/python/src/AjouSlackProducerMySQL.py @@ -150,6 +150,7 @@ class AjouParser: time.sleep(period) except Exception as e: # General exceptions + print(e) print(dir(e)) except KeyboardInterrupt: print("Pressed CTRL+C...") diff --git a/python/src/avro/AjouSlackConsumerAvro.py b/python/src/avro/AjouSlackConsumerAvro.py new file mode 100644 index 0000000..146b9e8 --- /dev/null +++ b/python/src/avro/AjouSlackConsumerAvro.py @@ -0,0 +1,110 @@ +import os +import time +from io import BytesIO + +from confluent_kafka import Consumer, KafkaError, KafkaException +from fastavro import parse_schema, schemaless_reader, writer +from slack import WebClient +from slack.errors import SlackApiError + + +schema = { # avsc + "namespace": "ajou.parser", + "name": "Notice", # seperated but will be namespace.name + "doc": "A notice parser from Ajou university.", + "type": "record", + "fields": [ + {"name": "id", "type": "int"}, + {"name": "title", "type": "string"}, + {"name": "date", "type": "string"}, + {"name": "link", "type": "string"}, + {"name": "writer", "type": "string"}, + ], +} +parsed_schema = parse_schema(schema) + + +# Bot User OAuth Access Token +# used scopes: channels:history, channels:read, chat:write, im:history, mpim:history, users:read +token = os.environ["SLACK_BOT_TOKEN"] + +sc = WebClient(token) + +# Set 'auto.offset.reset': 'smallest' if you want to consume all messages +# from the beginning of the topic +settings = { + "bootstrap.servers": os.environ["VM_SERVER"], + "group.id": "ajou-notify", + "default.topic.config": {"auto.offset.reset": "largest"}, + # "value.deserializer": lambda v: json.loads(v.decode("utf-8")), + # "debug": "broker, cgrp", +} +c = Consumer(settings) + +# Topic = "AJOU-NOTIFY +c.subscribe(["AJOU-NOTIFY"]) + +try: + while True: + # SIGINT can't be handled when polling, limit timeout to 1 second. + msg = c.poll(1.0) + if msg is None: + time.sleep(10) + continue + elif not msg.error(): + print("Received messages: {0}".format(len(msg))) + if msg.value() is None: + print("But the message value is empty.") + continue + + # Consumer read message + message = msg.value() + rb = BytesIO(message) + + app_msg = schemaless_reader(rb, parsed_schema) # read one record + try: + title = app_msg["title"] + date = app_msg["date"] + href = app_msg["link"] + writer = app_msg["writer"] + + channel = "아주대" # C01G2CR5MEE + text = ":star: `%s` 새로운 공지!\n>%s: %s\n>링크: <%s|공지 확인하기>" % ( + date, + writer, + title, + href, + ) + print('\nSending message "%s" to channel %s' % (text, channel)) + except SlackApiError as e: + print("Failed to get channel/text from message.") + print(e.response["error"]) + channel = "kafka" + text = msg.value() + + try: + sc_response = sc.chat_postMessage( + channel=channel, text=text, as_user=True, username="아주대 공지 봇" + ) # as_user은 new slack app에서 작동 안 함 + + except SlackApiError as e: + assert e.response["ok"] is False + print("\t** FAILED: %s" % e.response["error"]) + elif msg.error().code() == KafkaError._PARTITION_EOF: + print( + "End of partition reached {0}/{1}".format(msg.topic(), msg.partition()) + ) + elif msg.error(): + raise KafkaException(msg.error()) + else: + print("Error occured: {0}".format(msg.error().str())) +except Exception as e: + print(e) + print(dir(e)) + +except KeyboardInterrupt: + print("Pressed CTRL+C...") + +finally: + print("Closing...") + c.close() diff --git a/python/src/avro/AjouSlackProducerAvro.py b/python/src/avro/AjouSlackProducerAvro.py new file mode 100644 index 0000000..5f312b5 --- /dev/null +++ b/python/src/avro/AjouSlackProducerAvro.py @@ -0,0 +1,204 @@ +import datetime +import os +import ssl +import time +from io import BytesIO +from urllib.error import HTTPError +from urllib.request import urlopen + +from bs4 import BeautifulSoup +from confluent_kafka import Producer +from fastavro import parse_schema, reader, schemaless_writer, writer + + +schema = { # avsc + "namespace": "ajou.parser", + "name": "Notice", # seperated but will be namespace.name + "doc": "A notice parser from Ajou university.", + "type": "record", + "fields": [ + {"name": "id", "type": "int"}, + {"name": "title", "type": "string"}, + {"name": "date", "type": "string"}, + {"name": "link", "type": "string"}, + {"name": "writer", "type": "string"}, + ], +} +parsed_schema = parse_schema(schema) + + +class AjouParserAVRO: + """ + Ajou notices Parser using Slack API and Apache Kafka (AVRO) + + AVRO file will be saved in your current directory. + + Methods + ------- + run(server=Config.VM_SERVER, avro_name="ajou.avro") + + Usage + ----- + ajou = AjouParserAVRO(Kafka_server_ip, avro_name) + ajou.run() + """ + + # HTML + ADDRESS = "https://www.ajou.ac.kr/kr/ajou/notice.do" + LENGTH = 10 + + # AVRO + BASE_DIR = os.path.dirname(os.path.abspath(__file__)) + + __slots__ = ("settings", "producer", "AVRO_PATH") + + def __init__(self, server=os.environ["VM_SERVER"], avro_name="ajou.avro"): + print("Initializing...") + + self.AVRO_PATH = os.path.join( + self.BASE_DIR, avro_name + ) # saves into current dir + + self.settings = { # Producer settings + "bootstrap.servers": server, + "enable.idempotence": True, # Safe + "acks": "all", # Safe + "retries": 5, # Safe + "max.in.flight": 5, # High throughput + "compression.type": "snappy", # High throughput + "linger.ms": 5, # High throughput + } + self.producer = Producer(self.settings) + + def run(self, period=3600): # period (second) + """Check notices from html per period and sends data to Kafka Consumer.""" + p = self.producer + processedIds = [] # already parsed + + with open(self.AVRO_PATH, "rb") as fo: + for record in reader(fo): + processedIds.append(record["id"]) # add id to already parsed list + + try: + while True: # 1시간마다 무한 반복 + records = [] + print() # Section + PRODUCED = 0 # How many messages did it send + + # No checking on last parsed date, always starts new + print("Trying to parse new posts...") + ids, posts, dates, writers = self.parser() # 다시 파싱 + assert ids is not None, f"Check your parser: {ids}." + + for i in range(self.LENGTH): # check all 10 notices + postId = int(ids[i].text.strip()) + if postId in processedIds: + continue + else: + processedIds.append(postId) + postLink = self.ADDRESS + posts[i].get("href") + postTitle = posts[i].text.strip() + postDate = dates[i].text.strip() + postWriter = writers[i].text + + # Removing a name duplication + duplicate = "[" + postWriter + "]" + if duplicate in postTitle: # writer: [writer] title + postTitle = postTitle.replace( + duplicate, "" + ).strip() # -> writer: title + + data = self.makeData( + postId, postTitle, postDate, postLink, postWriter + ) + rb = BytesIO() + schemaless_writer(rb, parsed_schema, data) # write one record + + records.append(data) # to write avro + + print("\n>>> Sending a new post...:", postId) + PRODUCED += 1 + p.produce( + "AJOU-NOTIFY", value=rb.getvalue(), callback=self.acked, + ) + p.poll(1) # 데이터 Kafka에게 전송, second + + if PRODUCED: + print(f"Sent {PRODUCED} post(s)...") + + with open(self.AVRO_PATH, "wb") as out: + # write avro only when there are updates + writer(out, parsed_schema, records) + else: + print("\t** No new posts yet") + print("Parsed at", self.getTimeNow()) + + print(f"Resting {period // 3600} hour...") + time.sleep(period) + + except Exception as e: # General exceptions + print(e) + print(dir(e)) + except KeyboardInterrupt: + print("Pressed CTRL+C...") + finally: + print("\nExiting...") + p.flush(100) + + @staticmethod + def error_cb(error): + print(">>>", error) + + # Producer callback function + @staticmethod + def acked(err, msg): + if err is not None: + print( + "\t** Failed to deliver message: {0}: {1}".format( + msg.value(), err.str() + ) + ) + else: + print("Message produced correctly...") + + @staticmethod + def makeData(postId, postTitle, postDate, postLink, postWriter): + return { + "id": postId, + "title": postTitle, + "date": postDate, + "link": postLink, + "writer": postWriter, + } + + @staticmethod + def getTimeNow() -> datetime.datetime: + return datetime.datetime.now() + + # Ajou notices parser + def parser(self): + context = ssl._create_unverified_context() + try: + result = urlopen( + f"{self.ADDRESS}?mode=list&&articleLimit={self.LENGTH}&article.offset=0", + context=context, + ) + except HTTPError: + print("Seems like the server is down now.") + return None, None, None, None + html = result.read() + soup = BeautifulSoup(html, "html.parser") + ids = soup.select("table > tbody > tr > td.b-num-box") + posts = soup.select("table > tbody > tr > td.b-td-left > div > a") + dates = soup.select( + "table > tbody > tr > td.b-td-left > div > div > span.b-date" + ) + writers = soup.select( + "table > tbody > tr > td.b-td-left > div > div.b-m-con > span.b-writer" + ) + return ids, posts, dates, writers + + +if __name__ == "__main__": + ajou = AjouParserAVRO() + ajou.run() diff --git a/python/src/avro/ajou.avro b/python/src/avro/ajou.avro new file mode 100644 index 0000000000000000000000000000000000000000..5d3677301d88559a4bdc8c3a4904681bb592fc9e GIT binary patch literal 2416 zcmeZI%3@>@Nh~YM*GtY%NloU+E6vFf1M`cMGg5OC_pnqel~fj_Dp@Hg6{RNU7o{la zC@7`mCxZkW74q^+GLusk3KENoQ;QVRit=+69JBIE6-x6m%TkMqGfOJ<K<e`nbHOGg zX62XaK@{uxfwh8U(=t<YQi_$V6r!t@U@9}g_P}h;%qvl<)loo5mt>aYq=NLqR1}vK zW#*-$DoRN#A*d!NGcTKvIpsx}C8<T&RK?aZOq=*_rjpK^%yS-^;U!)Z*NaS3SapDL z17q~-RV&^spQ!M9dB>{_6Jix!Z<z7Azva!QEeda%HoTeMs_?ph=G$pY6<%-XS9r6$ z@%5T53UAtaUTv78@Mgw}*K_y0nY%;b)z%4b<}6frJ9Xxp=D7+G0|kr>^bC#kjEqk( zW|Wi^6kF-*mzS68fkQSiSueXtKf6dD#M6hwjb2K=eQthAs%=?jYPnisQAuWUPO4wN zt)YRbk+BI(KrcTntvI#B)&M5plbM@YVryt%@M`ncw^L_AT=I5C<D2dUZ(8=eZP~Em z0Am~D>-9TcPnq&&+7^X3^IBi8+3{xH1cf)-ro7oSN8!<o);C)^UT@y=YD2^8uBDG= zv?{2-nY-up{55a7mb~rSq3~++o;M4os#DcJMlk2&^pDo-o(52O&fW87-h|i7)<8V7 z>;U6B#%Qckv0$IfXnfP$__lAyo8=Q9&1ltrwRP>A1#{la-SKAG+&A+kyxO$#&9b=) z3hHmCw7g!n=Jom=pdg`z2Mq~&PzS3o6qX!d%rP{2Gkc2ho5g$HY*?Z2X6}|(o43B& z-127bj@Jv8zF9Ky)rL8c&`@}@vE|LIl?rbbOntq13soZsp1w$ophX85r!l^nvE<G2 zMsR$zEO@hF<(rN<3dW|dSG9r?7uG0wvwg+u$#WId-}bM0+qU3M^Hy4<IwMPhk%B$d zEjYlK11b|*=Dga{pzwBP$J@RcZ{}`!v%Ep!P0O6uJ6qm#Em3&0WFlB#YRBuQEeaZj zM*2n;n$(OFBV&TjmjmUPjs=kP@V2L6-T}rYMk50w15hFMYIBRin|W*AEN^_%vf%ZG z4R2;PD7>Az;??dMZ>P?9-8Dr){mrx;Z=2_W^8D-dJKpxRzV6-dX6_cM=6oZ<83TJ9 z%sIfA^tNZp>+Us>IGFlodCQx56W+|70V;h#LH=g$me>6=U-!>^)3QL}&D5UP%l1$; z_z7nTp*LIRzFxECZBN6j1B?X-*S^}m=k=N`;4*i~M1?ndr@US`=XKKzLraCXGeQ1P zc+<4zP4^NSINFeK#es12j023X7^8hW{hYO9L5b$=)S0haTVD6KD7;xP$LQ^>9tCKT zHGRX|sVm=X*!yOB>ziE*-cD)Jg>q(3042Si28}vU@;BFbGk1^XtDQaIe9^t&&8D@Q k3dW!$x?q9=G{-|K9csptff>PUq5^R)q^^V%rRc2#09idrd;kCd literal 0 HcmV?d00001 diff --git a/python/tests/ajou.avro b/python/tests/ajou.avro new file mode 100644 index 0000000000000000000000000000000000000000..8409bfd5440d150271ae2c04079a8267d63a43a8 GIT binary patch literal 882 zcmeZI%3@>@Nh~YM*GtY%NloU+E6vFf1M`cMGg5OC_pnqel~fj_Dp@Hg6{RNU7o{la zC@7`mCxZkW74q^+GLusk3KENoQ;QVRit=+69JBIE6-x6m%TkMqGfOJ<K<e`nbHOGg zX62XaK@{uxfwh8U(=t<YQi_$V6r!t@U@9}g_P}h;%qvl<)loo5mt>aYq=NLqR1}vK zW#*-$DoRN#A*d!NGcTKvIpsx}C8<T&RK?aZ{0~)iYM#)R<f*{TP$ziTh-((dsyU1* zuX`HaPM!H`>jZ^Yo43AM-k@M8Vq{=splfKPYhc*Im{C$vP;8~IUtV6W2M(0PWWDSn z{p=!r5KkWxta>T=_PO~fskUXAspV>kMJ1WZIjMg6wuS}<2BvJz!QpGLS!9H;$cPFS wnSm@?hs`2mgheJ)u*e)_(FSZ5nNZgv3y?*duvugZx5$thabyXy2t9oO081b@iU0rr literal 0 HcmV?d00001 diff --git a/python/tests/test_avro.py b/python/tests/test_avro.py new file mode 100644 index 0000000..7eae011 --- /dev/null +++ b/python/tests/test_avro.py @@ -0,0 +1,227 @@ +import datetime +import os +import ssl +from io import BytesIO +from urllib.error import HTTPError +from urllib.request import urlopen + +from bs4 import BeautifulSoup +from fastavro import ( + parse_schema, + reader, + schemaless_reader, + schemaless_writer, + writer, +) + + +schema = { # avsc + "namespace": "ajou.parser", + "name": "Notice", # seperated but will be namespace.name + "doc": "A notice parser from Ajou university.", + "type": "record", + "fields": [ + {"name": "id", "type": "int"}, + {"name": "title", "type": "string"}, + {"name": "date", "type": "string"}, + {"name": "link", "type": "string"}, + {"name": "writer", "type": "string"}, + ], +} +parsed_schema = parse_schema(schema) + +# 'records' can be an iterable (including generator) +records = [ + { + "id": 10005, + "title": "대학교 공지 1", + "date": "2020-12-01", + "link": "https://www.ajou.ac.kr/kr/ajou/notice.do?mode=view&articleNo=10005", + "writer": "CSW", + }, + { + "id": 10006, + "title": "대학교 공지 2", + "date": "2020-12-02", + "link": "https://www.ajou.ac.kr/kr/ajou/notice.do?mode=view&articleNo=10006", + "writer": "CSW", + }, + { + "id": 10007, + "title": "대학교 공지 3", + "date": "2020-12-04", + "link": "https://www.ajou.ac.kr/kr/ajou/notice.do?mode=view&articleNo=10007", + "writer": "CSW", + }, + { + "id": 10008, + "title": "대학교 공지 4", + "date": "2020-12-04", + "link": "https://www.ajou.ac.kr/kr/ajou/notice.do?mode=view&articleNo=10008", + "writer": "CSW", + }, + { + "id": 10009, + "title": "대학교 공지 5", + "date": "2020-12-11", + "link": "https://www.ajou.ac.kr/kr/ajou/notice.do?mode=view&articleNo=10009", + "writer": "CSW", + }, +] + +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) +AVRO_PATH = os.path.join(BASE_DIR, "ajou.avro") + +ADDRESS = "https://www.ajou.ac.kr/kr/ajou/notice.do" +LENGTH = 10 + + +def makeData(postId, postTitle, postDate, postLink, postWriter): + return { + "id": postId, + "title": postTitle, + "date": postDate, + "link": postLink, + "writer": postWriter, + } + + +def parser(): + context = ssl._create_unverified_context() + try: + result = urlopen( + f"{ADDRESS}?mode=list&&articleLimit={LENGTH}&article.offset=0", + context=context, + ) + except HTTPError: + print("Seems like the server is down now.") + return None, None, None, None + html = result.read() + soup = BeautifulSoup(html, "html.parser") + ids = soup.select("table > tbody > tr > td.b-num-box") + posts = soup.select("table > tbody > tr > td.b-td-left > div > a") + dates = soup.select("table > tbody > tr > td.b-td-left > div > div > span.b-date") + writers = soup.select( + "table > tbody > tr > td.b-td-left > div > div.b-m-con > span.b-writer" + ) + return ids, posts, dates, writers + + +def test_single_record(): + # To send with producer + message = { + "id": 10000, + "title": "[FastAVRO] 테스트 공지 제목", + "date": "20.12.23", + "link": "https://somelink", + "writer": "alfex4936", + } + + # How producer produces single data + producer_rb = BytesIO() + schemaless_writer(producer_rb, parsed_schema, message) # write one record + produced_data = producer_rb.getvalue() + + # How consumer reads single record + consumer_rb = BytesIO(produced_data) + decoded = schemaless_reader(consumer_rb, parsed_schema) # read one record + assert decoded == { + "id": 10000, + "title": "[FastAVRO] 테스트 공지 제목", + "date": "20.12.23", + "link": "https://somelink", + "writer": "alfex4936", + } + + # {'id': 10000, 'title': '[FastAVRO] 테스트 공지 제목', 'date': '20.12.23', 'link': 'https://somelink', 'writer': 'alfex4936'} + + +def test_write(): + # Writing + with open(AVRO_PATH, "wb") as out: + writer(out, parsed_schema, records) + + assert os.path.isfile(AVRO_PATH), "Writing avro file gone wrong." + + +def test_read(): + # Reading + with open(AVRO_PATH, "rb") as fo: + for record in reader(fo): + print(record) + assert isinstance(record["id"], int) + + +def test_consumer(): + msg = [ + { + "id": 10000, + "title": "[\ud559\uc2b5\ubc95] \uc131\uacf5\ud558\ub294 \ud559\uc2b5\ub9ac\ub354\ub97c \uc704\ud55c \ud559\uc2b5\ub9ac\ub354\uc6cc\ud06c\uc20d \uc548\ub0b4", + "date": "20.12.07", + "link": "https://www.ajou.ac.kr/kr/ajou/notice.do?mode=view&articleNo=104863&article.offset=0&articleLimit=10", + "writer": "\uad50\uc218\ud559\uc2b5\uac1c\ubc1c\uc13c\ud130", + }, + { + "id": 10001, + "title": "[\ud559\uc2b5\ubc95] \uc131\uacf5\ud558\ud558 \ud559\uc2b5\ub9ac\ub354\ub97c \uc704\ud55c \ud559\uc2b5\ub9ac\ub354\uc6cc\ud06c\uc20d \uc548\ub0b4", + "date": "20.12.08", + "link": "https://www.ajou.ac.kr/kr/ajou/notice.do?mode=view&articleNo=104863&article.offset=0&articleLimit=10", + "writer": "\uad50\uc218\ud559\uc2b5\uac1c\ubc1c\uc13c\ud130", + }, + ] + newFile = BytesIO() + writer(newFile, parsed_schema, msg) + newFile.seek(0) + for record in reader(newFile): + print(record) + + +def test_parse(): + processedIds = [] # already parsed + + with open(AVRO_PATH, "rb") as fo: + for record in reader(fo): + processedIds.append(record["id"]) + print("ALREADY PARSED:", processedIds) + + records = [] + print() # Section + PRODUCED = 0 # How many messages did it send + + # No checking on last parsed date, always starts new + print("Trying to parse new posts...") + ids, posts, dates, writers = parser() # 다시 파싱 + assert ids is not None, f"Check your parser: {ids}." + + for i in range(LENGTH): + postId = int(ids[i].text.strip()) + if postId in processedIds: # Already sent + continue + else: + processedIds.append(postId) + postLink = ADDRESS + posts[i].get("href") + postTitle = posts[i].text.strip() + postDate = dates[i].text.strip() + postWriter = writers[i].text + + # Removing a name duplication + duplicate = "[" + postWriter + "]" + if duplicate in postTitle: # writer: [writer] title + postTitle = postTitle.replace(duplicate, "").strip() # -> writer: title + + data = makeData(postId, postTitle, postDate, postLink, postWriter) + records.append(data) + + print("\n>>> Sending a new post...:", postId) + PRODUCED += 1 + + # Producer 자리 + + if PRODUCED: + print(f"Sent {PRODUCED} post(s)...") + with open(AVRO_PATH, "wb") as out: + writer(out, parsed_schema, records) + else: + print("\t** No new posts yet") + print("Parsed at", datetime.datetime.now()) + -- GitLab