Skip to content

Commit

Permalink
Merge branch 'release/0.6.2-RC2'
Browse files Browse the repository at this point in the history
  • Loading branch information
metasim committed May 9, 2018
2 parents 679f48d + d94bb15 commit 2bfa9af
Show file tree
Hide file tree
Showing 95 changed files with 2,960 additions and 549 deletions.
2 changes: 0 additions & 2 deletions .sbtopts
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
-Djava.awt.headless=true
-J-Xmx2g
-J-XX:MaxMetaspaceSize=1g

This file was deleted.

21 changes: 17 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,33 @@ addCommandAlias("console", "datasource/console")

lazy val root = project
.in(file("."))
.withId("RF")
.aggregate(core, datasource)
.settings(publishArtifact := false)
.withId("RasterFrames")
.aggregate(core, datasource, pyrasterframes, experimental)
.settings(publish / skip := true)
.settings(releaseSettings)

lazy val core = project
.disablePlugins(SparkPackagePlugin)

lazy val pyrasterframes = project
.dependsOn(core, datasource)
.settings(assemblySettings)

lazy val datasource = project
.dependsOn(core % "test->test;compile->compile")
.disablePlugins(SparkPackagePlugin)

lazy val experimental = project
.dependsOn(core % "test->test;compile->compile")
.dependsOn(datasource % "test->test;compile->compile")
.disablePlugins(SparkPackagePlugin)

lazy val docs = project
.dependsOn(core, datasource)
.disablePlugins(SparkPackagePlugin)

lazy val bench = project
.dependsOn(core)

.disablePlugins(SparkPackagePlugin)
.settings(publish / skip := true)

6 changes: 3 additions & 3 deletions core/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ moduleName := "rasterframes"

