Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[backend] Ensure log lines would be efficiently streamed to log visualization tools (#4199) #9553

Merged
merged 2 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions opencti-platform/opencti-graphql/src/boot.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { environment, getStoppingState, logApp, setStoppingState } from './confi
import platformInit, { checkFeatureFlags, checkSystemDependencies } from './initialization';
import cacheManager from './manager/cacheManager';
import { shutdownRedisClients } from './database/redis';
import { UnknownError } from './config/errors';
import { shutdownModules, startModules } from './managers';

// region platform start and stop
Expand Down Expand Up @@ -39,7 +38,6 @@ export const platformStart = async () => {
throw modulesError;
}
} catch (mainError) {
logApp.error(mainError);
process.exit(1);
}
};
Expand All @@ -58,7 +56,7 @@ export const platformStop = async () => {

// region signals management
process.on('unhandledRejection', (reason, p) => {
logApp.error(UnknownError('Engine unhandled rejection', { reason: reason?.stack, promise: p?.stack }));
logApp.error('[OPENCTI] Engine unhandled rejection', { reason: reason?.stack, promise: p?.stack });
});

['SIGTERM', 'SIGINT', 'message'].forEach((signal) => {
Expand All @@ -71,7 +69,7 @@ process.on('unhandledRejection', (reason, p) => {
await platformStop();
process.exit(0);
} catch (e) {
logApp.error(e);
logApp.error('[OPENCTI] Error stopping the platform', { cause: e });
process.exit(1);
}
}
Expand Down
36 changes: 33 additions & 3 deletions opencti-platform/opencti-graphql/src/config/conf.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,41 @@ nconf.file('default', resolveEnvFile('default'));
const appLogLevel = nconf.get('app:app_logs:logs_level');
const appLogFileTransport = booleanConf('app:app_logs:logs_files', true);
const appLogConsoleTransport = booleanConf('app:app_logs:logs_console', true);
export const appLogLevelMaxArraySize = nconf.get('app:app_logs:max_array_size') ?? 50;
export const appLogLevelMaxStringSize = nconf.get('app:app_logs:max_string_size') ?? 5000;
export const appLogExtendedErrors = booleanConf('app:app_logs:extended_error_message', false);
export const extendedErrors = (metaExtension) => {
if (appLogExtendedErrors) {
return metaExtension;
}
return {};
};
export const limitMetaErrorComplexity = (obj) => {
if (Array.isArray(obj)) {
// Create a new array with a limited size
const limitedArray = obj.slice(0, appLogLevelMaxArraySize);
// Recursively process each item in the truncated array
const processedArray = [];
for (let i = 0; i < limitedArray.length; i += 1) {
processedArray[i] = limitMetaErrorComplexity(limitedArray[i]);
}
return processedArray;
}
if (typeof obj === 'string' && obj.length > appLogLevelMaxStringSize) {
return `${obj.substring(0, appLogLevelMaxStringSize - 3)}...`;
}
if (obj !== null && typeof obj === 'object') {
// Create a new object to hold the processed properties
const limitedObject = {};
const keys = Object.keys(obj); // Get the keys of the object
for (let i = 0; i < keys.length; i += 1) {
const key = keys[i];
limitedObject[key] = limitMetaErrorComplexity(obj[key]);
}
return limitedObject;
}
return obj;
};

const appLogTransports = [];
const logsDirname = nconf.get('app:app_logs:logs_directory');
Expand Down Expand Up @@ -196,7 +224,6 @@ const buildMetaErrors = (error) => {
if (error instanceof GraphQLError) {
const extensions = error.extensions ?? {};
const extensionsData = extensions.data ?? {};
// const attributes = R.dissoc('cause', extensionsData);
const { cause: _, ...attributes } = extensionsData;
const baseError = { name: extensions.code ?? error.name, message: error.message, stack: error.stack, attributes };
errors.push(baseError);
Expand All @@ -223,8 +250,11 @@ export const logS3Debug = {

export const logApp = {
_log: (level, message, error, meta = {}) => {
if (appLogTransports.length > 0) {
appLogger.log(level, message, addBasicMetaInformation(LOG_APP, error, { ...meta, source: 'backend' }));
if (appLogTransports.length > 0 && appLogger.isLevelEnabled(level)) {
const data = addBasicMetaInformation(LOG_APP, error, { ...meta, source: 'backend' });
// Prevent meta information to be too massive.
const limitedData = limitMetaErrorComplexity(data);
appLogger.log(level, message, limitedData);
}
},
_logWithError: (level, messageOrError, meta = {}) => {
Expand Down
4 changes: 2 additions & 2 deletions opencti-platform/opencti-graphql/src/config/providers.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { OAuth2Strategy as GoogleStrategy } from 'passport-google-oauth';
import validator from 'validator';
import { findById, HEADERS_AUTHENTICATORS, initAdmin, login, loginFromProvider, userDelete } from '../domain/user';
import conf, { getPlatformHttpProxyAgent, logApp } from './conf';
import { AuthenticationFailure, ConfigurationError, UnsupportedError } from './errors';
import { AuthenticationFailure, ConfigurationError } from './errors';
import { isEmptyField, isNotEmptyField } from '../database/utils';
import { DEFAULT_INVALID_CONF_VALUE, SYSTEM_USER } from '../utils/access';
import { enrichWithRemoteCredentials } from './credentials';
Expand Down Expand Up @@ -402,7 +402,7 @@ for (let i = 0; i < providerKeys.length; i += 1) {
passport.use(providerRef, openIDStrategy);
providers.push({ name: providerName, type: AUTH_SSO, strategy, provider: providerRef });
}).catch((err) => {
logApp.error(UnsupportedError('[OPENID] Error initializing authentication provider', { cause: err, provider: providerRef }));
logApp.error('[OPENID] Error initializing authentication provider', { cause: err, provider: providerRef });
});
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { OPENCTI_SYSTEM_UUID } from '../../schema/general';
import { resolveUserByIdFromCache } from '../../domain/user';
import { parseCsvMapper, sanitized, validateCsvMapper } from '../../modules/internal/csvMapper/csvMapper-utils';
import { IMPORT_CSV_CONNECTOR } from './importCsv';
import { DatabaseError, FunctionalError } from '../../config/errors';
import { FunctionalError } from '../../config/errors';
import { uploadToStorage } from '../../database/file-storage-helper';
import { storeLoadByIdWithRefs } from '../../database/middleware';
import type { ConnectorConfig } from '../internalConnector';
Expand Down Expand Up @@ -46,7 +46,7 @@ const processCSVforWorkbench = async (context: AuthContext, fileId: string, opts
hasError = true;
const errorData = { error: error.message, source: fileId };
await reportExpectation(context, applicantUser, workId, errorData);
logApp.error(error);
logApp.error(`${LOG_PREFIX} Error streaming the CSV data`, { error });
}).on('end', async () => {
if (!hasError) {
// it's fine to use deprecated bundleProcess since this whole method is also deprecated for drafts.
Expand Down Expand Up @@ -132,15 +132,14 @@ export const processCSVforWorkers = async (context: AuthContext, fileId: string,
totalBundlesCount += bundleCount;
} catch (error: any) {
const errorData = { error: error.message, source: `${fileId}, from ${lineNumber} and ${BULK_LINE_PARSING_NUMBER} following lines.` };
logApp.error(error, { errorData });
logApp.error(`${LOG_PREFIX} CSV line parsing error`, { error: errorData });
await reportExpectation(context, applicantUser, workId, errorData);
}
}
} catch (error: any) {
logApp.error(error);
logApp.error(`${LOG_PREFIX} CSV global parsing error`, { error });
const errorData = { error: error.message, source: fileId };
await reportExpectation(context, applicantUser, workId, errorData);

// circuit breaker
hasMoreBulk = false;
} finally {
Expand Down Expand Up @@ -197,7 +196,6 @@ const consumeQueueCallback = async (context: AuthContext, message: string) => {
entity,
connectorId: connector.internal_id
};

await updateReceivedTime(context, applicantUser, workId, 'Connector ready to process the operation');
const validateBeforeImport = connectorConfig.config.validate_before_import;
if (validateBeforeImport) {
Expand All @@ -206,8 +204,8 @@ const consumeQueueCallback = async (context: AuthContext, message: string) => {
await processCSVforWorkers(context, fileId, opts);
}
} catch (error: any) {
logApp.error(`${LOG_PREFIX} CSV global parsing error`, { error, source: fileId });
const errorData = { error: error.stack, source: fileId };
logApp.error(error, { context, errorData });
await reportExpectation(context, applicantUser, workId, errorData);
}
};
Expand All @@ -227,7 +225,7 @@ export const initImportCsvConnector = () => {
try {
rabbitMqConnection.close();
} catch (e) {
logApp.error(DatabaseError(`${LOG_PREFIX} Closing RabbitMQ connection failed`, { cause: e }));
logApp.error(`${LOG_PREFIX} Closing RabbitMQ connection failed`, { cause: e });
}
}
// TODO REMOVE TYPING, don't know why it's not working
Expand Down
11 changes: 6 additions & 5 deletions opencti-platform/opencti-graphql/src/database/engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -1079,7 +1079,7 @@ export const elConfigureAttachmentProcessor = async () => {
}
]
}).catch((e) => {
logApp.error(ConfigurationError('Engine attachment processor configuration fail', { cause: e }));
logApp.error('Engine attachment processor configuration fail', { cause: e });
success = false;
});
} else {
Expand All @@ -1101,7 +1101,7 @@ export const elConfigureAttachmentProcessor = async () => {
]
}
}).catch((e) => {
logApp.error(ConfigurationError('Engine attachment processor configuration fail', { cause: e }));
logApp.error('Engine attachment processor configuration fail', { cause: e });
success = false;
});
}
Expand Down Expand Up @@ -1154,7 +1154,7 @@ export const elDeleteIndices = async (indexesToDelete) => {
.catch((err) => {
/* v8 ignore next */
if (err.meta.body && err.meta.body.error.type !== 'index_not_found_exception') {
logApp.error(DatabaseError('Indices deletion fail', { cause: err }));
logApp.error('Indices deletion fail', { cause: err });
}
});
})
Expand Down Expand Up @@ -3737,7 +3737,9 @@ export const elDeleteElements = async (context, user, elements, opts = {}) => {
const { relations, relationsToRemoveMap } = await getRelationsToRemove(context, SYSTEM_USER, elements);
// User must have access to all relations to remove to be able to delete
const filteredRelations = await userFilterStoreElements(context, user, relations);
if (relations.length !== filteredRelations.length) throw FunctionalError('Cannot delete element: cannot access all related relations');
if (relations.length !== filteredRelations.length) {
throw FunctionalError('Cannot delete element: cannot access all related relations');
}
relations.forEach((instance) => controlUserConfidenceAgainstElement(user, instance));
relations.forEach((instance) => controlUserRestrictDeleteAgainstElement(user, instance));
// Compute the id that needs to be removed from rel
Expand Down Expand Up @@ -3765,7 +3767,6 @@ export const elDeleteElements = async (context, user, elements, opts = {}) => {
const ids = idsByIndex.get(sourceIndex);
reindexPromises.push(elReindexElements(context, user, ids, sourceIndex, INDEX_DELETED_OBJECTS));
});

