Skip to content

Commit

Permalink
fix getRelationships API when using query param (#1615)
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladysl authored Feb 9, 2024
1 parent 3ce9c55 commit 94650de
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 76 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.opendatadiscovery.oddplatform.repository.reactive;

import org.opendatadiscovery.oddplatform.api.contract.model.RelationshipsType;
import org.opendatadiscovery.oddplatform.dto.RelationshipDto;
import org.opendatadiscovery.oddplatform.utils.Page;
import reactor.core.publisher.Mono;

public interface ReactiveDataEntityRelationshipRepository {

Mono<Page<RelationshipDto>> getRelationships(final Integer page, final Integer size,
final String inputQuery, final RelationshipsType type);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package org.opendatadiscovery.oddplatform.repository.reactive;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.jooq.Condition;
import org.jooq.Field;
import org.jooq.Record;
import org.jooq.ResultQuery;
import org.jooq.Select;
import org.jooq.SortOrder;
import org.jooq.Table;
import org.jooq.impl.DSL;
import org.opendatadiscovery.oddplatform.api.contract.model.RelationshipsType;
import org.opendatadiscovery.oddplatform.dto.DataEntityTypeDto;
import org.opendatadiscovery.oddplatform.dto.RelationshipDto;
import org.opendatadiscovery.oddplatform.model.tables.pojos.DataEntityPojo;
import org.opendatadiscovery.oddplatform.model.tables.pojos.RelationshipsPojo;
import org.opendatadiscovery.oddplatform.model.tables.records.DataEntityRecord;
import org.opendatadiscovery.oddplatform.model.tables.records.RelationshipsRecord;
import org.opendatadiscovery.oddplatform.repository.util.JooqQueryHelper;
import org.opendatadiscovery.oddplatform.repository.util.JooqReactiveOperations;
import org.opendatadiscovery.oddplatform.repository.util.OrderByField;
import org.opendatadiscovery.oddplatform.utils.Page;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Mono;

import static org.opendatadiscovery.oddplatform.model.Tables.DATA_ENTITY;
import static org.opendatadiscovery.oddplatform.model.Tables.RELATIONSHIPS;

@Slf4j
@Repository
@RequiredArgsConstructor
public class ReactiveDataEntityRelationshipRepositoryImpl
implements ReactiveDataEntityRelationshipRepository {
private static final String RELATIONSHIPS_CTE = "relationships_cte";
private static final String SOURCE_DATA_ENTITY = "source_data_entity";
private static final String TARGET_DATA_ENTITY = "target_data_entity";

private final JooqReactiveOperations jooqReactiveOperations;
private final JooqQueryHelper jooqQueryHelper;

@Override
public Mono<Page<RelationshipDto>> getRelationships(final Integer page, final Integer size,
final String inputQuery, final RelationshipsType type) {
final Table<RelationshipsRecord> relationships = RELATIONSHIPS.asTable(RELATIONSHIPS_CTE);
final Table<DataEntityRecord> srcDataEntity = DATA_ENTITY.asTable(SOURCE_DATA_ENTITY);
final Table<DataEntityRecord> trgtDataEntity = DATA_ENTITY.asTable(TARGET_DATA_ENTITY);

final List<Condition> conditionList = new ArrayList<>();

if (!StringUtils.isBlank(inputQuery)) {
conditionList.add(DATA_ENTITY.EXTERNAL_NAME.containsIgnoreCase(inputQuery));
}

conditionList.add(DATA_ENTITY.TYPE_ID.eq(DataEntityTypeDto.RELATIONSHIP.getId()));

final Select<DataEntityRecord> homogeneousQuery = DSL.selectFrom(DATA_ENTITY)
.where(conditionList);

final Select<? extends Record> relationshipSelect =
jooqQueryHelper.paginate(homogeneousQuery,
List.of(new OrderByField(DATA_ENTITY.ID, SortOrder.ASC)), (page - 1) * size, size);

final Table<? extends Record> relationshipsDataEntityCTE =
relationshipSelect.asTable("data_entity_relationship_cte");

final List<Field<?>> groupByFields =
Stream.of(relationshipsDataEntityCTE.fields(), srcDataEntity.fields(),
trgtDataEntity.fields(), relationships.fields())
.flatMap(Arrays::stream)
.toList();

final ResultQuery<Record> resultQuery = DSL.with(relationshipsDataEntityCTE.getName())
.as(relationshipSelect)
.select(relationshipsDataEntityCTE.fields())
.select(relationships.asterisk(), srcDataEntity.asterisk(), trgtDataEntity.asterisk())
.from(relationshipsDataEntityCTE.getName())
.join(relationships)
.on(relationshipsDataEntityCTE.field(DATA_ENTITY.ID).eq(relationships.field(RELATIONSHIPS.DATA_ENTITY_ID))
.and(RelationshipsType.ALL == type
? DSL.noCondition()
: relationships.field(RELATIONSHIPS.RELATIONSHIP_TYPE).eq(type.getValue())))
.leftJoin(srcDataEntity)
.on(relationships.field(RELATIONSHIPS.SOURCE_DATASET_ODDRN).eq(srcDataEntity.field(DATA_ENTITY.ODDRN)))
.leftJoin(trgtDataEntity)
.on(relationships.field(RELATIONSHIPS.TARGET_DATASET_ODDRN).eq(trgtDataEntity.field(DATA_ENTITY.ODDRN)))
.groupBy(groupByFields);

return jooqReactiveOperations.flux(resultQuery)
.collectList()
.flatMap(record -> jooqQueryHelper.pageifyResult(
record,
r -> RelationshipDto.builder()
.relationshipPojo(r.into(relationships).into(RelationshipsPojo.class))
.dataEntityRelationship(r.into(relationshipsDataEntityCTE).into(DataEntityPojo.class))
.sourceDataEntity(r.into(srcDataEntity).into(DataEntityPojo.class))
.targetDataEntity(r.into(trgtDataEntity).into(DataEntityPojo.class))
.build(),
jooqReactiveOperations
.mono(DSL.selectCount().from(DATA_ENTITY).where(conditionList))
.map(r -> r.component1().longValue())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import org.opendatadiscovery.oddplatform.api.contract.model.RelationshipsType;
import org.opendatadiscovery.oddplatform.dto.RelationshipDto;
import org.opendatadiscovery.oddplatform.model.tables.pojos.RelationshipsPojo;
import org.opendatadiscovery.oddplatform.utils.Page;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -13,7 +12,4 @@ public interface ReactiveRelationshipsRepository extends ReactiveCRUDRepository<
Mono<List<RelationshipsPojo>> getRelationshipByDataEntityIds(final List<Long> dataEntityRelationshipIds);

Flux<RelationshipDto> getRelationsByDatasetIdAndType(final Long dataEntityId, final RelationshipsType type);

Mono<Page<RelationshipDto>> getRelationships(final Integer page, final Integer size,
final String inputQuery, final RelationshipsType type);
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
package org.opendatadiscovery.oddplatform.repository.reactive;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.jooq.Field;
import org.jooq.Record;
import org.jooq.Record1;
import org.jooq.ResultQuery;
import org.jooq.Select;
import org.jooq.SelectConditionStep;
import org.jooq.SelectOnConditionStep;
import org.jooq.SortOrder;
import org.jooq.Table;
import org.jooq.impl.DSL;
import org.opendatadiscovery.oddplatform.api.contract.model.RelationshipsType;
Expand All @@ -23,8 +18,6 @@
import org.opendatadiscovery.oddplatform.repository.util.JooqQueryHelper;
import org.opendatadiscovery.oddplatform.repository.util.JooqReactiveOperations;
import org.opendatadiscovery.oddplatform.repository.util.JooqRecordHelper;
import org.opendatadiscovery.oddplatform.repository.util.OrderByField;
import org.opendatadiscovery.oddplatform.utils.Page;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -42,14 +35,10 @@ public class ReactiveRelationshipsRepositoryImpl
private static final String SOURCE_DATA_ENTITY = "source_data_entity";
private static final String TARGET_DATA_ENTITY = "target_data_entity";

private final JooqRecordHelper jooqRecordHelper;

public ReactiveRelationshipsRepositoryImpl(final JooqReactiveOperations jooqReactiveOperations,
final JooqQueryHelper jooqQueryHelper,
final JooqRecordHelper jooqRecordHelper) {
super(jooqReactiveOperations, jooqQueryHelper, RELATIONSHIPS, RelationshipsPojo.class);

this.jooqRecordHelper = jooqRecordHelper;
}

@Override
Expand Down Expand Up @@ -95,70 +84,22 @@ public Flux<RelationshipDto> getRelationsByDatasetIdAndType(final Long dataEntit
}

return jooqReactiveOperations.flux(finalQuery)
.map(r -> mapToDto(r.into(RELATIONSHIPS).into(RelationshipsPojo.class), r, relationshipsDataEntity,
srcDataEntity, trgtDataEntity));
}

@Override
public Mono<Page<RelationshipDto>> getRelationships(final Integer page, final Integer size,
final String inputQuery, final RelationshipsType type) {
final Table<DataEntityRecord> relationshipsDataEntity = DATA_ENTITY.asTable(RELATIONSHIPS_DATA_ENTITY);
final Table<DataEntityRecord> srcDataEntity = DATA_ENTITY.asTable(SOURCE_DATA_ENTITY);
final Table<DataEntityRecord> trgtDataEntity = DATA_ENTITY.asTable(TARGET_DATA_ENTITY);

final Select<RelationshipsRecord> homogeneousQuery = DSL.selectFrom(RELATIONSHIPS)
.where(listCondition(inputQuery));

final Select<? extends Record> relationshipSelect =
paginate(homogeneousQuery,
List.of(new OrderByField(RELATIONSHIPS.ID, SortOrder.ASC)), (page - 1) * size, size);

final Table<? extends Record> relationshipCTE = relationshipSelect.asTable("relationship_cte");

final List<Field<?>> groupByFields =
Stream.of(relationshipCTE.fields(), srcDataEntity.fields(),
trgtDataEntity.fields(), relationshipsDataEntity.fields())
.flatMap(Arrays::stream)
.toList();

final SelectOnConditionStep<Record> generalQuery = DSL.with(relationshipCTE.getName())
.as(relationshipSelect)
.select(relationshipCTE.fields())
.select(relationshipsDataEntity.asterisk(), srcDataEntity.asterisk(), trgtDataEntity.asterisk())
.from(relationshipCTE.getName())
.join(relationshipsDataEntity)
.on(relationshipCTE.field(RELATIONSHIPS.DATA_ENTITY_ID).eq(relationshipsDataEntity.field(DATA_ENTITY.ID)))
.leftJoin(srcDataEntity)
.on(relationshipCTE.field(RELATIONSHIPS.SOURCE_DATASET_ODDRN).eq(srcDataEntity.field(DATA_ENTITY.ODDRN)))
.leftJoin(trgtDataEntity)
.on(relationshipCTE.field(RELATIONSHIPS.TARGET_DATASET_ODDRN).eq(trgtDataEntity.field(DATA_ENTITY.ODDRN)));

final ResultQuery<Record> resultQuery =
RelationshipsType.ALL == type
? generalQuery.groupBy(groupByFields)
: generalQuery.where(relationshipCTE.field(RELATIONSHIPS.RELATIONSHIP_TYPE).eq(type.getValue()))
.groupBy(groupByFields);

return jooqReactiveOperations.flux(resultQuery)
.collectList()
.flatMap(record -> jooqQueryHelper.pageifyResult(
record,
r -> mapToDto(jooqRecordHelper.remapCte(r, relationshipCTE.getName(), RELATIONSHIPS)
.into(RelationshipsPojo.class), r, relationshipsDataEntity, srcDataEntity, trgtDataEntity),
fetchCount(inputQuery)));
.map(r -> mapToDto(r.into(RELATIONSHIPS).into(RelationshipsPojo.class),
r.into(relationshipsDataEntity).into(DataEntityPojo.class),
r.into(srcDataEntity).into(DataEntityPojo.class),
r.into(trgtDataEntity).into(DataEntityPojo.class))
);
}

private RelationshipDto mapToDto(final RelationshipsPojo pojo, final Record record,
final Table<DataEntityRecord> relationshipsDataEntity,
final Table<DataEntityRecord> srcDataEntity,
final Table<DataEntityRecord> trgtDataEntity) {
private RelationshipDto mapToDto(final RelationshipsPojo pojo,
final DataEntityPojo relationshipsDataEntity,
final DataEntityPojo srcDataEntity,
final DataEntityPojo trgtDataEntity) {
return RelationshipDto.builder()
.relationshipPojo(pojo)
.dataEntityRelationship(
relationshipsDataEntity != null ? record.into(relationshipsDataEntity).into(DataEntityPojo.class) :
null)
.sourceDataEntity(srcDataEntity != null ? record.into(srcDataEntity).into(DataEntityPojo.class) : null)
.targetDataEntity(trgtDataEntity != null ? record.into(trgtDataEntity).into(DataEntityPojo.class) : null)
.dataEntityRelationship(relationshipsDataEntity)
.sourceDataEntity(srcDataEntity)
.targetDataEntity(trgtDataEntity)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.opendatadiscovery.oddplatform.exception.NotFoundException;
import org.opendatadiscovery.oddplatform.mapper.RelationshipDetailsMapper;
import org.opendatadiscovery.oddplatform.mapper.RelationshipMapper;
import org.opendatadiscovery.oddplatform.repository.reactive.ReactiveDataEntityRelationshipRepository;
import org.opendatadiscovery.oddplatform.repository.reactive.ReactiveRelationshipsDetailsRepository;
import org.opendatadiscovery.oddplatform.repository.reactive.ReactiveRelationshipsRepository;
import org.springframework.stereotype.Service;
Expand All @@ -17,6 +18,7 @@
@RequiredArgsConstructor
public class RelationshipsServiceImpl implements RelationshipsService {
private final ReactiveRelationshipsRepository relationshipsRepository;
private final ReactiveDataEntityRelationshipRepository dataEntityRelationshipRepository;
private final ReactiveRelationshipsDetailsRepository relationshipsDetailsRepository;
private final RelationshipMapper relationshipMapper;
private final RelationshipDetailsMapper relationshipDetailsMapper;
Expand All @@ -32,7 +34,7 @@ public Mono<DataEntityRelationshipList> getRelationsByDatasetId(final Long dataE
@Override
public Mono<DataEntityRelationshipList> getRelationships(final Integer page, final Integer size,
final RelationshipsType type, final String query) {
return relationshipsRepository.getRelationships(page, size, query, type)
return dataEntityRelationshipRepository.getRelationships(page, size, query, type)
.map(relationshipMapper::mapListToRelationshipPage);
}

Expand Down

0 comments on commit 94650de

Please sign in to comment.