Skip to content

Commit

Permalink
PIN-4743: Add distinct feature on read model
Browse files Browse the repository at this point in the history
  • Loading branch information
galales committed Mar 27, 2024
1 parent 1f4fa7b commit b74bff9
Showing 1 changed file with 20 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.mongodb.scala.{ConnectionString, Document, MongoClient, MongoClientSe
import spray.json._

import scala.concurrent.{ExecutionContext, Future}
import scala.reflect.ClassTag

trait ReadModelService {
def findOne[T: JsonReader](collectionName: String, filter: Bson)(implicit ec: ExecutionContext): Future[Option[T]]
Expand All @@ -34,6 +35,9 @@ trait ReadModelService {
limit: Int,
allowDiskUse: Boolean = false
)(implicit ec: ExecutionContext): Future[Seq[T]]
def distinct[T: ClassTag](collectionName: String, fieldName: String, filter: Bson)(implicit
ec: ExecutionContext
): Future[Seq[T]]
def close(): Unit
}

Expand All @@ -47,19 +51,21 @@ final class MongoDbReadModelService(dbConfig: ReadModelConfig) extends ReadModel
.build()
)

def close(): Unit = client.close()
override def close(): Unit = client.close()

private val db: MongoDatabase = client.getDatabase(dbConfig.dbName)

def findOne[T: JsonReader](collectionName: String, filter: Bson)(implicit ec: ExecutionContext): Future[Option[T]] =
override def findOne[T: JsonReader](collectionName: String, filter: Bson)(implicit
ec: ExecutionContext
): Future[Option[T]] =
find[T](collectionName, filter, offset = 0, limit = 1).map(_.headOption)

def find[T: JsonReader](collectionName: String, filter: Bson, offset: Int, limit: Int)(implicit
override def find[T: JsonReader](collectionName: String, filter: Bson, offset: Int, limit: Int)(implicit
ec: ExecutionContext
): Future[Seq[T]] = find[T](collectionName, filter, Projections.include(), offset, limit)

def find[T: JsonReader](collectionName: String, filter: Bson, projection: Bson, offset: Int, limit: Int)(implicit
ec: ExecutionContext
override def find[T: JsonReader](collectionName: String, filter: Bson, projection: Bson, offset: Int, limit: Int)(
implicit ec: ExecutionContext
): Future[Seq[T]] =
for {
results <- db
Expand All @@ -72,7 +78,7 @@ final class MongoDbReadModelService(dbConfig: ReadModelConfig) extends ReadModel
model <- results.traverse(extractData[T](_).toFuture)
} yield model

def aggregate[T: JsonReader](
override def aggregate[T: JsonReader](
collectionName: String,
pipeline: Seq[Bson],
offset: Int,
Expand All @@ -81,7 +87,7 @@ final class MongoDbReadModelService(dbConfig: ReadModelConfig) extends ReadModel
)(implicit ec: ExecutionContext): Future[Seq[T]] =
aggregator(collectionName, pipeline, offset, limit, allowDiskUse)(extractData[T])

def aggregateRaw[T: JsonReader](
override def aggregateRaw[T: JsonReader](
collectionName: String,
pipeline: Seq[Bson],
offset: Int,
Expand All @@ -90,6 +96,13 @@ final class MongoDbReadModelService(dbConfig: ReadModelConfig) extends ReadModel
)(implicit ec: ExecutionContext): Future[Seq[T]] =
aggregator(collectionName, pipeline, offset, limit, allowDiskUse)(extract[T])

override def distinct[T: ClassTag](collectionName: String, fieldName: String, filter: Bson)(implicit
ec: ExecutionContext
): Future[Seq[T]] =
db.getCollection(collectionName)
.distinct[T](fieldName, filter)
.toFuture()

private def aggregator[T](
collectionName: String,
pipeline: Seq[Bson],
Expand Down

0 comments on commit b74bff9

Please sign in to comment.