Skip to content
Snippets Groups Projects
Select Git revision
  • c8158346bf64326273aa164dc55ca2e9d2372db9
  • master default protected
2 results

push_to_db.py

Blame
  • push_to_db.py 6.87 KiB
    import dotenv
    dotenv.load_dotenv(dotenv_path=".env", override=False)
    
    import os
    import re
    import sys
    import time
    import redis
    import psycopg2
    import traceback
    
    import parse
    import utils
    
    from datetime import datetime, timedelta
    
    
    def check_similarity(_conn, name, part_type):
        """Return the most similar row of ``parts`` for ``name``, if any.

        Rows are ranked with the SQL ``similarity()`` function (presumably
        provided by the pg_trgm extension -- confirm against the database).

        Args:
            _conn: Database connection whose ``cursor()`` result supports the
                context-manager protocol (as psycopg2 cursors do).
            name: Part name compared against ``parts.name``.
            part_type: Optional part type. When truthy, only rows whose
                ``type`` equals its upper-cased value are considered.

        Returns:
            A one-element list holding the best ``(id, name, sim)`` row, or
            ``None`` when no row reaches the similarity threshold.
        """
        # Normalize once so the filter and the threshold agree on the casing.
        normalized_type = part_type.upper() if part_type else None

        # RAM/SSD/HDD only accept a similarity of 1.0; every other type uses
        # the much lower 0.13 threshold.
        threshold = 1.0 if normalized_type in ("RAM", "SSD", "HDD") else 0.13

        # The type is bound as a query parameter instead of being formatted
        # into the SQL text, so a crafted value cannot alter the statement.
        params = [name]
        where_clause = ""
        if normalized_type:
            where_clause = "WHERE type = %s"
            params.append(normalized_type)
        params.append(threshold)

        query = (
            "WITH comp AS (SELECT id, name, similarity(name::text, %s::text) as sim FROM parts "
            + where_clause
            + ") SELECT * FROM comp WHERE sim >=%s ORDER BY sim DESC LIMIT 5;"
        )

        with _conn.cursor() as cursor:
            cursor.execute(query, tuple(params))
            data = cursor.fetchall()

        return data[:1] if data else None
    
    
    def handle_single_part(_dict, part_type):
        """Look up the stored part that best matches a parsed part.

        Args:
            _dict: Parsed part mapping; must contain a ``"name"`` key.
            part_type: Non-empty part type string.

        Returns:
            The best match from ``check_similarity`` with its last column
            (the similarity score) dropped, or ``None`` when the type is
            ``'DISK'`` or nothing matched.

        Raises:
            AssertionError: If ``"name"`` is missing or ``part_type`` is falsy.
        """
        assert "name" in _dict
        assert bool(part_type)

        # Plain DISK entries are never looked up.
        if part_type == 'DISK':
            return None

        query_name = _dict["name"]
        if part_type == 'GPU':
            # Drop vendor branding words before comparing GPU names.
            for branding in ("GeForce", "NVIDIA"):
                query_name = query_name.replace(branding, "")

        candidates = check_similarity(db_conn, query_name, part_type)
        if candidates:
            best_row = candidates[0]
            return best_row[:-1]
        return None
    
    
    def create_new_one(_dict, part_type, _cursor):
        """Insert (or reuse) a ``parts`` row and add its type-specific info row.

        The part is upserted into ``parts`` by name, then a row is inserted
        into ``part_info_<TYPE>`` with every remaining attribute of ``_dict``
        (``name`` and ``serial`` are excluded). The input mapping is not
        modified.

        Args:
            _dict: Parsed part mapping; must contain ``"name"``.
            part_type: Part type; upper-cased and used as the table suffix.
            _cursor: Open database cursor used to run the statement.

        Returns:
            The ``part_id`` returned by the insert, or ``None`` when the
            statement returned no rows.

        Raises:
            KeyError: If ``_dict`` has no ``"name"`` entry.
            ValueError: If ``part_type`` contains characters that are not
                safe to use in a table name.
        """
        name = _dict["name"]
        attributes = {
            key: value
            for key, value in _dict.items()
            if key not in ("name", "serial")
        }

        _part_type_enum = part_type.upper()
        # The type becomes part of a table name and cannot be bound as a
        # parameter, so accept only plain identifier characters.
        if not re.fullmatch(r"[A-Z0-9_]+", _part_type_enum):
            raise ValueError(f"unsupported part type: {part_type!r}")

        # Quote column identifiers: double any embedded double quote, and
        # double '%' so the driver does not read it as a placeholder.
        column_sql = "".join(
            ', "{}"'.format(str(key).replace('"', '""').replace("%", "%%"))
            for key in attributes
        )
        # One placeholder per attribute; empty when there are no attributes so
        # the statement stays valid instead of ending in a dangling comma.
        placeholder_sql = ", %s" * len(attributes)

        sql = (
            "WITH rows AS (INSERT INTO parts (name, type) VALUES (%s, %s) "
            "ON CONFLICT (name) DO UPDATE SET name = excluded.name RETURNING id)\n"
            f"INSERT INTO part_info_{_part_type_enum} (part_id{column_sql})\n"
            f"(SELECT rows.id{placeholder_sql} FROM rows)\n"
            "RETURNING part_id;"
        )
        _cursor.execute(sql, (name, _part_type_enum, *attributes.values()))
        data = _cursor.fetchall()

        if data:
            return data[0][0]
        return None
    
    
    def create_combination(_cursor, nickname, _user_id):
        """Insert a ``combinations`` row for a user and return its id.

        Args:
            _cursor: Open database cursor used to run the insert.
            nickname: Optional owner nickname; when truthy it is used to
                build the combination name, otherwise a default name is used.
            _user_id: Value stored in ``owner_id``.

        Returns:
            The id of the inserted row, or ``None`` when the insert returned
            no rows.
        """
        label = (str(nickname) + "의 PC") if nickname else "새 PC"

        _cursor.execute(
            "INSERT INTO combinations (owner_id, name) VALUES (%s, %s) RETURNING id;",
            (_user_id, label),
        )
        rows = _cursor.fetchall()

        return rows[0][0] if rows else None
    
    
    def wire_part_to_combination(_cursor, _combination_id, *_part_ids):
        """Link parts to a combination by inserting rows into ``relations``.

        Args:
            _cursor: Open database cursor used to run the insert.
            _combination_id: Id of the combination the parts belong to.
            *_part_ids: Ids of the parts to attach.

        Returns:
            The ``(id, part_id)`` rows returned by the insert; an empty list
            when no part ids were given.
        """
        # A VALUES list with no tuples is a SQL syntax error, so there is
        # nothing to insert (and nothing to execute) without part ids.
        if not _part_ids:
            return []

        placeholders = ", ".join(["(%s, %s)"] * len(_part_ids))
        values = [
            value
            for part_id in _part_ids
            for value in (_combination_id, part_id)
        ]

        _cursor.execute(
            "INSERT INTO relations (combination_id, part_id) VALUES "
            + placeholders
            + " RETURNING id, part_id",
            values,
        )
        return _cursor.fetchall()
    
    
    def match_part_obj_into_db(_part, part_type):
        """Return the id of the stored part matching ``_part``, if any.

        Args:
            _part: Parsed part mapping passed on to ``handle_single_part``.
            part_type: Part type passed on to ``handle_single_part``.

        Returns:
            The first element of the match returned by ``handle_single_part``,
            or ``None`` when there is no match.
        """
        hit = handle_single_part(_part, part_type)
        return hit[0] if hit else None
    
    
    def finalize_transaction(_cursor, _transaction_id, _user_id, _combination_id):
        """Record a finished transaction in the ``transactions`` table.

        Args:
            _cursor: Open database cursor used to run the insert.
            _transaction_id: Value stored in ``id``.
            _user_id: Value stored in ``user_id``.
            _combination_id: Value stored in ``combination_id``.
        """
        _cursor.execute(
            "INSERT INTO transactions (id, user_id, combination_id) VALUES (%s, %s, %s);",
            (_transaction_id, _user_id, _combination_id),
        )
        # The INSERT has no RETURNING clause, so there is no result set to
        # read; fetching from it would raise in psycopg2.
    
    
    def func():
        """Process one queued code from Redis and store its parts in the DB.

        Pops a code from the ``mypc:queue`` list and returns immediately when
        the queue is empty. Otherwise it reads the user id, transaction id and
        XML document stored under ``mypc:code:<code>:*``, parses the document
        into parts, matches each part against the database (creating missing
        ones), creates a combination, links the parts to it, records the
        transaction and finally deletes the document key from Redis.

        Uses the module-level ``R`` (Redis client) and ``db_conn`` (database
        connection) globals.

        Raises:
            AssertionError: If any of the three Redis values is missing.
            SystemExit: If the combination could not be created.
        """
        value = R.lpop("mypc:queue")
        if value is None:
            return

        code = value.decode()
        print("Processing code:", code)

        _user_id = R.get(f"mypc:code:{code}:user_id")
        _transaction_id = R.get(f"mypc:code:{code}:transaction_id")
        _xml_document = R.get(f"mypc:code:{code}:document")
        print("(User ID (raw):", _user_id)
        print("Transaction ID: (raw)", _transaction_id)
        print("XML starts with (raw):", _xml_document)

        assert _user_id is not None, "userId must not be null"
        assert _transaction_id is not None, "transactionId must not be null"
        assert _xml_document is not None, "document must not be null"

        _user_id = int(_user_id.decode())
        _transaction_id = _transaction_id.decode()
        _xml_document = _xml_document.decode()

        print("User ID:", _user_id)
        print("Transaction ID:", _transaction_id)
        print("XML starts with:", repr(_xml_document[:50]))

        _parts = parse.handle_xml_body(_xml_document)
        _part_ids = []

        print("Matching parts")

        for part_type, parts in _parts.items():  # iterate over part types
            for part in parts:  # iterate over the parts of this type
                # Resolve the concrete type into a per-part local. Rebinding
                # the loop variable would make every later disk in this list
                # skip the DISK branch and inherit the first disk's type.
                effective_type = part_type

                if part_type == "DISK":
                    disk_part_type = (part.pop("type", None) or "not_set").upper()

                    if disk_part_type not in ("SSD", "HDD",):
                        print("[WARNING] not supported disk type", disk_part_type, file=sys.stderr)
                        continue

                    effective_type = disk_part_type

                _matched_id = match_part_obj_into_db(part, effective_type)

                if not _matched_id:  # the part is not in the DB yet
                    print(".. creating new part with", part["name"])
                    with db_conn.cursor() as cursor:
                        _matched_id = create_new_one(part, effective_type, cursor)
                        db_conn.commit()

                if _matched_id:
                    _part_ids.append(_matched_id)
                    continue

                print(f"[WARNING] failed to match part type={effective_type}, part:", part, file=sys.stderr)

        print("Creating combination")
        with db_conn.cursor() as cursor:
            _combination_id = create_combination(cursor, None, _user_id)

        if not _combination_id:
            print("Failed to create combination", file=sys.stderr)
            sys.exit(1)

        print("Wiring parts", _part_ids, "to combination id", _combination_id)
        with db_conn.cursor() as cursor:
            wire_part_to_combination(cursor, _combination_id, *_part_ids)

        print("Committing to DB")
        db_conn.commit()

        print("Finalizing transaction")
        with db_conn.cursor() as cursor:
            finalize_transaction(cursor, _transaction_id, _user_id, _combination_id)

        print("Committing to DB")
        db_conn.commit()

        print("Removing anchor key from Redis")
        R.delete(f"mypc:code:{code}:document")
    
    
    if __name__ == "__main__":
        # Daemon entry point: connect to Redis and PostgreSQL, then call
        # func() once per refresh interval until interrupted.
        _first_run = True

        # R and db_conn are assigned at module level, so they are already the
        # globals that the functions above read.
        R = utils.env_connect_redis()
        if not R:
            sys.exit(1)

        db_conn = utils.env_connect_postgresql()
        if not db_conn:
            # Do not leave the Redis connection open when the DB is unavailable.
            R.close()
            sys.exit(1)

        _next_run_delta = timedelta(seconds=int(os.getenv("PARSE_DAEMON_REFRESH_INTERVAL", 5)))
        print("Check Redis as interval", _next_run_delta)

        # Schedule relative to the current time truncated to whole seconds.
        last_run = datetime.now()
        last_run -= timedelta(microseconds=last_run.microsecond)
        next_run = last_run

        try:
            while True:
                if not _first_run:
                    # Read the clock once: a second read could land past
                    # next_run and hand time.sleep() a negative duration.
                    remaining = (next_run - datetime.now()).total_seconds()
                    if remaining >= 0:
                        # Sleep in short slices so Ctrl+C is handled promptly.
                        time.sleep(min(0.1, remaining))
                        continue

                _first_run = False
                next_run += _next_run_delta

                func()
        except KeyboardInterrupt:
            sys.exit(0)
        finally:
            # Runs on normal exit, Ctrl+C and unexpected errors alike; any
            # other exception still propagates after the cleanup.
            print("Closing connections")
            R.close()
            db_conn.close()