본문 바로가기
카테고리 없음

업비트 postgresql 2

by 나스닥171819 2023. 3. 1.
728x90
반응형
import os
import sys
import time
import json
import datetime
import asyncio
import logging
import traceback
import websockets
import psycopg2
 
# 실행 환경에 따른 공통 모듈 Import
#sys.path.append(os.path.dirname(os.path.dirname(__file__)))
import upbit2
 
# 프로그램 정보
pgm_name = 'save_ticker_pg'
pgm_name_kr = '업비트 TICKER 웹소켓 데이터 저장(PostgreSQL)'
 
 
# -----------------------------------------------------------------------------
# - Name : get_subscribe_items
# - Desc : 구독 대상 종목 조회
# -----------------------------------------------------------------------------
def get_subscribe_items():
    try:
        subscribe_items = []
 
        # KRW 마켓 전 종목 추출
        items = upbit2.get_items('KRW', '')
 
        # 종목코드 배열로 변환
        for item in items:
            subscribe_items.append(item['market'])
 
        return subscribe_items
 
    # ---------------------------------------
    # Exception 처리
    # ----------------------------------------
    except Exception:
        raise
 
 
# -----------------------------------------------------------------------------
# - Name : upbit_ws_client
# - Desc : 업비트 웹소켓
# -----------------------------------------------------------------------------
async def upbit_ws_client():
    try:
 
        # 처리 Count 용
        data_cnt = 0
 
        # 중복 실행 방지용
        seconds = 0
 
        # 종목별 시퀀스
        item_seq = {}
 
        # 구독 데이터 조회
        subscribe_items = get_subscribe_items()
 
        logging.info('구독 종목 개수 : ' + str(len(subscribe_items)))
        logging.info('구독 종목 : ' + str(subscribe_items))
 
        # 구독 데이터 조립
        subscribe_fmt = [
            {"ticket": "test-websocket"},
            {
                "type": "ticker",
                "codes": subscribe_items,
                "isOnlyRealtime": True
            },
            {"format": "SIMPLE"}
        ]
 
        subscribe_data = json.dumps(subscribe_fmt)
 
        conn=psycopg2.connect(database="postgres",user="postgres",password="postgres",host="192.168.219.103",port="5432")


        # PostgreSQL 데이터 베이스 연결
        #conn = psycopg2.connect(host=upbit2.get_env_keyvalue('192.168.219.103')
        #                       ,dbname=upbit2.get_env_keyvalue('postgres')
        #                       ,user=upbit2.get_env_keyvalue('postgres')
        #                       ,password=upbit2.get_env_keyvalue('postgres')
        #                       ,port=upbit2.get_env_keyvalue('5432'))
 
        # 자동 커밋
        conn.autocommit = True
 
        # 커서 획득
        cur = conn.cursor()
 
        # INSERT SQL 준비
        sql = "INSERT INTO TICKER_DATA (DATETIME,CODE,OPENING_PRICE,HIGH_PRICE,LOW_PRICE,TRADE_PRICE,PREV_CLOSING_PRICE,CHANGE,CHANGE_PRICE,SIGNED_CHANGE_PRICE \
                                       ,CHANGE_RATE,SIGNED_CHANGE_RATE,TRADE_VOLUME,ACC_TRADE_VOLUME,ACC_TRADE_VOLUME_24H,ACC_TRADE_PRICE,ACC_TRADE_PRICE_24H,TRADE_DATE,TRADE_TIME,TRADE_TIMESTAMP \
                                       ,ASK_BID,ACC_ASK_VOLUME,ACC_BID_VOLUME,HIGHEST_52_WEEK_PRICE,HIGHEST_52_WEEK_DATE,LOWEST_52_WEEK_PRICE,LOWEST_52_WEEK_DATE,MARKET_STATE,IS_TRADING_SUSPENDED,DELISTING_DATE \
                                       ,MARKET_WARNING,TIMESTAMP,STREAM_TYPE,SYS_DATETIME) \
                                VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,CURRENT_TIMESTAMP);"
 
        async with websockets.connect(upbit2.ws_url) as websocket:
 
            await websocket.send(subscribe_data)
 
            while True:
                # 5초 단위 종목 개수 변동여부 확인 용
                period = datetime.datetime.now()
 
                data = await websocket.recv()
                data = json.loads(data)
 
                # 저장용 데이터 조립
                args = (datetime.datetime.fromtimestamp(int(data['tms']/1000)), data['cd'], data['op'], data['hp'], data['lp'], data['tp'],data['pcp'], data['c'], data['cp'], data['scp']
                       ,data['cr'], data['scr'], data['tv'], data['atv'], data['atv24h'], data['atp'], data['atp24h'], data['tdt'], data['ttm'], data['ttms']
                       ,data['ab'], data['aav'], data['abv'], data['h52wp'], data['h52wdt'], data['l52wp'], data['l52wdt'], data['ms'], data['its'], data['dd']
                       ,data['mw'], data['tms'], data['st'])
 
                # 데이터 저장
                cur.execute(sql, args)
 
                # 데이터 저장 로깅
                data_cnt = data_cnt + 1
                if data_cnt % 1000 == 0:
                    logging.info("[" + str(datetime.datetime.now()) + "] [TICKER_DATA] Inserting Data....[" + str(data_cnt) + "]")
 
                # 5초마다 종목 정보 재 조회 후 추가된 종목이 있으면 웹소켓 다시 시작
                if (period.second % 5) == 0 and seconds != period.second:
                    # 중복 실행 방지
                    seconds = period.second
 
                    # 종목 재조회
                    re_subscribe_items = get_subscribe_items()
 
                    # 현재 종목과 다르면 웹소켓 다시 시작
                    if subscribe_items != re_subscribe_items:
                        logging.info('종목 달리짐! 웹소켓 다시 시작')
                        logging.info('\n\n')
                        logging.info('*************************************************')
                        logging.info('기존 종목[' + str(len(subscribe_items)) + '] : ' + str(subscribe_items))
                        logging.info('종목 재조회[' + str(len(re_subscribe_items)) + '] : ' + str(re_subscribe_items))
                        logging.info('*************************************************')
                        logging.info('\n\n')
                        await websocket.close()
                        time.sleep(1)
                        await upbit_ws_client()
 
    # ----------------------------------------
    # 모든 함수의 공통 부분(Exception 처리)
    # ----------------------------------------
    except Exception as e:
        logging.error('Exception Raised!')
        logging.error(e)
        logging.error('Connect Again!')
 
        # 웹소켓 다시 시작
        await upbit_ws_client()
 
 
