Skip to content

Commit

Permalink
Merge pull request #9 from humanloop/flow-complete-fix
Browse files Browse the repository at this point in the history
Flow complete fix
  • Loading branch information
andreibratu authored Jan 9, 2025
2 parents d3dbafa + 8feeed5 commit f0bf780
Show file tree
Hide file tree
Showing 8 changed files with 254 additions and 297 deletions.
60 changes: 30 additions & 30 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,56 +12,56 @@
"test": "jest"
},
"dependencies": {
"url-join": "4.0.1",
"form-data": "^4.0.0",
"formdata-node": "^6.0.3",
"node-fetch": "2.7.0",
"qs": "6.11.2",
"readable-stream": "^4.5.2",
"form-data-encoder": "^4.0.2",
"@opentelemetry/api": "1.9.0",
"@opentelemetry/auto-instrumentations-node": "0.53.0",
"@opentelemetry/sdk-metrics": "1.28.0",
"@opentelemetry/sdk-node": "0.55.0",
"@opentelemetry/sdk-trace-node": "1.28.0",
"@traceloop/ai-semantic-conventions": "0.11.6",
"@traceloop/instrumentation-anthropic": "0.11.1",
"@traceloop/instrumentation-cohere": "0.11.1",
"@traceloop/instrumentation-openai": "0.11.3",
"@traceloop/ai-semantic-conventions": "0.11.6",
"uuid": "11.0.3",
"nanoid": "5.0.9",
"cli-progress": "^3.12.0",
"form-data": "^4.0.0",
"form-data-encoder": "^4.0.2",
"formdata-node": "^6.0.3",
"lodash": "4.17.21",
"nanoid": "5.0.9",
"node-fetch": "2.7.0",
"p-map": "7.0.3",
"stable-hash": "0.0.4"
"qs": "6.11.2",
"readable-stream": "^4.5.2",
"stable-hash": "0.0.4",
"url-join": "4.0.1",
"uuid": "11.0.3"
},
"devDependencies": {
"@types/url-join": "4.0.1",
"@types/qs": "6.9.8",
"@anthropic-ai/sdk": "^0.32.1",
"@babel/core": "^7.26.0",
"@babel/plugin-transform-modules-commonjs": "^7.26.3",
"@babel/preset-env": "^7.26.0",
"@trivago/prettier-plugin-sort-imports": "4.3.0",
"@types/cli-progress": "^3.11.6",
"@types/jest": "29.5.5",
"@types/lodash": "4.14.74",
"@types/node": "17.0.33",
"@types/node-fetch": "2.6.9",
"@types/qs": "6.9.8",
"@types/readable-stream": "^4.0.15",
"@types/url-join": "4.0.1",
"babel-jest": "^29.7.0",
"cohere-ai": "^7.15.0",
"dotenv": "^16.4.6",
"fetch-mock-jest": "^1.5.1",
"webpack": "^5.94.0",
"ts-loader": "^9.3.1",
"jest": "29.7.0",
"@types/jest": "29.5.5",
"ts-jest": "29.1.1",
"jest-environment-jsdom": "29.7.0",
"@types/node": "17.0.33",
"jsonschema": "^1.4.1",
"openai": "^4.74.0",
"prettier": "^3.4.2",
"ts-jest": "29.1.1",
"ts-loader": "^9.3.1",
"typescript": "4.6.4",
"openai": "^4.74.0",
"@anthropic-ai/sdk": "^0.32.1",
"cohere-ai": "^7.15.0",
"dotenv": "^16.4.6",
"jsonschema": "^1.4.1",
"@types/cli-progress": "^3.11.6",
"babel-jest": "^29.7.0",
"@babel/core": "^7.26.0",
"@babel/plugin-transform-modules-commonjs": "^7.26.3",
"@babel/preset-env": "^7.26.0",
"@types/lodash": "4.14.74",
"@trivago/prettier-plugin-sort-imports": "4.3.0"
"webpack": "^5.94.0"
},
"browser": {
"fs": false,
Expand Down
8 changes: 6 additions & 2 deletions src/otel/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,9 @@ export const HUMANLOOP_LOG_KEY = "humanloop.log";
export const HUMANLOOP_FILE_TYPE_KEY = "humanloop.file_type";
export const HUMANLOOP_PATH_KEY = "humanloop.file.path";
export const HUMANLOOP_META_FUNCTION_NAME = "humanloop.meta.function_name";
export const HUMANLOOP_PARENT_SPAN_CTX_KEY = "humanloop.context.parentSpanId";
export const HUMANLOOP_TRACE_FLOW_CTX_KEY = "humanloop.context.traceFlow";
export const HUMANLOOP_FLOW_PREREQUISITES_KEY = "humanloop.flow.prerequisites";

export const HUMANLOOP_SPAN_PREFIX = "humanloop.";
export const HUMANLOOP_FLOW_SPAN_NAME = `${HUMANLOOP_SPAN_PREFIX}flow`;
export const HUMANLOOP_PROMPT_SPAN_NAME = `${HUMANLOOP_SPAN_PREFIX}prompt`;
export const HUMANLOOP_TOOL_SPAN_NAME = `${HUMANLOOP_SPAN_PREFIX}tool`;
46 changes: 46 additions & 0 deletions src/otel/exporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { HumanloopClient } from "../humanloop.client";
import {
HUMANLOOP_FILE_KEY,
HUMANLOOP_FILE_TYPE_KEY,
HUMANLOOP_FLOW_PREREQUISITES_KEY,
HUMANLOOP_LOG_KEY,
HUMANLOOP_PATH_KEY,
} from "./constants";
Expand All @@ -30,13 +31,17 @@ export class HumanloopSpanExporter implements SpanExporter {
private shutdownFlag: boolean;
private readonly uploadPromises: Promise<void>[];
private readonly exportedSpans: ReadableSpan[];
// List of spans that must be uploaded before completing the Flow log
// This maps [flow log span ID] -> [set of child span IDs]
private readonly prerequisites: Map<string, Set<string>>;

constructor(client: HumanloopClient) {
this.client = client;
this.spanIdToUploadedLogId = new Map();
this.shutdownFlag = false;
this.uploadPromises = [];
this.exportedSpans = [];
this.prerequisites = new Map();
}

export(spans: ReadableSpan[]): ExportResult {
Expand Down Expand Up @@ -69,6 +74,32 @@ export class HumanloopSpanExporter implements SpanExporter {
await this.shutdown();
}

/**
* Mark a span as uploaded to the Humanloop.
*
* A Log might be contained inside a Flow trace, which must be marked as complete
* when all its children are uploaded. Each Flow Log span contains a
* 'humanloop.flow.prerequisites' attribute, which is a list of all spans that must
* be uploaded before the Flow Log is marked as complete.
*
* This method finds the trace the Span belongs to and removes the Span from the list.
* Once all prerequisites are uploaded, the method marks the Flow Log as complete.
*
* @param spanId - The ID of the span that has been uploaded.
*/
private markSpanCompleted(spanId: string) {
for (const [flowLogSpanId, flowChildrenSpanIds] of this.prerequisites) {
if (flowChildrenSpanIds.has(spanId)) {
flowChildrenSpanIds.delete(spanId);
if (flowChildrenSpanIds.size === 0) {
const flowLogId = this.spanIdToUploadedLogId.get(flowLogSpanId)!;
this.client.flows.updateLog(flowLogId, { traceStatus: "complete" });
}
break;
}
}
}

private async exportSpanDispatch(span: ReadableSpan): Promise<void> {
const fileType = span.attributes[HUMANLOOP_FILE_TYPE_KEY];
const parentSpanId = span.parentSpanId;
Expand Down Expand Up @@ -130,6 +161,7 @@ export class HumanloopSpanExporter implements SpanExporter {
} catch (error) {
console.error(`Error exporting prompt: ${error}`);
}
this.markSpanCompleted(span.spanContext().spanId);
}

private async exportTool(span: ReadableSpan): Promise<void> {
Expand Down Expand Up @@ -158,6 +190,7 @@ export class HumanloopSpanExporter implements SpanExporter {
} catch (error) {
console.error(`Error exporting tool: ${error}`);
}
this.markSpanCompleted(span.spanContext().spanId);
}

private async exportFlow(span: ReadableSpan): Promise<void> {
Expand All @@ -168,6 +201,18 @@ export class HumanloopSpanExporter implements SpanExporter {
logObject.startTime = hrTimeToDate(span.startTime);
logObject.endTime = hrTimeToDate(span.endTime);
logObject.createdAt = hrTimeToDate(span.endTime);
// Spans that must be uploaded before the Flow Span is completed
let prerequisites: string[] | undefined = undefined;
try {
prerequisites = readFromOpenTelemetrySpan(
span,
HUMANLOOP_FLOW_PREREQUISITES_KEY,
) as unknown as string[];
} catch (error) {
prerequisites = [];
}

this.prerequisites.set(span.spanContext().spanId, new Set(prerequisites));

const spanParentId = span.parentSpanId;
const traceParentId = spanParentId
Expand All @@ -188,5 +233,6 @@ export class HumanloopSpanExporter implements SpanExporter {
} catch (error) {
console.error("Error exporting flow: ", error, span.spanContext().spanId);
}
this.markSpanCompleted(span.spanContext().spanId);
}
}
16 changes: 3 additions & 13 deletions src/otel/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { AttributeValue, SpanKind } from "@opentelemetry/api";
import { ReadableSpan } from "@opentelemetry/sdk-trace-base";
import { v4 as uuidv4 } from "uuid";

// Constants for Humanloop attributes
import { HUMANLOOP_FILE_TYPE_KEY } from "./constants";
import { HUMANLOOP_SPAN_PREFIX } from "./constants";

export type NestedDict = { [key: string]: NestedDict | AttributeValue };
export type NestedList = Array<NestedDict | AttributeValue>;
Expand Down Expand Up @@ -34,7 +33,7 @@ function _listToOtelFormat(lst: NestedList): NestedDict {
*/
export function writeToOpenTelemetrySpan(
span: ReadableSpan,
value: NestedDict | NestedList | AttributeValue,
value: NestedDict | NestedList | AttributeValue | any[],
key: string,
): void {
let toWriteCopy: NestedDict;
Expand Down Expand Up @@ -191,16 +190,7 @@ export function isLLMProviderCall(span: ReadableSpan): boolean {
* @returns True if the span was created by the Humanloop SDK, false otherwise
*/
export function isHumanloopSpan(span: ReadableSpan): boolean {
return span.attributes[HUMANLOOP_FILE_TYPE_KEY] !== undefined;
}

/**
* Generates a unique span ID.
*
* @returns A UUID string
*/
export function generateSpanId(): string {
return uuidv4();
return span.name.startsWith(HUMANLOOP_SPAN_PREFIX);
}

/**
Expand Down
43 changes: 40 additions & 3 deletions src/otel/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { PromptKernelRequest } from "../api/types/PromptKernelRequest";
import {
HUMANLOOP_FILE_KEY,
HUMANLOOP_FILE_TYPE_KEY,
HUMANLOOP_FLOW_SPAN_NAME,
HUMANLOOP_LOG_KEY,
HUMANLOOP_META_FUNCTION_NAME,
} from "./constants";
Expand All @@ -36,15 +37,37 @@ interface CompletableSpan {
export class HumanloopSpanProcessor implements SpanProcessor {
private spanExporter: SpanExporter;
private children: Map<string, CompletableSpan[]>;
// List of all span IDs that are contained in a Flow trace
// They are passed to the Exporter as a span attribute
// so the Exporter knows when to complete a trace
private prerequisites: Map<string, string[]>;

constructor(exporter: SpanExporter) {
this.spanExporter = exporter;
this.children = new Map();
this.prerequisites = new Map();
}

async forceFlush(): Promise<void> {}

onStart(span: Span, _: Context): void {
const spanId = span.spanContext().spanId;
const parentSpanId = span.parentSpanId;
if (span.name === HUMANLOOP_FLOW_SPAN_NAME) {
this.prerequisites.set(spanId, []);
}
if (parentSpanId !== undefined && isHumanloopSpan(span)) {
for (const [traceHead, allTraceNodes] of this.prerequisites) {
if (
parentSpanId === traceHead ||
allTraceNodes.includes(parentSpanId)
) {
allTraceNodes.push(spanId);
this.prerequisites.set(traceHead, allTraceNodes);
break;
}
}
}
// Handle stream case: when Prompt instrumented function calls a provider with streaming: true
// The instrumentor span will end only when the ChunksResponse is consumed, which can happen
// after the span created by the Prompt utility finishes. To handle this, we register all instrumentor
Expand All @@ -66,6 +89,7 @@ export class HumanloopSpanProcessor implements SpanProcessor {
*/
onEnd(span: ReadableSpan): void {
if (isHumanloopSpan(span)) {
// Wait for children to complete asynchronously
new Promise<void>((resolve) => {
const checkChildrenSpans = () => {
const childrenSpans = this.children.get(span.spanContext().spanId);
Expand All @@ -79,15 +103,28 @@ export class HumanloopSpanProcessor implements SpanProcessor {
};
checkChildrenSpans();
}).then((_) => {
// All children/ instrumentor spans have arrived, we can process the
// All instrumentor spans have arrived, we can process the
// Humanloop parent span owning them
if (span.name === HUMANLOOP_FLOW_SPAN_NAME) {
// If the span if a Flow Log, add attribute with all span IDs it
// needs to wait before completion
writeToOpenTelemetrySpan(
span,
this.prerequisites.get(span.spanContext().spanId) || [],
HUMANLOOP_LOG_KEY,
);
this.prerequisites.delete(span.spanContext().spanId);
}

this.processSpanDispatch(
span,
this.children.get(span.spanContext().spanId) || [],
);

// Release references
this.children.delete(span.spanContext().spanId);
// Export the Humanloop span

// Pass Humanloop span to Exporter
this.spanExporter.export([span], (result: ExportResult) => {
if (result.code !== ExportResultCode.SUCCESS) {
console.error("Failed to export span:", result.error);
Expand Down Expand Up @@ -182,7 +219,7 @@ export class HumanloopSpanProcessor implements SpanProcessor {
// Placeholder for processing other file types
break;
default:
console.error("Unknown Humanloop File Span", span);
console.error("Unknown Humanloop File span", span);
}
}

Expand Down
Loading

0 comments on commit f0bf780

Please sign in to comment.