Skip to content
Snippets Groups Projects
Unverified Commit 1d5a67d1 authored by Seok Won's avatar Seok Won
Browse files

Create Ajou notice parser using MySQL

HTMLParser + Slack API + MySQL in python

First, create a database with a table named "notices".

CREATE TABLE notices (
  id INT,
  title VARCHAR(100),
  date VARCHAR(30),
  link VARCHAR(101),
  writer VARCHAR(25)
)
parent 78c8c9e0
No related branches found
No related tags found
No related merge requests found
import datetime
import json
import os
import time
from contextlib import contextmanager
from pathlib import Path
import mysql.connector
import requests
from bs4 import BeautifulSoup
from config import Config
from confluent_kafka import Producer
from slack import WebClient
from slack.errors import SlackApiError
# Delivery-report callback handed to Producer.produce().
def acked(err, msg):
    """Log the outcome of one produced Kafka message (success or failure)."""
    if err is None:
        print("Message produced: {0}...".format(msg.value()))
    else:
        print("Failed to deliver message: {0}: {1}".format(msg.value(), err.str()))
@contextmanager
def OPEN_DB():
    """Yield a buffered cursor on the local ajou_notices MySQL database.

    Credentials come from the MYSQL_USER / MYSQL_PASSWORD environment
    variables. On exit the connection is committed and closed even when
    the managed body raises (the original skipped cleanup on error and
    leaked the connection).
    """
    db = mysql.connector.connect(
        host="localhost",
        user=os.environ["MYSQL_USER"],
        password=os.environ["MYSQL_PASSWORD"],
        database="ajou_notices",
        charset="utf8",
    )
    cursor = db.cursor(buffered=True)
    try:
        yield cursor
    finally:
        db.commit()  # always commits, preserving the original contract
        cursor.close()
        db.close()
def checkOldness(db):
    """Placeholder: prune notices older than MAXIMUM_DAY days (not implemented)."""
def makeData(postId, postTitle, postDate, postLink, postWriter):
    """Build the single-post payload, keyed by its id, that is sent to Kafka."""
    detail = {
        "TITLE": postTitle,
        "DATE": postDate,
        "LINK": postLink,
        "WRITER": postWriter,
    }
    return {postId: detail}
# Ajou notices parser
def parser():
    """Fetch the notice-board list page and return parsed columns.

    Returns four parallel lists of bs4 tags: ids, posts (title links),
    dates, writers. Uses the LENGTH constant for the page size so it stays
    consistent with the test module's parser (was hard-coded to 10).
    """
    req = requests.get(f"{ADDRESS}?mode=list&&articleLimit={LENGTH}&article.offset=0")
    req.encoding = "utf-8"
    html = req.text
    soup = BeautifulSoup(html, "html.parser")
    ids = soup.select("table > tbody > tr > td.b-num-box")
    posts = soup.select("table > tbody > tr > td.b-td-left > div > a")
    dates = soup.select("table > tbody > tr > td.b-td-left > div > div > span.b-date")
    writers = soup.select(
        "table > tbody > tr > td.b-td-left > div > div.b-m-con > span.b-writer"
    )
    return ids, posts, dates, writers
ADDRESS = "https://www.ajou.ac.kr/kr/ajou/notice.do"
LENGTH = 10  # notices fetched per page
MAXIMUM_DAY = 7  # remove notices in json that were posted more than 7 days ago
DUMP = json.dumps  # Kafka payload serializer (was a needless lambda wrapper)

# Parameterized MySQL statements (placeholders bound by mysql.connector)
INSERT_COMMAND = (
    "INSERT INTO notices (id, title, date, link, writer) VALUES (%s, %s, %s, %s, %s)"
)
DUPLICATE_COMMAND = "SELECT EXISTS(SELECT * FROM notices WHERE id = %(id)s)"
UPDATE_COMMAND = "UPDATE notices SET date = %(date)s WHERE id = 1"
# --- One-off setup: MySQL connection, Slack client, Kafka producer -----------
db = mysql.connector.connect(
    host="localhost",
    user=os.environ["MYSQL_USER"],
    password=os.environ["MYSQL_PASSWORD"],
    database="ajou_notices",
    charset="utf8",
)
cursor = db.cursor(buffered=True)

# Bot User OAuth Access Token
# used scopes: channels:history, channels:read, chat:write, im:history, mpim:history, users:read
token = os.environ["SLACK_BOT_TOKEN"]

