# push_to_db.py
# Authored by Eunhak Lee
import dotenv
dotenv.load_dotenv(dotenv_path=".env", override=False)
import os
import re
import sys
import time
import redis
import psycopg2
import traceback
import parse
import utils
from datetime import datetime, timedelta
def check_similarity(_conn, name, part_type):
    """Look up the stored part whose name is most similar to ``name``.

    Runs the SQL ``similarity()`` function against ``parts.name`` and keeps
    only rows whose score reaches the acceptance threshold.

    Args:
        _conn: Open DB-API connection (used only to create a cursor).
        name: Part name to compare against ``parts.name``.
        part_type: Part type used to restrict the search; falsy means
            "search every type". It is upper-cased before being compared
            with the ``type`` column.

    Returns:
        A one-element list holding the best ``(id, name, sim)`` row, or
        ``None`` when no row reaches the threshold.
    """
    normalized_type = part_type.upper() if part_type else None

    # RAM/SSD/HDD names differ only in small tokens (capacity, speed), so only
    # an exact match (similarity 1.0) is accepted for them.  The check uses
    # the normalized type so that it agrees with the WHERE filter below.
    threshold = 1.0 if normalized_type in ("RAM", "SSD", "HDD") else 0.13

    # The type is bound as a query parameter instead of being formatted into
    # the SQL text, so a crafted part type cannot alter the statement.
    if normalized_type:
        where_clause = "WHERE type = %s"
        params = (name, normalized_type, threshold)
    else:
        where_clause = ""
        params = (name, threshold)

    query = (
        "WITH comp AS (SELECT id, name, similarity(name::text, %s::text) as sim FROM parts "
        + where_clause
        + ") SELECT * FROM comp WHERE sim >=%s ORDER BY sim DESC LIMIT 5;"
    )

    # The context manager closes the cursor even if the query fails.
    with _conn.cursor() as cursor:
        cursor.execute(query, params)
        data = cursor.fetchall()

    return data[:1] if data else None
def handle_single_part(_dict, part_type):
    """Find the stored part matching a parsed part description.

    Args:
        _dict: Parsed part; must contain a ``"name"`` key.
        part_type: Non-empty part type. The generic ``'DISK'`` type is never
            matched here.

    Returns:
        The best match from ``check_similarity`` with its last column
        dropped, or ``None`` when nothing matched or the type is ``'DISK'``.
    """
    assert "name" in _dict
    assert bool(part_type)

    if part_type == 'DISK':
        return None

    lookup_name = _dict["name"]
    if part_type == 'GPU':
        # Strip vendor/brand words before comparing names.
        for brand_word in ("GeForce", "NVIDIA"):
            lookup_name = lookup_name.replace(brand_word, "")

    hits = check_similarity(db_conn, lookup_name, part_type)
    return hits[0][:-1] if hits else None
def _quote_identifier(identifier):
    """Quote ``identifier`` as a SQL column name for a parameterized query.

    Embedded double quotes are doubled (standard SQL identifier escaping) and
    ``%`` is doubled so the driver does not treat it as a placeholder.
    """
    escaped = str(identifier).replace('"', '""').replace("%", "%%")
    return '"' + escaped + '"'


def create_new_one(_dict, part_type, _cursor):
    """Insert (or reuse) a ``parts`` row and its per-type detail row.

    Args:
        _dict: Parsed part. ``"name"`` is required; ``"serial"`` is ignored;
            every other key is written to the column of the same name in
            ``part_info_<TYPE>``. The mapping is not modified.
        part_type: Part type; upper-cased and used both as the ``parts.type``
            value and as the suffix of the detail table name.
        _cursor: Open DB-API cursor used to run the statement.

    Returns:
        The ``part_id`` returned by the insert, or ``None`` when the statement
        returned no row.

    Raises:
        KeyError: If ``_dict`` has no ``"name"`` key.
        ValueError: If ``part_type`` contains characters that cannot be part
            of a plain table-name suffix.
    """
    attributes = dict(_dict)
    name = attributes.pop("name")
    attributes.pop("serial", None)

    type_enum = part_type.upper()
    # The type becomes part of a table name and cannot be bound as a query
    # parameter, so refuse anything that is not a plain identifier fragment.
    if not re.fullmatch(r"[A-Za-z0-9_]+", type_enum):
        raise ValueError(f"unsupported part type: {part_type!r}")

    # Build both lists starting from the mandatory part_id/rows.id pair so the
    # statement stays valid even when the part carries no extra attributes.
    column_list = ["part_id"] + [_quote_identifier(key) for key in attributes]
    select_list = ["rows.id"] + ["%s"] * len(attributes)

    sql = (
        "WITH rows AS (INSERT INTO parts (name, type) VALUES (%s, %s) "
        "ON CONFLICT (name) DO UPDATE SET name = excluded.name RETURNING id)\n"
        f"INSERT INTO part_info_{type_enum} ({', '.join(column_list)})\n"
        f"(SELECT {', '.join(select_list)} FROM rows)\n"
        "RETURNING part_id;"
    )

    _cursor.execute(sql, (name, type_enum, *attributes.values()))
    data = _cursor.fetchall()
    return data[0][0] if data else None
