diff --git a/distributed/protocol/tests/test_highlevelgraph.py b/distributed/protocol/tests/test_highlevelgraph.py
index ba1f6478a8..b3df069a92 100644
--- a/distributed/protocol/tests/test_highlevelgraph.py
+++ b/distributed/protocol/tests/test_highlevelgraph.py
@@ -1,7 +1,5 @@
 from __future__ import annotations
 
-import contextlib
-
 import pytest
 
 np = pytest.importorskip("numpy")
@@ -175,11 +173,9 @@ async def test_dataframe_annotations(c, s, a, b):
     acol = df["a"]
     bcol = df["b"]
 
-    ctx = contextlib.nullcontext()
-    if dd._dask_expr_enabled():
-        ctx = pytest.warns(
-            UserWarning, match="Annotations will be ignored when using query-planning"
-        )
+    ctx = pytest.warns(
+        UserWarning, match="Annotations will be ignored when using query-planning"
+    )
 
     with dask.annotate(retries=retries), ctx:
         df = acol + bcol
@@ -189,7 +185,3 @@ async def test_dataframe_annotations(c, s, a, b):
 
     assert rdf.dtypes == np.float64
     assert (rdf == 10.0).all()
-
-    if not dd._dask_expr_enabled():
-        # There is an annotation match per partition (i.e. task)
-        assert plugin.retry_matches == df.npartitions
diff --git a/distributed/shuffle/tests/test_merge.py b/distributed/shuffle/tests/test_merge.py
index 2143d9920e..92d1152bf8 100644
--- a/distributed/shuffle/tests/test_merge.py
+++ b/distributed/shuffle/tests/test_merge.py
@@ -56,13 +56,10 @@ async def test_basic_merge(c, s, a, b, how):
 
     joined = a.merge(b, left_on="y", right_on="y", how=how)
 
-    if dd._dask_expr_enabled():
-        # Ensure we're using a hash join
-        from dask_expr._merge import HashJoinP2P
+    # Ensure we're using a hash join
+    from dask_expr._merge import HashJoinP2P
 
-        assert any(
-            isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk()
-        )
+    assert any(isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk())
 
     expected = pd.merge(A, B, how, "y")
     await list_eq(joined, expected)
diff --git a/distributed/shuffle/tests/test_shuffle.py b/distributed/shuffle/tests/test_shuffle.py
index 24887ec47b..18dc1f1887 100644
--- a/distributed/shuffle/tests/test_shuffle.py
+++ b/distributed/shuffle/tests/test_shuffle.py
@@ -1637,9 +1637,7 @@ async def test_multi(c, s, a, b):
     await assert_scheduler_cleanup(s)
 
 
-@pytest.mark.skipif(
-    dd._dask_expr_enabled(), reason="worker restrictions are not supported in dask-expr"
-)
+@pytest.mark.skipif(reason="worker restrictions are not supported in dask-expr")
 @gen_cluster(client=True)
 async def test_restrictions(c, s, a, b):
     df = dask.datasets.timeseries(
diff --git a/distributed/shuffle/tests/utils.py b/distributed/shuffle/tests/utils.py
index 693a044373..f0d7abac0a 100644
--- a/distributed/shuffle/tests/utils.py
+++ b/distributed/shuffle/tests/utils.py
@@ -6,14 +6,7 @@
 from distributed.core import PooledRPCCall
 from distributed.shuffle._core import ShuffleId, ShuffleRun
 
-UNPACK_PREFIX = "shuffle_p2p"
-try:
-    import dask.dataframe as dd
-
-    if dd._dask_expr_enabled():
-        UNPACK_PREFIX = "p2pshuffle"
-except ImportError:
-    pass
+UNPACK_PREFIX = "p2pshuffle"
 
 
 class PooledRPCShuffle(PooledRPCCall):