# LAST_PARSED is persisted as the "date" column of the notices row with id = 1.
now = datetime.datetime.now()
now = now.strftime("%Y-%m-%d %H:%M:%S.%f")
cursor.execute("SELECT date FROM notices WHERE id = 1")
LAST_PARSED = cursor.fetchone()[0]
assert LAST_PARSED is not None, "LAST_PARSED is None."

# Warm-up parse so a scraping failure surfaces before the loop starts.
ids, posts, dates, writers = parser()

# Slack API initialization
sc = WebClient(token)
channel = "C01G2CR5MEE"  # Ajou University channel

# Kafka producer settings, e.g. "localhost:9092"
settings = {
    "bootstrap.servers": Config.MY_SERVER,
    # Safe Producer settings
    "enable.idempotence": True,
    "acks": "all",
    "retries": 10000000,
    "max.in.flight": 5,
    "compression.type": "lz4",
    "linger.ms": 5,
}
p = Producer(settings)

try:
    while True:  # repeat forever, roughly once an hour
        PRODUCED = 0
        cursor.execute("SELECT date FROM notices WHERE id = 1")
        LAST_PARSED = datetime.datetime.strptime(
            cursor.fetchone()[0], "%Y-%m-%d %H:%M:%S.%f"
        )  # reload last-parsed timestamp from row id = 1
        assert LAST_PARSED is not None, "LAST_PARSED is None."
        try:
            now = datetime.datetime.now()
            # BUG FIX: .seconds is only the seconds *component* of the delta
            # (wrong whenever the gap exceeds a day); total_seconds() is the
            # true elapsed time.
            diff = (now - LAST_PARSED).total_seconds()
            print("Last parsing:", LAST_PARSED)
            if diff / 3600 < 1:  # less than an hour since last update: wait
                print(f"Wait for {3600 - diff} seconds to sync new posts.")
                time.sleep(3600 - diff)
            cursor.execute(
                UPDATE_COMMAND, {"date": now.strftime("%Y-%m-%d %H:%M:%S.%f")}
            )
            print("Trying to parse new posts...")
            ids, posts, dates, writers = parser()  # parse again
            for i in range(LENGTH):
                postId = ids[i].text.strip()
                cursor.execute(
                    DUPLICATE_COMMAND, {"id": int(postId)}
                )  # db duplication check
                if cursor.fetchone()[0]:  # (1, )
                    continue  # postId exists
                postLink = posts[i].get("href")
                postTitle = posts[i].text.strip()
                postDate = dates[i].text.strip()
                postWriter = writers[i].text
                duplicate = "[" + postWriter + "]"
                if duplicate in postTitle:  # writer: [writer] title
                    postTitle = postTitle.replace(
                        duplicate, ""
                    ).strip()  # -> writer: title
                dbData = (
                    int(postId),
                    postTitle,
                    postDate,
                    ADDRESS + postLink,
                    postWriter,
                )
                kafkaData = makeData(
                    postId, postTitle, postDate, ADDRESS + postLink, postWriter
                )
                cursor.execute(INSERT_COMMAND, dbData)  # db insert
                print("Sending a new post...:", postId)
                PRODUCED += 1
                p.produce(
                    Config.AJOU_TOPIC_ID, value=DUMP(kafkaData[postId]), callback=acked,
                )
                p.poll(0.5)  # serve delivery-report callbacks
            if PRODUCED:
                print(f"Sent {PRODUCED} posts...")
            else:
                print("No new posts yet...")
            print("Parsed:", now)
        except SlackApiError as e:
            assert e.response["ok"] is False
            print("\t** FAILED: %s" % e.response["error"])
        db.commit()  # save data
        print("Resting 1 hour...")
        time.sleep(3600)
except Exception as e:
    # Show the actual error, not just its attribute list.
    print(type(e))
    print(e)
except KeyboardInterrupt:
    print("Pressed CTRL+C...")
finally:
    print("Exiting...")
    db.commit()
    cursor.close()
    db.close()
    p.flush(100)
