diff --git a/python/src/AjouSlackConsumer.py b/python/src/AjouSlackConsumer.py index 4197bc13ca7b6a909d15da48985302299d3df877..a4a31369068a63a061f1505223876ceabf4b8017 100644 --- a/python/src/AjouSlackConsumer.py +++ b/python/src/AjouSlackConsumer.py @@ -16,9 +16,10 @@ sc = WebClient(token) # Set 'auto.offset.reset': 'smallest' if you want to consume all messages # from the beginning of the topic settings = { - "bootstrap.servers": Config.MY_SERVER, + "bootstrap.servers": Config.VM_SERVER, "group.id": "ajou-notify", "default.topic.config": {"auto.offset.reset": "largest"}, + # "debug": "broker, cgrp", } c = Consumer(settings) @@ -77,7 +78,6 @@ try: ) else: print("Error occured: {0}".format(msg.error().str())) - except Exception as e: print(type(e)) print(dir(e)) diff --git a/python/src/AjouSlackProducerMySQL.py b/python/src/AjouSlackProducerMySQL.py index f65b66e129c8523f173afd60c9245b6feae92c12..64ecb23ef745285c793c2ca5d62a24f7be48b645 100644 --- a/python/src/AjouSlackProducerMySQL.py +++ b/python/src/AjouSlackProducerMySQL.py @@ -37,7 +37,11 @@ def makeData(postId, postTitle, postDate, postLink, postWriter): # Ajou notices parser def parser(): - req = requests.get(f"{ADDRESS}?mode=list&&articleLimit=10&article.offset=0") + try: + req = requests.get(f"{ADDRESS}?mode=list&&articleLimit=10&article.offset=0") + req.raise_for_status() + except requests.exceptions.ConnectionError: + print("Seems like the server is down now.") req.encoding = "utf-8" html = req.text soup = BeautifulSoup(html, "html.parser") @@ -95,7 +99,7 @@ channel = "C01G2CR5MEE" # 아주대 # Kafka Producer 만들기 "localhost:9092" settings = { - "bootstrap.servers": Config.MY_SERVER, + "bootstrap.servers": Config.VM_SERVER, # Safe Producer settings "enable.idempotence": True, "acks": "all", @@ -124,12 +128,14 @@ try: print(f"Wait for {3600 - diff} seconds to sync new posts.") time.sleep(3600 - diff) + print("Trying to parse new posts...") + ids, posts, dates, writers = parser() # 다시 파싱 + + # 파싱 오류가 없으면 업데이트 cursor.execute( UPDATE_COMMAND, {"date": now.strftime("%Y-%m-%d %H:%M:%S.%f")} ) - print("Trying to parse new posts...") - ids, posts, dates, writers = parser() # 다시 파싱 for i in range(LENGTH): postId = ids[i].text.strip() @@ -168,7 +174,7 @@ try: p.produce( Config.AJOU_TOPIC_ID, value=DUMP(kafkaData[postId]), callback=acked, ) - p.poll(0.5) # 데이터 Kafka에게 전송 + p.poll(1) # 데이터 Kafka에게 전송 if PRODUCED: print(f"Sent {PRODUCED} posts...") diff --git a/python/src/config.py b/python/src/config.py index f4648f55cf1142efee3ddd4efb2d6d3703908ade..921874957c189e42ba4f58c4406fe4bc80842069 100644 --- a/python/src/config.py +++ b/python/src/config.py @@ -1,5 +1,9 @@ +import os + + class Config: MY_SERVER = "localhost:9092" + VM_SERVER = os.environ["VM_SERVER"] TOPIC_ID = "first-topic" SLACK_TOPID_ID = "SLACK-KAFKA" AJOU_TOPIC_ID = "AJOU-NOTIFY" diff --git a/python/tests/test_mysql.py b/python/tests/test_mysql.py index 0e531c2c7e81f7e80c22246ece068c49e0af340e..e8ab64d34a5614cd7caff1b93c949e5e3b88e938 100644 --- a/python/tests/test_mysql.py +++ b/python/tests/test_mysql.py @@ -139,7 +139,7 @@ def test_lastparsed(): def test_delete(): with OPEN_DB() as cursor: - cursor.execute("DELETE FROM notices WHERE id = 12245") + cursor.execute("DELETE FROM notices WHERE id = 12263") if __name__ == "__main__":