-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Flow complete fix #9
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ import { PromptKernelRequest } from "../api/types/PromptKernelRequest"; | |
import { | ||
HUMANLOOP_FILE_KEY, | ||
HUMANLOOP_FILE_TYPE_KEY, | ||
HUMANLOOP_FLOW_SPAN_NAME, | ||
HUMANLOOP_LOG_KEY, | ||
HUMANLOOP_META_FUNCTION_NAME, | ||
} from "./constants"; | ||
|
@@ -36,15 +37,37 @@ interface CompletableSpan { | |
export class HumanloopSpanProcessor implements SpanProcessor { | ||
private spanExporter: SpanExporter; | ||
private children: Map<string, CompletableSpan[]>; | ||
// List of all span IDs that are contained in a Flow trace | ||
// They are passed to the Exporter as a span attribute | ||
// so the Exporter knows when to complete a trace | ||
private prerequisites: Map<string, string[]>; | ||
|
||
constructor(exporter: SpanExporter) { | ||
this.spanExporter = exporter; | ||
this.children = new Map(); | ||
this.prerequisites = new Map(); | ||
} | ||
|
||
async forceFlush(): Promise<void> {} | ||
|
||
onStart(span: Span, _: Context): void { | ||
const spanId = span.spanContext().spanId; | ||
const parentSpanId = span.parentSpanId; | ||
if (span.name === HUMANLOOP_FLOW_SPAN_NAME) { | ||
this.prerequisites.set(spanId, []); | ||
} | ||
if (parentSpanId !== undefined && isHumanloopSpan(span)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. noob question - There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yep - i am building a flat list of all logs found inside a trace. this list is passed as an attribute on the flow log span to the expoter. when they're all uploaded, the flow log is also marked as complete |
||
for (const [traceHead, allTraceNodes] of this.prerequisites) { | ||
if ( | ||
parentSpanId === traceHead || | ||
allTraceNodes.includes(parentSpanId) | ||
) { | ||
allTraceNodes.push(spanId); | ||
this.prerequisites.set(traceHead, allTraceNodes); | ||
break; | ||
} | ||
} | ||
} | ||
// Handle stream case: when Prompt instrumented function calls a provider with streaming: true | ||
// The instrumentor span will end only when the ChunksResponse is consumed, which can happen | ||
// after the span created by the Prompt utility finishes. To handle this, we register all instrumentor | ||
|
@@ -66,6 +89,7 @@ export class HumanloopSpanProcessor implements SpanProcessor { | |
*/ | ||
onEnd(span: ReadableSpan): void { | ||
if (isHumanloopSpan(span)) { | ||
// Wait for children to complete asynchronously | ||
new Promise<void>((resolve) => { | ||
const checkChildrenSpans = () => { | ||
const childrenSpans = this.children.get(span.spanContext().spanId); | ||
|
@@ -79,15 +103,28 @@ export class HumanloopSpanProcessor implements SpanProcessor { | |
}; | ||
checkChildrenSpans(); | ||
}).then((_) => { | ||
// All children/ instrumentor spans have arrived, we can process the | ||
// All instrumentor spans have arrived, we can process the | ||
// Humanloop parent span owning them | ||
if (span.name === HUMANLOOP_FLOW_SPAN_NAME) { | ||
// If the span if a Flow Log, add attribute with all span IDs it | ||
// needs to wait before completion | ||
writeToOpenTelemetrySpan( | ||
span, | ||
this.prerequisites.get(span.spanContext().spanId) || [], | ||
HUMANLOOP_LOG_KEY, | ||
); | ||
this.prerequisites.delete(span.spanContext().spanId); | ||
} | ||
|
||
this.processSpanDispatch( | ||
span, | ||
this.children.get(span.spanContext().spanId) || [], | ||
); | ||
|
||
// Release references | ||
this.children.delete(span.spanContext().spanId); | ||
// Export the Humanloop span | ||
|
||
// Pass Humanloop span to Exporter | ||
this.spanExporter.export([span], (result: ExportResult) => { | ||
if (result.code !== ExportResultCode.SUCCESS) { | ||
console.error("Failed to export span:", result.error); | ||
|
@@ -182,7 +219,7 @@ export class HumanloopSpanProcessor implements SpanProcessor { | |
// Placeholder for processing other file types | ||
break; | ||
default: | ||
console.error("Unknown Humanloop File Span", span); | ||
console.error("Unknown Humanloop File span", span); | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
noob question - why
notifySpanUploaded
and not e.g.markSpanCompleted
. Is notify OT jargon?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fair point will change - no OT jargon, just bad taste on my side