Skip to content

Commit

Permalink
livedata 수집 추가 + openapi로 장시간,마감 변경 로직, token을 주입할 수 있게 변경 (#246)
Browse files Browse the repository at this point in the history
* ✨ feat: token entity 추가

* ✨ feat: entity 에 저장, expire 검사 로직 추가

* 🐛 fix: token 주입으로 로직 변경

* ♻️ refactor: token 주입으로 변경, 그로 인한 오류 수정 및 console.log 삭제

* 🐛 fix: live 데이터 수집 오류 해결, 데이터 없을 때 insert 오류 해결

* 🐛 fix: stock, livedata entity 수정

* ♻️ refactor: develop 환경시 logging 활성화

* 📦️ ci: production 환경일 때 작동되게 변경

* ♻️ refactor: websocket 모듈에서 liveData로 서비스 로직 분리

* ♻️ refactor: livedata stock module로 이동

* ✨ feat: 장 마감시 openapi로 부르는 로직 추가

* 🐛 fix: websocket logger 추가, client stock 저장되지 않는 오류 해결

* 🐛 fix: openapi는 저장이 필요 없음 롤 백

* 🐛 fix: open api로 데이터 받지 못하는 문제 해결

* 💄 style: 안 쓰이는 것 빼기

* 💄 style: console.log 삭제
  • Loading branch information
swkim12345 authored Nov 26, 2024
1 parent 46a9b28 commit eafa2b3
Show file tree
Hide file tree
Showing 15 changed files with 425 additions and 182 deletions.
2 changes: 1 addition & 1 deletion packages/backend/src/configs/typeormConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ export const typeormDevelopConfig: TypeOrmModuleOptions = {
password: process.env.DB_PASS,
database: process.env.DB_NAME,
entities: [__dirname + '/../**/*.entity.{js,ts}'],
logging: true,
//logging: true,
synchronize: true,
};
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ export class OpenapiDetailData {
if (process.env.NODE_ENV !== 'production') return;
const entityManager = this.datasource.manager;
const stocks = await entityManager.find(Stock);
const configCount = this.openApiToken.configs.length;
const configCount = (await this.openApiToken.configs()).length;
const chunkSize = Math.ceil(stocks.length / configCount);

for (let i = 0; i < configCount; i++) {
this.logger.info(this.openApiToken.configs[i]);
this.logger.info((await this.openApiToken.configs())[i]);
const chunk = stocks.slice(i * chunkSize, (i + 1) * chunkSize);
this.getDetailDataChunk(chunk, this.openApiToken.configs[i]);
this.getDetailDataChunk(chunk, (await this.openApiToken.configs())[i]);
}
}

Expand Down
52 changes: 50 additions & 2 deletions packages/backend/src/scraper/openapi/api/openapiLiveData.api.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import { Inject, Injectable } from '@nestjs/common';
import { DataSource } from 'typeorm';
import { Logger } from 'winston';
import { openApiConfig } from '../config/openapi.config';
import { isOpenapiLiveData } from '../type/openapiLiveData.type';
import { TR_IDS } from '../type/openapiUtil.type';
import { getOpenApi } from '../util/openapiUtil.api';
import { Stock } from '@/stock/domain/stock.entity';
import { StockLiveData } from '@/stock/domain/stockLiveData.entity';

@Injectable()
export class OpenapiLiveData {
public readonly TR_ID: string = 'H0STCNT0';
private readonly url: string =
'/uapi/domestic-stock/v1/quotations/inquire-ccnl';
constructor(
private readonly datasource: DataSource,
@Inject('winston') private readonly logger: Logger,
Expand Down Expand Up @@ -37,6 +42,26 @@ export class OpenapiLiveData {
}
}

// 현재가 체결로는 데이터가 부족해 현재가 시세를 사용함.
convertResponseToStockLiveData = (
data: OpenapiLiveData,
stockId: string,
): StockLiveData | undefined => {
const stockLiveData = new StockLiveData();
if (isOpenapiLiveData(data)) {
stockLiveData.stock = { id: stockId } as Stock;
stockLiveData.currentPrice = parseFloat(data.stck_prpr);
stockLiveData.changeRate = parseFloat(data.prdy_ctrt);
stockLiveData.volume = parseInt(data.acml_vol);
stockLiveData.high = parseFloat(data.stck_hgpr);
stockLiveData.low = parseFloat(data.stck_lwpr);
stockLiveData.open = parseFloat(data.stck_oprc);
stockLiveData.updatedAt = new Date();

return stockLiveData;
}
};

convertLiveData(messages: Record<string, string>[]): StockLiveData[] {
const stockData: StockLiveData[] = [];
messages.map((message) => {
Expand All @@ -48,10 +73,33 @@ export class OpenapiLiveData {
stockLiveData.high = parseFloat(message.STCK_HGPR);
stockLiveData.low = parseFloat(message.STCK_LWPR);
stockLiveData.open = parseFloat(message.STCK_OPRC);
stockLiveData.previousClose = parseFloat(message.WGHN_AVRG_STCK_PRC);
stockLiveData.updatedAt = new Date();

stockData.push(stockLiveData);
});
return stockData;
}

async connectLiveData(stockId: string, config: typeof openApiConfig) {
const query = this.makeLiveDataQuery(stockId);

try {
const result = await getOpenApi(
this.url,
config,
query,
TR_IDS.LIVE_DATA,
);
return result;
} catch (error) {
this.logger.warn(`Connect live data error : ${error}`);
}
}

private makeLiveDataQuery(stockId: string, code: 'J' = 'J') {
return {
fid_cond_mrkt_div_code: code,
fid_input_iscd: stockId,
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,15 @@ export class OpenapiMinuteData {
}

//@Cron(`*/${STOCK_CUT} 9-15 * * 1-5`)
getMinuteData() {
async getMinuteData() {
if (process.env.NODE_ENV !== 'production') return;
const configCount = this.openApiToken.configs.length;
const configCount = (await this.openApiToken.configs()).length;
const stock = this.stock[this.flip % STOCK_CUT];
this.flip++;
const chunkSize = Math.ceil(stock.length / configCount);
for (let i = 0; i < configCount; i++) {
const chunk = stock.slice(i * chunkSize, (i + 1) * chunkSize);
this.getMinuteDataChunk(chunk, this.openApiToken.configs[i]);
this.getMinuteDataChunk(chunk, (await this.openApiToken.configs())[i]);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ export class OpenapiPeriodData {
let isFail = false;

while (!isFail) {
configIdx = (configIdx + 1) % this.openApiToken.configs.length;
configIdx = (configIdx + 1) % (await this.openApiToken.configs()).length;
this.setStockPeriod(stockPeriod, stock.id!, end);

// chart 데이터가 있는 지 확인 -> 리턴
Expand Down Expand Up @@ -130,7 +130,7 @@ export class OpenapiPeriodData {
try {
const response = await getOpenApi(
this.url,
this.openApiToken.configs[configIdx],
(await this.openApiToken.configs())[configIdx],
query,
TR_IDS.ITEM_CHART_PRICE,
);
Expand Down
22 changes: 11 additions & 11 deletions packages/backend/src/scraper/openapi/api/openapiToken.api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ export class OpenapiTokenApi {
@Inject('winston') private readonly logger: Logger,
private readonly datasource: DataSource,
) {
if (process.env.NODE_ENV !== 'production') return;
const accounts = openApiConfig.STOCK_ACCOUNT!.split(',');
const api_keys = openApiConfig.STOCK_API_KEY!.split(',');
const api_passwords = openApiConfig.STOCK_API_PASSWORD!.split(',');
Expand All @@ -34,17 +33,16 @@ export class OpenapiTokenApi {
STOCK_API_PASSWORD: api_passwords[i],
});
}
this.init();
}

get configs() {
this.init();
async configs() {
await this.init();
return this.config;
}

@Cron('30 0 * * 1-5')
async init() {
const tokens = await this.convertConfigToTokenEntity(this.config);
const tokens = this.convertConfigToTokenEntity(this.config);
const config = await this.getPropertyFromDB(tokens);
const expired = config.filter(
(val) =>
Expand All @@ -54,24 +52,24 @@ export class OpenapiTokenApi {

if (expired.length || !config.length) {
await this.initAuthenValue();
const newTokens = await this.convertConfigToTokenEntity(this.config);
this.savePropertyToDB(newTokens);
const newTokens = this.convertConfigToTokenEntity(this.config);
await this.savePropertyToDB(newTokens);
} else {
this.config = await this.convertTokenEntityToConfig(config);
this.config = this.convertTokenEntityToConfig(config);
}
}

private isTokenExpired(startDate?: Date) {
if (!startDate) return true;
const now = new Date();
//실제 만료 시간은 24시간이지만, 문제의 소지가 발생하는 것을 방지하기 위해 20시간으로 설정함.
//실제 만료 시간은 24시간이지만, 문제가 발생할 여지를 줄이기 위해 20시간으로 설정
const baseTimeToMilliSec = 20 * 60 * 60 * 1000;
const timeDiff = now.getTime() - startDate.getTime();

return timeDiff >= baseTimeToMilliSec;
}

private async convertTokenEntityToConfig(tokens: OpenapiToken[]) {
private convertTokenEntityToConfig(tokens: OpenapiToken[]) {
const result: (typeof openApiConfig)[] = [];
tokens.forEach((val) => {
const config: typeof openApiConfig = {
Expand All @@ -87,7 +85,7 @@ export class OpenapiTokenApi {
return result;
}

private async convertConfigToTokenEntity(config: (typeof openApiConfig)[]) {
private convertConfigToTokenEntity(config: (typeof openApiConfig)[]) {
const result: OpenapiToken[] = [];
config.forEach((val) => {
const token = new OpenapiToken();
Expand Down Expand Up @@ -171,6 +169,7 @@ export class OpenapiTokenApi {
}),
);
this.config = updatedConfig;
this.logger.info(`Init access token : ${this.config}`);
}

private async initWebSocketKey() {
Expand All @@ -181,6 +180,7 @@ export class OpenapiTokenApi {
}),
);
this.config = updatedConfig;
this.logger.info(`Init websocket token : ${this.config}`);
}

private async getToken(config: typeof openApiConfig): Promise<string> {
Expand Down
172 changes: 172 additions & 0 deletions packages/backend/src/scraper/openapi/liveData.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
import { Inject, Injectable } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { Logger } from 'winston';
import { RawData, WebSocket } from 'ws';
import { OpenapiLiveData } from './api/openapiLiveData.api';
import { OpenapiTokenApi } from './api/openapiToken.api';
import { openApiConfig } from './config/openapi.config';
import { parseMessage } from './parse/openapi.parser';
import { WebsocketClient } from './websocket/websocketClient.websocket';

type TR_IDS = '0' | '1';

@Injectable()
export class LiveData {
private readonly clientStock: Set<string> = new Set();
private readonly reconnectInterval = 60 * 1000 * 1000;

private readonly startTime: Date = new Date(2024, 0, 1, 9, 0, 0, 0);
private readonly endTime: Date = new Date(2024, 0, 1, 15, 30, 0, 0);
constructor(
private readonly openApiToken: OpenapiTokenApi,
private readonly webSocketClient: WebsocketClient,
private readonly openapiLiveData: OpenapiLiveData,
@Inject('winston') private readonly logger: Logger,
) {
this.connect();
this.subscribe('000020');
}

private async openapiSubscribe(stockId: string) {
const config = (await this.openApiToken.configs())[0];
const result = await this.openapiLiveData.connectLiveData(stockId, config);
this.logger.info(JSON.stringify(result));
try {
const stockLiveData = this.openapiLiveData.convertResponseToStockLiveData(
result.output,
stockId,
);
if (stockLiveData) {
this.openapiLiveData.saveLiveData([stockLiveData]);
}
} catch (error) {
this.logger.warn(`Subscribe error in open api : ${error}`);
}
}

async subscribe(stockId: string) {
if (this.isCloseTime(new Date(), this.startTime, this.endTime)) {
await this.openapiSubscribe(stockId);
} else {
// TODO : 하나의 config만 사용중.
this.clientStock.add(stockId);
const message = this.convertObjectToMessage(
(await this.openApiToken.configs())[0],
stockId,
'1',
);
this.webSocketClient.subscribe(message);
}
}

async discribe(stockId: string) {
if (this.clientStock.has(stockId)) {
this.clientStock.delete(stockId);
const message = this.convertObjectToMessage(
(await this.openApiToken.configs())[0],
stockId,
'0',
);
this.webSocketClient.discribe(message);
}
}

private initOpenCallback =
(sendMessage: (message: string) => void) => async () => {
this.logger.info('WebSocket connection established');
for (const stockId of this.clientStock.keys()) {
const message = this.convertObjectToMessage(
(await this.openApiToken.configs())[0],
stockId,
'1',
);
sendMessage(message);
}
};

private initMessageCallback =
(client: WebSocket) => async (data: RawData) => {
try {
const message = this.parseMessage(data);
if (message.header) {
if (message.header.tr_id === 'PINGPONG') {
this.logger.info(`Received PING: ${data}`);
client.pong(data);
}
return;
}
this.logger.info(`Recived data : ${data}`);
this.logger.info(`Stock id : ${message[0]['STOCK_ID']}`);
const liveData = this.openapiLiveData.convertLiveData(message);
await this.openapiLiveData.saveLiveData(liveData);
} catch (error) {
this.logger.warn(error);
}
};

private initCloseCallback = () => {
this.logger.warn(
`WebSocket connection closed. Reconnecting in ${this.reconnectInterval / 60 / 1000} minute...`,
);
};

private initErrorCallback = (error: unknown) => {
if (error instanceof Error) {
this.logger.error(`WebSocket error: ${error.message}`);
} else {
this.logger.error('WebSocket error: callback function');
}
setTimeout(() => this.connect(), this.reconnectInterval);
};

private isCloseTime(date: Date, start: Date, end: Date): boolean {
const dateMinutes = date.getHours() * 60 + date.getMinutes();
const startMinutes = start.getHours() * 60 + start.getMinutes();
const endMinutes = end.getHours() * 60 + end.getMinutes();

return dateMinutes <= startMinutes || dateMinutes >= endMinutes;
}

@Cron('0 2 * * 1-5')
connect() {
this.webSocketClient.connectPacade(
this.initOpenCallback,
this.initMessageCallback,
this.initCloseCallback,
this.initErrorCallback,
);
}

private convertObjectToMessage(
config: typeof openApiConfig,
stockId: string,
tr_type: TR_IDS,
): string {
this.logger.info(JSON.stringify(config));
const message = {
header: {
approval_key: config.STOCK_WEBSOCKET_KEY!,
custtype: 'P',
tr_type,
'content-type': 'utf-8',
},
body: {
input: {
tr_id: 'H0STCNT0',
tr_key: stockId,
},
},
};
return JSON.stringify(message);
}

private parseMessage(data: RawData) {
if (typeof data === 'object' && !(data instanceof Buffer)) {
return data;
} else if (typeof data === 'object') {
return parseMessage(data.toString());
} else {
return parseMessage(data as string);
}
}
}
Loading

0 comments on commit eafa2b3

Please sign in to comment.