From bbb9b62e0af4e221761ef944fa3fd2a13a67ccaa Mon Sep 17 00:00:00 2001 From: sunghwki Date: Tue, 3 Dec 2024 18:30:07 +0900 Subject: [PATCH 01/14] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20refactor:=20?= =?UTF-8?q?=EB=B6=84=EB=B4=89=20=EB=8D=B0=EC=9D=B4=ED=84=B0=20callback=20?= =?UTF-8?q?=ED=98=95=ED=83=9C=EB=A1=9C=20=EB=B0=94=EA=BF=94=20=EC=9A=B0?= =?UTF-8?q?=EC=84=A0=EC=88=9C=EC=9C=84=20=ED=81=90=20=EC=A0=81=EC=9A=A9,?= =?UTF-8?q?=20=EB=A6=AC=ED=8C=A9=ED=86=A0=EB=A7=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../openapi/api/openapiMinuteData.api.ts | 158 ++++++++---------- 1 file changed, 70 insertions(+), 88 deletions(-) diff --git a/packages/backend/src/scraper/openapi/api/openapiMinuteData.api.ts b/packages/backend/src/scraper/openapi/api/openapiMinuteData.api.ts index 608d39cc..36577e44 100644 --- a/packages/backend/src/scraper/openapi/api/openapiMinuteData.api.ts +++ b/packages/backend/src/scraper/openapi/api/openapiMinuteData.api.ts @@ -1,49 +1,93 @@ -import { Inject, Injectable } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; +import { Cron } from '@nestjs/schedule'; import { DataSource } from 'typeorm'; -import { Logger } from 'winston'; -import { openApiConfig } from '../config/openapi.config'; - +import { + Json, + OpenapiQueue, + OpenapiQueueNodeValue, +} from '../queue/openapi.queue'; import { isMinuteData, MinuteData, UpdateStockQuery, } from '../type/openapiMinuteData.type'; import { TR_IDS } from '../type/openapiUtil.type'; -import { getCurrentTime, getOpenApi } from '../util/openapiUtil.api'; -import { OpenapiTokenApi } from './openapiToken.api'; +import { getCurrentTime } from '../util/openapiUtil.api'; +import { Alarm } from '@/alarm/domain/alarm.entity'; import { Stock } from '@/stock/domain/stock.entity'; import { StockData, StockMinutely } from '@/stock/domain/stockData.entity'; -const STOCK_CUT = 4; - @Injectable() export class OpenapiMinuteData { - private stock: Stock[][] = []; private readonly entity = StockMinutely; private readonly url: string = '/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice'; - private readonly intervals: number = 130; - private flip: number = 0; constructor( private readonly datasource: DataSource, - private readonly openApiToken: OpenapiTokenApi, - @Inject('winston') private readonly logger: Logger, + private readonly openapiQueue: OpenapiQueue, ) { - //this.getStockData(); + this.getStockMinuteData(); } - async getStockData() { + @Cron(`* 9-15 * * 1-5`) + async getStockMinuteData() { if (process.env.NODE_ENV !== 'production') return; - const stock = await this.datasource.manager.findBy(Stock, { - isTrading: true, - }); - const stockSize = Math.ceil(stock.length / STOCK_CUT); - let i = 0; - this.stock = []; - while (i < STOCK_CUT) { - this.stock.push(stock.slice(i * stockSize, (i + 1) * stockSize)); - i++; + const alarms = await this.datasource.manager + .getRepository(Alarm) + .createQueryBuilder('alarm') + .leftJoin('alarm.stock', 'stock') + .select('stock.id', 'stockId') + .addSelect('COUNT(alarm.id)', 'alarmCount') + .groupBy('stock.id') + .orderBy('alarmCount', 'DESC') + .execute(); + console.log(alarms); + for (const alarm of alarms) { + const time = getCurrentTime(); + const query = this.getUpdateStockQuery(alarm.stockId, time); + const node: OpenapiQueueNodeValue = { + url: this.url, + query, + trId: TR_IDS.MINUTE_DATA, + callback: this.getStockMinuteDataCallback(alarm.stockId, time), + }; + this.openapiQueue.enqueue(node); + } + } + + getStockMinuteDataCallback(stockId: string, time: string) { + return async (data: Json) => { + let output; + if (data.output2) output = data.output2; + if (output && output[0] && isMinuteData(output[0])) { + console.log(output); + this.saveMinuteData(stockId, output as MinuteData[], time); + } + }; + } + + private async saveMinuteData( + stockId: string, + item: MinuteData[], + time: string, + ) { + if (!this.isMarketOpenTime(time)) return; + const stockPeriod = item.map((val) => + this.convertResToMinuteData(stockId, val, time), + ); + for (const stock of stockPeriod) { + this.datasource.manager + .createQueryBuilder() + .insert() + .into(this.entity) + .values(stock) + .orUpdate( + ['id', 'close', 'low', 'high', 'open', 'volume', 'created_at'], + ['stock_id', 'start_time'], + ) + .execute(); } + //this.datasource.manager.save(this.entity, stockPeriod); } private convertResToMinuteData( @@ -71,70 +115,8 @@ export class OpenapiMinuteData { private isMarketOpenTime(time: string) { const numberTime = parseInt(time); - return numberTime >= 90000 && numberTime <= 153000; - } - - private async saveMinuteData( - stockId: string, - item: MinuteData[], - time: string, - ) { - const manager = this.datasource.manager; - if (!this.isMarketOpenTime(time)) return; - const stockPeriod = item.map((val) => - this.convertResToMinuteData(stockId, val, time), - ); - manager.save(this.entity, stockPeriod); - } - - private async getMinuteDataInterval( - stockId: string, - time: string, - config: typeof openApiConfig, - ) { - const query = this.getUpdateStockQuery(stockId, time); - try { - const response = await getOpenApi( - this.url, - config, - query, - TR_IDS.MINUTE_DATA, - ); - let output; - if (response.output2) output = response.output2; - if (output && output[0] && isMinuteData(output[0])) { - this.saveMinuteData(stockId, output, time); - } - } catch (error) { - this.logger.warn(error); - } - } - - private async getMinuteDataChunk( - chunk: Stock[], - config: typeof openApiConfig, - ) { - const time = getCurrentTime(); - let interval = 0; - for await (const stock of chunk) { - setTimeout( - () => this.getMinuteDataInterval(stock.id!, time, config), - interval, - ); - interval += this.intervals; - } - } - - async getMinuteData() { - if (process.env.NODE_ENV !== 'production') return; - const configCount = (await this.openApiToken.configs()).length; - const stock = this.stock[this.flip % STOCK_CUT]; - this.flip++; - const chunkSize = Math.ceil(stock.length / configCount); - for (let i = 0; i < configCount; i++) { - const chunk = stock.slice(i * chunkSize, (i + 1) * chunkSize); - this.getMinuteDataChunk(chunk, (await this.openApiToken.configs())[i]); - } + // 이거 바꿔놓음 + return numberTime >= 90000 && numberTime <= 183000; } private getUpdateStockQuery( From fc145a95beb8b552617208f95dae4348b39ef6a3 Mon Sep 17 00:00:00 2001 From: sunghwki Date: Tue, 3 Dec 2024 19:26:50 +0900 Subject: [PATCH 02/14] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20refactor:=20?= =?UTF-8?q?=EB=B6=84=EB=8B=A8=EC=9C=84=20=EB=8D=B0=EC=9D=B4=ED=84=B0=20?= =?UTF-8?q?=EC=88=98=EC=A7=91=20stock=20limit=20200,=20=EC=BD=9C=EB=B0=B1?= =?UTF-8?q?=ED=95=A8=EC=88=98=EB=A1=9C=20=EB=A6=AC=ED=8C=A9=ED=86=A0?= =?UTF-8?q?=EB=A7=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../openapi/api/openapiMinuteData.api.ts | 46 ++++++++++---- .../src/scraper/openapi/liveData.service.ts | 2 +- .../openapi/type/openapiMinuteData.type.ts | 60 +++++++++++++++---- 3 files changed, 83 insertions(+), 25 deletions(-) diff --git a/packages/backend/src/scraper/openapi/api/openapiMinuteData.api.ts b/packages/backend/src/scraper/openapi/api/openapiMinuteData.api.ts index 36577e44..e985a539 100644 --- a/packages/backend/src/scraper/openapi/api/openapiMinuteData.api.ts +++ b/packages/backend/src/scraper/openapi/api/openapiMinuteData.api.ts @@ -1,14 +1,18 @@ -import { Injectable } from '@nestjs/common'; +import { Inject, Injectable } from '@nestjs/common'; import { Cron } from '@nestjs/schedule'; import { DataSource } from 'typeorm'; +import { Logger } from 'winston'; import { Json, OpenapiQueue, OpenapiQueueNodeValue, } from '../queue/openapi.queue'; import { - isMinuteData, + isMinuteDataOutput1, + isMinuteDataOutput2, MinuteData, + MinuteDataOutput1, + MinuteDataOutput2, UpdateStockQuery, } from '../type/openapiMinuteData.type'; import { TR_IDS } from '../type/openapiUtil.type'; @@ -22,9 +26,11 @@ export class OpenapiMinuteData { private readonly entity = StockMinutely; private readonly url: string = '/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice'; + private readonly STOCK_LIMITS: number = 200; constructor( private readonly datasource: DataSource, private readonly openapiQueue: OpenapiQueue, + @Inject('winston') private readonly logger: Logger, ) { this.getStockMinuteData(); } @@ -40,8 +46,8 @@ export class OpenapiMinuteData { .addSelect('COUNT(alarm.id)', 'alarmCount') .groupBy('stock.id') .orderBy('alarmCount', 'DESC') + .limit(this.STOCK_LIMITS) .execute(); - console.log(alarms); for (const alarm of alarms) { const time = getCurrentTime(); const query = this.getUpdateStockQuery(alarm.stockId, time); @@ -57,12 +63,27 @@ export class OpenapiMinuteData { getStockMinuteDataCallback(stockId: string, time: string) { return async (data: Json) => { - let output; - if (data.output2) output = data.output2; - if (output && output[0] && isMinuteData(output[0])) { - console.log(output); - this.saveMinuteData(stockId, output as MinuteData[], time); + let output1: MinuteDataOutput1, output2: MinuteDataOutput2[]; + if (data.output1 && isMinuteDataOutput1(data.output1)) { + output1 = data.output1; + } else { + this.logger.info(`${stockId} has invalid minute data`); + return; } + if ( + data.output2 && + data.output2[0] && + isMinuteDataOutput2(data.output2[0]) + ) { + output2 = data.output2 as MinuteDataOutput2[]; + } else { + this.logger.info(`${stockId} has invalid minute data`); + return; + } + const minuteDatas: MinuteData[] = output2.map((val): MinuteData => { + return { acml_vol: output1.acml_vol, ...val }; + }); + await this.saveMinuteData(stockId, minuteDatas, time); }; } @@ -75,19 +96,18 @@ export class OpenapiMinuteData { const stockPeriod = item.map((val) => this.convertResToMinuteData(stockId, val, time), ); - for (const stock of stockPeriod) { + if (stockPeriod[0]) { this.datasource.manager .createQueryBuilder() .insert() .into(this.entity) - .values(stock) + .values(stockPeriod[0]) .orUpdate( ['id', 'close', 'low', 'high', 'open', 'volume', 'created_at'], ['stock_id', 'start_time'], ) .execute(); } - //this.datasource.manager.save(this.entity, stockPeriod); } private convertResToMinuteData( @@ -108,7 +128,7 @@ export class OpenapiMinuteData { stockPeriod.open = parseInt(item.stck_oprc); stockPeriod.high = parseInt(item.stck_hgpr); stockPeriod.low = parseInt(item.stck_lwpr); - stockPeriod.volume = parseInt(item.cntg_vol); + stockPeriod.volume = parseInt(item.acml_vol); stockPeriod.createdAt = new Date(); return stockPeriod; } @@ -116,7 +136,7 @@ export class OpenapiMinuteData { private isMarketOpenTime(time: string) { const numberTime = parseInt(time); // 이거 바꿔놓음 - return numberTime >= 90000 && numberTime <= 183000; + return numberTime >= 90000 && numberTime <= 203000; } private getUpdateStockQuery( diff --git a/packages/backend/src/scraper/openapi/liveData.service.ts b/packages/backend/src/scraper/openapi/liveData.service.ts index 37d757ae..cc483d43 100644 --- a/packages/backend/src/scraper/openapi/liveData.service.ts +++ b/packages/backend/src/scraper/openapi/liveData.service.ts @@ -130,7 +130,7 @@ export class LiveData { return; } const liveData = this.openapiLiveData.convertLiveData(message); - await this.openapiLiveData.saveLiveData(liveData[0]) + await this.openapiLiveData.saveLiveData(liveData[0]); } catch (error) { this.logger.warn(error); } diff --git a/packages/backend/src/scraper/openapi/type/openapiMinuteData.type.ts b/packages/backend/src/scraper/openapi/type/openapiMinuteData.type.ts index 72fb4bfb..9a1088ce 100644 --- a/packages/backend/src/scraper/openapi/type/openapiMinuteData.type.ts +++ b/packages/backend/src/scraper/openapi/type/openapiMinuteData.type.ts @@ -1,35 +1,73 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ -export type MinuteData = { +export type MinuteDataOutput1 = { + prdy_vrss: string; + prdy_vrss_sign: string; + prdy_ctrt: string; + stck_prdy_clpr: string; + acml_vol: string; + acml_tr_pbmn: string; + hts_kor_isnm: string; + stck_prpr: string; +}; + +export type MinuteDataOutput2 = { stck_bsop_date: string; stck_cntg_hour: string; + acml_tr_pbmn: string; stck_prpr: string; stck_oprc: string; stck_hgpr: string; stck_lwpr: string; cntg_vol: string; +}; + +export type MinuteData = { + stck_bsop_date: string; + stck_cntg_hour: string; acml_tr_pbmn: string; + acml_vol: string; + stck_prpr: string; + stck_oprc: string; + stck_hgpr: string; + stck_lwpr: string; + cntg_vol: string; }; -export type UpdateStockQuery = { - fid_etc_cls_code: string; - fid_cond_mrkt_div_code: 'J' | 'W'; - fid_input_iscd: string; - fid_input_hour_1: string; - fid_pw_data_incu_yn: 'Y' | 'N'; +export const isMinuteDataOutput1 = (data: any): data is MinuteDataOutput1 => { + return ( + data !== null && + typeof data === 'object' && + typeof data.prdy_vrss === 'string' && + typeof data.prdy_vrss_sign === 'string' && + typeof data.prdy_ctrt === 'string' && + typeof data.stck_prdy_clpr === 'string' && + typeof data.acml_vol === 'string' && + typeof data.acml_tr_pbmn === 'string' && + typeof data.hts_kor_isnm === 'string' && + typeof data.stck_prpr === 'string' + ); }; -export const isMinuteData = (data: any) => { +export const isMinuteDataOutput2 = (data: any): data is MinuteDataOutput2 => { return ( - data && + data !== null && typeof data === 'object' && typeof data.stck_bsop_date === 'string' && typeof data.stck_cntg_hour === 'string' && + typeof data.acml_tr_pbmn === 'string' && typeof data.stck_prpr === 'string' && typeof data.stck_oprc === 'string' && typeof data.stck_hgpr === 'string' && typeof data.stck_lwpr === 'string' && - typeof data.cntg_vol === 'string' && - typeof data.acml_tr_pbmn === 'string' + typeof data.cntg_vol === 'string' ); }; + +export type UpdateStockQuery = { + fid_etc_cls_code: string; + fid_cond_mrkt_div_code: 'J' | 'W'; + fid_input_iscd: string; + fid_input_hour_1: string; + fid_pw_data_incu_yn: 'Y' | 'N'; +}; From fc840c985fd2014291395e2444ab6d8b2d795af9 Mon Sep 17 00:00:00 2001 From: sunghwki Date: Tue, 3 Dec 2024 19:33:24 +0900 Subject: [PATCH 03/14] =?UTF-8?q?=F0=9F=90=9B=20fix:=20afterUpdate=20?= =?UTF-8?q?=EC=A0=81=EC=9A=A9=20=EC=9C=84=ED=95=9C=20upsert=EA=B5=AC?= =?UTF-8?q?=EB=AC=B8=EC=9C=BC=EB=A1=9C=20=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../scraper/openapi/api/openapiMinuteData.api.ts | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/packages/backend/src/scraper/openapi/api/openapiMinuteData.api.ts b/packages/backend/src/scraper/openapi/api/openapiMinuteData.api.ts index e985a539..69a5f865 100644 --- a/packages/backend/src/scraper/openapi/api/openapiMinuteData.api.ts +++ b/packages/backend/src/scraper/openapi/api/openapiMinuteData.api.ts @@ -97,16 +97,10 @@ export class OpenapiMinuteData { this.convertResToMinuteData(stockId, val, time), ); if (stockPeriod[0]) { - this.datasource.manager - .createQueryBuilder() - .insert() - .into(this.entity) - .values(stockPeriod[0]) - .orUpdate( - ['id', 'close', 'low', 'high', 'open', 'volume', 'created_at'], - ['stock_id', 'start_time'], - ) - .execute(); + this.datasource.manager.upsert(this.entity, stockPeriod[0], [ + 'stock.id', + 'startTime', + ]); } } From 9ae9f3a36fdaf9ce050e186c1d5d32d89fa5387f Mon Sep 17 00:00:00 2001 From: sunghwki Date: Tue, 3 Dec 2024 22:18:30 +0900 Subject: [PATCH 04/14] =?UTF-8?q?=F0=9F=92=84=20style:=20=EB=B6=84?= =?UTF-8?q?=EB=8B=A8=EC=9C=84=20=ED=85=8C=EC=8A=A4=ED=8A=B8=20=EC=9D=B4?= =?UTF-8?q?=ED=9B=84=20=ED=85=8C=EC=8A=A4=ED=8A=B8=20=EC=BD=94=EB=93=9C=20?= =?UTF-8?q?=EC=82=AD=EC=A0=9C=20=EB=B0=8F=20=EC=A1=B0=EA=B1=B4=20=EC=9B=90?= =?UTF-8?q?=EB=B3=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/scraper/openapi/api/openapiMinuteData.api.ts | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/packages/backend/src/scraper/openapi/api/openapiMinuteData.api.ts b/packages/backend/src/scraper/openapi/api/openapiMinuteData.api.ts index 69a5f865..a36fd213 100644 --- a/packages/backend/src/scraper/openapi/api/openapiMinuteData.api.ts +++ b/packages/backend/src/scraper/openapi/api/openapiMinuteData.api.ts @@ -31,9 +31,7 @@ export class OpenapiMinuteData { private readonly datasource: DataSource, private readonly openapiQueue: OpenapiQueue, @Inject('winston') private readonly logger: Logger, - ) { - this.getStockMinuteData(); - } + ) {} @Cron(`* 9-15 * * 1-5`) async getStockMinuteData() { @@ -129,8 +127,7 @@ export class OpenapiMinuteData { private isMarketOpenTime(time: string) { const numberTime = parseInt(time); - // 이거 바꿔놓음 - return numberTime >= 90000 && numberTime <= 203000; + return numberTime >= 90000 && numberTime <= 153000; } private getUpdateStockQuery( From 3ae677ceee20c14cb595aa1ffc0dd1eb94485dde Mon Sep 17 00:00:00 2001 From: sunghwki Date: Tue, 3 Dec 2024 22:19:00 +0900 Subject: [PATCH 05/14] =?UTF-8?q?=F0=9F=92=84=20style:=20dto=20=EC=95=88?= =?UTF-8?q?=20=EC=93=B0=EC=9D=B4=EB=8A=94=20=EC=86=8D=EC=84=B1=20=EC=82=AD?= =?UTF-8?q?=EC=A0=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/backend/src/alarm/dto/subscribe.request.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/packages/backend/src/alarm/dto/subscribe.request.ts b/packages/backend/src/alarm/dto/subscribe.request.ts index 1ed453e0..a049184c 100644 --- a/packages/backend/src/alarm/dto/subscribe.request.ts +++ b/packages/backend/src/alarm/dto/subscribe.request.ts @@ -1,10 +1,6 @@ import { ApiProperty } from '@nestjs/swagger'; -//import { User } from '@/user/domain/user.entity'; export class SubscriptionData { - //@ApiProperty({ type: () => User, description: '유저 아이디' }) - //user: User; - @ApiProperty({ type: 'string', description: '엔드 포인트 설정', From 31d2ef70e6c91e2015a7281bf049daeaa79123b6 Mon Sep 17 00:00:00 2001 From: sunghwki Date: Tue, 3 Dec 2024 22:19:40 +0900 Subject: [PATCH 06/14] =?UTF-8?q?=F0=9F=92=84=20style:=20console.log=20?= =?UTF-8?q?=EC=82=AD=EC=A0=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/backend/src/alarm/alarm.subscriber.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/packages/backend/src/alarm/alarm.subscriber.ts b/packages/backend/src/alarm/alarm.subscriber.ts index f5e15a6b..3f320b73 100644 --- a/packages/backend/src/alarm/alarm.subscriber.ts +++ b/packages/backend/src/alarm/alarm.subscriber.ts @@ -28,13 +28,13 @@ export class AlarmSubscriber } isValidAlarm(alarm: Alarm, entity: StockMinutely) { - if (alarm.alarmDate && alarm.alarmDate > entity.createdAt) { + if (alarm.alarmDate && alarm.alarmDate >= entity.createdAt) { return false; } else { - if (alarm.targetPrice && alarm.targetPrice >= entity.open) { + if (alarm.targetPrice && alarm.targetPrice <= entity.open) { return true; } - if (alarm.targetVolume && alarm.targetVolume >= entity.volume) { + if (alarm.targetVolume && alarm.targetVolume <= entity.volume) { return true; } return false; @@ -48,7 +48,6 @@ export class AlarmSubscriber where: { stock: { id: stockMinutely.stock.id } }, relations: ['user', 'stock'], }); - const alarms = rawAlarms.filter((val) => this.isValidAlarm(val, stockMinutely), ); From 72edf0dff3d63b1df18596bb6b4638250054abd4 Mon Sep 17 00:00:00 2001 From: sunghwki Date: Tue, 3 Dec 2024 22:23:46 +0900 Subject: [PATCH 07/14] =?UTF-8?q?=F0=9F=93=9D=20docs:=20liveData=EC=97=90?= =?UTF-8?q?=20unsubscribe,=20subscribe=20=EB=A9=94=EC=8B=9C=EC=A7=80=20inf?= =?UTF-8?q?o=EB=A1=9C=20=EC=B6=9C=EB=A0=A5=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/backend/src/scraper/openapi/liveData.service.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/backend/src/scraper/openapi/liveData.service.ts b/packages/backend/src/scraper/openapi/liveData.service.ts index cc483d43..83b0fb91 100644 --- a/packages/backend/src/scraper/openapi/liveData.service.ts +++ b/packages/backend/src/scraper/openapi/liveData.service.ts @@ -75,6 +75,7 @@ export class LiveData { stockId, '1', ); + this.logger.info(`${idx} : ${message}`); this.websocketClient[idx].subscribe(message); return; } @@ -99,7 +100,7 @@ export class LiveData { stockId, '2', ); - + this.logger.info(`${idx} : ${message}`); this.websocketClient[idx].unsubscribe(message); } } From 65ecaaaddf1a50a65b404d217cb7d2ae402cf242 Mon Sep 17 00:00:00 2001 From: sunghwki Date: Tue, 3 Dec 2024 22:28:36 +0900 Subject: [PATCH 08/14] =?UTF-8?q?=F0=9F=90=9B=20fix:=20=EC=95=8C=EB=9E=8C?= =?UTF-8?q?=EC=9D=84=20=ED=95=9C=EB=B2=88=EB=A7=8C=20=EB=B3=B4=EB=82=B4?= =?UTF-8?q?=EA=B3=A0=20=EC=82=AD=EC=A0=9C=EC=B2=98=EB=A6=AC=ED=95=98?= =?UTF-8?q?=EA=B2=8C=20=EB=A7=8C=EB=93=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/backend/src/alarm/alarm.service.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/backend/src/alarm/alarm.service.ts b/packages/backend/src/alarm/alarm.service.ts index a2b214ce..d8e6343d 100644 --- a/packages/backend/src/alarm/alarm.service.ts +++ b/packages/backend/src/alarm/alarm.service.ts @@ -110,6 +110,8 @@ export class AlarmService { for (const subscription of subscriptions) { await this.pushService.sendPushNotification(subscription, payload); + //한번만 보내고 삭제하게 처리. + this.alarmRepository.delete(alarm.id); } } } From ac48adeba4fa9c70bfc81c9408b682e2c9fc4219 Mon Sep 17 00:00:00 2001 From: sunghwki Date: Wed, 4 Dec 2024 11:04:29 +0900 Subject: [PATCH 09/14] =?UTF-8?q?=F0=9F=90=9B=20fix:=20=EC=9C=A0=EC=A0=80?= =?UTF-8?q?=EA=B0=80=20=EB=8F=99=EC=8B=9C=EC=97=90=20=EB=8B=A4=EB=A5=B8=20?= =?UTF-8?q?=EC=A3=BC=EC=8B=9D=20=EB=B0=A9=EC=97=90=20=EC=9E=88=EB=8A=94=20?= =?UTF-8?q?=EA=B2=83=EC=9D=84=20=EB=B0=A9=EC=A7=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/scraper/openapi/liveData.service.ts | 6 ++++- .../websocket/websocketClient.websocket.ts | 27 +++++++------------ packages/backend/src/stock/stock.gateway.ts | 4 +++ 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/packages/backend/src/scraper/openapi/liveData.service.ts b/packages/backend/src/scraper/openapi/liveData.service.ts index 83b0fb91..59dea09a 100644 --- a/packages/backend/src/scraper/openapi/liveData.service.ts +++ b/packages/backend/src/scraper/openapi/liveData.service.ts @@ -12,7 +12,7 @@ type TR_IDS = '1' | '2'; @Injectable() export class LiveData { - private readonly startTime: Date = new Date(2024, 0, 1, 9, 0, 0, 0); + private readonly startTime: Date = new Date(2024, 0, 1, 2, 0, 0, 0); private readonly endTime: Date = new Date(2024, 0, 1, 15, 30, 0, 0); private readonly reconnectInterval = 60 * 1000; @@ -37,6 +37,10 @@ export class LiveData { } this.connect(); }); + this.subscribe('005930'); + this.subscribe('000660'); + this.subscribe('000150'); + this.subscribe('000020'); } private async openapiSubscribe(stockId: string) { diff --git a/packages/backend/src/scraper/openapi/websocket/websocketClient.websocket.ts b/packages/backend/src/scraper/openapi/websocket/websocketClient.websocket.ts index d8704787..e036b5b6 100644 --- a/packages/backend/src/scraper/openapi/websocket/websocketClient.websocket.ts +++ b/packages/backend/src/scraper/openapi/websocket/websocketClient.websocket.ts @@ -6,16 +6,15 @@ import { RawData, WebSocket } from 'ws'; export class WebsocketClient { static url = process.env.WS_URL ?? 'ws://ops.koreainvestment.com:21000'; private client: WebSocket; - private messageQueue: string[] = []; + //현재 factory 패턴을 이용해 할당하면 socket이 열리기 전에 message가 가는 문제가 있음. + // 소켓이 할당되기 전에(client에 소켓이 없을 때) message를 보내려 시도함. constructor(@Inject('winston') private readonly logger: Logger) { this.client = new WebSocket(WebsocketClient.url); - this.initOpen(() => this.flushQueue()); - this.initError((error) => this.logger.error('WebSocket error', error)); } - static websocketFactory(logger: Logger) { - return new WebsocketClient(logger); + const websocket = new WebsocketClient(logger); + return websocket; } subscribe(message: string) { @@ -48,28 +47,22 @@ export class WebsocketClient { initCloseCallback: () => void, initErrorCallback: (error: unknown) => void, ) { - this.initOpen(initOpenCallback(this.sendMessage.bind(this))); + this.initOpen(initOpenCallback(this.sendMessage)); this.initMessage(initMessageCallback(this.client)); this.initDisconnect(initCloseCallback); this.initError(initErrorCallback); } private sendMessage(message: string) { + if (!this.client || !this.client.readyState) { + this.logger.warn('WebSocket is not open. Message not sent. '); + return; + } if (this.client.readyState === WebSocket.OPEN) { this.client.send(message); this.logger.info(`Sent message: ${message}`); } else { - this.logger.warn('WebSocket not open. Queueing message.'); - this.messageQueue.push(message); // 큐에 메시지를 추가 - } - } - - private flushQueue() { - while (this.messageQueue.length > 0) { - const message = this.messageQueue.shift(); - if (message) { - this.sendMessage(message); - } + this.logger.warn('WebSocket is not open. Message not sent. '); } } } diff --git a/packages/backend/src/stock/stock.gateway.ts b/packages/backend/src/stock/stock.gateway.ts index 618163d1..aaf54c69 100644 --- a/packages/backend/src/stock/stock.gateway.ts +++ b/packages/backend/src/stock/stock.gateway.ts @@ -37,6 +37,10 @@ export class StockGateway implements OnGatewayDisconnect { ) { try { client.join(stockId); + const beforeStockId = this.users.get(client.id); + if (beforeStockId !== undefined) { + client.leave(beforeStockId); + } this.users.set(client.id, stockId); await this.mutex.runExclusive(async () => { From c69ebd9db280d8f028568119c5a0d2e1208d2473 Mon Sep 17 00:00:00 2001 From: sunghwki Date: Wed, 4 Dec 2024 11:15:52 +0900 Subject: [PATCH 10/14] =?UTF-8?q?=F0=9F=90=9B=20fix:=20=EC=9C=A0=EC=A0=80?= =?UTF-8?q?=EA=B0=80=20=EB=8F=99=EC=8B=9C=EC=97=90=20=EB=8B=A4=EB=A5=B8=20?= =?UTF-8?q?=EC=A3=BC=EC=8B=9D=EC=97=90=20=EC=A0=91=EC=86=8D=ED=95=98?= =?UTF-8?q?=EB=8A=94=20=EA=B2=83=EC=9D=84=20=EB=A7=89=EC=9D=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/backend/src/stock/stock.gateway.ts | 46 +++++++++++++-------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/packages/backend/src/stock/stock.gateway.ts b/packages/backend/src/stock/stock.gateway.ts index aaf54c69..4aef1a37 100644 --- a/packages/backend/src/stock/stock.gateway.ts +++ b/packages/backend/src/stock/stock.gateway.ts @@ -30,6 +30,15 @@ export class StockGateway implements OnGatewayDisconnect { @Inject('winston') private readonly logger: Logger, ) {} + private async handleJoinToRoom(stockId: string) { + const connectedSockets = await this.server.to(stockId).fetchSockets(); + + if (connectedSockets.length > 0 && !this.liveData.isSubscribe(stockId)) { + await this.liveData.subscribe(stockId); + this.logger.info(`${stockId} is subscribed`); + } + } + @SubscribeMessage('connectStock') async handleConnectStock( @MessageBody() stockId: string, @@ -37,22 +46,14 @@ export class StockGateway implements OnGatewayDisconnect { ) { try { client.join(stockId); + const beforeStockId = this.users.get(client.id); - if (beforeStockId !== undefined) { - client.leave(beforeStockId); - } + await this.handleClientStockEvent(beforeStockId, client); + this.users.set(client.id, stockId); await this.mutex.runExclusive(async () => { - const connectedSockets = await this.server.to(stockId).fetchSockets(); - - if ( - connectedSockets.length > 0 && - !this.liveData.isSubscribe(stockId) - ) { - await this.liveData.subscribe(stockId); - this.logger.info(`${stockId} is subscribed`); - } + this.handleJoinToRoom(stockId); }); client.emit('connectionSuccess', { @@ -67,16 +68,27 @@ export class StockGateway implements OnGatewayDisconnect { } } - async handleDisconnect(client: Socket) { - const stockId = this.users.get(client.id); - if (stockId) { + private async handleClientStockEvent( + stockId: string | undefined, + client: Socket, + ) { + if (stockId !== undefined) { await this.mutex.runExclusive(async () => { - await this.liveData.unsubscribe(stockId); - this.users.delete(client.id); + const values = Object.values(this.users); + const isStockIdExists = values.some((value) => stockId === value); + if (!isStockIdExists) { + await this.liveData.unsubscribe(stockId); + this.users.delete(client.id); + } }); } } + async handleDisconnect(client: Socket) { + const stockId = this.users.get(client.id); + await this.handleClientStockEvent(stockId, client); + } + onUpdateStock( stockId: string, price: number, From d45d2e6693a52b7ef89672fad8a7f06cff57194f Mon Sep 17 00:00:00 2001 From: sunghwki Date: Wed, 4 Dec 2024 11:18:09 +0900 Subject: [PATCH 11/14] =?UTF-8?q?=F0=9F=90=9B=20fix:=20client=20leave=20?= =?UTF-8?q?=EC=B6=94=E3=85=8F=E3=84=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/backend/src/stock/stock.gateway.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/backend/src/stock/stock.gateway.ts b/packages/backend/src/stock/stock.gateway.ts index 4aef1a37..f72c4307 100644 --- a/packages/backend/src/stock/stock.gateway.ts +++ b/packages/backend/src/stock/stock.gateway.ts @@ -79,6 +79,7 @@ export class StockGateway implements OnGatewayDisconnect { if (!isStockIdExists) { await this.liveData.unsubscribe(stockId); this.users.delete(client.id); + client.leave(stockId); } }); } From a4571c153599f91cfa9fcdba8908d7d0b6acc257 Mon Sep 17 00:00:00 2001 From: sunghwki Date: Wed, 4 Dec 2024 11:21:31 +0900 Subject: [PATCH 12/14] =?UTF-8?q?=F0=9F=90=9B=20fix:=20leave=20=EC=BD=94?= =?UTF-8?q?=EB=93=9C=EA=B0=80=20values=EB=A5=BC=20=EA=B2=80=EC=82=AC?= =?UTF-8?q?=ED=95=98=EB=8A=94=20=EB=A1=9C=EC=A7=81=EB=B3=B4=EB=8B=A4=20?= =?UTF-8?q?=EB=92=A4=EC=97=90=20=EC=9E=88=EC=96=B4=20=EB=B0=9C=EC=83=9D?= =?UTF-8?q?=EB=90=98=EB=8A=94=20=EB=AC=B8=EC=A0=9C=20=ED=95=B4=EA=B2=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/backend/src/stock/stock.gateway.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/backend/src/stock/stock.gateway.ts b/packages/backend/src/stock/stock.gateway.ts index f72c4307..80c829dd 100644 --- a/packages/backend/src/stock/stock.gateway.ts +++ b/packages/backend/src/stock/stock.gateway.ts @@ -74,12 +74,12 @@ export class StockGateway implements OnGatewayDisconnect { ) { if (stockId !== undefined) { await this.mutex.runExclusive(async () => { + client.leave(stockId); const values = Object.values(this.users); const isStockIdExists = values.some((value) => stockId === value); if (!isStockIdExists) { await this.liveData.unsubscribe(stockId); this.users.delete(client.id); - client.leave(stockId); } }); } From f2ad68b61ddc8360fb67f770f33ac5a15c811e4c Mon Sep 17 00:00:00 2001 From: sunghwki Date: Wed, 4 Dec 2024 11:22:22 +0900 Subject: [PATCH 13/14] =?UTF-8?q?=F0=9F=90=9B=20fix:=20delete=EB=8F=84=20?= =?UTF-8?q?=EC=9D=B4=EB=8F=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/backend/src/stock/stock.gateway.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/backend/src/stock/stock.gateway.ts b/packages/backend/src/stock/stock.gateway.ts index 80c829dd..5808a253 100644 --- a/packages/backend/src/stock/stock.gateway.ts +++ b/packages/backend/src/stock/stock.gateway.ts @@ -75,11 +75,11 @@ export class StockGateway implements OnGatewayDisconnect { if (stockId !== undefined) { await this.mutex.runExclusive(async () => { client.leave(stockId); + this.users.delete(client.id); const values = Object.values(this.users); const isStockIdExists = values.some((value) => stockId === value); if (!isStockIdExists) { await this.liveData.unsubscribe(stockId); - this.users.delete(client.id); } }); } From 629b47d2accd3163045657c05e76fc935455df5d Mon Sep 17 00:00:00 2001 From: sunghwki Date: Wed, 4 Dec 2024 11:41:35 +0900 Subject: [PATCH 14/14] =?UTF-8?q?=F0=9F=90=9B=20fix:=20deadlock=20?= =?UTF-8?q?=EB=B0=9C=EC=83=9D=20=EC=97=86=EC=9D=B4=20=EB=8F=99=EC=8B=9C?= =?UTF-8?q?=EC=84=B1=20=EC=A0=9C=EC=96=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/backend/src/stock/stock.gateway.ts | 29 ++++++++++----------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/packages/backend/src/stock/stock.gateway.ts b/packages/backend/src/stock/stock.gateway.ts index 5808a253..f11c7a08 100644 --- a/packages/backend/src/stock/stock.gateway.ts +++ b/packages/backend/src/stock/stock.gateway.ts @@ -47,12 +47,11 @@ export class StockGateway implements OnGatewayDisconnect { try { client.join(stockId); - const beforeStockId = this.users.get(client.id); - await this.handleClientStockEvent(beforeStockId, client); - - this.users.set(client.id, stockId); - await this.mutex.runExclusive(async () => { + const beforeStockId = this.users.get(client.id); + await this.handleClientStockEvent(beforeStockId, client); + + this.users.set(client.id, stockId); this.handleJoinToRoom(stockId); }); @@ -73,21 +72,21 @@ export class StockGateway implements OnGatewayDisconnect { client: Socket, ) { if (stockId !== undefined) { - await this.mutex.runExclusive(async () => { - client.leave(stockId); - this.users.delete(client.id); - const values = Object.values(this.users); - const isStockIdExists = values.some((value) => stockId === value); - if (!isStockIdExists) { - await this.liveData.unsubscribe(stockId); - } - }); + client.leave(stockId); + this.users.delete(client.id); + const values = Object.values(this.users); + const isStockIdExists = values.some((value) => stockId === value); + if (!isStockIdExists) { + await this.liveData.unsubscribe(stockId); + } } } async handleDisconnect(client: Socket) { const stockId = this.users.get(client.id); - await this.handleClientStockEvent(stockId, client); + await this.mutex.runExclusive(async () => { + await this.handleClientStockEvent(stockId, client); + }); } onUpdateStock(