diff --git a/fang/src/asynk/async_queue.rs b/fang/src/asynk/async_queue.rs index 013670a..942109b 100644 --- a/fang/src/asynk/async_queue.rs +++ b/fang/src/asynk/async_queue.rs @@ -191,6 +191,17 @@ impl InternalPool { _ => panic!("Not a SqlitePool!"), } } + + pub(crate) fn backend(&self) -> BackendSqlX { + match *self { + #[cfg(feature = "asynk-postgres")] + InternalPool::Pg(_) => BackendSqlX::Pg, + #[cfg(feature = "asynk-mysql")] + InternalPool::MySql(_) => BackendSqlX::MySql, + #[cfg(feature = "asynk-sqlite")] + InternalPool::Sqlite(_) => BackendSqlX::Sqlite, + } + } } #[derive(TypedBuilder, Debug, Clone)] @@ -203,8 +214,6 @@ pub struct AsyncQueue { max_pool_size: u32, #[builder(default = false, setter(skip))] connected: bool, - #[builder(default = BackendSqlX::NoBackend, setter(skip))] - backend: BackendSqlX, } #[cfg(test)] @@ -287,7 +296,6 @@ impl AsyncQueue { let (backend, pool) = get_backend(kind, &self.uri, self.max_pool_size).await?; self.pool = Some(pool); - self.backend = backend; self.connected = true; Ok(()) } @@ -420,8 +428,8 @@ impl AsyncQueueable for AsyncQueue { let query_params = QueryParams::builder().uuid(id).build(); - let task = self - .backend + let task = pool + .backend() .execute_query(SqlXQuery::FindTaskById, pool, query_params) .await? .unwrap_task(); @@ -437,7 +445,7 @@ impl AsyncQueueable for AsyncQueue { // this unwrap is safe because we check if connection is established let pool = self.pool.as_ref().unwrap(); - let task = Self::fetch_and_touch_task_query(pool, &self.backend, task_type).await?; + let task = Self::fetch_and_touch_task_query(pool, &pool.backend(), task_type).await?; Ok(task) } @@ -451,7 +459,7 @@ impl AsyncQueueable for AsyncQueue { let task = if !task.uniq() { Self::insert_task_query( pool, - &self.backend, + &pool.backend(), &metadata, &task.task_type(), &Utc::now(), @@ -460,7 +468,7 @@ impl AsyncQueueable for AsyncQueue { } else { Self::insert_task_if_not_exist_query( pool, - &self.backend, + &pool.backend(), &metadata, &task.task_type(), &Utc::now(), @@ -476,7 +484,7 @@ impl AsyncQueueable for AsyncQueue { // this unwrap is safe because we check if connection is established let pool = self.pool.as_ref().unwrap(); - let task = Self::schedule_task_query(pool, &self.backend, task).await?; + let task = Self::schedule_task_query(pool, &pool.backend(), task).await?; Ok(task) } @@ -488,8 +496,8 @@ impl AsyncQueueable for AsyncQueue { let query_params = QueryParams::builder().build(); - let result = self - .backend + let result = pool + .backend() .execute_query(SqlXQuery::RemoveAllTask, pool, query_params) .await? .unwrap_u64(); @@ -504,8 +512,8 @@ impl AsyncQueueable for AsyncQueue { let query_params = QueryParams::builder().build(); - let result = self - .backend + let result = pool + .backend() .execute_query(SqlXQuery::RemoveAllScheduledTask, pool, query_params) .await? .unwrap_u64(); @@ -519,8 +527,8 @@ impl AsyncQueueable for AsyncQueue { let query_params = QueryParams::builder().uuid(id).build(); - let result = self - .backend + let result = pool + .backend() .execute_query(SqlXQuery::RemoveTask, pool, query_params) .await? .unwrap_u64(); @@ -538,8 +546,8 @@ impl AsyncQueueable for AsyncQueue { let query_params = QueryParams::builder().runnable(task).build(); - let result = self - .backend + let result = pool + .backend() .execute_query(SqlXQuery::RemoveTaskByMetadata, pool, query_params) .await? .unwrap_u64(); @@ -556,8 +564,8 @@ impl AsyncQueueable for AsyncQueue { let query_params = QueryParams::builder().task_type(task_type).build(); - let result = self - .backend + let result = pool + .backend() .execute_query(SqlXQuery::RemoveTaskType, pool, query_params) .await? .unwrap_u64(); @@ -575,8 +583,8 @@ impl AsyncQueueable for AsyncQueue { let query_params = QueryParams::builder().uuid(&task.id).state(state).build(); - let task = self - .backend + let task = pool + .backend() .execute_query(SqlXQuery::UpdateTaskState, pool, query_params) .await? .unwrap_task(); @@ -597,8 +605,8 @@ impl AsyncQueueable for AsyncQueue { .task(task) .build(); - let failed_task = self - .backend + let failed_task = pool + .backend() .execute_query(SqlXQuery::FailTask, pool, query_params) .await? .unwrap_task(); @@ -622,8 +630,8 @@ impl AsyncQueueable for AsyncQueue { .task(task) .build(); - let failed_task = self - .backend + let failed_task = pool + .backend() .execute_query(SqlXQuery::RetryTask, pool, query_params) .await? .unwrap_task(); diff --git a/fang/src/asynk/backend_sqlx.rs b/fang/src/asynk/backend_sqlx.rs index b61e835..b1e2598 100644 --- a/fang/src/asynk/backend_sqlx.rs +++ b/fang/src/asynk/backend_sqlx.rs @@ -38,8 +38,6 @@ pub(crate) enum BackendSqlX { #[cfg(feature = "asynk-mysql")] MySql, - - NoBackend, } #[allow(dead_code)]