diff --git a/.sbtopts b/.sbtopts index d0d25ed4f..ca8c83416 100644 --- a/.sbtopts +++ b/.sbtopts @@ -1,3 +1 @@ --Djava.awt.headless=true --J-Xmx2g -J-XX:MaxMetaspaceSize=1g diff --git a/bench/src/main/scala/astraea/spark/rasterframes/bench/HistogramEncodeBench.scala b/bench/src/main/scala/astraea/spark/rasterframes/bench/HistogramEncodeBench.scala deleted file mode 100644 index bfe378570..000000000 --- a/bench/src/main/scala/astraea/spark/rasterframes/bench/HistogramEncodeBench.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * This software is licensed under the Apache 2 license, quoted below. - * - * Copyright 2017 Astraea, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * [http://www.apache.org/licenses/LICENSE-2.0] - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - * - */ - -package astraea.spark.rasterframes.bench - -import java.util.concurrent.TimeUnit - -import geotrellis.raster.histogram.{Histogram, StreamingHistogram} -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.gt.types.HistogramUDT -import org.openjdk.jmh.annotations._ - -@BenchmarkMode(Array(Mode.AverageTime)) -@State(Scope.Benchmark) -@OutputTimeUnit(TimeUnit.MICROSECONDS) -/** - * @author sfitch - * @since 9/29/17 - */ -class HistogramEncodeBench extends SparkEnv { - - val encoder: ExpressionEncoder[Histogram[Double]] = ExpressionEncoder() - val boundEncoder = encoder.resolveAndBind() - - @Param(Array("float64")) - var cellTypeName: String = _ - - @Param(Array("128")) - var tileSize: Int = _ - - @Param(Array("400")) - var numTiles: Int = _ - - @transient - var histogram: Histogram[Double] = _ - - @Setup(Level.Trial) - def setupData(): Unit = { - // Creates what should hopefully be a representative structure - val tiles = Seq.fill(numTiles)(randomTile(tileSize, tileSize, cellTypeName)) - histogram = tiles.foldLeft(StreamingHistogram(): Histogram[Double])( - (hist, tile) ⇒ hist.merge(StreamingHistogram.fromTile(tile)) - ) - } - @Benchmark - def serialize(): Any = { - HistogramUDT.serialize(histogram) - } - - @Benchmark - def encode(): InternalRow = { - boundEncoder.toRow(histogram) - } - -// @Benchmark -// def roundTrip(): Tile = { -// } -} - diff --git a/build.sbt b/build.sbt index b7d645e3f..15571e4fd 100644 --- a/build.sbt +++ b/build.sbt @@ -3,20 +3,33 @@ addCommandAlias("console", "datasource/console") lazy val root = project .in(file(".")) - .withId("RF") - .aggregate(core, datasource) - .settings(publishArtifact := false) + .withId("RasterFrames") + .aggregate(core, datasource, pyrasterframes, experimental) + .settings(publish / skip := true) .settings(releaseSettings) lazy val core = project + .disablePlugins(SparkPackagePlugin) + +lazy val pyrasterframes = project + .dependsOn(core, datasource) + .settings(assemblySettings) lazy val datasource = project .dependsOn(core % "test->test;compile->compile") + .disablePlugins(SparkPackagePlugin) + +lazy val experimental = project + .dependsOn(core % "test->test;compile->compile") + .dependsOn(datasource % "test->test;compile->compile") + 
.disablePlugins(SparkPackagePlugin)
 
 lazy val docs = project
   .dependsOn(core, datasource)
+  .disablePlugins(SparkPackagePlugin)
 
 lazy val bench = project
   .dependsOn(core)
-
+  .disablePlugins(SparkPackagePlugin)
+  .settings(publish / skip := true)
diff --git a/core/build.sbt b/core/build.sbt
index 94bc554e9..04d6a18df 100644
--- a/core/build.sbt
+++ b/core/build.sbt
@@ -4,8 +4,8 @@ moduleName := "rasterframes"
 
 libraryDependencies ++= Seq(
   "com.chuusai" %% "shapeless" % "2.3.2",
-  "org.locationtech.geomesa" %% "geomesa-z3" % "1.3.5",
-  "org.locationtech.geomesa" %% "geomesa-spark-jts" % "2.0.0-astraea.1" exclude("jgridshift", "jgridshift"),
+  "org.locationtech.geomesa" %% "geomesa-z3" % rfGeoMesaVersion.value,
+  "org.locationtech.geomesa" %% "geomesa-spark-jts" % rfGeoMesaVersion.value exclude("jgridshift", "jgridshift"),
   spark("core").value % Provided,
   spark("mllib").value % Provided,
   spark("sql").value % Provided,
@@ -19,7 +19,7 @@ libraryDependencies ++= Seq(
 )
 
 buildInfoKeys ++= Seq[BuildInfoKey](
-  name, version, scalaVersion, sbtVersion, rfGeotrellisVersion, rfSparkVersion
+  name, version, scalaVersion, sbtVersion, rfGeoTrellisVersion, rfGeoMesaVersion, rfSparkVersion
 )
 
 buildInfoPackage := "astraea.spark.rasterframes"
diff --git a/core/src/main/scala/astraea/spark/rasterframes/PairRDDConverter.scala b/core/src/main/scala/astraea/spark/rasterframes/PairRDDConverter.scala
new file mode 100644
index 000000000..68263d5c9
--- /dev/null
+++ b/core/src/main/scala/astraea/spark/rasterframes/PairRDDConverter.scala
@@ -0,0 +1,144 @@
+package astraea.spark.rasterframes
+
+import astraea.spark.rasterframes.util._
+import geotrellis.raster.{MultibandTile, Tile, TileFeature}
+import geotrellis.spark.{SpaceTimeKey, SpatialKey, TemporalKey}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.apache.spark.sql.gt.types.TileUDT
+import org.apache.spark.sql.types._
+
+import scala.annotation.implicitNotFound
+
+/**
+ * Typeclass for converting a pair RDD into a DataFrame.
+ *
+ * @since 4/8/18
+ */
+@implicitNotFound("An RDD converter is required to create a RasterFrame. " +
+  "Please provide an implementation of PairRDDConverter[${K}, ${V}].")
+trait PairRDDConverter[K, V] extends Serializable {
+  val schema: StructType
+  def toDataFrame(rdd: RDD[(K, V)])(implicit spark: SparkSession): DataFrame
+}
+
+object PairRDDConverter {
+  /** Enrichment over a pair RDD for converting it to a DataFrame given a converter. */
+  implicit class RDDCanBeDataFrame[K, V](rdd: RDD[(K, V)])(implicit spark: SparkSession, converter: PairRDDConverter[K, V]) {
+    def toDataFrame: DataFrame = converter.toDataFrame(rdd)
+  }
+
+  // Hack around Spark bug when singletons are used in schemas
+  private val serializableTileUDT = new TileUDT()
+
+  /** Fetch converter from implicit scope. */
+  def apply[K, V](implicit sp: PairRDDConverter[K, V]) = sp
+
+  /** Enables conversion of `RDD[(SpatialKey, Tile)]` to DataFrame. */
+  implicit val spatialTileConverter = new PairRDDConverter[SpatialKey, Tile] {
+    val schema: StructType = {
+      StructType(Seq(
+        StructField(SPATIAL_KEY_COLUMN.columnName, spatialKeyEncoder.schema, nullable = false),
+        StructField(TILE_COLUMN.columnName, serializableTileUDT, nullable = false)
+      ))
+    }
+
+    def toDataFrame(rdd: RDD[(SpatialKey, Tile)])(implicit spark: SparkSession): DataFrame = {
+      import spark.implicits._
+      rdd.toDF(schema.fields.map(_.name): _*)
+    }
+  }
+
+  /** Enables conversion of `RDD[(SpaceTimeKey, Tile)]` to DataFrame.
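+   * A minimal usage sketch of the enrichment above (assumes an implicit `SparkSession` in scope):
+   * {{{
+   *   val layer: RDD[(SpaceTimeKey, Tile)] = ???
+   *   val df = layer.toDataFrame   // column names are taken from `schema`
+   * }}}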
*/ + implicit val spaceTimeTileConverter = new PairRDDConverter[SpaceTimeKey, Tile] { + val schema: StructType = { + val base = spatialTileConverter.schema + val addedFields = Seq(StructField(TEMPORAL_KEY_COLUMN.columnName, temporalKeyEncoder.schema, nullable = false)) + StructType(base.fields.patch(1, addedFields, 0)) + } + + def toDataFrame(rdd: RDD[(SpaceTimeKey, Tile)])(implicit spark: SparkSession): DataFrame = { + import spark.implicits._ + rdd.map{ case (k, v) ⇒ (k.spatialKey, k.temporalKey, v)}.toDF(schema.fields.map(_.name): _*) + } + } + + /** Enables conversion of `RDD[(SpatialKey, TileFeature[Tile, D])]` to DataFrame. */ + implicit def spatialTileFeatureConverter[D: Encoder] = new PairRDDConverter[SpatialKey, TileFeature[Tile, D]] { + implicit val featureEncoder = implicitly[Encoder[D]] + implicit val rowEncoder = Encoders.tuple(spatialKeyEncoder, singlebandTileEncoder, featureEncoder) + + val schema: StructType = { + val base = spatialTileConverter.schema + StructType(base.fields :+ StructField(TILE_FEATURE_DATA_COLUMN.columnName, featureEncoder.schema, nullable = true)) + } + + def toDataFrame(rdd: RDD[(SpatialKey, TileFeature[Tile, D])])(implicit spark: SparkSession): DataFrame = { + import spark.implicits._ + rdd.map{ case (k, v) ⇒ (k, v.tile, v.data)}.toDF(schema.fields.map(_.name): _*) + } + } + + /** Enables conversion of `RDD[(SpaceTimeKey, TileFeature[Tile, D])]` to DataFrame. */ + implicit def spaceTimeTileFeatureConverter[D: Encoder] = new PairRDDConverter[SpaceTimeKey, TileFeature[Tile, D]] { + implicit val featureEncoder = implicitly[Encoder[D]] + implicit val rowEncoder = Encoders.tuple(spatialKeyEncoder, temporalKeyEncoder, singlebandTileEncoder, featureEncoder) + + val schema: StructType = { + val base = spaceTimeTileConverter.schema + StructType(base.fields :+ StructField(TILE_FEATURE_DATA_COLUMN.columnName, featureEncoder.schema, nullable = true)) + } + + def toDataFrame(rdd: RDD[(SpaceTimeKey, TileFeature[Tile, D])])(implicit spark: SparkSession): DataFrame = { + import spark.implicits._ + val tupRDD = rdd.map { case (k, v) ⇒ (k.spatialKey, k.temporalKey, v.tile, v.data) } + + rddToDatasetHolder(tupRDD) + tupRDD.toDF(schema.fields.map(_.name): _*) + } + } + + /** Enables conversion of `RDD[(SpatialKey, MultibandTile)]` to DataFrame. */ + def forSpatialMultiband(bands: Int) = new PairRDDConverter[SpatialKey, MultibandTile] { + val schema: StructType = { + val base = spatialTileConverter.schema + + val basename = TILE_COLUMN.columnName + + val tiles = for(i ← 1 to bands) yield { + StructField(s"${basename}_$i" , serializableTileUDT, nullable = false) + } + + StructType(base.fields.patch(1, tiles, 1)) + } + + def toDataFrame(rdd: RDD[(SpatialKey, MultibandTile)])(implicit spark: SparkSession): DataFrame = { + spark.createDataFrame( + rdd.map { case (k, v) ⇒ Row(Row(k.col, k.row) +: v.bands: _*) }, + schema + ) + } + } + + /** Enables conversion of `RDD[(SpaceTimeKey, MultibandTile)]` to DataFrame. 
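+   * The band count is passed explicitly because `MultibandTile` does not encode it in its type.
+   * A sketch, with a hypothetical band count: `implicit val conv = PairRDDConverter.forSpaceTimeMultiband(3)`.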
*/ + def forSpaceTimeMultiband(bands: Int) = new PairRDDConverter[SpaceTimeKey, MultibandTile] { + val schema: StructType = { + val base = spaceTimeTileConverter.schema + + val basename = TILE_COLUMN.columnName + + val tiles = for(i ← 1 to bands) yield { + StructField(s"${basename}_$i" , serializableTileUDT, nullable = false) + } + + StructType(base.fields.patch(2, tiles, 1)) + } + + def toDataFrame(rdd: RDD[(SpaceTimeKey, MultibandTile)])(implicit spark: SparkSession): DataFrame = { + spark.createDataFrame( + rdd.map { case (k, v) ⇒ Row(Seq(Row(k.spatialKey.col, k.spatialKey.row), Row(k.temporalKey)) ++ v.bands: _*) }, + schema + ) + } + } +} diff --git a/core/src/main/scala/astraea/spark/rasterframes/RasterFunctions.scala b/core/src/main/scala/astraea/spark/rasterframes/RasterFunctions.scala index e1359943f..4a0e2e054 100644 --- a/core/src/main/scala/astraea/spark/rasterframes/RasterFunctions.scala +++ b/core/src/main/scala/astraea/spark/rasterframes/RasterFunctions.scala @@ -22,9 +22,9 @@ package astraea.spark.rasterframes import astraea.spark.rasterframes.encoders.SparkDefaultEncoders import astraea.spark.rasterframes.expressions.ExplodeTileExpression import astraea.spark.rasterframes.functions.{CellCountAggregateFunction, CellMeanAggregateFunction} +import astraea.spark.rasterframes.stats.{CellHistogram, CellStatistics} import astraea.spark.rasterframes.{functions ⇒ F} import com.vividsolutions.jts.geom.Envelope -import geotrellis.raster.histogram.Histogram import geotrellis.raster.mapalgebra.local.LocalTileBinaryOp import geotrellis.raster.{CellType, Tile} import org.apache.spark.annotation.Experimental @@ -84,21 +84,25 @@ trait RasterFunctions { def cellType(col: Column): TypedColumn[Any, String] = expressions.CellTypeExpression(col.expr).asColumn.as[String] + /** Change the Tile's cell type */ + def convertCellType(col: Column, cellType: CellType): TypedColumn[Any, Tile] = + udf[Tile, Tile](F.convertCellType(cellType)).apply(col).as[Tile] + /** Assign a `NoData` value to the Tiles. */ def withNoData(col: Column, nodata: Double) = withAlias("withNoData", col)( udf[Tile, Tile](F.withNoData(nodata)).apply(col) ).as[Tile] /** Compute the full column aggregate floating point histogram. */ - def aggHistogram(col: Column): TypedColumn[Any, Histogram[Double]] = + def aggHistogram(col: Column): TypedColumn[Any, CellHistogram] = withAlias("histogram", col)( F.aggHistogram(col) - ).as[Histogram[Double]] + ).as[CellHistogram] /** Compute the full column aggregate floating point statistics. */ - def aggStats(col: Column): TypedColumn[Any, Statistics] = withAlias("aggStats", col)( + def aggStats(col: Column): TypedColumn[Any, CellStatistics] = withAlias("aggStats", col)( F.aggStats(col) - ).as[Statistics] + ).as[CellStatistics] /** Computes the column aggregate mean. */ def aggMean(col: Column) = CellMeanAggregateFunction(col.expr) @@ -140,16 +144,16 @@ trait RasterFunctions { ).as[Double] /** Compute TileHistogram of Tile values. */ - def tileHistogram(col: Column): TypedColumn[Any, Histogram[Double]] = + def tileHistogram(col: Column): TypedColumn[Any, CellHistogram] = withAlias("tileHistogram", col)( - udf[Histogram[Double], Tile](F.tileHistogram).apply(col) - ).as[Histogram[Double]] + udf[CellHistogram, Tile](F.tileHistogram).apply(col) + ).as[CellHistogram] /** Compute statistics of Tile values. 
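 * Returns `null` for a tile whose cells are all NoData (mirrors `F.tileStats`).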
*/ - def tileStats(col: Column): TypedColumn[Any, Statistics] = + def tileStats(col: Column): TypedColumn[Any, CellStatistics] = withAlias("tileStats", col)( - udf[Statistics, Tile](F.tileStats).apply(col) - ).as[Statistics] + udf[CellStatistics, Tile](F.tileStats).apply(col) + ).as[CellStatistics] /** Counts the number of non-NoData cells per Tile. */ def dataCells(tile: Column): TypedColumn[Any, Long] = diff --git a/core/src/main/scala/astraea/spark/rasterframes/encoders/StandardEncoders.scala b/core/src/main/scala/astraea/spark/rasterframes/encoders/StandardEncoders.scala index 02a3b1649..95edddd0f 100644 --- a/core/src/main/scala/astraea/spark/rasterframes/encoders/StandardEncoders.scala +++ b/core/src/main/scala/astraea/spark/rasterframes/encoders/StandardEncoders.scala @@ -19,14 +19,13 @@ package astraea.spark.rasterframes.encoders -import astraea.spark.rasterframes.Statistics -import geotrellis.raster.histogram.Histogram -import geotrellis.raster.{MultibandTile, Tile} +import astraea.spark.rasterframes.stats.{CellHistogram, CellStatistics} +import geotrellis.raster.Tile import geotrellis.spark.tiling.LayoutDefinition import geotrellis.spark.{KeyBounds, SpaceTimeKey, SpatialKey, TemporalKey, TileLayerMetadata} import geotrellis.vector.Extent -import org.apache.spark.sql.Encoder import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.{Encoder, Encoders} import scala.reflect.runtime.universe._ @@ -34,21 +33,21 @@ import scala.reflect.runtime.universe._ * Implicit encoder definitions for RasterFrame types. */ trait StandardEncoders { + implicit val spatialKeyEncoder = ExpressionEncoder[SpatialKey] + implicit val temporalKeyEncoder = ExpressionEncoder[TemporalKey] + implicit val spaceTimeKeyEncoder = ExpressionEncoder[SpaceTimeKey] + implicit val statsEncoder = ExpressionEncoder[CellStatistics] + implicit val histEncoder = ExpressionEncoder[CellHistogram] + implicit val layoutDefinitionEncoder = ExpressionEncoder[LayoutDefinition] + implicit val stkBoundsEncoder = ExpressionEncoder[KeyBounds[SpaceTimeKey]] + implicit val extentEncoder = ExpressionEncoder[Extent] + implicit def singlebandTileEncoder = ExpressionEncoder[Tile]() - implicit def multibandTileEncoder = ExpressionEncoder[MultibandTile]() + implicit def tileLayerMetadataEncoder[K: TypeTag]: Encoder[TileLayerMetadata[K]] = TileLayerMetadataEncoder[K]() implicit val crsEncoder = CRSEncoder() - implicit val extentEncoder = ExpressionEncoder[Extent]() implicit val projectedExtentEncoder = ProjectedExtentEncoder() implicit val temporalProjectedExtentEncoder = TemporalProjectedExtentEncoder() - implicit def histogramDoubleEncoder = ExpressionEncoder[Histogram[Double]]() - implicit val statsEncoder = ExpressionEncoder[Statistics]() - implicit def tileLayerMetadataEncoder[K: TypeTag]: Encoder[TileLayerMetadata[K]] = TileLayerMetadataEncoder[K]() - implicit val layoutDefinitionEncoder = ExpressionEncoder[LayoutDefinition]() - implicit val stkBoundsEncoder = ExpressionEncoder[KeyBounds[SpaceTimeKey]]() implicit val cellTypeEncoder = CellTypeEncoder() - implicit val spatialKeyEncoder = ExpressionEncoder[SpatialKey]() - implicit val temporalKeyEncoder = ExpressionEncoder[TemporalKey]() - implicit val spaceTimeKeyEncoder = ExpressionEncoder[SpaceTimeKey]() implicit val uriEncoder = URIEncoder() implicit val envelopeEncoder = EnvelopeEncoder() } diff --git a/core/src/main/scala/astraea/spark/rasterframes/expressions/DimensionsExpression.scala 
b/core/src/main/scala/astraea/spark/rasterframes/expressions/DimensionsExpression.scala index 100e337e1..720e60759 100644 --- a/core/src/main/scala/astraea/spark/rasterframes/expressions/DimensionsExpression.scala +++ b/core/src/main/scala/astraea/spark/rasterframes/expressions/DimensionsExpression.scala @@ -32,6 +32,8 @@ import org.apache.spark.sql.types.{ShortType, StructField, StructType} case class DimensionsExpression(child: Expression) extends UnaryExpression with RequiresTile { override def toString: String = s"dimension($child)" + override def nodeName: String = "dimension" + def dataType = StructType(Seq( StructField("cols", ShortType), StructField("rows", ShortType) diff --git a/core/src/main/scala/astraea/spark/rasterframes/extensions/ContextRDDMethods.scala b/core/src/main/scala/astraea/spark/rasterframes/extensions/ContextRDDMethods.scala index 18d244814..ef0c901f8 100644 --- a/core/src/main/scala/astraea/spark/rasterframes/extensions/ContextRDDMethods.scala +++ b/core/src/main/scala/astraea/spark/rasterframes/extensions/ContextRDDMethods.scala @@ -16,38 +16,32 @@ package astraea.spark.rasterframes.extensions -import astraea.spark.rasterframes.RasterFrame -import astraea.spark.rasterframes.extensions.Implicits._ +import astraea.spark.rasterframes.PairRDDConverter._ import astraea.spark.rasterframes.StandardColumns._ -import geotrellis.raster.{Tile, TileFeature} +import astraea.spark.rasterframes.extensions.Implicits._ +import astraea.spark.rasterframes.util._ +import astraea.spark.rasterframes.{PairRDDConverter, RasterFrame} +import geotrellis.raster.{CellGrid, Tile} import geotrellis.spark._ import geotrellis.spark.io._ import geotrellis.util.MethodExtensions import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession -import spray.json.JsonFormat -import astraea.spark.rasterframes.util._ -import scala.reflect.ClassTag -import scala.reflect.runtime.universe._ - /** - * Extension method on `ContextRDD`-shaped [[Tile]] RDDs with appropriate context bounds to create a RasterFrame. + * Extension method on `ContextRDD`-shaped RDDs with appropriate context bounds to create a RasterFrame. * @since 7/18/17 */ -abstract class SpatialContextRDDMethods(implicit spark: SparkSession) - extends MethodExtensions[RDD[(SpatialKey, Tile)] with Metadata[TileLayerMetadata[SpatialKey]]] { - - def toRF: RasterFrame = toRF(TILE_COLUMN.columnName) +abstract class SpatialContextRDDMethods[T <: CellGrid](implicit spark: SparkSession) + extends MethodExtensions[RDD[(SpatialKey, T)] with Metadata[TileLayerMetadata[SpatialKey]]] { + import PairRDDConverter._ - def toRF(tileColumnName: String): RasterFrame = { - import spark.implicits._ + def toRF(implicit converter: PairRDDConverter[SpatialKey, T]): RasterFrame = toRF(TILE_COLUMN.columnName) - val rdd = self: RDD[(SpatialKey, Tile)] - val df = rdd - .toDF(SPATIAL_KEY_COLUMN.columnName, tileColumnName) - - df.setSpatialColumnRole(SPATIAL_KEY_COLUMN, self.metadata) + def toRF(tileColumnName: String)(implicit converter: PairRDDConverter[SpatialKey, T]): RasterFrame = { + val df = self.toDataFrame.setSpatialColumnRole(SPATIAL_KEY_COLUMN, self.metadata) + val defName = TILE_COLUMN.columnName + df.mapWhen(_ ⇒ tileColumnName != defName, _.withColumnRenamed(defName, tileColumnName)) .certify } } @@ -56,63 +50,18 @@ abstract class SpatialContextRDDMethods(implicit spark: SparkSession) * Extension method on `ContextRDD`-shaped [[Tile]] RDDs keyed with [[SpaceTimeKey]], with appropriate context bounds to create a RasterFrame. 
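 * The resulting DataFrame schema is supplied by the implicit [[PairRDDConverter]] for the key/tile pairing.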
* @since 9/11/17 */ -abstract class SpatioTemporalContextRDDMethods(implicit spark: SparkSession) - extends MethodExtensions[RDD[(SpaceTimeKey, Tile)] with Metadata[TileLayerMetadata[SpaceTimeKey]]] { - - def toRF: RasterFrame = { - import spark.implicits._ - - val rdd = self: RDD[(SpaceTimeKey, Tile)] - val df = rdd - .map { case (k, v) ⇒ (k.spatialKey, k.temporalKey, v)} - .toDF(SPATIAL_KEY_COLUMN.columnName, TEMPORAL_KEY_COLUMN.columnName, TILE_COLUMN.columnName) - - df - .setSpatialColumnRole(SPATIAL_KEY_COLUMN, self.metadata) - .setTemporalColumnRole(TEMPORAL_KEY_COLUMN) - .certify - } -} - -/** - * Extension method on `ContextRDD`-shaped [[TileFeature]] RDDs with appropriate context bounds to create a RasterFrame. - * @since 7/18/17 - */ -abstract class TFContextRDDMethods[K: SpatialComponent: JsonFormat: ClassTag: TypeTag, D: TypeTag](implicit spark: SparkSession) - extends MethodExtensions[RDD[(K, TileFeature[Tile, D])] with Metadata[TileLayerMetadata[K]]] { - - def toRF: RasterFrame = { - import spark.implicits._ - val rdd = self: RDD[(K, TileFeature[Tile, D])] - - val df = rdd - .map { case (k, v) ⇒ (k, v.tile, v.data) } - .toDF(SPATIAL_KEY_COLUMN.columnName, TILE_COLUMN.columnName, TILE_FEATURE_DATA_COLUMN.columnName) - - df - .setSpatialColumnRole(SPATIAL_KEY_COLUMN, self.metadata) - .certify - } -} - -/** - * Extension method on `ContextRDD`-shaped [[TileFeature]] RDDs with appropriate context bounds to create a RasterFrame. - * @since 7/18/17 - */ -abstract class TFSTContextRDDMethods[D: TypeTag](implicit spark: SparkSession) - extends MethodExtensions[RDD[(SpaceTimeKey, TileFeature[Tile, D])] with Metadata[TileLayerMetadata[SpaceTimeKey]]] { - - def toRF: RasterFrame = { - import spark.implicits._ - val rdd = self: RDD[(SpaceTimeKey, TileFeature[Tile, D])] +abstract class SpatioTemporalContextRDDMethods[T <: CellGrid]( + implicit spark: SparkSession) + extends MethodExtensions[RDD[(SpaceTimeKey, T)] with Metadata[TileLayerMetadata[SpaceTimeKey]]] { - val df = rdd - .map { case (k, v) ⇒ (k.spatialKey, k.temporalKey, v.tile, v.data)} - .toDF(SPATIAL_KEY_COLUMN.columnName, TEMPORAL_KEY_COLUMN.columnName, TILE_COLUMN.columnName, TILE_FEATURE_DATA_COLUMN.columnName) + def toRF(implicit converter: PairRDDConverter[SpaceTimeKey, T]): RasterFrame = toRF(TILE_COLUMN.columnName) - df + def toRF(tileColumnName: String)(implicit converter: PairRDDConverter[SpaceTimeKey, T]): RasterFrame = { + val df = self.toDataFrame .setSpatialColumnRole(SPATIAL_KEY_COLUMN, self.metadata) .setTemporalColumnRole(TEMPORAL_KEY_COLUMN) + val defName = TILE_COLUMN.columnName + df.mapWhen(_ ⇒ tileColumnName != defName, _.withColumnRenamed(defName, tileColumnName)) .certify } } diff --git a/core/src/main/scala/astraea/spark/rasterframes/extensions/Implicits.scala b/core/src/main/scala/astraea/spark/rasterframes/extensions/Implicits.scala index 36e819d9c..b2aa42faf 100644 --- a/core/src/main/scala/astraea/spark/rasterframes/extensions/Implicits.scala +++ b/core/src/main/scala/astraea/spark/rasterframes/extensions/Implicits.scala @@ -19,13 +19,14 @@ package astraea.spark.rasterframes.extensions -import astraea.spark.rasterframes.RasterFrame -import geotrellis.raster.{ProjectedRaster, Tile, TileFeature} +import astraea.spark.rasterframes.{PairRDDConverter, RasterFrame} +import astraea.spark.rasterframes.util.{WithMergeMethods, WithPrototypeMethods} +import geotrellis.raster._ import geotrellis.spark.{Metadata, SpaceTimeKey, SpatialComponent, SpatialKey, TileLayerMetadata} import geotrellis.util.MethodExtensions 
import org.apache.spark.rdd.RDD import org.apache.spark.sql._ -import org.apache.spark.sql.types.{MetadataBuilder, Metadata => SMetadata} +import org.apache.spark.sql.types.{MetadataBuilder, Metadata ⇒ SMetadata} import spray.json.JsonFormat import scala.reflect.ClassTag @@ -41,32 +42,31 @@ trait Implicits { implicit class WithSQLContextMethods(val self: SQLContext) extends SQLContextMethods - implicit class WithProjectedRasterMethods(val self: ProjectedRaster[Tile]) - extends ProjectedRasterMethods + implicit class WithProjectedRasterMethods[T <: CellGrid: WithMergeMethods: WithPrototypeMethods: TypeTag]( + val self: ProjectedRaster[T]) extends ProjectedRasterMethods[T] implicit class WithDataFrameMethods[D <: DataFrame](val self: D) extends DataFrameMethods[D] implicit class WithRasterFrameMethods(val self: RasterFrame) extends RasterFrameMethods - implicit class WithSpatialContextRDDMethods( - val self: RDD[(SpatialKey, Tile)] with Metadata[TileLayerMetadata[SpatialKey]] - )(implicit spark: SparkSession) extends SpatialContextRDDMethods - - implicit class WithSpatioTemporalContextRDDMethods( - val self: RDD[(SpaceTimeKey, Tile)] with Metadata[TileLayerMetadata[SpaceTimeKey]] - )(implicit spark: SparkSession) extends SpatioTemporalContextRDDMethods - - implicit class WithTFContextRDDMethods[K: SpatialComponent: JsonFormat: ClassTag: TypeTag, - D: TypeTag]( - val self: RDD[(K, TileFeature[Tile, D])] with Metadata[TileLayerMetadata[K]] - )(implicit spark: SparkSession) - extends TFContextRDDMethods[K, D] - - implicit class WithTFSTContextRDDMethods[D: TypeTag]( - val self: RDD[(SpaceTimeKey, TileFeature[Tile, D])] with Metadata[ - TileLayerMetadata[SpaceTimeKey]] - )(implicit spark: SparkSession) - extends TFSTContextRDDMethods[D] + implicit class WithSpatialContextRDDMethods[T <: CellGrid]( + val self: RDD[(SpatialKey, T)] with Metadata[TileLayerMetadata[SpatialKey]] + )(implicit spark: SparkSession) extends SpatialContextRDDMethods[T] + + implicit class WithSpatioTemporalContextRDDMethods[T <: CellGrid]( + val self: RDD[(SpaceTimeKey, T)] with Metadata[TileLayerMetadata[SpaceTimeKey]] + )(implicit spark: SparkSession) extends SpatioTemporalContextRDDMethods[T] + +// implicit class WithTFContextRDDMethods[K: SpatialComponent: JsonFormat: TypeTag, D: TypeTag]( +// val self: RDD[(K, TileFeature[Tile, D])] with Metadata[TileLayerMetadata[K]] +// )(implicit spark: SparkSession) +// extends TFContextRDDMethods[K, D] +// +// implicit class WithTFSTContextRDDMethods[D: TypeTag]( +// val self: RDD[(SpaceTimeKey, TileFeature[Tile, D])] with Metadata[ +// TileLayerMetadata[SpaceTimeKey]] +// )(implicit spark: SparkSession) +// extends TFSTContextRDDMethods[D] private[astraea] implicit class WithMetadataMethods[R: JsonFormat](val self: R) extends MetadataMethods[R] diff --git a/core/src/main/scala/astraea/spark/rasterframes/extensions/ProjectedRasterMethods.scala b/core/src/main/scala/astraea/spark/rasterframes/extensions/ProjectedRasterMethods.scala index 23a825d8b..7b14d03fe 100644 --- a/core/src/main/scala/astraea/spark/rasterframes/extensions/ProjectedRasterMethods.scala +++ b/core/src/main/scala/astraea/spark/rasterframes/extensions/ProjectedRasterMethods.scala @@ -2,23 +2,26 @@ package astraea.spark.rasterframes.extensions import java.time.ZonedDateTime -import astraea.spark.rasterframes.{RasterFrame, StandardColumns} -import geotrellis.raster.{ProjectedRaster, Tile, TileLayout} +import astraea.spark.rasterframes.util._ +import astraea.spark.rasterframes.{PairRDDConverter, RasterFrame, 
StandardColumns} +import geotrellis.raster.{CellGrid, ProjectedRaster} import geotrellis.spark._ import geotrellis.spark.tiling._ -import geotrellis.spark.io._ import geotrellis.util.MethodExtensions +import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession -import astraea.spark.rasterframes.util._ + +import scala.reflect.runtime.universe._ /** * Extension methods on [[ProjectedRaster]] for creating [[RasterFrame]]s. * * @since 8/10/17 */ -trait ProjectedRasterMethods extends MethodExtensions[ProjectedRaster[Tile]] with StandardColumns { - import Implicits.WithSpatialContextRDDMethods - import Implicits.WithSpatioTemporalContextRDDMethods +abstract class ProjectedRasterMethods[T <: CellGrid: WithMergeMethods: WithPrototypeMethods: TypeTag] + extends MethodExtensions[ProjectedRaster[T]] with StandardColumns { + import Implicits.{WithSpatialContextRDDMethods, WithSpatioTemporalContextRDDMethods} + type XTileLayerRDD[K] = RDD[(K, T)] with Metadata[TileLayerMetadata[K]] /** * Convert the wrapped [[ProjectedRaster]] into a [[RasterFrame]] with a @@ -26,8 +29,7 @@ trait ProjectedRasterMethods extends MethodExtensions[ProjectedRaster[Tile]] wit * * @param spark [[SparkSession]] in which to create [[RasterFrame]] */ - def toRF(implicit spark: SparkSession): RasterFrame = toRF(TILE_COLUMN.columnName) - + def toRF(implicit spark: SparkSession, schema: PairRDDConverter[SpatialKey, T]): RasterFrame = toRF(TILE_COLUMN.columnName) /** * Convert the wrapped [[ProjectedRaster]] into a [[RasterFrame]] with a @@ -35,7 +37,8 @@ trait ProjectedRasterMethods extends MethodExtensions[ProjectedRaster[Tile]] wit * * @param spark [[SparkSession]] in which to create [[RasterFrame]] */ - def toRF(tileColName: String)(implicit spark: SparkSession): RasterFrame = { + def toRF(tileColName: String) + (implicit spark: SparkSession, schema: PairRDDConverter[SpatialKey, T]): RasterFrame = { val (cols, rows) = self.raster.dimensions toRF(cols, rows, tileColName) } @@ -45,11 +48,26 @@ trait ProjectedRasterMethods extends MethodExtensions[ProjectedRaster[Tile]] wit * given dimensions as the target per-row tile size. * * @param tileCols Max number of horizontal cells per tile - * @param tileRows Max number of vertical cells per tile. + * @param tileRows Max number of vertical cells per tile + * @param spark [[SparkSession]] in which to create [[RasterFrame]] + */ + def toRF(tileCols: Int, tileRows: Int) + (implicit spark: SparkSession, schema: PairRDDConverter[SpatialKey, T]): RasterFrame = + toRF(tileCols, tileRows, TILE_COLUMN.columnName) + + /** + * Convert the [[ProjectedRaster]] into a [[RasterFrame]] using the + * given dimensions as the target per-row tile size. + * + * @param tileCols Max number of horizontal cells per tile + * @param tileRows Max number of vertical cells per tile + * @param tileColName Name to give the created tile column * @param spark [[SparkSession]] in which to create [[RasterFrame]] */ - def toRF(tileCols: Int, tileRows: Int, tileColName: String = TILE_COLUMN.columnName)(implicit spark: SparkSession): RasterFrame = + def toRF(tileCols: Int, tileRows: Int, tileColName: String) + (implicit spark: SparkSession, schema: PairRDDConverter[SpatialKey, T]): RasterFrame = { toTileLayerRDD(tileCols, tileRows).toRF(tileColName) + } /** * Convert the [[ProjectedRaster]] into a [[RasterFrame]] using the @@ -60,7 +78,8 @@ trait ProjectedRasterMethods extends MethodExtensions[ProjectedRaster[Tile]] wit * @param timestamp Temporal key value to assign to tiles. 
* @param spark [[SparkSession]] in which to create [[RasterFrame]] */ - def toRF(tileCols: Int, tileRows: Int, timestamp: ZonedDateTime)(implicit spark: SparkSession): RasterFrame = + def toRF(tileCols: Int, tileRows: Int, timestamp: ZonedDateTime) + (implicit spark: SparkSession, schema: PairRDDConverter[SpaceTimeKey, T]): RasterFrame = toTileLayerRDD(tileCols, tileRows, timestamp).toRF /** @@ -72,13 +91,15 @@ trait ProjectedRasterMethods extends MethodExtensions[ProjectedRaster[Tile]] wit * @param spark [[SparkSession]] in which to create RDD */ def toTileLayerRDD(tileCols: Int, - tileRows: Int)(implicit spark: SparkSession): TileLayerRDD[SpatialKey] = { + tileRows: Int)(implicit spark: SparkSession): XTileLayerRDD[SpatialKey] = { val layout = LayoutDefinition(self.rasterExtent, tileCols, tileRows) val kb = KeyBounds(SpatialKey(0, 0), SpatialKey(layout.layoutCols - 1, layout.layoutRows - 1)) val tlm = TileLayerMetadata(self.tile.cellType, layout, self.extent, self.crs, kb) val rdd = spark.sparkContext.makeRDD(Seq((self.projectedExtent, self.tile))) + implicit val tct = typeTag[T].asClassTag + val tiled = rdd.tileToLayout(tlm) ContextRDD(tiled, tlm) @@ -93,13 +114,15 @@ trait ProjectedRasterMethods extends MethodExtensions[ProjectedRaster[Tile]] wit * @param timestamp Temporal key value to assign to tiles. * @param spark [[SparkSession]] in which to create RDD */ - def toTileLayerRDD(tileCols: Int, tileRows: Int, timestamp: ZonedDateTime)(implicit spark: SparkSession): TileLayerRDD[SpaceTimeKey] = { + def toTileLayerRDD(tileCols: Int, tileRows: Int, timestamp: ZonedDateTime)(implicit spark: SparkSession): XTileLayerRDD[SpaceTimeKey] = { val layout = LayoutDefinition(self.rasterExtent, tileCols, tileRows) val kb = KeyBounds(SpaceTimeKey(0, 0, timestamp), SpaceTimeKey(layout.layoutCols - 1, layout.layoutRows - 1, timestamp)) val tlm = TileLayerMetadata(self.tile.cellType, layout, self.extent, self.crs, kb) val rdd = spark.sparkContext.makeRDD(Seq((TemporalProjectedExtent(self.projectedExtent, timestamp), self.tile))) + implicit val tct = typeTag[T].asClassTag + val tiled = rdd.tileToLayout(tlm) ContextRDD(tiled, tlm) diff --git a/core/src/main/scala/astraea/spark/rasterframes/extensions/RasterFrameMethods.scala b/core/src/main/scala/astraea/spark/rasterframes/extensions/RasterFrameMethods.scala index 64fcacf78..e2d2746f5 100644 --- a/core/src/main/scala/astraea/spark/rasterframes/extensions/RasterFrameMethods.scala +++ b/core/src/main/scala/astraea/spark/rasterframes/extensions/RasterFrameMethods.scala @@ -21,15 +21,14 @@ import java.time.ZonedDateTime import astraea.spark.rasterframes.util._ import astraea.spark.rasterframes.{MetadataKeys, RasterFrame} import geotrellis.proj4.CRS -import geotrellis.raster.resample.{Bilinear, ResampleMethod} -import geotrellis.raster.{CellGrid, MultibandTile, ProjectedRaster, Tile, TileLayout} +import geotrellis.raster.resample.{NearestNeighbor, ResampleMethod} +import geotrellis.raster.{MultibandTile, ProjectedRaster, Tile, TileLayout} import geotrellis.spark._ import geotrellis.spark.io._ import geotrellis.spark.tiling.{LayoutDefinition, Tiler} import geotrellis.util.{LazyLogging, MethodExtensions} import geotrellis.vector.ProjectedExtent import org.apache.spark.annotation.Experimental -import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{Metadata, TimestampType} @@ -302,7 +301,7 @@ trait RasterFrameMethods extends MethodExtensions[RasterFrame] def toRaster(tileCol: 
Column, rasterCols: Int, rasterRows: Int, - resampler: ResampleMethod = Bilinear): ProjectedRaster[Tile] = { + resampler: ResampleMethod = NearestNeighbor): ProjectedRaster[Tile] = { val clipped = clipLayerExtent @@ -338,7 +337,7 @@ trait RasterFrameMethods extends MethodExtensions[RasterFrame] tileCols: Seq[Column], rasterCols: Int, rasterRows: Int, - resampler: ResampleMethod = Bilinear): ProjectedRaster[MultibandTile] = { + resampler: ResampleMethod = NearestNeighbor): ProjectedRaster[MultibandTile] = { val clipped = clipLayerExtent diff --git a/core/src/main/scala/astraea/spark/rasterframes/functions/CellMeanAggregateFunction.scala b/core/src/main/scala/astraea/spark/rasterframes/functions/CellMeanAggregateFunction.scala index fbca53995..5489ee167 100644 --- a/core/src/main/scala/astraea/spark/rasterframes/functions/CellMeanAggregateFunction.scala +++ b/core/src/main/scala/astraea/spark/rasterframes/functions/CellMeanAggregateFunction.scala @@ -59,7 +59,7 @@ case class CellMeanAggregateFunction(child: Expression) extends DeclarativeAggre count.left + count.right ) - val evaluateExpression = sum / Cast(count, DoubleType) + val evaluateExpression = sum / new Cast(count, DoubleType) def inputTypes = Seq(TileUDT) diff --git a/core/src/main/scala/astraea/spark/rasterframes/functions/CellStatsAggregateFunction.scala b/core/src/main/scala/astraea/spark/rasterframes/functions/CellStatsAggregateFunction.scala index f21e6146f..7ed353682 100644 --- a/core/src/main/scala/astraea/spark/rasterframes/functions/CellStatsAggregateFunction.scala +++ b/core/src/main/scala/astraea/spark/rasterframes/functions/CellStatsAggregateFunction.scala @@ -108,16 +108,6 @@ case class CellStatsAggregateFunction() extends UserDefinedAggregateFunction { } object CellStatsAggregateFunction { - case class Statistics(dataCells: Long, min: Double, max: Double, mean: Double, variance: Double) - object Statistics { - // Convert GeoTrellis stats object into our simplified one. - def apply(stats: geotrellis.raster.summary.Statistics[Double]) = - new Statistics(stats.dataCells, stats.zmin, stats.zmax, stats.mean, stats.stddev * stats.stddev) - - def apply(stats: geotrellis.raster.summary.Statistics[Int])(implicit d: DummyImplicit) = - new Statistics(stats.dataCells, stats.zmin.toDouble, stats.zmax.toDouble, stats.mean, stats.stddev * stats.stddev) - } - /** Column index values. 
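 * (These are offsets into the UDAF aggregation buffer.)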
*/ private object C { val COUNT = 0 diff --git a/core/src/main/scala/astraea/spark/rasterframes/functions/HistogramAggregateFunction.scala b/core/src/main/scala/astraea/spark/rasterframes/functions/HistogramAggregateFunction.scala index a768be958..f85b2ae65 100644 --- a/core/src/main/scala/astraea/spark/rasterframes/functions/HistogramAggregateFunction.scala +++ b/core/src/main/scala/astraea/spark/rasterframes/functions/HistogramAggregateFunction.scala @@ -16,12 +16,15 @@ package astraea.spark.rasterframes.functions +import astraea.spark.rasterframes.encoders.StandardEncoders +import astraea.spark.rasterframes.stats.CellHistogram import geotrellis.raster.Tile import geotrellis.raster.histogram.{Histogram, StreamingHistogram} +import geotrellis.spark.util.KryoSerializer import org.apache.spark.sql.Row import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} -import org.apache.spark.sql.gt.types.{HistogramUDT, TileUDT} -import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.sql.gt.types.TileUDT +import org.apache.spark.sql.types._ /** * Histogram aggregation function for a full column of tiles. @@ -31,26 +34,43 @@ import org.apache.spark.sql.types.{DataType, StructField, StructType} case class HistogramAggregateFunction() extends UserDefinedAggregateFunction { override def inputSchema: StructType = StructType(StructField("value", TileUDT) :: Nil) - override def bufferSchema: StructType = StructType(StructField("buffer", HistogramUDT) :: Nil) + override def bufferSchema: StructType = StructType(StructField("buffer", BinaryType) :: Nil) - override def dataType: DataType = new HistogramUDT() + override def dataType: DataType = StandardEncoders.histEncoder.schema override def deterministic: Boolean = true + @inline + private def marshall(hist: Histogram[Double]): Array[Byte] = + KryoSerializer.serialize(hist) + + @inline + private def unmarshall(blob: Array[Byte]): Histogram[Double] = + KryoSerializer.deserialize(blob) + override def initialize(buffer: MutableAggregationBuffer): Unit = - buffer(0) = StreamingHistogram() + buffer(0) = marshall(StreamingHistogram()) override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { - val hist = buffer.getAs[Histogram[Double]](0) + val hist = unmarshall(buffer.getAs[Array[Byte]](0)) val tile = input.getAs[Tile](0) - buffer(0) = safeEval((h: Histogram[Double], t: Tile) ⇒ h.merge(StreamingHistogram.fromTile(t)))(hist, tile) + val updatedHist = safeEval((h: Histogram[Double], t: Tile) ⇒ h.merge(StreamingHistogram.fromTile(t)))(hist, tile) + buffer(0) = marshall(updatedHist) } override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { - val hist1 = buffer1.getAs[Histogram[Double]](0) - val hist2 = buffer2.getAs[Histogram[Double]](0) - buffer1(0) = safeEval((h1: Histogram[Double], h2: Histogram[Double]) ⇒ h1 merge h2)(hist1, hist2) + val hist1 = unmarshall(buffer1.getAs[Array[Byte]](0)) + val hist2 = unmarshall(buffer2.getAs[Array[Byte]](0)) + val updatedHist = safeEval((h1: Histogram[Double], h2: Histogram[Double]) ⇒ h1 merge h2)(hist1, hist2) + buffer1(0) = marshall(updatedHist) } - override def evaluate(buffer: Row): Any = buffer.getAs[Histogram[Double]](0) + override def evaluate(buffer: Row): Any = { + val hist = unmarshall(buffer.getAs[Array[Byte]](0)) + CellHistogram(hist) + } +} + +object HistogramAggregateFunction { + case class RFTileHistogram() } diff --git a/core/src/main/scala/astraea/spark/rasterframes/functions/package.scala 
b/core/src/main/scala/astraea/spark/rasterframes/functions/package.scala index a5a4f97de..d5eb514d4 100644 --- a/core/src/main/scala/astraea/spark/rasterframes/functions/package.scala +++ b/core/src/main/scala/astraea/spark/rasterframes/functions/package.scala @@ -15,7 +15,7 @@ */ package astraea.spark.rasterframes -import geotrellis.raster.histogram.Histogram +import astraea.spark.rasterframes.stats.{CellHistogram, CellStatistics} import geotrellis.raster.mapalgebra.local._ import geotrellis.raster._ import geotrellis.raster.render.ascii.AsciiArtEncoder @@ -30,7 +30,6 @@ import scala.reflect.runtime.universe._ * @since 9/7/17 */ package object functions { - import astraea.spark.rasterframes.functions.CellStatsAggregateFunction.Statistics @inline private[rasterframes] def safeBinaryOp[T <: AnyRef, R >: T](op: (T, T) ⇒ R): ((T, T) ⇒ R) = @@ -144,16 +143,19 @@ package object functions { /** Computes the column aggregate statistics */ private[rasterframes] val aggStats = CellStatsAggregateFunction() + /** Change the tile's cell type. */ + private[rasterframes] def convertCellType(cellType: CellType) = safeEval[Tile, Tile](_.convert(cellType)) + /** Set the tile's no-data value. */ private[rasterframes] def withNoData(nodata: Double) = safeEval[Tile, Tile](_.withNoData(Some(nodata))) /** Single tile histogram. */ - private[rasterframes] val tileHistogram = safeEval[Tile, Histogram[Double]](_.histogramDouble) + private[rasterframes] val tileHistogram = safeEval[Tile, CellHistogram](t ⇒ CellHistogram(t.histogramDouble)) /** Single tile statistics. Convenience for `tileHistogram.statistics`. */ - private[rasterframes] val tileStats = safeEval[Tile, Statistics]((t: Tile) ⇒ - if (t.cellType.isFloatingPoint) t.statisticsDouble.map(Statistics.apply).orNull - else t.statistics.map(Statistics.apply).orNull + private[rasterframes] val tileStats = safeEval[Tile, CellStatistics]((t: Tile) ⇒ + if (t.cellType.isFloatingPoint) t.statisticsDouble.map(CellStatistics.apply).orNull + else t.statistics.map(CellStatistics.apply).orNull ) /** Add up all the cell values. */ diff --git a/core/src/main/scala/astraea/spark/rasterframes/jts/Implicits.scala b/core/src/main/scala/astraea/spark/rasterframes/jts/Implicits.scala index d58996e9d..3362dc0d0 100644 --- a/core/src/main/scala/astraea/spark/rasterframes/jts/Implicits.scala +++ b/core/src/main/scala/astraea/spark/rasterframes/jts/Implicits.scala @@ -29,12 +29,13 @@ import geotrellis.vector.{Point ⇒ gtPoint} import org.apache.spark.sql.TypedColumn import org.apache.spark.sql.functions._ import org.apache.spark.sql.rf.CanBeColumn +import org.locationtech.geomesa.spark.jts.DataFrameFunctions.SpatialConstructors /** * Extension methods on typed columns allowing for DSL-like queries over JTS types. 
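 * Geometry literal constructors (e.g. `geomLit`) are inherited from GeoMesa's `SpatialConstructors`,
 * which replaces the removed `SpatialFunctions` trait.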
* @since 1/10/18 */ -trait Implicits extends SpatialFunctions { +trait Implicits extends SpatialConstructors { import astraea.spark.rasterframes.encoders.SparkDefaultEncoders._ implicit class ExtentColumnMethods[T <: Geometry](val self: TypedColumn[Any, T]) diff --git a/core/src/main/scala/astraea/spark/rasterframes/jts/SpatialFunctions.scala b/core/src/main/scala/astraea/spark/rasterframes/jts/SpatialFunctions.scala deleted file mode 100644 index c4fe7357a..000000000 --- a/core/src/main/scala/astraea/spark/rasterframes/jts/SpatialFunctions.scala +++ /dev/null @@ -1,36 +0,0 @@ -package astraea.spark.rasterframes.jts - -import com.vividsolutions.jts.geom._ -import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.jts._ -import org.apache.spark.sql.{Column, Encoder, TypedColumn} - -/** - * Spatial type support functions. - * - * @since 2/19/18 - */ -trait SpatialFunctions { - import astraea.spark.rasterframes.encoders.SparkDefaultEncoders._ - - /** Constructs a geometric literal from a value and JTS UDT */ - private def udtlit[T >: Null <: Geometry: Encoder, U <: AbstractGeometryUDT[T]](t: T, u: U): TypedColumn[Any, T] = - new Column(Literal.create(u.serialize(t), u)).as[T] - - /** Create a generic geometry literal, encoded as a GeometryUDT. */ - def geomLit(g: Geometry): TypedColumn[Any, Geometry] = udtlit(g, GeometryUDT) - /** Create a point literal, encoded as a PointUDT. */ - def pointLit(g: Point): TypedColumn[Any, Point] = udtlit(g, PointUDT) - /** Create a line literal, encoded as a LineUDT. */ - def lineLit(g: LineString): TypedColumn[Any, LineString] = udtlit(g, LineStringUDT) - /** Create a polygon literal, encoded as a PolygonUDT. */ - def polygonLit(g: Polygon): TypedColumn[Any, Polygon] = udtlit(g, PolygonUDT) - /** Create a multi-point literal, encoded as a MultiPointUDT. */ - def mPointLit(g: MultiPoint): TypedColumn[Any, MultiPoint] = udtlit(g, MultiPointUDT) - /** Create a multi-line literal, encoded as a MultiPointUDT. */ - def mLineLit(g: MultiLineString): TypedColumn[Any, MultiLineString] = udtlit(g, MultiLineStringUDT) - /** Create a multi-polygon literal, encoded as a MultiPolygonUDT. */ - def mPolygonLit(g: MultiPolygon): TypedColumn[Any, MultiPolygon] = udtlit(g, MultiPolygonUDT) - /** create a geometry collection literal, encoded as a GeometryCollectionUDT. 
*/ - def geomCollLit(g: GeometryCollection): TypedColumn[Any, GeometryCollection] = udtlit(g, GeometryCollectionUDT) -} diff --git a/core/src/main/scala/astraea/spark/rasterframes/package.scala b/core/src/main/scala/astraea/spark/rasterframes/package.scala index 51e7f1f2b..332f8dec1 100644 --- a/core/src/main/scala/astraea/spark/rasterframes/package.scala +++ b/core/src/main/scala/astraea/spark/rasterframes/package.scala @@ -17,15 +17,18 @@ package astraea.spark import astraea.spark.rasterframes.encoders.StandardEncoders -import geotrellis.raster.{Tile, TileFeature} -import geotrellis.spark.{Bounds, ContextRDD, Metadata, TileLayerMetadata} -import geotrellis.util.GetComponent +import geotrellis.raster.{MultibandTile, Tile, TileFeature} +import geotrellis.spark.{ContextRDD, Metadata, SpaceTimeKey, SpatialKey, TileLayerMetadata} import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.locationtech.geomesa.spark.jts.DataFrameFunctions import org.locationtech.geomesa.spark.jts.encoders.SpatialEncoders import shapeless.tag.@@ +import scala.annotation.implicitNotFound +import scala.language.higherKinds +import scala.reflect.runtime.universe._ + /** * Module providing support for RasterFrames. * `import astraea.spark.rasterframes._`., and then call `rfInit(SQLContext)`. @@ -39,7 +42,6 @@ package object rasterframes extends StandardColumns with StandardEncoders with SpatialEncoders with DataFrameFunctions.Library { - type Statistics = astraea.spark.rasterframes.functions.CellStatsAggregateFunction.Statistics /** * Initialization injection point. Must be called before any RasterFrame @@ -65,14 +67,6 @@ package object rasterframes extends StandardColumns /** Tagged type for allowing compiler to help keep track of what has RasterFrame assurances applied to it. */ trait RasterFrameTag - /** - * Type lambda alias for components that have bounds with parameterized key. - * @tparam K bounds key type - */ - type BoundsComponentOf[K] = { - type get[M] = GetComponent[M, Bounds[K]] - } - type TileFeatureLayerRDD[K, D] = RDD[(K, TileFeature[Tile, D])] with Metadata[TileLayerMetadata[K]] @@ -82,6 +76,7 @@ package object rasterframes extends StandardColumns new ContextRDD(rdd, metadata) } + /** Provides evidence that a given primitive has an associated CellType. */ trait HasCellType[T] extends Serializable object HasCellType { implicit val intHasCellType = new HasCellType[Int] {} @@ -90,4 +85,20 @@ package object rasterframes extends StandardColumns implicit val shortHasCellType = new HasCellType[Short] {} implicit val floatHasCellType = new HasCellType[Float] {} } + + /** Evidence type class for communicating that only standard key types are supported with the more general GeoTrellis type parameters. 
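+   * Usage sketch: `StandardLayerKey[SpatialKey].isType[SpatialKey]` yields `true`.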
*/
+  trait StandardLayerKey[T] extends Serializable {
+    val selfType: TypeTag[T]
+    def isType[R: TypeTag]: Boolean = typeOf[R] =:= selfType.tpe
+  }
+  object StandardLayerKey {
+    def apply[T: StandardLayerKey]: StandardLayerKey[T] = implicitly
+    implicit val spatialKeySupport = new StandardLayerKey[SpatialKey] {
+      override val selfType: TypeTag[SpatialKey] = implicitly
+    }
+    implicit val spatioTemporalKeySupport = new StandardLayerKey[SpaceTimeKey] {
+      override val selfType: TypeTag[SpaceTimeKey] = implicitly
+    }
+  }
+
 }
diff --git a/core/src/main/scala/astraea/spark/rasterframes/stats/CellHistogram.scala b/core/src/main/scala/astraea/spark/rasterframes/stats/CellHistogram.scala
new file mode 100644
index 000000000..71cb5cc2b
--- /dev/null
+++ b/core/src/main/scala/astraea/spark/rasterframes/stats/CellHistogram.scala
@@ -0,0 +1,45 @@
+/*
+ * This software is licensed under the Apache 2 license, quoted below.
+ *
+ * Copyright 2018 Astraea, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * [http://www.apache.org/licenses/LICENSE-2.0]
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ *
+ */
+
+package astraea.spark.rasterframes.stats
+import astraea.spark.rasterframes.stats.CellHistogram.Bin
+import geotrellis.raster.histogram.{Histogram ⇒ GTHistogram}
+
+/**
+ * Container for a computed histogram of tile cell values.
+ *
+ * @since 4/3/18
+ */
+case class CellHistogram(stats: CellStatistics, bins: Seq[Bin]) {
+  def mean = stats.mean
+  def totalCount = stats.dataCells
+}
+
+object CellHistogram {
+  case class Bin(value: Double, count: Long)
+  def apply(hist: GTHistogram[Int]): CellHistogram = {
+    val stats = CellStatistics(hist.statistics().get)
+    CellHistogram(stats, hist.binCounts().map(p ⇒ Bin(p._1.toDouble, p._2)))
+  }
+  def apply(hist: GTHistogram[Double])(implicit ev: DummyImplicit): CellHistogram = {
+    val stats = CellStatistics(hist.statistics().get)
+    CellHistogram(stats, hist.binCounts().map(p ⇒ Bin(p._1, p._2)))
+  }
+}
diff --git a/core/src/main/scala/astraea/spark/rasterframes/stats/CellStatistics.scala b/core/src/main/scala/astraea/spark/rasterframes/stats/CellStatistics.scala
new file mode 100644
index 000000000..567a69a78
--- /dev/null
+++ b/core/src/main/scala/astraea/spark/rasterframes/stats/CellStatistics.scala
@@ -0,0 +1,38 @@
+/*
+ * This software is licensed under the Apache 2 license, quoted below.
+ *
+ * Copyright 2018 Astraea, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * [http://www.apache.org/licenses/LICENSE-2.0]
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ *
+ */
+
+package astraea.spark.rasterframes.stats
+
+/**
+ * Container for computed statistics over cells.
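+ * Variance is stored; standard deviation is derived as `stddev = sqrt(variance)`.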
+ * + * @since 4/3/18 + */ +case class CellStatistics(dataCells: Long, min: Double, max: Double, mean: Double, variance: Double) { + def stddev: Double = math.sqrt(variance) +} +object CellStatistics { + // Convert GeoTrellis stats object into our simplified one. + def apply(stats: geotrellis.raster.summary.Statistics[Double]) = + new CellStatistics(stats.dataCells, stats.zmin, stats.zmax, stats.mean, stats.stddev * stats.stddev) + + def apply(stats: geotrellis.raster.summary.Statistics[Int])(implicit d: DummyImplicit) = + new CellStatistics(stats.dataCells, stats.zmin.toDouble, stats.zmax.toDouble, stats.mean, stats.stddev * stats.stddev) +} \ No newline at end of file diff --git a/core/src/main/scala/astraea/spark/rasterframes/util/SubdivideSupport.scala b/core/src/main/scala/astraea/spark/rasterframes/util/SubdivideSupport.scala new file mode 100644 index 000000000..c94b425d4 --- /dev/null +++ b/core/src/main/scala/astraea/spark/rasterframes/util/SubdivideSupport.scala @@ -0,0 +1,118 @@ +/* + * This software is licensed under the Apache 2 license, quoted below. + * + * Copyright 2018 Astraea. Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * [http://www.apache.org/licenses/LICENSE-2.0] + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + * + */ + +package astraea.spark.rasterframes.util + +import geotrellis.raster.crop.Crop +import geotrellis.raster.{CellGrid, TileLayout} +import geotrellis.spark.{Bounds, KeyBounds, SpatialComponent, SpatialKey, TileLayerMetadata} +import geotrellis.util._ + +/** + * + * + * @since 4/5/18 + */ +trait SubdivideSupport { + implicit class TileLayoutHasSubdivide(self: TileLayout) { + def subdivide(divs: Int): TileLayout = { + def shrink(num: Int) = { + require(num % divs == 0, s"Subdivision of '$divs' does not evenly divide into dimension '$num'") + num / divs + } + def grow(num: Int) = num * divs + + divs match { + case 0 ⇒ self + case i if i < 0 ⇒ throw new IllegalArgumentException(s"divs=$divs must be positive") + case _ ⇒ + TileLayout( + layoutCols = grow(self.layoutCols), + layoutRows = grow(self.layoutRows), + tileCols = shrink(self.tileCols), + tileRows = shrink(self.tileRows) + ) + } + } + } + + implicit class BoundsHasSubdivide[K: SpatialComponent](self: Bounds[K]) { + def subdivide(divs: Int): Bounds[K] = { + self.map(kb ⇒ { + val currGrid = kb.toGridBounds() + // NB: As with GT regrid, we keep the spatial key origin (0, 0) at the same map coordinate + val newGrid = currGrid.copy( + colMin = currGrid.colMin * divs, + rowMin = currGrid.rowMin * divs, + colMax = currGrid.colMin * divs + (currGrid.width - 1) * divs + 1, + rowMax = currGrid.rowMin * divs + (currGrid.height - 1) * divs + 1 + ) + kb.setSpatialBounds(KeyBounds(newGrid)) + }) + } + } + + /** + * Note: this enrichment makes the assumption that the new keys will be used in the + * layout regime defined by `TileLayoutHasSubdivide` + */ + implicit class SpatialKeyHasSubdivide[K: SpatialComponent](self: K) { + def subdivide(divs: Int): Seq[K] = { + val base = self.getComponent[SpatialKey] + val shifted = SpatialKey(base.col * divs, base.row * divs) + + for{ + i ← 0 until 
divs + j ← 0 until divs + } yield { + val newKey = SpatialKey(shifted.col + j, shifted.row + i) + self.setComponent(newKey) + } + } + } + + implicit class TileLayerMetadataHasSubdivide[K: SpatialComponent](tlm: TileLayerMetadata[K]) { + def subdivide(divs: Int): TileLayerMetadata[K] = { + val tileLayout = tlm.layout.tileLayout.subdivide(divs) + val layout = tlm.layout.copy(tileLayout = tileLayout) + val bounds = tlm.bounds.subdivide(divs) + tlm.copy(layout = layout, bounds = bounds) + } + } + + implicit class TileHasSubdivide[T <: CellGrid: WithCropMethods](self: T) { + def subdivide(divs: Int): Seq[T] = { + val (cols, rows) = self.dimensions + val (newCols, newRows) = (cols/divs, rows/divs) + for { + i ← 0 until divs + j ← 0 until divs + } yield { + val startCol = j * newCols + val startRow = i * newRows + val endCol = startCol + newCols - 1 + val endRow = startRow + newRows - 1 + self.crop(startCol, startRow, endCol, endRow, Crop.Options(force = true)) + } + } + } +} + +object SubdivideSupport extends SubdivideSupport diff --git a/core/src/main/scala/astraea/spark/rasterframes/util/debug/package.scala b/core/src/main/scala/astraea/spark/rasterframes/util/debug/package.scala new file mode 100644 index 000000000..4a0df0bdb --- /dev/null +++ b/core/src/main/scala/astraea/spark/rasterframes/util/debug/package.scala @@ -0,0 +1,48 @@ +/* + * This software is licensed under the Apache 2 license, quoted below. + * + * Copyright 2018 Astraea. Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * [http://www.apache.org/licenses/LICENSE-2.0] + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + * + */ + +package astraea.spark.rasterframes.util + +import java.net.URI + +import astraea.spark.rasterframes._ +import geotrellis.proj4.WebMercator +import geotrellis.raster.MultibandTile +import geotrellis.raster.io.geotiff.tags.codes.ColorSpace +import geotrellis.raster.io.geotiff.{GeoTiffOptions, MultibandGeoTiff} +import geotrellis.spark.{ContextRDD, MultibandTileLayerRDD, SpatialKey} +import geotrellis.spark.io.slippy.HadoopSlippyTileWriter +import geotrellis.spark.tiling.ZoomedLayoutScheme +import org.apache.spark.annotation.Experimental + +/** + * Additional debugging routines. No guarantees these are or will remain stable. + * + * @since 4/6/18 + */ +package object debug { + implicit class RasterFrameWithDebug(val self: RasterFrame) { + + /** Renders the whole schema with metadata as a JSON string. 
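+     * Usage sketch (where `rf` is a RasterFrame): `println(rf.describeFullSchema)`.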
*/ + def describeFullSchema: String = { + self.schema.prettyJson + } + } +} diff --git a/core/src/main/scala/astraea/spark/rasterframes/util/package.scala b/core/src/main/scala/astraea/spark/rasterframes/util/package.scala index bde570644..f80c122ca 100644 --- a/core/src/main/scala/astraea/spark/rasterframes/util/package.scala +++ b/core/src/main/scala/astraea/spark/rasterframes/util/package.scala @@ -19,8 +19,15 @@ package astraea.spark.rasterframes +import geotrellis.raster.CellGrid +import geotrellis.raster.crop.TileCropMethods import geotrellis.raster.mapalgebra.local.LocalTileBinaryOp -import geotrellis.util.LazyLogging +import geotrellis.raster.mask.TileMaskMethods +import geotrellis.raster.merge.TileMergeMethods +import geotrellis.raster.prototype.TilePrototypeMethods +import geotrellis.spark.Bounds +import geotrellis.spark.tiling.TilerKeyMethods +import geotrellis.util.{GetComponent, LazyLogging} import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -29,7 +36,6 @@ import org.apache.spark.sql.rf._ import org.apache.spark.sql.{Column, DataFrame, SQLContext} import shapeless.Lub - /** * Internal utilities. * @@ -37,6 +43,29 @@ import shapeless.Lub */ package object util extends LazyLogging { + import reflect.ClassTag + import reflect.runtime.universe._ + + implicit class TypeTagCanBeClassTag[T](val t: TypeTag[T]) extends AnyVal { + def asClassTag: ClassTag[T] = ClassTag[T](t.mirror.runtimeClass(t.tpe)) + } + + /** + * Type lambda alias for components that have bounds with parameterized key. + * @tparam K bounds key type + */ + type BoundsComponentOf[K] = { + type Get[M] = GetComponent[M, Bounds[K]] + } + + // Type lambda aliases + type WithMergeMethods[V] = (V ⇒ TileMergeMethods[V]) + type WithPrototypeMethods[V <: CellGrid] = (V ⇒ TilePrototypeMethods[V]) + type WithCropMethods[V <: CellGrid] = (V ⇒ TileCropMethods[V]) + type WithMaskMethods[V] = (V ⇒ TileMaskMethods[V]) + + type KeyMethodsProvider[K1, K2] = K1 ⇒ TilerKeyMethods[K1, K2] + /** Internal method for slapping the RasterFrame seal of approval on a DataFrame. */ private[rasterframes] def certifyRasterframe(df: DataFrame): RasterFrame = shapeless.tag[RasterFrameTag][DataFrame](df) @@ -55,9 +84,7 @@ package object util extends LazyLogging { op.getClass.getSimpleName.replace("$", "").toLowerCase - // $COVERAGE-OFF$ implicit class WithWiden[A, B](thing: Either[A, B]) { - /** Returns the value as a LUB of the Left & Right items. */ def widen[Out](implicit ev: Lub[A, B, Out]): Out = thing.fold(identity, identity).asInstanceOf[Out] @@ -106,5 +133,4 @@ package object util extends LazyLogging { logger.error("Extended rule resolution not available in this version of Spark") analyzer(sqlContext).extendedResolutionRules } - // $COVERAGE-ON$ } diff --git a/core/src/main/scala/org/apache/spark/sql/gt/package.scala b/core/src/main/scala/org/apache/spark/sql/gt/package.scala index 5f856aaea..9e469552b 100644 --- a/core/src/main/scala/org/apache/spark/sql/gt/package.scala +++ b/core/src/main/scala/org/apache/spark/sql/gt/package.scala @@ -16,7 +16,7 @@ package org.apache.spark.sql -import org.apache.spark.sql.gt.types.{HistogramUDT, TileUDT} +import org.apache.spark.sql.gt.types.TileUDT /** * Module of GeoTrellis UDTs for Spark/Catalyst. 
@@ -28,6 +28,5 @@ package object gt { // Referencing the companion objects here is intended to have its constructor called, // which is where the registration actually happens. TileUDT - HistogramUDT } } diff --git a/core/src/main/scala/org/apache/spark/sql/gt/types/HistogramUDT.scala b/core/src/main/scala/org/apache/spark/sql/gt/types/HistogramUDT.scala deleted file mode 100644 index feca59e6a..000000000 --- a/core/src/main/scala/org/apache/spark/sql/gt/types/HistogramUDT.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright 2017 Astraea, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.gt.types - -import geotrellis.raster.histogram.Histogram -import org.apache.spark.sql.rf.KryoBackedUDT -import org.apache.spark.sql.types._ - -/** - * Wraps up GT Histogram type. - * - * @since 4/18/17 - */ -class HistogramUDT extends UserDefinedType[Histogram[Double]] with KryoBackedUDT[Histogram[Double]] { - - override val typeName = "gt_histogram" - - override val targetClassTag = scala.reflect.classTag[Histogram[Double]] - - private[sql] override def acceptsType(dataType: DataType) = dataType match { - case o: HistogramUDT ⇒ o.typeName == this.typeName - case _ ⇒ super.acceptsType(dataType) - } -} - -object HistogramUDT extends HistogramUDT { - UDTRegistration.register(classOf[Histogram[Double]].getName, classOf[HistogramUDT].getName) -} diff --git a/core/src/main/scala/org/apache/spark/sql/rf/package.scala b/core/src/main/scala/org/apache/spark/sql/rf/package.scala index 532c723c5..9fbe0d651 100644 --- a/core/src/main/scala/org/apache/spark/sql/rf/package.scala +++ b/core/src/main/scala/org/apache/spark/sql/rf/package.scala @@ -16,11 +16,13 @@ package org.apache.spark.sql +import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry, MultiAlias} import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{CreateArray, Expression, Inline} import org.apache.spark.sql.types.{StructType, UDTRegistration, UserDefinedType} + import scala.reflect.runtime.universe._ /** @@ -45,7 +47,15 @@ package object rf { sqlContext.sessionState.analyzer } - // $COVERAGE-OFF$ + /** Lookup the registered Catalyst UDT for the given Scala type. */ + def udtOf[T >: Null: TypeTag]: UserDefinedType[T] = + UDTRegistration.getUDTFor(typeTag[T].tpe.toString).map(_.newInstance().asInstanceOf[UserDefinedType[T]]) + .getOrElse(throw new IllegalArgumentException(typeTag[T].tpe + " doesn't have a corresponding UDT")) + + /** Creates a Catalyst expression for flattening the fields in a struct into columns.
*/ + def projectStructExpression(dataType: StructType, input: Expression) = + MultiAlias(Inline(CreateArray(Seq(input))), dataType.fields.map(_.name)) + implicit class WithDecoder[T](enc: ExpressionEncoder[T]) { def decode(row: InternalRow): T = enc.resolveAndBind(enc.schema.toAttributes).fromRow(row) @@ -63,13 +73,4 @@ package object rf { } } - /** Lookup the registered Catalyst UDT for the given Scala type. */ - def udtOf[T >: Null: TypeTag]: UserDefinedType[T] = - UDTRegistration.getUDTFor(typeTag[T].tpe.toString).map(_.newInstance().asInstanceOf[UserDefinedType[T]]) - .getOrElse(throw new IllegalArgumentException(typeTag[T].tpe + " doesn't have a corresponding UDT")) - - /** Creates a Catalyst expression for flattening the fields in a struct into columns. */ - def projectStructExpression(dataType: StructType, input: Expression) = - MultiAlias(Inline(CreateArray(Seq(input))), dataType.fields.map(_.name)) - // $COVERAGE-ON$ } diff --git a/core/src/test/resources/NAIP-VA-b1.tiff b/core/src/test/resources/NAIP-VA-b1.tiff index 65140ed09..f1e4338a2 100644 Binary files a/core/src/test/resources/NAIP-VA-b1.tiff and b/core/src/test/resources/NAIP-VA-b1.tiff differ diff --git a/core/src/test/resources/NAIP-VA-b2.tiff b/core/src/test/resources/NAIP-VA-b2.tiff index 3e46fd32f..664d21f18 100644 Binary files a/core/src/test/resources/NAIP-VA-b2.tiff and b/core/src/test/resources/NAIP-VA-b2.tiff differ diff --git a/core/src/test/resources/NAIP-VA-b3.tiff b/core/src/test/resources/NAIP-VA-b3.tiff index ad107c38c..fc049a40d 100644 Binary files a/core/src/test/resources/NAIP-VA-b3.tiff and b/core/src/test/resources/NAIP-VA-b3.tiff differ diff --git a/core/src/test/resources/NAIP-VA-b4.tiff b/core/src/test/resources/NAIP-VA-b4.tiff index d9d9eaeb0..84e6c6432 100644 Binary files a/core/src/test/resources/NAIP-VA-b4.tiff and b/core/src/test/resources/NAIP-VA-b4.tiff differ diff --git a/core/src/test/scala/Scratch.sc b/core/src/test/scala/Scratch.sc index 13c8bfada..53bbd1457 100644 --- a/core/src/test/scala/Scratch.sc +++ b/core/src/test/scala/Scratch.sc @@ -1,16 +1,3 @@ -import com.vividsolutions.jts.geom._ -import org.geotools.geometry.jts.JTSFactoryFinder +val s = Seq(1,2,3,4,5) -val f = JTSFactoryFinder.getGeometryFactory - -val pt = f.createPoint(new Coordinate(1, 2)) - -val poly = f.createPolygon(Array(new Coordinate(0, 0), new Coordinate(0, 3), new Coordinate(3, 3), new Coordinate(3, 0), new Coordinate(0, 0))) - - -poly.intersects(pt) -pt.intersects(poly) -pt.intersects(pt) - - -Double.NaN.toInt +s.patch(1, Seq(9), 1) \ No newline at end of file diff --git a/core/src/test/scala/astraea/spark/rasterframes/ExtensionMethodSpec.scala b/core/src/test/scala/astraea/spark/rasterframes/ExtensionMethodSpec.scala index 1834f1fa2..aabfb9498 100644 --- a/core/src/test/scala/astraea/spark/rasterframes/ExtensionMethodSpec.scala +++ b/core/src/test/scala/astraea/spark/rasterframes/ExtensionMethodSpec.scala @@ -19,6 +19,12 @@ */ package astraea.spark.rasterframes +import astraea.spark.rasterframes.util.SubdivideSupport._ +import geotrellis.proj4.LatLng +import geotrellis.raster.{ByteCellType, GridBounds, TileLayout} +import geotrellis.spark.{KeyBounds, SpatialKey, TileLayerMetadata} +import geotrellis.spark.tiling.LayoutDefinition +import geotrellis.spark.tiling.CRSWorldExtent /** * Tests miscellaneous extension methods. 
@@ -47,4 +53,52 @@ class ExtensionMethodSpec extends TestEnvironment with TestData { "val Some(col) = rf.spatialKeyColumn" shouldNot compile } } + describe("Miscellaneous extensions") { + it("should split TileLayout") { + val tl1 = TileLayout(2, 3, 10, 10) + assert(tl1.subdivide(0) === tl1) + assert(tl1.subdivide(1) === tl1) + assert(tl1.subdivide(2) === TileLayout(4, 6, 5, 5)) + assertThrows[IllegalArgumentException](tl1.subdivide(-1)) + } + it("should split KeyBounds[SpatialKey]") { + val grid = GridBounds(0, 0, 9, 9) + val kb = KeyBounds(grid) + val kb2 = kb.subdivide(2) + assert(kb2.get.toGridBounds() === GridBounds(0, 0, 19, 19)) + + val grid2 = GridBounds(2, 2, 9, 9) + val kb3 = KeyBounds(grid2) + val kb4 = kb3.subdivide(2) + assert(kb4.get.toGridBounds() === GridBounds(4, 4, 19, 19)) + } + + it("should split key") { + val s1 = SpatialKey(0, 0).subdivide(2) + assert(s1 === Seq(SpatialKey(0,0), SpatialKey(1,0), SpatialKey(0,1), SpatialKey(1,1))) + + val s2 = SpatialKey(2, 3).subdivide(3) + assert(s2 === Seq(SpatialKey(6,9), SpatialKey(7,9), SpatialKey(8,9), SpatialKey(6,10), SpatialKey(7,10), SpatialKey(8,10), SpatialKey(6,11), SpatialKey(7,11), SpatialKey(8,11))) + } + + it("should split TileLayerMetadata[SpatialKey]") { + val tileSize = 12 + val dataGridSize = 2 + val grid = GridBounds(2, 4, 10, 11) + val layout = LayoutDefinition(LatLng.worldExtent, TileLayout(dataGridSize, dataGridSize, tileSize, tileSize)) + val tlm = TileLayerMetadata(ByteCellType, layout, LatLng.worldExtent, LatLng, KeyBounds(grid)) + + val divided = tlm.subdivide(2) + + assert(divided.tileLayout.tileDimensions === (tileSize/2, tileSize/2)) + // Key bounds scale with the subdivision factor while the grid origin stays fixed. + assert(divided.bounds.get.toGridBounds() === GridBounds(4, 8, 21, 23)) + } + } } diff --git a/core/src/test/scala/astraea/spark/rasterframes/GTSQLSpec.scala b/core/src/test/scala/astraea/spark/rasterframes/GTSQLSpec.scala index 07f0d251f..3ceb0ac34 100644 --- a/core/src/test/scala/astraea/spark/rasterframes/GTSQLSpec.scala +++ b/core/src/test/scala/astraea/spark/rasterframes/GTSQLSpec.scala @@ -132,7 +132,7 @@ class GTSQLSpec extends TestEnvironment with TestData { val datasets = Seq( { - val tiles = Array.fill[Tile](30)(randomTile(5, 5, "float32")) + val tiles = Array.fill[Tile](30)(randomTile(5, 5, FloatCellType)) tiles(1) = null tiles(11) = null tiles(29) = null diff --git a/core/src/test/scala/astraea/spark/rasterframes/RasterFrameSpec.scala b/core/src/test/scala/astraea/spark/rasterframes/RasterFrameSpec.scala index 18fd4b761..653af750e 100644 --- a/core/src/test/scala/astraea/spark/rasterframes/RasterFrameSpec.scala +++ b/core/src/test/scala/astraea/spark/rasterframes/RasterFrameSpec.scala @@ -5,16 +5,15 @@ package astraea.spark.rasterframes import java.sql.Timestamp import java.time.ZonedDateTime +import astraea.spark.rasterframes.util._ import geotrellis.proj4.LatLng -import geotrellis.raster.render.{ColorMap, ColorRamp, ColorRamps} -import geotrellis.raster.{IntCellType, ProjectedRaster, Tile, TileFeature, TileLayout} +import geotrellis.raster.render.{ColorMap, ColorRamp} +import geotrellis.raster.{ProjectedRaster, Tile, TileFeature, TileLayout, UByteCellType} import geotrellis.spark._ -import geotrellis.spark.io._ import geotrellis.spark.tiling._ import geotrellis.vector.{Extent, ProjectedExtent} -import org.apache.spark.sql.{SQLContext, SparkSession} import org.apache.spark.sql.functions._ -import astraea.spark.rasterframes.util._ +import org.apache.spark.sql.{SQLContext, SparkSession}
import scala.util.control.NonFatal @@ -75,6 +74,10 @@ class RasterFrameSpec extends TestEnvironment with MetadataKeys ) assert(rf.count() === 4) + + val cols = tileLayerRDD.toRF("foo").columns + assert(!cols.contains("tile")) + assert(cols.contains("foo")) } it("should implicitly convert from spatiotemporal layer type") { @@ -94,10 +97,13 @@ class RasterFrameSpec extends TestEnvironment with MetadataKeys println(rf.schema.prettyJson) throw ex } + val cols = tileLayerRDD.toRF("foo").columns + assert(!cols.contains("tile")) + assert(cols.contains("foo")) } it("should implicitly convert layer of TileFeature") { - val tile = TileFeature(randomTile(20, 20, "uint8"), (1, "b", 3.0)) + val tile = TileFeature(randomTile(20, 20, UByteCellType), (1, "b", 3.0)) val tileLayout = TileLayout(1, 1, 20, 20) @@ -116,7 +122,7 @@ class RasterFrameSpec extends TestEnvironment with MetadataKeys } it("should implicitly convert spatiotemporal layer of TileFeature") { - val tile = TileFeature(randomTile(20, 20, "uint8"), (1, "b", 3.0)) + val tile = TileFeature(randomTile(20, 20, UByteCellType), (1, "b", 3.0)) val tileLayout = TileLayout(1, 1, 20, 20) @@ -154,7 +160,6 @@ class RasterFrameSpec extends TestEnvironment with MetadataKeys assert(wt.columns.contains(TEMPORAL_KEY_COLUMN.columnName)) val joined = wt.spatialJoin(wt, "outer") - joined.printSchema // Should be both left and right column names. assert(joined.columns.count(_.contains(TEMPORAL_KEY_COLUMN.columnName)) === 2) @@ -168,14 +173,13 @@ class RasterFrameSpec extends TestEnvironment with MetadataKeys val right = left.withColumnRenamed(left.tileColumns.head.columnName, "rightTile") .asRF - val joined = left.spatialJoin(right, "inner") - joined.printSchema + val joined = left.spatialJoin(right) + // since right is a copy of left, should not drop any rows with inner join + assert(joined.count === left.count) // Should use left's key column names assert(joined.spatialKeyColumn.columnName === left.spatialKeyColumn.columnName) assert(joined.temporalKeyColumn.map(_.columnName) === left.temporalKeyColumn.map(_.columnName)) - // since right is a copy of left, should not drop any rows with inner join - assert(joined.count === left.count) } @@ -206,22 +210,22 @@ class RasterFrameSpec extends TestEnvironment with MetadataKeys assert(bounds._2 === SpaceTimeKey(3, 1, now)) } - it("should clip TileLayerMetadata extent") { - val tiled = sampleTileLayerRDD - - val rf = tiled.toRF - - val worldish = Extent(-179, -89, 179, 89) - val areaish = Extent(-90, 30, -81, 39) - - val orig = tiled.metadata.extent - assert(orig.contains(worldish)) - assert(orig.contains(areaish)) - - val clipped = rf.clipLayerExtent.tileLayerMetadata.widen.extent - assert(!clipped.contains(worldish)) - assert(clipped.contains(areaish)) - } +// it("should clip TileLayerMetadata extent") { +// val tiled = sampleTileLayerRDD +// +// val rf = tiled.reproject(LatLng, tiled.metadata.layout)._2.toRF +// +// val worldish = Extent(-179, -89, 179, 89) +// val areaish = Extent(-90, 30, -81, 40) +// +// val orig = rf.tileLayerMetadata.widen.extent +// assert(worldish.contains(orig)) +// assert(areaish.contains(orig)) +// +// val clipped = rf.clipLayerExtent.tileLayerMetadata.widen.extent +// assert(!clipped.contains(worldish)) +// assert(clipped.contains(areaish)) +// } def basicallySame(expected: Extent, computed: Extent): Unit = { val components = Seq( @@ -306,7 +310,6 @@ class RasterFrameSpec extends TestEnvironment with MetadataKeys val green = 
TestData.naipSample(2).projectedRaster.toRF.withRFColumnRenamed("tile", "green") val blue = TestData.naipSample(3).projectedRaster.toRF.withRFColumnRenamed("tile", "blue") val joined = blue.spatialJoin(green).spatialJoin(red) - joined.printSchema noException shouldBe thrownBy { val raster = joined.toMultibandRaster(Seq($"red", $"green", $"blue"), 256, 256) diff --git a/core/src/test/scala/astraea/spark/rasterframes/TestData.scala b/core/src/test/scala/astraea/spark/rasterframes/TestData.scala index ade97090b..175691a10 100644 --- a/core/src/test/scala/astraea/spark/rasterframes/TestData.scala +++ b/core/src/test/scala/astraea/spark/rasterframes/TestData.scala @@ -108,12 +108,8 @@ trait TestData { } def sampleTileLayerRDD(implicit spark: SparkSession): TileLayerRDD[SpatialKey] = { - val raster = sampleGeoTiff.projectedRaster.reproject(LatLng) - val layout = LayoutDefinition(LatLng.worldExtent, TileLayout(36, 18, 128, 128)) - val kb = KeyBounds(SpatialKey(0, 0), SpatialKey(layout.layoutCols, layout.layoutRows)) - val tlm = TileLayerMetadata(raster.tile.cellType, layout, layout.extent, LatLng, kb) - val rdd = spark.sparkContext.makeRDD(Seq((raster.projectedExtent, raster.tile))) - ContextRDD(rdd.tileToLayout(tlm), tlm) + val rf = sampleGeoTiff.projectedRaster.toRF(128, 128) + rf.toTileLayerRDD(rf.tileColumns.head).left.get } object JTS { @@ -136,48 +132,59 @@ object TestData extends TestData { val rnd = new scala.util.Random(42) /** Construct a tile of given size and cell type populated with random values. */ - def randomTile(cols: Int, rows: Int, cellTypeName: String): Tile = { - val cellType = CellType.fromName(cellTypeName) - val tile = ArrayTile.alloc(cellType, cols, rows) - - def possibleND(c: Int) = - c == NODATA || c == byteNODATA || c == ubyteNODATA || c == shortNODATA || c == ushortNODATA - + def randomTile(cols: Int, rows: Int, cellType: CellType): Tile = { // Initialize tile with some initial random values - var result = tile.dualMap(_ ⇒ rnd.nextInt())(_ ⇒ rnd.nextGaussian()) - - // Due to cell width narrowing and custom NoData values, we can end up randomly creating - // NoData values. While perhaps inefficient, the safest way to ensure a tile with no-NoData values - // with the current CellType API (GT 1.1), while still generating random data is to - // iteratively pass through all the cells and replace NoData values as we find them. - do { - result = result.dualMap( - z ⇒ if (isNoData(z)) rnd.nextInt() else z - ) ( - z ⇒ if (isNoData(z)) rnd.nextGaussian() else z - ) - } while (F.noDataCells(result) != 0L) + val base: Tile = cellType match { + case _: FloatCells ⇒ + val data = Array.fill(cols * rows)(rnd.nextGaussian().toFloat) + ArrayTile(data, cols, rows) + case _: DoubleCells ⇒ + val data = Array.fill(cols * rows)(rnd.nextGaussian()) + ArrayTile(data, cols, rows) + case _ ⇒ + val words = cellType.bits / 8 + val bytes = Array.ofDim[Byte](cols * rows * words) + rnd.nextBytes(bytes) + ArrayTile.fromBytes(bytes, cellType, cols, rows) + } - assert(F.noDataCells(result) == 0L, - s"Should not have any NoData cells for $cellTypeName:\n${result.asciiDraw()}") - result + cellType match { + case _: NoNoData ⇒ base + case _ ⇒ + // Due to cell width narrowing and custom NoData values, we can end up randomly creating + // NoData values. While perhaps inefficient, the safest way to ensure a tile free of NoData values + // under the current CellType API (GT 1.1), while still generating random data, is to + // iteratively pass through all the cells and replace NoData values as we find them. + var result = base + do { + result = result.dualMap( + z ⇒ if (isNoData(z)) rnd.nextInt(1 << cellType.bits) else z + ) ( + z ⇒ if (isNoData(z)) rnd.nextGaussian() else z + ) + } while (F.noDataCells(result) != 0L) + + assert(F.noDataCells(result) == 0L, + s"Should not have any NoData cells for $cellType:\n${result.asciiDraw()}") + result + } } /** Create a series of random tiles. */ val makeTiles: (Int) ⇒ Array[Tile] = (count) ⇒ - Array.fill(count)(randomTile(4, 4, "int8raw")) + Array.fill(count)(randomTile(4, 4, UByteCellType)) def randomSpatialTileLayerRDD( rasterCols: Int, rasterRows: Int, layoutCols: Int, layoutRows: Int)(implicit sc: SparkContext): TileLayerRDD[SpatialKey] = { - val tile = randomTile(rasterCols, rasterRows, "uint8") + val tile = randomTile(rasterCols, rasterRows, UByteCellType) TileLayerRDDBuilders.createTileLayerRDD(tile, layoutCols, layoutRows, LatLng)._2 } def randomSpatioTemporalTileLayerRDD( rasterCols: Int, rasterRows: Int, layoutCols: Int, layoutRows: Int)(implicit sc: SparkContext): TileLayerRDD[SpaceTimeKey] = { - val tile = randomTile(rasterCols, rasterRows, "uint8") + val tile = randomTile(rasterCols, rasterRows, UByteCellType) val tileLayout = TileLayout(layoutCols, layoutRows, rasterCols/layoutCols, rasterRows/layoutRows) TileLayerRDDBuilders.createSpaceTimeTileLayerRDD(Seq((tile, ZonedDateTime.now())), tileLayout, tile.cellType) } diff --git a/core/src/test/scala/astraea/spark/rasterframes/TileStatsSpec.scala b/core/src/test/scala/astraea/spark/rasterframes/TileStatsSpec.scala index 70cc581ee..ad3c7038c 100644 --- a/core/src/test/scala/astraea/spark/rasterframes/TileStatsSpec.scala +++ b/core/src/test/scala/astraea/spark/rasterframes/TileStatsSpec.scala @@ -20,8 +20,8 @@ package astraea.spark.rasterframes import astraea.spark.rasterframes.TestData.randomTile +import astraea.spark.rasterframes.stats.CellHistogram import geotrellis.raster._ -import geotrellis.raster.histogram.Histogram import geotrellis.raster.mapalgebra.local.{Max, Min} import org.apache.spark.sql.functions._ @@ -84,7 +84,7 @@ class TileStatsSpec extends TestEnvironment with TestData { } it("should count data and no-data cells") { - val ds = (Seq.fill[Tile](10)(injectND(10)(randomTile(10, 10, "uint8"))) :+ null).toDF("tile") + val ds = (Seq.fill[Tile](10)(injectND(10)(randomTile(10, 10, UByteConstantNoDataCellType))) :+ null).toDF("tile") val expectedNoData = 10 * 10 val expectedData = 10 * 10 * 10 - expectedNoData @@ -103,8 +103,7 @@ class TileStatsSpec extends TestEnvironment with TestData { } it("should compute tile statistics") { - val ds = (Seq.fill[Tile](3)(randomTile(5, 5, "float32")) :+ null).toDS() - ds.printSchema() + val ds = (Seq.fill[Tile](3)(randomTile(5, 5, FloatConstantNoDataCellType)) :+ null).toDS() val means1 = ds.select(tileStats($"value")).map(s ⇒ Option(s).map(_.mean).getOrElse(0.0)).collect val means2 = ds.select(tileMean($"value")).collect.map(m ⇒ if (m.isNaN) 0.0 else m) // Compute the mean manually, knowing we're not dealing with no-data values.
@@ -115,49 +114,51 @@ class TileStatsSpec extends TestEnvironment with TestData { } it("should compute per-tile histogram") { - val ds = (Seq.fill[Tile](3)(randomTile(5, 5, "float32")) :+ null).toDF("tiles") + val ds = Seq.fill[Tile](3)(randomTile(5, 5, FloatCellType)).toDF("tiles") ds.createOrReplaceTempView("tmp") - val r1 = ds.select(tileHistogram($"tiles").as[Histogram[Double]]) - assert(r1.first.totalCount() === 5 * 5) + val r1 = ds.select(tileHistogram($"tiles")) + assert(r1.first.totalCount === 5 * 5) write(r1) - val r2 = sql("select rf_tileHistogram(tiles) from tmp") + val r2 = sql("select hist.* from (select rf_tileHistogram(tiles) as hist from tmp)").as[CellHistogram] write(r2) - - assert(r1.first.mean === r2.as[Histogram[Double]].first.mean) + assert(r1.first.mean === r2.first.mean) } it("should compute aggregate histogram") { - val ds = Seq.fill[Tile](10)(randomTile(5, 5, "float32")).toDF("tiles") + val ds = Seq.fill[Tile](10)(randomTile(5, 5, FloatConstantNoDataCellType)).toDF("tiles") ds.createOrReplaceTempView("tmp") - val agg = ds.select(aggHistogram($"tiles")).as[Histogram[Double]] + val agg = ds.select(aggHistogram($"tiles")).as[CellHistogram] val hist = agg.collect() assert(hist.length === 1) - val stats = agg.map(_.statistics().get).as("stats") + val stats = agg.map(_.stats).as("stats") //stats.select("stats.*").show(false) assert(stats.first().stddev === 1.0 +- 0.3) // <-- playing with statistical fire :) - val hist2 = sql("select rf_aggHistogram(tiles) as hist from tmp").as[Histogram[Double]] + val hist2 = sql("select hist.* from (select rf_aggHistogram(tiles) as hist from tmp)").as[CellHistogram] - assert(hist2.first.totalCount() === 250) + assert(hist2.first.totalCount === 250) } it("should compute aggregate mean") { - val ds = (Seq.fill[Tile](10)(randomTile(5, 5, "float32")) :+ null).toDF("tiles") + val ds = (Seq.fill[Tile](10)(randomTile(5, 5, FloatCellType)) :+ null).toDF("tiles") val agg = ds.select(aggMean($"tiles")) val stats = ds.select(aggStats($"tiles") as "stats").select($"stats.mean".as[Double]) assert(agg.first() === stats.first()) } it("should compute aggregate statistics") { - val ds = Seq.fill[Tile](10)(randomTile(5, 5, "float32")).toDF("tiles") + val ds = Seq.fill[Tile](10)(randomTile(5, 5, FloatConstantNoDataCellType)).toDF("tiles") val exploded = ds.select(explodeTiles($"tiles")) val (mean, vrnc) = exploded.agg(avg($"tiles"), var_pop($"tiles")).as[(Double, Double)].first val stats = ds.select(aggStats($"tiles") as "stats")///.as[(Long, Double, Double, Double, Double)] - ds.select(aggStats($"tiles")).show(false) + + noException shouldBe thrownBy { + ds.select(aggStats($"tiles")).collect() + } val agg = stats.select($"stats.variance".as[Double]) @@ -174,7 +175,7 @@ class TileStatsSpec extends TestEnvironment with TestData { it("should compute aggregate local stats") { val ave = (nums: Array[Double]) ⇒ nums.sum / nums.length - val ds = (Seq.fill[Tile](30)(randomTile(5, 5, "float32")) + val ds = (Seq.fill[Tile](30)(randomTile(5, 5, FloatConstantNoDataCellType)) .map(injectND(2)) :+ null).toDF("tiles") ds.createOrReplaceTempView("tmp") @@ -238,7 +239,7 @@ class TileStatsSpec extends TestEnvironment with TestData { val tsize = 5 val count = 20 val nds = 2 - val tiles = (Seq.fill[Tile](count)(randomTile(tsize, tsize, "uint8ud255")) + val tiles = (Seq.fill[Tile](count)(randomTile(tsize, tsize, UByteUserDefinedNoDataCellType(255.toByte))) .map(injectND(nds)) :+ null).toDF("tiles") val counts = tiles.select(noDataCells($"tiles")).collect().dropRight(1) diff 
--git a/core/src/test/scala/astraea/spark/rasterframes/TileUDTSpec.scala b/core/src/test/scala/astraea/spark/rasterframes/TileUDTSpec.scala index 39ede1700..0001c4509 100644 --- a/core/src/test/scala/astraea/spark/rasterframes/TileUDTSpec.scala +++ b/core/src/test/scala/astraea/spark/rasterframes/TileUDTSpec.scala @@ -21,7 +21,7 @@ package astraea.spark.rasterframes -import geotrellis.raster.Tile +import geotrellis.raster.{CellType, Tile} import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.gt.types.TileUDT import org.apache.spark.sql.rf.InternalRowTile @@ -46,7 +46,7 @@ class TileUDTSpec extends TestEnvironment with TestData with Inspectors with Int def forEveryConfig(test: (Tile) ⇒ Unit): Unit = { forEvery(tileSizes.combinations(2).toSeq) { case Seq(cols, rows) ⇒ forEvery(ct) { c ⇒ - val tile = randomTile(cols, rows, c) + val tile = randomTile(cols, rows, CellType.fromName(c)) test(tile) } } diff --git a/datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotiff/DefaultSource.scala b/datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotiff/DefaultSource.scala index 31b75f4bc..2aeed5335 100644 --- a/datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotiff/DefaultSource.scala +++ b/datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotiff/DefaultSource.scala @@ -19,25 +19,63 @@ package astraea.spark.rasterframes.datasource.geotiff -import java.net.URI - -import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{DataSourceRegister, RelationProvider} +import _root_.geotrellis.raster.io.geotiff.GeoTiff import astraea.spark.rasterframes._ +import astraea.spark.rasterframes.datasource._ +import astraea.spark.rasterframes.util._ +import com.typesafe.scalalogging.LazyLogging +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} /** - * + * Spark SQL data source over GeoTIFF files. 
* @since 1/14/18 */ -@Experimental -class DefaultSource extends DataSourceRegister with RelationProvider { - def shortName() = "geotiff" +class DefaultSource extends DataSourceRegister with RelationProvider with CreatableRelationProvider with LazyLogging { + def shortName() = DefaultSource.SHORT_NAME + + def path(parameters: Map[String, String]) = + uriParam(DefaultSource.PATH_PARAM, parameters) def createRelation(sqlContext: SQLContext, parameters: Map[String, String]) = { - require(parameters.contains("path"), "'path' parameter required.") - val uri: URI = URI.create(parameters("path")) + val pathO = path(parameters) + require(pathO.isDefined, "Valid URI 'path' parameter required.") + sqlContext.withRasterFrames + GeoTiffRelation(sqlContext, pathO.get) + } + + override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = { + val pathO = path(parameters) + require(pathO.isDefined, "Valid URI 'path' parameter required.") + require(pathO.get.getScheme == "file" || pathO.get.getScheme == null, "Currently only 'file://' destinations are supported") sqlContext.withRasterFrames - GeoTiffRelation(sqlContext, uri) + + + require(data.isRF, "GeoTIFF can only be constructed from a RasterFrame") + val rf = data.certify + + val tl = rf.tileLayerMetadata.widen.layout.tileLayout + + val cols = numParam(DefaultSource.IMAGE_WIDTH_PARAM, parameters).getOrElse(tl.totalCols) + val rows = numParam(DefaultSource.IMAGE_HEIGHT_PARAM, parameters).getOrElse(tl.totalRows) + + require(cols <= Int.MaxValue && rows <= Int.MaxValue, s"Can't construct a GeoTIFF of size $cols x $rows. (Too big!)") + + // Should we really play traffic cop here? + if(cols.toDouble * rows * 64.0 > Runtime.getRuntime.totalMemory() * 0.5) + logger.warn(s"You've asked for the construction of a very large image ($cols x $rows), destined for ${pathO.get}. 
Out of memory error likely.") + + val raster = rf.toMultibandRaster(rf.tileColumns, cols.toInt, rows.toInt) + + GeoTiff(raster).write(pathO.get.getPath) + GeoTiffRelation(sqlContext, pathO.get) } } + +object DefaultSource { + final val SHORT_NAME = "geotiff" + final val PATH_PARAM = "path" + final val IMAGE_WIDTH_PARAM = "imageWidth" + final val IMAGE_HEIGHT_PARAM = "imageHeight" +} diff --git a/datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotiff/GeoTiffRelation.scala b/datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotiff/GeoTiffRelation.scala index 1d14b7ec8..9f54cccbc 100644 --- a/datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotiff/GeoTiffRelation.scala +++ b/datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotiff/GeoTiffRelation.scala @@ -22,13 +22,14 @@ package astraea.spark.rasterframes.datasource.geotiff import java.net.URI import astraea.spark.rasterframes._ +import astraea.spark.rasterframes.util._ +import geotrellis.raster.TileLayout import geotrellis.raster.io.geotiff.reader.GeoTiffReader import geotrellis.spark._ import geotrellis.spark.io._ import geotrellis.spark.io.hadoop._ import geotrellis.spark.tiling.LayoutDefinition import geotrellis.util._ -import geotrellis.vector.Extent import org.apache.hadoop.fs.Path import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder @@ -36,9 +37,6 @@ import org.apache.spark.sql.gt.types.TileUDT import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.sql.{Row, SQLContext} -import astraea.spark.rasterframes.util._ -import geotrellis.raster.TileLayout -import geotrellis.raster.io.geotiff.MultibandGeoTiff /** * Spark SQL data source over a single GeoTiff file. Works best with CoG-compliant ones. diff --git a/datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotiff/package.scala b/datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotiff/package.scala index 8e46f4641..c47a6d742 100644 --- a/datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotiff/package.scala +++ b/datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotiff/package.scala @@ -22,25 +22,34 @@ package astraea.spark.rasterframes.datasource import java.net.URI import astraea.spark.rasterframes._ -import org.apache.spark.sql.DataFrameReader +import org.apache.spark.sql.{DataFrameReader, DataFrameWriter} import shapeless.tag import shapeless.tag.@@ /** + * Extension methods enabled by this module. * * @since 1/16/18 */ package object geotiff { - /** Tagged type construction for enabling type-safe extension methods for loading * a RasterFrame in expected form. */ type GeoTiffRasterFrameReader = DataFrameReader @@ GeoTiffRasterFrameReaderTag trait GeoTiffRasterFrameReaderTag + /** Tagged type construction for enabling type-safe extension methods for writing + * a RasterFrame to a GeoTIFF. */ + type GeoTiffRasterFrameWriter[T] = DataFrameWriter[T] @@ GeoTiffRasterFrameWriterTag + trait GeoTiffRasterFrameWriterTag /** Adds `geotiff` format specifier to `DataFrameReader`.
*/ implicit class DataFrameReaderHasGeoTiffFormat(val reader: DataFrameReader) { def geotiff: GeoTiffRasterFrameReader = - tag[GeoTiffRasterFrameReaderTag][DataFrameReader](reader.format("geotiff")) + tag[GeoTiffRasterFrameReaderTag][DataFrameReader](reader.format(DefaultSource.SHORT_NAME)) + } + + implicit class DataFrameWriterHasGeoTiffFormat[T](val writer: DataFrameWriter[T]) { + def geotiff: GeoTiffRasterFrameWriter[T] = + tag[GeoTiffRasterFrameWriterTag][DataFrameWriter[T]](writer.format(DefaultSource.SHORT_NAME)) } /** Adds `loadRF` to appropriately tagged `DataFrameReader` */ diff --git a/datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotrellis/DefaultSource.scala b/datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotrellis/DefaultSource.scala index 92137ecb3..47d589621 100644 --- a/datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotrellis/DefaultSource.scala +++ b/datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotrellis/DefaultSource.scala @@ -23,6 +23,7 @@ import java.net.URI import astraea.spark.rasterframes.rules.registerOptimization import astraea.spark.rasterframes._ +import astraea.spark.rasterframes.datasource.geotrellis.DefaultSource._ import geotrellis.spark._ import geotrellis.spark.io._ import geotrellis.spark.io.index.ZCurveKeyIndexMethod @@ -38,7 +39,7 @@ import scala.util.Try */ @Experimental class DefaultSource extends DataSourceRegister with RelationProvider with CreatableRelationProvider { - def shortName(): String = "geotrellis" + def shortName(): String = DefaultSource.SHORT_NAME /** * Create a GeoTrellis data source. @@ -48,30 +49,33 @@ * `layer`-layer name (e.g. "LC08_L1GT"); * `zoom`-positive integer zoom level (e.g. "8"); * `numPartitions`-(optional) integer specifying initial number of partitions; + * `tileSubdivisions`-(optional) positive integer specifying how many subdivisions to apply to each tile, horizontally and vertically; * `failOnUnrecognizedFilter`-(optional) if true, predicate push-down filters not translated into GeoTrellis query syntax are fatal.
*/ def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { require(parameters.contains("path"), "'path' parameter required.") - require(parameters.contains("layer"), "'layer' parameter for raster layer name required.") - require(parameters.contains("zoom"), "'zoom' parameter for raster layer zoom level required.") + require(parameters.contains(LAYER_PARAM), "'layer' parameter for raster layer name required.") + require(parameters.contains(ZOOM_PARAM), "'zoom' parameter for raster layer zoom level required.") sqlContext.withRasterFrames registerOptimization(sqlContext, SpatialFilterPushdownRules) val uri: URI = URI.create(parameters("path")) - val layerId: LayerId = LayerId(parameters("layer"), parameters("zoom").toInt) - val numPartitions = parameters.get("numPartitions").map(_.toInt) + val layerId: LayerId = LayerId(parameters(LAYER_PARAM), parameters(ZOOM_PARAM).toInt) + val numPartitions = parameters.get(NUM_PARTITIONS_PARAM).map(_.toInt) + val tileSubdivisions = parameters.get(TILE_SUBDIVISIONS_PARAM).map(_.toInt) + tileSubdivisions.foreach(s ⇒ require(s >= 0, TILE_SUBDIVISIONS_PARAM + " must be a non-negative integer")) val failOnUnrecognizedFilter = parameters.get("failOnUnrecognizedFilter").exists(_.toBoolean) - GeoTrellisRelation(sqlContext, uri, layerId, numPartitions, failOnUnrecognizedFilter) + GeoTrellisRelation(sqlContext, uri, layerId, numPartitions, failOnUnrecognizedFilter, tileSubdivisions) } /** Write relation. */ def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = { - val zoom = parameters.get("zoom").flatMap(p ⇒ Try(p.toInt).toOption) + val zoom = parameters.get(ZOOM_PARAM).flatMap(p ⇒ Try(p.toInt).toOption) val path = parameters.get("path").flatMap(p ⇒ Try(new URI(p)).toOption) - val layerName = parameters.get("layer") + val layerName = parameters.get(LAYER_PARAM) require(path.isDefined, "Valid URI 'path' parameter required.") require(layerName.isDefined, "'layer' parameter for raster layer name required.") @@ -107,3 +111,11 @@ class DefaultSource extends DataSourceRegister with RelationProvider with Creata createRelation(sqlContext, parameters) } } + +object DefaultSource { + final val SHORT_NAME = "geotrellis" + final val TILE_SUBDIVISIONS_PARAM = "tileSubdivisions" + final val NUM_PARTITIONS_PARAM = "numPartitions" + final val LAYER_PARAM = "layer" + final val ZOOM_PARAM = "zoom" +} diff --git a/datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotrellis/GeoTrellisRelation.scala b/datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotrellis/GeoTrellisRelation.scala index 95599f3c1..ec0ae40d4 100644 --- a/datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotrellis/GeoTrellisRelation.scala +++ b/datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotrellis/GeoTrellisRelation.scala @@ -29,13 +29,14 @@ import astraea.spark.rasterframes.jts.SpatialFilters.{BetweenTimes, Contains ⇒ import astraea.spark.rasterframes.datasource.geotrellis.GeoTrellisRelation.TileFeatureData import astraea.spark.rasterframes.util._ import com.vividsolutions.jts.geom -import geotrellis.raster.{MultibandTile, Tile, TileFeature} +import geotrellis.raster.{CellGrid, MultibandTile, Tile, TileFeature} import geotrellis.spark.io._ import geotrellis.spark.io.avro.AvroRecordCodec import geotrellis.spark.util.KryoWrapper import geotrellis.spark.{LayerId, Metadata, SpatialKey, TileLayerMetadata, _} import geotrellis.util.LazyLogging import
geotrellis.vector._ +import geotrellis.util._ import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.apache.spark.rdd.RDD import org.apache.spark.sql.types._ import org.apache.spark.sql.{Row, SQLContext, sources} import spray.json.DefaultJsonProtocol._ import spray.json.JsValue +import TileFeatureSupport._ +import geotrellis.spark.tiling.{CutTiles, TilerKeyMethods} +import SubdivideSupport._ import scala.reflect.ClassTag import scala.reflect.runtime.universe._ @@ -54,12 +58,13 @@ import scala.reflect.runtime.universe._ * A Spark SQL `Relation` over a standard GeoTrellis layer. */ case class GeoTrellisRelation(sqlContext: SQLContext, - uri: URI, - layerId: LayerId, - numPartitions: Option[Int] = None, - failOnUnrecognizedFilter: Boolean = false, - filters: Seq[Filter] = Seq.empty) - extends BaseRelation with PrunedScan with LazyLogging { + uri: URI, + layerId: LayerId, + numPartitions: Option[Int] = None, + failOnUnrecognizedFilter: Boolean = false, + tileSubdivisions: Option[Int] = None, + filters: Seq[Filter] = Seq.empty) + extends BaseRelation with PrunedScan with LazyLogging { implicit val sc = sqlContext.sparkContext @@ -112,6 +117,15 @@ case class GeoTrellisRelation(sqlContext: SQLContext, ) } + def subdividedTileLayerMetadata: Either[TileLayerMetadata[SpatialKey], TileLayerMetadata[SpaceTimeKey]] = { + tileSubdivisions.filter(_ > 1) match { + case None ⇒ tileLayerMetadata + case Some(divs) ⇒ tileLayerMetadata + .right.map(_.subdivide(divs)) + .left.map(_.subdivide(divs)) + } + } + private object Cols { lazy val SK = SPATIAL_KEY_COLUMN.columnName lazy val TK = TEMPORAL_KEY_COLUMN.columnName @@ -121,7 +135,7 @@ case class GeoTrellisRelation(sqlContext: SQLContext, lazy val EX = BOUNDS_COLUMN.columnName } - /** This unfortunate routine is here because the number bands in a multiband layer isn't written + /** This unfortunate routine is here because the number of bands in a multiband layer isn't written * in the metadata anywhere. This is potentially an expensive hack, whose impact needs further quantification. * Another option is to force the user to specify the number of bands. */ private lazy val peekBandCount = { @@ -143,8 +157,8 @@ case class GeoTrellisRelation(sqlContext: SQLContext, override def schema: StructType = { val skSchema = ExpressionEncoder[SpatialKey]().schema - val skMetadata = attributes.readMetadata[JsValue](layerId) |> - (m ⇒ Metadata.fromJson(m.compactPrint)) |> + val skMetadata = subdividedTileLayerMetadata.
+ fold(_.asColumnMetadata, _.asColumnMetadata) |> (Metadata.empty.append.attachContext(_).tagSpatialKey.build) val keyFields = keyType match { @@ -244,11 +258,16 @@ case class GeoTrellisRelation(sqlContext: SQLContext, } } - def query[T: AvroRecordCodec: ClassTag](reader: FilteringLayerReader[LayerId], columnIndexes: Seq[Int]) = { - tileLayerMetadata.fold( + private def subdivider[K: SpatialComponent, T <: CellGrid: WithCropMethods](divs: Int) = (p: (K, T)) ⇒ { + val newKeys = p._1.subdivide(divs) + val newTiles = p._2.subdivide(divs) + newKeys.zip(newTiles) + } + + private def query[T <: CellGrid: WithCropMethods: WithMergeMethods: AvroRecordCodec: ClassTag](reader: FilteringLayerReader[LayerId], columnIndexes: Seq[Int]) = { + subdividedTileLayerMetadata.fold( // Without temporal key case (tlm: TileLayerMetadata[SpatialKey]) ⇒ { - val trans = tlm.mapTransform val parts = numPartitions.getOrElse(reader.defaultNumPartitions) @@ -256,11 +275,15 @@ case class GeoTrellisRelation(sqlContext: SQLContext, reader.query[SpatialKey, T, TileLayerMetadata[SpatialKey]](layerId, parts) )(applyFilter(_, _)) - val rdd = query.result + val rdd = tileSubdivisions.filter(_ > 1) match { + case Some(divs) ⇒ + query.result.flatMap(subdivider[SpatialKey, T](divs)) + case None ⇒ query.result + } + val trans = tlm.mapTransform rdd .map { case (sk: SpatialKey, tile: T) ⇒ - val entries = columnIndexes.map { case 0 ⇒ sk case 1 ⇒ trans.keyToExtent(sk).jtsGeom @@ -286,7 +309,11 @@ case class GeoTrellisRelation(sqlContext: SQLContext, reader.query[SpaceTimeKey, T, TileLayerMetadata[SpaceTimeKey]](layerId, parts) )(applyFilterTemporal(_, _)) - val rdd = query.result + val rdd = tileSubdivisions.filter(_ > 1) match { + case Some(divs) ⇒ + query.result.flatMap(subdivider[SpaceTimeKey, T](divs)) + case None ⇒ query.result + } rdd .map { case (stk: SpaceTimeKey, tile: T) ⇒ diff --git a/datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotrellis/TileFeatureSupport.scala b/datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotrellis/TileFeatureSupport.scala index a34dbe139..caca1707e 100644 --- a/datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotrellis/TileFeatureSupport.scala +++ b/datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotrellis/TileFeatureSupport.scala @@ -28,16 +28,12 @@ import geotrellis.raster.resample.ResampleMethod import geotrellis.raster.{CellGrid, CellType, GridBounds, TileFeature} import geotrellis.util.MethodExtensions import geotrellis.vector.{Extent, Geometry} +import astraea.spark.rasterframes.util._ import scala.reflect.ClassTag trait TileFeatureSupport { - type WithMergeMethods[V] = (V => TileMergeMethods[V]) - type WithPrototypeMethods[V <: CellGrid] = (V => TilePrototypeMethods[V]) - type WithCropMethods[V <: CellGrid] = (V => TileCropMethods[V]) - type WithMaskMethods[V] = (V => TileMaskMethods[V]) - implicit class TileFeatureMethodsWrapper[V <: CellGrid: ClassTag: WithMergeMethods: WithPrototypeMethods: WithCropMethods: WithMaskMethods, D: MergeableData](val self: TileFeature[V, D]) extends TileMergeMethods[TileFeature[V, D]] with TilePrototypeMethods[TileFeature[V,D]] diff --git a/datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotrellis/package.scala b/datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotrellis/package.scala index 5d3e6fb98..0699c3b08 100644 --- a/datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotrellis/package.scala +++ 
b/datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotrellis/package.scala @@ -21,6 +21,7 @@ package astraea.spark.rasterframes.datasource import java.net.URI import _root_.geotrellis.spark.LayerId +import astraea.spark.rasterframes.datasource.geotrellis.DefaultSource._ import astraea.spark.rasterframes.{RasterFrame, _} import org.apache.spark.sql._ import org.apache.spark.sql.functions.col @@ -28,7 +29,7 @@ import shapeless.tag import shapeless.tag.@@ /** - * Module utilities. + * Extension methods for literate and type-safe loading of geotrellis layers. * * @since 1/12/18 */ @@ -55,35 +56,44 @@ package object geotrellis { reader.format("geotrellis-catalog").load(base.toASCIIString) def geotrellis: GeoTrellisRasterFrameReader = - tag[GeoTrellisRasterFrameReaderTag][DataFrameReader](reader.format("geotrellis")) + tag[GeoTrellisRasterFrameReaderTag][DataFrameReader](reader.format(SHORT_NAME)) } implicit class DataFrameWriterHasGeotrellisFormat[T](val writer: DataFrameWriter[T]) { def geotrellis: GeoTrellisRasterFrameWriter[T] = - tag[GeoTrellisRasterFrameWriterTag][DataFrameWriter[T]](writer.format("geotrellis")) + tag[GeoTrellisRasterFrameWriterTag][DataFrameWriter[T]](writer.format(SHORT_NAME)) } implicit class GeoTrellisWriterAddLayer[T](val writer: GeoTrellisRasterFrameWriter[T]) { def asLayer(id: LayerId): DataFrameWriter[T] = writer - .option("layer", id.name) - .option("zoom", id.zoom.toString) + .option(LAYER_PARAM, id.name) + .option(ZOOM_PARAM, id.zoom.toString) def asLayer(layer: Layer): DataFrameWriter[T] = asLayer(layer.id) .option("path", layer.base.toASCIIString) } - /** Extension methods for loading a RasterFrame from a tagged `DataFrameReader`. */ + /** Extension methods for loading a RasterFrame from a tagged `DataFrameReader`. 
*/ implicit class GeoTrellisReaderWithRF(val reader: GeoTrellisRasterFrameReader) { + def withTileSubdivisions(divs: Int): GeoTrellisRasterFrameReader = + tag[GeoTrellisRasterFrameReaderTag][DataFrameReader]( + reader.option(DefaultSource.TILE_SUBDIVISIONS_PARAM, divs) + ) + + def withNumPartitions(partitions: Int): GeoTrellisRasterFrameReader = + tag[GeoTrellisRasterFrameReaderTag][DataFrameReader]( + reader.option(DefaultSource.NUM_PARTITIONS_PARAM, partitions) + ) + def loadRF(uri: URI, id: LayerId): RasterFrame = reader - .option("layer", id.name) - .option("zoom", id.zoom.toString) + .option(LAYER_PARAM, id.name) + .option(ZOOM_PARAM, id.zoom.toString) .load(uri.toASCIIString) .asRF def loadRF(layer: Layer): RasterFrame = loadRF(layer.base, layer.id) } - } diff --git a/datasource/src/main/scala/astraea/spark/rasterframes/datasource/package.scala b/datasource/src/main/scala/astraea/spark/rasterframes/datasource/package.scala index 4dc603d0e..8f1852e25 100644 --- a/datasource/src/main/scala/astraea/spark/rasterframes/datasource/package.scala +++ b/datasource/src/main/scala/astraea/spark/rasterframes/datasource/package.scala @@ -18,9 +18,23 @@ */ package astraea.spark.rasterframes + +import java.net.URI + +import scala.util.Try + /** + * Module utilities * * @since 1/13/18 */ package object datasource { + + private[rasterframes] + def numParam(key: String, parameters: Map[String, String]): Option[Long] = + parameters.get(key).map(_.toLong) + + private[rasterframes] + def uriParam(key: String, parameters: Map[String, String]) = + parameters.get(key).flatMap(p ⇒ Try(URI.create(p)).toOption) } diff --git a/datasource/src/test/scala/astraea/spark/rasterframes/datasource/geotiff/GeoTiffDataSourceSpec.scala b/datasource/src/test/scala/astraea/spark/rasterframes/datasource/geotiff/GeoTiffDataSourceSpec.scala index 00e8b5d4a..6b59af02b 100644 --- a/datasource/src/test/scala/astraea/spark/rasterframes/datasource/geotiff/GeoTiffDataSourceSpec.scala +++ b/datasource/src/test/scala/astraea/spark/rasterframes/datasource/geotiff/GeoTiffDataSourceSpec.scala @@ -18,6 +18,8 @@ */ package astraea.spark.rasterframes.datasource.geotiff +import java.nio.file.Paths + import astraea.spark.rasterframes._ /** @@ -80,11 +82,24 @@ class GeoTiffDataSourceSpec } - it("should write RF to parquet") { + it("should write GeoTIFF RF to parquet") { val rf = spark.read .geotiff .loadRF(cogPath) assert(write(rf)) } + + it("should write GeoTIFF") { + + val rf = spark.read + .geotiff + .loadRF(cogPath) + + val out = Paths.get(outputLocalPath, "example-geotiff.tiff") + //val out = Paths.get("target", "example-geotiff.tiff") + noException shouldBe thrownBy { + rf.write.geotiff.save(out.toString) + } + } } } diff --git a/datasource/src/test/scala/astraea/spark/rasterframes/datasource/geotrellis/GeoTrellisDataSourceSpec.scala b/datasource/src/test/scala/astraea/spark/rasterframes/datasource/geotrellis/GeoTrellisDataSourceSpec.scala index ea5ab493d..d63e1e398 100644 --- a/datasource/src/test/scala/astraea/spark/rasterframes/datasource/geotrellis/GeoTrellisDataSourceSpec.scala +++ b/datasource/src/test/scala/astraea/spark/rasterframes/datasource/geotrellis/GeoTrellisDataSourceSpec.scala @@ -22,9 +22,14 @@ import java.io.File import java.time.ZonedDateTime import astraea.spark.rasterframes._ +import astraea.spark.rasterframes.datasource.geotrellis.DefaultSource._ import astraea.spark.rasterframes.util._ +import astraea.spark.rasterframes.util.debug._ import geotrellis.proj4.LatLng import geotrellis.raster._ +import 
geotrellis.raster.io.geotiff.GeoTiff +import geotrellis.raster.resample.{NearestNeighbor, ResampleMethod} +import geotrellis.raster.testkit.RasterMatchers import geotrellis.spark._ import geotrellis.spark.io._ import geotrellis.spark.io.avro.AvroRecordCodec @@ -37,33 +42,40 @@ import org.apache.hadoop.fs.FileUtil import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.functions.{udf ⇒ sparkUdf, _} import org.apache.spark.sql.{DataFrame, Row} -import org.scalatest.{BeforeAndAfter, Inspectors} +import org.scalatest.{BeforeAndAfterAll, Inspectors} import org.apache.avro.generic._ import org.apache.spark.storage.StorageLevel -import scala.math.{min, max} +import scala.math.{max, min} class GeoTrellisDataSourceSpec - extends TestEnvironment with TestData with BeforeAndAfter with Inspectors - with IntelliJPresentationCompilerHack { + extends TestEnvironment with TestData with BeforeAndAfterAll with Inspectors + with RasterMatchers with IntelliJPresentationCompilerHack { + val tileSize = 12 lazy val layer = Layer(new File(outputLocalPath).toURI, LayerId("test-layer", 4)) lazy val tfLayer = Layer(new File(outputLocalPath).toURI, LayerId("test-tf-layer", 4)) + lazy val sampleImageLayer = Layer(new File(outputLocalPath).toURI, LayerId("sample", 0)) val now = ZonedDateTime.now() val tileCoordRange = 2 to 5 lazy val testRdd = { + val ct = IntCellType + + //UByteConstantNoDataCellType val recs: Seq[(SpaceTimeKey, Tile)] = for { col ← tileCoordRange row ← tileCoordRange - } yield SpaceTimeKey(col, row, now) -> ArrayTile.alloc(DoubleConstantNoDataCellType, 12, 12) - + } yield SpaceTimeKey(col, row, now) -> TestData.randomTile(tileSize, tileSize, ct) val rdd = sc.parallelize(recs) - val scheme = ZoomedLayoutScheme(LatLng, tileSize = 12) + val scheme = ZoomedLayoutScheme(LatLng, tileSize = tileSize) val layerLayout = scheme.levelForZoom(4).layout - val layerBounds = KeyBounds(SpaceTimeKey(2, 2, now.minusMonths(1)), SpaceTimeKey(5, 5, now.plusMonths(1))) + val layerBounds = KeyBounds( + SpaceTimeKey(tileCoordRange.start, tileCoordRange.start, now.minusMonths(1)), + SpaceTimeKey(tileCoordRange.end, tileCoordRange.end, now.plusMonths(1)) + ) val md = TileLayerMetadata[SpaceTimeKey]( - cellType = DoubleConstantNoDataCellType, + cellType = ct, crs = LatLng, bounds = layerBounds, layout = layerLayout, @@ -71,7 +83,7 @@ class GeoTrellisDataSourceSpec ContextRDD(rdd, md) } - before { + override def beforeAll = { val outputDir = new File(layer.base) FileUtil.fullyDelete(outputDir) outputDir.deleteOnExit() @@ -102,8 +114,12 @@ class GeoTrellisDataSourceSpec val writer = LayerWriter(tfLayer.base) val tlfRdd = ContextRDD(tfRdd, testRdd.metadata) writer.write(tfLayer.id, tlfRdd, ZCurveKeyIndexMethod.byDay()) - } + //TestData.sampleTileLayerRDD.toRF.write.geotrellis.asLayer(sampleImageLayer).save() + val writer2 = LayerWriter(sampleImageLayer.base) + val imgRDD = TestData.sampleTileLayerRDD + writer2.write(sampleImageLayer.id, imgRDD, ZCurveKeyIndexMethod) + } describe("DataSource reading") { def layerReader = spark.read.geotrellis @@ -150,20 +166,96 @@ class GeoTrellisDataSourceSpec } describe("DataSource options") { - def layerReader = spark.read.geotrellis - it("should respect partitions 2") { val expected = 2 - val df = spark.read.option("numPartitions", expected) - .geotrellis.loadRF(layer) + val df = spark.read.geotrellis + .withNumPartitions(expected) + .loadRF(layer) assert(df.rdd.partitions.length === expected) } it("should respect partitions 20") { val expected = 20 - val 
df = spark.read.option("numPartitions", expected) - .geotrellis.loadRF(layer) + val df = spark.read.geotrellis + .withNumPartitions(expected) + .loadRF(layer) assert(df.rdd.partitions.length === expected) } + it("should respect subdivide 2") { + val param = 2 + val df: RasterFrame = spark.read.geotrellis + .withTileSubdivisions(param) + .loadRF(layer) + + val dims = df.select(tileDimensions(df.tileColumns.head)("cols"), tileDimensions(df.tileColumns.head)("rows")).first() + assert(dims.getAs[Int](0) === tileSize / param) + assert(dims.getAs[Int](1) === tileSize / param) + + // row count will increase + assert(df.count === testRdd.count() * param * param) + } + it("should respect subdivide with TileFeature"){ + val param = 2 + val rf: RasterFrame = spark.read.geotrellis + .withTileSubdivisions(param) + .loadRF(tfLayer) + + val dims = rf.select(tileDimensions(rf.tileColumns.head)("cols"), tileDimensions(rf.tileColumns.head)("rows")) + .first() + assert(dims.getAs[Int](0) === tileSize / param) + assert(dims.getAs[Int](1) === tileSize / param) + + assert(rf.count() === testRdd.count() * param * param) + } + + it("should respect both subdivideTile and numPartitions"){ + val subParam = 3 + + val rf = spark.read + .geotrellis + .withNumPartitions(7) + .withTileSubdivisions(subParam) + .loadRF(layer) + + // is it partitioned correctly? + assert(rf.rdd.partitions.length === 7) + + // is it subdivided? + assert(rf.count === testRdd.count * subParam * subParam) + val dims = rf.select(tileDimensions(rf.tileColumns.head)("cols"), tileDimensions(rf.tileColumns.head)("rows")) + .first() + assert(dims.getAs[Int](0) === tileSize / subParam) + assert(dims.getAs[Int](1) === tileSize / subParam) + } + + it("should subdivide tiles properly") { + val subs = 4 + val rf = spark.read.geotrellis + .withTileSubdivisions(subs) + .loadRF(sampleImageLayer) + + + assert(rf.count === (TestData.sampleTileLayerRDD.count * subs * subs)) + + val (width, height) = sampleGeoTiff.tile.dimensions + + val raster = rf.toRaster(rf.tileColumns.head, width, height, NearestNeighbor) + + assertEqual(raster.tile, sampleGeoTiff.tile) + + //GeoTiff(raster).write("target/from-split.tiff") + // 774 x 500 + } + + it("should throw on subdivide 5") { + // only throws when an action is taken... 
+ assertThrows[IllegalArgumentException](spark.read.geotrellis.withTileSubdivisions(5).loadRF(layer).cache) + } + it("should throw on subdivide 13") { + assertThrows[IllegalArgumentException](spark.read.geotrellis.withTileSubdivisions(13).loadRF(layer).cache) + } + it("should throw on subdivide -3") { + assertThrows[IllegalArgumentException](spark.read.geotrellis.withTileSubdivisions(-3).loadRF(layer).count) + } } describe("Predicate push-down support") { @@ -329,5 +421,37 @@ class GeoTrellisDataSourceSpec //rf.show(false) assert(rf.collect().length === testRdd.count()) } + it("should respect subdivideTile option on TileFeature RasterFrame") { + val subParam = 4 + val rf = spark.read.option(TILE_SUBDIVISIONS_PARAM, subParam).geotrellis.loadRF(tfLayer) + + assert(rf.count === testRdd.count * subParam * subParam) + + val dims = rf.select(tileDimensions(rf.tileColumns.head)("cols"), tileDimensions(rf.tileColumns.head)("rows")) + .first() + assert(dims.getAs[Int](0) === tileSize / subParam) + assert(dims.getAs[Int](1) === tileSize / subParam) + } + it("should respect both `subdivideTile` and `numPartitions` options on TileFeature") { + val subParam = 2 + + val rf = spark.read + .option(TILE_SUBDIVISIONS_PARAM, subParam) + .option(NUM_PARTITIONS_PARAM, 10) + .geotrellis.loadRF(tfLayer) + + // is it subdivided? + assert(rf.count === testRdd.count * subParam * subParam) + val dims = rf.select(tileDimensions(rf.tileColumns.head)("cols"), tileDimensions(rf.tileColumns.head)("rows")) + .first() + assert(dims.getAs[Int](0) === tileSize / subParam) + assert(dims.getAs[Int](1) === tileSize / subParam) + + // is it partitioned correctly? + assert(rf.rdd.partitions.length === 10) + } + it("should respect options on spatial-only TileFeature") { + // TODO: placeholder assertion until spatial-only TileFeature layers are covered + assert(true === true) + } } } diff --git a/datasource/src/test/scala/astraea/spark/rasterframes/datasource/geotrellis/TileFeatureSupportSpec.scala b/datasource/src/test/scala/astraea/spark/rasterframes/datasource/geotrellis/TileFeatureSupportSpec.scala index b621fa915..e7fa73d7c 100644 --- a/datasource/src/test/scala/astraea/spark/rasterframes/datasource/geotrellis/TileFeatureSupportSpec.scala +++ b/datasource/src/test/scala/astraea/spark/rasterframes/datasource/geotrellis/TileFeatureSupportSpec.scala @@ -21,11 +21,12 @@ package astraea.spark.rasterframes.datasource.geotrellis import astraea.spark.rasterframes._ import astraea.spark.rasterframes.datasource.geotrellis.TileFeatureSupport._ +import astraea.spark.rasterframes.util.{WithCropMethods, WithMaskMethods, WithMergeMethods, WithPrototypeMethods} import geotrellis.proj4.LatLng import geotrellis.raster.crop.Crop import geotrellis.raster.rasterize.Rasterizer import geotrellis.raster.resample.Bilinear -import geotrellis.raster.{CellGrid, GridBounds, IntCellType, ShortConstantNoDataCellType, Tile, TileFeature, TileLayout} +import geotrellis.raster.{CellGrid, GridBounds, IntCellType, ShortCellType, ShortConstantNoDataCellType, Tile, TileFeature, TileLayout} import geotrellis.spark.tiling.Implicits._ import geotrellis.spark.tiling._ import geotrellis.vector.{Extent, ProjectedExtent} @@ -167,7 +168,7 @@ object TileFeatureSupportSpec { sc.parallelize((1 to n).map(i => { val (latMin, latMax) = rnd.nextOrderedPair(90) val (lonMin, lonMax) = rnd.nextOrderedPair(180) - (ProjectedExtent(Extent(lonMin, latMin, lonMax, latMax),LatLng),TestData.randomTile(20, 20, "int16")) + (ProjectedExtent(Extent(lonMin, latMin, lonMax, latMax),LatLng),TestData.randomTile(20, 20, ShortCellType)) })) }
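For orientation, the reader options exercised by the specs above compose fluently and also have string-keyed equivalents. A minimal sketch under stated assumptions: the catalog path and layer name are placeholders, `spark` is an in-scope `SparkSession` with RasterFrames initialized, and the constant names come from the spec imports above:

```scala
import java.io.File

import astraea.spark.rasterframes._
import astraea.spark.rasterframes.datasource.geotrellis._
import geotrellis.spark.LayerId

// Placeholder layer reference; Layer(URI, LayerId) mirrors the spec's usage.
val layer = Layer(new File("/path/to/catalog").toURI, LayerId("test-layer", 4))

// Builder-style options: initial partition count plus even tile subdivision.
// The subdivision count must evenly divide the tile dimensions; otherwise an
// IllegalArgumentException surfaces once an action forces evaluation
// (see the "should throw on subdivide" tests above).
val rf = spark.read.geotrellis
  .withNumPartitions(8)
  .withTileSubdivisions(2) // e.g., each 12x12 tile becomes four 6x6 tiles
  .loadRF(layer)

// Equivalent string-keyed form, as used in the TileFeature tests above.
val rf2 = spark.read
  .option(TILE_SUBDIVISIONS_PARAM, 2)
  .option(NUM_PARTITIONS_PARAM, 8)
  .geotrellis.loadRF(layer)
```

Subdividing multiplies the row count by the square of the factor while shrinking each tile accordingly, which is exactly what the assertions above verify.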
diff --git a/docs/src/main/tut/getting-started.md b/docs/src/main/tut/getting-started.md index 216fa54fd..7e8c195f8 100644 --- a/docs/src/main/tut/getting-started.md +++ b/docs/src/main/tut/getting-started.md @@ -26,13 +26,21 @@ To use RasterFrames, add the following library dependencies: @@dependency[sbt,Maven,Gradle] { group="io.astraea" - artifact="raster-frames_2.11" + artifact="rasterframes_2.11" version="x.y.z" } @@dependency[sbt,Maven,Gradle] { group="io.astraea" - artifact="raster-frames-datasource_2.11" + artifact="rasterframes-datasource_2.11" + version="x.y.z" +} + +Optional: + +@@dependency[sbt,Maven,Gradle] { + group="io.astraea" + artifact="rasterframes-experimental_2.11" version="x.y.z" } diff --git a/docs/src/main/tut/release-notes.md b/docs/src/main/tut/release-notes.md index 451955129..51e2bc77b 100644 --- a/docs/src/main/tut/release-notes.md +++ b/docs/src/main/tut/release-notes.md @@ -2,6 +2,21 @@ ## 0.6.x +### 0.6.2 + +* Updated to GeoMesa version 2.0.0-rc.1. +* Added support for writing GeoTIFFs from RasterFrames via the DataFrameWriter API. +* Added `spark.read.geotrellis.withNumPartitions(Int)` for setting the initial number of partitions to use when reading a layer. +* Added `spark.read.geotrellis.withTileSubdivisions(Int)` for evenly subdividing tiles before they become rows in a RasterFrame. +* Added `experimental` package for sandboxing new feature ideas. +* Added `SlippyExport` experimental feature for exporting the contents of a RasterFrame as a [SlippyMap](https://wiki.openstreetmap.org/wiki/Slippy_Map) + tile image directory structure and Leaflet-enabled HTML file. +* _Change_: Default interpolation for `toRaster` and `toMultibandRaster` has been changed from `Bilinear` to `NearestNeighbor`. +* _Breaking_: Renamed/moved `astraea.spark.rasterframes.functions.CellStatsAggregateFunction.Statistics` to +`astraea.spark.rasterframes.stats.CellStatistics`. +* _Breaking_: `HistogramAggregateFunction` now generates an `astraea.spark.rasterframes.stats.CellHistogram`. + + ### 0.6.1 * Added support for reading striped GeoTiffs (#64). diff --git a/experimental/README.md b/experimental/README.md new file mode 100644 index 000000000..e6da48cdf --- /dev/null +++ b/experimental/README.md @@ -0,0 +1,7 @@ +# RasterFrames Experimental Module + +## Notice + +Features in this module are experimental, and as such should not be considered stable or production-ready, +regardless of project version number. It is a place to try out new ideas, iterate on them, and over time either reject +them or promote them to core RasterFrames features. \ No newline at end of file
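The layer-writing path referenced in the 0.6.2 notes mirrors the reader; the (commented) DataFrame-based write in the spec above suggests its shape. A sketch under the same placeholder assumptions, where `rf` is any existing RasterFrame and the catalog path is hypothetical:

```scala
import java.io.File

import astraea.spark.rasterframes._
import astraea.spark.rasterframes.datasource.geotrellis._
import geotrellis.spark.LayerId

// Persist a RasterFrame as a GeoTrellis layer via the DataFrameWriter
// extension, then read it back with the matching reader.
val target = Layer(new File("/path/to/catalog").toURI, LayerId("sample", 0))
rf.write.geotrellis.asLayer(target).save()

val roundTripped = spark.read.geotrellis.loadRF(target)
```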
diff --git a/experimental/build.sbt b/experimental/build.sbt new file mode 100644 index 000000000..ddb8b7450 --- /dev/null +++ b/experimental/build.sbt @@ -0,0 +1,8 @@ +moduleName := "rasterframes-experimental" + +libraryDependencies ++= Seq( + geotrellis("s3").value, + spark("core").value % Provided, + spark("mllib").value % Provided, + spark("sql").value % Provided +) \ No newline at end of file diff --git a/experimental/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/experimental/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister new file mode 100644 index 000000000..adc5d2c83 --- /dev/null +++ b/experimental/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -0,0 +1 @@ +astraea.spark.rasterframes.experimental.datasource.geojson.DefaultSource diff --git a/experimental/src/main/resources/slippy.html b/experimental/src/main/resources/slippy.html new file mode 100644 index 000000000..321fbc5bb --- /dev/null +++ b/experimental/src/main/resources/slippy.html @@ -0,0 +1,84 @@ + + + +
+ +
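The `DataSourceRegister` service entry above wires the experimental GeoJSON `DefaultSource` into Spark's data source discovery. The short format name is not shown in this change set; assuming it registers under the conventional `"geojson"` (an assumption, not confirmed by this diff), usage would look something like:

```scala
// Hypothetical usage of the experimental GeoJSON data source.
// The "geojson" short name is an assumption; only the service
// registration file appears in this change set.
val features = spark.read
  .format("geojson")
  .load("file:///path/to/features.geojson")

// Inspect the inferred schema for the loaded features.
features.printSchema()
```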