Commit 92b3544a authored by Seok Won

Create CloudKarafka example

Get a free Kafka cluster here: https://www.cloudkarafka.com/
parent a6d21870
@@ -26,6 +26,7 @@
* [Ajou notices Parsing Consumer with Slack API](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/AjouSlackConsumer.py)
* [FastAvro Producer example](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/avro/AjouSlackProducerAvro.py)
* [FastAvro Consumer example](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/avro/AjouSlackConsumerAvro.py)
* [Cloud Kafka Producer example](https://github.com/Alfex4936/kafka-Studies/blob/main/python/src/cloudkarafka/AjouSlackProducerCloud.py)
* Kafka (Java)
* [Simple Producer](https://github.com/Alfex4936/kafka-Studies/blob/main/src/main/java/csw/kafka/study/lesson1/ProducerDemo.java)
import json
import os
import time
from confluent_kafka import Consumer, KafkaError
from slack import WebClient
from slack.errors import SlackApiError
def error_cb(error):
print(">>>", error)
    if error.code() == KafkaError._ALL_BROKERS_DOWN:
print("SERVER DOWN")
# Bot User OAuth Access Token
# used scopes: channels:history, channels:read, chat:write, im:history, mpim:history, users:read
token = os.environ["SLACK_BOT_TOKEN"]
sc = WebClient(token)
# Set 'auto.offset.reset': 'smallest' if you want to consume all messages
# from the beginning of the topic
settings = {
"bootstrap.servers": os.environ["CLOUDKARAFKA_BROKERS"],
"group.id": "%s-consumer" % os.environ["CLOUDKARAFKA_USERNAME"],
"default.topic.config": {"auto.offset.reset": "largest"},
"error_cb": error_cb,
"session.timeout.ms": 6000,
"security.protocol": "SASL_SSL",
"sasl.mechanisms": "SCRAM-SHA-256",
"sasl.username": os.environ["CLOUDKARAFKA_USERNAME"],
"sasl.password": os.environ["CLOUDKARAFKA_PASSWORD"]
# "debug": "broker, cgrp",
}
c = Consumer(settings)
# Topic = "AJOU-NOTIFY"
c.subscribe([os.environ["CLOUDKARAFKA_TOPIC"]])
try:
while True:
msg = c.poll(1.0)
time.sleep(1)
if msg is None:
continue
elif not msg.error():
print("Received a message: {0}".format(msg.value()))
if msg.value() is None:
print("But the message value is empty.")
continue
try:
app_msg = json.loads(msg.value().decode())
            except (AttributeError, UnicodeDecodeError):  # value may already be a str
app_msg = json.loads(msg.value())
title = app_msg["TITLE"]
date = app_msg["DATE"]
href = app_msg["LINK"]
writer = app_msg["WRITER"]
            channel = "아주대"  # "Ajou University" channel, ID: C01G2CR5MEE
            # TODO: give academic (학사) notices higher priority?
text = ":star: `%s` 새로운 공지!\n>%s: %s\n>링크: <%s|공지 확인하기>" % (
date,
writer,
title,
href,
)
print('\nSending message "%s" to channel %s' % (text, channel))
try:
sc_response = sc.chat_postMessage(
channel=channel, text=text, as_user=True, username="아주대 공지 봇"
                )  # as_user does not work with new Slack apps
except SlackApiError as e:
assert e.response["ok"] is False
print("\t** FAILED: %s" % e.response["error"])
elif msg.error().code() == KafkaError._PARTITION_EOF:
print(
"End of partition reached {0}/{1}".format(msg.topic(), msg.partition())
)
else:
            print("Error occurred: {0}".format(msg.error().str()))
except Exception as e:
print(type(e))
print(dir(e))
except KeyboardInterrupt:
print("Pressed CTRL+C...")
finally:
print("Closing...")
c.close()
import datetime
import json
import os
import ssl
import time
from urllib.error import HTTPError
from urllib.request import urlopen
import mysql.connector
from bs4 import BeautifulSoup
from confluent_kafka import Producer
class AjouParser:
"""
Ajou notices Parser using Slack API and Apache Kafka (MySQL)
Methods
-------
    run(period=3600)
        Checks the notice board every `period` seconds and produces new posts to Kafka.
Usage
-----
ajou = AjouParser(Kafka_server_ip, mysql_db_name)
ajou.run()
"""
ADDRESS = "https://www.ajou.ac.kr/kr/ajou/notice.do"
LENGTH = 10
# MySQL commands
INSERT_COMMAND = (
"INSERT INTO notices (id, title, date, link, writer) "
"VALUES (%s, %s, %s, %s, %s)"
)
DUPLICATE_COMMAND = "SELECT EXISTS(SELECT * FROM notices WHERE id = %(id)s)"
UPDATE_COMMAND = "UPDATE notices SET date = %(date)s WHERE id = 1"
__slots__ = ("db", "cursor", "settings", "producer", "topic")
def __init__(
self, server=os.environ["CLOUDKARAFKA_BROKERS"], database="ajou_notices"
):
print("Initializing...")
# MySQL
self.db = mysql.connector.connect(
host="localhost",
user=os.environ["MYSQL_USER"],
password=os.environ["MYSQL_PASSWORD"],
database=database,
charset="utf8",
)
self.cursor = self.db.cursor(buffered=True)
# Kafka
self.topic = os.environ["CLOUDKARAFKA_TOPIC"]
self.settings = { # Producer settings
"bootstrap.servers": server,
"compression.type": "snappy", # High throughput
# From CloudKarafka
"session.timeout.ms": 6000,
"security.protocol": "SASL_SSL",
"sasl.mechanisms": "SCRAM-SHA-256",
"sasl.username": os.environ["CLOUDKARAFKA_USERNAME"],
"sasl.password": os.environ["CLOUDKARAFKA_PASSWORD"],
}
self.producer = Producer(self.settings)
def run(self, period=3600): # period (second)
"""Check notices from html per period and sends data to Kafka Consumer."""
p = self.producer
db = self.db
cursor = self.cursor
try:
            while True:  # repeat forever, once per `period` (1 hour by default)
print() # Section
PRODUCED = 0 # How many messages did it send?
cursor.execute("SELECT date FROM notices WHERE id = 1")
LAST_PARSED = datetime.datetime.strptime(
cursor.fetchone()[0], "%Y-%m-%d %H:%M:%S.%f"
) # db load date where id = 1
now = self.getTimeNow()
diff = (now - LAST_PARSED).seconds
print("Last parsed at", LAST_PARSED)
                if (diff / period) < 1:  # less than `period` seconds since the last update; wait
print(f"Wait for {period - diff} seconds to sync new posts.")
time.sleep(period - diff)
print("Trying to parse new posts...")
                ids, posts, dates, writers = self.parser()  # parse again
assert ids is not None, f"Check your parser: {ids}."
                # Update the last-parsed timestamp only if parsing succeeded
cursor.execute(
self.UPDATE_COMMAND, {"date": now.strftime("%Y-%m-%d %H:%M:%S.%f")},
)
for i in range(self.LENGTH):
postId = ids[i].text.strip()
cursor.execute(
self.DUPLICATE_COMMAND, {"id": int(postId)}
) # db duplication check
if cursor.fetchone()[0]: # (1, )
continue # postId exists
postLink = self.ADDRESS + posts[i].get("href")
postTitle = posts[i].text.strip()
postDate = dates[i].text.strip()
postWriter = writers[i].text
duplicate = "[" + postWriter + "]"
if duplicate in postTitle: # writer: [writer] title
postTitle = postTitle.replace(
duplicate, ""
).strip() # -> writer: title
dbData = (
int(postId),
postTitle,
postDate,
postLink,
postWriter,
)
kafkaData = self.makeData(
postId, postTitle, postDate, postLink, postWriter
)
cursor.execute(self.INSERT_COMMAND, dbData) # db insert
print("\n>>> Sending a new post...:", postId)
try:
p.produce(
self.topic,
value=json.dumps(kafkaData[postId]),
callback=self.acked,
)
except BufferError as e:
print(
f"Local producer queue is full. ({len(p)} messages awaiting delivery)"
)
PRODUCED += 1
                    p.poll(0)  # serve delivery report callbacks (timeout in seconds)
if PRODUCED:
print(f"Sent {PRODUCED} post(s)...")
p.flush()
else:
print("\t** No new posts yet")
print("Parsed at", now)
db.commit() # save data
print(f"Resting {period // 3600} hour...")
time.sleep(period)
except Exception as e: # General exceptions
print(e)
print(dir(e))
except KeyboardInterrupt:
print("Pressed CTRL+C...")
finally:
print("\nExiting...")
cursor.close()
db.commit()
db.close()
p.flush(100)
# Producer callback function
@staticmethod
def acked(err, msg):
if err is not None:
print(
"\t** Failed to deliver message: {0}: {1}".format(
msg.value(), err.str()
)
)
else:
print("Message produced correctly...")
@staticmethod
def makeData(postId, postTitle, postDate, postLink, postWriter):
return {
postId: {
"TITLE": postTitle,
"DATE": postDate,
"LINK": postLink,
"WRITER": postWriter,
}
}
@staticmethod
def getTimeNow() -> datetime.datetime:
return datetime.datetime.now()
# Ajou notices parser
def parser(self):
context = ssl._create_unverified_context()
try:
result = urlopen(
f"{self.ADDRESS}?mode=list&&articleLimit={self.LENGTH}&article.offset=0",
context=context,
)
except HTTPError:
print("Seems like the server is down now.")
return None, None, None, None
html = result.read()
soup = BeautifulSoup(html, "html.parser")
ids = soup.select("table > tbody > tr > td.b-num-box")
posts = soup.select("table > tbody > tr > td.b-td-left > div > a")
dates = soup.select(
"table > tbody > tr > td.b-td-left > div > div > span.b-date"
)
writers = soup.select(
"table > tbody > tr > td.b-td-left > div > div.b-m-con > span.b-writer"
)
return ids, posts, dates, writers
if __name__ == "__main__":
ajou = AjouParser()
ajou.run()
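
For reference, a small sketch of the message contract between this producer and the consumer above. The notice values below are purely illustrative; `makeData` keys the payload by post ID, but only the inner object is serialized and sent.

```
import json

kafka_data = {
    "12345": {  # illustrative post ID
        "TITLE": "Example notice title",
        "DATE": "2020-12-24",
        "LINK": "https://www.ajou.ac.kr/kr/ajou/notice.do",
        "WRITER": "Example department",
    }
}
payload = json.dumps(kafka_data["12345"])  # what p.produce() sends as the value

# The consumer decodes the same keys before formatting the Slack message.
app_msg = json.loads(payload)
assert app_msg["TITLE"] == "Example notice title"
```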
# Apache Kafka example for Python
## Getting started
Set up your free Apache Kafka instance here: https://www.cloudkarafka.com
Configuration
* `export CLOUDKARAFKA_BROKERS="host1:9094,host2:9094,host3:9094"`
  Hostnames can be found in the Details view for your CloudKarafka instance.
* `export CLOUDKARAFKA_USERNAME="username"`
  The username can be found in the Details view for your CloudKarafka instance.
* `export CLOUDKARAFKA_PASSWORD="password"`
  The password can be found in the Details view for your CloudKarafka instance.
* `export CLOUDKARAFKA_TOPIC="username-topic"`
  The topic should be your username followed by a dash and then the topic name.
These export commands must be run in both of the terminal windows below.
```
git clone https://github.com/CloudKarafka/python-kafka-example.git
cd python-kafka-example
pip install confluent_kafka
python consumer.py
```
Open another terminal window, `cd` into the same directory, and run `python producer.py`.
Send your messages by pressing your system's EOF key sequence (Ctrl-D in bash).
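
For orientation, here is a minimal producer sketch (not the repository's `producer.py`) that reuses the same environment variables and SASL settings as the files committed above:

```
import os

from confluent_kafka import Producer

# Connection settings mirroring the committed example; all values are taken
# from the environment variables exported above.
conf = {
    "bootstrap.servers": os.environ["CLOUDKARAFKA_BROKERS"],
    "session.timeout.ms": 6000,
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "SCRAM-SHA-256",
    "sasl.username": os.environ["CLOUDKARAFKA_USERNAME"],
    "sasl.password": os.environ["CLOUDKARAFKA_PASSWORD"],
}

p = Producer(conf)
p.produce(os.environ["CLOUDKARAFKA_TOPIC"], value="hello from CloudKarafka")
p.flush(10)  # wait up to 10 seconds for delivery
```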
## Adding a Root CA
In some cases the CloudKarafka Root CA may need to be manually added to the example, particularly if you are seeing the error:
```
Failed to verify broker certificate: unable to get local issuer certificate
```
returned when you run the example. If this is the case, you will need to download the [CloudKarafka Root CA](https://www.cloudkarafka.com/certs/cloudkarafka.ca) (see also the [FAQ](https://www.cloudkarafka.com/docs/faq.html)), place it in the python-kafka-example directory, and then add the following line to the `conf {...}` section:
```
'ssl.ca.location': 'cloudkarafka.ca'
```
This should resolve the error and allow for successful connection to the server.
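
For example, the consumer settings from this commit might then look like the following sketch, assuming the certificate was saved as `cloudkarafka.ca` in the working directory:

```
import os

settings = {
    "bootstrap.servers": os.environ["CLOUDKARAFKA_BROKERS"],
    "group.id": "%s-consumer" % os.environ["CLOUDKARAFKA_USERNAME"],
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "SCRAM-SHA-256",
    "sasl.username": os.environ["CLOUDKARAFKA_USERNAME"],
    "sasl.password": os.environ["CLOUDKARAFKA_PASSWORD"],
    "ssl.ca.location": "cloudkarafka.ca",  # path to the downloaded root CA file
}
```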