Skip to content

Commit

Permalink
fix: use concurrency limit function
Browse files Browse the repository at this point in the history
  • Loading branch information
mcarvin8 committed Jan 21, 2025
1 parent 1cb7cae commit a8e53b8
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 125 deletions.
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export { parseXML } from "./service/parseXML";
export { buildXMLString } from "./service/buildXMLString";
export { XmlElement } from "./helpers/types";
export { getConcurrencyThreshold } from "./service/getConcurrencyThreshold";
export { withConcurrencyLimit } from "./service/withConcurrencyLimit";

// Function to update the log level
export function setLogLevel(level: string) {
Expand Down
90 changes: 40 additions & 50 deletions src/service/buildDisassembledFiles.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { buildRootElementHeader } from "@src/service/buildRootElementHeader";
import { buildLeafFile } from "@src/service/buildLeafFile";
import { parseXML } from "@src/service/parseXML";
import { getConcurrencyThreshold } from "./getConcurrencyThreshold";
import { withConcurrencyLimit } from "./withConcurrencyLimit";

export async function buildDisassembledFiles(
filePath: string,
Expand Down Expand Up @@ -37,19 +38,38 @@ export async function buildDisassembledFiles(
);

const concurrencyLimit = getConcurrencyThreshold();
const activePromises: Promise<void>[] = [];
let currentIndex = 0;

const processChildKey = async (key: string) => {
if (Array.isArray(rootElement[key])) {
await Promise.all(
(rootElement[key] as XmlElement[]).map(async (element) => {
const [
updatedLeafContent,
updatedLeafCount,
updatedHasNestedElements,
] = await processElement({
element,
// Create tasks for processing child keys
const tasks: (() => Promise<void>)[] = childKeys.map((key) => {
return async () => {
if (Array.isArray(rootElement[key])) {
await Promise.all(
(rootElement[key] as XmlElement[]).map(async (element) => {
const [
updatedLeafContent,
updatedLeafCount,
updatedHasNestedElements,
] = await processElement({
element,
disassembledPath,
uniqueIdElements,
rootElementName,
rootElementHeader,
key,
indent,
leafContent: "",
leafCount: 0,
hasNestedElements: false,
});
leafContent += updatedLeafContent;
leafCount += updatedLeafCount;
hasNestedElements = hasNestedElements || updatedHasNestedElements;
}),
);
} else {
const [updatedLeafContent, updatedLeafCount, updatedHasNestedElements] =
await processElement({
element: rootElement[key] as XmlElement,
disassembledPath,
uniqueIdElements,
rootElementName,
Expand All @@ -60,45 +80,15 @@ export async function buildDisassembledFiles(
leafCount: 0,
hasNestedElements: false,
});
leafContent += updatedLeafContent;
leafCount += updatedLeafCount;
hasNestedElements = hasNestedElements || updatedHasNestedElements;
}),
);
} else {
const [updatedLeafContent, updatedLeafCount, updatedHasNestedElements] =
await processElement({
element: rootElement[key] as XmlElement,
disassembledPath,
uniqueIdElements,
rootElementName,
rootElementHeader,
key,
indent,
leafContent: "",
leafCount: 0,
hasNestedElements: false,
});
leafContent += updatedLeafContent;
leafCount += updatedLeafCount;
hasNestedElements = hasNestedElements || updatedHasNestedElements;
}
};
leafContent += updatedLeafContent;
leafCount += updatedLeafCount;
hasNestedElements = hasNestedElements || updatedHasNestedElements;
}
};
});

while (currentIndex < childKeys.length || activePromises.length > 0) {
if (
currentIndex < childKeys.length &&
activePromises.length < concurrencyLimit
) {
const key = childKeys[currentIndex++];
const promise = processChildKey(key).finally(() => {
activePromises.splice(activePromises.indexOf(promise), 1);
});
activePromises.push(promise);
} else {
await Promise.race(activePromises);
}
}
// Execute tasks with concurrency limit
await withConcurrencyLimit(tasks, concurrencyLimit);

if (!hasNestedElements) {
logger.error(
Expand Down
59 changes: 24 additions & 35 deletions src/service/disassembleXMLFileHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { logger } from "@src/index";
import { INDENT } from "@src/helpers/constants";
import { buildDisassembledFiles } from "@src/service/buildDisassembledFiles";
import { getConcurrencyThreshold } from "./getConcurrencyThreshold";
import { withConcurrencyLimit } from "./withConcurrencyLimit";

export class DisassembleXMLFileHandler {
private readonly ign: Ignore = ignore();
Expand All @@ -29,6 +30,7 @@ export class DisassembleXMLFileHandler {
ignorePath = ".xmldisassemblerignore",
} = xmlAttributes;
const resolvedIgnorePath = resolve(ignorePath);

if (existsSync(resolvedIgnorePath)) {
const content = await readFile(resolvedIgnorePath);
this.ign.add(content.toString());
Expand Down Expand Up @@ -59,48 +61,35 @@ export class DisassembleXMLFileHandler {
});
} else if (fileStat.isDirectory()) {
const subFiles = await readdir(filePath);

const concurrencyLimit = getConcurrencyThreshold();
const activePromises: Promise<void>[] = [];
let currentIndex = 0;

// Function to process a single file
const processSubFile = async (subFile: string) => {
// Create tasks for all subfiles
const tasks: (() => Promise<void>)[] = subFiles.map((subFile) => {
const subFilePath = join(filePath, subFile);
const relativeSubFilePath = this.posixPath(
relative(process.cwd(), subFilePath),
);

if (
subFilePath.endsWith(".xml") &&
!this.ign.ignores(relativeSubFilePath)
) {
await this.processFile({
dirPath: filePath,
filePath: subFilePath,
uniqueIdElements,
prePurge,
postPurge,
});
} else if (this.ign.ignores(relativeSubFilePath)) {
logger.warn(`File ignored by ${ignorePath}: ${subFilePath}`);
}
};

while (currentIndex < subFiles.length || activePromises.length > 0) {
if (
currentIndex < subFiles.length &&
activePromises.length < concurrencyLimit
) {
const subFile = subFiles[currentIndex++];
const promise = processSubFile(subFile).finally(() => {
activePromises.splice(activePromises.indexOf(promise), 1);
});
activePromises.push(promise);
} else {
await Promise.race(activePromises); // Wait for any promise to resolve
}
}
return async () => {
if (
subFilePath.endsWith(".xml") &&
!this.ign.ignores(relativeSubFilePath)
) {
await this.processFile({
dirPath: filePath,
filePath: subFilePath,
uniqueIdElements,
prePurge,
postPurge,
});
} else if (this.ign.ignores(relativeSubFilePath)) {
logger.warn(`File ignored by ${ignorePath}: ${subFilePath}`);
}
};
});

// Run tasks with concurrency limit
await withConcurrencyLimit(tasks, concurrencyLimit);
}
}

Expand Down
70 changes: 30 additions & 40 deletions src/service/reassembleXMLFileHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { buildXMLString } from "@src/service/buildXMLString";
import { parseXML } from "@src/service/parseXML";
import { processFilesForRootElement } from "@src/service/processFilesForRootElement";
import { getConcurrencyThreshold } from "./getConcurrencyThreshold";
import { withConcurrencyLimit } from "./withConcurrencyLimit";

export class ReassembleXMLFileHandler {
async processFilesInDirectory(
Expand All @@ -27,46 +28,34 @@ export class ReassembleXMLFileHandler {
let rootResult: [string, string | undefined] | undefined = undefined;

const concurrencyLimit = getConcurrencyThreshold();
const activePromises: Promise<void>[] = [];
let currentIndex = 0;

// Function to process a single file
const processFile = async (file: string, index: number) => {
const filePath = join(dirPath, file);
const fileStat = await stat(filePath);

if (fileStat.isFile() && filePath.endsWith(".xml")) {
const xmlParsed = await parseXML(filePath);
if (xmlParsed === undefined) return;

const rootResultFromFile = await processFilesForRootElement(xmlParsed);
rootResult = rootResultFromFile;

const combinedXmlString = buildXMLString(xmlParsed);
combinedXmlContents[index] = combinedXmlString;
} else if (fileStat.isDirectory()) {
const [subCombinedXmlContents, subRootResult] =
await this.processFilesInDirectory(filePath);
rootResult = subRootResult;
combinedXmlContents[index] = subCombinedXmlContents.join("");
}
};

while (currentIndex < files.length || activePromises.length > 0) {
if (
currentIndex < files.length &&
activePromises.length < concurrencyLimit
) {
const index = currentIndex++;
const file = files[index];
const promise = processFile(file, index).finally(() => {
activePromises.splice(activePromises.indexOf(promise), 1);
});
activePromises.push(promise);
} else {
await Promise.race(activePromises); // Wait for any promise to complete
}
}

// Create tasks for processing files
const tasks: (() => Promise<void>)[] = files.map((file, index) => {
return async () => {
const filePath = join(dirPath, file);
const fileStat = await stat(filePath);

if (fileStat.isFile() && filePath.endsWith(".xml")) {
const xmlParsed = await parseXML(filePath);
if (xmlParsed === undefined) return;

const rootResultFromFile =
await processFilesForRootElement(xmlParsed);
rootResult = rootResultFromFile;

const combinedXmlString = buildXMLString(xmlParsed);
combinedXmlContents[index] = combinedXmlString;
} else if (fileStat.isDirectory()) {
const [subCombinedXmlContents, subRootResult] =
await this.processFilesInDirectory(filePath);
rootResult = subRootResult;
combinedXmlContents[index] = subCombinedXmlContents.join("");
}
};
});

// Execute tasks with concurrency limit
await withConcurrencyLimit(tasks, concurrencyLimit);

return [combinedXmlContents.filter(Boolean), rootResult];
}
Expand All @@ -86,6 +75,7 @@ export class ReassembleXMLFileHandler {
);
return;
}

logger.debug(`Parsing directory to reassemble: ${filePath}`);
const [subCombinedXmlContents, rootResult] =
await this.processFilesInDirectory(filePath);
Expand Down
22 changes: 22 additions & 0 deletions src/service/withConcurrencyLimit.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
export async function withConcurrencyLimit<T>(
tasks: (() => Promise<T>)[],
limit: number,
): Promise<T[]> {
const results: Promise<T>[] = [];
const executing: Promise<T>[] = []; // Change this to Promise<T>[]

for (const task of tasks) {
const p = task().then((result) => {
executing.splice(executing.indexOf(p), 1);
return result;
});
results.push(p);
executing.push(p);

if (executing.length >= limit) {
await Promise.race(executing); // Wait for the first one to complete
}
}

return Promise.all(results);
}

0 comments on commit a8e53b8

Please sign in to comment.