livekit-plugins-llama-index (#696)
theomonnom authored Oct 9, 2024
1 parent cb6cb4f commit d0b0ab9
Showing 14 changed files with 449 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .changeset/late-jars-taste.md
@@ -0,0 +1,5 @@
---
"livekit-plugins-llama-index": patch
---

livekit-plugins-llama-index
56 changes: 56 additions & 0 deletions examples/voice-assistant/llamaindex-rag/chat_engine.py
@@ -0,0 +1,56 @@
import os

from dotenv import load_dotenv
from livekit.agents import AutoSubscribe, JobContext, WorkerOptions, cli, llm
from livekit.agents.voice_assistant import VoiceAssistant
from livekit.plugins import deepgram, llama_index, openai, silero
from llama_index.core import (
    SimpleDirectoryReader,
    StorageContext,
    VectorStoreIndex,
    load_index_from_storage,
)
from llama_index.core.chat_engine.types import ChatMode

load_dotenv()

# check if storage already exists
PERSIST_DIR = "./chat-engine-storage"
if not os.path.exists(PERSIST_DIR):
    # load the documents and create the index
    documents = SimpleDirectoryReader("data").load_data()
    index = VectorStoreIndex.from_documents(documents)
    # store it for later
    index.storage_context.persist(persist_dir=PERSIST_DIR)
else:
    # load the existing index
    storage_context = StorageContext.from_defaults(persist_dir=PERSIST_DIR)
    index = load_index_from_storage(storage_context)

chat_engine = index.as_chat_engine(chat_mode=ChatMode.CONTEXT)


async def entrypoint(ctx: JobContext):
    initial_ctx = llm.ChatContext().append(
        role="system",
        text=(
            "You are a voice assistant created by LiveKit. Your interface with users will be voice. "
            "You should use short and concise responses, and avoid usage of unpronounceable punctuation."
        ),
    )

    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    assistant = VoiceAssistant(
        vad=silero.VAD.load(),
        stt=deepgram.STT(),
        llm=llama_index.LLM(chat_engine=chat_engine),
        tts=openai.TTS(),
        chat_ctx=initial_ctx,
    )
    assistant.start(ctx.room)
    await assistant.say("Hey, how can I help you today?", allow_interruptions=True)


if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
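The plugin wraps the chat engine's token stream behind an async iterator. The consumption pattern can be exercised without LiveKit or LlamaIndex installed; `FakeChatEngine` below is a stand-in for the real chat engine, and all names are illustrative:

```python
import asyncio


class FakeChatEngine:
    """Stand-in for a llama_index chat engine that streams token deltas."""

    async def astream_chat(self, message: str):
        # A real engine would retrieve context and stream model output;
        # here we just yield canned tokens.
        for token in ["Hello", ", ", "world", "!"]:
            yield token


async def collect_reply(engine: FakeChatEngine, message: str) -> str:
    parts = []
    async for delta in engine.astream_chat(message):
        parts.append(delta)  # each delta is a partial text chunk
    return "".join(parts)


print(asyncio.run(collect_reply(FakeChatEngine(), "hi")))  # prints "Hello, world!"
```

In the real pipeline each delta is forwarded to the TTS stage as it arrives rather than being joined at the end.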
26 changes: 26 additions & 0 deletions examples/voice-assistant/llamaindex-rag/data/raw_data.txt
@@ -0,0 +1,26 @@
Cloud Architecture
LiveKit Cloud gives you the flexibility of LiveKit's WebRTC stack, combined with global, CDN-scale infrastructure offering 99.99% uptime.

Built with LiveKit SFU
LiveKit Cloud builds on our open-source SFU. This means it supports the exact same client and server APIs as the open-source stack.
Maintaining compatibility with LiveKit's Open Source stack (OSS) is important to us. We didn't want any developer locked into using Cloud, or needing to integrate a different set of features, APIs or SDKs for their applications to work with it. Our design goal: a developer should be able to switch between Cloud or self-hosted without changing a line of code.

Distributed Mesh Architecture
In contrast to traditional WebRTC architectures, LiveKit Cloud runs multiple SFU instances in a mesh formation. We've developed capabilities for media servers to discover and connect to one another, in order to relay media between servers. This key capability allows us to bypass the single-server limitation that exists in traditional SFU and MCU architectures.

Multi-home
Cloud multi-home architecture
With a multi-home architecture, participants no longer need to connect to the same server. When participants from different regions join the same meeting, they'll each connect to the SFU closest to them, minimizing latency and transmission loss between the participant and SFU.
Each SFU instance establishes connections to other instances over optimized inter-data center networks. Inter-data center networks often run close to internet backbones, delivering high throughput with a minimal number of network hops.

No SPOF
Anything that can fail, will. LiveKit Cloud is designed to anticipate (and recover from) failures in every software and hardware component.
Layers of redundancy are built into the system. A media server failure is recovered from by moving impacted participants to another instance. We isolate shared infrastructure, like our message bus, to individual data centers.
When an entire data center fails, customer traffic is automatically migrated to the next closest data center. LiveKit's client SDKs will perform a "session migration": moving existing WebRTC sessions to a different media server without service interruption for your users.

Globally distributed
To serve end users around the world, our infrastructure runs across multiple Cloud vendors and data centers. Today we have data centers in North America, South America, Southeast Asia, East Asia, and Europe, delivering under 100ms of latency for users in those regions.

Designed to scale
When you need to place many viewers on a media track, like in a livestream, LiveKit Cloud handles that capacity dynamically by forming a distribution mesh, similar to a CDN. This happens automatically as your session scales up; no special configuration is necessary. Every LiveKit Cloud project scales automatically.
The theoretical limit of this architecture is on the order of millions of participants per room/session. For practical purposes, we've placed a limit of 100k simultaneous participants in the same session. If you have a realtime application operating at a larger scale, you can request a limit increase in your Cloud dashboard or get in touch with us.
63 changes: 63 additions & 0 deletions examples/voice-assistant/llamaindex-rag/query_engine.py
@@ -0,0 +1,63 @@
import os

from dotenv import load_dotenv
from livekit.agents import AutoSubscribe, JobContext, WorkerOptions, cli, llm
from livekit.agents.voice_assistant import VoiceAssistant
from livekit.plugins import deepgram, openai, silero
from llama_index.core import (
    SimpleDirectoryReader,
    StorageContext,
    VectorStoreIndex,
    load_index_from_storage,
)

load_dotenv()

# check if storage already exists
PERSIST_DIR = "./query-engine-storage"
if not os.path.exists(PERSIST_DIR):
    # load the documents and create the index
    documents = SimpleDirectoryReader("data").load_data()
    index = VectorStoreIndex.from_documents(documents)
    # store it for later
    index.storage_context.persist(persist_dir=PERSIST_DIR)
else:
    # load the existing index
    storage_context = StorageContext.from_defaults(persist_dir=PERSIST_DIR)
    index = load_index_from_storage(storage_context)


async def entrypoint(ctx: JobContext):
    initial_ctx = llm.ChatContext().append(
        role="system",
        text=(
            "You are a voice assistant created by LiveKit. Your interface with users will be voice. "
            "You should use short and concise responses, and avoid usage of unpronounceable punctuation."
        ),
    )

    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    fnc_ctx = llm.FunctionContext()

    @fnc_ctx.ai_callable(description="Get more information about a specific topic")
    async def query_info(query: str) -> str:
        query_engine = index.as_query_engine(use_async=True)
        res = await query_engine.aquery(query)
        print("Query result:", res)
        return str(res)

    assistant = VoiceAssistant(
        vad=silero.VAD.load(),
        stt=deepgram.STT(),
        llm=openai.LLM(),
        tts=openai.TTS(),
        chat_ctx=initial_ctx,
        fnc_ctx=fnc_ctx,
    )
    assistant.start(ctx.room)
    await assistant.say("Hey, how can I help you today?", allow_interruptions=True)


if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
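The `ai_callable` decorator registers an async function as a tool the LLM can invoke when it decides more context is needed. Stripped of LiveKit specifics, the registration mechanics look roughly like this (a toy `ToolRegistry`, not the actual `llm.FunctionContext` implementation):

```python
import asyncio


class ToolRegistry:
    """Toy registry mirroring the decorator-based tool registration above."""

    def __init__(self):
        self._tools = {}

    def ai_callable(self, description: str):
        def wrapper(fn):
            # store the coroutine function under its name, with its description
            self._tools[fn.__name__] = {"fn": fn, "description": description}
            return fn

        return wrapper

    async def invoke(self, name: str, *args) -> str:
        return await self._tools[name]["fn"](*args)


registry = ToolRegistry()


@registry.ai_callable(description="Get more information about a specific topic")
async def query_info(query: str) -> str:
    # a real implementation would hit the llama_index query engine here
    return f"results for: {query}"


print(asyncio.run(registry.invoke("query_info", "LiveKit Cloud")))
```

The description string is what the LLM sees when deciding whether to call the tool, so it should say what the tool returns, not how it works.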
73 changes: 73 additions & 0 deletions examples/voice-assistant/llamaindex-rag/retrieval.py
@@ -0,0 +1,73 @@
import os

from dotenv import load_dotenv
from livekit.agents import AutoSubscribe, JobContext, WorkerOptions, cli, llm
from livekit.agents.voice_assistant import VoiceAssistant
from livekit.plugins import deepgram, openai, silero
from llama_index.core import (
    SimpleDirectoryReader,
    StorageContext,
    VectorStoreIndex,
    load_index_from_storage,
)
from llama_index.core.schema import MetadataMode

load_dotenv()

# check if storage already exists
PERSIST_DIR = "./retrieval-engine-storage"
if not os.path.exists(PERSIST_DIR):
    # load the documents and create the index
    documents = SimpleDirectoryReader("data").load_data()
    index = VectorStoreIndex.from_documents(documents)
    # store it for later
    index.storage_context.persist(persist_dir=PERSIST_DIR)
else:
    # load the existing index
    storage_context = StorageContext.from_defaults(persist_dir=PERSIST_DIR)
    index = load_index_from_storage(storage_context)


async def entrypoint(ctx: JobContext):
    system_msg = llm.ChatMessage(
        role="system",
        content=(
            "You are a voice assistant created by LiveKit. Your interface with users will be voice. "
            "You should use short and concise responses, and avoid usage of unpronounceable punctuation."
        ),
    )
    initial_ctx = llm.ChatContext()
    initial_ctx.messages.append(system_msg)

    async def _will_synthesize_assistant_reply(
        assistant: VoiceAssistant, chat_ctx: llm.ChatContext
    ):
        ctx_msg = system_msg.copy()
        user_msg = chat_ctx.messages[-1]
        retriever = index.as_retriever()
        nodes = await retriever.aretrieve(user_msg.content)

        ctx_msg.content = "Context that might help answer the user's question:"
        for node in nodes:
            node_content = node.get_content(metadata_mode=MetadataMode.LLM)
            ctx_msg.content += f"\n\n{node_content}"

        chat_ctx.messages[0] = ctx_msg  # the first message is the system message
        return assistant.llm.chat(chat_ctx=chat_ctx)

    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    assistant = VoiceAssistant(
        vad=silero.VAD.load(),
        stt=deepgram.STT(),
        llm=openai.LLM(),
        tts=openai.TTS(),
        chat_ctx=initial_ctx,
        will_synthesize_assistant_reply=_will_synthesize_assistant_reply,
    )
    assistant.start(ctx.room)
    await assistant.say("Hey, how can I help you today?", allow_interruptions=True)


if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
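The callback above rewrites the system message on every turn, appending each retrieved chunk after a short preamble. That assembly step is plain string handling and can be sketched on its own (the helper name is made up for illustration):

```python
def build_context_message(node_contents: list[str]) -> str:
    """Mirror of the loop in _will_synthesize_assistant_reply: join
    retrieved chunks under a preamble, separated by blank lines."""
    msg = "Context that might help answer the user's question:"
    for content in node_contents:
        msg += f"\n\n{content}"
    return msg


chunks = [
    "LiveKit Cloud builds on the open-source SFU.",
    "No SPOF: layers of redundancy are built into the system.",
]
print(build_context_message(chunks))
```

Because the rewritten message replaces `chat_ctx.messages[0]` rather than being appended, the retrieved context never accumulates across turns.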
1 change: 1 addition & 0 deletions livekit-plugins/install_plugins_editable.sh
@@ -17,3 +17,4 @@ pip install -e ./livekit-plugins-openai --config-settings editable_mode=strict
pip install -e ./livekit-plugins-rag --config-settings editable_mode=strict
pip install -e ./livekit-plugins-silero --config-settings editable_mode=strict
pip install -e ./livekit-plugins-browser --config-settings editable_mode=strict
pip install -e ./livekit-plugins-llama-index --config-settings editable_mode=strict
14 changes: 14 additions & 0 deletions livekit-plugins/livekit-plugins-llama-index/README.md
@@ -0,0 +1,14 @@
# LiveKit Plugins Llama Index

Agent Framework plugin for LlamaIndex-based applications.

### Developer note

When copying this directory over to create a new `livekit-plugins` package, make sure it's nested within the `livekit-plugins` folder and that the `"name"` field in `package.json` follows the proper naming convention for CI:

```json
{
"name": "livekit-plugins-<name>",
"private": true
}
```
29 changes: 29 additions & 0 deletions livekit-plugins/livekit-plugins-llama-index/livekit/plugins/llama_index/__init__.py
@@ -0,0 +1,29 @@
# Copyright 2023 LiveKit, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from livekit.agents import Plugin

from .llm import LLM, LLMStream
from .log import logger
from .version import __version__

__all__ = ["LLM", "LLMStream"]


class LlamaIndexPlugin(Plugin):
    def __init__(self):
        super().__init__(__name__, __version__, __package__, logger)


Plugin.register_plugin(LlamaIndexPlugin())
97 changes: 97 additions & 0 deletions livekit-plugins/livekit-plugins-llama-index/livekit/plugins/llama_index/llm.py
@@ -0,0 +1,97 @@
from __future__ import annotations

from livekit.agents import llm

from llama_index.core.chat_engine.types import (
    BaseChatEngine,
    StreamingAgentChatResponse,
)
from llama_index.core.llms import ChatMessage, MessageRole

from .log import logger


class LLM(llm.LLM):
    def __init__(
        self,
        *,
        chat_engine: BaseChatEngine,
    ) -> None:
        self._chat_engine = chat_engine

    def chat(
        self,
        *,
        chat_ctx: llm.ChatContext,
        fnc_ctx: llm.FunctionContext | None = None,
        temperature: float | None = None,
        n: int | None = 1,
        parallel_tool_calls: bool | None = None,
    ) -> "LLMStream":
        if fnc_ctx is not None:
            logger.warning("fnc_ctx is currently not supported with llama_index.LLM")

        return LLMStream(
            chat_engine=self._chat_engine,
            chat_ctx=chat_ctx,
        )


class LLMStream(llm.LLMStream):
    def __init__(
        self,
        *,
        chat_engine: BaseChatEngine,
        chat_ctx: llm.ChatContext,
    ) -> None:
        super().__init__(chat_ctx=chat_ctx, fnc_ctx=None)
        self._chat_engine = chat_engine
        self._stream: StreamingAgentChatResponse | None = None

    async def __anext__(self) -> llm.ChatChunk:
        # remove and get the last message from the chat_ctx
        chat_ctx = self._chat_ctx.copy()
        user_msg = chat_ctx.messages.pop()
        if user_msg.role != "user":
            raise ValueError("last message in chat_ctx must be from user")

        assert isinstance(
            user_msg.content, str
        ), "user message content must be a string"

        if not self._stream:
            # pass the popped copy as history so the user message isn't duplicated
            self._stream = await self._chat_engine.astream_chat(
                user_msg.content, chat_history=_to_llama_chat_messages(chat_ctx)
            )

        async for delta in self._stream.async_response_gen():
            return llm.ChatChunk(
                choices=[
                    llm.Choice(
                        delta=llm.ChoiceDelta(
                            role="assistant",
                            content=delta,
                        )
                    )
                ]
            )

        raise StopAsyncIteration


def _to_llama_chat_messages(chat_ctx: llm.ChatContext) -> list[ChatMessage]:
    return [
        ChatMessage(content=msg.content, role=_to_llama_message_role(msg.role))
        for msg in chat_ctx.messages
    ]


def _to_llama_message_role(chat_role: llm.ChatRole) -> MessageRole:
    if chat_role == "system":
        return MessageRole.SYSTEM
    elif chat_role == "user":
        return MessageRole.USER
    elif chat_role == "assistant":
        return MessageRole.ASSISTANT
    elif chat_role == "tool":
        return MessageRole.TOOL
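The role-mapping chain at the end of llm.py can also be written as a dictionary lookup that fails loudly on an unknown role. A self-contained sketch, using a stand-in enum rather than the real `llama_index.core.llms.MessageRole`:

```python
from enum import Enum


class MessageRole(str, Enum):
    """Stand-in for llama_index.core.llms.MessageRole."""

    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"
    TOOL = "tool"


# build the string -> enum table once, from the enum itself
_ROLE_MAP = {role.value: role for role in MessageRole}


def to_llama_message_role(chat_role: str) -> MessageRole:
    try:
        return _ROLE_MAP[chat_role]
    except KeyError:
        raise ValueError(f"unsupported chat role: {chat_role!r}")


print(to_llama_message_role("user").value)  # prints "user"
```

The explicit `ValueError` avoids the implicit `None` an `if`/`elif` chain returns when no branch matches.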
3 changes: 3 additions & 0 deletions livekit-plugins/livekit-plugins-llama-index/livekit/plugins/llama_index/log.py
@@ -0,0 +1,3 @@
import logging

logger = logging.getLogger("livekit.plugins.llama_index")
15 changes: 15 additions & 0 deletions livekit-plugins/livekit-plugins-llama-index/livekit/plugins/llama_index/version.py
@@ -0,0 +1,15 @@
# Copyright 2023 LiveKit, Inc.

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

__version__ = "0.1.0"