diff --git a/cqrs/src/main/scala/it/pagopa/interop/commons/cqrs/service/ReadModelService.scala b/cqrs/src/main/scala/it/pagopa/interop/commons/cqrs/service/ReadModelService.scala index d048a96e..9d7f2851 100644 --- a/cqrs/src/main/scala/it/pagopa/interop/commons/cqrs/service/ReadModelService.scala +++ b/cqrs/src/main/scala/it/pagopa/interop/commons/cqrs/service/ReadModelService.scala @@ -11,6 +11,7 @@ import org.mongodb.scala.{ConnectionString, Document, MongoClient, MongoClientSe import spray.json._ import scala.concurrent.{ExecutionContext, Future} +import scala.reflect.ClassTag trait ReadModelService { def findOne[T: JsonReader](collectionName: String, filter: Bson)(implicit ec: ExecutionContext): Future[Option[T]] @@ -34,6 +35,9 @@ trait ReadModelService { limit: Int, allowDiskUse: Boolean = false )(implicit ec: ExecutionContext): Future[Seq[T]] + def distinct[T: ClassTag](collectionName: String, fieldName: String, filter: Bson)(implicit + ec: ExecutionContext + ): Future[Seq[T]] def close(): Unit } @@ -47,19 +51,21 @@ final class MongoDbReadModelService(dbConfig: ReadModelConfig) extends ReadModel .build() ) - def close(): Unit = client.close() + override def close(): Unit = client.close() private val db: MongoDatabase = client.getDatabase(dbConfig.dbName) - def findOne[T: JsonReader](collectionName: String, filter: Bson)(implicit ec: ExecutionContext): Future[Option[T]] = + override def findOne[T: JsonReader](collectionName: String, filter: Bson)(implicit + ec: ExecutionContext + ): Future[Option[T]] = find[T](collectionName, filter, offset = 0, limit = 1).map(_.headOption) - def find[T: JsonReader](collectionName: String, filter: Bson, offset: Int, limit: Int)(implicit + override def find[T: JsonReader](collectionName: String, filter: Bson, offset: Int, limit: Int)(implicit ec: ExecutionContext ): Future[Seq[T]] = find[T](collectionName, filter, Projections.include(), offset, limit) - def find[T: JsonReader](collectionName: String, filter: Bson, projection: Bson, offset: Int, limit: Int)(implicit - ec: ExecutionContext + override def find[T: JsonReader](collectionName: String, filter: Bson, projection: Bson, offset: Int, limit: Int)( + implicit ec: ExecutionContext ): Future[Seq[T]] = for { results <- db @@ -72,7 +78,7 @@ final class MongoDbReadModelService(dbConfig: ReadModelConfig) extends ReadModel model <- results.traverse(extractData[T](_).toFuture) } yield model - def aggregate[T: JsonReader]( + override def aggregate[T: JsonReader]( collectionName: String, pipeline: Seq[Bson], offset: Int, @@ -81,7 +87,7 @@ final class MongoDbReadModelService(dbConfig: ReadModelConfig) extends ReadModel )(implicit ec: ExecutionContext): Future[Seq[T]] = aggregator(collectionName, pipeline, offset, limit, allowDiskUse)(extractData[T]) - def aggregateRaw[T: JsonReader]( + override def aggregateRaw[T: JsonReader]( collectionName: String, pipeline: Seq[Bson], offset: Int, @@ -90,6 +96,13 @@ final class MongoDbReadModelService(dbConfig: ReadModelConfig) extends ReadModel )(implicit ec: ExecutionContext): Future[Seq[T]] = aggregator(collectionName, pipeline, offset, limit, allowDiskUse)(extract[T]) + override def distinct[T: ClassTag](collectionName: String, fieldName: String, filter: Bson)(implicit + ec: ExecutionContext + ): Future[Seq[T]] = + db.getCollection(collectionName) + .distinct[T](fieldName, filter) + .toFuture() + private def aggregator[T]( collectionName: String, pipeline: Seq[Bson],