diff --git a/README.md b/README.md
index e0e60200fd3a3a77cb0952b2be77491db379bf71..7ea2ec8fe65151e01dcf57cb1b139118115af659 100644
--- a/README.md
+++ b/README.md
@@ -26,6 +26,7 @@
   * [Ajou notices Parsing Consumer with Slack API](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/AjouSlackConsumer.py)
   * [FastAvro Producer example](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/avro/AjouSlackProducerAvro.py)
   * [FastAvro Consumer example](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/avro/AjouSlackConsumerAvro.py)
+  * [CloudKarafka Producer example](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/cloudkarafka/AjouSlackProducerCloud.py)
 * Kafka (Java)
   * [Simple Producer](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemo.java)
diff --git a/python/src/cloudkarafka/AjouSlackConsumerCloud.py b/python/src/cloudkarafka/AjouSlackConsumerCloud.py
new file mode 100644
index 0000000000000000000000000000000000000000..e22892b3da533adb695db815d84b21e1bd7959ab
--- /dev/null
+++ b/python/src/cloudkarafka/AjouSlackConsumerCloud.py
@@ -0,0 +1,96 @@
+import json
+import os
+import time
+
+from confluent_kafka import Consumer, KafkaError
+from slack import WebClient
+from slack.errors import SlackApiError
+
+
+def error_cb(error):
+    print(">>>", error)
+    if error.code() == KafkaError._ALL_BROKERS_DOWN:
+        print("SERVER DOWN")
+
+
+# Bot User OAuth Access Token
+# used scopes: channels:history, channels:read, chat:write, im:history, mpim:history, users:read
+token = os.environ["SLACK_BOT_TOKEN"]
+
+sc = WebClient(token)
+
+# Set 'auto.offset.reset': 'smallest' if you want to consume all messages
+# from the beginning of the topic
+settings = {
+    "bootstrap.servers": os.environ["CLOUDKARAFKA_BROKERS"],
+    "group.id": "%s-consumer" % os.environ["CLOUDKARAFKA_USERNAME"],
+    "default.topic.config": {"auto.offset.reset": "largest"},
+    "error_cb": error_cb,
+    "session.timeout.ms": 6000,
+    "security.protocol": "SASL_SSL",
+    "sasl.mechanisms": "SCRAM-SHA-256",
+    "sasl.username": os.environ["CLOUDKARAFKA_USERNAME"],
+    "sasl.password": os.environ["CLOUDKARAFKA_PASSWORD"],
+    # "debug": "broker, cgrp",
+}
+c = Consumer(settings)
+
+# Topic = "AJOU-NOTIFY"
+c.subscribe([os.environ["CLOUDKARAFKA_TOPIC"]])
+
+try:
+    while True:
+        msg = c.poll(1.0)
+        time.sleep(1)
+        if msg is None:
+            continue
+        elif not msg.error():
+            print("Received a message: {0}".format(msg.value()))
+            if msg.value() is None:
+                print("But the message value is empty.")
+                continue
+
+            try:
+                app_msg = json.loads(msg.value().decode())
+            except AttributeError:  # value is already a str, not bytes
+                app_msg = json.loads(msg.value())
+
+            title = app_msg["TITLE"]
+            date = app_msg["DATE"]
+            href = app_msg["LINK"]
+            writer = app_msg["WRITER"]
+
+            channel = "아주대"  # C01G2CR5MEE
+            # TODO: give academic (학사) notices higher priority?
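+            # Slack message template (Korean): ":star: `<date>` new notice!"
+            # followed by "<writer>: <title>" and a "view the notice" link.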
+            text = ":star: `%s` 새로운 공지!\n>%s: %s\n>링크: <%s|공지 확인하기>" % (
+                date,
+                writer,
+                title,
+                href,
+            )
+            print('\nSending message "%s" to channel %s' % (text, channel))
+
+            try:
+                sc_response = sc.chat_postMessage(
+                    channel=channel, text=text, as_user=True, username="아주대 공지 봇"
+                )  # as_user does not work with new Slack apps
+
+            except SlackApiError as e:
+                assert e.response["ok"] is False
+                print("\t** FAILED: %s" % e.response["error"])
+        elif msg.error().code() == KafkaError._PARTITION_EOF:
+            print(
+                "End of partition reached {0}/{1}".format(msg.topic(), msg.partition())
+            )
+        else:
+            print("Error occurred: {0}".format(msg.error().str()))
+except Exception as e:
+    print(type(e), e)
+
+except KeyboardInterrupt:
+    print("Pressed CTRL+C...")
+
+finally:
+    print("Closing...")
+    c.close()
diff --git a/python/src/cloudkarafka/AjouSlackProducerCloud.py b/python/src/cloudkarafka/AjouSlackProducerCloud.py
new file mode 100644
index 0000000000000000000000000000000000000000..e64d8e2b69c5ccbccda7759a4bf7352dc59a05a9
--- /dev/null
+++ b/python/src/cloudkarafka/AjouSlackProducerCloud.py
@@ -0,0 +1,228 @@
+import datetime
+import json
+import os
+import ssl
+import time
+from urllib.error import HTTPError
+from urllib.request import urlopen
+
+import mysql.connector
+from bs4 import BeautifulSoup
+from confluent_kafka import Producer
+
+
+class AjouParser:
+    """
+    Ajou notices Parser using Slack API and Apache Kafka (MySQL)
+
+    Methods
+    -------
+    run(period=3600)
+
+    Usage
+    -----
+    ajou = AjouParser(Kafka_server_ip, mysql_db_name)
+    ajou.run()
+    """
+
+    ADDRESS = "https://www.ajou.ac.kr/kr/ajou/notice.do"
+    LENGTH = 10
+
+    # MySQL commands
+    INSERT_COMMAND = (
+        "INSERT INTO notices (id, title, date, link, writer) "
+        "VALUES (%s, %s, %s, %s, %s)"
+    )
+    DUPLICATE_COMMAND = "SELECT EXISTS(SELECT * FROM notices WHERE id = %(id)s)"
+    UPDATE_COMMAND = "UPDATE notices SET date = %(date)s WHERE id = 1"
+
+    __slots__ = ("db", "cursor", "settings", "producer", "topic")
+
+    def __init__(
+        self, server=os.environ["CLOUDKARAFKA_BROKERS"], database="ajou_notices"
+    ):
+        print("Initializing...")
+
+        # MySQL
+        self.db = mysql.connector.connect(
+            host="localhost",
+            user=os.environ["MYSQL_USER"],
+            password=os.environ["MYSQL_PASSWORD"],
+            database=database,
+            charset="utf8",
+        )
+        self.cursor = self.db.cursor(buffered=True)
+
+        # Kafka
+        self.topic = os.environ["CLOUDKARAFKA_TOPIC"]
+
+        self.settings = {  # Producer settings
+            "bootstrap.servers": server,
+            "compression.type": "snappy",  # High throughput
+            # From CloudKarafka
+            "session.timeout.ms": 6000,
+            "security.protocol": "SASL_SSL",
+            "sasl.mechanisms": "SCRAM-SHA-256",
+            "sasl.username": os.environ["CLOUDKARAFKA_USERNAME"],
+            "sasl.password": os.environ["CLOUDKARAFKA_PASSWORD"],
+        }
+        self.producer = Producer(self.settings)
+
+    def run(self, period=3600):  # period in seconds
+        """Checks the notice page every period and sends new posts to the Kafka consumer."""
+        p = self.producer
+        db = self.db
+        cursor = self.cursor
+
+        try:
+            while True:  # repeat every hour
+                print()  # Section
+                PRODUCED = 0  # number of messages sent this round
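+                # Row id = 1 of the notices table stores the last parse time;
+                # it tells the loop how long to sleep before checking again.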
+                cursor.execute("SELECT date FROM notices WHERE id = 1")
+                LAST_PARSED = datetime.datetime.strptime(
+                    cursor.fetchone()[0], "%Y-%m-%d %H:%M:%S.%f"
+                )  # db load date where id = 1
+
+                now = self.getTimeNow()
+                diff = (now - LAST_PARSED).seconds
+
+                print("Last parsed at", LAST_PARSED)
+                if (diff / period) < 1:  # less than `period` since the update, wait
+                    print(f"Wait for {period - diff} seconds to sync new posts.")
+                    time.sleep(period - diff)
+
+                print("Trying to parse new posts...")
+                ids, posts, dates, writers = self.parser()  # parse again
+                assert ids is not None, f"Check your parser: {ids}."
+
+                # update the timestamp only if parsing succeeded
+                cursor.execute(
+                    self.UPDATE_COMMAND,
+                    {"date": now.strftime("%Y-%m-%d %H:%M:%S.%f")},
+                )
+
+                for i in range(self.LENGTH):
+                    postId = ids[i].text.strip()
+
+                    cursor.execute(
+                        self.DUPLICATE_COMMAND, {"id": int(postId)}
+                    )  # db duplication check
+                    if cursor.fetchone()[0]:  # (1, )
+                        continue  # postId exists
+
+                    postLink = self.ADDRESS + posts[i].get("href")
+                    postTitle = posts[i].text.strip()
+                    postDate = dates[i].text.strip()
+                    postWriter = writers[i].text
+
+                    duplicate = "[" + postWriter + "]"
+                    if duplicate in postTitle:  # writer: [writer] title
+                        postTitle = postTitle.replace(
+                            duplicate, ""
+                        ).strip()  # -> writer: title
+
+                    dbData = (
+                        int(postId),
+                        postTitle,
+                        postDate,
+                        postLink,
+                        postWriter,
+                    )
+                    kafkaData = self.makeData(
+                        postId, postTitle, postDate, postLink, postWriter
+                    )
+
+                    cursor.execute(self.INSERT_COMMAND, dbData)  # db insert
+
+                    print("\n>>> Sending a new post...:", postId)
+
+                    try:
+                        p.produce(
+                            self.topic,
+                            value=json.dumps(kafkaData[postId]),
+                            callback=self.acked,
+                        )
+                        PRODUCED += 1  # count only successfully queued messages
+                    except BufferError:
+                        print(
+                            f"Local producer queue is full. ({len(p)} messages awaiting delivery)"
+                        )
+                    p.poll(0)  # serve delivery callbacks
+
+                if PRODUCED:
+                    print(f"Sent {PRODUCED} post(s)...")
+                    p.flush()
+                else:
+                    print("\t** No new posts yet")
+                print("Parsed at", now)
+
+                db.commit()  # save data
+                print(f"Resting {period // 3600} hour...")
+                time.sleep(period)
+
+        except Exception as e:  # General exceptions
+            print(type(e), e)
+        except KeyboardInterrupt:
+            print("Pressed CTRL+C...")
+        finally:
+            print("\nExiting...")
+            cursor.close()
+            db.commit()
+            db.close()
+            p.flush(100)
+
+    # Producer callback function
+    @staticmethod
+    def acked(err, msg):
+        if err is not None:
+            print(
+                "\t** Failed to deliver message: {0}: {1}".format(
+                    msg.value(), err.str()
+                )
+            )
+        else:
+            print("Message produced correctly...")
+
+    @staticmethod
+    def makeData(postId, postTitle, postDate, postLink, postWriter):
+        return {
+            postId: {
+                "TITLE": postTitle,
+                "DATE": postDate,
+                "LINK": postLink,
+                "WRITER": postWriter,
+            }
+        }
+
+    @staticmethod
+    def getTimeNow() -> datetime.datetime:
+        return datetime.datetime.now()
+
+    # Ajou notices parser
+    def parser(self):
+        context = ssl._create_unverified_context()
+        try:
+            result = urlopen(
+                f"{self.ADDRESS}?mode=list&&articleLimit={self.LENGTH}&article.offset=0",
+                context=context,
+            )
+        except HTTPError:
+            print("Seems like the server is down now.")
+            return None, None, None, None
+        html = result.read()
+        soup = BeautifulSoup(html, "html.parser")
+        ids = soup.select("table > tbody > tr > td.b-num-box")
+        posts = soup.select("table > tbody > tr > td.b-td-left > div > a")
+        dates = soup.select(
+            "table > tbody > tr > td.b-td-left > div > div > span.b-date"
+        )
+        writers = soup.select(
+            "table > tbody > tr > td.b-td-left > div > div.b-m-con > span.b-writer"
+        )
+        return ids, posts, dates, writers
+
+
+if __name__ == "__main__":
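+    # Create the parser (defaults to the CLOUDKARAFKA_BROKERS env var)
+    # and start the hourly crawl-and-produce loop.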
+    ajou = AjouParser()
+    ajou.run()
diff --git a/python/src/cloudkarafka/README.md b/python/src/cloudkarafka/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..e2ff73a8b3f8d229409438c6ab4f97d9e9af1582
--- /dev/null
+++ b/python/src/cloudkarafka/README.md
@@ -0,0 +1,41 @@
+# Apache Kafka example for Python
+
+
+## Getting started
+
+Set up your free Apache Kafka instance here: https://www.cloudkarafka.com
+
+Configuration
+
+* `export CLOUDKARAFKA_BROKERS="host1:9094,host2:9094,host3:9094"`
+  Hostnames can be found in the Details view for your CloudKarafka instance.
+* `export CLOUDKARAFKA_USERNAME="username"`
+  The username can be found in the Details view for your CloudKarafka instance.
+* `export CLOUDKARAFKA_PASSWORD="password"`
+  The password can be found in the Details view for your CloudKarafka instance.
+* `export CLOUDKARAFKA_TOPIC="username-topic"`
+  Topic names are prefixed with your username followed by a dash.
+
+These export commands must be run in both of the terminal windows below.
+
+```
+git clone https://github.com/CloudKarafka/python-kafka-example.git
+cd python-kafka-example
+pip install confluent_kafka
+python consumer.py
+```
+
+Open another terminal window, `cd` into the same directory, and run `python producer.py`.
+Type your messages, then send them by pressing your system's EOF key sequence (Ctrl-D in bash).
+
+## Adding a Root CA
+
+In some cases the CloudKarafka Root CA may need to be manually added to the example, particularly if you see the error
+```
+Failed to verify broker certificate: unable to get local issuer certificate
+```
+when you run the example. If so, download the [CloudKarafka Root CA](https://www.cloudkarafka.com/certs/cloudkarafka.ca) (see also the [FAQ](https://www.cloudkarafka.com/docs/faq.html)), place it in the python-kafka-example directory, and add the following line to the configuration dict (`conf {...}` in the upstream example, `settings` in the scripts here):
+```
+'ssl.ca.location': 'cloudkarafka.ca'
+```
+This should resolve the error and allow a successful connection to the server.
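+
+For reference, here is a minimal sketch of the consumer settings from
+`AjouSlackConsumerCloud.py` with the Root CA added (the file name
+`cloudkarafka.ca` assumes you saved the certificate next to the script):
+
+```python
+import os
+
+from confluent_kafka import Consumer
+
+settings = {
+    "bootstrap.servers": os.environ["CLOUDKARAFKA_BROKERS"],
+    "group.id": "%s-consumer" % os.environ["CLOUDKARAFKA_USERNAME"],
+    "session.timeout.ms": 6000,
+    "security.protocol": "SASL_SSL",
+    "sasl.mechanisms": "SCRAM-SHA-256",
+    "sasl.username": os.environ["CLOUDKARAFKA_USERNAME"],
+    "sasl.password": os.environ["CLOUDKARAFKA_PASSWORD"],
+    "ssl.ca.location": "cloudkarafka.ca",  # path to the downloaded Root CA
+}
+consumer = Consumer(settings)
+consumer.subscribe([os.environ["CLOUDKARAFKA_TOPIC"]])
+```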