kafka-Studies · Commit 3d33549b (Unverified)
Authored 4 years ago by Seok Won

Refactor AjouSlackProducer

Using classes and optimizations; more human readable.

parent dcf9e14c
Showing 2 changed files with 202 additions and 147 deletions:

python/src/AjouSlackProducer.py       +199 −144
python/src/AjouSlackProducerMySQL.py  +3 −3
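At its core, the commit moves AjouSlackProducer.py's module-level functions and globals into one class. A minimal sketch of the new shape (names taken from the diff below; bodies elided):

class AjouParserJSON:
    """Ajou notices parser: a Kafka producer backed by a JSON state file."""

    __slots__ = ("settings", "producer", "read", "JSON_PATH")

    def __init__(self, server=..., json_name="already_read.json"):
        ...  # build producer settings, create the Producer, load/prune the JSON state

    def run(self, period=3600):
        ...  # every `period` seconds: parse the notice board, produce new posts to Kafka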
python/src/AjouSlackProducer.py  (+199 −144)
@@ -9,163 +9,218 @@ from bs4 import BeautifulSoup
 from config import Config
 from confluent_kafka import Producer

-# Producer callback function
-def acked(err, msg):
-    if err is not None:
-        print("Failed to deliver message: {0}: {1}".format(msg.value(), err.str()))
-    else:
-        print("Message produced: {0}...".format(msg.value()))
-
-
-# Make data into dictionary format
-def makeJson(postId, postTitle, postDate, postLink, postWriter):
-    duplicate = "[" + postWriter + "]"
-    if duplicate in postTitle:  # writer: [writer] title
-        postTitle = postTitle.replace(duplicate, "").strip()  # -> writer: title
-    return {
-        postId: {
-            "TITLE": postTitle,
-            "DATE": postDate,
-            "LINK": ADDRESS + postLink,
-            "WRITER": postWriter,
-        }
-    }
-
-
-def checkOldness(jsonFile):
-    today = datetime.datetime.today()
-    today = datetime.datetime(today.year, today.month, today.day)
-    for post in list(jsonFile["POSTS"]):
-        currentDate = jsonFile["POSTS"][post]["DATE"]  # string
-        savedDate = datetime.datetime.strptime(currentDate, "%y.%m.%d")
-        if (today - savedDate).days > MAXIMUM_DAY:
-            del jsonFile["POSTS"][post]
-
-    with open(JSON_PATH, "w+") as f:
-        f.write(json.dumps(jsonFile))
-
-    with open(JSON_PATH, "r+") as f:
-        read = json.load(f)
-
-    return read
-
-
-# Ajou notices parser
-def parser():
-    req = requests.get(f"{ADDRESS}?mode=list&&articleLimit=10&article.offset=0")
-    req.encoding = "utf-8"
-    html = req.text
-    soup = BeautifulSoup(html, "html.parser")
-
-    ids = soup.select("table > tbody > tr > td.b-num-box")
-    posts = soup.select("table > tbody > tr > td.b-td-left > div > a")
-    dates = soup.select("table > tbody > tr > td.b-td-left > div > div > span.b-date")
-    writers = soup.select(
-        "table > tbody > tr > td.b-td-left > div > div.b-m-con > span.b-writer"
-    )
-    return ids, posts, dates, writers
-
-
-ADDRESS = "https://www.ajou.ac.kr/kr/ajou/notice.do"
-BASE_DIR = os.path.dirname(os.path.abspath(__file__))
-JSON_PATH = os.path.join(BASE_DIR, "already_read.json")
-LENGTH = 10
-PRODUCED = 0
-MAXIMUM_DAY = 7  # remove notices in json that were posted more than 7 days ago
 DUMP = lambda x: json.dumps(x)
 LOAD = lambda x: json.load(x)

-# Notice parser: create the default JSON format if the file is missing
-if not Path(JSON_PATH).is_file():
-    base_data = {"POSTS": {}, "LAST_PARSED": "1972-12-01 07:00:00.000000"}
-    with open(JSON_PATH, "a+") as f:
-        f.write(DUMP(base_data))
-
-read = None
+class AjouParserJSON:
+    """
+    Ajou notices Parser using Slack API and Apache Kafka (JSON)
+    JSON file will be saved in your current directory.
+
+    Methods
+    -------
+    run(server=Config.VM_SERVER, json_name="already_read.json")
+
+    Usage
+    -----
+        ajou = AjouParserJSON(Kafka_server_ip, json_path)
+        ajou.run()
+    """
+
+    # HTML
+    ADDRESS = "https://www.ajou.ac.kr/kr/ajou/notice.do"
+    LENGTH = 10
+
+    # JSON
+    BASE_DIR = os.path.dirname(os.path.abspath(__file__))
+    MAXIMUM_DAY = 7  # remove notices in json that were posted more than 7 days ago
+
+    global DUMP, LOAD
+
+    __slots__ = ("settings", "producer", "read", "JSON_PATH")  # JSON_PATH needs a slot: __init__ assigns it
+
+    def __init__(self, server=Config.VM_SERVER, json_name="already_read.json"):
+        print("Initializing...")
+
+        self.JSON_PATH = os.path.join(self.BASE_DIR, json_name)
+
+        self.settings = {  # Producer settings
+            "bootstrap.servers": server,
+            "enable.idempotence": True,  # Safe
+            "acks": "all",  # Safe
+            "retries": 5,  # Safe
+            "max.in.flight": 5,  # High throughput
+            "compression.type": "lz4",  # High throughput
+            "linger.ms": 5,  # High throughput
+        }
+        self.producer = Producer(self.settings)
+
+        # Notice parser: create the default JSON format if the file is missing
+        if not Path(self.JSON_PATH).is_file():
+            base_data = {"POSTS": {}, "LAST_PARSED": "1972-12-01 07:00:00.000000"}
+            with open(self.JSON_PATH, "a+") as f:
+                f.write(DUMP(base_data))

-# json read
-with open(JSON_PATH, "r+") as f_read:
-    read = LOAD(f_read)
-read = checkOldness(read)
-
-# Create the Kafka producer
-settings = {
-    "bootstrap.servers": Config.MY_SERVER,
-    # Safe Producer settings
-    "enable.idempotence": True,
-    "acks": "all",
-    "retries": 10000000,
-    "max.in.flight": 5,
-    "compression.type": "lz4",
-    "linger.ms": 5,
-}  # "enable.idempotence": True, "retries": 5
-p = Producer(settings)

+        self.read = None
+        # json read
+        with open(self.JSON_PATH, "r+") as f_read:
+            self.read = LOAD(f_read)
+
+        self.read = self.checkOldness(self.read)

-try:
-    while True:  # repeat forever, once an hour
-        PRODUCED = 0
-        LAST_PARSED = datetime.datetime.strptime(
-            read["LAST_PARSED"], "%Y-%m-%d %H:%M:%S.%f"
-        )
-
-        now = datetime.datetime.now()
-        diff = (now - LAST_PARSED).seconds
-        print("Last parsing: ", LAST_PARSED)
-
-        if diff / 3600 < 1:  # less than an hour since the last update, wait
-            print(f"Wait for {3600 - diff} seconds to sync new posts.")
-            time.sleep(3600 - diff)
-
-        read["LAST_PARSED"] = now.strftime("%Y-%m-%d %H:%M:%S.%f")
-
-        print("Trying to parse new posts...")
-        ids, posts, dates, writers = parser()  # parse again
-
-        for i in range(LENGTH):
-            postId = ids[i].text.strip()
-            postLink = posts[i].get("href")
-            postTitle = posts[i].text.strip()
-            # postTitle = posts[i].get("title")
-            postDate = dates[i].text.strip()
-            postWriter = writers[i].text
-
-            data = makeJson(postId, postTitle, postDate, postLink, postWriter)
-            # {'10000': {'TITLE': '설문조사', 'DATE': '20.12.04', 'LINK': 'https', 'WRITER': '입학처'}}
-
-            if postId not in read["POSTS"]:
-                print("Sending a new post...: ", postId)
-                read["POSTS"].update(data)
-                PRODUCED += 1
-                p.produce(
-                    Config.AJOU_TOPIC_ID,
-                    value=DUMP(data[postId]),
-                    callback=acked,
-                )
-                p.poll(1)  # send the data to Kafka
-            else:
-                continue
-
-        if PRODUCED:
-            print(f"Sent {PRODUCED} post(s)...")
-        else:
-            print("No new posts yet...")
-
-        print("Parsed: ", datetime.datetime.now())
-        read = checkOldness(read)
-        with open(JSON_PATH, "w+") as f:
-            f.write(DUMP(read))
-        with open(JSON_PATH, "r+") as f:
-            read = LOAD(f)
-
-        print("Resting 1 hour...")
-        time.sleep(3600)
+    def run(self, period=3600):  # period in seconds
+        """Check notices from html per period and sends data to Kafka Consumer."""
+        p = self.producer
+        read = self.read
+
+        try:
+            while True:  # repeat forever, once per period
+                print()  # Section
+                PRODUCED = 0  # How many messages did it send?
+
+                LAST_PARSED = datetime.datetime.strptime(
+                    read["LAST_PARSED"], "%Y-%m-%d %H:%M:%S.%f"
+                )
+
+                now = self.getTimeNow()
+                diff = (now - LAST_PARSED).seconds
+                print("Last parsed at ", LAST_PARSED)
+
+                if (diff / period) < 1:  # less than one period since the last update, wait
+                    print(f"Wait for {period - diff} seconds to sync new posts.")
+                    time.sleep(period - diff)
+
+                print("Trying to parse new posts...")
+                ids, posts, dates, writers = self.parser()  # parse again
+                assert ids is not None, f"Check your parser: {ids}."
+
+                for i in range(self.LENGTH):
+                    postId = ids[i].text.strip()
+                    if postId in read["POSTS"]:
+                        continue
+
+                    postLink = self.ADDRESS + posts[i].get("href")
+                    postTitle = posts[i].text.strip()
+                    postDate = dates[i].text.strip()
+                    postWriter = writers[i].text
+
+                    # Removing a name duplication
+                    duplicate = "[" + postWriter + "]"
+                    if duplicate in postTitle:  # writer: [writer] title
+                        postTitle = postTitle.replace(duplicate, "").strip()  # -> writer: title
+
+                    kafkaData = self.makeData(postId, postTitle, postDate, postLink, postWriter)
+
+                    print("\n>>> Sending a new post...:", postId)
+                    PRODUCED += 1
+                    p.produce(
+                        Config.AJOU_TOPIC_ID,
+                        value=DUMP(kafkaData[postId]),
+                        callback=self.acked,
+                    )
+                    p.poll(1)  # send the data to Kafka (1-second timeout)
+
+                    read["LAST_PARSED"] = now.strftime("%Y-%m-%d %H:%M:%S.%f")
+                    read["POSTS"].update(kafkaData)
+
+                if PRODUCED:
+                    print(f"Sent {PRODUCED} post(s)...")
+                else:
+                    print("\t** No new posts yet")
+
+                print("Parsed at ", now)
+                with open(self.JSON_PATH, "w+") as f:
+                    f.write(DUMP(read))
+                with open(self.JSON_PATH, "r+") as f:
+                    read = LOAD(f)
+
+                print(f"Resting {period // 3600} hour...")
+                time.sleep(period)
+        except Exception as e:  # General exceptions
+            print(dir(e))
+        except KeyboardInterrupt:
+            print("Pressed CTRL+C...")
+        finally:
+            print("\nExiting...")
+            p.flush(100)
+    # Producer callback function
+    @staticmethod
+    def acked(err, msg):
+        if err is not None:
+            print(
+                "\t** Failed to deliver message: {0}: {1}".format(
+                    msg.value(), err.str()
+                )
+            )
+        else:
+            print("Message produced correctly...")
+
+    @staticmethod
+    def makeData(postId, postTitle, postDate, postLink, postWriter):
+        return {
+            postId: {
+                "TITLE": postTitle,
+                "DATE": postDate,
+                "LINK": postLink,
+                "WRITER": postWriter,
+            }
+        }

-except Exception as e:
-    print(type(e))
-    print(dir(e))
+    def checkOldness(self, jsonFile):
+        today = datetime.datetime.today()
+        today = datetime.datetime(today.year, today.month, today.day)
+        for post in list(jsonFile["POSTS"]):
+            currentDate = jsonFile["POSTS"][post]["DATE"]  # string
+            savedDate = datetime.datetime.strptime(currentDate, "%y.%m.%d")
+            if (today - savedDate).days > self.MAXIMUM_DAY:
+                del jsonFile["POSTS"][post]
+
+        with open(self.JSON_PATH, "w+") as f:
+            f.write(json.dumps(jsonFile))
+
+        with open(self.JSON_PATH, "r+") as f:
+            read = json.load(f)
+
+        return read
+
+    @staticmethod
+    def getTimeNow() -> datetime.datetime:
+        return datetime.datetime.now()
+
+    # Ajou notices parser
+    def parser(self):
+        try:
+            req = requests.get(
+                f"{self.ADDRESS}?mode=list&&articleLimit=10&article.offset=0"
+            )
+            req.raise_for_status()
+        except requests.exceptions.ConnectionError:
+            print("Seems like the server is down now.")
+            return None, None, None, None
+
+        req.encoding = "utf-8"
+        html = req.text
+        soup = BeautifulSoup(html, "html.parser")
+
+        ids = soup.select("table > tbody > tr > td.b-num-box")
+        posts = soup.select("table > tbody > tr > td.b-td-left > div > a")
+        dates = soup.select("table > tbody > tr > td.b-td-left > div > div > span.b-date")
+        writers = soup.select(
+            "table > tbody > tr > td.b-td-left > div > div.b-m-con > span.b-writer"
+        )
+        return ids, posts, dates, writers

-except KeyboardInterrupt:
-    print("Pressed CTRL+C...")
-finally:
-    print("Exiting...")
-    p.flush(100)

+
+if __name__ == "__main__":
+    ajou = AjouParserJSON()
+    ajou.run()
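With the class in place, running the producer is the two-liner from the docstring's Usage section. A minimal sketch; the explicit broker address and the 30-minute period in the second variant are illustrative values, not from the commit:

from AjouSlackProducer import AjouParserJSON

ajou = AjouParserJSON()  # defaults: server=Config.VM_SERVER, json_name="already_read.json"
ajou.run()               # parse once per hour (period=3600 seconds)

# Illustrative: a custom broker and a 30-minute cycle
ajou = AjouParserJSON(server="localhost:9092", json_name="already_read.json")
ajou.run(period=1800)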
python/src/AjouSlackProducerMySQL.py  (+3 −3)
@@ -12,11 +12,11 @@ from confluent_kafka import Producer
 class AjouParser:
     """
-    Ajou notices Parser using Slack API and Apache Kafka
+    Ajou notices Parser using Slack API and Apache Kafka (MySQL)
 
     Methods
     -------
-    run(server=Config.VM_SERVER, channel="C01G2CR5MEE", database="ajou_notices")
+    run(server=Config.VM_SERVER, database="ajou_notices")
 
     Usage
     -----
@@ -55,7 +55,7 @@ class AjouParser:
             "bootstrap.servers": server,
             "enable.idempotence": True,  # Safe
             "acks": "all",  # Safe
-            "retries": 10000000,  # Safe
+            "retries": 5,  # Safe
             "max.in.flight": 5,  # High throughput
             "compression.type": "lz4",  # High throughput
             "linger.ms": 5,  # High throughput
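Both producers now share the same delivery profile. With "enable.idempotence": True, the broker deduplicates retried batches, so retries stay safe and the effectively infinite retries=10000000 can drop to 5 without risking duplicates; max.in.flight=5 is the largest value idempotence allows. A standalone sketch of this safe-producer pattern with confluent_kafka; the broker address and topic name are placeholders, not from the repo:

from confluent_kafka import Producer

def acked(err, msg):
    # Delivery report: served by poll()/flush() once per produce()
    if err is not None:
        print("Failed to deliver message:", err)
    else:
        print("Delivered to", msg.topic(), "partition", msg.partition())

p = Producer(
    {
        "bootstrap.servers": "localhost:9092",  # placeholder broker
        "enable.idempotence": True,  # broker dedupes retries: no duplicates, no reordering
        "acks": "all",  # wait for all in-sync replicas
        "retries": 5,
        "max.in.flight": 5,  # the maximum idempotence allows
        "compression.type": "lz4",  # high throughput
        "linger.ms": 5,  # batch up to 5 ms before sending
    }
)

p.produce("ajou-notices", value=b'{"TITLE": "test"}', callback=acked)  # placeholder topic
p.flush(10)  # serve callbacks; wait up to 10 s for outstanding messages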