> {
+ let schema = Arc::new(Schema::empty());
+ let table = MemTable::try_new(schema, vec![vec![]])?;
+ Ok(Arc::new(table))
+}
+
+// Expand the bridge macro for this test crate: `jni_class` names the Java class the
+// generated JNI entry points belong to, and `build_provider` is the builder function
+// the expansion calls to obtain the table provider.
+// NOTE(review): the exported symbol names are decided by the macro, which is defined
+// outside this file — confirm there if the class name changes.
+export_bridge! {
+ jni_class: "com_example_testbridge_BridgeNative",
+ build_provider: build_provider,
+}
+
+#[test]
+fn builder_contract_runs_outside_jvm() {
+    // The macro itself is covered by the crate expanding and linking at all; on top of
+    // that, drive the builder with the same BridgeContext the expansion passes to it.
+    let bridge_ctx = BridgeContext::get();
+    let built = build_provider(&bridge_ctx, &[], &[]);
+    let provider = built.expect("builder failed");
+    let field_count = provider.schema().fields().len();
+    assert_eq!(field_count, 0);
+}
diff --git a/spark/pom.xml b/spark/pom.xml
new file mode 100644
index 0000000..90e4e6d
--- /dev/null
+++ b/spark/pom.xml
@@ -0,0 +1,150 @@
+
+
+
+ 4.0.0
+
+
+ org.apache.datafusion
+ datafusion-java-parent
+ 0.2.0-SNAPSHOT
+
+
+ datafusion-java-spark_2.13
+ jar
+
+ Apache DataFusion Java Spark Connector
+
+ Generic Spark DataSource V2 connector for DataFusion TableProviders.
+ Domain bridges implement BridgeProviderFactory over a cdylib built
+ with the datafusion-spark-bridge Rust SDK; this module supplies the
+ Spark plumbing, predicate translation, Arrow-to-Spark schema
+ conversion, and the shared-scan cache. Pure JVM artifact — the
+ native code ships inside each bridge's own jar.
+
+
+
+ 2.13
+ 2.13.14
+ 3.5.7
+
+
+
+
+ org.scala-lang
+ scala-library
+ ${scala.version}
+
+
+
+ org.apache.spark
+ spark-core_${scala.compat.version}
+ ${spark.version}
+ provided
+
+
+ org.apache.spark
+ spark-sql_${scala.compat.version}
+ ${spark.version}
+ provided
+
+
+
+ org.apache.datafusion
+ datafusion-java
+
+
+
+ org.apache.arrow
+ arrow-vector
+
+
+ org.apache.arrow
+ arrow-c-data
+
+
+ org.apache.arrow
+ arrow-memory-netty
+ runtime
+
+
+
+ org.scalatest
+ scalatest_${scala.compat.version}
+ 3.2.18
+ test
+
+
+
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+ 4.8.1
+
+
+
+ compile
+ testCompile
+
+
+
+
+ ${scala.version}
+
+ -deprecation
+ -feature
+ -unchecked
+
+ all
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+ --add-opens=java.base/java.nio=ALL-UNNAMED
+ true
+
+
+
+ org.scalatest
+ scalatest-maven-plugin
+ 2.2.0
+
+ ${project.build.directory}/scalatest-reports
+ .
+ WDF TestSuite.txt
+ --add-opens=java.base/java.nio=ALL-UNNAMED
+
+
+
+ test
+ test
+
+
+
+
+
+
+
diff --git a/spark/src/main/java/io/datafusion/spark/BridgeProviderFactory.java b/spark/src/main/java/io/datafusion/spark/BridgeProviderFactory.java
new file mode 100644
index 0000000..3bcf7ad
--- /dev/null
+++ b/spark/src/main/java/io/datafusion/spark/BridgeProviderFactory.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.datafusion.spark;
+
+import java.util.Map;
+
+/**
+ * Bridge interface implemented per domain (HDF5, custom Iceberg, an in-house format, etc.). A
+ * bridge owns its options encoding and a native scan implementation built with {@code
+ * datafusion_spark_bridge::export_bridge!}; the connector Spark plumbing is generic — it knows only
+ * this interface.
+ *
+ * <p>The single required method is {@link #scanBackend()}, returning the delegations to the JNI
+ * class the bridge named in its {@code export_bridge!} invocation. Everything else has a working
+ * default: {@link #encodeOptions(Map)} encodes the Spark options via {@link OptionsCodec}, and
+ * {@link #listPartitions(byte[])} reports a single partition.
+ *
+ * <p>Implementations must be no-arg constructable so the Spark connector can instantiate them
+ * reflectively via {@link Class#forName(String)} on the executor.
+ */
+public interface BridgeProviderFactory {
+
+ /**
+ * The native scan implementation this bridge talks to: delegations to the JNI class named in the
+ * bridge's {@code export_bridge!} invocation, whose generated {@code createScan} builds the
+ * provider from the options/partition bytes in process. Called wherever the connector needs
+ * native work — driver-side schema/plan probes and executor-side streams — always on a factory
+ * freshly instantiated from its class name, so the returned backend never has to be serializable.
+ */
+ ScanBackend scanBackend();
+
+ /**
+ * Convert Spark's flat option map to the bridge's encoded options. Driver-side only; the bytes
+ * ship verbatim through {@code DatafusionInputPartition} and are the scan's identity in
+ * shared-scan mode (encode deterministically).
+ *
+ * <p>Default: {@link OptionsCodec#encode(Map)} — the key-sorted length-prefixed pair format that
+ * {@code datafusion_spark_bridge::options} decodes on the Rust side. Override only if the bridge
+ * already has its own options schema (e.g. a protobuf).
+ *
+ * @throws IllegalArgumentException if required options are missing or invalid
+ */
+ default byte[] encodeOptions(Map sparkOptions) {
+ return OptionsCodec.encode(sparkOptions);
+ }
+
+ /**
+ * Enumerate partitions for this dataset. One Spark task is created per returned {@link
+ * PartitionInfo}. Driver-side only.
+ *
+ * <p>Each partition's {@code partitionBytes} ships verbatim through {@code
+ * DatafusionInputPartition} to the executor, where it is passed to {@link
+ * ScanBackend#createScan}. Use it to encode whatever slice metadata (row range, sub-options, file
+ * offsets, segment id, …) the bridge needs to materialise *that* partition.
+ *
+ * <p>Each partition's {@code preferredLocations} hostnames are returned from {@code
+ * InputPartition.preferredLocations()} so Spark co-locates the task with the data; empty array =
+ * no preference.
+ *
+ * <p>Default: one partition ({@code "p0"}, empty payload, no host preference) — one Spark task
+ * scans the whole dataset. Fine for small tables and first bring-up; override (or opt into {@link
+ * #sharedScan(byte[])}) before pointing it at anything large. Size guidance lives in {@code
+ * spark/README.md}.
+ */
+ default PartitionInfo[] listPartitions(byte[] optionsBytes) {
+ return new PartitionInfo[] {new PartitionInfo("p0", new byte[0], new String[0])};
+ }
+
+ /**
+ * Filter-aware variant of {@link #listPartitions(byte[])}. The connector calls this overload with
+ * the pushed-down predicates ({@code LogicalExprNode} proto bytes, one array per predicate, same
+ * encoding the executor later replays via {@link ScanBackend#createScan}). Bridges that can map
+ * predicates onto their partition layout (e.g. {@code segment_id = 'x'}) should prune partitions
+ * that cannot match — pruning here eliminates whole Spark tasks, whereas the per-task filter only
+ * reduces rows inside a task.
+ *
+ * <p>Pruning must be conservative: only drop a partition when NO row in it can satisfy the
+ * conjunction of all pushed predicates. The default delegates to the filter-unaware overload (no
+ * pruning), which is always correct.
+ */
+ default PartitionInfo[] listPartitions(byte[] optionsBytes, byte[][] filterProtoBytes) {
+ return listPartitions(optionsBytes);
+ }
+
+ /**
+ * Opt into shared-scan mode for this dataset. Default {@code false} (per-partition payload mode,
+ * the {@link #listPartitions(byte[])} path).
+ *
+ * <p>When {@code true}, the connector builds ONE provider per (executor JVM × scan) with empty
+ * {@code partitionBytes}, plans it once, and runs one Spark task per DataFusion output partition
+ * — task {@code i} streams plan partition {@code i} from the shared, cached plan. This amortises
+ * provider construction cost across all tasks on an executor and is the right model when the
+ * dataset has many small partitions or provider construction is expensive (remote metadata,
+ * connections). {@link #listPartitions(byte[])} and {@link #reportPartitioning(byte[])} are NOT
+ * called in this mode, and the scan reports {@code UnknownPartitioning} (DataFusion-native
+ * partitions carry no key contract).
+ *
+ * <p>Determinism contract. The driver counts partitions by planning once; every executor
+ * re-plans independently and must arrive at the same result. A bridge returning {@code true}
+ * guarantees:
+ *
+ * <ul>
+ *   <li>The provider's schema, partitioning, and per-partition row content are a pure function of
+ *       {@code optionsBytes}. Remote sources must pin a snapshot (version, timestamp) inside
+ *       the options; data that compacts or moves between driver planning and executor execution
+ *       otherwise yields wrong results that no runtime check can catch.
+ *   <li>The provider's {@code ExecutionPlan} supports calling {@code execute(i)} more than once
+ *       per plan instance (Spark task retry and speculative execution re-execute a partition
+ *       index, sometimes concurrently). Stateless scans satisfy this; single-shot streams do not.
+ * </ul>
+ *
+ * <p>The connector fails tasks with a clear error when the executor's partition count diverges
+ * from the driver's — but identical counts with different contents cannot be detected.
+ */
+ default boolean sharedScan(byte[] optionsBytes) {
+ return false;
+ }
+
+ /**
+ * Declare how rows are partitioned across the {@link PartitionInfo} entries returned by {@link
+ * #listPartitions(byte[])}. Driver-side only.
+ *
+ * <p>When non-null, the connector surfaces a {@code KeyGroupedPartitioning(keys,
+ * listPartitions(...).length)} to Spark via {@code SupportsReportPartitioning} so the optimizer
+ * can elide shuffles ahead of joins/aggregations on the declared keys.
+ *
+ * <p>Default returns {@code null} — no partitioning guarantees, Spark plans as if the scan's
+ * output ordering and grouping are unknown.
+ *
+ * <p>If a bridge implements this, it must hold the {@link ReportedPartitioning} contract: every
+ * row in a given partition evaluates to the same tuple of key values under the declared
+ * transforms.
+ *
+ * <p>Spark 3.3+ caveat: the reported partitioning only takes effect when every {@link
+ * PartitionInfo} also carries {@link PartitionInfo#partitionKeyValues()} (surfaced to Spark via
+ * {@code HasPartitionKey}); without key values Spark ignores the declared {@code
+ * KeyGroupedPartitioning} entirely. Storage-partitioned joins additionally require {@code
+ * spark.sql.sources.v2.bucketing.enabled=true}.
+ */
+ default ReportedPartitioning reportPartitioning(byte[] optionsBytes) {
+ return null;
+ }
+}
diff --git a/spark/src/main/java/io/datafusion/spark/NativeLibraryLoader.java b/spark/src/main/java/io/datafusion/spark/NativeLibraryLoader.java
new file mode 100644
index 0000000..eb4766a
--- /dev/null
+++ b/spark/src/main/java/io/datafusion/spark/NativeLibraryLoader.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.datafusion.spark;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.Locale;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Extracts a cdylib bundled inside a jar to a temp file and loads it via {@link System#load}.
+ * Expected layout inside the jar:
+ *
+ * <pre>
+ *   &lt;resourcePrefix&gt;/&lt;os&gt;/&lt;arch&gt;/lib&lt;name&gt;.&lt;ext&gt;
+ * </pre>
+ *
+ * where {@code <os>} is one of {@code linux}, {@code darwin}, {@code windows} and {@code <arch>} is
+ * {@code x86_64} or {@code aarch64}. The file name is exactly what {@link
+ * System#mapLibraryName(String)} returns for {@code name} on the running platform.
+ *
+ * <p>Bridges call {@link #load(Class, String, String)} from their native class's static
+ * initializer, with their own resource prefix, instead of hand-rolling extraction. Bundle the
+ * cdylib with the antrun-copy pattern shown in "Packaging your bridge" in {@code spark/README.md}.
+ */
+public final class NativeLibraryLoader {
+
+  /** {@code <resourcePrefix>/<name>} keys whose library was loaded successfully by this class. */
+  private static final Set<String> LOADED = ConcurrentHashMap.newKeySet();
+
+  /**
+   * Serializes extraction and loading, so a caller racing with an in-flight load waits for it to
+   * finish instead of returning while the library is not yet usable.
+   */
+  private static final Object LOAD_LOCK = new Object();
+
+  private NativeLibraryLoader() {}
+
+  /**
+   * Extract {@code <resourcePrefix>/<os>/<arch>/<mapped library name>} from {@code anchor}'s
+   * classloader and {@link System#load} it. Idempotent per (prefix, name): repeated calls — e.g.
+   * one per Spark task instantiating the bridge's native class — load once. A failed attempt
+   * records nothing, so every later call retries and reports the failure again.
+   *
+   * @param anchor class whose classloader holds the resource (the bridge's own native class, so the
+   *     lookup works under Spark's per-application classloaders)
+   * @param resourcePrefix jar-internal directory, no leading or trailing slash (e.g. {@code
+   *     "com/example/mybridge"})
+   * @param name unmapped library name (e.g. {@code "my_bridge"} for {@code libmy_bridge.so})
+   * @throws UnsatisfiedLinkError if the resource is missing, extraction fails, or the JVM rejects
+   *     the extracted library
+   * @throws UnsupportedOperationException if the running OS or CPU architecture is not one of the
+   *     supported values
+   */
+  public static void load(Class<?> anchor, String resourcePrefix, String name) {
+    String key = resourcePrefix + "/" + name;
+    // Lock-free fast path for the common "already loaded" case.
+    if (LOADED.contains(key)) {
+      return;
+    }
+    synchronized (LOAD_LOCK) {
+      if (LOADED.contains(key)) {
+        return;
+      }
+      String mappedName = System.mapLibraryName(name);
+      // May throw UnsupportedOperationException; nothing has been recorded yet, so a
+      // later call fails the same way instead of silently pretending the load happened.
+      String resource =
+          String.format("/%s/%s/%s/%s", resourcePrefix, currentOs(), currentArch(), mappedName);
+      try (InputStream in = anchor.getResourceAsStream(resource)) {
+        if (in == null) {
+          throw new UnsatisfiedLinkError("Native library not found on classpath: " + resource);
+        }
+        Path tmp = Files.createTempFile("libdatafusion-spark-", "-" + mappedName);
+        tmp.toFile().deleteOnExit();
+        Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING);
+        System.load(tmp.toAbsolutePath().toString());
+      } catch (IOException e) {
+        UnsatisfiedLinkError failure =
+            new UnsatisfiedLinkError(
+                "Failed to extract native library " + resource + ": " + e.getMessage());
+        // UnsatisfiedLinkError has no cause-taking constructor; keep the I/O stack trace.
+        failure.initCause(e);
+        throw failure;
+      }
+      // Record the key only after System.load returned normally.
+      LOADED.add(key);
+    }
+  }
+
+  /** Maps {@code os.name} onto the directory name used inside the jar. */
+  private static String currentOs() {
+    String os = System.getProperty("os.name", "").toLowerCase(Locale.ROOT);
+    if (os.contains("linux")) return "linux";
+    if (os.contains("mac") || os.contains("darwin")) return "darwin";
+    if (os.contains("windows")) return "windows";
+    throw new UnsupportedOperationException("Unsupported OS: " + os);
+  }
+
+  /** Maps {@code os.arch} onto the directory name used inside the jar. */
+  private static String currentArch() {
+    String arch = System.getProperty("os.arch", "").toLowerCase(Locale.ROOT);
+    if (arch.equals("amd64") || arch.equals("x86_64")) return "x86_64";
+    if (arch.equals("aarch64") || arch.equals("arm64")) return "aarch64";
+    throw new UnsupportedOperationException("Unsupported arch: " + arch);
+  }
+}
diff --git a/spark/src/main/java/io/datafusion/spark/OptionsCodec.java b/spark/src/main/java/io/datafusion/spark/OptionsCodec.java
new file mode 100644
index 0000000..0d16a28
--- /dev/null
+++ b/spark/src/main/java/io/datafusion/spark/OptionsCodec.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.datafusion.spark;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Default wire format for {@link BridgeProviderFactory#encodeOptions(Map)}: the Spark options map
+ * as length-prefixed UTF-8 pairs, sorted by key.
+ *
+ * <p>Layout (all integers big-endian {@code int32}): entry count, then per entry key length, key
+ * bytes, value length, value bytes. Key-sorting makes the bytes a pure function of the map's
+ * contents regardless of source iteration order — required by the shared-scan determinism contract,
+ * where the options bytes are the cache/plan identity.
+ *
+ * <p>The Rust decoder lives in {@code datafusion_spark_bridge::options}; bridges using the default
+ * {@code encodeOptions} read their options there as a {@code BTreeMap<String, String>}. The two
+ * implementations are pinned to each other by a shared test fixture.
+ */
+public final class OptionsCodec {
+
+  private OptionsCodec() {}
+
+  /**
+   * Encode {@code options} sorted by key. {@code null} or empty map encodes as count 0.
+   *
+   * @param options the option map; may be {@code null}
+   * @return the encoded bytes (never {@code null})
+   * @throws IllegalArgumentException if any key or value is {@code null}
+   */
+  public static byte[] encode(Map<String, String> options) {
+    // Validate while copying: a naturally ordered TreeMap rejects a null key with a
+    // NullPointerException, so checking only after the copy could never report it.
+    // NOTE(review): String ordering is by UTF-16 code unit, which can differ from UTF-8 byte
+    // order for keys outside the BMP — confirm the Rust decoder does not require byte-sorted input.
+    TreeMap<String, String> sorted = new TreeMap<>();
+    if (options != null) {
+      for (Map.Entry<String, String> e : options.entrySet()) {
+        if (e.getKey() == null || e.getValue() == null) {
+          throw new IllegalArgumentException("OptionsCodec does not accept null keys or values");
+        }
+        sorted.put(e.getKey(), e.getValue());
+      }
+    }
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    writeInt(out, sorted.size());
+    for (Map.Entry<String, String> e : sorted.entrySet()) {
+      writeBytes(out, e.getKey().getBytes(StandardCharsets.UTF_8));
+      writeBytes(out, e.getValue().getBytes(StandardCharsets.UTF_8));
+    }
+    return out.toByteArray();
+  }
+
+  /**
+   * Decode bytes produced by {@link #encode(Map)}. Preserves the encoded (sorted) order.
+   *
+   * @param bytes encoded options; {@code null} or an empty array decodes to an empty map
+   * @return a mutable map in encoded order
+   * @throws IllegalArgumentException if the input is truncated, carries a negative count or
+   *     length, or has trailing bytes after the declared entries
+   */
+  public static Map<String, String> decode(byte[] bytes) {
+    Map<String, String> out = new LinkedHashMap<>();
+    if (bytes == null || bytes.length == 0) {
+      return out;
+    }
+    ByteBuffer buf = ByteBuffer.wrap(bytes);
+    int count = readCount(buf, "entry count");
+    for (int i = 0; i < count; i++) {
+      String key = readString(buf, "key of entry " + i);
+      String value = readString(buf, "value of entry " + i);
+      out.put(key, value);
+    }
+    if (buf.hasRemaining()) {
+      throw new IllegalArgumentException(
+          "OptionsCodec: " + buf.remaining() + " trailing byte(s) after " + count + " entries");
+    }
+    return out;
+  }
+
+  /** Appends {@code v} as a big-endian {@code int32}. */
+  private static void writeInt(ByteArrayOutputStream out, int v) {
+    out.write((v >>> 24) & 0xFF);
+    out.write((v >>> 16) & 0xFF);
+    out.write((v >>> 8) & 0xFF);
+    out.write(v & 0xFF);
+  }
+
+  /** Appends the length of {@code bytes} followed by the bytes themselves. */
+  private static void writeBytes(ByteArrayOutputStream out, byte[] bytes) {
+    writeInt(out, bytes.length);
+    out.write(bytes, 0, bytes.length);
+  }
+
+  /** Reads a non-negative big-endian {@code int32}; {@code what} names it in error messages. */
+  private static int readCount(ByteBuffer buf, String what) {
+    if (buf.remaining() < 4) {
+      throw new IllegalArgumentException("OptionsCodec: truncated " + what);
+    }
+    int v = buf.getInt();
+    if (v < 0) {
+      throw new IllegalArgumentException("OptionsCodec: negative " + what + ": " + v);
+    }
+    return v;
+  }
+
+  /** Reads one length-prefixed UTF-8 string; {@code what} names it in error messages. */
+  private static String readString(ByteBuffer buf, String what) {
+    int len = readCount(buf, "length of " + what);
+    // Check before allocating so a corrupt length cannot trigger a huge allocation.
+    if (buf.remaining() < len) {
+      throw new IllegalArgumentException("OptionsCodec: truncated " + what);
+    }
+    byte[] bytes = new byte[len];
+    buf.get(bytes);
+    return new String(bytes, StandardCharsets.UTF_8);
+  }
+}
diff --git a/spark/src/main/java/io/datafusion/spark/PartitionInfo.java b/spark/src/main/java/io/datafusion/spark/PartitionInfo.java
new file mode 100644
index 0000000..e6e061b
--- /dev/null
+++ b/spark/src/main/java/io/datafusion/spark/PartitionInfo.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.datafusion.spark;
+
+/**
+ * Driver-side descriptor for a single partition produced by {@link
+ * BridgeProviderFactory#listPartitions(byte[])}. Carries the bridge-specific slice payload that the
+ * executor passes back into {@link ScanBackend#createScan}, plus
+ * optional host hints for Spark's scheduler.
+ *
+ * <p>Fields:
+ *
+ * <ul>
+ *   <li>{@code id} — stable, human-readable identifier for this partition (e.g. a segment id).
+ *       Surfaces in Spark UI, logs, and exception messages. Must be non-empty.
+ *   <li>{@code partitionBytes} — opaque per-partition payload. Bridge encodes whatever the executor
+ *       needs to materialise *this* slice (offsets, row ranges, sub-options, etc.). Combined with
+ *       the global {@code optionsBytes} in {@link ScanBackend#createScan}. Empty array = no
+ *       per-partition state (single-partition table).
+ *   <li>{@code preferredLocations} — hostnames where this partition's data lives. Returned from
+ *       {@code InputPartition.preferredLocations()} so Spark can co-locate the task with the data.
+ *       Empty array = no preference. Honoured subject to {@code spark.locality.wait}.
+ *   <li>{@code partitionKeyValues} — optional values of the partitioning keys for every row in this
+ *       partition, in the same order as {@link BridgeProviderFactory#reportPartitioning(byte[])}'s
+ *       declared transforms. {@code null} = no key (the default). When the bridge reports a
+ *       partitioning AND every partition carries key values, the connector exposes them to Spark
+ *       via {@code HasPartitionKey} — required on Spark 3.3+ for the reported {@code
+ *       KeyGroupedPartitioning} to have any effect (and storage-partitioned joins additionally
+ *       require {@code spark.sql.sources.v2.bucketing.enabled=true}). Values must be Java types
+ *       that Spark's {@code CatalystTypeConverters} can convert for the key columns' data types
+ *       (e.g. {@code String}, {@code Long}, {@code Integer}, {@code java.time.Instant}, {@code
+ *       java.time.LocalDate}, {@code java.math.BigDecimal}), and the array length must equal the
+ *       number of declared keys.
+ * </ul>
+ */
+public record PartitionInfo(
+ String id, byte[] partitionBytes, String[] preferredLocations, Object[] partitionKeyValues) {
+
+ /**
+ * Canonical constructor: rejects a null or empty {@code id} and replaces a null {@code
+ * partitionBytes} / {@code preferredLocations} with an empty array. The arrays are stored as
+ * passed, without copying.
+ *
+ * @throws IllegalArgumentException if {@code id} is null or empty
+ */
+ public PartitionInfo {
+ if (id == null || id.isEmpty()) {
+ throw new IllegalArgumentException("PartitionInfo: id must be non-empty");
+ }
+ if (partitionBytes == null) {
+ partitionBytes = new byte[0];
+ }
+ if (preferredLocations == null) {
+ preferredLocations = new String[0];
+ }
+ // partitionKeyValues stays null when absent: null and "no key" are the same state,
+ // and DatafusionBatch distinguishes keyed from unkeyed partitions by it.
+ }
+
+ /** Without partition key values — the common case. */
+ public PartitionInfo(String id, byte[] partitionBytes, String[] preferredLocations) {
+ this(id, partitionBytes, preferredLocations, null);
+ }
+}
diff --git a/spark/src/main/java/io/datafusion/spark/ReportedPartitioning.java b/spark/src/main/java/io/datafusion/spark/ReportedPartitioning.java
new file mode 100644
index 0000000..639fec9
--- /dev/null
+++ b/spark/src/main/java/io/datafusion/spark/ReportedPartitioning.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.datafusion.spark;
+
+import java.util.Arrays;
+
+import org.apache.spark.sql.connector.expressions.Expressions;
+import org.apache.spark.sql.connector.expressions.Transform;
+
+/**
+ * Driver-side declaration of how a bridge's data is partitioned on the key columns. When supplied
+ * via {@link BridgeProviderFactory#reportPartitioning(byte[])}, the connector surfaces a {@link
+ * org.apache.spark.sql.connector.read.partitioning.KeyGroupedPartitioning} from {@link
+ * org.apache.spark.sql.connector.read.SupportsReportPartitioning#outputPartitioning()} — Spark's
+ * optimizer can then skip the shuffle ahead of joins/aggregations whose grouping keys line up with
+ * these transforms.
+ *
+ * Contract: for any partition reported by {@link BridgeProviderFactory#listPartitions(byte[])},
+ * every row produced by that partition must evaluate to the same tuple of key values under these
+ * transforms. Different partitions may share key values (Spark will fuse them); they must
+ * not straddle key values.
+ *
+ *
The partition count Spark sees is {@code listPartitions(...).length}; it is not carried here
+ * to keep a single source of truth.
+ */
+public final class ReportedPartitioning {
+
+ private final Transform[] keys;
+
+ public ReportedPartitioning(Transform[] keys) {
+ if (keys == null || keys.length == 0) {
+ throw new IllegalArgumentException(
+ "ReportedPartitioning: keys must contain at least one transform");
+ }
+ this.keys = keys;
+ }
+
+ public Transform[] keys() {
+ return keys;
+ }
+
+ /**
+ * Convenience: declare identity partitioning on one or more columns (a row in partition P has the
+ * same {@code (col1, col2, …)} values as every other row in P).
+ */
+ public static ReportedPartitioning identity(String... columns) {
+ if (columns == null || columns.length == 0) {
+ throw new IllegalArgumentException(
+ "ReportedPartitioning.identity: at least one column required");
+ }
+ Transform[] ts = Arrays.stream(columns).map(Expressions::identity).toArray(Transform[]::new);
+ return new ReportedPartitioning(ts);
+ }
+
+ /**
+ * Convenience: declare hash-bucket partitioning. Mirrors Spark's {@code bucket(N, cols…)}
+ * transform — each row is assigned to bucket {@code hash(cols) mod numBuckets}.
+ */
+ public static ReportedPartitioning bucket(int numBuckets, String... columns) {
+ if (numBuckets <= 0) {
+ throw new IllegalArgumentException(
+ "ReportedPartitioning.bucket: numBuckets must be > 0, got " + numBuckets);
+ }
+ if (columns == null || columns.length == 0) {
+ throw new IllegalArgumentException(
+ "ReportedPartitioning.bucket: at least one column required");
+ }
+ return new ReportedPartitioning(new Transform[] {Expressions.bucket(numBuckets, columns)});
+ }
+}
diff --git a/spark/src/main/java/io/datafusion/spark/ScanBackend.java b/spark/src/main/java/io/datafusion/spark/ScanBackend.java
new file mode 100644
index 0000000..a994c98
--- /dev/null
+++ b/spark/src/main/java/io/datafusion/spark/ScanBackend.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.datafusion.spark;
+
+/**
+ * Native scan surface the connector plumbing talks to: one method per JNI entry point generated by
+ * the bridge's {@code datafusion_spark_bridge::export_bridge!} invocation. A bridge's
+ * implementation is six one-line delegations to the JNI class named in that macro, whose {@code
+ * createScan} builds the provider from {@code options}/{@code partitionBytes} in process.
+ *
+ * <p>Implementations must be stateless or thread-safe: the driver probes schemas and plans through
+ * one instance while executor tasks stream through others, and scan handles are shared across
+ * threads by the shared-scan cache. Handle-based methods accept handles produced by {@code
+ * createScan} on any instance of the same implementation.
+ */
+public interface ScanBackend {
+
+ /**
+ * Driver-side schema probe: the widened Arrow schema of the provider described by {@code options}
+ * + {@code partitionBytes}, serialized as Arrow IPC bytes (deserialize with {@code
+ * MessageSerializer.deserializeSchema}).
+ */
+ byte[] providerSchemaIpc(byte[] options, byte[] partitionBytes);
+
+ /**
+ * Build a planned scan and return its handle. {@code targetPartitions}/{@code batchSize} {@code
+ * <= 0} leave DataFusion defaults; {@code optionKeys}/{@code optionValues} are parallel config
+ * override arrays; empty {@code projectionColumns} selects all columns; each {@code filterProtos}
+ * element is a serialized {@code datafusion.LogicalExprNode}.
+ *
+ * <p>The caller owns the handle and must pair it with {@link #closeScan(long)}. Closing while a
+ * stream opened from the handle is in flight is undefined behaviour — the shared-scan cache's
+ * refcount enforces this; any other caller must serialize close itself.
+ */
+ long createScan(
+ byte[] options,
+ byte[] partitionBytes,
+ int targetPartitions,
+ int batchSize,
+ String[] optionKeys,
+ String[] optionValues,
+ String[] projectionColumns,
+ byte[][] filterProtos);
+
+ /** Output partition count of the planned physical plan. */
+ int partitionCount(long scanHandle);
+
+ /**
+ * Open an independent stream over ONE plan partition, writing an {@code FFI_ArrowArrayStream}
+ * into the caller-allocated struct at {@code ffiStreamAddr}. Concurrent-safe across JVM threads.
+ */
+ void executeStreamPartition(long scanHandle, int partition, long ffiStreamAddr);
+
+ /**
+ * Stream the WHOLE plan (all partitions coalesced) into the caller-allocated {@code
+ * FFI_ArrowArrayStream} at {@code ffiStreamAddr}. Used by per-partition mode.
+ */
+ void executeStream(long scanHandle, long ffiStreamAddr);
+
+ /** Drop the planned scan. See {@link #createScan} for the close-vs-in-flight contract. */
+ void closeScan(long scanHandle);
+}
diff --git a/spark/src/test/scala/io/datafusion/spark/BridgeProviderFactoryDefaultsTest.scala b/spark/src/test/scala/io/datafusion/spark/BridgeProviderFactoryDefaultsTest.scala
new file mode 100644
index 0000000..0b94eee
--- /dev/null
+++ b/spark/src/test/scala/io/datafusion/spark/BridgeProviderFactoryDefaultsTest.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.datafusion.spark
+
+import org.scalatest.funsuite.AnyFunSuite
+
+class BridgeProviderFactoryDefaultsTest extends AnyFunSuite {
+
+  /** Backend stub: the defaults under test never touch native code, so every call throws. */
+  private object StubBackend extends ScanBackend {
+    def providerSchemaIpc(options: Array[Byte], partitionBytes: Array[Byte]): Array[Byte] =
+      throw new UnsupportedOperationException
+    def createScan(
+        options: Array[Byte],
+        partitionBytes: Array[Byte],
+        targetPartitions: Int,
+        batchSize: Int,
+        optionKeys: Array[String],
+        optionValues: Array[String],
+        projectionColumns: Array[String],
+        filterProtos: Array[Array[Byte]]): Long = throw new UnsupportedOperationException
+    def partitionCount(scanHandle: Long): Int = throw new UnsupportedOperationException
+    def executeStreamPartition(scanHandle: Long, partition: Int, ffiStreamAddr: Long): Unit =
+      throw new UnsupportedOperationException
+    def executeStream(scanHandle: Long, ffiStreamAddr: Long): Unit =
+      throw new UnsupportedOperationException
+    def closeScan(scanHandle: Long): Unit = throw new UnsupportedOperationException
+  }
+
+  /** Factory overriding only the one-argument listPartitions, recording the options it receives. */
+  private class MinimalFactory extends BridgeProviderFactory {
+    var lastListPartitionsOpts: Array[Byte] = _
+
+    override def scanBackend(): ScanBackend = StubBackend
+
+    override def listPartitions(optionsBytes: Array[Byte]): Array[PartitionInfo] = {
+      lastListPartitionsOpts = optionsBytes
+      Array(new PartitionInfo("p0", Array.emptyByteArray, Array.empty[String]))
+    }
+  }
+
+  /** Only the required method implemented — the literal minimum a bridge can ship. */
+  private class EmptyFactory extends BridgeProviderFactory {
+    override def scanBackend(): ScanBackend = StubBackend
+  }
+
+  test("sharedScan defaults to false") {
+    assert(!new MinimalFactory().sharedScan(Array[Byte](1, 2, 3)))
+  }
+
+  test("default encodeOptions uses OptionsCodec") {
+    val opts = new java.util.HashMap[String, String]()
+    opts.put("url", "grpc://h:1")
+    val bytes = new EmptyFactory().encodeOptions(opts)
+    assert(java.util.Arrays.equals(bytes, OptionsCodec.encode(opts)))
+    assert(OptionsCodec.decode(bytes).get("url") == "grpc://h:1")
+  }
+
+  test("default listPartitions reports a single whole-dataset partition") {
+    val partitions = new EmptyFactory().listPartitions(Array[Byte](1))
+    assert(partitions.length == 1)
+    assert(partitions(0).id == "p0")
+    assert(partitions(0).partitionBytes().isEmpty)
+    assert(partitions(0).preferredLocations().isEmpty)
+  }
+
+  test("filter-aware listPartitions delegates to the filter-unaware overload") {
+    val factory = new MinimalFactory
+    val opts = Array[Byte](7, 8)
+    val filters = Array(Array[Byte](1), Array[Byte](2))
+    val partitions = factory.listPartitions(opts, filters)
+    assert(partitions.length == 1)
+    assert(partitions(0).id == "p0")
+    assert(factory.lastListPartitionsOpts eq opts) // same array instance was forwarded
+  }
+
+  test("reportPartitioning defaults to null") {
+    assert(new MinimalFactory().reportPartitioning(Array.emptyByteArray) == null)
+  }
+
+  test("PartitionInfo 3-arg constructor leaves partitionKeyValues null") {
+    val p = new PartitionInfo("p0", Array.emptyByteArray, Array.empty[String])
+    assert(p.partitionKeyValues() == null)
+  }
+
+  test("PartitionInfo 4-arg constructor carries key values") {
+    val keyValues = Array[AnyRef]("segment-a", Long.box(42L))
+    val p = new PartitionInfo("p0", Array.emptyByteArray, Array.empty[String], keyValues)
+    // Compare element by element so a lost or altered trailing value fails the test,
+    // not just a wrong length or first entry.
+    assert(p.partitionKeyValues().length == 2)
+    assert(p.partitionKeyValues()(0) == "segment-a")
+    assert(p.partitionKeyValues()(1) == Long.box(42L))
+  }
+}
diff --git a/spark/src/test/scala/io/datafusion/spark/OptionsCodecTest.scala b/spark/src/test/scala/io/datafusion/spark/OptionsCodecTest.scala
new file mode 100644
index 0000000..59f6c8f
--- /dev/null
+++ b/spark/src/test/scala/io/datafusion/spark/OptionsCodecTest.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.datafusion.spark
+
+import java.io.ByteArrayOutputStream
+import java.nio.charset.StandardCharsets
+
+import org.scalatest.funsuite.AnyFunSuite
+
+class OptionsCodecTest extends AnyFunSuite {
+
+  /**
+   * Shared fixture: must stay byte-identical to the one asserted by the Rust-side
+   * `datafusion_spark_bridge::options` tests. {"table": "t1", "url": "grpc://h:1"} encodes
+   * (sorted: table < url) as a big-endian Int count, then length-prefixed UTF-8 key/value pairs.
+   */
+  private def fixtureBytes(): Array[Byte] = {
+    val out = new ByteArrayOutputStream()
+    def writeInt(v: Int): Unit = {
+      out.write((v >>> 24) & 0xFF); out.write((v >>> 16) & 0xFF)
+      out.write((v >>> 8) & 0xFF); out.write(v & 0xFF)
+    }
+    def writeString(s: String): Unit = {
+      val b = s.getBytes(StandardCharsets.UTF_8)
+      writeInt(b.length) // prefix is the UTF-8 byte length, not the character count
+      out.write(b, 0, b.length)
+    }
+    writeInt(2)
+    Seq("table" -> "t1", "url" -> "grpc://h:1").foreach { case (k, v) =>
+      writeString(k); writeString(v)
+    }
+    out.toByteArray
+  }
+
+  test("encodes the cross-language fixture byte-identically, sorted by key") {
+    // Insertion order deliberately unsorted; encoding must sort.
+    val opts = new java.util.LinkedHashMap[String, String]()
+    opts.put("url", "grpc://h:1")
+    opts.put("table", "t1")
+    assert(java.util.Arrays.equals(OptionsCodec.encode(opts), fixtureBytes()))
+  }
+
+  test("round-trips including unicode values") {
+    val opts = new java.util.HashMap[String, String]()
+    opts.put("a", "1")
+    opts.put("unicode", "héllo→world")
+    val decoded = OptionsCodec.decode(OptionsCodec.encode(opts))
+    assert(decoded.size() == 2)
+    assert(decoded.get("a") == "1")
+    assert(decoded.get("unicode") == "héllo→world")
+  }
+
+  test("null and empty maps encode to a zero count and decode back empty") {
+    val empty = new java.util.HashMap[String, String]()
+    Seq(null, empty).foreach(m => assert(OptionsCodec.decode(OptionsCodec.encode(m)).isEmpty))
+    assert(OptionsCodec.decode(Array.emptyByteArray).isEmpty)
+  }
+
+  test("rejects truncation and trailing bytes") {
+    val bytes = fixtureBytes()
+    val truncated = java.util.Arrays.copyOf(bytes, bytes.length - 1) // last byte dropped
+    val padded = java.util.Arrays.copyOf(bytes, bytes.length + 1) // one extra zero byte
+    intercept[IllegalArgumentException] { OptionsCodec.decode(truncated) }
+    intercept[IllegalArgumentException] { OptionsCodec.decode(padded) }
+  }
+
+  test("rejects null keys or values") {
+    val opts = new java.util.HashMap[String, String]()
+    opts.put("k", null) // NOTE(review): null value only; the null-key case is not exercised
+    intercept[IllegalArgumentException] { OptionsCodec.encode(opts) }
+  }
+}