import mysql.connector
import requests
from bs4 import BeautifulSoup
from contextlib import contextmanager
import os
# mysql.connector.paramstyle = "pyformat"
# Notice-board endpoint and page size shared by parser() and updateDB().
ADDRESS = "https://www.ajou.ac.kr/kr/ajou/notice.do"
LENGTH = 10
FILTER_WORDS = ()  # only parse if notices contain these words
# Parameterized MySQL statements (placeholders bound by mysql.connector)
INSERT_COMMAND = (
    "INSERT INTO notices (id, title, date, link, writer) " "VALUES (%s, %s, %s, %s, %s)"
)
DUPLICATE_COMMAND = "SELECT EXISTS(SELECT * FROM notices WHERE id = %(id)s)"
UPDATE_COMMAND = "UPDATE notices SET date = %(date)s WHERE id = 1"
@contextmanager
def OPEN_DB():
    """Yield a buffered cursor on the local ajou_notices MySQL database.

    Credentials come from the MYSQL_USER / MYSQL_PASSWORD environment
    variables. On exit the connection is committed and closed even when
    the managed body raises (the original skipped cleanup on error and
    leaked the connection).
    """
    db = mysql.connector.connect(
        host="localhost",
        user=os.environ["MYSQL_USER"],
        password=os.environ["MYSQL_PASSWORD"],
        database="ajou_notices",
        charset="utf8",
    )
    cursor = db.cursor(buffered=True)
    try:
        yield cursor
    finally:
        db.commit()  # always commits, preserving the original contract
        cursor.close()
        db.close()
# Update MySQL database
def updateDB(cursor):
    """Parse the notice board and insert any posts not yet in the table."""
    ids, posts, dates, writers = parser()  # fresh scrape
    for idx in range(LENGTH):
        notice_id = int(ids[idx].text.strip())
        cursor.execute(DUPLICATE_COMMAND, {"id": notice_id})
        if cursor.fetchone()[0]:  # (1, ) -> already stored
            continue
        writer = writers[idx].text
        title = posts[idx].text.strip()
        tag = "[" + writer + "]"
        if tag in title:  # "[writer] title" -> "title"
            title = title.replace(tag, "").strip()
        row = (
            notice_id,
            title,
            dates[idx].text.strip(),
            ADDRESS + posts[idx].get("href"),
            writer,
        )
        cursor.execute(INSERT_COMMAND, row)
# Ajou notices parser
def parser():
    """Scrape the notice board; return (ids, posts, dates, writers) tag lists."""
    url = f"{ADDRESS}?mode=list&&articleLimit={LENGTH}&article.offset=0"
    response = requests.get(url)
    response.encoding = "utf-8"
    soup = BeautifulSoup(response.text, "html.parser")
    row = "table > tbody > tr > "
    return (
        soup.select(row + "td.b-num-box"),
        soup.select(row + "td.b-td-left > div > a"),
        soup.select(row + "td.b-td-left > div > div > span.b-date"),
        soup.select(row + "td.b-td-left > div > div.b-m-con > span.b-writer"),
    )
def test_open():
    """OPEN_DB must yield a usable cursor."""
    with OPEN_DB() as cur:
        assert cur is not None, "contextmanager doesn't work."
def test_dbconnect():
    """A direct connection to the ajou_notices database must succeed."""
    conn = mysql.connector.connect(
        host="localhost",
        user=os.environ["MYSQL_USER"],
        password=os.environ["MYSQL_PASSWORD"],
        database="ajou_notices",
        charset="utf8",
    )
    assert conn is not None, "db connection is wrong."
    conn.close()
def test_duplicate():
    """Duplicate check against the designated test row."""
    with OPEN_DB() as cur:
        cur.execute(DUPLICATE_COMMAND, {"id": 54321})  # 54321 is for the test
        if cur.fetchone()[0]:  # (1, )
            print("exists")
def test_updateDB():
    """Run updateDB, then dump the table so the result can be inspected."""
    with OPEN_DB() as cur:
        updateDB(cur)
        cur.execute(
            "SELECT * FROM notices ORDER BY id"
        )  # query = SELECT * FROM testdb.notices LIMIT 3;
        for record in cur:
            print(record)
            assert record is not None
def test_lastparsed():
    """Test LAST_PARSED update"""
    from datetime import datetime

    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
    with OPEN_DB() as cur:
        cur.execute(UPDATE_COMMAND, {"date": stamp})
        cur.execute("SELECT date FROM notices WHERE id = 1")
        fetch = cur.fetchone()[0]
        assert fetch == stamp, "Updating LAST_PARSED date failed."
        print(fetch)  # 2020-12-14 20:29:24.222095
def test_delete():
    """Remove the row left behind by a previous manual run."""
    with OPEN_DB() as cur:
        cur.execute("DELETE FROM notices WHERE id = 12245")
if __name__ == "__main__":
    # INSERT test data
    test_row = (54321, "내 공지", "20.12.14", ADDRESS + "/csw", "csw")
    with OPEN_DB() as cur:
        cur.execute(INSERT_COMMAND, test_row)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment