diff --git a/.gitignore b/.gitignore index 95e893fa..4642a059 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ __pycache__/ build/ dist/ *.egg-info/ +*.eggs/ dev/.tmp_dir/ target/ diff --git a/core/pom.xml b/core/pom.xml index ae9e2239..d4272f99 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -17,6 +17,7 @@ 3.2.2 3.3.0 3.4.0 + 3.5.0 1.1.10.4 4.1.94.Final 1.10.0 diff --git a/core/shims/pom.xml b/core/shims/pom.xml index 571eeac2..c013538b 100644 --- a/core/shims/pom.xml +++ b/core/shims/pom.xml @@ -20,6 +20,7 @@ spark322 spark330 spark340 + spark350 diff --git a/core/shims/spark350/pom.xml b/core/shims/spark350/pom.xml new file mode 100644 index 00000000..2368daa2 --- /dev/null +++ b/core/shims/spark350/pom.xml @@ -0,0 +1,99 @@ + + + + 4.0.0 + + + com.intel + raydp-shims + 1.7.0-SNAPSHOT + ../pom.xml + + + raydp-shims-spark350 + RayDP Shims for Spark 3.5.0 + jar + + + 2.12.15 + 2.13.5 + + + + + + org.scalastyle + scalastyle-maven-plugin + + + net.alchim31.maven + scala-maven-plugin + 3.2.2 + + + scala-compile-first + process-resources + + compile + + + + scala-test-compile-first + process-test-resources + + testCompile + + + + + + + + + src/main/resources + + + + + + + com.intel + raydp-shims-common + ${project.version} + compile + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark350.version} + provided + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark350.version} + provided + + + org.xerial.snappy + snappy-java + + + io.netty + netty-handler + + + + + org.xerial.snappy + snappy-java + ${snappy.version} + + + io.netty + netty-handler + ${netty.version} + + + diff --git a/core/shims/spark350/src/main/resources/META-INF/services/com.intel.raydp.shims.SparkShimProvider b/core/shims/spark350/src/main/resources/META-INF/services/com.intel.raydp.shims.SparkShimProvider new file mode 100644 index 00000000..6e5a394e --- /dev/null +++ 
b/core/shims/spark350/src/main/resources/META-INF/services/com.intel.raydp.shims.SparkShimProvider @@ -0,0 +1 @@ +com.intel.raydp.shims.spark350.SparkShimProvider diff --git a/core/shims/spark350/src/main/scala/com/intel/raydp/shims/SparkShimProvider.scala b/core/shims/spark350/src/main/scala/com/intel/raydp/shims/SparkShimProvider.scala new file mode 100644 index 00000000..bca05fa1 --- /dev/null +++ b/core/shims/spark350/src/main/scala/com/intel/raydp/shims/SparkShimProvider.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package com.intel.raydp.shims.spark350

import com.intel.raydp.shims.{SparkShimDescriptor, SparkShims}

/**
 * Shim metadata for Spark 3.5.0: the descriptor identifying the Spark version
 * this module targets and the exact version strings it accepts.
 */
object SparkShimProvider {
  val SPARK350_DESCRIPTOR = SparkShimDescriptor(3, 5, 0)
  val DESCRIPTOR_STRINGS = Seq(s"$SPARK350_DESCRIPTOR")
  val DESCRIPTOR = SPARK350_DESCRIPTOR
}

/**
 * Provider discovered through the META-INF/services registration; it creates
 * the Spark 3.5.0 shim when the running Spark version matches.
 */
class SparkShimProvider extends com.intel.raydp.shims.SparkShimProvider {

  /** Instantiate the shim implementation for Spark 3.5.0. */
  def createShim: SparkShims = new Spark350Shims()

  /** True when `version` is one of the exact version strings this shim supports. */
  def matches(version: String): Boolean =
    SparkShimProvider.DESCRIPTOR_STRINGS.contains(version)
}
package com.intel.raydp.shims.spark350

import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.executor.RayDPExecutorBackendFactory
import org.apache.spark.executor.spark350._
import org.apache.spark.spark350.TaskContextUtils
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.spark350.SparkSqlUtils

import com.intel.raydp.shims.{ShimDescriptor, SparkShims}

/**
 * Spark 3.5.0 implementation of [[SparkShims]]. Every operation is delegated
 * to a version-specific helper so the rest of RayDP stays version-agnostic.
 */
class Spark350Shims extends SparkShims {

  /** Descriptor identifying this shim as the Spark 3.5.0 implementation. */
  override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR

  /** Build a DataFrame from an RDD of serialized Arrow record batches. */
  override def toDataFrame(
      rdd: JavaRDD[Array[Byte]],
      schema: String,
      session: SparkSession): DataFrame =
    SparkSqlUtils.toDataFrame(rdd, schema, session)

  /** Factory producing executor backends compatible with Spark 3.5.0. */
  override def getExecutorBackendFactory(): RayDPExecutorBackendFactory =
    new RayDPSpark350ExecutorBackendFactory()

  /** Placeholder TaskContext for code paths that run outside a scheduled task. */
  override def getDummyTaskContext(partitionId: Int, env: SparkEnv): TaskContext =
    TaskContextUtils.getDummyTaskContext(partitionId, env)
}
package org.apache.spark.spark350

import java.util.Properties

import org.apache.spark.{SparkEnv, TaskContext, TaskContextImpl}
import org.apache.spark.memory.TaskMemoryManager

/**
 * Helper for building a throwaway [[TaskContext]] on Spark 3.5.0, used when
 * RayDP must drive Spark code paths outside of a real scheduled task.
 */
object TaskContextUtils {

  /**
   * Create a dummy [[TaskContextImpl]] bound to `partitionId`.
   *
   * Positional arguments, in order: stageId = 0, stageAttemptNumber = 0,
   * partitionId, taskAttemptId = -1024 (sentinel marking a fake task),
   * attemptNumber = 0, numPartitions = 0 — NOTE(review): the labels are
   * assumed from the Spark 3.5.0 `TaskContextImpl` constructor; confirm
   * against the Spark source if this ever fails to compile.
   */
  def getDummyTaskContext(partitionId: Int, env: SparkEnv): TaskContext = {
    // Evaluation order matches the original inline form: memory manager,
    // then properties, then the context itself.
    val memoryManager = new TaskMemoryManager(env.memoryManager, 0)
    val localProperties = new Properties()
    new TaskContextImpl(
      0, 0, partitionId, -1024, 0, 0, memoryManager, localProperties, env.metricsSystem)
  }
}
package org.apache.spark.executor

import java.net.URL

import org.apache.spark.SparkEnv
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.rpc.RpcEnv

/**
 * Executor backend for Spark executors that RayDP launches, targeting the
 * Spark 3.5.0 `CoarseGrainedExecutorBackend` API.
 *
 * All constructor arguments except `userClassPath` are forwarded verbatim to
 * the parent. `userClassPath` is deliberately NOT passed to the parent
 * constructor — the superclass call below omits it — and is instead exposed
 * through the `getUserClassPath` accessor (NOTE(review): presumably because
 * Spark 3.5 removed the constructor parameter in favor of the accessor;
 * confirm against the Spark 3.5.0 source).
 */
class RayCoarseGrainedExecutorBackend(
    rpcEnv: RpcEnv,
    driverUrl: String,
    executorId: String,
    bindAddress: String,
    hostname: String,
    cores: Int,
    userClassPath: Seq[URL],
    env: SparkEnv,
    resourcesFileOpt: Option[String],
    resourceProfile: ResourceProfile)
  extends CoarseGrainedExecutorBackend(
    rpcEnv,
    driverUrl,
    executorId,
    bindAddress,
    hostname,
    cores,
    env,
    resourcesFileOpt,
    resourceProfile) {

  // Supplies the user classpath entries to the parent via the accessor
  // instead of a constructor argument.
  override def getUserClassPath: Seq[URL] = userClassPath

}
package org.apache.spark.executor.spark350

import java.net.URL

import org.apache.spark.SparkEnv
import org.apache.spark.executor._
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.rpc.RpcEnv

/**
 * Factory wiring up [[RayCoarseGrainedExecutorBackend]] instances for
 * Spark 3.5.0; each argument is handed through to the backend unchanged.
 */
class RayDPSpark350ExecutorBackendFactory extends RayDPExecutorBackendFactory {

  /** Build the Spark 3.5.0 executor backend with the supplied configuration. */
  override def createExecutorBackend(
      rpcEnv: RpcEnv,
      driverUrl: String,
      executorId: String,
      bindAddress: String,
      hostname: String,
      cores: Int,
      userClassPath: Seq[URL],
      env: SparkEnv,
      resourcesFileOpt: Option[String],
      resourceProfile: ResourceProfile): CoarseGrainedExecutorBackend =
    new RayCoarseGrainedExecutorBackend(
      rpcEnv,
      driverUrl,
      executorId,
      bindAddress,
      hostname,
      cores,
      userClassPath,
      env,
      resourcesFileOpt,
      resourceProfile)
}
package org.apache.spark.sql.spark350

import org.apache.spark.TaskContext
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.sql.{DataFrame, SparkSession, SQLContext}
import org.apache.spark.sql.execution.arrow.ArrowConverters
import org.apache.spark.sql.types._

/**
 * SQL helpers specific to Spark 3.5.0.
 */
object SparkSqlUtils {

  /**
   * Convert an RDD of serialized Arrow record batches into a DataFrame.
   *
   * @param arrowBatchRDD one byte array per Arrow record batch
   * @param schemaString  JSON-serialized [[StructType]] describing the data
   * @param session       active SparkSession used to create the DataFrame
   * @return a DataFrame whose rows are decoded from the Arrow batches
   */
  def toDataFrame(
      arrowBatchRDD: JavaRDD[Array[Byte]],
      schemaString: String,
      session: SparkSession): DataFrame = {
    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
    // Session time zone is required to decode timestamp columns consistently.
    val timeZoneId = session.sessionState.conf.sessionLocalTimeZone
    val rdd = arrowBatchRDD.rdd.mapPartitions { iter =>
      val context = TaskContext.get()
      // Fix: the original had `timeZoneId,false` (no space after the comma),
      // which trips the scalastyle check enabled for this module. The fourth
      // argument (false) is errorOnDuplicatedFieldNames — NOTE(review):
      // assumed from the Spark 3.5 ArrowConverters signature; kept permissive.
      ArrowConverters.fromBatchIterator(iter, schema, timeZoneId, false, context)
    }
    session.internalCreateDataFrame(rdd.setName("arrow"), schema)
  }
}