Skip to content

Commit

Permalink
chore: add ai metrics (#1169)
Browse files Browse the repository at this point in the history
  • Loading branch information
appflowy authored Jan 17, 2025
1 parent 0f7a1f4 commit ab9932a
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 22 deletions.
8 changes: 7 additions & 1 deletion libs/appflowy-ai-client/src/dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub enum OutputLayout {
SimpleTable = 3,
}

#[derive(Clone, Debug, Default, Serialize_repr, Deserialize_repr)]
#[derive(Clone, Debug, Default, Serialize_repr, Deserialize_repr, Eq, PartialEq)]
#[repr(u8)]
pub enum OutputContent {
#[default]
Expand All @@ -62,6 +62,12 @@ pub enum OutputContent {
RichTextImage = 2,
}

impl OutputContent {
pub fn is_image(&self) -> bool {
*self == OutputContent::IMAGE || *self == OutputContent::RichTextImage
}
}

#[derive(Clone, Default, Debug, Serialize, Deserialize)]
pub struct OutputContentMetadata {
/// Custom prompt for image generation.
Expand Down
58 changes: 37 additions & 21 deletions src/api/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ async fn answer_stream_handler(
chat::chat_ops::select_chat_message_content(&state.pg_pool, question_id).await?;
let rag_ids = chat::chat_ops::select_chat_rag_ids(&state.pg_pool, &chat_id).await?;
let ai_model = ai_model_from_header(&req);
state.metrics.ai_metrics.record_total_stream_count(1);
match state
.ai_client
.stream_question(
Expand All @@ -291,13 +292,16 @@ async fn answer_stream_handler(
.streaming(new_answer_stream),
)
},
Err(err) => Ok(
HttpResponse::Ok()
.content_type("text/event-stream")
.streaming(stream::once(async move {
Err(AppError::AIServiceUnavailable(err.to_string()))
})),
),
Err(err) => {
state.metrics.ai_metrics.record_failed_stream_count(1);
Ok(
HttpResponse::Ok()
.content_type("text/event-stream")
.streaming(stream::once(async move {
Err(AppError::AIServiceUnavailable(err.to_string()))
})),
)
},
}
}

Expand All @@ -313,6 +317,7 @@ async fn answer_stream_v2_handler(
let rag_ids = chat::chat_ops::select_chat_rag_ids(&state.pg_pool, &chat_id).await?;
let ai_model = ai_model_from_header(&req);

state.metrics.ai_metrics.record_total_stream_count(1);
trace!(
"[Chat] stream answer for chat: {}, question: {}, rag_ids: {:?}",
chat_id,
Expand Down Expand Up @@ -340,13 +345,16 @@ async fn answer_stream_v2_handler(
.streaming(new_answer_stream),
)
},
Err(err) => Ok(
HttpResponse::ServiceUnavailable()
.content_type("text/event-stream")
.streaming(stream::once(async move {
Err(AppError::AIServiceUnavailable(err.to_string()))
})),
),
Err(err) => {
state.metrics.ai_metrics.record_failed_stream_count(1);
Ok(
HttpResponse::ServiceUnavailable()
.content_type("text/event-stream")
.streaming(stream::once(async move {
Err(AppError::AIServiceUnavailable(err.to_string()))
})),
)
},
}
}

Expand All @@ -363,6 +371,10 @@ async fn answer_stream_v3_handler(
chat::chat_ops::select_chat_message_content(&state.pg_pool, payload.question_id).await?;
let rag_ids = chat::chat_ops::select_chat_rag_ids(&state.pg_pool, &payload.chat_id).await?;
let ai_model = ai_model_from_header(&req);
state.metrics.ai_metrics.record_total_stream_count(1);
if payload.format.output_content.is_image() {
state.metrics.ai_metrics.record_stream_image_count(1);
}

let question = ChatQuestion {
chat_id: payload.chat_id,
Expand All @@ -377,6 +389,7 @@ async fn answer_stream_v3_handler(
rag_ids,
},
};

trace!("[Chat] stream v3 {:?}", question);
match state
.ai_client
Expand All @@ -391,13 +404,16 @@ async fn answer_stream_v3_handler(
.streaming(new_answer_stream),
)
},
Err(err) => Ok(
HttpResponse::ServiceUnavailable()
.content_type("text/event-stream")
.streaming(stream::once(async move {
Err(AppError::AIServiceUnavailable(err.to_string()))
})),
),
Err(err) => {
state.metrics.ai_metrics.record_failed_stream_count(1);
Ok(
HttpResponse::ServiceUnavailable()
.content_type("text/event-stream")
.streaming(stream::once(async move {
Err(AppError::AIServiceUnavailable(err.to_string()))
})),
)
},
}
}

Expand Down
46 changes: 46 additions & 0 deletions src/biz/chat/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use prometheus_client::metrics::counter::Counter;

#[derive(Default, Clone)]
pub struct AIMetrics {
total_stream_count: Counter,
failed_stream_count: Counter,
stream_image_count: Counter,
}

impl AIMetrics {
pub fn register(registry: &mut prometheus_client::registry::Registry) -> Self {
let metrics = Self::default();
let realtime_registry = registry.sub_registry_with_prefix("ai");

// Register each metric with the Prometheus registry
realtime_registry.register(
"total_stream_count",
"Total count of streams processed",
metrics.total_stream_count.clone(),
);
realtime_registry.register(
"failed_stream_count",
"Total count of failed streams",
metrics.failed_stream_count.clone(),
);
realtime_registry.register(
"image_stream_count",
"Total count of image streams processed",
metrics.stream_image_count.clone(),
);

metrics
}

pub fn record_total_stream_count(&self, count: u64) {
self.total_stream_count.inc_by(count);
}

pub fn record_failed_stream_count(&self, count: u64) {
self.failed_stream_count.inc_by(count);
}

pub fn record_stream_image_count(&self, count: u64) {
self.stream_image_count.inc_by(count);
}
}
1 change: 1 addition & 0 deletions src/biz/chat/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod metrics;
pub mod ops;
4 changes: 4 additions & 0 deletions src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use snowflake::Snowflake;
use tonic_proto::history::history_client::HistoryClient;

use crate::api::metrics::{AppFlowyWebMetrics, PublishedCollabMetrics, RequestMetrics};
use crate::biz::chat::metrics::AIMetrics;
use crate::biz::pg_listener::PgListeners;
use crate::biz::workspace::publish::PublishedCollabStore;
use crate::config::config::Config;
Expand Down Expand Up @@ -130,6 +131,7 @@ pub struct AppMetrics {
pub appflowy_web_metrics: Arc<AppFlowyWebMetrics>,
pub embedding_metrics: Arc<EmbeddingMetrics>,
pub collab_stream_metrics: Arc<CollabStreamMetrics>,
pub ai_metrics: Arc<AIMetrics>,
}

impl Default for AppMetrics {
Expand All @@ -149,6 +151,7 @@ impl AppMetrics {
let appflowy_web_metrics = Arc::new(AppFlowyWebMetrics::register(&mut registry));
let embedding_metrics = Arc::new(EmbeddingMetrics::register(&mut registry));
let collab_stream_metrics = Arc::new(CollabStreamMetrics::register(&mut registry));
let ai_metrics = Arc::new(AIMetrics::register(&mut registry));
Self {
registry: Arc::new(registry),
request_metrics,
Expand All @@ -159,6 +162,7 @@ impl AppMetrics {
appflowy_web_metrics,
embedding_metrics,
collab_stream_metrics,
ai_metrics,
}
}
}
Expand Down

0 comments on commit ab9932a

Please sign in to comment.