Skip to content

Commit

Permalink
Python: Streaming content for token usage (microsoft#8902)
Browse files Browse the repository at this point in the history
### Motivation and Context

<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
  1. Why is this change required?
  2. What problem does it solve?
  3. What scenario does it contribute to?
  4. If it fixes an open issue, please link to the issue here.
-->
OpenAI recently starts providing token usage information on their
streaming chat completion API. This ADR opens the discussion on how we
should consume that information within our `StreamingChatMessageContent`
data structure.

### Description

<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->

1. An ADR documenting 4 approaches.
2. Implementation of the selected approach.
3. Fix issues where data in the streaming contents are modified after
concatenation (`__add__`).
4. Add streaming to model diagnostics operation names so that we can
distinguish streaming and non-streaming operations.

### Contribution Checklist

<!-- Before submitting this PR, please make sure: -->

- [x] The code builds clean without any errors or warnings
- [x] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [x] All unit tests pass, and I have added new tests where possible
- [x] I didn't break anyone 😄

---------

Co-authored-by: Ben Thomas <[email protected]>
  • Loading branch information
TaoChenOSU and alliscode authored Sep 26, 2024
1 parent a2a64a3 commit 5c5d761
Show file tree
Hide file tree
Showing 16 changed files with 356 additions and 51 deletions.
170 changes: 170 additions & 0 deletions docs/decisions/0054-python-streaming-content-for-token-usage.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
---
# These are optional elements. Feel free to remove any of them.
status: { accepted }
contact: { Tao Chen }
date: { 2024-09-18 }
deciders: { Tao Chen }
consulted: { Eduard van Valkenburg, Evan Mattson }
informed: { Eduard van Valkenburg, Evan Mattson, Ben Thomas }
---

# Streaming Contents for Token Usage Information (Semantic Kernel Python)

## Context and Problem Statement

Currently, `StreamingChatMessageContent` (inherits from `StreamingContentMixin`) in Semantic Kernel requires a choice index to be specified. This creates a limitation since the token usage information from **OpenAI's streaming chat completion** API will be returned in the last chunk where the choices field will be empty, which leads to an unknown choice index for the chunk. For more information, please refer to the [OpenAI API documentation](https://platform.openai.com/docs/api-reference/chat/create) and look for the `stream_options` field.

> The token usage information returned in the last chunk is the **total** token usage for the chat completion request regardless of the number of choices specified. That being said, there will be only one chunk containing the token usage information in the streaming response even when multiple choices are requested.
Our current data structure for `StreamingChatMessageContent`:

```Python
# semantic_kernel/content/streaming_chat_message_content.py
class StreamingChatMessageContent(ChatMessageContent, StreamingContentMixin):

# semantic_kernel/content/chat_message_content.py
class ChatMessageContent(KernelContent):
content_type: Literal[ContentTypes.CHAT_MESSAGE_CONTENT] = Field(CHAT_MESSAGE_CONTENT_TAG, init=False) # type: ignore
tag: ClassVar[str] = CHAT_MESSAGE_CONTENT_TAG
role: AuthorRole
name: str | None = None
items: list[Annotated[ITEM_TYPES, Field(..., discriminator=DISCRIMINATOR_FIELD)]] = Field(default_factory=list)
encoding: str | None = None
finish_reason: FinishReason | None = None

# semantic_kernel/content/streaming_content_mixin.py
class StreamingContentMixin(KernelBaseModel, ABC):
choice_index: int

# semantic_kernel/content/kernel_content.py
class KernelContent(KernelBaseModel, ABC):
inner_content: Any | None = None
ai_model_id: str | None = None
metadata: dict[str, Any] = Field(default_factory=dict)
```

## Proposal 1

In non-streaming responses, the token usage is returned as part of the response from the model along with the choices that can be more than one. We then parse the choices into individual `ChatMessageContent`s, with each containing the token usage information, even though the token usage is for the entire response, not just the individual choice.

Considering the same strategy, all choices from the streaming response should contain the token usage information when they are eventually concatenated by their `choice_index`. Since we know the number of choices requested, we can perform the following steps:

1. Replicate the last chunk for each choice requested to create a list of `StreamingChatMessageContent`s, with the token usage information included in the metadata.
2. Assign a choice index to each replicated chunk, starting from 0.
3. Stream the replicated chunks in a list back to the client.

### Additional considerations

Currently, when two `StreamingChatMessageContent`s are "added" together, the metadata is not merged. We need to ensure that the metadata is merged when the chunks are concatenated. When there are conflicting metadata keys, the metadata from the second chunk should overwrite the metadata from the first chunk:

```Python
class StreamingChatMessageContent(ChatMessageContent, StreamingContentMixin):
...

def __add__(self, other: "StreamingChatMessageContent") -> "StreamingChatMessageContent":
...

return StreamingChatMessageContent(
...,
metadata=self.metadata | other.metadata,
...
)

...
```

### Risks

There are no breaking changes and known risks associated with this proposal.

## Proposal 2

We allow the choice index to be optional in the `StreamingContentMixin` class. This will allow the choice index to be `None` when the token usage information is returned in the last chunk. The choice index will be set to `None` in the last chunk, and the client can handle the token usage information accordingly.

```Python
# semantic_kernel/content/streaming_content_mixin.py
class StreamingContentMixin(KernelBaseModel, ABC):
choice_index: int | None
```

This is a simpler solution compared to Proposal 1, and it is more in line with what the OpenAI API returns, that is the token usage is not associated with any specific choice.

### Risks

This is potentially a breaking change since the `choice_index` field is currently required. This approach also makes streaming content concatenation more complex since the choice index will need to be handled differently when it is `None`.

## Proposal 3

We will merge `ChatMessageContent` and `StreamingChatMessageContent` into a single class, `ChatMessageContent`, and mark `StreamingChatMessageContent` as deprecated. The `StreamingChatMessageContent` class will be removed in a future release. Then we apply the either [Proposal 1](#proposal-1) or [Proposal 2](#proposal-2) to the `ChatMessageContent` class to handle the token usage information.

This approach simplifies the codebase by removing the need for a separate class for streaming chat messages. The `ChatMessageContent` class will be able to handle both streaming and non-streaming chat messages.

```Python
# semantic_kernel/content/streaming_chat_message_content.py
@deprecated("StreamingChatMessageContent is deprecated. Use ChatMessageContent instead.")
class StreamingChatMessageContent(ChatMessageContent):
pass

# semantic_kernel/content/chat_message_content.py
class ChatMessageContent(KernelContent):
...
# Add the choice_index field to the ChatMessageContent class and make it optional
choice_index: int | None

# Add the __add__ method to merge the metadata when two ChatMessageContent instances are added together. This is currently an abstract method in the `StreamingContentMixin` class.
def __add__(self, other: "ChatMessageContent") -> "ChatMessageContent":
...

return ChatMessageContent(
...,
choice_index=self.choice_index,
...
)

# Add the __bytes__ method to return the bytes representation of the ChatMessageContent instance. This is currently an abstract method in the `StreamingContentMixin` class.
def __bytes__(self) -> bytes:
...
```

### Risks

We are unifying the returned data structure for streaming and non-streaming chat messages, which may lead to confusion for developers initially, especially if they are not aware of the deprecation of the `StreamingChatMessageContent` class, or they came from SK .Net. It may also create a sharper learning curve if developers started with Python but later move to .Net for production. This approach also introduces breaking changes to our AI connectors as the returned data type will be different.

> We will also need to update the `StreamingTextContent` and `TextContent` in a similar way too for this proposal.
## Proposal 4

Similar to [Proposal 3](#proposal-3), we will merge `ChatMessageContent` and `StreamingChatMessageContent` into a single class, `ChatMessageContent`, and mark `StreamingChatMessageContent` as deprecated. In addition, we will introduce another a new mixin called `ChatMessageContentConcatenationMixin` to handle the concatenation of two `ChatMessageContent` instances. Then we apply the either [Proposal 1](#proposal-1) or [Proposal 2](#proposal-2) to the `ChatMessageContent` class to handle the token usage information.

```Python
# semantic_kernel/content/streaming_chat_message_content.py
@deprecated("StreamingChatMessageContent is deprecated. Use ChatMessageContent instead.")
class StreamingChatMessageContent(ChatMessageContent):
pass

# semantic_kernel/content/chat_message_content.py
class ChatMessageContent(KernelContent, ChatMessageContentConcatenationMixin):
...
# Add the choice_index field to the ChatMessageContent class and make it optional
choice_index: int | None

# Add the __bytes__ method to return the bytes representation of the ChatMessageContent instance. This is currently an abstract method in the `StreamingContentMixin` class.
def __bytes__(self) -> bytes:
...

class ChatMessageContentConcatenationMixin(KernelBaseModel, ABC):
def __add__(self, other: "ChatMessageContent") -> "ChatMessageContent":
...
```

This approach separates the concerns of the `ChatMessageContent` class and the concatenation logic into two separate classes. This can help to keep the codebase clean and maintainable.

### Risks

Same as [Proposal 3](#proposal-3).

## Decision Outcome

To minimize the impact on customers and the existing codebase, we will go with [Proposal 1](#proposal-1) to handle the token usage information in the OpenAI streaming responses. This proposal is backward compatible and aligns with the current data structure for non-streaming responses. We will also ensure that the metadata is merged correctly when two `StreamingChatMessageContent` instances are concatenated. This approach also makes sure the token usage information will be associated to all choices in the streaming response.

[Proposal 3](#proposal-3) and [Proposal 4](#proposal-4) are still valid but perhaps premature at this stage as most services still return objects of different types for streaming and non-streaming responses. We will keep them in mind for future refactoring efforts.
2 changes: 1 addition & 1 deletion python/samples/demos/telemetry/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ async def main(scenario: Literal["ai_service", "kernel_function", "auto_function
with tracer.start_as_current_span("main") as current_span:
print(f"Trace ID: {format_trace_id(current_span.get_span_context().trace_id)}")

stream = False
stream = True

# Scenarios where telemetry is collected in the SDK, from the most basic to the most complex.
if scenario == "ai_service" or scenario == "all":
Expand Down
4 changes: 4 additions & 0 deletions python/samples/demos/telemetry/scenarios.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ async def run_ai_service(stream: bool = False) -> None:
chat_history, PromptExecutionSettings()
):
print(update[0].content, end="")
print()
elif isinstance(ai_service, TextCompletionClientBase):
if not stream:
completion = await ai_service.get_text_contents(
Expand All @@ -84,6 +85,7 @@ async def run_ai_service(stream: bool = False) -> None:
"Why is the sky blue?", PromptExecutionSettings()
):
print(update[0].content, end="")
print()
else:
raise ValueError("AI service not recognized.")
except Exception as e:
Expand Down Expand Up @@ -129,6 +131,7 @@ async def run_kernel_function(stream: bool = False) -> None:
),
):
print(update[0].content, end="")
print()
except Exception as e:
current_span.record_exception(e)
print(f"Error running kernel plugin: {e}")
Expand Down Expand Up @@ -178,6 +181,7 @@ async def run_auto_function_invocation(stream: bool = False) -> None:
),
):
print(update[0].content, end="")
print()
except Exception as e:
current_span.record_exception(e)
print(f"Error running auto function invocation: {e}")
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ class OpenAIChatPromptExecutionSettings(OpenAIPromptExecutionSettings):
description="Do not set this manually. It is set by the service based on the function choice configuration.",
)
structured_json_response: bool = Field(False, description="Do not set this manually. It is set by the service.")
stream_options: dict[str, Any] | None = Field(
None,
description="Additional options to pass when streaming is used. Do not set this manually.",
)

@field_validator("functions", "function_call", mode="after")
@classmethod
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
# Copyright (c) Microsoft. All rights reserved.

import json
import logging
from collections.abc import Mapping
import sys
from collections.abc import AsyncGenerator, Mapping
from copy import deepcopy
from typing import Any, TypeVar
from uuid import uuid4

from openai import AsyncAzureOpenAI
if sys.version_info >= (3, 12):
from typing import override # pragma: no cover
else:
from typing_extensions import override # pragma: no cover

from openai import AsyncAzureOpenAI, AsyncStream
from openai.lib.azure import AsyncAzureADTokenProvider
from openai.types.chat.chat_completion import ChatCompletion, Choice
from openai.types.chat.chat_completion_chunk import ChatCompletionChunk
Expand All @@ -16,19 +23,24 @@
from semantic_kernel.connectors.ai.open_ai.prompt_execution_settings.azure_chat_prompt_execution_settings import (
AzureChatPromptExecutionSettings,
)
from semantic_kernel.connectors.ai.open_ai.prompt_execution_settings.open_ai_prompt_execution_settings import (
OpenAIChatPromptExecutionSettings,
)
from semantic_kernel.connectors.ai.open_ai.services.azure_config_base import AzureOpenAIConfigBase
from semantic_kernel.connectors.ai.open_ai.services.open_ai_chat_completion_base import OpenAIChatCompletionBase
from semantic_kernel.connectors.ai.open_ai.services.open_ai_handler import OpenAIModelTypes
from semantic_kernel.connectors.ai.open_ai.services.open_ai_text_completion_base import OpenAITextCompletionBase
from semantic_kernel.connectors.ai.open_ai.settings.azure_open_ai_settings import AzureOpenAISettings
from semantic_kernel.connectors.ai.prompt_execution_settings import PromptExecutionSettings
from semantic_kernel.contents.chat_history import ChatHistory
from semantic_kernel.contents.chat_message_content import ChatMessageContent
from semantic_kernel.contents.function_call_content import FunctionCallContent
from semantic_kernel.contents.function_result_content import FunctionResultContent
from semantic_kernel.contents.streaming_chat_message_content import StreamingChatMessageContent
from semantic_kernel.contents.text_content import TextContent
from semantic_kernel.contents.utils.finish_reason import FinishReason
from semantic_kernel.exceptions.service_exceptions import ServiceInitializationError
from semantic_kernel.exceptions.service_exceptions import ServiceInitializationError, ServiceInvalidResponseError
from semantic_kernel.utils.telemetry.model_diagnostics.decorators import trace_streaming_chat_completion

logger: logging.Logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -126,6 +138,42 @@ def __init__(
client=async_client,
)

@override
@trace_streaming_chat_completion(OpenAIChatCompletionBase.MODEL_PROVIDER_NAME)
async def _inner_get_streaming_chat_message_contents(
self,
chat_history: "ChatHistory",
settings: "PromptExecutionSettings",
) -> AsyncGenerator[list["StreamingChatMessageContent"], Any]:
"""Override the base method.
This is because the latest Azure OpenAI API GA version doesn't support `stream_option`
yet and it will potentially result in errors if the option is included.
This method will be called instead of the base method.
TODO: Remove this method when the `stream_option` is supported by the Azure OpenAI API.
GitHub Issue: https://github.com/microsoft/semantic-kernel/issues/8996
"""
if not isinstance(settings, OpenAIChatPromptExecutionSettings):
settings = self.get_prompt_execution_settings_from_settings(settings)
assert isinstance(settings, OpenAIChatPromptExecutionSettings) # nosec

settings.stream = True
settings.messages = self._prepare_chat_history_for_request(chat_history)
settings.ai_model_id = settings.ai_model_id or self.ai_model_id

response = await self._send_request(request_settings=settings)
if not isinstance(response, AsyncStream):
raise ServiceInvalidResponseError("Expected an AsyncStream[ChatCompletionChunk] response.")
async for chunk in response:
if len(chunk.choices) == 0:
continue

assert isinstance(chunk, ChatCompletionChunk) # nosec
chunk_metadata = self._get_metadata_from_streaming_chat_response(chunk)
yield [
self._create_streaming_chat_message_content(chunk, choice, chunk_metadata) for choice in chunk.choices
]

@classmethod
def from_dict(cls, settings: dict[str, Any]) -> "AzureChatCompletion":
"""Initialize an Azure OpenAI service from a dictionary of settings.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,20 +102,38 @@ async def _inner_get_streaming_chat_message_contents(
assert isinstance(settings, OpenAIChatPromptExecutionSettings) # nosec

settings.stream = True
settings.stream_options = {"include_usage": True}
settings.messages = self._prepare_chat_history_for_request(chat_history)
settings.ai_model_id = settings.ai_model_id or self.ai_model_id

response = await self._send_request(request_settings=settings)
if not isinstance(response, AsyncStream):
raise ServiceInvalidResponseError("Expected an AsyncStream[ChatCompletionChunk] response.")
async for chunk in response:
if len(chunk.choices) == 0:
if len(chunk.choices) == 0 and chunk.usage is None:
continue

assert isinstance(chunk, ChatCompletionChunk) # nosec
chunk_metadata = self._get_metadata_from_streaming_chat_response(chunk)
yield [
self._create_streaming_chat_message_content(chunk, choice, chunk_metadata) for choice in chunk.choices
]
if chunk.usage is not None:
# Usage is contained in the last chunk where the choices are empty
# We are duplicating the usage metadata to all the choices in the response
yield [
StreamingChatMessageContent(
role=AuthorRole.ASSISTANT,
content="",
choice_index=i,
inner_content=chunk,
ai_model_id=settings.ai_model_id,
metadata=chunk_metadata,
)
for i in range(settings.number_of_responses or 1)
]
else:
yield [
self._create_streaming_chat_message_content(chunk, choice, chunk_metadata)
for choice in chunk.choices
]

@override
def _verify_function_choice_settings(self, settings: "PromptExecutionSettings") -> None:
Expand Down Expand Up @@ -206,6 +224,7 @@ def _get_metadata_from_streaming_chat_response(self, response: ChatCompletionChu
"id": response.id,
"created": response.created,
"system_fingerprint": response.system_fingerprint,
"usage": CompletionUsage.from_openai(response.usage) if response.usage is not None else None,
}

def _get_metadata_from_chat_choice(self, choice: Choice | ChunkChoice) -> dict[str, Any]:
Expand Down
Loading

0 comments on commit 5c5d761

Please sign in to comment.