From e97fa1e715fae66141e6fa1e8280e9421ef5ed6c Mon Sep 17 00:00:00 2001 From: Adalbert Makarovych Date: Fri, 19 Jan 2024 14:39:16 +0200 Subject: [PATCH] Fixed parsing of the offset Summary: Sometimes offset partition is provided as Long value. It caused class cast exception --- .../debezium/SingleStoreDBOffsetContext.java | 12 +++++++++++- .../debezium/SingleStoreDBOffsetContextTest.java | 11 ++++++++++- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/singlestore/debezium/SingleStoreDBOffsetContext.java b/src/main/java/com/singlestore/debezium/SingleStoreDBOffsetContext.java index 35a3796..2ba6fea 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreDBOffsetContext.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreDBOffsetContext.java @@ -71,10 +71,20 @@ private List parseOffsets(String offsets) { }).collect(Collectors.toList()); } + private Integer parsePartitionId(Object partitionId) { + if (partitionId == null) { + return null; + } else if (partitionId instanceof Long) { + return ((Long) partitionId).intValue(); + } else { + return (Integer) partitionId; + } + } + @Override public SingleStoreDBOffsetContext load(Map offset) { String txId = (String) offset.get(SourceInfo.TXID_KEY); - Integer partitionId = (Integer) offset.get(SourceInfo.PARTITIONID_KEY); + Integer partitionId = parsePartitionId(offset.get(SourceInfo.PARTITIONID_KEY)); List offsets = parseOffsets((String) offset.get(SourceInfo.OFFSETS_KEY)); Boolean snapshot = (Boolean) ((Map) offset).getOrDefault(SourceInfo.SNAPSHOT_KEY, Boolean.FALSE); Boolean snapshotCompleted = (Boolean) ((Map) offset).getOrDefault(SNAPSHOT_COMPLETED_KEY, Boolean.FALSE); diff --git a/src/test/java/com/singlestore/debezium/SingleStoreDBOffsetContextTest.java b/src/test/java/com/singlestore/debezium/SingleStoreDBOffsetContextTest.java index cc71273..effaab2 100644 --- a/src/test/java/com/singlestore/debezium/SingleStoreDBOffsetContextTest.java +++ b/src/test/java/com/singlestore/debezium/SingleStoreDBOffsetContextTest.java @@ -2,6 +2,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import java.time.Instant; import java.util.Arrays; @@ -32,7 +33,7 @@ public void saveAndLoad() { offsetContext.update(1, "3", "10"); offsetContext.preSnapshotCompletion(); - Map offset = offsetContext.getOffset(); + Map offset = (Map)offsetContext.getOffset(); Loader loader = new SingleStoreDBOffsetContext.Loader(conf); SingleStoreDBOffsetContext loadedOffsetContext = loader.load(offset); @@ -41,5 +42,13 @@ public void saveAndLoad() { assertEquals(loadedOffsetContext.txId(), "3"); assertEquals(loadedOffsetContext.offsets(), Arrays.asList("1", "10", null, "2")); assertFalse(loadedOffsetContext.isSnapshotRunning()); + + offset.put("partitionId", Long.valueOf(2)); + loadedOffsetContext = loader.load(offset); + assertEquals(loadedOffsetContext.partitionId(), (Integer)2); + + offset.put("partitionId", null); + loadedOffsetContext = loader.load(offset); + assertNull(loadedOffsetContext.partitionId()); } }