MPI: revamp mpi indexing for full arbitrary indexing support#2955
MPI: revamp mpi indexing for full arbitrary indexing support#2955mloubout wants to merge 33 commits into
Conversation
Point-to-point all-to-some via nonblocking consensus; only ranks that share data communicate, with no dense collective.
Maps any index (scalars, slices, negative steps, integer arrays, boolean masks) to per-axis selectors, independent of MPI layout.
Layout resolves a global coordinate to (rank, local offset); ExchangePlan reduces every routed element to an owner and offset plus a replicated payload block.
Exchange(data, idx).get()/.put() runs the plan in either direction; cached_exchange reuses the communication-free plan across identical indexed accesses.
…ngine When an index addresses a distributed axis with an array (the scattered case where the value is rank-local), Data get/set go through a cached Exchange.
data_local is a rank-local view with global indexing disabled; local_indices is now per-Dimension indexable while remaining a valid NumPy multi-axis index.
Replaces the all-to-all broadcast in self[idx]=other (other distributed) with a structured push: the source-to-destination mapping is derived directly from the index as a per-axis affine map, bypassing the legacy _process_args distributor rebuild. A single collective agrees whether every rank can use the structured path, so the choice never diverges (and cannot deadlock); unsupported patterns fall back to the legacy broadcast.
self[idx] with a negative step on a distributed axis now returns a distributed reversed Data by pulling each rank's block from the owners (source coord = start+k*step), instead of the per-element legacy mpi_index_maps path. The result shell is a fresh forward-sliced copy carrying the correct decomposition/shape; the pull overwrites its content. A collective LAND agrees on the structured path. Stepped (|step|>1) distributed slices, whose decomposition Devito does not reduce, still defer to the legacy path.
Decomposition.index_decomposition computes the decomposition induced by NumPy slicing an axis: each subdomain keeps the result positions, in index order, of the global indices it owns. A strided or reversed slice thus only relabels which rank owns which result index -- distinct from the boundary-adjusting reshape used for halos.
Indexing -- including strided and negative-step slices -- never moves data: each rank selects the elements it already owns, and the result decomposition is induced. __getitem__ drops the per-element communication path; data_gather is reimplemented as a local index plus a single collect of each rank's owned result indices. Deletes mpi_index_maps (and its helper) and the negative-step pull redistribute_get, which existed only to force a canonical layout by re-decomposing -- which broke the indexing abstraction.
Assert that each rank holds exactly the part of the global result it owns (reassembled and compared globally), rather than a communicated canonical layout.
Documents how to initialize Function/TimeFunction/SparseFunction data under MPI: global-replicated RHS via data, same-distribution RHS via data_local, and external distributions via global-index routed assignment (data[:, glb_idx] = ...). Wording reflects the local-indexing model -- basic slices are local (induced decomposition), only advanced global-index indexing routes point-to-point.
Collapse duplicated logic in the devito/data/distributed engine: - Define the NumPy advanced-index result ordering once (result_dims in selection.py); derive both result_shape and the plan's row/payload layout from it instead of encoding the rule twice. - Extract a single nbx_push primitive in plan.py and use it from both ExchangePlan.put and redistribution._push (a structured assignment is just a push with payload size 1). - Drop dead parameters and an unused import surfaced by the above.
The _check_idx decorator tagged every __setitem__ with a comm_type (index_by_index / serial) whose only reader sat inside the val._is_decomposed branch -- the exact condition that produced the tag, so the check was always true and gated two unreachable sibling branches. Remove the decorator, the CommType tags, and the comm_type parameters from __getitem__/__setitem__, inlining the single live branch. The legacy broadcast fallback is retained (it is the only path for assigning a value indexed by a scalar on a distributed axis, whose shape is rank-dependent) and documented as such.
Cover the pure layers of devito/data/distributed in serial (no MPI): Selection result shapes validated against NumPy across scalars, slices, negative steps, ellipsis, integer/boolean advanced indexing and mixed cases; result_dims ordering; index_has_array gating; and Layout owner/ local maps and topology resolution.
Bring the engine to the codebase's numpy-docstring convention: add the missing ExchangePlan class docstring, give public/routing functions (build, get, put, nbx_push, _resolve_owners, _group_peers, from_index, redistribute_set) Parameters/Returns sections, document the remaining helpers, and add short explicit comments and blank lines for clarity.
Correct the assignment-support table: dense objects DO support an external (different) distribution, via routed advanced indexing (f.data[glb_idx] = loc_ext_a) or assignment from another distributed Devito object (f.data[region] = g.data[...]). Add tutorial cases for the dense paths that were missing: routed advanced indexing in 1D and across two distributed axes, dense-to-dense redistribution with differing decompositions, and reading distributed data via induced slices and data_gather. Update the terminology, indexing-forms and layout notes accordingly. All code cells verified on 4 ranks.
index_glb_to_loc mis-converted a strided global slice when its start (or, for a negative step, its high end) fell strictly inside the owning subdomain: the stride phase was computed from the subdomain boundary instead of the slice start, so the local block picked spurious leading or trailing elements (e.g. global 2:8:2 gave rank 0 local [0, 2] instead of [2]). This disagreed with the induced decomposition and made data_gather on such slices raise a shape mismatch. Begin the stride phase at the slice start when it lies within the subdomain, for both the positive (loc_min) and negative (loc_max) cases. Add a serial regression test for the conversion and exercise the failing gather slices (2:8:2, 7:0:-2) in the 2D/3D sliced-gather tests.
Now that data_gather handles a strided slice whose start falls inside a subdomain, demonstrate it in the reading example (f.data_gather(start=2, stop=8, step=2)).
|
Check out this pull request on See visual diffs & provide feedback on Jupyter Notebooks. Powered by ReviewNB |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #2955 +/- ##
==========================================
- Coverage 83.60% 82.73% -0.88%
==========================================
Files 250 257 +7
Lines 52908 53383 +475
Branches 4564 4573 +9
==========================================
- Hits 44235 44167 -68
- Misses 7895 8393 +498
- Partials 778 823 +45
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
The distributed-engine docstrings used Sphinx cross-reference roles (:mod:, :class:, :func:, :meth:) that don't match Devito's plain-prose style (cf. Decomposition). Replace them with plain double-backtick names -- reducing :class:`~pkg.mod.Name` shorthand to the bare name -- and reflow the few lines left ragged by the shorter text. Docstrings only.
"Distributed data redistribution engine" was redundant and misleading: indexing is local/induced and only communicates for the scattered and structured-assignment cases, so the package is an indexing engine, not a redistribution one.
These are serial unit tests of devito/data internals (Selection, Layout, result_dims, index_has_array), same as TestDecomposition, so they belong beside it rather than in a separate file. Move the four classes into test_data.py (aliasing the engine's Scalar to avoid clashing with devito.types.Scalar) and drop tests/test_distributed_engine.py.
The selection-layer Scalar collided with devito.types.Scalar, forcing an alias wherever both are imported (e.g. test_data.py). Rename it to IndexScalar at the source so the name is unambiguous everywhere and no aliasing is needed.
Match the single-backtick `name` convention rather than the double ``name`` form, across the distributed engine and the docstrings/comments this branch added to data.py, decomposition.py and test_data.py. Pre-existing double-backtick text elsewhere is left untouched.
Times distributed data[idx] get/set against single-process NumPy on the full domain, for a routed scatter assignment, the matching routed read, and a local strided slice. Routed cases report cold (plan construction) and warm (cached plan) timings, so the steady-state cost is visible.
Two hot-path costs in serial/local data[idx], surfaced by benchmarking: * index_decomposition (used by every basic slice) built a full np.arange over the axis and ran np.isin against each whole subdomain -- O(domain) per slice. Subdomains are contiguous ranges, so bucket the selected indices by bounds in O(result size); an np.isin fallback covers any non-contiguous subdomain. A 1024-wide slice of a 2**20 axis drops from ~2.1 ms to ~20 us. * convert_index ran index_glb_to_loc per element via np.vectorize for an array index. For a single (non-distributed) subdomain the map is just a negative-index wrap, so vectorize it; the distributed path (handled by the redistribution engine) is unchanged. A 65536-point assignment drops from ~62 ms to ~1.6 ms. Both are behaviour-preserving (doctests, TestDecomposition, and the mode-4 slicing/advanced-indexing tests pass).
The first version pitted distributed devito against single-process NumPy as if competing, which is the wrong comparison. Reframe into two studies, auto-selected by the launch size, each making one point: * serial (no mpirun): the data[idx] wrapper adds only a small overhead over NumPy, with no per-element Python loop; * parallel (mpirun -n N): a routed assignment's cost scales with the volume communicated, while the NumPy reference (pure compute) is unaffected by the rank count. Sweeps the number of indexed/communicated points and prints a table per study.
index_handle_oob dropped out-of-bounds (None) entries with np.delete(idx, where(idx == None)) on every 1-D array index. Only an object array can hold those sentinels; a plain numeric index has none, so the call was a wasted O(n) copy (plus an elementwise '== None'). Return the index unchanged in that case. A 65536-point assignment drops from ~1.6 ms to ~0.17 ms; the object-array path (real OOB drops) is unchanged.
Add a ratio column and stop calling the serial overhead 'negligible'. It is a fixed per-call cost (the .data accessor and global-index translation): large relative to NumPy for a trivial index -- whose NumPy cost is almost nothing -- but, being fixed, amortising to ~2x at realistic operation sizes.
nbx_exchange polled with a tight 'while True: Iprobe' busy-wait, spinning at 100% CPU while waiting for peer messages and the termination barrier. When ranks are oversubscribed (more ranks than cores), a spinning rank starves the very peer it is waiting on, so MPI makes no progress and the exchange deadlocks. This is exactly the MPI-notebook CI configuration: a 4-engine ipyparallel cluster on a 2-core runner (OpenMPI --oversubscribe), where it manifested as a multi-minute cell timeout. Yield (time.sleep(0)) whenever a poll pass finds nothing ready, so co-scheduled ranks can run; drain ready messages first via . Verified: correctness unchanged (44 routed/gather mode-4 tests pass) and 16 ranks on 8 cores complete in 0.35s instead of hanging.
The data-initialization notebook's point-to-point routing (NBX exchange) hung the 4-engine ipyparallel cluster under OpenMPI, while IntelMPI ran fine. OpenMPI defaults to an aggressive busy-wait: with the engines contending for cores, the probe/barrier loop spins at 100% and starves the peers it waits on, so MPI never progresses. IntelMPI yields by default. Set OMPI_MCA_mpi_yield_when_idle=1 in the examples-mpi job so OpenMPI yields the CPU while waiting. Also revert the earlier Python-level sleep(0) in nbx_exchange: sched_yield at the wrong layer (the spin is in OpenMPI's C progress engine), which did not help.
60d8c80 to
52c7525
Compare
| def __init__(self, data, idx): | ||
| self.data = data | ||
| self.idx = idx | ||
| self._sig = _signature(data, idx) |
There was a problem hiding this comment.
should be a cached_property
the docstring should also be clarified because at the moment it's pretty unclear what it actually does. We are (obviously) not using the data values for the cache key, but instead, we're using all the metadata associated with data
There was a problem hiding this comment.
should be a cached_property
No, that's conflicting with __slots__ that stores an instance as a compact object without a __dict__. Adding a cached_property defeats the purpose.
|
related? |
|
Fixes #2217 |
52c7525 to
1da5547
Compare
The point-to-point router used the NBX nonblocking-consensus algorithm (Issend + Iprobe(ANY_SOURCE) + Ibarrier). It works under IntelMPI and mpirun, but deadlocks under OpenMPI inside the ipyparallel example notebooks -- whereas ordinary collective/halo MPI (e.g. op.apply) runs fine there. So the NBX pattern itself is the fragile part. Replace it with a reduce-scatter-based sparse exchange: each rank learns how many peers will send to it via one Reduce_scatter_block over a length-nprocs 0/1 indicator, then Isend / Probe / Recv / Waitall. These are the same standard, portable calls that work elsewhere; payloads still move strictly point-to-point (no data all-to-all). Rename nbx_exchange -> sparse_exchange and nbx_push -> sparse_push accordingly, and revert the ineffective OpenMPI yield CI env var. Correctness verified under mpirun: TestDataDistributed, TestDataGather and the sparse advanced-indexing tests pass (modes 4 and 6).
1da5547 to
f17919a
Compare
now also benchmarked against plain numpy for reference in
examples/mpi/benchmark_data_indexing.pyOn my laptop