From 8f1b24178f39c63820fbc081b234d40dda293346 Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Fri, 20 Dec 2024 20:43:14 +0100 Subject: [PATCH] Clean up tests after legacy DataFrame removal (#8972) --- distributed/protocol/tests/test_highlevelgraph.py | 14 +++----------- distributed/shuffle/tests/test_merge.py | 9 +++------ distributed/shuffle/tests/test_shuffle.py | 4 +--- distributed/shuffle/tests/utils.py | 9 +-------- 4 files changed, 8 insertions(+), 28 deletions(-) diff --git a/distributed/protocol/tests/test_highlevelgraph.py b/distributed/protocol/tests/test_highlevelgraph.py index ba1f6478a8e..b3df069a92e 100644 --- a/distributed/protocol/tests/test_highlevelgraph.py +++ b/distributed/protocol/tests/test_highlevelgraph.py @@ -1,7 +1,5 @@ from __future__ import annotations -import contextlib - import pytest np = pytest.importorskip("numpy") @@ -175,11 +173,9 @@ async def test_dataframe_annotations(c, s, a, b): acol = df["a"] bcol = df["b"] - ctx = contextlib.nullcontext() - if dd._dask_expr_enabled(): - ctx = pytest.warns( - UserWarning, match="Annotations will be ignored when using query-planning" - ) + ctx = pytest.warns( + UserWarning, match="Annotations will be ignored when using query-planning" + ) with dask.annotate(retries=retries), ctx: df = acol + bcol @@ -189,7 +185,3 @@ async def test_dataframe_annotations(c, s, a, b): assert rdf.dtypes == np.float64 assert (rdf == 10.0).all() - - if not dd._dask_expr_enabled(): - # There is an annotation match per partition (i.e. task) - assert plugin.retry_matches == df.npartitions diff --git a/distributed/shuffle/tests/test_merge.py b/distributed/shuffle/tests/test_merge.py index 2143d9920e0..92d1152bf8e 100644 --- a/distributed/shuffle/tests/test_merge.py +++ b/distributed/shuffle/tests/test_merge.py @@ -56,13 +56,10 @@ async def test_basic_merge(c, s, a, b, how): joined = a.merge(b, left_on="y", right_on="y", how=how) - if dd._dask_expr_enabled(): - # Ensure we're using a hash join - from dask_expr._merge import HashJoinP2P + # Ensure we're using a hash join + from dask_expr._merge import HashJoinP2P - assert any( - isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk() - ) + assert any(isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk()) expected = pd.merge(A, B, how, "y") await list_eq(joined, expected) diff --git a/distributed/shuffle/tests/test_shuffle.py b/distributed/shuffle/tests/test_shuffle.py index 24887ec47b5..18dc1f18879 100644 --- a/distributed/shuffle/tests/test_shuffle.py +++ b/distributed/shuffle/tests/test_shuffle.py @@ -1637,9 +1637,7 @@ async def test_multi(c, s, a, b): await assert_scheduler_cleanup(s) -@pytest.mark.skipif( - dd._dask_expr_enabled(), reason="worker restrictions are not supported in dask-expr" -) +@pytest.mark.skipif(reason="worker restrictions are not supported in dask-expr") @gen_cluster(client=True) async def test_restrictions(c, s, a, b): df = dask.datasets.timeseries( diff --git a/distributed/shuffle/tests/utils.py b/distributed/shuffle/tests/utils.py index 693a044373e..f0d7abac0a7 100644 --- a/distributed/shuffle/tests/utils.py +++ b/distributed/shuffle/tests/utils.py @@ -6,14 +6,7 @@ from distributed.core import PooledRPCCall from distributed.shuffle._core import ShuffleId, ShuffleRun -UNPACK_PREFIX = "shuffle_p2p" -try: - import dask.dataframe as dd - - if dd._dask_expr_enabled(): - UNPACK_PREFIX = "p2pshuffle" -except ImportError: - pass +UNPACK_PREFIX = "p2pshuffle" class PooledRPCShuffle(PooledRPCCall):