From eafa2b329f8230be4079198b75821ec16bdcc5c0 Mon Sep 17 00:00:00 2001 From: sunghwki <52474291+swkim12345@users.noreply.github.com> Date: Tue, 26 Nov 2024 12:51:53 +0900 Subject: [PATCH] =?UTF-8?q?livedata=20=EC=88=98=EC=A7=91=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80=20+=20openapi=EB=A1=9C=20=EC=9E=A5=EC=8B=9C=EA=B0=84,?= =?UTF-8?q?=EB=A7=88=EA=B0=90=20=EB=B3=80=EA=B2=BD=20=EB=A1=9C=EC=A7=81,?= =?UTF-8?q?=20token=EC=9D=84=20=EC=A3=BC=EC=9E=85=ED=95=A0=20=EC=88=98=20?= =?UTF-8?q?=EC=9E=88=EA=B2=8C=20=EB=B3=80=EA=B2=BD=20(#246)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * ✨ feat: token entity 추가 * ✨ feat: entity 에 저장, expire 검사 로직 추가 * 🐛 fix: token 주입으로 로직 변경 * ♻️ refactor: token 주입으로 변경, 그로 인한 오류 수정 및 console.log 삭제 * 🐛 fix: live 데이터 수집 오류 해결, 데이터 없을 때 insert 오류 해결 * 🐛 fix: stock, livedata entity 수정 * ♻️ refactor: develop 환경시 logging 활성화 * 📦️ ci: production 환경일 때 작동되게 변경 * ♻️ refactor: websocket 모듈에서 liveData로 서비스 로직 분리 * ♻️ refactor: livedata stock module로 이동 * ✨ feat: 장 마감시 openapi로 부르는 로직 추가 * 🐛 fix: websocket logger 추가, client stock 저장되지 않는 오류 해결 * 🐛 fix: openapi는 저장이 필요 없음 롤 백 * 🐛 fix: open api로 데이터 받지 못하는 문제 해결 * 💄 style: 안 쓰이는 것 빼기 * 💄 style: console.log 삭제 --- packages/backend/src/configs/typeormConfig.ts | 2 +- .../openapi/api/openapiDetailData.api.ts | 6 +- .../openapi/api/openapiLiveData.api.ts | 52 +++++- .../openapi/api/openapiMinuteData.api.ts | 6 +- .../openapi/api/openapiPeriodData.api.ts | 4 +- .../scraper/openapi/api/openapiToken.api.ts | 22 +-- .../src/scraper/openapi/liveData.service.ts | 172 ++++++++++++++++++ .../scraper/openapi/openapi-scraper.module.ts | 6 +- .../openapi/type/openapiLiveData.type.ts | 105 +++++++++++ .../scraper/openapi/type/openapiUtil.type.ts | 2 + .../websocket/websocketClient.websocket.ts | 61 +++++++ .../openapi/websocketClient.service.ts | 150 --------------- .../src/stock/domain/stockLiveData.entity.ts | 3 - packages/backend/src/stock/stock.gateway.ts | 8 +- packages/backend/src/stock/stock.module.ts | 8 + 15 files changed, 425 insertions(+), 182 deletions(-) create mode 100644 packages/backend/src/scraper/openapi/liveData.service.ts create mode 100644 packages/backend/src/scraper/openapi/websocket/websocketClient.websocket.ts delete mode 100644 packages/backend/src/scraper/openapi/websocketClient.service.ts diff --git a/packages/backend/src/configs/typeormConfig.ts b/packages/backend/src/configs/typeormConfig.ts index f8ff59ac..a8c70a18 100644 --- a/packages/backend/src/configs/typeormConfig.ts +++ b/packages/backend/src/configs/typeormConfig.ts @@ -21,6 +21,6 @@ export const typeormDevelopConfig: TypeOrmModuleOptions = { password: process.env.DB_PASS, database: process.env.DB_NAME, entities: [__dirname + '/../**/*.entity.{js,ts}'], - logging: true, + //logging: true, synchronize: true, }; diff --git a/packages/backend/src/scraper/openapi/api/openapiDetailData.api.ts b/packages/backend/src/scraper/openapi/api/openapiDetailData.api.ts index 6a32435f..982ebaf6 100644 --- a/packages/backend/src/scraper/openapi/api/openapiDetailData.api.ts +++ b/packages/backend/src/scraper/openapi/api/openapiDetailData.api.ts @@ -39,13 +39,13 @@ export class OpenapiDetailData { if (process.env.NODE_ENV !== 'production') return; const entityManager = this.datasource.manager; const stocks = await entityManager.find(Stock); - const configCount = this.openApiToken.configs.length; + const configCount = (await this.openApiToken.configs()).length; const chunkSize = Math.ceil(stocks.length / configCount); for (let i = 0; i < configCount; i++) { - this.logger.info(this.openApiToken.configs[i]); + this.logger.info((await this.openApiToken.configs())[i]); const chunk = stocks.slice(i * chunkSize, (i + 1) * chunkSize); - this.getDetailDataChunk(chunk, this.openApiToken.configs[i]); + this.getDetailDataChunk(chunk, (await this.openApiToken.configs())[i]); } } diff --git a/packages/backend/src/scraper/openapi/api/openapiLiveData.api.ts b/packages/backend/src/scraper/openapi/api/openapiLiveData.api.ts index 6fa15505..7db12b2c 100644 --- a/packages/backend/src/scraper/openapi/api/openapiLiveData.api.ts +++ b/packages/backend/src/scraper/openapi/api/openapiLiveData.api.ts @@ -1,12 +1,17 @@ import { Inject, Injectable } from '@nestjs/common'; import { DataSource } from 'typeorm'; import { Logger } from 'winston'; +import { openApiConfig } from '../config/openapi.config'; +import { isOpenapiLiveData } from '../type/openapiLiveData.type'; +import { TR_IDS } from '../type/openapiUtil.type'; +import { getOpenApi } from '../util/openapiUtil.api'; import { Stock } from '@/stock/domain/stock.entity'; import { StockLiveData } from '@/stock/domain/stockLiveData.entity'; @Injectable() export class OpenapiLiveData { - public readonly TR_ID: string = 'H0STCNT0'; + private readonly url: string = + '/uapi/domestic-stock/v1/quotations/inquire-ccnl'; constructor( private readonly datasource: DataSource, @Inject('winston') private readonly logger: Logger, @@ -37,6 +42,26 @@ export class OpenapiLiveData { } } + // 현재가 체결로는 데이터가 부족해 현재가 시세를 사용함. + convertResponseToStockLiveData = ( + data: OpenapiLiveData, + stockId: string, + ): StockLiveData | undefined => { + const stockLiveData = new StockLiveData(); + if (isOpenapiLiveData(data)) { + stockLiveData.stock = { id: stockId } as Stock; + stockLiveData.currentPrice = parseFloat(data.stck_prpr); + stockLiveData.changeRate = parseFloat(data.prdy_ctrt); + stockLiveData.volume = parseInt(data.acml_vol); + stockLiveData.high = parseFloat(data.stck_hgpr); + stockLiveData.low = parseFloat(data.stck_lwpr); + stockLiveData.open = parseFloat(data.stck_oprc); + stockLiveData.updatedAt = new Date(); + + return stockLiveData; + } + }; + convertLiveData(messages: Record[]): StockLiveData[] { const stockData: StockLiveData[] = []; messages.map((message) => { @@ -48,10 +73,33 @@ export class OpenapiLiveData { stockLiveData.high = parseFloat(message.STCK_HGPR); stockLiveData.low = parseFloat(message.STCK_LWPR); stockLiveData.open = parseFloat(message.STCK_OPRC); - stockLiveData.previousClose = parseFloat(message.WGHN_AVRG_STCK_PRC); stockLiveData.updatedAt = new Date(); + stockData.push(stockLiveData); }); return stockData; } + + async connectLiveData(stockId: string, config: typeof openApiConfig) { + const query = this.makeLiveDataQuery(stockId); + + try { + const result = await getOpenApi( + this.url, + config, + query, + TR_IDS.LIVE_DATA, + ); + return result; + } catch (error) { + this.logger.warn(`Connect live data error : ${error}`); + } + } + + private makeLiveDataQuery(stockId: string, code: 'J' = 'J') { + return { + fid_cond_mrkt_div_code: code, + fid_input_iscd: stockId, + }; + } } diff --git a/packages/backend/src/scraper/openapi/api/openapiMinuteData.api.ts b/packages/backend/src/scraper/openapi/api/openapiMinuteData.api.ts index e64e26c7..4e193b74 100644 --- a/packages/backend/src/scraper/openapi/api/openapiMinuteData.api.ts +++ b/packages/backend/src/scraper/openapi/api/openapiMinuteData.api.ts @@ -128,15 +128,15 @@ export class OpenapiMinuteData { } //@Cron(`*/${STOCK_CUT} 9-15 * * 1-5`) - getMinuteData() { + async getMinuteData() { if (process.env.NODE_ENV !== 'production') return; - const configCount = this.openApiToken.configs.length; + const configCount = (await this.openApiToken.configs()).length; const stock = this.stock[this.flip % STOCK_CUT]; this.flip++; const chunkSize = Math.ceil(stock.length / configCount); for (let i = 0; i < configCount; i++) { const chunk = stock.slice(i * chunkSize, (i + 1) * chunkSize); - this.getMinuteDataChunk(chunk, this.openApiToken.configs[i]); + this.getMinuteDataChunk(chunk, (await this.openApiToken.configs())[i]); } } diff --git a/packages/backend/src/scraper/openapi/api/openapiPeriodData.api.ts b/packages/backend/src/scraper/openapi/api/openapiPeriodData.api.ts index 4b976be8..5471cf5e 100644 --- a/packages/backend/src/scraper/openapi/api/openapiPeriodData.api.ts +++ b/packages/backend/src/scraper/openapi/api/openapiPeriodData.api.ts @@ -96,7 +96,7 @@ export class OpenapiPeriodData { let isFail = false; while (!isFail) { - configIdx = (configIdx + 1) % this.openApiToken.configs.length; + configIdx = (configIdx + 1) % (await this.openApiToken.configs()).length; this.setStockPeriod(stockPeriod, stock.id!, end); // chart 데이터가 있는 지 확인 -> 리턴 @@ -130,7 +130,7 @@ export class OpenapiPeriodData { try { const response = await getOpenApi( this.url, - this.openApiToken.configs[configIdx], + (await this.openApiToken.configs())[configIdx], query, TR_IDS.ITEM_CHART_PRICE, ); diff --git a/packages/backend/src/scraper/openapi/api/openapiToken.api.ts b/packages/backend/src/scraper/openapi/api/openapiToken.api.ts index 829ca9a3..fee72534 100644 --- a/packages/backend/src/scraper/openapi/api/openapiToken.api.ts +++ b/packages/backend/src/scraper/openapi/api/openapiToken.api.ts @@ -15,7 +15,6 @@ export class OpenapiTokenApi { @Inject('winston') private readonly logger: Logger, private readonly datasource: DataSource, ) { - if (process.env.NODE_ENV !== 'production') return; const accounts = openApiConfig.STOCK_ACCOUNT!.split(','); const api_keys = openApiConfig.STOCK_API_KEY!.split(','); const api_passwords = openApiConfig.STOCK_API_PASSWORD!.split(','); @@ -34,17 +33,16 @@ export class OpenapiTokenApi { STOCK_API_PASSWORD: api_passwords[i], }); } - this.init(); } - get configs() { - this.init(); + async configs() { + await this.init(); return this.config; } @Cron('30 0 * * 1-5') async init() { - const tokens = await this.convertConfigToTokenEntity(this.config); + const tokens = this.convertConfigToTokenEntity(this.config); const config = await this.getPropertyFromDB(tokens); const expired = config.filter( (val) => @@ -54,24 +52,24 @@ export class OpenapiTokenApi { if (expired.length || !config.length) { await this.initAuthenValue(); - const newTokens = await this.convertConfigToTokenEntity(this.config); - this.savePropertyToDB(newTokens); + const newTokens = this.convertConfigToTokenEntity(this.config); + await this.savePropertyToDB(newTokens); } else { - this.config = await this.convertTokenEntityToConfig(config); + this.config = this.convertTokenEntityToConfig(config); } } private isTokenExpired(startDate?: Date) { if (!startDate) return true; const now = new Date(); - //실제 만료 시간은 24시간이지만, 문제의 소지가 발생하는 것을 방지하기 위해 20시간으로 설정함. + //실제 만료 시간은 24시간이지만, 문제가 발생할 여지를 줄이기 위해 20시간으로 설정 const baseTimeToMilliSec = 20 * 60 * 60 * 1000; const timeDiff = now.getTime() - startDate.getTime(); return timeDiff >= baseTimeToMilliSec; } - private async convertTokenEntityToConfig(tokens: OpenapiToken[]) { + private convertTokenEntityToConfig(tokens: OpenapiToken[]) { const result: (typeof openApiConfig)[] = []; tokens.forEach((val) => { const config: typeof openApiConfig = { @@ -87,7 +85,7 @@ export class OpenapiTokenApi { return result; } - private async convertConfigToTokenEntity(config: (typeof openApiConfig)[]) { + private convertConfigToTokenEntity(config: (typeof openApiConfig)[]) { const result: OpenapiToken[] = []; config.forEach((val) => { const token = new OpenapiToken(); @@ -171,6 +169,7 @@ export class OpenapiTokenApi { }), ); this.config = updatedConfig; + this.logger.info(`Init access token : ${this.config}`); } private async initWebSocketKey() { @@ -181,6 +180,7 @@ export class OpenapiTokenApi { }), ); this.config = updatedConfig; + this.logger.info(`Init websocket token : ${this.config}`); } private async getToken(config: typeof openApiConfig): Promise { diff --git a/packages/backend/src/scraper/openapi/liveData.service.ts b/packages/backend/src/scraper/openapi/liveData.service.ts new file mode 100644 index 00000000..68ed8c0f --- /dev/null +++ b/packages/backend/src/scraper/openapi/liveData.service.ts @@ -0,0 +1,172 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { Cron } from '@nestjs/schedule'; +import { Logger } from 'winston'; +import { RawData, WebSocket } from 'ws'; +import { OpenapiLiveData } from './api/openapiLiveData.api'; +import { OpenapiTokenApi } from './api/openapiToken.api'; +import { openApiConfig } from './config/openapi.config'; +import { parseMessage } from './parse/openapi.parser'; +import { WebsocketClient } from './websocket/websocketClient.websocket'; + +type TR_IDS = '0' | '1'; + +@Injectable() +export class LiveData { + private readonly clientStock: Set = new Set(); + private readonly reconnectInterval = 60 * 1000 * 1000; + + private readonly startTime: Date = new Date(2024, 0, 1, 9, 0, 0, 0); + private readonly endTime: Date = new Date(2024, 0, 1, 15, 30, 0, 0); + constructor( + private readonly openApiToken: OpenapiTokenApi, + private readonly webSocketClient: WebsocketClient, + private readonly openapiLiveData: OpenapiLiveData, + @Inject('winston') private readonly logger: Logger, + ) { + this.connect(); + this.subscribe('000020'); + } + + private async openapiSubscribe(stockId: string) { + const config = (await this.openApiToken.configs())[0]; + const result = await this.openapiLiveData.connectLiveData(stockId, config); + this.logger.info(JSON.stringify(result)); + try { + const stockLiveData = this.openapiLiveData.convertResponseToStockLiveData( + result.output, + stockId, + ); + if (stockLiveData) { + this.openapiLiveData.saveLiveData([stockLiveData]); + } + } catch (error) { + this.logger.warn(`Subscribe error in open api : ${error}`); + } + } + + async subscribe(stockId: string) { + if (this.isCloseTime(new Date(), this.startTime, this.endTime)) { + await this.openapiSubscribe(stockId); + } else { + // TODO : 하나의 config만 사용중. + this.clientStock.add(stockId); + const message = this.convertObjectToMessage( + (await this.openApiToken.configs())[0], + stockId, + '1', + ); + this.webSocketClient.subscribe(message); + } + } + + async discribe(stockId: string) { + if (this.clientStock.has(stockId)) { + this.clientStock.delete(stockId); + const message = this.convertObjectToMessage( + (await this.openApiToken.configs())[0], + stockId, + '0', + ); + this.webSocketClient.discribe(message); + } + } + + private initOpenCallback = + (sendMessage: (message: string) => void) => async () => { + this.logger.info('WebSocket connection established'); + for (const stockId of this.clientStock.keys()) { + const message = this.convertObjectToMessage( + (await this.openApiToken.configs())[0], + stockId, + '1', + ); + sendMessage(message); + } + }; + + private initMessageCallback = + (client: WebSocket) => async (data: RawData) => { + try { + const message = this.parseMessage(data); + if (message.header) { + if (message.header.tr_id === 'PINGPONG') { + this.logger.info(`Received PING: ${data}`); + client.pong(data); + } + return; + } + this.logger.info(`Recived data : ${data}`); + this.logger.info(`Stock id : ${message[0]['STOCK_ID']}`); + const liveData = this.openapiLiveData.convertLiveData(message); + await this.openapiLiveData.saveLiveData(liveData); + } catch (error) { + this.logger.warn(error); + } + }; + + private initCloseCallback = () => { + this.logger.warn( + `WebSocket connection closed. Reconnecting in ${this.reconnectInterval / 60 / 1000} minute...`, + ); + }; + + private initErrorCallback = (error: unknown) => { + if (error instanceof Error) { + this.logger.error(`WebSocket error: ${error.message}`); + } else { + this.logger.error('WebSocket error: callback function'); + } + setTimeout(() => this.connect(), this.reconnectInterval); + }; + + private isCloseTime(date: Date, start: Date, end: Date): boolean { + const dateMinutes = date.getHours() * 60 + date.getMinutes(); + const startMinutes = start.getHours() * 60 + start.getMinutes(); + const endMinutes = end.getHours() * 60 + end.getMinutes(); + + return dateMinutes <= startMinutes || dateMinutes >= endMinutes; + } + + @Cron('0 2 * * 1-5') + connect() { + this.webSocketClient.connectPacade( + this.initOpenCallback, + this.initMessageCallback, + this.initCloseCallback, + this.initErrorCallback, + ); + } + + private convertObjectToMessage( + config: typeof openApiConfig, + stockId: string, + tr_type: TR_IDS, + ): string { + this.logger.info(JSON.stringify(config)); + const message = { + header: { + approval_key: config.STOCK_WEBSOCKET_KEY!, + custtype: 'P', + tr_type, + 'content-type': 'utf-8', + }, + body: { + input: { + tr_id: 'H0STCNT0', + tr_key: stockId, + }, + }, + }; + return JSON.stringify(message); + } + + private parseMessage(data: RawData) { + if (typeof data === 'object' && !(data instanceof Buffer)) { + return data; + } else if (typeof data === 'object') { + return parseMessage(data.toString()); + } else { + return parseMessage(data as string); + } + } +} diff --git a/packages/backend/src/scraper/openapi/openapi-scraper.module.ts b/packages/backend/src/scraper/openapi/openapi-scraper.module.ts index 73f33f23..4afbf154 100644 --- a/packages/backend/src/scraper/openapi/openapi-scraper.module.ts +++ b/packages/backend/src/scraper/openapi/openapi-scraper.module.ts @@ -1,12 +1,10 @@ import { Module } from '@nestjs/common'; import { TypeOrmModule } from '@nestjs/typeorm'; import { OpenapiDetailData } from './api/openapiDetailData.api'; -import { OpenapiLiveData } from './api/openapiLiveData.api'; import { OpenapiMinuteData } from './api/openapiMinuteData.api'; import { OpenapiPeriodData } from './api/openapiPeriodData.api'; import { OpenapiTokenApi } from './api/openapiToken.api'; import { OpenapiScraperService } from './openapi-scraper.service'; -import { WebsocketClient } from './websocketClient.service'; import { Stock } from '@/stock/domain/stock.entity'; import { StockDaily, @@ -38,9 +36,7 @@ import { StockLiveData } from '@/stock/domain/stockLiveData.entity'; OpenapiMinuteData, OpenapiDetailData, OpenapiScraperService, - OpenapiLiveData, - WebsocketClient, + OpenapiTokenApi, ], - exports: [WebsocketClient], }) export class OpenapiScraperModule {} diff --git a/packages/backend/src/scraper/openapi/type/openapiLiveData.type.ts b/packages/backend/src/scraper/openapi/type/openapiLiveData.type.ts index e1687cee..18bead3e 100644 --- a/packages/backend/src/scraper/openapi/type/openapiLiveData.type.ts +++ b/packages/backend/src/scraper/openapi/type/openapiLiveData.type.ts @@ -148,3 +148,108 @@ export const stockDataKeys = [ 'MRKT_TRTM_CLS_CODE', 'VI_STND_PRC', ]; + +export type OpenapiLiveData = { + iscd_stat_cls_code: string; + marg_rate: string; + rprs_mrkt_kor_name: string; + bstp_kor_isnm: string; + temp_stop_yn: string; + oprc_rang_cont_yn: string; + clpr_rang_cont_yn: string; + crdt_able_yn: string; + grmn_rate_cls_code: string; + elw_pblc_yn: string; + stck_prpr: string; + prdy_vrss: string; + prdy_vrss_sign: string; + prdy_ctrt: string; + acml_tr_pbmn: string; + acml_vol: string; + prdy_vrss_vol_rate: string; + stck_oprc: string; + stck_hgpr: string; + stck_lwpr: string; + stck_mxpr: string; + stck_llam: string; + stck_sdpr: string; + wghn_avrg_stck_prc: string; + hts_frgn_ehrt: string; + frgn_ntby_qty: string; + pgtr_ntby_qty: string; + pvt_scnd_dmrs_prc: string; + pvt_frst_dmrs_prc: string; + pvt_pont_val: string; + pvt_frst_dmsp_prc: string; + pvt_scnd_dmsp_prc: string; + dmrs_val: string; + dmsp_val: string; + cpfn: string; + rstc_wdth_prc: string; + stck_fcam: string; + stck_sspr: string; + aspr_unit: string; + hts_deal_qty_unit_val: string; + lstn_stcn: string; + hts_avls: string; + per: string; + pbr: string; + stac_month: string; + vol_tnrt: string; + eps: string; + bps: string; + d250_hgpr: string; + d250_hgpr_date: string; + d250_hgpr_vrss_prpr_rate: string; + d250_lwpr: string; + d250_lwpr_date: string; + d250_lwpr_vrss_prpr_rate: string; + stck_dryy_hgpr: string; + dryy_hgpr_vrss_prpr_rate: string; + dryy_hgpr_date: string; + stck_dryy_lwpr: string; + dryy_lwpr_vrss_prpr_rate: string; + dryy_lwpr_date: string; + w52_hgpr: string; + w52_hgpr_vrss_prpr_ctrt: string; + w52_hgpr_date: string; + w52_lwpr: string; + w52_lwpr_vrss_prpr_ctrt: string; + w52_lwpr_date: string; + whol_loan_rmnd_rate: string; + ssts_yn: string; + stck_shrn_iscd: string; + fcam_cnnm: string; + cpfn_cnnm: string; + frgn_hldn_qty: string; + vi_cls_code: string; + ovtm_vi_cls_code: string; + last_ssts_cntg_qty: string; + invt_caful_yn: string; + mrkt_warn_cls_code: string; + short_over_yn: string; + sltr_yn: string; +}; + +export const isOpenapiLiveData = (data: any): data is OpenapiLiveData => { + return ( + typeof data === 'object' && + data !== null && + typeof data.iscd_stat_cls_code === 'string' && + typeof data.marg_rate === 'string' && + typeof data.rprs_mrkt_kor_name === 'string' && + typeof data.bstp_kor_isnm === 'string' && + typeof data.temp_stop_yn === 'string' && + typeof data.oprc_rang_cont_yn === 'string' && + typeof data.clpr_rang_cont_yn === 'string' && + typeof data.crdt_able_yn === 'string' && + typeof data.stck_prpr === 'string' && + typeof data.prdy_ctrt === 'string' && + typeof data.acml_vol === 'string' && + typeof data.stck_oprc === 'string' && + typeof data.stck_hgpr === 'string' && + typeof data.stck_lwpr === 'string' && + typeof data.wghn_avrg_stck_prc === 'string' && + typeof data.stck_shrn_iscd === 'string' + ); +}; diff --git a/packages/backend/src/scraper/openapi/type/openapiUtil.type.ts b/packages/backend/src/scraper/openapi/type/openapiUtil.type.ts index 6df0ca19..e9f0869d 100644 --- a/packages/backend/src/scraper/openapi/type/openapiUtil.type.ts +++ b/packages/backend/src/scraper/openapi/type/openapiUtil.type.ts @@ -3,6 +3,7 @@ export type TR_ID = | 'FHKST03010200' | 'FHKST66430300' | 'HHKDB669107C0' + | 'FHKST01010100' | 'CTPF1002R'; export const TR_IDS: Record = { @@ -10,4 +11,5 @@ export const TR_IDS: Record = { MINUTE_DATA: 'FHKST03010200', FINANCIAL_DATA: 'FHKST66430300', PRODUCTION_DETAIL: 'CTPF1002R', + LIVE_DATA: 'FHKST01010100', }; diff --git a/packages/backend/src/scraper/openapi/websocket/websocketClient.websocket.ts b/packages/backend/src/scraper/openapi/websocket/websocketClient.websocket.ts new file mode 100644 index 00000000..06451a53 --- /dev/null +++ b/packages/backend/src/scraper/openapi/websocket/websocketClient.websocket.ts @@ -0,0 +1,61 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ +import { Inject, Injectable } from '@nestjs/common'; +import { Logger } from 'winston'; +import { RawData, WebSocket } from 'ws'; + +@Injectable() +export class WebsocketClient { + private readonly url = + process.env.WS_URL ?? 'ws://ops.koreainvestment.com:21000'; + private client: WebSocket = new WebSocket(this.url); + + constructor(@Inject('winston') private readonly logger: Logger) {} + + subscribe(message: string) { + this.logger.info(`Subscribe : ${message}`); + this.sendMessage(message); + } + + discribe(message: string) { + this.logger.info(`Discribe : ${message}`); + this.sendMessage(message); + } + + // TODO : 분리 + private initDisconnect( + initCloseCallback: () => void, + initErrorCallback: (error: unknown) => void, + ) { + this.client.on('close', initCloseCallback); + + this.client.on('error', initErrorCallback); + } + + private initOpen(fn: () => void) { + this.client.on('open', fn); + } + + private initMessage(fn: (data: RawData) => void) { + this.client.on('message', fn); + } + + connectPacade( + initOpenCallback: (fn: (message: string) => void) => () => void, + initMessageCallback: (client: WebSocket) => (data: RawData) => void, + initCloseCallback: () => void, + initErrorCallback: (error: unknown) => void, + ) { + this.initOpen(initOpenCallback(this.sendMessage)); + this.initMessage(initMessageCallback(this.client)); + this.initDisconnect(initCloseCallback, initErrorCallback); + } + + private sendMessage(message: string) { + if (this.client.readyState === WebSocket.OPEN) { + this.client.send(message); + this.logger.info(`Sent message: ${message}`); + } else { + this.logger.warn('WebSocket is not open. Message not sent.'); + } + } +} diff --git a/packages/backend/src/scraper/openapi/websocketClient.service.ts b/packages/backend/src/scraper/openapi/websocketClient.service.ts deleted file mode 100644 index e784afe5..00000000 --- a/packages/backend/src/scraper/openapi/websocketClient.service.ts +++ /dev/null @@ -1,150 +0,0 @@ -/* eslint-disable @typescript-eslint/no-explicit-any */ -import { Inject, Injectable } from '@nestjs/common'; -import { Cron } from '@nestjs/schedule'; -import { Logger } from 'winston'; -import { RawData, WebSocket } from 'ws'; -import { OpenapiLiveData } from './api/openapiLiveData.api'; -import { OpenapiTokenApi } from './api/openapiToken.api'; -import { openApiConfig } from './config/openapi.config'; -import { parseMessage } from './parse/openapi.parser'; - -type TR_IDS = '0' | '1'; - -@Injectable() -export class WebsocketClient { - private client: WebSocket; - private readonly reconnectInterval = 60000; - private readonly url = - process.env.WS_URL ?? 'ws://ops.koreainvestment.com:21000'; - private readonly clientStock: Set = new Set(); - - constructor( - @Inject('winston') private readonly logger: Logger, - private readonly openApiToken: OpenapiTokenApi, - private readonly openapiLiveData: OpenapiLiveData, - ) { - if (process.env.NODE_ENV === 'production') { - this.connect(); - } - } - - // TODO : subscribe 구조로 리팩토링 - subscribe(stockId: string) { - this.clientStock.add(stockId); - // TODO : 하나의 config만 사용중. - const message = this.convertObjectToMessage( - this.openApiToken.configs[0], - stockId, - '1', - ); - this.sendMessage(message); - } - - discribe(stockId: string) { - this.clientStock.delete(stockId); - const message = this.convertObjectToMessage( - this.openApiToken.configs[0], - stockId, - '0', - ); - this.sendMessage(message); - } - - private initDisconnect() { - this.client.on('close', () => { - this.logger.warn( - `WebSocket connection closed. Reconnecting in ${this.reconnectInterval / 60 / 1000} minute...`, - ); - }); - - this.client.on('error', (error: any) => { - this.logger.error(`WebSocket error: ${error.message}`); - setTimeout(() => this.connect(), this.reconnectInterval); - }); - } - - private initOpen() { - this.client.on('open', () => { - this.logger.info('WebSocket connection established'); - for (const stockId of this.clientStock.keys()) { - const message = this.convertObjectToMessage( - this.openApiToken.configs[0], - stockId, - '1', - ); - this.sendMessage(message); - } - }); - } - - private initMessage() { - this.client.on('message', async (data: RawData) => { - try { - console.log(data); - const message = this.parseMessage(data); - if (message.header) { - if (message.header.tr_id === 'PINGPONG') { - this.logger.info(`Received PING: ${data}`); - this.client.pong(data); - } - return; - } - this.logger.info(`Recived data : ${data}`); - this.logger.info(`Stock id : ${message[0]['STOCK_ID']}`); - const liveData = this.openapiLiveData.convertLiveData(message); - await this.openapiLiveData.saveLiveData(liveData); - } catch (error) { - this.logger.warn(error); - } - }); - } - - private parseMessage(data: RawData) { - if (typeof data === 'object' && !(data instanceof Buffer)) { - return data; - } else if (typeof data === 'object') { - return parseMessage(data.toString()); - } else { - return parseMessage(data as string); - } - } - - @Cron('0 2 * * 1-5') - connect() { - this.client = new WebSocket(this.url); - this.initOpen(); - this.initMessage(); - this.initDisconnect(); - } - - private convertObjectToMessage( - config: typeof openApiConfig, - stockId: string, - tr_type: TR_IDS, - ): string { - const message = { - header: { - approval_key: config.STOCK_WEBSOCKET_KEY!, - custtype: 'P', - tr_type, - 'content-type': 'utf-8', - }, - body: { - input: { - tr_id: 'H0STCNT0', - tr_key: stockId, - }, - }, - }; - return JSON.stringify(message); - } - - private sendMessage(message: string) { - if (this.client.readyState === WebSocket.OPEN) { - this.client.send(message); - this.logger.info(`Sent message: ${message}`); - } else { - this.logger.warn('WebSocket is not open. Message not sent.'); - } - } -} diff --git a/packages/backend/src/stock/domain/stockLiveData.entity.ts b/packages/backend/src/stock/domain/stockLiveData.entity.ts index bf82f8d6..ea480d6b 100644 --- a/packages/backend/src/stock/domain/stockLiveData.entity.ts +++ b/packages/backend/src/stock/domain/stockLiveData.entity.ts @@ -31,9 +31,6 @@ export class StockLiveData { @Column({ type: 'decimal', precision: 15, scale: 2 }) open: number; - @Column({ type: 'decimal', precision: 15, scale: 2 }) - previousClose: number; - @UpdateDateColumn() @Column({ type: 'timestamp' }) updatedAt: Date; diff --git a/packages/backend/src/stock/stock.gateway.ts b/packages/backend/src/stock/stock.gateway.ts index 1f4ab32d..2674690d 100644 --- a/packages/backend/src/stock/stock.gateway.ts +++ b/packages/backend/src/stock/stock.gateway.ts @@ -6,6 +6,7 @@ import { WebSocketServer, } from '@nestjs/websockets'; import { Server, Socket } from 'socket.io'; +import { LiveData } from '@/scraper/openapi/liveData.service'; @WebSocketGateway({ namespace: '/api/stock/realtime', @@ -14,15 +15,18 @@ export class StockGateway { @WebSocketServer() server: Server; - constructor() {} + constructor(private readonly liveData: LiveData) {} @SubscribeMessage('connectStock') - handleConnectStock( + async handleConnectStock( @MessageBody() stockId: string, @ConnectedSocket() client: Socket, ) { client.join(stockId); + if ((await this.server.in(stockId).fetchSockets()).length === 0) { + this.liveData.subscribe(stockId); + } client.emit('connectionSuccess', { message: `Successfully connected to stock room: ${stockId}`, stockId, diff --git a/packages/backend/src/stock/stock.module.ts b/packages/backend/src/stock/stock.module.ts index 13df81b0..3008feb8 100644 --- a/packages/backend/src/stock/stock.module.ts +++ b/packages/backend/src/stock/stock.module.ts @@ -23,6 +23,10 @@ import { } from './stockData.service'; import { StockDetailService } from './stockDetail.service'; import { StockLiveDataSubscriber } from './stockLiveData.subscriber'; +import { OpenapiLiveData } from '@/scraper/openapi/api/openapiLiveData.api'; +import { OpenapiTokenApi } from '@/scraper/openapi/api/openapiToken.api'; +import { LiveData } from '@/scraper/openapi/liveData.service'; +import { WebsocketClient } from '@/scraper/openapi/websocket/websocketClient.websocket'; @Module({ imports: [ @@ -40,6 +44,10 @@ import { StockLiveDataSubscriber } from './stockLiveData.subscriber'; controllers: [StockController], providers: [ StockService, + WebsocketClient, + OpenapiTokenApi, + OpenapiLiveData, + LiveData, StockGateway, StockLiveDataSubscriber, StockDataService,