libraryDependencies ++= Seq(
"com.chuusai" %% "shapeless" % "2.3.2",
"org.locationtech.geomesa" %% "geomesa-z3" % "1.3.5",
"org.locationtech.geomesa" %% "geomesa-spark-jts" % "2.0.0-astraea.1" exclude("jgridshift", "jgridshift"),
"org.locationtech.geomesa" %% "geomesa-z3" % rfGeoMesaVersion.value,
"org.locationtech.geomesa" %% "geomesa-spark-jts" % rfGeoMesaVersion.value exclude("jgridshift", "jgridshift"),
spark("core").value % Provided,
spark("mllib").value % Provided,
spark("sql").value % Provided,
Expand All @@ -19,7 +19,7 @@ libraryDependencies ++= Seq(
)

buildInfoKeys ++= Seq[BuildInfoKey](
name, version, scalaVersion, sbtVersion, rfGeotrellisVersion, rfSparkVersion
name, version, scalaVersion, sbtVersion, rfGeoTrellisVersion, rfGeoMesaVersion, rfSparkVersion
)

buildInfoPackage := "astraea.spark.rasterframes"
Expand Down
144 changes: 144 additions & 0 deletions core/src/main/scala/astraea/spark/rasterframes/PairRDDConverter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package astraea.spark.rasterframes

import astraea.spark.rasterframes.util._
import geotrellis.raster.{MultibandTile, Tile, TileFeature}
import geotrellis.spark.{SpaceTimeKey, SpatialKey, TemporalKey}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.gt.types.TileUDT
import org.apache.spark.sql.types._

import scala.annotation.implicitNotFound

/**
 * Typeclass for converting a pair RDD into a DataFrame.
 *
 * Implementations declare the schema of the resulting DataFrame up front and
 * perform the conversion. Instances for the common GeoTrellis key/tile
 * combinations are provided in the companion object.
 *
 * @tparam K key type of the pair RDD (e.g. `SpatialKey`, `SpaceTimeKey`)
 * @tparam V value type of the pair RDD (e.g. `Tile`, `TileFeature`, `MultibandTile`)
 * @since 4/8/18
 */
@implicitNotFound("An RDD converter is required to create a RasterFrame. " +
  "Please provide an implementation of PairRDDConverter[${K}, ${V}].")
trait PairRDDConverter[K, V] extends Serializable {
  /** Schema of the DataFrame produced by [[toDataFrame]]. */
  val schema: StructType
  /** Convert the given pair RDD into a DataFrame conforming to [[schema]]. */
  def toDataFrame(rdd: RDD[(K, V)])(implicit spark: SparkSession): DataFrame
}

object PairRDDConverter {
  /** Enrichment over a pair RDD for converting it to a DataFrame given a converter. */
  implicit class RDDCanBeDataFrame[K, V](rdd: RDD[(K, V)])(implicit spark: SparkSession, converter: PairRDDConverter[K, V]) {
    def toDataFrame: DataFrame = converter.toDataFrame(rdd)
  }

  // Hack around Spark bug when singletons are used in schemas:
  // use a fresh instance rather than the TileUDT companion singleton.
  private val serializableTileUDT = new TileUDT()

  /** Fetch converter from implicit scope. */
  def apply[K, V](implicit sp: PairRDDConverter[K, V]): PairRDDConverter[K, V] = sp

  /** Enables conversion of `RDD[(SpatialKey, Tile)]` to DataFrame. */
  implicit val spatialTileConverter: PairRDDConverter[SpatialKey, Tile] = new PairRDDConverter[SpatialKey, Tile] {
    val schema: StructType = {
      StructType(Seq(
        StructField(SPATIAL_KEY_COLUMN.columnName, spatialKeyEncoder.schema, nullable = false),
        StructField(TILE_COLUMN.columnName, serializableTileUDT, nullable = false)
      ))
    }

    def toDataFrame(rdd: RDD[(SpatialKey, Tile)])(implicit spark: SparkSession): DataFrame = {
      import spark.implicits._
      rdd.toDF(schema.fields.map(_.name): _*)
    }
  }

  /** Enables conversion of `RDD[(SpaceTimeKey, Tile)]` to DataFrame. */
  implicit val spaceTimeTileConverter: PairRDDConverter[SpaceTimeKey, Tile] = new PairRDDConverter[SpaceTimeKey, Tile] {
    val schema: StructType = {
      val base = spatialTileConverter.schema
      // Splice the temporal key column in directly after the spatial key column.
      val addedFields = Seq(StructField(TEMPORAL_KEY_COLUMN.columnName, temporalKeyEncoder.schema, nullable = false))
      StructType(base.fields.patch(1, addedFields, 0))
    }

    def toDataFrame(rdd: RDD[(SpaceTimeKey, Tile)])(implicit spark: SparkSession): DataFrame = {
      import spark.implicits._
      rdd.map { case (k, v) => (k.spatialKey, k.temporalKey, v) }.toDF(schema.fields.map(_.name): _*)
    }
  }

  /** Enables conversion of `RDD[(SpatialKey, TileFeature[Tile, D])]` to DataFrame. */
  implicit def spatialTileFeatureConverter[D: Encoder]: PairRDDConverter[SpatialKey, TileFeature[Tile, D]] =
    new PairRDDConverter[SpatialKey, TileFeature[Tile, D]] {
      implicit val featureEncoder = implicitly[Encoder[D]]
      implicit val rowEncoder = Encoders.tuple(spatialKeyEncoder, singlebandTileEncoder, featureEncoder)

      val schema: StructType = {
        val base = spatialTileConverter.schema
        // Feature data column is appended after the key and tile columns.
        StructType(base.fields :+ StructField(TILE_FEATURE_DATA_COLUMN.columnName, featureEncoder.schema, nullable = true))
      }

      def toDataFrame(rdd: RDD[(SpatialKey, TileFeature[Tile, D])])(implicit spark: SparkSession): DataFrame = {
        import spark.implicits._
        rdd.map { case (k, v) => (k, v.tile, v.data) }.toDF(schema.fields.map(_.name): _*)
      }
    }

  /** Enables conversion of `RDD[(SpaceTimeKey, TileFeature[Tile, D])]` to DataFrame. */
  implicit def spaceTimeTileFeatureConverter[D: Encoder]: PairRDDConverter[SpaceTimeKey, TileFeature[Tile, D]] =
    new PairRDDConverter[SpaceTimeKey, TileFeature[Tile, D]] {
      implicit val featureEncoder = implicitly[Encoder[D]]
      implicit val rowEncoder = Encoders.tuple(spatialKeyEncoder, temporalKeyEncoder, singlebandTileEncoder, featureEncoder)

      val schema: StructType = {
        val base = spaceTimeTileConverter.schema
        StructType(base.fields :+ StructField(TILE_FEATURE_DATA_COLUMN.columnName, featureEncoder.schema, nullable = true))
      }

      def toDataFrame(rdd: RDD[(SpaceTimeKey, TileFeature[Tile, D])])(implicit spark: SparkSession): DataFrame = {
        import spark.implicits._
        rdd
          .map { case (k, v) => (k.spatialKey, k.temporalKey, v.tile, v.data) }
          .toDF(schema.fields.map(_.name): _*)
      }
    }

  /**
   * Enables conversion of `RDD[(SpatialKey, MultibandTile)]` to DataFrame.
   * Not implicit because the band count must be supplied by the caller.
   *
   * @param bands number of bands in each `MultibandTile`; one tile column is emitted per band
   */
  def forSpatialMultiband(bands: Int): PairRDDConverter[SpatialKey, MultibandTile] =
    new PairRDDConverter[SpatialKey, MultibandTile] {
      val schema: StructType = {
        val base = spatialTileConverter.schema

        val basename = TILE_COLUMN.columnName

        // One column per band, named `<tile>_1 ... <tile>_N`.
        val tiles = for (i <- 1 to bands) yield {
          StructField(s"${basename}_$i", serializableTileUDT, nullable = false)
        }

        // Replace the single tile column (index 1) with the per-band columns.
        StructType(base.fields.patch(1, tiles, 1))
      }

      def toDataFrame(rdd: RDD[(SpatialKey, MultibandTile)])(implicit spark: SparkSession): DataFrame = {
        spark.createDataFrame(
          rdd.map { case (k, v) => Row(Row(k.col, k.row) +: v.bands: _*) },
          schema
        )
      }
    }

  /**
   * Enables conversion of `RDD[(SpaceTimeKey, MultibandTile)]` to DataFrame.
   * Not implicit because the band count must be supplied by the caller.
   *
   * @param bands number of bands in each `MultibandTile`; one tile column is emitted per band
   */
  def forSpaceTimeMultiband(bands: Int): PairRDDConverter[SpaceTimeKey, MultibandTile] =
    new PairRDDConverter[SpaceTimeKey, MultibandTile] {
      val schema: StructType = {
        val base = spaceTimeTileConverter.schema

        val basename = TILE_COLUMN.columnName

        val tiles = for (i <- 1 to bands) yield {
          StructField(s"${basename}_$i", serializableTileUDT, nullable = false)
        }

        // Replace the single tile column (index 2, after spatial and temporal keys).
        StructType(base.fields.patch(2, tiles, 1))
      }

      def toDataFrame(rdd: RDD[(SpaceTimeKey, MultibandTile)])(implicit spark: SparkSession): DataFrame = {
        spark.createDataFrame(
          rdd.map { case (k, v) => Row(Seq(Row(k.spatialKey.col, k.spatialKey.row), Row(k.temporalKey)) ++ v.bands: _*) },
          schema
        )
      }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ package astraea.spark.rasterframes
import astraea.spark.rasterframes.encoders.SparkDefaultEncoders
import astraea.spark.rasterframes.expressions.ExplodeTileExpression
import astraea.spark.rasterframes.functions.{CellCountAggregateFunction, CellMeanAggregateFunction}
import astraea.spark.rasterframes.stats.{CellHistogram, CellStatistics}
import astraea.spark.rasterframes.{functions => F}
import com.vividsolutions.jts.geom.Envelope
import geotrellis.raster.histogram.Histogram
import geotrellis.raster.mapalgebra.local.LocalTileBinaryOp
import geotrellis.raster.{CellType, Tile}
import org.apache.spark.annotation.Experimental
Expand Down Expand Up @@ -84,21 +84,25 @@ trait RasterFunctions {
def cellType(col: Column): TypedColumn[Any, String] =
expressions.CellTypeExpression(col.expr).asColumn.as[String]

/** Change the Tile's cell type */
def convertCellType(col: Column, cellType: CellType): TypedColumn[Any, Tile] =
udf[Tile, Tile](F.convertCellType(cellType)).apply(col).as[Tile]

/** Assign a `NoData` value to the Tiles. */
def withNoData(col: Column, nodata: Double) = withAlias("withNoData", col)(
udf[Tile, Tile](F.withNoData(nodata)).apply(col)
).as[Tile]

/** Compute the full column aggregate floating point histogram. */
def aggHistogram(col: Column): TypedColumn[Any, Histogram[Double]] =
def aggHistogram(col: Column): TypedColumn[Any, CellHistogram] =
withAlias("histogram", col)(
F.aggHistogram(col)
).as[Histogram[Double]]
).as[CellHistogram]

/** Compute the full column aggregate floating point statistics. */
def aggStats(col: Column): TypedColumn[Any, Statistics] = withAlias("aggStats", col)(
def aggStats(col: Column): TypedColumn[Any, CellStatistics] = withAlias("aggStats", col)(
F.aggStats(col)
).as[Statistics]
).as[CellStatistics]

/** Computes the column aggregate mean. */
def aggMean(col: Column) = CellMeanAggregateFunction(col.expr)
Expand Down Expand Up @@ -140,16 +144,16 @@ trait RasterFunctions {
).as[Double]

/** Compute TileHistogram of Tile values. */
def tileHistogram(col: Column): TypedColumn[Any, Histogram[Double]] =
def tileHistogram(col: Column): TypedColumn[Any, CellHistogram] =
withAlias("tileHistogram", col)(
udf[Histogram[Double], Tile](F.tileHistogram).apply(col)
).as[Histogram[Double]]
udf[CellHistogram, Tile](F.tileHistogram).apply(col)
).as[CellHistogram]

/** Compute statistics of Tile values. */
def tileStats(col: Column): TypedColumn[Any, Statistics] =
def tileStats(col: Column): TypedColumn[Any, CellStatistics] =
withAlias("tileStats", col)(
udf[Statistics, Tile](F.tileStats).apply(col)
).as[Statistics]
udf[CellStatistics, Tile](F.tileStats).apply(col)
).as[CellStatistics]

/** Counts the number of non-NoData cells per Tile. */
def dataCells(tile: Column): TypedColumn[Any, Long] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,36 +19,35 @@

package astraea.spark.rasterframes.encoders

import astraea.spark.rasterframes.Statistics
import geotrellis.raster.histogram.Histogram
import geotrellis.raster.{MultibandTile, Tile}
import astraea.spark.rasterframes.stats.{CellHistogram, CellStatistics}
import geotrellis.raster.Tile
import geotrellis.spark.tiling.LayoutDefinition
import geotrellis.spark.{KeyBounds, SpaceTimeKey, SpatialKey, TemporalKey, TileLayerMetadata}
import geotrellis.vector.Extent
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{Encoder, Encoders}

import scala.reflect.runtime.universe._

/**
* Implicit encoder definitions for RasterFrame types.
*/
trait StandardEncoders {
implicit val spatialKeyEncoder = ExpressionEncoder[SpatialKey]
implicit val temporalKeyEncoder = ExpressionEncoder[TemporalKey]
implicit val spaceTimeKeyEncoder = ExpressionEncoder[SpaceTimeKey]
implicit val statsEncoder = ExpressionEncoder[CellStatistics]
implicit val histEncoder = ExpressionEncoder[CellHistogram]
implicit val layoutDefinitionEncoder = ExpressionEncoder[LayoutDefinition]
implicit val stkBoundsEncoder = ExpressionEncoder[KeyBounds[SpaceTimeKey]]
implicit val extentEncoder = ExpressionEncoder[Extent]

implicit def singlebandTileEncoder = ExpressionEncoder[Tile]()
implicit def multibandTileEncoder = ExpressionEncoder[MultibandTile]()
implicit def tileLayerMetadataEncoder[K: TypeTag]: Encoder[TileLayerMetadata[K]] = TileLayerMetadataEncoder[K]()
implicit val crsEncoder = CRSEncoder()
implicit val extentEncoder = ExpressionEncoder[Extent]()
implicit val projectedExtentEncoder = ProjectedExtentEncoder()
implicit val temporalProjectedExtentEncoder = TemporalProjectedExtentEncoder()
implicit def histogramDoubleEncoder = ExpressionEncoder[Histogram[Double]]()
implicit val statsEncoder = ExpressionEncoder[Statistics]()
implicit def tileLayerMetadataEncoder[K: TypeTag]: Encoder[TileLayerMetadata[K]] = TileLayerMetadataEncoder[K]()
implicit val layoutDefinitionEncoder = ExpressionEncoder[LayoutDefinition]()
implicit val stkBoundsEncoder = ExpressionEncoder[KeyBounds[SpaceTimeKey]]()
implicit val cellTypeEncoder = CellTypeEncoder()
implicit val spatialKeyEncoder = ExpressionEncoder[SpatialKey]()
implicit val temporalKeyEncoder = ExpressionEncoder[TemporalKey]()
implicit val spaceTimeKeyEncoder = ExpressionEncoder[SpaceTimeKey]()
implicit val uriEncoder = URIEncoder()
implicit val envelopeEncoder = EnvelopeEncoder()
}
Expand Down
Loading

0 comments on commit 2bfa9af

Please sign in to comment.