Unverified commit 09a1c2d1 authored by Seok Won

Create FastAvro example

Apache Avro with fastavro

AjouSlackProducerAvro.py will produce a single record with the schema.

AjouSlackConsumerAvro.py will consume it with the given schema.
parent 41efdb4d
@@ -24,6 +24,8 @@
* [Ajou notices Parsing Producer with Slack API](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/AjouSlackProducer.py)
* [Ajou notices Parsing Producer with Slack API using MySQL](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/AjouSlackProducerMySQL.py)
* [Ajou notices Parsing Consumer with Slack API](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/AjouSlackConsumer.py)
* [FastAvro Producer example](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/avro/AjouSlackProducerAvro.py)
* [FastAvro Consumer example](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/avro/AjouSlackConsumerAvro.py)
* Kafka (Java)
* [Simple Producer](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemo.java)
@@ -125,3 +125,33 @@ _Please contact them immediately and see if we can fix the issue *right here, ri
<img width="480" src="https://github.com/Alfex4936/kafka-Studies/blob/main/img/slack.png">
</p>
</div>
## FastAvro Producer / Consumer
Install fastavro with pip:
```console
WIN10@DESKTOP:~$ pip install fastavro
```
Example schema to use:
```python
from fastavro import parse_schema

schema = {  # avsc
    "namespace": "ajou.parser",
    "name": "Notice",  # separate from namespace, but the full name is namespace.name
    "doc": "A notice parser from Ajou university.",
    "type": "record",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "title", "type": "string"},
        {"name": "date", "type": "string"},
        {"name": "link", "type": "string"},
        {"name": "writer", "type": "string"},
    ],
}
parsed_schema = parse_schema(schema)
```
How to produce a single record with the Producer and consume it with the Consumer is shown in the [pytest](https://github.com/Alfex4936/kafka-Studies/blob/main/python/tests/test_avro.py#L110); a minimal standalone sketch follows.
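The sketch below is illustrative, not the repository's exact test code; it assumes only `fastavro` is installed and needs no running Kafka broker. `schemaless_writer` serializes one record to the bytes the producer would send, and `schemaless_reader` decodes them back on the consumer side.

```python
from io import BytesIO

from fastavro import parse_schema, schemaless_reader, schemaless_writer

# Same Notice schema as above
parsed_schema = parse_schema({
    "namespace": "ajou.parser",
    "name": "Notice",
    "type": "record",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "title", "type": "string"},
        {"name": "date", "type": "string"},
        {"name": "link", "type": "string"},
        {"name": "writer", "type": "string"},
    ],
})

# Producer side: serialize one record to bytes (the Kafka message value)
record = {
    "id": 10000,
    "title": "[FastAVRO] 테스트 공지 제목",
    "date": "20.12.23",
    "link": "https://somelink",
    "writer": "alfex4936",
}
wb = BytesIO()
schemaless_writer(wb, parsed_schema, record)  # write one record
payload = wb.getvalue()  # what p.produce("AJOU-NOTIFY", value=payload) would send

# Consumer side: decode the bytes back into a dict
decoded = schemaless_reader(BytesIO(payload), parsed_schema)  # read one record
assert decoded == record
```

Because the schemaless encoding does not embed the schema in the payload, the producer and consumer must agree on the exact same `Notice` schema, which is why both scripts declare it.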
@@ -8,6 +8,12 @@ from slack import WebClient
from slack.errors import SlackApiError
def error_cb(error):
    print(">>>", error)
    if error.code() == KafkaError._ALL_BROKERS_DOWN:  # compare error codes, not the KafkaError object
        print("SERVER DOWN")
# Bot User OAuth Access Token
# used scopes: channels:history, channels:read, chat:write, im:history, mpim:history, users:read
token = os.environ["SLACK_BOT_TOKEN"]
@@ -20,6 +26,7 @@ settings = {
"bootstrap.servers": Config.VM_SERVER, "bootstrap.servers": Config.VM_SERVER,
"group.id": "ajou-notify", "group.id": "ajou-notify",
"default.topic.config": {"auto.offset.reset": "largest"}, "default.topic.config": {"auto.offset.reset": "largest"},
"error_cb": error_cb,
# "debug": "broker, cgrp", # "debug": "broker, cgrp",
} }
c = Consumer(settings) c = Consumer(settings)
@@ -36,6 +43,7 @@ try:
        elif not msg.error():
            print("Received a message: {0}".format(msg.value()))
            if msg.value() is None:
                print("But the message value is empty.")
                continue
            try:
import datetime
import json
import os
import ssl
import time
from pathlib import Path
from urllib.error import HTTPError
@@ -150,6 +150,7 @@ class AjouParser:
                time.sleep(period)
        except Exception as e:  # General exceptions
            print(e)
            print(dir(e))
        except KeyboardInterrupt:
            print("Pressed CTRL+C...")
python/src/avro/AjouSlackConsumerAvro.py:

import os
import time
from io import BytesIO

from confluent_kafka import Consumer, KafkaError, KafkaException
from fastavro import parse_schema, schemaless_reader
from slack import WebClient
from slack.errors import SlackApiError
schema = {  # avsc
    "namespace": "ajou.parser",
    "name": "Notice",  # separate from namespace, but the full name is namespace.name
    "doc": "A notice parser from Ajou university.",
    "type": "record",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "title", "type": "string"},
        {"name": "date", "type": "string"},
        {"name": "link", "type": "string"},
        {"name": "writer", "type": "string"},
    ],
}
parsed_schema = parse_schema(schema)
# Bot User OAuth Access Token
# used scopes: channels:history, channels:read, chat:write, im:history, mpim:history, users:read
token = os.environ["SLACK_BOT_TOKEN"]
sc = WebClient(token)

# Set 'auto.offset.reset': 'smallest' if you want to consume all messages
# from the beginning of the topic
settings = {
    "bootstrap.servers": os.environ["VM_SERVER"],
    "group.id": "ajou-notify",
    "default.topic.config": {"auto.offset.reset": "largest"},
    # "value.deserializer": lambda v: json.loads(v.decode("utf-8")),
    # "debug": "broker, cgrp",
}
c = Consumer(settings)

# Topic = "AJOU-NOTIFY"
c.subscribe(["AJOU-NOTIFY"])
try:
    while True:
        # SIGINT can't be handled when polling, limit timeout to 1 second.
        msg = c.poll(1.0)
        if msg is None:
            time.sleep(10)
            continue
        elif not msg.error():
            print("Received a message: {0} bytes".format(len(msg)))
            if msg.value() is None:
                print("But the message value is empty.")
                continue

            # Consumer reads one Avro-encoded record from the message value
            message = msg.value()
            rb = BytesIO(message)
            app_msg = schemaless_reader(rb, parsed_schema)  # read one record
            try:
                title = app_msg["title"]
                date = app_msg["date"]
                href = app_msg["link"]
                writer = app_msg["writer"]

                channel = "아주대"  # C01G2CR5MEE
                text = ":star: `%s` 새로운 공지!\n>%s: %s\n>링크: <%s|공지 확인하기>" % (
                    date,
                    writer,
                    title,
                    href,
                )
                print('\nSending message "%s" to channel %s' % (text, channel))
            except SlackApiError as e:
                print("Failed to get channel/text from message.")
                print(e.response["error"])
                channel = "kafka"
                text = msg.value()
            try:
                sc_response = sc.chat_postMessage(
                    channel=channel, text=text, as_user=True, username="아주대 공지 봇"
                )  # as_user does not work with new Slack apps
            except SlackApiError as e:
                assert e.response["ok"] is False
                print("\t** FAILED: %s" % e.response["error"])
        elif msg.error().code() == KafkaError._PARTITION_EOF:
            print(
                "End of partition reached {0}/{1}".format(msg.topic(), msg.partition())
            )
        elif msg.error():
            print("Error occurred: {0}".format(msg.error().str()))
            raise KafkaException(msg.error())
except Exception as e:
    print(e)
    print(dir(e))
except KeyboardInterrupt:
    print("Pressed CTRL+C...")
finally:
    print("Closing...")
    c.close()
python/src/avro/AjouSlackProducerAvro.py:

import datetime
import os
import ssl
import time
from io import BytesIO
from urllib.error import HTTPError
from urllib.request import urlopen

from bs4 import BeautifulSoup
from confluent_kafka import Producer
from fastavro import parse_schema, reader, schemaless_writer, writer
schema = {  # avsc
    "namespace": "ajou.parser",
    "name": "Notice",  # separate from namespace, but the full name is namespace.name
    "doc": "A notice parser from Ajou university.",
    "type": "record",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "title", "type": "string"},
        {"name": "date", "type": "string"},
        {"name": "link", "type": "string"},
        {"name": "writer", "type": "string"},
    ],
}
parsed_schema = parse_schema(schema)
class AjouParserAVRO:
    """
    Ajou notices Parser using Slack API and Apache Kafka (AVRO)

    AVRO file will be saved in your current directory.

    Methods
    -------
    run(server=Config.VM_SERVER, avro_name="ajou.avro")

    Usage
    -----
        ajou = AjouParserAVRO(Kafka_server_ip, avro_name)
        ajou.run()
    """
    # HTML
    ADDRESS = "https://www.ajou.ac.kr/kr/ajou/notice.do"
    LENGTH = 10

    # AVRO
    BASE_DIR = os.path.dirname(os.path.abspath(__file__))

    __slots__ = ("settings", "producer", "AVRO_PATH")

    def __init__(self, server=os.environ["VM_SERVER"], avro_name="ajou.avro"):
        print("Initializing...")

        self.AVRO_PATH = os.path.join(
            self.BASE_DIR, avro_name
        )  # saves into current dir

        self.settings = {  # Producer settings
            "bootstrap.servers": server,
            "enable.idempotence": True,  # Safe
            "acks": "all",  # Safe
            "retries": 5,  # Safe
            "max.in.flight": 5,  # High throughput
            "compression.type": "snappy",  # High throughput
            "linger.ms": 5,  # High throughput
        }
        self.producer = Producer(self.settings)
    def run(self, period=3600):  # period in seconds
        """Checks notices from html per period and sends data to the Kafka consumer."""
        p = self.producer
        processedIds = []  # already parsed

        if os.path.isfile(self.AVRO_PATH):  # the avro file may not exist on the first run
            with open(self.AVRO_PATH, "rb") as fo:
                for record in reader(fo):
                    processedIds.append(record["id"])  # add id to already parsed list

        try:
            while True:  # repeat every hour
                records = []
                print()  # Section
                PRODUCED = 0  # How many messages did it send

                # No checking on last parsed date, always starts new
                print("Trying to parse new posts...")
                ids, posts, dates, writers = self.parser()  # parse again
                assert ids is not None, f"Check your parser: {ids}."

                for i in range(self.LENGTH):  # check all 10 notices
                    postId = int(ids[i].text.strip())
                    if postId in processedIds:
                        continue
                    else:
                        processedIds.append(postId)
                    postLink = self.ADDRESS + posts[i].get("href")
                    postTitle = posts[i].text.strip()
                    postDate = dates[i].text.strip()
                    postWriter = writers[i].text

                    # Removing a name duplication
                    duplicate = "[" + postWriter + "]"
                    if duplicate in postTitle:  # writer: [writer] title
                        postTitle = postTitle.replace(
                            duplicate, ""
                        ).strip()  # -> writer: title

                    data = self.makeData(
                        postId, postTitle, postDate, postLink, postWriter
                    )

                    rb = BytesIO()
                    schemaless_writer(rb, parsed_schema, data)  # write one record

                    records.append(data)  # to write avro

                    print("\n>>> Sending a new post...:", postId)
                    PRODUCED += 1
                    p.produce(
                        "AJOU-NOTIFY", value=rb.getvalue(), callback=self.acked,
                    )
                    p.poll(1)  # send data to Kafka; wait up to 1 second

                if PRODUCED:
                    print(f"Sent {PRODUCED} post(s)...")
                    with open(self.AVRO_PATH, "wb") as out:
                        # write avro only when there are updates
                        writer(out, parsed_schema, records)
                else:
                    print("\t** No new posts yet")

                print("Parsed at", self.getTimeNow())
                print(f"Resting {period // 3600} hour...")
                time.sleep(period)
        except Exception as e:  # General exceptions
            print(e)
            print(dir(e))
        except KeyboardInterrupt:
            print("Pressed CTRL+C...")
        finally:
            print("\nExiting...")
            p.flush(100)
    @staticmethod
    def error_cb(error):
        print(">>>", error)

    # Producer callback function
    @staticmethod
    def acked(err, msg):
        if err is not None:
            print(
                "\t** Failed to deliver message: {0}: {1}".format(
                    msg.value(), err.str()
                )
            )
        else:
            print("Message produced correctly...")

    @staticmethod
    def makeData(postId, postTitle, postDate, postLink, postWriter):
        return {
            "id": postId,
            "title": postTitle,
            "date": postDate,
            "link": postLink,
            "writer": postWriter,
        }

    @staticmethod
    def getTimeNow() -> datetime.datetime:
        return datetime.datetime.now()
    # Ajou notices parser
    def parser(self):
        context = ssl._create_unverified_context()
        try:
            result = urlopen(
                f"{self.ADDRESS}?mode=list&&articleLimit={self.LENGTH}&article.offset=0",
                context=context,
            )
        except HTTPError:
            print("Seems like the server is down now.")
            return None, None, None, None

        html = result.read()
        soup = BeautifulSoup(html, "html.parser")
        ids = soup.select("table > tbody > tr > td.b-num-box")
        posts = soup.select("table > tbody > tr > td.b-td-left > div > a")
        dates = soup.select(
            "table > tbody > tr > td.b-td-left > div > div > span.b-date"
        )
        writers = soup.select(
            "table > tbody > tr > td.b-td-left > div > div.b-m-con > span.b-writer"
        )
        return ids, posts, dates, writers


if __name__ == "__main__":
    ajou = AjouParserAVRO()
    ajou.run()
python/tests/test_avro.py:

import datetime
import os
import ssl
from io import BytesIO
from urllib.error import HTTPError
from urllib.request import urlopen

from bs4 import BeautifulSoup
from fastavro import (
    parse_schema,
    reader,
    schemaless_reader,
    schemaless_writer,
    writer,
)
schema = {  # avsc
    "namespace": "ajou.parser",
    "name": "Notice",  # separate from namespace, but the full name is namespace.name
    "doc": "A notice parser from Ajou university.",
    "type": "record",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "title", "type": "string"},
        {"name": "date", "type": "string"},
        {"name": "link", "type": "string"},
        {"name": "writer", "type": "string"},
    ],
}
parsed_schema = parse_schema(schema)
# 'records' can be an iterable (including generator)
records = [
    {
        "id": 10005,
        "title": "대학교 공지 1",
        "date": "2020-12-01",
        "link": "https://www.ajou.ac.kr/kr/ajou/notice.do?mode=view&articleNo=10005",
        "writer": "CSW",
    },
    {
        "id": 10006,
        "title": "대학교 공지 2",
        "date": "2020-12-02",
        "link": "https://www.ajou.ac.kr/kr/ajou/notice.do?mode=view&articleNo=10006",
        "writer": "CSW",
    },
    {
        "id": 10007,
        "title": "대학교 공지 3",
        "date": "2020-12-04",
        "link": "https://www.ajou.ac.kr/kr/ajou/notice.do?mode=view&articleNo=10007",
        "writer": "CSW",
    },
    {
        "id": 10008,
        "title": "대학교 공지 4",
        "date": "2020-12-04",
        "link": "https://www.ajou.ac.kr/kr/ajou/notice.do?mode=view&articleNo=10008",
        "writer": "CSW",
    },
    {
        "id": 10009,
        "title": "대학교 공지 5",
        "date": "2020-12-11",
        "link": "https://www.ajou.ac.kr/kr/ajou/notice.do?mode=view&articleNo=10009",
        "writer": "CSW",
    },
]
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
AVRO_PATH = os.path.join(BASE_DIR, "ajou.avro")
ADDRESS = "https://www.ajou.ac.kr/kr/ajou/notice.do"
LENGTH = 10
def makeData(postId, postTitle, postDate, postLink, postWriter):
    return {
        "id": postId,
        "title": postTitle,
        "date": postDate,
        "link": postLink,
        "writer": postWriter,
    }
def parser():
    context = ssl._create_unverified_context()
    try:
        result = urlopen(
            f"{ADDRESS}?mode=list&&articleLimit={LENGTH}&article.offset=0",
            context=context,
        )
    except HTTPError:
        print("Seems like the server is down now.")
        return None, None, None, None

    html = result.read()
    soup = BeautifulSoup(html, "html.parser")
    ids = soup.select("table > tbody > tr > td.b-num-box")
    posts = soup.select("table > tbody > tr > td.b-td-left > div > a")
    dates = soup.select("table > tbody > tr > td.b-td-left > div > div > span.b-date")
    writers = soup.select(
        "table > tbody > tr > td.b-td-left > div > div.b-m-con > span.b-writer"
    )
    return ids, posts, dates, writers
def test_single_record():
    # To send with producer
    message = {
        "id": 10000,
        "title": "[FastAVRO] 테스트 공지 제목",
        "date": "20.12.23",
        "link": "https://somelink",
        "writer": "alfex4936",
    }

    # How the producer produces a single record
    producer_rb = BytesIO()
    schemaless_writer(producer_rb, parsed_schema, message)  # write one record
    produced_data = producer_rb.getvalue()

    # How the consumer reads a single record
    consumer_rb = BytesIO(produced_data)
    decoded = schemaless_reader(consumer_rb, parsed_schema)  # read one record

    assert decoded == {
        "id": 10000,
        "title": "[FastAVRO] 테스트 공지 제목",
        "date": "20.12.23",
        "link": "https://somelink",
        "writer": "alfex4936",
    }
    # {'id': 10000, 'title': '[FastAVRO] 테스트 공지 제목', 'date': '20.12.23', 'link': 'https://somelink', 'writer': 'alfex4936'}
def test_write():
    # Writing
    with open(AVRO_PATH, "wb") as out:
        writer(out, parsed_schema, records)
    assert os.path.isfile(AVRO_PATH), "Writing avro file gone wrong."


def test_read():
    # Reading
    with open(AVRO_PATH, "rb") as fo:
        for record in reader(fo):
            print(record)
            assert isinstance(record["id"], int)
def test_consumer():
    msg = [
        {
            "id": 10000,
            "title": "[학습법] 성공하는 학습리더를 위한 학습리더워크숍 안내",
            "date": "20.12.07",
            "link": "https://www.ajou.ac.kr/kr/ajou/notice.do?mode=view&articleNo=104863&article.offset=0&articleLimit=10",
            "writer": "교수학습개발센터",
        },
        {
            "id": 10001,
            "title": "[학습법] 성공하하 학습리더를 위한 학습리더워크숍 안내",
            "date": "20.12.08",
            "link": "https://www.ajou.ac.kr/kr/ajou/notice.do?mode=view&articleNo=104863&article.offset=0&articleLimit=10",
            "writer": "교수학습개발센터",
        },
    ]

    newFile = BytesIO()
    writer(newFile, parsed_schema, msg)
    newFile.seek(0)
    for record in reader(newFile):
        print(record)
def test_parse():
    processedIds = []  # already parsed
    with open(AVRO_PATH, "rb") as fo:
        for record in reader(fo):
            processedIds.append(record["id"])
    print("ALREADY PARSED:", processedIds)

    records = []
    print()  # Section
    PRODUCED = 0  # How many messages did it send

    # No checking on last parsed date, always starts new
    print("Trying to parse new posts...")
    ids, posts, dates, writers = parser()  # parse again
    assert ids is not None, f"Check your parser: {ids}."

    for i in range(LENGTH):
        postId = int(ids[i].text.strip())
        if postId in processedIds:  # Already sent
            continue
        else:
            processedIds.append(postId)
        postLink = ADDRESS + posts[i].get("href")
        postTitle = posts[i].text.strip()
        postDate = dates[i].text.strip()
        postWriter = writers[i].text

        # Removing a name duplication
        duplicate = "[" + postWriter + "]"
        if duplicate in postTitle:  # writer: [writer] title
            postTitle = postTitle.replace(duplicate, "").strip()  # -> writer: title

        data = makeData(postId, postTitle, postDate, postLink, postWriter)
        records.append(data)

        print("\n>>> Sending a new post...:", postId)
        PRODUCED += 1
        # a Producer would go here

    if PRODUCED:
        print(f"Sent {PRODUCED} post(s)...")
        with open(AVRO_PATH, "wb") as out:
            writer(out, parsed_schema, records)
    else:
        print("\t** No new posts yet")

    print("Parsed at", datetime.datetime.now())