From 1decb7e27bb808e6c6efefc4c20638ee22871c68 Mon Sep 17 00:00:00 2001 From: Seok Won <ikr@kakao.com> Date: Wed, 16 Dec 2020 22:23:25 +0900 Subject: [PATCH] Moved to Hyper-V Ubuntu kafka server After spending 4 hours, I managed to make it work right. I made an inner Hyper-V NAT and an outer Hyper-V NAT, and the outer NAT shares with the inner NAT. After setting advertised.listeners to EXTERNAL://ubuntuIP:9093, I could finally get this server working. --- python/src/AjouSlackConsumer.py | 4 ++-- python/src/AjouSlackProducerMySQL.py | 16 +++++++++++----- python/src/config.py | 4 ++++ python/tests/test_mysql.py | 2 +- 4 files changed, 18 insertions(+), 8 deletions(-) diff --git a/python/src/AjouSlackConsumer.py b/python/src/AjouSlackConsumer.py index 4197bc1..a4a3136 100644 --- a/python/src/AjouSlackConsumer.py +++ b/python/src/AjouSlackConsumer.py @@ -16,9 +16,10 @@ sc = WebClient(token) # Set 'auto.offset.reset': 'smallest' if you want to consume all messages # from the beginning of the topic settings = { - "bootstrap.servers": Config.MY_SERVER, + "bootstrap.servers": Config.VM_SERVER, "group.id": "ajou-notify", "default.topic.config": {"auto.offset.reset": "largest"}, + # "debug": "broker, cgrp", } c = Consumer(settings) @@ -77,7 +78,6 @@ try: ) else: print("Error occured: {0}".format(msg.error().str())) - except Exception as e: print(type(e)) print(dir(e)) diff --git a/python/src/AjouSlackProducerMySQL.py b/python/src/AjouSlackProducerMySQL.py index f65b66e..64ecb23 100644 --- a/python/src/AjouSlackProducerMySQL.py +++ b/python/src/AjouSlackProducerMySQL.py @@ -37,7 +37,11 @@ def makeData(postId, postTitle, postDate, postLink, postWriter): # Ajou notices parser def parser(): - req = requests.get(f"{ADDRESS}?mode=list&&articleLimit=10&article.offset=0") + try: + req = requests.get(f"{ADDRESS}?mode=list&&articleLimit=10&article.offset=0") + req.raise_for_status() + except requests.exceptions.ConnectionError: + print("Seems like the server is down now.") req.encoding = "utf-8" html = 
req.text soup = BeautifulSoup(html, "html.parser") @@ -95,7 +99,7 @@ channel = "C01G2CR5MEE" # 아주대 # Kafka Producer 만들기 "localhost:9092" settings = { - "bootstrap.servers": Config.MY_SERVER, + "bootstrap.servers": Config.VM_SERVER, # Safe Producer settings "enable.idempotence": True, "acks": "all", @@ -124,12 +128,14 @@ try: print(f"Wait for {3600 - diff} seconds to sync new posts.") time.sleep(3600 - diff) + print("Trying to parse new posts...") + ids, posts, dates, writers = parser() # 다시 파싱 + + # 파싱 오류가 없으면 업데이트 cursor.execute( UPDATE_COMMAND, {"date": now.strftime("%Y-%m-%d %H:%M:%S.%f")} ) - print("Trying to parse new posts...") - ids, posts, dates, writers = parser() # 다시 파싱 for i in range(LENGTH): postId = ids[i].text.strip() @@ -168,7 +174,7 @@ try: p.produce( Config.AJOU_TOPIC_ID, value=DUMP(kafkaData[postId]), callback=acked, ) - p.poll(0.5) # 데이터 Kafka에게 전송 + p.poll(1) # 데이터 Kafka에게 전송 if PRODUCED: print(f"Sent {PRODUCED} posts...") diff --git a/python/src/config.py b/python/src/config.py index f4648f5..9218749 100644 --- a/python/src/config.py +++ b/python/src/config.py @@ -1,5 +1,9 @@ +import os + + class Config: MY_SERVER = "localhost:9092" + VM_SERVER = os.environ["VM_SERVER"] TOPIC_ID = "first-topic" SLACK_TOPID_ID = "SLACK-KAFKA" AJOU_TOPIC_ID = "AJOU-NOTIFY" diff --git a/python/tests/test_mysql.py b/python/tests/test_mysql.py index 0e531c2..e8ab64d 100644 --- a/python/tests/test_mysql.py +++ b/python/tests/test_mysql.py @@ -139,7 +139,7 @@ def test_lastparsed(): def test_delete(): with OPEN_DB() as cursor: - cursor.execute("DELETE FROM notices WHERE id = 12245") + cursor.execute("DELETE FROM notices WHERE id = 12263") if __name__ == "__main__": -- GitLab