Skip to content
Snippets Groups Projects
Commit 8ef04d59 authored by Seok Won's avatar Seok Won
Browse files

Create a Faust basic example

pip install faust
parent ab8724c7
No related branches found
No related tags found
No related merge requests found
......@@ -52,6 +52,8 @@ BASE_DIR = os.path.dirname(os.path.abspath(__file__))
JSON_PATH = os.path.join(BASE_DIR, "already_read.json")
LENGTH = 10
PRODUCED = 0
DUMP = lambda x: json.dumps(x)
LOAD = lambda x: json.load(x)
# Bot User OAuth Access Token
# used scopes: channels:history, channels:read, chat:write, im:history, mpim:history, users:read
......@@ -63,11 +65,11 @@ if not Path(JSON_PATH).is_file(): # 파일 없으면 기본 형식 만듬
base_data = {"POSTS": {}, "LAST_PARSED": "1972-12-01 07:00:00.000000"}
with open(JSON_PATH, "a+") as f:
f.write(json.dumps(base_data))
f.write(DUMP(base_data))
# json read
with open(JSON_PATH, "r+") as f_read:
read = json.load(f_read)
read = LOAD(f_read)
# Set last parsed time to rest 1 hour well
LAST_PARSED = datetime.datetime.strptime(read["LAST_PARSED"], "%Y-%m-%d %H:%M:%S.%f")
......@@ -119,9 +121,7 @@ try:
PRODUCED += 1
p.produce(
Config.AJOU_TOPIC_ID,
value=json.dumps(data[postId]),
callback=acked,
Config.AJOU_TOPIC_ID, value=DUMP(data[postId]), callback=acked,
)
p.poll(0.5) # 데이터 Kafka에게 전송
else:
......@@ -136,9 +136,9 @@ try:
print("\t** FAILED: %s" % e.response["error"])
with open(JSON_PATH, "w+") as f:
f.write(json.dumps(read))
f.write(DUMP(read))
with open(JSON_PATH, "r+") as f:
read = json.load(f)
read = LOAD(f)
print("Resting 1 hour...")
......
......@@ -27,6 +27,7 @@ c = Consumer(settings)
# Topic = "SLACK-KAFKA"
c.subscribe(["SLACK-KAFKA"])
# TODO: Make bolts with Apache Storm
try:
while True:
msg = c.poll(0.1) # read data
......
import faust
app = faust.App("hello-world", broker="kafka://localhost:9092", value_serializer="raw",)
greetings_topic = app.topic("first-topic")
@app.agent(greetings_topic)
async def greet(greetings):
async for greeting in greetings:
print(greeting)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment