Skip to content

Commit

Permalink
fix executor custom resource request+support pyarrow >=15 & spark 3.5…
Browse files Browse the repository at this point in the history
….2/3/4 (#419)

* fix executor custom resource request

* support spark 3.5.2/3

* migrate to latest API

* test against multiple ray versions

* unpin pyarrow

* use iter_internal_ref_bundles in test, require ray >= 2.33.0

* fix test, init ray cluster properly

* workaround

* workaround

* work around a bug in spark

* Add spark 3.4.4 and 3.5.4 support
  • Loading branch information
pang-wu authored Dec 26, 2024
1 parent 7810e38 commit 0466c72
Show file tree
Hide file tree
Showing 12 changed files with 103 additions and 414 deletions.
13 changes: 5 additions & 8 deletions .github/workflows/ray_nightly_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
strategy:
matrix:
os: [ ubuntu-latest ]
python-version: [3.8, 3.9, 3.10.14]
python-version: [3.9, 3.10.14]
spark-version: [3.2.4, 3.3.2, 3.4.0, 3.5.0]

runs-on: ${{ matrix.os }}
Expand Down Expand Up @@ -82,17 +82,14 @@ jobs:
pip install torch
fi
case $PYTHON_VERSION in
3.7)
pip install "ray[train] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl"
;;
3.8)
pip install "ray[train] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp38-cp38-manylinux2014_x86_64.whl"
;;
3.9)
pip install "ray[train] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp39-cp39-manylinux2014_x86_64.whl"
;;
3.10.14)
pip install "ray[train] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp310-cp310-manylinux2014_x86_64.whl"
;;
esac
pip install pyarrow==6.0.1 pytest koalas tensorflow==2.13.1 tabulate grpcio-tools wget
pip install pyarrow tqdm pytest tensorflow==2.13.1 tabulate grpcio-tools wget
pip install "xgboost_ray[default]<=0.1.13"
pip install torchmetrics
HOROVOD_WITH_GLOO=1
Expand Down
5 changes: 3 additions & 2 deletions .github/workflows/raydp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ jobs:
strategy:
matrix:
os: [ ubuntu-latest ]
python-version: [3.8, 3.9, 3.10.14]
python-version: [3.9, 3.10.14]
spark-version: [3.2.4, 3.3.2, 3.4.0, 3.5.0]
ray-version: [2.34.0, 2.40.0]

runs-on: ${{ matrix.os }}

Expand Down Expand Up @@ -82,7 +83,7 @@ jobs:
else
pip install torch
fi
pip install pyarrow==6.0.1 ray[train] pytest tensorflow==2.13.1 tabulate grpcio-tools wget
pip install pyarrow "ray[train]==${{ matrix.ray-version }}" tqdm pytest tensorflow==2.13.1 tabulate grpcio-tools wget
pip install "xgboost_ray[default]<=0.1.13"
pip install "xgboost<=2.0.3"
pip install torchmetrics
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,6 @@ examples/.ipynb_checkpoints/
*.parquet
*.crc
_SUCCESS

.metals/
.bloop/
Original file line number Diff line number Diff line change
Expand Up @@ -201,17 +201,15 @@ class RayCoarseGrainedSchedulerBackend(
}

def parseRayDPResourceRequirements(sparkConf: SparkConf): Map[String, Double] = {
sparkConf.getAllWithPrefix(
s"${SparkOnRayConfigs.SPARK_EXECUTOR_ACTOR_CPU_RESOURCE}.")
.filter{ case (key, _) => key.toLowerCase() != "cpu" }
.map{ case (key, _) => key }
.distinct
.map(name => {
val amountDouble = sparkConf.get(
s"${SparkOnRayConfigs.SPARK_EXECUTOR_ACTOR_CPU_RESOURCE}.${name}",
0d.toString).toDouble
name->amountDouble
})
sparkConf.getAll
.filter { case (key, _) =>
key.startsWith(SparkOnRayConfigs.SPARK_EXECUTOR_ACTOR_RESOURCE_PREFIX)
}
.map { case (key, value) =>
key.stripPrefix(
s"${SparkOnRayConfigs.SPARK_EXECUTOR_ACTOR_RESOURCE_PREFIX}.") -> value.toDouble
}
.filter { case (key, _) => key.toLowerCase() != "cpu" }
.toMap
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ object SparkShimProvider {
val SPARK341_DESCRIPTOR = SparkShimDescriptor(3, 4, 1)
val SPARK342_DESCRIPTOR = SparkShimDescriptor(3, 4, 2)
val SPARK343_DESCRIPTOR = SparkShimDescriptor(3, 4, 3)
val SPARK344_DESCRIPTOR = SparkShimDescriptor(3, 4, 4)
val DESCRIPTOR_STRINGS = Seq(s"$SPARK340_DESCRIPTOR", s"$SPARK341_DESCRIPTOR", s"$SPARK342_DESCRIPTOR",
s"$SPARK343_DESCRIPTOR")
s"$SPARK343_DESCRIPTOR", s"$SPARK344_DESCRIPTOR")
val DESCRIPTOR = SPARK341_DESCRIPTOR
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ import com.intel.raydp.shims.{SparkShims, SparkShimDescriptor}
object SparkShimProvider {
val SPARK350_DESCRIPTOR = SparkShimDescriptor(3, 5, 0)
val SPARK351_DESCRIPTOR = SparkShimDescriptor(3, 5, 1)
val DESCRIPTOR_STRINGS = Seq(s"$SPARK350_DESCRIPTOR", s"$SPARK351_DESCRIPTOR")
val SPARK352_DESCRIPTOR = SparkShimDescriptor(3, 5, 2)
val SPARK353_DESCRIPTOR = SparkShimDescriptor(3, 5, 3)
val SPARK354_DESCRIPTOR = SparkShimDescriptor(3, 5, 4)
val DESCRIPTOR_STRINGS = Seq(s"$SPARK350_DESCRIPTOR", s"$SPARK351_DESCRIPTOR", s"$SPARK352_DESCRIPTOR",
s"$SPARK353_DESCRIPTOR", s"$SPARK354_DESCRIPTOR")
val DESCRIPTOR = SPARK350_DESCRIPTOR
}

Expand Down
8 changes: 0 additions & 8 deletions python/raydp/spark/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,3 @@
"ray_dataset_to_spark_dataframe",
"from_spark_recoverable"
]

try:
import ray.util.data
from .dataset import RayMLDataset
__all__.append("RayMLDataset")
except ImportError:
# Ray MLDataset is removed in Ray 2.0
pass
Loading

0 comments on commit 0466c72

Please sign in to comment.