# -----------------------------------------------------------------------------
# - Name : main
# - Desc : 메인
# -----------------------------------------------------------------------------
async def main():
    try:
        # 웹소켓 시작
        await upbit_ws_client()
 
    except Exception as e:
        logging.error('Exception Raised!')
        logging.error(e)
 
 
# -----------------------------------------------------------------------------
# - Name : main
# - Desc : 메인
# -----------------------------------------------------------------------------
if __name__ == "__main__":
 
    # noinspection PyBroadException
    try:
        print("***** USAGE ******")
        print("[1] 로그레벨(D:DEBUG, E:ERROR, 그외:INFO)")
 
        if sys.platform.startswith('win32'):
            # 로그레벨(D:DEBUG, E:ERROR, 그외:INFO)
            log_level = 'I'
            upbit2.set_loglevel(log_level)
        else:
            # 로그레벨(D:DEBUG, E:ERROR, 그외:INFO)
            log_level = sys.argv[1].upper()
            upbit2.set_loglevel(log_level)
 
        if log_level == '':
            logging.error("입력값 오류!")
            sys.exit(-1)
 
        logging.info("***** INPUT ******")
        logging.info("[1] 로그레벨(D:DEBUG, E:ERROR, 그외:INFO):" + str(log_level))
 
        # ---------------------------------------------------------------------
        # Logic Start!
        # ---------------------------------------------------------------------
        # 웹소켓 시작
        asyncio.run(main())
 
    except KeyboardInterrupt:
        logging.error("KeyboardInterrupt Exception 발생!")
        logging.error(traceback.format_exc())
        sys.exit(-100)
 
    except Exception:
        logging.error("Exception 발생!")
        logging.error(traceback.format_exc())
        sys.exit(-200)
반응형