Skip to content

Commit

Permalink
feature: upsert integrations instead of just inserting them (#3168)
Browse files Browse the repository at this point in the history
When we attempt to insert integrations that already exist in Kessel, we
receive an error response from that operation. Activating the "upsert"
flag when migrating will help us avoid that.

RHCLOUD-35454
  • Loading branch information
MikelAlejoBR authored Dec 4, 2024
1 parent 2751e4c commit f790ccf
Showing 1 changed file with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void migrateAssets() {
int offset = 0;
int traceLoops = 0;
do {
Log.tracef("[loops: %s] Loops", traceLoops);
Log.debugf("[loops: %s] Loops", traceLoops);

final List<Endpoint> fetchedEndpoints = this.endpointRepository.getNonSystemEndpointsWithLimitAndOffset(this.backendConfig.getKesselMigrationBatchSize(), offset);
Log.debugf("[offset: %s][first_integration: %s][last_integration: %s] Fetched batch of %s integrations", offset, (fetchedEndpoints.isEmpty()) ? "none" : fetchedEndpoints.getFirst().getId(), (fetchedEndpoints.isEmpty()) ? "none" : fetchedEndpoints.getLast().getId(), fetchedEndpoints.size());
Expand All @@ -74,7 +74,7 @@ public void migrateAssets() {
// all the time, the last one might be empty, so there is no need
// to attempt calling Kessel.
if (fetchedEndpoints.isEmpty()) {
Log.trace("Breaking the do-while loop because the size of the fetched integrations is zero");
Log.debug("Breaking the do-while loop because the size of the fetched integrations is zero");
break;
}

Expand All @@ -85,19 +85,19 @@ public void migrateAssets() {
this.relationTuplesClient.createTuples(request, new StreamObserver<>() {
@Override
public void onNext(final CreateTuplesResponse createTuplesResponse) {
Log.trace("Calling onNext");
Log.debug("Calling onNext");
Log.infof("[offset: %s][first_integration: %s][last_integration: %s] Sent batch of % integrations to Kessel", finalOffset, fetchedEndpoints.getFirst().getId(), fetchedEndpoints.getLast().getId(), fetchedEndpoints.size());
}

@Override
public void onError(final Throwable throwable) {
Log.trace("Calling onError");
Log.debug("Calling onError");
Log.errorf(throwable, "[offset: %s][first_integration: %s][last_integration: %s] Unable to send batch of tuples to Kessel", finalOffset, fetchedEndpoints.getFirst().getId(), fetchedEndpoints.getLast().getId());
}

@Override
public void onCompleted() {
Log.trace("Calling onCompleted");
Log.debug("Calling onCompleted");
Log.infof("[offset: %s][first_integration: %s][last_integration: %s] Sent batch of % integrations to Kessel", finalOffset, fetchedEndpoints.getFirst().getId(), fetchedEndpoints.getLast().getId(), fetchedEndpoints.size());
}
});
Expand All @@ -106,7 +106,7 @@ public void onCompleted() {
offset += fetchedEndpoints.size();
traceLoops += 1;

Log.tracef("[fetchedEndpointsSize: %s][kesselMigrationBatchSize: %s][offset: %s] do-while loop condition", fetchedEndpointsSize, offset, this.backendConfig.getKesselMigrationBatchSize());
Log.debugf("[fetchedEndpointsSize: %s][kesselMigrationBatchSize: %s][offset: %s] do-while loop condition", fetchedEndpointsSize, offset, this.backendConfig.getKesselMigrationBatchSize());
} while (fetchedEndpointsSize == this.backendConfig.getKesselMigrationBatchSize());

Log.info("Finished migrating integrations to the Kessel inventory");
Expand Down Expand Up @@ -172,6 +172,7 @@ protected CreateTuplesRequest createTuplesRequest(final List<Endpoint> endpoints
return CreateTuplesRequest
.newBuilder()
.addAllTuples(relations)
.setUpsert(true)
.build();
}

Expand Down

0 comments on commit f790ccf

Please sign in to comment.