[lake] Fix union read duplicating rows for non-String partition columns #3500
bryndenZh wants to merge 1 commit into
For a partitioned table with table.datalake.enabled=true and a non-String partition column (DATE, INT, TIMESTAMP, etc.), default union read returned each tiered row twice because PaimonSplit#partition() read every partition field via BinaryRow.getString regardless of logical type. The lake-side partition name then never matched the Fluss-side name in LakeSplitGenerator, so the same partition was emitted as both a lake-only split and a Fluss-log split.

Add PaimonPartitionUtils to render partition values in a logical-type-aware way matching PartitionUtils#convertValueOfType, compute them in PaimonSplitPlanner, and apply the same fix to DvTableReadableSnapshotRetriever.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Pull request overview
Fixes duplicated rows in lake/union reads for partitioned tables when partition columns are non-STRING types by rendering Paimon partition values in the same type-aware format Fluss uses for partition names.
Changes:
- Add `PaimonPartitionUtils.partitionValues(...)` to convert `BinaryRow` partition fields into Fluss partition-name strings in a type-aware way.
- Use the new conversion in split planning and DV-table partition name mapping so lake-side and Fluss-side partition names match.
- Extend tests to cover type parity with `PartitionUtils#convertValueOfType`, end-to-end DATE partition rendering, and split (de)serialization.
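To make "type-aware" concrete, here is a minimal, self-contained sketch of the idea. It is not the PR's `PaimonPartitionUtils`: the class `PartitionRenderSketch`, the `ColType` enum, and the assumption that a DATE value is stored as an int counting days since 1970-01-01 are all illustrative, and only three types are covered.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in used for illustration only; not code from this PR. */
final class PartitionRenderSketch {

    /** Simplified set of partition column types (real tables support more). */
    enum ColType { STRING, INT, DATE }

    /**
     * Renders one stored partition value as a partition-name string.
     * Assumption: DATE is stored as days since 1970-01-01 and rendered as yyyy-MM-dd.
     */
    static String render(Object stored, ColType type) {
        switch (type) {
            case STRING:
                return (String) stored;
            case INT:
                return String.valueOf((Integer) stored);
            case DATE:
                return LocalDate.ofEpochDay((Integer) stored).toString();
            default:
                throw new IllegalArgumentException("Unsupported partition type: " + type);
        }
    }

    /** Renders every partition field using the type declared for its position. */
    static List<String> renderAll(List<Object> stored, List<ColType> types) {
        if (stored.size() != types.size()) {
            throw new IllegalArgumentException("Field count and type count differ");
        }
        List<String> out = new ArrayList<>(stored.size());
        for (int i = 0; i < stored.size(); i++) {
            out.add(render(stored.get(i), types.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        int epochDay = (int) LocalDate.of(2024, 3, 1).toEpochDay();
        List<Object> stored = Arrays.asList("us", 7, epochDay);
        List<ColType> types = Arrays.asList(ColType.STRING, ColType.INT, ColType.DATE);
        System.out.println(renderAll(stored, types));
    }
}
```

The point of the sketch is that the rendering decision is driven by the declared column type, not by a single string accessor applied to every field.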
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonPartitionUtils.java | New utility to render Paimon partition rows into Fluss-formatted partition value strings (type-aware). |
| fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java | Use type-aware partition conversion when building Fluss partition names from Paimon BinaryRow. |
| fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitPlanner.java | Attach Fluss-formatted partition values onto planned PaimonSplits. |
| fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplit.java | Store partition values explicitly rather than re-reading via BinaryRow.getString(...). |
| fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializer.java | Serialize/deserialize partition values alongside existing split fields. |
| fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonPartitionUtilsTest.java | New unit tests ensuring Paimon conversion matches Fluss PartitionUtils across supported types. |
| fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitTest.java | Adds DATE-partition end-to-end assertion for planned split partition rendering. |
| fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializerTest.java | Verifies partition values round-trip through the split serializer. |
      DataSplit dataSplit = paimonSplit.dataSplit();
      InstantiationUtil.serializeObject(view, dataSplit);
      view.writeBoolean(paimonSplit.isBucketUnAware());
    + List<String> partition = paimonSplit.partition();
    + view.writeInt(partition.size());
    + for (String value : partition) {
    +     view.writeUTF(value);
    + }
      if (version == VERSION_1) {
          DataInputStream dis = new DataInputStream(in);
          boolean isBucketUnAware = dis.readBoolean();
    -     return new PaimonSplit(dataSplit, isBucketUnAware);
    +     int size = dis.readInt();
    +     List<String> partition = new ArrayList<>(size);
    +     for (int i = 0; i < size; i++) {
    +         partition.add(dis.readUTF());
    +     }
    +     return new PaimonSplit(dataSplit, isBucketUnAware, partition);
      } else {
          throw new IOException("Unsupported PaimonSplit serialization version: " + version);
      }
      public List<String> partition() {
    -     BinaryRow partition = dataSplit.partition();
    -     if (partition.getFieldCount() == 0) {
    -         return Collections.emptyList();
    -     }
    -
    -     List<String> partitions = new ArrayList<>();
    -     for (int i = 0; i < partition.getFieldCount(); i++) {
    -         // Todo Currently, partition column must be String datatype, so we can always use
    -         // consider it as string. Revisit here when
    -         // #489 is finished.
    -         partitions.add(partition.getString(i).toString());
    -     }
    -     return partitions;
    +     return partition;
      }
@bryndenZh Thanks for the PR. Nice catch on the root cause. One thing worth tightening before merge:
We can delegate formatting to the single authoritative
    public static List<String> toFlussPartitionValues(
            BinaryRow partition, org.apache.fluss.types.RowType flussPartitionType) {
        PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow().replaceRow(partition);
        List<String> values = new ArrayList<>(partition.getFieldCount());
        for (int i = 0; i < partition.getFieldCount(); i++) {
            DataType flussType = flussPartitionType.getTypeAt(i);
            Object value = InternalRow.createFieldGetter(flussType, i).getFieldOrNull(flussRow);
            values.add(PartitionUtils.convertValueOfType(value, flussType.getTypeRoot()));
        }
        return values;
    }
This needs a Paimon→Fluss type converter, which doesn't exist yet (we only have the reverse
A few smaller points:
Purpose
Related issue: #3490
For a partitioned table with `table.datalake.enabled=true` and a non-`STRING` partition column (`DATE`, `INT`, `TIMESTAMP`, ...), union read returns each tiered row twice.
`PaimonSplit#partition()` read every field via `BinaryRow.getString` regardless of type, so for non-`STRING` columns the lake-side partition name was garbage and never matched the Fluss-side name in `LakeSplitGenerator`; the partition was then emitted as both a lake split and a Fluss-log split.
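The duplication mechanism can be sketched with a toy planner. This is an illustrative simplification, not `LakeSplitGenerator`: the class `UnionReadPlanSketch`, the split labels, and the `<garbled>` placeholder (standing in for whatever a string read of a non-string field produced) are all assumptions. The sketch only shows why a name mismatch turns one partition into two independent splits.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical toy planner used for illustration only; not code from this PR. */
final class UnionReadPlanSketch {

    /**
     * Pairs lake-side and Fluss-side partitions by name.
     * A name present on both sides yields one combined split; a name present
     * on only one side yields a split that reads that side alone.
     */
    static List<String> plan(Set<String> lakePartitions, Set<String> flussPartitions) {
        List<String> splits = new ArrayList<>();
        // TreeSet gives a deterministic iteration order for the example.
        for (String name : new TreeSet<>(lakePartitions)) {
            if (flussPartitions.contains(name)) {
                splits.add("lake+log:" + name);
            } else {
                splits.add("lake-only:" + name);
            }
        }
        for (String name : new TreeSet<>(flussPartitions)) {
            if (!lakePartitions.contains(name)) {
                splits.add("log-only:" + name);
            }
        }
        return splits;
    }

    public static void main(String[] args) {
        Set<String> fluss = Collections.singleton("2024-03-01");
        // Names agree: the partition is planned once.
        System.out.println(plan(Collections.singleton("2024-03-01"), fluss));
        // Names disagree: the same data is planned twice, once per side.
        System.out.println(plan(Collections.singleton("<garbled>"), fluss));
    }
}
```

With matching names the partition is planned once; with a mismatched lake-side name the same rows are reachable through two splits, which is the symptom described above.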
Brief change log
- Add `PaimonPartitionUtils#partitionValues` to render partition values type-aware, matching the Fluss-side format (`PartitionUtils#convertValueOfType`).
Tests
- `PaimonPartitionUtilsTest`: every partition type's output equals `PartitionUtils#convertValueOfType` for the same value; plus string and multi-column cases.
- `PaimonSplitTest#testPaimonSplitWithDatePartition`: a DATE-partitioned table yields `["2024-03-01"]` end-to-end through the planner (garbage before).
- `PaimonSplitSerializerTest`: partition values round-trip through serialization.
API and Format
`PaimonSplit`'s serialized form now includes the partition values (appended after the existing fields).
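The appended layout (a count followed by that many UTF strings) can be exercised in isolation. The following is a rough sketch under stated assumptions: `PartitionValuesCodecSketch` is a hypothetical class, the serialized `DataSplit` object that precedes these fields in the real format is omitted, and I/O errors are rethrown unchecked to keep the example short.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical codec used for illustration only; not the PR's PaimonSplitSerializer. */
final class PartitionValuesCodecSketch {

    /** Writes the flag, then the partition value count, then each value. */
    static byte[] write(boolean isBucketUnAware, List<String> partition) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeBoolean(isBucketUnAware);
            out.writeInt(partition.size());
            for (String value : partition) {
                out.writeUTF(value);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the fields back in the same order and returns the partition values. */
    static List<String> readPartition(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            in.readBoolean(); // flag is read to advance the stream; unused in this sketch
            int size = in.readInt();
            List<String> partition = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                partition.add(in.readUTF());
            }
            return partition;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> original = Arrays.asList("2024-03-01", "us");
        System.out.println(readPartition(write(true, original)).equals(original));
    }
}
```

A reader that expects the older layout would stop after the boolean, so bytes written in the new layout and read with the old logic (or the reverse) would not line up; whether that matters here depends on how serialized splits are persisted, which this sketch does not address.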
Documentation
No user-facing documentation change.