Skip to content

Commit

Permalink
Fixed parsing of the offset
Browse files Browse the repository at this point in the history
Summary:
Sometimes offset partition is provided as Long value. It caused class cast exception
  • Loading branch information
AdalbertMemSQL committed Jan 19, 2024
1 parent 92c7516 commit e97fa1e
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,20 @@ private List<String> parseOffsets(String offsets) {
}).collect(Collectors.toList());
}

private Integer parsePartitionId(Object partitionId) {
if (partitionId == null) {
return null;
} else if (partitionId instanceof Long) {
return ((Long) partitionId).intValue();
} else {
return (Integer) partitionId;
}
}

@Override
public SingleStoreDBOffsetContext load(Map<String, ?> offset) {
String txId = (String) offset.get(SourceInfo.TXID_KEY);
Integer partitionId = (Integer) offset.get(SourceInfo.PARTITIONID_KEY);
Integer partitionId = parsePartitionId(offset.get(SourceInfo.PARTITIONID_KEY));
List<String> offsets = parseOffsets((String) offset.get(SourceInfo.OFFSETS_KEY));
Boolean snapshot = (Boolean) ((Map<String, Object>) offset).getOrDefault(SourceInfo.SNAPSHOT_KEY, Boolean.FALSE);
Boolean snapshotCompleted = (Boolean) ((Map<String, Object>) offset).getOrDefault(SNAPSHOT_COMPLETED_KEY, Boolean.FALSE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;

import java.time.Instant;
import java.util.Arrays;
Expand Down Expand Up @@ -32,7 +33,7 @@ public void saveAndLoad() {
offsetContext.update(1, "3", "10");
offsetContext.preSnapshotCompletion();

Map<String, ?> offset = offsetContext.getOffset();
Map<String, Object> offset = (Map<String, Object>)offsetContext.getOffset();

Loader loader = new SingleStoreDBOffsetContext.Loader(conf);
SingleStoreDBOffsetContext loadedOffsetContext = loader.load(offset);
Expand All @@ -41,5 +42,13 @@ public void saveAndLoad() {
assertEquals(loadedOffsetContext.txId(), "3");
assertEquals(loadedOffsetContext.offsets(), Arrays.asList("1", "10", null, "2"));
assertFalse(loadedOffsetContext.isSnapshotRunning());

offset.put("partitionId", Long.valueOf(2));
loadedOffsetContext = loader.load(offset);
assertEquals(loadedOffsetContext.partitionId(), (Integer)2);

offset.put("partitionId", null);
loadedOffsetContext = loader.load(offset);
assertNull(loadedOffsetContext.partitionId());
}
}

0 comments on commit e97fa1e

Please sign in to comment.