await Promise.all(reindexPromises);
await createDeleteOperationElement(context, user, elements[0], entitiesToDelete);
}
Expand Down
6 changes: 3 additions & 3 deletions opencti-platform/opencti-graphql/src/database/file-storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { CopyObjectCommand } from '@aws-sdk/client-s3';
import nconf from 'nconf';
import conf, { booleanConf, ENABLED_FILE_INDEX_MANAGER, isFeatureEnabled, logApp, logS3Debug } from '../config/conf';
import { now, sinceNowInMinutes, truncate, utcDate } from '../utils/format';
import { DatabaseError, FunctionalError, UnsupportedError } from '../config/errors';
import { FunctionalError, UnsupportedError } from '../config/errors';
import { createWork, deleteWorkForFile } from '../domain/work';
import { isNotEmptyField, READ_DATA_INDICES, READ_INDEX_DELETED_OBJECTS } from './utils';
import { connectorsForImport } from './repository';
Expand Down Expand Up @@ -141,7 +141,7 @@ export const deleteFile = async (context, user, id) => {
logApp.debug(`[FILE STORAGE] delete file ${id} in index`);
await elDeleteFilesByIds([id])
.catch((err) => {
logApp.error(err);
logApp.error('[FILE STORAGE] Error deleting file', { cause: err });
});
}
return up;
Expand Down Expand Up @@ -403,7 +403,7 @@ export const loadedFilesListing = async (context, user, directory, opts = {}) =>
requestParams.ContinuationToken = response.NextContinuationToken;
}
} catch (err) {
logApp.error(DatabaseError('[FILE STORAGE] Storage files read fail', { cause: err }));
logApp.error('[FILE STORAGE] Storage files read fail', { cause: err });
truncated = false;
}
}
Expand Down
11 changes: 5 additions & 6 deletions opencti-platform/opencti-graphql/src/database/middleware.js
Original file line number Diff line number Diff line change
Expand Up @@ -2240,7 +2240,7 @@ export const updateAttributeFromLoadedWithRefs = async (context, user, initial,
export const updateAttribute = async (context, user, id, type, inputs, opts = {}) => {
const initial = await storeLoadByIdWithRefs(context, user, id, { ...opts, type });
if (!initial) {
throw FunctionalError(`Cant find element to update ${id} (${type})`, { id, type });
throw FunctionalError('Cant find element to update', { id, type });
}
// Validate input attributes
const entitySetting = await getEntitySettingFromCache(context, initial.entity_type);
Expand Down Expand Up @@ -3168,9 +3168,9 @@ export const internalDeleteElementById = async (context, user, id, opts = {}) =>
if (!element) {
throw AlreadyDeletedError({ id });
}

if (element._index.includes(INDEX_DRAFT_OBJECTS)) return draftInternalDeleteElement(context, user, element);

if (element._index.includes(INDEX_DRAFT_OBJECTS)) {
return draftInternalDeleteElement(context, user, element);
}
// region confidence control
controlUserConfidenceAgainstElement(user, element);
// region restrict delete control
Expand Down Expand Up @@ -3248,7 +3248,6 @@ export const internalDeleteElementById = async (context, user, id, opts = {}) =>
// if trash is disabled globally or for this element, delete permanently
await deleteAllObjectFiles(context, user, element);
}

// Delete all linked elements
await elDeleteElements(context, user, [element], { forceDelete });
// Publish event in the stream
Expand Down Expand Up @@ -3325,7 +3324,7 @@ export const deleteInferredRuleElement = async (rule, instance, deletedDependenc
if (err.name === ALREADY_DELETED_ERROR) {
logApp.info(err);
} else {
logApp.error(err);
logApp.error('Error handling inference', { cause: err });
}
}
return false;
Expand Down
4 changes: 2 additions & 2 deletions opencti-platform/opencti-graphql/src/database/migration.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ const migrationStorage = {
logApp.info(`[MIGRATION] Saving current configuration, ${mig.title}`);
return fn();
} catch (err) {
logApp.error(err);
logApp.error('Error handling migration', { cause: err });
return fn();
}
},
Expand Down Expand Up @@ -124,7 +124,7 @@ export const applyMigration = (context) => {
// Start the set migration
set.up((migrationError) => {
if (migrationError) {
logApp.error(migrationError);
logApp.error('Migration up error', { cause: migrationError });
reject(migrationError);
return;
}
Expand Down
6 changes: 3 additions & 3 deletions opencti-platform/opencti-graphql/src/database/rabbitmq.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ export const getConnectorQueueDetails = async (connectorId) => {
messages_size: queueDetailResponse.message_bytes || 0
};
} catch (e) {
logApp.error(e, { connectorId });
logApp.error('Get connector queue details fail', { cause: e, connectorId });
return {
messages_number: 0,
messages_size: 0
Expand Down Expand Up @@ -357,10 +357,10 @@ export const consumeQueue = async (context, connectorId, connectionSetterCallbac
}
}, { noAck: true }, (consumeError) => {
if (consumeError) {
logApp.error(DatabaseError('[QUEUEING] Consumption fail', {
logApp.error('[QUEUEING] Consumption fail', {
connectorId,
cause: consumeError
}));
});
}
});
}
Expand Down
4 changes: 2 additions & 2 deletions opencti-platform/opencti-graphql/src/database/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ export const createRedisClient = async (provider: string, autoReconnect = false)

client.on('close', () => logApp.info(`[REDIS] Redis '${provider}' client closed`));
client.on('ready', () => logApp.info(`[REDIS] Redis '${provider}' client ready`));
client.on('error', (err) => logApp.error(DatabaseError('Redis client connection fail', { cause: err, provider })));
client.on('error', (err) => logApp.error('Redis client connection fail', { cause: err, provider }));
client.on('reconnecting', () => logApp.info(`[REDIS] '${provider}' Redis client reconnecting`));
return client;
};
Expand Down Expand Up @@ -705,7 +705,7 @@ export const createStreamProcessor = <T extends BaseEvent> (
await processStreamResult([], callback, opts.withInternal);
}
} catch (err) {
logApp.error(DatabaseError('Redis stream consume fail', { cause: err, provider }));
logApp.error('Redis stream consume fail', { cause: err, provider });
if (opts.autoReconnect) {
await waitInSec(2);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export const checkRetentionRule = async (context, input) => {
} else if (scope === 'workbench') {
result = await paginatedForPathWithEnrichment(context, RETENTION_MANAGER_USER, 'import/pending', undefined, { notModifiedSince: before.toISOString() });
} else {
logApp.error(`[Retention manager] Scope ${scope} not existing for Retention Rule.`);
logApp.error('[Retention manager] Scope not existing for Retention Rule.', { scope });
}
if (scope === 'file' || scope === 'workbench') { // don't delete progress files or files with works in progress
result.edges = result.edges.filter((e) => DELETABLE_FILE_STATUSES.includes(e.node.uploadStatus)
Expand Down
2 changes: 1 addition & 1 deletion opencti-platform/opencti-graphql/src/domain/user.js
Original file line number Diff line number Diff line change
Expand Up @@ -1579,7 +1579,7 @@ export const authenticateUserFromRequest = async (context, req, res, isSessionRe
return await authenticateUser(context, req, user, loginProvider, opts);
}
} catch (err) {
logApp.error(err);
logApp.error('Error resolving user by token', { cause: err });
}
}
// If user still not identified, try headers authentication
Expand Down
2 changes: 0 additions & 2 deletions opencti-platform/opencti-graphql/src/domain/work.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import { addFilter } from '../utils/filtering/filtering-utils';
import { IMPORT_CSV_CONNECTOR, IMPORT_CSV_CONNECTOR_ID } from '../connector/importCsv/importCsv';
import { RELATION_OBJECT_MARKING } from '../schema/stixRefRelationship';
import { DRAFT_VALIDATION_CONNECTOR, DRAFT_VALIDATION_CONNECTOR_ID } from '../modules/draftWorkspace/draftWorkspace-connector';
import { logApp } from '../config/conf';

export const workToExportFile = (work) => {
const lastModifiedSinceMin = sinceNowInMinutes(work.updated_at);
Expand Down Expand Up @@ -221,7 +220,6 @@ export const reportExpectation = async (context, user, workId, errorData) => {
sourceScript += 'if (ctx._source.errors.length < 100) { ctx._source.errors.add(["timestamp": params.now, "message": params.error, "source": params.source]); }';
params.source = source;
params.error = error;
logApp.error(error, { errorData });
}
// Update elastic
const currentWork = await loadWorkById(context, user, workId);
Expand Down
Loading
Loading