def create_combination(_cursor, nickname, _user_id):
    """Create a ``combinations`` row owned by ``_user_id``.

    The combination is named after ``nickname`` when one is given and gets a
    generic default name otherwise.

    Args:
        _cursor: Open DB-API cursor used to run the insert.
        nickname: Optional owner nickname used to build the display name.
        _user_id: Value stored in ``owner_id``.

    Returns:
        The id of the new row, or ``None`` when the insert returned nothing.
    """
    pc_name = f"{nickname!s}의 PC" if nickname else "새 PC"

    _cursor.execute(
        "INSERT INTO combinations (owner_id, name) VALUES (%s, %s) RETURNING id;",
        (_user_id, pc_name),
    )
    rows = _cursor.fetchall()
    if not rows:
        return None
    return rows[0][0]
def wire_part_to_combination(_cursor, _combination_id, *_part_ids):
    """Link parts to a combination through the ``relations`` table.

    Args:
        _cursor: Open DB-API cursor used to run the insert.
        _combination_id: Combination every part is attached to.
        *_part_ids: Ids of the parts to attach.

    Returns:
        The ``(id, part_id)`` rows returned by the insert; an empty list when
        no part ids were given.
    """
    # With no parts the VALUES list would be empty, which is invalid SQL;
    # there is nothing to insert, so skip the round trip entirely.
    if not _part_ids:
        return []

    placeholders = ", ".join(["(%s, %s)"] * len(_part_ids))
    # Flatten to [combination, part, combination, part, ...] to line up with
    # the "(%s, %s)" groups above.
    values = [
        value
        for part_id in _part_ids
        for value in (_combination_id, part_id)
    ]

    _cursor.execute(
        "INSERT INTO relations (combination_id, part_id) VALUES "
        + placeholders
        + " RETURNING id, part_id",
        values,
    )
    return _cursor.fetchall()
def match_part_obj_into_db(_part, part_type):
    """Return the id of the stored part matching ``_part``, if any.

    Args:
        _part: Parsed part, passed through to ``handle_single_part``.
        part_type: Part type, passed through to ``handle_single_part``.

    Returns:
        The first element of the matched row, or ``None`` when
        ``handle_single_part`` found nothing.
    """
    row = handle_single_part(_part, part_type)
    return row[0] if row else None
def finalize_transaction(_cursor, _transaction_id, _user_id, _combination_id):
    """Record a processed upload in the ``transactions`` table.

    Args:
        _cursor: Open DB-API cursor used to run the insert.
        _transaction_id: Value stored in ``transactions.id``.
        _user_id: Value stored in ``user_id``.
        _combination_id: Value stored in ``combination_id``.

    Returns:
        None. Committing is left to the caller.
    """
    _cursor.execute(
        "INSERT INTO transactions (id, user_id, combination_id) VALUES (%s, %s, %s);",
        (_transaction_id, _user_id, _combination_id),
    )
    # The statement has no RETURNING clause, so there is no result set.
    # Calling fetchall() here makes psycopg2 raise ProgrammingError
    # ("no results to fetch"), so nothing is fetched.
def _load_job_payload(code):
    """Read and decode the user id, transaction id and XML document for ``code``."""
    raw_user_id = R.get(f"mypc:code:{code}:user_id")
    raw_transaction_id = R.get(f"mypc:code:{code}:transaction_id")
    raw_document = R.get(f"mypc:code:{code}:document")

    print("(User ID (raw):", raw_user_id)
    print("Transaction ID: (raw)", raw_transaction_id)
    # Log only a prefix, as the label says; the full document can be large.
    print("XML starts with (raw):", raw_document[:50] if raw_document is not None else None)

    # Explicit checks instead of `assert`, so they still run under `python -O`.
    # The exception type and messages are kept as they were.
    for label, raw_value in (
        ("userId", raw_user_id),
        ("transactionId", raw_transaction_id),
        ("document", raw_document),
    ):
        if raw_value is None:
            raise AssertionError(f"{label} must not be null")

    user_id = int(raw_user_id.decode())
    transaction_id = raw_transaction_id.decode()
    xml_document = raw_document.decode()

    print("User ID:", user_id)
    print("Transaction ID:", transaction_id)
    print("XML starts with:", repr(xml_document[:50]))
    return user_id, transaction_id, xml_document


