Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/#341 분봉 리팩토링 및 장 시간대에 분봉 받아오기, alarm 한번만 작동 #343

Merged
merged 8 commits into from
Dec 3, 2024
2 changes: 2 additions & 0 deletions packages/backend/src/alarm/alarm.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ export class AlarmService {

for (const subscription of subscriptions) {
await this.pushService.sendPushNotification(subscription, payload);
//한번만 보내고 삭제하게 처리.
this.alarmRepository.delete(alarm.id);
}
}
}
7 changes: 3 additions & 4 deletions packages/backend/src/alarm/alarm.subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ export class AlarmSubscriber
}

isValidAlarm(alarm: Alarm, entity: StockMinutely) {
if (alarm.alarmDate && alarm.alarmDate > entity.createdAt) {
if (alarm.alarmDate && alarm.alarmDate >= entity.createdAt) {
return false;
} else {
if (alarm.targetPrice && alarm.targetPrice >= entity.open) {
if (alarm.targetPrice && alarm.targetPrice <= entity.open) {
return true;
}
if (alarm.targetVolume && alarm.targetVolume >= entity.volume) {
if (alarm.targetVolume && alarm.targetVolume <= entity.volume) {
return true;
}
return false;
Expand All @@ -48,7 +48,6 @@ export class AlarmSubscriber
where: { stock: { id: stockMinutely.stock.id } },
relations: ['user', 'stock'],
});

const alarms = rawAlarms.filter((val) =>
this.isValidAlarm(val, stockMinutely),
);
Expand Down
4 changes: 0 additions & 4 deletions packages/backend/src/alarm/dto/subscribe.request.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
import { ApiProperty } from '@nestjs/swagger';
//import { User } from '@/user/domain/user.entity';

export class SubscriptionData {
//@ApiProperty({ type: () => User, description: '유저 아이디' })
//user: User;

@ApiProperty({
type: 'string',
description: '엔드 포인트 설정',
Expand Down
169 changes: 81 additions & 88 deletions packages/backend/src/scraper/openapi/api/openapiMinuteData.api.ts
Original file line number Diff line number Diff line change
@@ -1,48 +1,104 @@
import { Inject, Injectable } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { DataSource } from 'typeorm';
import { Logger } from 'winston';
import { openApiConfig } from '../config/openapi.config';

import {
isMinuteData,
Json,
OpenapiQueue,
OpenapiQueueNodeValue,
} from '../queue/openapi.queue';
import {
isMinuteDataOutput1,
isMinuteDataOutput2,
MinuteData,
MinuteDataOutput1,
MinuteDataOutput2,
UpdateStockQuery,
} from '../type/openapiMinuteData.type';
import { TR_IDS } from '../type/openapiUtil.type';
import { getCurrentTime, getOpenApi } from '../util/openapiUtil.api';
import { OpenapiTokenApi } from './openapiToken.api';
import { getCurrentTime } from '../util/openapiUtil.api';
import { Alarm } from '@/alarm/domain/alarm.entity';
import { Stock } from '@/stock/domain/stock.entity';
import { StockData, StockMinutely } from '@/stock/domain/stockData.entity';

const STOCK_CUT = 4;

@Injectable()
export class OpenapiMinuteData {
private stock: Stock[][] = [];
private readonly entity = StockMinutely;
private readonly url: string =
'/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice';
private readonly intervals: number = 130;
private flip: number = 0;
private readonly STOCK_LIMITS: number = 200;
constructor(
private readonly datasource: DataSource,
private readonly openApiToken: OpenapiTokenApi,
private readonly openapiQueue: OpenapiQueue,
@Inject('winston') private readonly logger: Logger,
) {
//this.getStockData();
}
) {}

async getStockData() {
@Cron(`* 9-15 * * 1-5`)
async getStockMinuteData() {
if (process.env.NODE_ENV !== 'production') return;
const stock = await this.datasource.manager.findBy(Stock, {
isTrading: true,
});
const stockSize = Math.ceil(stock.length / STOCK_CUT);
let i = 0;
this.stock = [];
while (i < STOCK_CUT) {
this.stock.push(stock.slice(i * stockSize, (i + 1) * stockSize));
i++;
const alarms = await this.datasource.manager
.getRepository(Alarm)
.createQueryBuilder('alarm')
.leftJoin('alarm.stock', 'stock')
.select('stock.id', 'stockId')
.addSelect('COUNT(alarm.id)', 'alarmCount')
.groupBy('stock.id')
.orderBy('alarmCount', 'DESC')
.limit(this.STOCK_LIMITS)
.execute();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

알람 카운터가 많은 순으로 보내긴 하지만, 저희가 구현한 우선순위큐는 마지막에 실행될거에요

for (const alarm of alarms) {
const time = getCurrentTime();
const query = this.getUpdateStockQuery(alarm.stockId, time);
const node: OpenapiQueueNodeValue = {
url: this.url,
query,
trId: TR_IDS.MINUTE_DATA,
callback: this.getStockMinuteDataCallback(alarm.stockId, time),
};
this.openapiQueue.enqueue(node);
}
}

getStockMinuteDataCallback(stockId: string, time: string) {
return async (data: Json) => {
let output1: MinuteDataOutput1, output2: MinuteDataOutput2[];
if (data.output1 && isMinuteDataOutput1(data.output1)) {
output1 = data.output1;
} else {
this.logger.info(`${stockId} has invalid minute data`);
return;
}
if (
data.output2 &&
data.output2[0] &&
isMinuteDataOutput2(data.output2[0])
) {
output2 = data.output2 as MinuteDataOutput2[];
} else {
this.logger.info(`${stockId} has invalid minute data`);
return;
}
const minuteDatas: MinuteData[] = output2.map((val): MinuteData => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reduce로 순환하면서 타입을 체크하고 진행하는 방법도 있습니다!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

오..... reduce도 괜찮겠네요!

return { acml_vol: output1.acml_vol, ...val };
});
await this.saveMinuteData(stockId, minuteDatas, time);
};
}

private async saveMinuteData(
stockId: string,
item: MinuteData[],
time: string,
) {
if (!this.isMarketOpenTime(time)) return;
const stockPeriod = item.map((val) =>
this.convertResToMinuteData(stockId, val, time),
);
if (stockPeriod[0]) {
this.datasource.manager.upsert(this.entity, stockPeriod[0], [
'stock.id',
'startTime',
]);
}
}

Expand All @@ -64,7 +120,7 @@ export class OpenapiMinuteData {
stockPeriod.open = parseInt(item.stck_oprc);
stockPeriod.high = parseInt(item.stck_hgpr);
stockPeriod.low = parseInt(item.stck_lwpr);
stockPeriod.volume = parseInt(item.cntg_vol);
stockPeriod.volume = parseInt(item.acml_vol);
stockPeriod.createdAt = new Date();
return stockPeriod;
}
Expand All @@ -74,69 +130,6 @@ export class OpenapiMinuteData {
return numberTime >= 90000 && numberTime <= 153000;
}

private async saveMinuteData(
stockId: string,
item: MinuteData[],
time: string,
) {
const manager = this.datasource.manager;
if (!this.isMarketOpenTime(time)) return;
const stockPeriod = item.map((val) =>
this.convertResToMinuteData(stockId, val, time),
);
manager.save(this.entity, stockPeriod);
}

private async getMinuteDataInterval(
stockId: string,
time: string,
config: typeof openApiConfig,
) {
const query = this.getUpdateStockQuery(stockId, time);
try {
const response = await getOpenApi(
this.url,
config,
query,
TR_IDS.MINUTE_DATA,
);
let output;
if (response.output2) output = response.output2;
if (output && output[0] && isMinuteData(output[0])) {
this.saveMinuteData(stockId, output, time);
}
} catch (error) {
this.logger.warn(error);
}
}

private async getMinuteDataChunk(
chunk: Stock[],
config: typeof openApiConfig,
) {
const time = getCurrentTime();
let interval = 0;
for await (const stock of chunk) {
setTimeout(
() => this.getMinuteDataInterval(stock.id!, time, config),
interval,
);
interval += this.intervals;
}
}

async getMinuteData() {
if (process.env.NODE_ENV !== 'production') return;
const configCount = (await this.openApiToken.configs()).length;
const stock = this.stock[this.flip % STOCK_CUT];
this.flip++;
const chunkSize = Math.ceil(stock.length / configCount);
for (let i = 0; i < configCount; i++) {
const chunk = stock.slice(i * chunkSize, (i + 1) * chunkSize);
this.getMinuteDataChunk(chunk, (await this.openApiToken.configs())[i]);
}
}

private getUpdateStockQuery(
stockId: string,
time: string,
Expand Down
5 changes: 3 additions & 2 deletions packages/backend/src/scraper/openapi/liveData.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ export class LiveData {
stockId,
'1',
);
this.logger.info(`${idx} : ${message}`);
this.websocketClient[idx].subscribe(message);
return;
}
Expand All @@ -99,7 +100,7 @@ export class LiveData {
stockId,
'2',
);

this.logger.info(`${idx} : ${message}`);
this.websocketClient[idx].unsubscribe(message);
}
}
Expand Down Expand Up @@ -130,7 +131,7 @@ export class LiveData {
return;
}
const liveData = this.openapiLiveData.convertLiveData(message);
await this.openapiLiveData.saveLiveData(liveData[0])
await this.openapiLiveData.saveLiveData(liveData[0]);
} catch (error) {
this.logger.warn(error);
}
Expand Down
60 changes: 49 additions & 11 deletions packages/backend/src/scraper/openapi/type/openapiMinuteData.type.ts
Original file line number Diff line number Diff line change
@@ -1,35 +1,73 @@
/* eslint-disable @typescript-eslint/no-explicit-any */

export type MinuteData = {
export type MinuteDataOutput1 = {
prdy_vrss: string;
prdy_vrss_sign: string;
prdy_ctrt: string;
stck_prdy_clpr: string;
acml_vol: string;
acml_tr_pbmn: string;
hts_kor_isnm: string;
stck_prpr: string;
};

export type MinuteDataOutput2 = {
stck_bsop_date: string;
stck_cntg_hour: string;
acml_tr_pbmn: string;
stck_prpr: string;
stck_oprc: string;
stck_hgpr: string;
stck_lwpr: string;
cntg_vol: string;
};

export type MinuteData = {
stck_bsop_date: string;
stck_cntg_hour: string;
acml_tr_pbmn: string;
acml_vol: string;
stck_prpr: string;
stck_oprc: string;
stck_hgpr: string;
stck_lwpr: string;
cntg_vol: string;
};

export type UpdateStockQuery = {
fid_etc_cls_code: string;
fid_cond_mrkt_div_code: 'J' | 'W';
fid_input_iscd: string;
fid_input_hour_1: string;
fid_pw_data_incu_yn: 'Y' | 'N';
export const isMinuteDataOutput1 = (data: any): data is MinuteDataOutput1 => {
return (
data !== null &&
typeof data === 'object' &&
typeof data.prdy_vrss === 'string' &&
typeof data.prdy_vrss_sign === 'string' &&
typeof data.prdy_ctrt === 'string' &&
typeof data.stck_prdy_clpr === 'string' &&
typeof data.acml_vol === 'string' &&
typeof data.acml_tr_pbmn === 'string' &&
typeof data.hts_kor_isnm === 'string' &&
typeof data.stck_prpr === 'string'
);
};

export const isMinuteData = (data: any) => {
export const isMinuteDataOutput2 = (data: any): data is MinuteDataOutput2 => {
return (
data &&
data !== null &&
typeof data === 'object' &&
typeof data.stck_bsop_date === 'string' &&
typeof data.stck_cntg_hour === 'string' &&
typeof data.acml_tr_pbmn === 'string' &&
typeof data.stck_prpr === 'string' &&
typeof data.stck_oprc === 'string' &&
typeof data.stck_hgpr === 'string' &&
typeof data.stck_lwpr === 'string' &&
typeof data.cntg_vol === 'string' &&
typeof data.acml_tr_pbmn === 'string'
typeof data.cntg_vol === 'string'
);
};

export type UpdateStockQuery = {
fid_etc_cls_code: string;
fid_cond_mrkt_div_code: 'J' | 'W';
fid_input_iscd: string;
fid_input_hour_1: string;
fid_pw_data_incu_yn: 'Y' | 'N';
};
Loading