
[lake] Fix union read duplicating rows for non-String partition columns #3500

Open
bryndenZh wants to merge 1 commit into apache:main from bryndenZh:zqy/fix-paimon-nonstring-partition-union-read

@bryndenZh bryndenZh commented Jun 19, 2026


Purpose

Related issue: #3490

For a partitioned table with table.datalake.enabled=true and a non-STRING
partition column (DATE, INT, TIMESTAMP, ...), union read returns each tiered
row twice.

PaimonSplit#partition() read every field via BinaryRow.getString regardless of
type, so for non-STRING columns the lake-side partition name was garbage and never
matched the Fluss-side name in LakeSplitGenerator — the partition was then emitted
as both a lake split and a Fluss-log split.
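
The effect of a name mismatch can be illustrated with a small, self-contained model. This is only a sketch: `UnionReadMatchingSketch`, `planSplits`, and the `hybrid:` / `fluss-log:` / `lake-only:` labels are hypothetical and do not mirror the real `LakeSplitGenerator` code; the only assumption carried over from the description is that lake and Fluss partitions are paired by comparing their rendered names.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of name-based matching between lake and Fluss partitions. */
class UnionReadMatchingSketch {

    /**
     * Plans one entry per partition. A Fluss partition whose name also exists on the
     * lake side becomes a single combined split; names found on only one side each
     * get their own split.
     */
    static List<String> planSplits(Set<String> lakeNames, Set<String> flussNames) {
        List<String> splits = new ArrayList<>();
        for (String name : flussNames) {
            splits.add((lakeNames.contains(name) ? "hybrid:" : "fluss-log:") + name);
        }
        for (String name : lakeNames) {
            if (!flussNames.contains(name)) {
                splits.add("lake-only:" + name);
            }
        }
        return splits;
    }

    public static void main(String[] args) {
        Set<String> fluss = new LinkedHashSet<>(List.of("2024-03-01"));
        // Lake-side name rendered correctly: the partition is planned once.
        System.out.println(planSplits(new LinkedHashSet<>(List.of("2024-03-01")), fluss));
        // Lake-side name mis-rendered: the same partition is planned twice.
        System.out.println(planSplits(new LinkedHashSet<>(List.of("<garbage>")), fluss));
    }
}
```

In the second call the same physical partition contributes both a lake-only split and a Fluss-log split, which is how each tiered row ends up being read twice.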

Brief change log

  • Add PaimonPartitionUtils#partitionValues to render partition values type-aware,
    matching the Fluss-side format (PartitionUtils#convertValueOfType).
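
As a rough illustration of what "type-aware" means here, the sketch below renders a value according to its declared type instead of always treating it as a string. The class `PartitionRenderingSketch`, the `Kind` enum, and the `render` method are hypothetical and cover only three types. It assumes DATE is stored as an int counting days since 1970-01-01 and is rendered as an ISO date, which agrees with the `["2024-03-01"]` expectation in the tests below; the INT rendering is an assumption, not taken from `PartitionUtils#convertValueOfType`.

```java
import java.time.LocalDate;

/** Hypothetical, reduced model of type-aware partition value rendering. */
class PartitionRenderingSketch {

    /** Hypothetical subset of partition column types. */
    enum Kind { STRING, INT, DATE }

    /** Renders one partition value according to its declared type. */
    static String render(Object value, Kind kind) {
        switch (kind) {
            case STRING:
                return (String) value;
            case INT:
                // Assumed rendering: plain decimal digits.
                return Integer.toString((Integer) value);
            case DATE:
                // Assumed storage: days since 1970-01-01, rendered as yyyy-MM-dd.
                return LocalDate.ofEpochDay(((Integer) value).longValue()).toString();
            default:
                throw new IllegalArgumentException("Unsupported partition type: " + kind);
        }
    }

    public static void main(String[] args) {
        int epochDay = (int) LocalDate.of(2024, 3, 1).toEpochDay();
        System.out.println(render(epochDay, Kind.DATE));
        System.out.println(render("eu-west", Kind.STRING));
    }
}
```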

Tests

  • PaimonPartitionUtilsTest: every partition type's output equals
    PartitionUtils#convertValueOfType for the same value; plus string and
    multi-column cases.
  • PaimonSplitTest#testPaimonSplitWithDatePartition: a DATE-partitioned table
    yields ["2024-03-01"] end-to-end through the planner (garbage before).
  • PaimonSplitSerializerTest: partition values round-trip through serialization.

API and Format

PaimonSplit's serialized form now includes the partition values (appended after
the existing fields).

Documentation

No user-facing documentation change.

For a partitioned table with table.datalake.enabled=true and a non-String
partition column (DATE, INT, TIMESTAMP, etc.), default union read returned each
tiered row twice because PaimonSplit#partition() read every partition field via
BinaryRow.getString regardless of logical type. The lake-side partition name
then never matched the Fluss-side name in LakeSplitGenerator, so the same
partition was emitted as both a lake-only split and a Fluss-log split.

Add PaimonPartitionUtils to render partition values in a logical-type-aware way
matching PartitionUtils#convertValueOfType, compute them in PaimonSplitPlanner,
and apply the same fix to DvTableReadableSnapshotRetriever.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

Copilot AI left a comment

Contributor

Pull request overview

Fixes duplicated rows in lake/union reads for partitioned tables when partition columns are non-STRING types by rendering Paimon partition values in the same type-aware format Fluss uses for partition names.

Changes:

  • Add PaimonPartitionUtils.partitionValues(...) to convert BinaryRow partition fields type-aware into Fluss partition-name strings.
  • Use the new conversion in split planning and DV-table partition name mapping so lake-side and Fluss-side partition names match.
  • Extend tests to cover type parity with PartitionUtils#convertValueOfType, end-to-end DATE partition rendering, and split (de)serialization.

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonPartitionUtils.java | New utility to render Paimon partition rows into Fluss-formatted partition value strings (type-aware). |
| fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java | Use type-aware partition conversion when building Fluss partition names from Paimon BinaryRow. |
| fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitPlanner.java | Attach Fluss-formatted partition values onto planned PaimonSplits. |
| fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplit.java | Store partition values explicitly rather than re-reading via BinaryRow.getString(...). |
| fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializer.java | Serialize/deserialize partition values alongside existing split fields. |
| fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonPartitionUtilsTest.java | New unit tests ensuring Paimon conversion matches Fluss PartitionUtils across supported types. |
| fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitTest.java | Adds DATE-partition end-to-end assertion for planned split partition rendering. |
| fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializerTest.java | Verifies partition values round-trip through the split serializer. |


Comment on lines 48 to +55
  DataSplit dataSplit = paimonSplit.dataSplit();
  InstantiationUtil.serializeObject(view, dataSplit);
  view.writeBoolean(paimonSplit.isBucketUnAware());
+ List<String> partition = paimonSplit.partition();
+ view.writeInt(partition.size());
+ for (String value : partition) {
+     view.writeUTF(value);
+ }

Comment on lines 66 to 77
  if (version == VERSION_1) {
      DataInputStream dis = new DataInputStream(in);
      boolean isBucketUnAware = dis.readBoolean();
-     return new PaimonSplit(dataSplit, isBucketUnAware);
+     int size = dis.readInt();
+     List<String> partition = new ArrayList<>(size);
+     for (int i = 0; i < size; i++) {
+         partition.add(dis.readUTF());
+     }
+     return new PaimonSplit(dataSplit, isBucketUnAware, partition);
  } else {
      throw new IOException("Unsupported PaimonSplit serialization version: " + version);
  }

Comment on lines 55 to 57
  public List<String> partition() {
-     BinaryRow partition = dataSplit.partition();
-     if (partition.getFieldCount() == 0) {
-         return Collections.emptyList();
-     }
-
-     List<String> partitions = new ArrayList<>();
-     for (int i = 0; i < partition.getFieldCount(); i++) {
-         // Todo Currently, partition column must be String datatype, so we can always use
-         // consider it as string. Revisit here when
-         // #489 is finished.
-         partitions.add(partition.getString(i).toString());
-     }
-     return partitions;
+     return partition;
  }


luoyuxia commented Jun 20, 2026

Contributor

@bryndenZh Thanks for the PR.

Nice catch on the root cause. One thing worth tightening before merge:

PaimonPartitionUtils#toFlussPartitionString is a second copy of PartitionUtils#convertValueOfType. The two switch statements now have to stay in lockstep forever — if a new partition type is added on the Fluss side and only convertValueOfType is updated, the lake-side and Fluss-side names silently diverge again, which is exactly the class of bug this PR fixes. The test that asserts "the two produce the same string" is really a symptom of maintaining a mirror.

We can delegate formatting to the single authoritative convertValueOfType, and reuse the existing PaimonRowAsFlussRow adapter + Fluss's InternalRow.createFieldGetter, so there's no per-type logic here at all:

public static List<String> toFlussPartitionValues(
        BinaryRow partition, org.apache.fluss.types.RowType flussPartitionType) {
    PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow().replaceRow(partition);
    List<String> values = new ArrayList<>(partition.getFieldCount());
    for (int i = 0; i < partition.getFieldCount(); i++) {
        DataType flussType = flussPartitionType.getTypeAt(i);
        Object value = InternalRow.createFieldGetter(flussType, i).getFieldOrNull(flussRow);
        values.add(PartitionUtils.convertValueOfType(value, flussType.getTypeRoot()));
    }
    return values;
}

This needs a Paimon→Fluss type converter, which doesn't exist yet (we only have the reverse FlussDataTypeToPaimonDataType). Adding the symmetric PaimonDataTypeToFlussDataType as a DataTypeDefaultVisitor is small, mechanical, and reusable across the whole fluss-lake-paimon module. Watch three mappings: Paimon VARCHAR → STRING, VARBINARY → BYTES, LocalZonedTimestamp → TIMESTAMP_LTZ.
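
To make the three mappings concrete, here is a deliberately reduced sketch. It does not use the real Paimon or Fluss type classes or `DataTypeDefaultVisitor`: `PaimonKind`, `FlussKind`, and `toFluss` are hypothetical stand-ins, and the INT and DATE pass-through entries are assumptions added for illustration. The point it shows is that an unmapped type should fail loudly rather than silently produce a mismatched name.

```java
import java.util.EnumMap;
import java.util.Map;

/** Hypothetical, reduced model of a Paimon-to-Fluss type mapping. */
class PaimonToFlussTypeSketch {

    /** Stand-in for Paimon type roots; not the real Paimon enum. */
    enum PaimonKind { VARCHAR, VARBINARY, LOCAL_ZONED_TIMESTAMP, INT, DATE, MAP }

    /** Stand-in for Fluss type roots; not the real Fluss enum. */
    enum FlussKind { STRING, BYTES, TIMESTAMP_LTZ, INT, DATE }

    private static final Map<PaimonKind, FlussKind> MAPPING = new EnumMap<>(PaimonKind.class);

    static {
        // The three mappings called out above.
        MAPPING.put(PaimonKind.VARCHAR, FlussKind.STRING);
        MAPPING.put(PaimonKind.VARBINARY, FlussKind.BYTES);
        MAPPING.put(PaimonKind.LOCAL_ZONED_TIMESTAMP, FlussKind.TIMESTAMP_LTZ);
        // Assumed one-to-one mappings, included only for illustration.
        MAPPING.put(PaimonKind.INT, FlussKind.INT);
        MAPPING.put(PaimonKind.DATE, FlussKind.DATE);
    }

    /** Converts a type, rejecting anything without an explicit mapping. */
    static FlussKind toFluss(PaimonKind kind) {
        FlussKind mapped = MAPPING.get(kind);
        if (mapped == null) {
            throw new UnsupportedOperationException("No Fluss type for Paimon type: " + kind);
        }
        return mapped;
    }

    public static void main(String[] args) {
        System.out.println(toFluss(PaimonKind.VARCHAR));
        System.out.println(toFluss(PaimonKind.LOCAL_ZONED_TIMESTAMP));
    }
}
```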

A few smaller points:

  • Put these in PaimonConversions (the module's single Paimon↔Fluss entry point) as toFlussRowType / toFlussPartitionValues, instead of a new PaimonPartitionUtils class.
  • DvTableReadableSnapshotRetriever#getPartitionNameFromBinaryRow has the same getString-only bug — point it at the same helper so both paths are fixed together.
  • Perf: hoist the type resolution out of the loop. The Paimon→Fluss conversion is identical for every split / partition bucket. Resolve the Fluss row type once and reuse it per split:
    RowType flussPartitionType = toFlussRowType(fileStoreTable.schema().logicalPartitionType());
    for (Split split : scan.plan().splits()) { ... toFlussPartitionValues(dataSplit.partition(), flussPartitionType) ... }
  • (Separate issue) PaimonSplitSerializer appends the partition fields but keeps VERSION_1. An old VERSION_1 checkpoint would hit EOF on restore. Consider bumping to VERSION_2 with a back-compat read path.
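
A minimal sketch of what a version bump with a back-compat read path could look like, using only `java.io`. Everything here is hypothetical: `VersionedSplitSerializerSketch` and its `Split` class are simplified stand-ins that omit the `DataSplit` payload, and returning an empty partition list for old data is a placeholder; the real read path would presumably need to recompute the values from the `DataSplit`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical versioned serializer; the DataSplit payload is left out. */
class VersionedSplitSerializerSketch {
    static final int VERSION_1 = 1; // bucket flag only
    static final int VERSION_2 = 2; // bucket flag followed by partition values

    /** Simplified stand-in for PaimonSplit. */
    static final class Split {
        final boolean bucketUnAware;
        final List<String> partition;

        Split(boolean bucketUnAware, List<String> partition) {
            this.bucketUnAware = bucketUnAware;
            this.partition = partition;
        }
    }

    /** Always writes the newest layout (VERSION_2). */
    static byte[] serialize(Split split) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeBoolean(split.bucketUnAware);
            out.writeInt(split.partition.size());
            for (String value : split.partition) {
                out.writeUTF(value);
            }
        }
        return bytes.toByteArray();
    }

    /** Reads either layout, chosen by the version stored alongside the bytes. */
    static Split deserialize(int version, byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            boolean bucketUnAware = in.readBoolean();
            if (version == VERSION_1) {
                // Old payloads end here; do not try to read a partition list.
                // Placeholder: real code would derive the values from the DataSplit.
                return new Split(bucketUnAware, Collections.emptyList());
            }
            if (version == VERSION_2) {
                int size = in.readInt();
                List<String> partition = new ArrayList<>(size);
                for (int i = 0; i < size; i++) {
                    partition.add(in.readUTF());
                }
                return new Split(bucketUnAware, partition);
            }
            throw new IOException("Unsupported PaimonSplit serialization version: " + version);
        }
    }
}
```

With this shape, bytes written before the change (a single boolean in this model) still deserialize under VERSION_1 instead of failing with an EOF while looking for the partition count.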



3 participants