def _resolve_part_ids(parts_by_type):
    """Map every parsed part to a ``parts`` id, creating rows for unknown parts."""
    part_ids = []
    for part_type, parts in parts_by_type.items():  # iterate per part type
        for part in parts:  # iterate per part
            # Resolve the concrete type per part. Overwriting the outer loop
            # variable would make every later disk in this group skip the
            # "DISK" branch and inherit the first disk's type.
            effective_type = part_type
            if part_type == "DISK":
                disk_part_type = (part.pop("type", None) or "not_set").upper()
                if disk_part_type not in ("SSD", "HDD"):
                    print("[WARNING] not supported disk type", disk_part_type, file=sys.stderr)
                    continue
                effective_type = disk_part_type

            matched_id = match_part_obj_into_db(part, effective_type)
            if not matched_id:  # the part is not in the DB yet
                print(".. creating new part with", part["name"])
                with db_conn.cursor() as cursor:
                    matched_id = create_new_one(part, effective_type, cursor)
                db_conn.commit()

            if matched_id:
                part_ids.append(matched_id)
                continue
            print(f"[WARNING] failed to match part type={effective_type}, part:", part, file=sys.stderr)
    return part_ids


def func():
    """Process one queued upload: match its parts and store the combination.

    Pops one code from the ``mypc:queue`` Redis list, loads the data stored
    under that code, parses the XML document, and resolves every part to a
    ``parts`` id (creating missing parts). It then creates a combination,
    links the parts to it, records the transaction and deletes the stored
    document. Returns immediately when the queue is empty.

    Uses the module-level ``R`` (Redis client) and ``db_conn`` (database
    connection).

    Raises:
        AssertionError: If the user id, transaction id or document is missing
            from Redis.
        SystemExit: If the combination row could not be created.
    """
    value = R.lpop("mypc:queue")
    if value is None:
        return

    code = value.decode()
    print("Processing code:", code)

    _user_id, _transaction_id, _xml_document = _load_job_payload(code)
    _parts = parse.handle_xml_body(_xml_document)

    print("Matching parts")
    _part_ids = _resolve_part_ids(_parts)

    print("Creating combination")
    with db_conn.cursor() as cursor:
        _combination_id = create_combination(cursor, None, _user_id)
    if not _combination_id:
        print("Failed to create combination", file=sys.stderr)
        sys.exit(1)

    print("Wiring parts", _part_ids, "to combination id", _combination_id)
    with db_conn.cursor() as cursor:
        wire_part_to_combination(cursor, _combination_id, *_part_ids)
    print("Committing to DB")
    db_conn.commit()

    print("Finalizing transaction")
    with db_conn.cursor() as cursor:
        finalize_transaction(cursor, _transaction_id, _user_id, _combination_id)
    print("Committing to DB")
    db_conn.commit()

    print("Removing anchor key from Redis")
    R.delete(f"mypc:code:{code}:document")
if __name__ == "__main__":
    # Daemon entry point: connect to Redis and PostgreSQL, then call func()
    # once per PARSE_DAEMON_REFRESH_INTERVAL seconds (default 5) until
    # interrupted.  R and db_conn are module-level names read by the
    # functions above; a `global` statement is a no-op at module scope.
    R = utils.env_connect_redis()
    if not R:
        sys.exit(1)

    db_conn = utils.env_connect_postgresql()
    if not db_conn:
        sys.exit(1)

    _next_run_delta = timedelta(seconds=int(os.getenv("PARSE_DAEMON_REFRESH_INTERVAL", 5)))
    print("Check Redis as interval", _next_run_delta)

    # Align the schedule to a whole second.
    last_run = datetime.now()
    last_run -= timedelta(microseconds=last_run.microsecond)
    next_run = last_run
    _first_run = True

    try:
        while True:
            if not _first_run:
                # Read the clock once: checking and then re-reading it could
                # produce a negative duration, which time.sleep() rejects
                # with ValueError.
                remaining = (next_run - datetime.now()).total_seconds()
                if remaining >= 0:
                    # Sleep in short slices so Ctrl+C is handled promptly.
                    time.sleep(min(0.1, remaining))
                    continue
            _first_run = False
            next_run += _next_run_delta
            func()
    except KeyboardInterrupt:
        sys.exit(0)
    finally:
        # Runs on normal exit, Ctrl+C and on any error propagating from func().
        print("Closing connections")
        R.close()
        db_conn.close()