Skip to content
Snippets Groups Projects
Commit df21baf7 authored by Seok Won's avatar Seok Won
Browse files

Slack + Kafka: 아주대학교 공지 봇

1시간마다 모든 공지를 불러 {"TITLE": "제목", "DATE": "올린 날", "LINK": "http 주소", "WRITER": "글쓴이"}를 json 형태로 저장한다.

이 json 파일이나 새로운 공지가 있으면 기존 json과 비교해서 새로운 데이터를 Consumer로 보내고, Consumer는 새로운 데이터를 받으면, Slack API를 이용해, "#아주대" 채널에 공지를 올려준다.

마지막 파싱 시간도 기록해 종료 후 다시 불러도 1시간이 지나지 않으면 파싱하지 않는다.

결과)
Last parsing: 1972-12-01 07:00:00
Trying to parse new posts...
Sending a new post...: 12179
...

Last parsing: 2020-12-04 19:11:42.839219
Trying to parse new posts...
No new posts yet...
Resting 1 hour...
...
Last parsing: 2020-12-06 11:55:35.386262
Wait for 3494 seconds to sync new posts.
parent c2b6d855
No related branches found
No related tags found
No related merge requests found
img/slack_ajou.png

27.1 KiB

...@@ -34,6 +34,36 @@ WIN10@DESKTOP:~$ pip install confluent-kafka ...@@ -34,6 +34,36 @@ WIN10@DESKTOP:~$ pip install confluent-kafka
``` ```
## Slack API with Ajou University notices parser
[Get Slack API here](https://api.slack.com/)
First, invite your bot to your channel. (In this example, we set it as "#아주대")
The [producer](https://github.com/Alfex4936/kafka-Studies/tree/main/python/src/AjouSlackProducer.py) will notify the [consumer](https://github.com/Alfex4936/kafka-Studies/tree/main/python/src/AjouSlackConsumer.py) whenever there are new notices.
The *producer* checks new notices per an hour, saves latest 10 notices to json file,
and sees if there is/are a new notice/s.
If there is a new notice, it sends {"TITLE": "title", "DATE": "post date", "LINK": "http address", "WRITER": "writer"} to the consumer.
The *consumer* checks new datas every 5 seconds, if it gets a new data,
it consumes the data and leave a comment like this below image.
<div align="center">
<p>
<img width="480" src="https://github.com/Alfex4936/kafka-Studies/blob/main/img/slack_ajou.png">
</p>
</div>
:b: Run the server first to see the results.
```console
WIN10@DESKTOP:~$ python AjouSlackProducer.py
WIN10@DESKTOP:~$ python AjouSlackConsumer.py
```
## Slack API Producer Usage ## Slack API Producer Usage
[Get Slack API](https://api.slack.com/) [Get Slack API](https://api.slack.com/)
......
import json
import time
import os
from config import Config
from confluent_kafka import Consumer, KafkaError
from slack import WebClient
from slack.errors import SlackApiError
# Bot User OAuth Access Token
token = os.environ["SLACK_BOT_TOKEN"]
sc = WebClient(token)
# Set 'auto.offset.reset': 'smallest' if you want to consume all messages
# from the beginning of the topic
settings = {
"bootstrap.servers": Config.MY_SERVER,
"group.id": "ajou-notify",
"default.topic.config": {"auto.offset.reset": "largest"},
}
c = Consumer(settings)
# Topic = "AJOU-NOTIFY
c.subscribe([Config.AJOU_TOPIC_ID])
try:
while True:
msg = c.poll(0.1)
time.sleep(5)
if msg is None:
continue
elif not msg.error():
print("Received a message: {0}".format(msg.value()))
if msg.value() is None:
continue
try:
app_msg = json.loads(msg.value().decode())
except:
app_msg = json.loads(msg.value())
try:
title = app_msg["TITLE"]
date = app_msg["DATE"]
href = app_msg["LINK"]
writer = app_msg["WRITER"]
channel = "아주대"
# TODO: 학사면 좀 더 중요하게?
text = ":star: `%s` 새로운 공지!\n>%s: %s\n>링크: <%s|공지 확인하기>" % (
date,
writer,
title,
href,
)
print('\nSending message "%s" to channel %s' % (text, channel))
except SlackApiError as e:
print("Failed to get channel/text from message.")
print(e.response["error"])
channel = "kafka"
text = msg.value()
try:
sc_response = sc.chat_postMessage(
channel=channel, text=text, as_user=True, username="아주대 공지 봇"
) # as_user은 new slack app에서 작동 안 함
except SlackApiError as e:
assert e.response["ok"] is False
print("\t** FAILED: %s" % e.response["error"])
elif msg.error().code() == KafkaError._PARTITION_EOF:
print(
"End of partition reached {0}/{1}".format(msg.topic(), msg.partition())
)
else:
print("Error occured: {0}".format(msg.error().str()))
except Exception as e:
print(type(e))
print(dir(e))
except KeyboardInterrupt:
print("Pressed CTRL+C...")
finally:
c.close()
import datetime
import json
import os
import time
from contextlib import contextmanager
from pathlib import Path
import requests
from bs4 import BeautifulSoup
from config import Config
from confluent_kafka import Producer
from slack import WebClient
from slack.errors import SlackApiError
# Producer callback function
def acked(err, msg):
if err is not None:
print("Failed to deliver message: {0}: {1}".format(msg.value(), err.str()))
else:
print("Message produced: {0}".format(msg.value())) # binary
# Make data into dictionary format
def makeJson(postId, postTitle, postDate, postLink, postWriter):
return {
postId: {
"TITLE": postTitle,
"DATE": postDate,
"LINK": ADDRESS + postLink,
"WRITER": postWriter,
}
}
# Ajou notices parser
def parser():
req = requests.get(f"{ADDRESS}?mode=list&&articleLimit=10&article.offset=0")
req.encoding = "utf-8"
html = req.text
soup = BeautifulSoup(html, "html.parser")
ids = soup.select("table > tbody > tr > td.b-num-box")
posts = soup.select("table > tbody > tr > td.b-td-left > div > a")
dates = soup.select("table > tbody > tr > td.b-td-left > div > div > span.b-date")
writers = soup.select(
"table > tbody > tr > td.b-td-left > div > div.b-m-con > span.b-writer"
)
return ids, posts, dates, writers
ADDRESS = "https://www.ajou.ac.kr/kr/ajou/notice.do"
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
JSON_PATH = os.path.join(BASE_DIR, "already_read.json")
LENGTH = 10
PRODUCED = 0
# Bot User OAuth Access Token
# used scopes: channels:history, channels:read, chat:write, im:history, mpim:history, users:read
token = os.environ["SLACK_BOT_TOKEN"]
read = None
# 공지 Parser
if not Path(JSON_PATH).is_file(): # 파일 없으면 기본 형식 만듬
base_data = {"POSTS": {}, "LAST_PARSED": "1972-12-01 07:00:00.000000"}
with open(JSON_PATH, "a+") as f:
f.write(json.dumps(base_data))
# json read
with open(JSON_PATH, "r+") as f_read:
read = json.load(f_read)
# Set last parsed time to rest 1 hour well
LAST_PARSED = datetime.datetime.strptime(read["LAST_PARSED"], "%Y-%m-%d %H:%M:%S.%f")
# init parser
ids, posts, dates, writers = parser()
# Slack API 초기화
sc = WebClient(token)
channel = "C01G2CR5MEE" # 아주대
# Kafka Producer 만들기 "localhost:9092"
settings = {"bootstrap.servers": Config.MY_SERVER}
p = Producer(settings)
try:
while True: # 1시간마다 무한 반복
PRODUCED = 0
LAST_PARSED = datetime.datetime.strptime(
read["LAST_PARSED"], "%Y-%m-%d %H:%M:%S.%f"
)
try:
now = datetime.datetime.now()
diff = (now - LAST_PARSED).seconds
print("Last parsing:", LAST_PARSED)
if diff / 3600 < 1: # 업데이트 후 1시간이 안 지났음, 대기
print(f"Wait for {3600 - diff} seconds to sync new posts.")
time.sleep(3600 - diff)
read["LAST_PARSED"] = now.strftime("%Y-%m-%d %H:%M:%S.%f")
print("Trying to parse new posts...")
ids, posts, dates, writers = parser() # 다시 파싱
for i in range(LENGTH):
postId = ids[i].text.strip()
postLink = posts[i].get("href")
postTitle = posts[i].text.strip()
# postTitle = posts[i].get("title")
postDate = dates[i].text.strip()
postWriter = writers[i].text
data = makeJson(postId, postTitle, postDate, postLink, postWriter)
# {'10000': {'TITLE': '설문조사', 'DATE': '20.12.04', 'LINK': 'https', 'WRITER': '입학처'}}
if postId not in read["POSTS"]:
print("Sending a new post...:", postId)
read["POSTS"].update(data)
PRODUCED += 1
p.produce(
Config.AJOU_TOPIC_ID,
value=json.dumps(data[postId]),
callback=acked,
)
p.poll(0.5) # 데이터 Kafka에게 전송
else:
continue
if PRODUCED:
print(f"Sent {PRODUCED} posts...")
else:
print("No new posts yet...")
except SlackApiError as e:
assert e.response["ok"] is False
print("\t** FAILED: %s" % e.response["error"])
with open(JSON_PATH, "w+") as f:
f.write(json.dumps(read))
with open(JSON_PATH, "r+") as f:
read = json.load(f)
print("Resting 1 hour...")
time.sleep(3600)
except Exception as e:
print(type(e))
print(dir(e))
except KeyboardInterrupt:
print("Pressed CTRL+C...")
finally:
print("Exiting...")
p.flush(100)
import json import json
import time import time
import os
from confluent_kafka import Consumer, KafkaError from confluent_kafka import Consumer, KafkaError
from slack import WebClient from slack import WebClient
from slack.errors import SlackApiError from slack.errors import SlackApiError
...@@ -8,7 +11,7 @@ from slack.errors import SlackApiError ...@@ -8,7 +11,7 @@ from slack.errors import SlackApiError
# Bot User OAuth Access Token # Bot User OAuth Access Token
# Scope = chat:write # Scope = chat:write
token = "" token = os.environ["SLACK_BOT_TOKEN"]
sc = WebClient(token) sc = WebClient(token)
...@@ -26,7 +29,7 @@ c.subscribe(["SLACK-KAFKA"]) ...@@ -26,7 +29,7 @@ c.subscribe(["SLACK-KAFKA"])
try: try:
while True: while True:
msg = c.poll(0.1) msg = c.poll(0.1) # read data
time.sleep(5) time.sleep(5)
if msg is None: if msg is None:
continue continue
......
import json import json
import time import time
import os
from config import Config from config import Config
from confluent_kafka import Producer from confluent_kafka import Producer
from slack import WebClient from slack import WebClient
...@@ -9,7 +11,7 @@ from slack.errors import SlackApiError ...@@ -9,7 +11,7 @@ from slack.errors import SlackApiError
# Bot User OAuth Access Token # Bot User OAuth Access Token
# used scopes: channels:history, channels:read, chat:write, im:history, mpim:history, users:read # used scopes: channels:history, channels:read, chat:write, im:history, mpim:history, users:read
token = "" token = os.environ["SLACK_BOT_TOKEN"]
# Slack API 초기화 # Slack API 초기화
sc = WebClient(token) sc = WebClient(token)
......
...@@ -2,6 +2,7 @@ class Config: ...@@ -2,6 +2,7 @@ class Config:
MY_SERVER = "localhost:9092" MY_SERVER = "localhost:9092"
TOPIC_ID = "first-topic" TOPIC_ID = "first-topic"
SLACK_TOPID_ID = "SLACK-KAFKA" SLACK_TOPID_ID = "SLACK-KAFKA"
AJOU_TOPIC_ID = "AJOU-NOTIFY"
GROUP_ID = "group-one" GROUP_ID = "group-one"
CLIENT_ID = "client-1" CLIENT_ID = "client-1"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment