Skip to content

MPI: revamp mpi indexing for full arbitrary indexing support#2955

Open
mloubout wants to merge 33 commits into
mainfrom
mpi-redistribution-engine
Open

MPI: revamp mpi indexing for full arbitrary indexing support#2955
mloubout wants to merge 33 commits into
mainfrom
mpi-redistribution-engine

Conversation

@mloubout

@mloubout mloubout commented Jun 25, 2026

Copy link
Copy Markdown
Contributor

now also benchmarked against plain numpy for reference in examples/mpi/benchmark_data_indexing.py

On my laptop

Serial overhead of data[idx] = v over NumPy (axis=1048576, reps=20)

    points    numpy [us]   devito [us]   overhead [us]    ratio
---------------------------------------------------------------
        16           0.2           5.6             5.4      34x
       128           0.8           6.3             5.5       8x
      1024           4.1          10.5             6.5       3x
      8192          12.0          25.0            13.0       2x
     65536          96.4         162.5            66.1       2x
Routed assignment data[idx] = v under MPI (axis=1048576, ranks=4, reps=20)

   points    KiB sent    numpy [us]   devito [us]     comm [us]
----------------------------------------------------------------
       16         0.1           0.2          47.4          47.2
      128         0.5           1.0          50.0          49.1
     1024         4.0           6.5          97.6          91.1
     8192        32.0          36.4        1058.5        1022.2
    65536       256.0         141.7        6101.6        5959.9

mloubout added 20 commits June 24, 2026 10:41
Point-to-point all-to-some via nonblocking consensus; only ranks that share
data communicate, with no dense collective.
Maps any index (scalars, slices, negative steps, integer arrays, boolean masks)
to per-axis selectors, independent of MPI layout.
Layout resolves a global coordinate to (rank, local offset); ExchangePlan reduces
every routed element to an owner and offset plus a replicated payload block.
Exchange(data, idx).get()/.put() runs the plan in either direction; cached_exchange
reuses the communication-free plan across identical indexed accesses.
…ngine

When an index addresses a distributed axis with an array (the scattered case
where the value is rank-local), Data get/set go through a cached Exchange.
data_local is a rank-local view with global indexing disabled; local_indices is
now per-Dimension indexable while remaining a valid NumPy multi-axis index.
Replaces the all-to-all broadcast in self[idx]=other (other distributed) with a
structured push: the source-to-destination mapping is derived directly from the
index as a per-axis affine map, bypassing the legacy _process_args distributor
rebuild. A single collective agrees whether every rank can use the structured
path, so the choice never diverges (and cannot deadlock); unsupported patterns
fall back to the legacy broadcast.
self[idx] with a negative step on a distributed axis now returns a distributed
reversed Data by pulling each rank's block from the owners (source coord =
start+k*step), instead of the per-element legacy mpi_index_maps path. The result
shell is a fresh forward-sliced copy carrying the correct decomposition/shape;
the pull overwrites its content. A collective LAND agrees on the structured path.
Stepped (|step|>1) distributed slices, whose decomposition Devito does not
reduce, still defer to the legacy path.
Decomposition.index_decomposition computes the decomposition induced by NumPy
slicing an axis: each subdomain keeps the result positions, in index order, of
the global indices it owns. A strided or reversed slice thus only relabels which
rank owns which result index -- distinct from the boundary-adjusting reshape used
for halos.
Indexing -- including strided and negative-step slices -- never moves data: each
rank selects the elements it already owns, and the result decomposition is
induced. __getitem__ drops the per-element communication path; data_gather is
reimplemented as a local index plus a single collect of each rank's owned result
indices. Deletes mpi_index_maps (and its helper) and the negative-step pull
redistribute_get, which existed only to force a canonical layout by re-decomposing
-- which broke the indexing abstraction.
Assert that each rank holds exactly the part of the global result it owns
(reassembled and compared globally), rather than a communicated canonical layout.
Documents how to initialize Function/TimeFunction/SparseFunction data under MPI:
global-replicated RHS via data, same-distribution RHS via data_local, and external
distributions via global-index routed assignment (data[:, glb_idx] = ...). Wording
reflects the local-indexing model -- basic slices are local (induced decomposition),
only advanced global-index indexing routes point-to-point.
Collapse duplicated logic in the devito/data/distributed engine:

- Define the NumPy advanced-index result ordering once (result_dims in
  selection.py); derive both result_shape and the plan's row/payload
  layout from it instead of encoding the rule twice.
- Extract a single nbx_push primitive in plan.py and use it from both
  ExchangePlan.put and redistribution._push (a structured assignment is
  just a push with payload size 1).
- Drop dead parameters and an unused import surfaced by the above.
The _check_idx decorator tagged every __setitem__ with a comm_type
(index_by_index / serial) whose only reader sat inside the
val._is_decomposed branch -- the exact condition that produced the tag,
so the check was always true and gated two unreachable sibling branches.

Remove the decorator, the CommType tags, and the comm_type parameters
from __getitem__/__setitem__, inlining the single live branch. The
legacy broadcast fallback is retained (it is the only path for assigning
a value indexed by a scalar on a distributed axis, whose shape is
rank-dependent) and documented as such.
Cover the pure layers of devito/data/distributed in serial (no MPI):
Selection result shapes validated against NumPy across scalars, slices,
negative steps, ellipsis, integer/boolean advanced indexing and mixed
cases; result_dims ordering; index_has_array gating; and Layout owner/
local maps and topology resolution.
Bring the engine to the codebase's numpy-docstring convention: add the
missing ExchangePlan class docstring, give public/routing functions
(build, get, put, nbx_push, _resolve_owners, _group_peers, from_index,
redistribute_set) Parameters/Returns sections, document the remaining
helpers, and add short explicit comments and blank lines for clarity.
Correct the assignment-support table: dense objects DO support an external
(different) distribution, via routed advanced indexing (f.data[glb_idx] =
loc_ext_a) or assignment from another distributed Devito object
(f.data[region] = g.data[...]).

Add tutorial cases for the dense paths that were missing: routed advanced
indexing in 1D and across two distributed axes, dense-to-dense
redistribution with differing decompositions, and reading distributed data
via induced slices and data_gather. Update the terminology, indexing-forms
and layout notes accordingly. All code cells verified on 4 ranks.
index_glb_to_loc mis-converted a strided global slice when its start (or,
for a negative step, its high end) fell strictly inside the owning
subdomain: the stride phase was computed from the subdomain boundary
instead of the slice start, so the local block picked spurious leading or
trailing elements (e.g. global 2:8:2 gave rank 0 local [0, 2] instead of
[2]). This disagreed with the induced decomposition and made data_gather
on such slices raise a shape mismatch.

Begin the stride phase at the slice start when it lies within the
subdomain, for both the positive (loc_min) and negative (loc_max) cases.

Add a serial regression test for the conversion and exercise the failing
gather slices (2:8:2, 7:0:-2) in the 2D/3D sliced-gather tests.
Now that data_gather handles a strided slice whose start falls inside a
subdomain, demonstrate it in the reading example
(f.data_gather(start=2, stop=8, step=2)).
@mloubout mloubout added the MPI mpi-related label Jun 25, 2026
@review-notebook-app

Copy link
Copy Markdown

Check out this pull request on  ReviewNB

See visual diffs & provide feedback on Jupyter Notebooks.


Powered by ReviewNB

Comment thread devito/data/distributed/__init__.py Outdated
Comment thread devito/data/distributed/__init__.py Outdated
Comment thread tests/test_distributed_engine.py Outdated
Comment thread tests/test_distributed_engine.py Outdated
@codecov

codecov Bot commented Jun 25, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 44.27141% with 501 lines in your changes missing coverage. Please review.
✅ Project coverage is 82.73%. Comparing base (c01281c) to head (f17919a).

Files with missing lines Patch % Lines
devito/data/distributed/plan.py 16.00% 168 Missing ⚠️
tests/test_data.py 46.12% 132 Missing ⚠️
devito/data/data.py 18.33% 44 Missing and 5 partials ⚠️
devito/data/distributed/redistribution.py 18.75% 39 Missing ⚠️
devito/data/distributed/transport.py 13.88% 31 Missing ⚠️
devito/data/distributed/exchange.py 44.44% 25 Missing ⚠️
tests/test_mpi.py 17.24% 24 Missing ⚠️
devito/data/distributed/selection.py 88.51% 10 Missing and 7 partials ⚠️
devito/data/decomposition.py 71.42% 2 Missing and 4 partials ⚠️
devito/types/dense.py 37.50% 5 Missing ⚠️
... and 2 more
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2955      +/-   ##
==========================================
- Coverage   83.60%   82.73%   -0.88%     
==========================================
  Files         250      257       +7     
  Lines       52908    53383     +475     
  Branches     4564     4573       +9     
==========================================
- Hits        44235    44167      -68     
- Misses       7895     8393     +498     
- Partials      778      823      +45     
Flag Coverage Δ
pytest-gpu-aomp-amdgpuX ?
pytest-gpu-gcc- 78.17% <43.93%> (-0.23%) ⬇️
pytest-gpu-icx- 43.26% <23.24%> (-35.07%) ⬇️
pytest-gpu-nvc-nvidiaX 68.99% <24.96%> (-0.41%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

mloubout added 2 commits June 25, 2026 09:52
The distributed-engine docstrings used Sphinx cross-reference roles
(:mod:, :class:, :func:, :meth:) that don't match Devito's plain-prose
style (cf. Decomposition). Replace them with plain double-backtick names
-- reducing :class:`~pkg.mod.Name` shorthand to the bare name -- and
reflow the few lines left ragged by the shorter text. Docstrings only.
"Distributed data redistribution engine" was redundant and misleading:
indexing is local/induced and only communicates for the scattered and
structured-assignment cases, so the package is an indexing engine, not a
redistribution one.
mloubout added 10 commits June 25, 2026 09:59
These are serial unit tests of devito/data internals (Selection, Layout,
result_dims, index_has_array), same as TestDecomposition, so they belong
beside it rather than in a separate file. Move the four classes into
test_data.py (aliasing the engine's Scalar to avoid clashing with
devito.types.Scalar) and drop tests/test_distributed_engine.py.
The selection-layer Scalar collided with devito.types.Scalar, forcing an
alias wherever both are imported (e.g. test_data.py). Rename it to
IndexScalar at the source so the name is unambiguous everywhere and no
aliasing is needed.
Match the single-backtick `name` convention rather than the double
``name`` form, across the distributed engine and the docstrings/comments
this branch added to data.py, decomposition.py and test_data.py.
Pre-existing double-backtick text elsewhere is left untouched.
Times distributed data[idx] get/set against single-process NumPy on the
full domain, for a routed scatter assignment, the matching routed read,
and a local strided slice. Routed cases report cold (plan construction)
and warm (cached plan) timings, so the steady-state cost is visible.
Two hot-path costs in serial/local data[idx], surfaced by benchmarking:

* index_decomposition (used by every basic slice) built a full np.arange
  over the axis and ran np.isin against each whole subdomain -- O(domain)
  per slice. Subdomains are contiguous ranges, so bucket the selected
  indices by bounds in O(result size); an np.isin fallback covers any
  non-contiguous subdomain. A 1024-wide slice of a 2**20 axis drops from
  ~2.1 ms to ~20 us.

* convert_index ran index_glb_to_loc per element via np.vectorize for an
  array index. For a single (non-distributed) subdomain the map is just a
  negative-index wrap, so vectorize it; the distributed path (handled by
  the redistribution engine) is unchanged. A 65536-point assignment drops
  from ~62 ms to ~1.6 ms.

Both are behaviour-preserving (doctests, TestDecomposition, and the mode-4
slicing/advanced-indexing tests pass).
The first version pitted distributed devito against single-process NumPy
as if competing, which is the wrong comparison. Reframe into two studies,
auto-selected by the launch size, each making one point:

* serial (no mpirun): the data[idx] wrapper adds only a small overhead
  over NumPy, with no per-element Python loop;
* parallel (mpirun -n N): a routed assignment's cost scales with the
  volume communicated, while the NumPy reference (pure compute) is
  unaffected by the rank count.

Sweeps the number of indexed/communicated points and prints a table per
study.
index_handle_oob dropped out-of-bounds (None) entries with
np.delete(idx, where(idx == None)) on every 1-D array index. Only an
object array can hold those sentinels; a plain numeric index has none, so
the call was a wasted O(n) copy (plus an elementwise '== None'). Return
the index unchanged in that case. A 65536-point assignment drops from
~1.6 ms to ~0.17 ms; the object-array path (real OOB drops) is unchanged.
Add a ratio column and stop calling the serial overhead 'negligible'. It
is a fixed per-call cost (the .data accessor and global-index
translation): large relative to NumPy for a trivial index -- whose NumPy
cost is almost nothing -- but, being fixed, amortising to ~2x at realistic
operation sizes.
nbx_exchange polled with a tight 'while True: Iprobe' busy-wait, spinning
at 100% CPU while waiting for peer messages and the termination barrier.
When ranks are oversubscribed (more ranks than cores), a spinning rank
starves the very peer it is waiting on, so MPI makes no progress and the
exchange deadlocks. This is exactly the MPI-notebook CI configuration: a
4-engine ipyparallel cluster on a 2-core runner (OpenMPI --oversubscribe),
where it manifested as a multi-minute cell timeout.

Yield (time.sleep(0)) whenever a poll pass finds nothing ready, so
co-scheduled ranks can run; drain ready messages first via .
Verified: correctness unchanged (44 routed/gather mode-4 tests pass) and
16 ranks on 8 cores complete in 0.35s instead of hanging.
The data-initialization notebook's point-to-point routing (NBX exchange)
hung the 4-engine ipyparallel cluster under OpenMPI, while IntelMPI ran
fine. OpenMPI defaults to an aggressive busy-wait: with the engines
contending for cores, the probe/barrier loop spins at 100% and starves the
peers it waits on, so MPI never progresses. IntelMPI yields by default.

Set OMPI_MCA_mpi_yield_when_idle=1 in the examples-mpi job so OpenMPI yields
the CPU while waiting. Also revert the earlier Python-level sleep(0) in
nbx_exchange: sched_yield at the wrong layer (the spin is in OpenMPI's C
progress engine), which did not help.
@mloubout mloubout force-pushed the mpi-redistribution-engine branch 2 times, most recently from 60d8c80 to 52c7525 Compare June 25, 2026 23:38
Comment thread devito/data/distributed/exchange.py Outdated
def __init__(self, data, idx):
self.data = data
self.idx = idx
self._sig = _signature(data, idx)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be a cached_property

the docstring should also be clarified because at the moment it's pretty unclear what it actually does. We are (obviously) not using the data values for the cache key, but instead, we're using all the metadata associated with data

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be a cached_property

No, that's conflicting with __slots__ that stores an instance as a compact object without a __dict__. Adding a cached_property defeats the purpose.

@FabioLuporini

Copy link
Copy Markdown
Contributor

related?

#2217

@mloubout

Copy link
Copy Markdown
Contributor Author

Fixes #2217

@mloubout mloubout force-pushed the mpi-redistribution-engine branch from 52c7525 to 1da5547 Compare June 26, 2026 14:25
The point-to-point router used the NBX nonblocking-consensus algorithm
(Issend + Iprobe(ANY_SOURCE) + Ibarrier). It works under IntelMPI and
mpirun, but deadlocks under OpenMPI inside the ipyparallel example
notebooks -- whereas ordinary collective/halo MPI (e.g. op.apply) runs
fine there. So the NBX pattern itself is the fragile part.

Replace it with a reduce-scatter-based sparse exchange: each rank learns
how many peers will send to it via one Reduce_scatter_block over a
length-nprocs 0/1 indicator, then Isend / Probe / Recv / Waitall. These
are the same standard, portable calls that work elsewhere; payloads still
move strictly point-to-point (no data all-to-all). Rename nbx_exchange ->
sparse_exchange and nbx_push -> sparse_push accordingly, and revert the
ineffective OpenMPI yield CI env var.

Correctness verified under mpirun: TestDataDistributed, TestDataGather and
the sparse advanced-indexing tests pass (modes 4 and 6).
@mloubout mloubout force-pushed the mpi-redistribution-engine branch from 1da5547 to f17919a Compare June 26, 2026 14:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

MPI mpi-related

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants