Skip to content
Snippets Groups Projects
Unverified Commit 1decb7e2 authored by Seok Won's avatar Seok Won
Browse files

Moved to Hyper-V Ubuntu kafka server

After spending 4 hours, I managed to make it work right.

I created an inner Hyper-V NAT and an outer Hyper-V NAT,

and the outer NAT is shared with the inner NAT.

Setting advertised.listeners to EXTERNAL://ubuntuIP:9093, I could finally get this server working.
parent eb47196f
Branches
No related tags found
No related merge requests found
......@@ -16,9 +16,10 @@ sc = WebClient(token)
# Kafka consumer configuration.
# Set 'auto.offset.reset': 'smallest' if you want to consume all messages
# from the beginning of the topic
settings = {
    # Broker address from the environment-backed VM server setting.
    # (The stale hard-coded Config.MY_SERVER entry was a duplicate dict
    # key that Python silently discarded — removed.)
    "bootstrap.servers": Config.VM_SERVER,
    "group.id": "ajou-notify",
    "default.topic.config": {"auto.offset.reset": "largest"},
    # "debug": "broker, cgrp",
}
c = Consumer(settings)
......@@ -77,7 +78,6 @@ try:
)
else:
print("Error occured: {0}".format(msg.error().str()))
except Exception as e:
print(type(e))
print(dir(e))
......
......@@ -37,7 +37,11 @@ def makeData(postId, postTitle, postDate, postLink, postWriter):
# Ajou notices parser
def parser():
req = requests.get(f"{ADDRESS}?mode=list&&articleLimit=10&article.offset=0")
try:
req = requests.get(f"{ADDRESS}?mode=list&&articleLimit=10&article.offset=0")
req.raise_for_status()
except requests.exceptions.ConnectionError:
print("Seems like the server is down now.")
req.encoding = "utf-8"
html = req.text
soup = BeautifulSoup(html, "html.parser")
......@@ -95,7 +99,7 @@ channel = "C01G2CR5MEE" # 아주대
# Kafka Producer 만들기 "localhost:9092"
settings = {
"bootstrap.servers": Config.MY_SERVER,
"bootstrap.servers": Config.VM_SERVER,
# Safe Producer settings
"enable.idempotence": True,
"acks": "all",
......@@ -124,12 +128,14 @@ try:
print(f"Wait for {3600 - diff} seconds to sync new posts.")
time.sleep(3600 - diff)
print("Trying to parse new posts...")
ids, posts, dates, writers = parser() # 다시 파싱
# 파싱 오류가 없으면 업데이트
cursor.execute(
UPDATE_COMMAND, {"date": now.strftime("%Y-%m-%d %H:%M:%S.%f")}
)
print("Trying to parse new posts...")
ids, posts, dates, writers = parser() # 다시 파싱
for i in range(LENGTH):
postId = ids[i].text.strip()
......@@ -168,7 +174,7 @@ try:
p.produce(
Config.AJOU_TOPIC_ID, value=DUMP(kafkaData[postId]), callback=acked,
)
p.poll(0.5) # 데이터 Kafka에게 전송
p.poll(1) # 데이터 Kafka에게 전송
if PRODUCED:
print(f"Sent {PRODUCED} posts...")
......
import os
class Config:
    """Central configuration: Kafka broker addresses and topic names."""

    # Local development broker.
    MY_SERVER = "localhost:9092"
    # Hyper-V Ubuntu VM broker. Falls back to the local broker when the
    # VM_SERVER environment variable is unset, so importing this module
    # no longer crashes with a bare KeyError in dev/test environments.
    VM_SERVER = os.environ.get("VM_SERVER", MY_SERVER)
    TOPIC_ID = "first-topic"
    # NOTE(review): "TOPID" looks like a typo for "TOPIC"; name kept
    # unchanged for backward compatibility with existing callers.
    SLACK_TOPID_ID = "SLACK-KAFKA"
    AJOU_TOPIC_ID = "AJOU-NOTIFY"
......
......@@ -139,7 +139,7 @@ def test_lastparsed():
def test_delete():
    """Delete fixture notice rows from the notices table by id."""
    # NOTE(review): the two DELETE statements (ids 12245 and 12263) look
    # like collapsed old/new lines from a diff view — confirm whether both
    # rows should really be deleted or only the newer id.
    with OPEN_DB() as cursor:
        cursor.execute("DELETE FROM notices WHERE id = 12245")
        cursor.execute("DELETE FROM notices WHERE id = 12263")
if __name__ == "__main__":
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment