Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle Zip files on upload #905

Open
wants to merge 10 commits into
base: use-files-directly
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,18 @@
import com.conveyal.file.FileUtils;
import com.conveyal.gtfs.GTFSCache;
import com.conveyal.gtfs.GTFSFeed;
import com.conveyal.gtfs.error.GTFSError;
import com.conveyal.gtfs.error.GeneralError;
import com.conveyal.gtfs.model.Stop;
import com.conveyal.gtfs.validator.PostLoadValidator;
import com.conveyal.osmlib.Node;
import com.conveyal.osmlib.OSM;
import com.conveyal.r5.analyst.progress.ProgressInputStream;
import com.conveyal.r5.analyst.cluster.TransportNetworkConfig;
import com.conveyal.r5.analyst.progress.ProgressInputStream;
import com.conveyal.r5.analyst.progress.Task;
import com.conveyal.r5.streets.OSMCache;
import com.conveyal.r5.util.ExceptionUtils;
import com.mongodb.QueryBuilder;
import org.apache.commons.fileupload.FileItem;
import org.apache.commons.fileupload.disk.DiskFileItem;
import org.bson.types.ObjectId;
import org.locationtech.jts.geom.Envelope;
import org.slf4j.Logger;
Expand Down Expand Up @@ -107,19 +105,25 @@ private Bundle create (Request req, Response res) {
// Do some initial synchronous work setting up the bundle to fail fast if the request is bad.
final Map<String, List<FileItem>> files = HttpUtils.getRequestFiles(req.raw());
final Bundle bundle = new Bundle();
final File osmPbfFile;
final List<File> gtfsZipFiles;
try {
bundle.name = files.get("bundleName").get(0).getString("UTF-8");
bundle.regionId = files.get("regionId").get(0).getString("UTF-8");

if (files.get("osmId") != null) {
osmPbfFile = null;
bundle.osmId = files.get("osmId").get(0).getString("UTF-8");
Bundle bundleWithOsm = Persistence.bundles.find(QueryBuilder.start("osmId").is(bundle.osmId).get()).next();
if (bundleWithOsm == null) {
throw AnalysisServerException.badRequest("Selected OSM does not exist.");
}
} else {
osmPbfFile = HttpUtils.saveFileItemLocally(files.get("osm").get(0));
}

if (files.get("feedGroupId") != null) {
gtfsZipFiles = null;
bundle.feedGroupId = files.get("feedGroupId").get(0).getString("UTF-8");
Bundle bundleWithFeed = Persistence.bundles.find(QueryBuilder.start("feedGroupId").is(bundle.feedGroupId).get()).next();
if (bundleWithFeed == null) {
Expand All @@ -134,6 +138,8 @@ private Bundle create (Request req, Response res) {
bundle.feeds = bundleWithFeed.feeds;
bundle.feedsComplete = bundleWithFeed.feedsComplete;
bundle.totalFeeds = bundleWithFeed.totalFeeds;
} else {
gtfsZipFiles = HttpUtils.saveFileItemsLocally(files.get("feedGroup"));
}
UserPermissions userPermissions = UserPermissions.from(req);
bundle.accessGroup = userPermissions.accessGroup;
Expand All @@ -155,16 +161,15 @@ private Bundle create (Request req, Response res) {
.withWorkProduct(BUNDLE, bundle._id, bundle.regionId)
.withAction(progressListener -> {
try {
if (bundle.osmId == null) {
if (osmPbfFile != null) {
// Process uploaded OSM.
bundle.osmId = new ObjectId().toString();
DiskFileItem fi = (DiskFileItem) files.get("osm").get(0);
// Here we perform minimal validation by loading the OSM, but don't retain the resulting MapDB.
OSM osm = new OSM(null);
osm.intersectionDetection = true;
// Number of entities in an OSM file is unknown, so derive progress from the number of bytes read.
// Wrapping in buffered input stream should reduce number of progress updates.
osm.readPbf(ProgressInputStream.forFileItem(fi, progressListener));
osm.readPbf(ProgressInputStream.forFile(osmPbfFile, progressListener));
// osm.readPbf(new BufferedInputStream(fi.getInputStream()));
Envelope osmBounds = new Envelope();
for (Node n : osm.nodes.values()) {
Expand All @@ -173,10 +178,10 @@ private Bundle create (Request req, Response res) {
osm.close();
checkWgsEnvelopeSize(osmBounds, "OSM data");
// Store the source OSM file. Note that we're not storing the derived MapDB file here.
fileStorage.moveIntoStorage(OSMCache.getKey(bundle.osmId), fi.getStoreLocation());
fileStorage.moveIntoStorage(OSMCache.getKey(bundle.osmId), osmPbfFile);
}

if (bundle.feedGroupId == null) {
if (gtfsZipFiles != null) {
// Process uploaded GTFS files
bundle.feedGroupId = new ObjectId().toString();

Expand All @@ -186,8 +191,7 @@ private Bundle create (Request req, Response res) {
bundle.feeds = new ArrayList<>();
bundle.totalFeeds = files.get("feedGroup").size();

for (FileItem fileItem : files.get("feedGroup")) {
File feedFile = ((DiskFileItem) fileItem).getStoreLocation();
for (File feedFile : gtfsZipFiles) {
ZipFile zipFile = new ZipFile(feedFile);
File tempDbFile = FileUtils.createScratchFile("db");
File tempDbpFile = new File(tempDbFile.getAbsolutePath() + ".p");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.conveyal.analysis.AnalysisServerException;
import com.conveyal.analysis.UserPermissions;
import com.conveyal.analysis.components.TaskScheduler;
import com.conveyal.analysis.datasource.DataSourceUtil;
import com.conveyal.analysis.grids.SeamlessCensusGridExtractor;
import com.conveyal.analysis.models.DataGroup;
import com.conveyal.analysis.models.OpportunityDataset;
Expand All @@ -11,7 +12,6 @@
import com.conveyal.analysis.persistence.AnalysisCollection;
import com.conveyal.analysis.persistence.AnalysisDB;
import com.conveyal.analysis.persistence.Persistence;
import com.conveyal.analysis.util.FileItemInputStreamProvider;
import com.conveyal.analysis.util.HttpUtils;
import com.conveyal.analysis.util.JsonUtil;
import com.conveyal.file.FileStorage;
Expand All @@ -25,7 +25,6 @@
import com.conveyal.r5.analyst.progress.Task;
import com.conveyal.r5.analyst.progress.WorkProduct;
import com.conveyal.r5.util.ExceptionUtils;
import com.conveyal.r5.util.InputStreamProvider;
import com.conveyal.r5.util.ProgressListener;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.io.Files;
Expand Down Expand Up @@ -56,7 +55,6 @@
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import static com.conveyal.analysis.datasource.DataSourceUtil.detectUploadFormatAndValidate;
import static com.conveyal.analysis.util.JsonUtil.toJson;
import static com.conveyal.file.FileCategory.GRIDS;
import static com.conveyal.r5.analyst.WebMercatorExtents.parseZoom;
Expand Down Expand Up @@ -275,7 +273,7 @@ private static FileStorageFormat getFormatCode (PointSet pointSet) {
* This method executes in a blocking (synchronous) manner, but it can take a while so should be called within an
* non-blocking asynchronous task.
*/
private List<FreeFormPointSet> createFreeFormPointSetsFromCsv(FileItem csvFileItem, Map<String, String> params) {
private List<FreeFormPointSet> createFreeFormPointSetsFromCsv(File csvFile, Map<String, String> params) {

String latField = params.get("latField");
String lonField = params.get("lonField");
Expand All @@ -296,12 +294,11 @@ private List<FreeFormPointSet> createFreeFormPointSetsFromCsv(FileItem csvFileIt

try {
List<FreeFormPointSet> pointSets = new ArrayList<>();
InputStreamProvider csvStreamProvider = new FileItemInputStreamProvider(csvFileItem);
pointSets.add(FreeFormPointSet.fromCsv(csvStreamProvider, latField, lonField, idField, countField));
pointSets.add(FreeFormPointSet.fromCsv(csvFile, latField, lonField, idField, countField));
// The second pair of lat and lon fields allow creating two matched pointsets from the same CSV.
// This is used for one-to-one travel times between specific origins/destinations.
if (latField2 != null && lonField2 != null) {
pointSets.add(FreeFormPointSet.fromCsv(csvStreamProvider, latField2, lonField2, idField, countField));
pointSets.add(FreeFormPointSet.fromCsv(csvFile, latField2, lonField2, idField, countField));
}
return pointSets;
} catch (Exception e) {
Expand Down Expand Up @@ -329,22 +326,25 @@ private OpportunityDatasetUploadStatus createOpportunityDataset(Request req, Res
// are recorded in a persistent purpose-built way rather than falling back on the UI's catch-all error window.
// TODO more standardized mechanism for tracking asynchronous tasks and catching exceptions on them
OpportunityDatasetUploadStatus status = new OpportunityDatasetUploadStatus(regionId, sourceName);
addStatusAndRemoveOldStatuses(status);

final List<FileItem> fileItems;
final List<File> files;
final FileStorageFormat uploadFormat;
final Map<String, String> parameters;
try {
// Validate inputs and parameters, which will throw an exception if there's anything wrong with them.
// Call remove() rather than get() so that subsequent code will see only string parameters, not the files.
fileItems = formFields.remove("files");
uploadFormat = detectUploadFormatAndValidate(fileItems);
files = HttpUtils.extractFilesFromFileItemsAndUnzip(formFields.remove("files"));
uploadFormat = DataSourceUtil.detectUploadFormatAndValidate(files);
parameters = extractStringParameters(formFields);
} catch (Exception e) {
status.completeWithError(e);
res.status(400);
return status;
}

// Add the status to the region wide tracker before we begin the heavy tasks.
addStatusAndRemoveOldStatuses(status);

// We are going to call several potentially slow blocking methods to create and persist new pointsets.
// This whole series of actions will be run sequentially but within an asynchronous Executor task.
// After enqueueing, the status is returned so the UI can track progress.
Expand All @@ -354,35 +354,35 @@ private OpportunityDatasetUploadStatus createOpportunityDataset(Request req, Res
List<PointSet> pointsets = new ArrayList<>();
if (uploadFormat == FileStorageFormat.GRID) {
LOG.info("Detected opportunity dataset stored in Conveyal binary format.");
pointsets.addAll(createGridsFromBinaryGridFiles(fileItems, status));
pointsets.addAll(createGridsFromBinaryGridFiles(files, status));
} else if (uploadFormat == FileStorageFormat.SHP) {
LOG.info("Detected opportunity dataset stored as ESRI shapefile.");
pointsets.addAll(createGridsFromShapefile(fileItems, zoom, status));
pointsets.addAll(createGridsFromShapefile(files, zoom, status));
} else if (uploadFormat == FileStorageFormat.CSV) {
LOG.info("Detected opportunity dataset stored as CSV");
// Create a grid even when user has requested a freeform pointset so we have something to visualize.
FileItem csvFileItem = fileItems.get(0);
File csvFile = files.get(0);
// FIXME why were we uploading to S3 using the file path not the UUID?
// writeFileToS3(csvFile);
// TODO report progress / status as with grids. That involves pre-scanning the CSV which would be
// facilitated by retaining the CSV server side and later converting to pointset.
boolean requestedFreeForm = Boolean.parseBoolean(parameters.get("freeform"));
// Hack to enable freeform pointset building without exposing a UI element, via file name.
if (csvFileItem.getName().contains("FREEFORM_PS.")) {
if (csvFile.getName().contains("FREEFORM_PS.")) {
requestedFreeForm = true;
}
if (requestedFreeForm) {
LOG.info("Processing CSV as freeform (rather than gridded) pointset as requested.");
// This newer process creates a FreeFormPointSet only for the specified count fields,
// as well as a Grid to assist in visualization of the uploaded data.
for (FreeFormPointSet freeForm : createFreeFormPointSetsFromCsv(csvFileItem, parameters)) {
for (FreeFormPointSet freeForm : createFreeFormPointSetsFromCsv(csvFile, parameters)) {
Grid gridFromFreeForm = Grid.fromFreeForm(freeForm, zoom);
pointsets.add(freeForm);
pointsets.add(gridFromFreeForm);
}
} else {
// This is the common default process: create a grid for every non-ignored field in the CSV.
pointsets.addAll(createGridsFromCsv(csvFileItem, formFields, zoom, status));
pointsets.addAll(createGridsFromCsv(csvFile, formFields, zoom, status));
}
}
if (pointsets.isEmpty()) {
Expand Down Expand Up @@ -473,7 +473,7 @@ private OpportunityDataset deleteDataset(String id, UserPermissions userPermissi
* TODO explain latField2 usage
* @return one or two Grids for each numeric column in the CSV input.
*/
private List<Grid> createGridsFromCsv(FileItem csvFileItem,
private List<Grid> createGridsFromCsv(File csvFile,
Map<String, List<FileItem>> query,
int zoom,
OpportunityDatasetUploadStatus status) throws Exception {
Expand All @@ -488,12 +488,11 @@ private List<Grid> createGridsFromCsv(FileItem csvFileItem,
String lonField2 = HttpUtils.getFormField(query, "lonField2", false);

List<String> ignoreFields = Arrays.asList(idField, latField2, lonField2);
InputStreamProvider csvStreamProvider = new FileItemInputStreamProvider(csvFileItem);
List<Grid> grids = Grid.fromCsv(csvStreamProvider, latField, lonField, ignoreFields, zoom, status);
List<Grid> grids = Grid.fromCsv(csvFile, latField, lonField, ignoreFields, zoom, status);
// TODO verify correctness of this second pass
if (latField2 != null && lonField2 != null) {
ignoreFields = Arrays.asList(idField, latField, lonField);
grids.addAll(Grid.fromCsv(csvStreamProvider, latField2, lonField2, ignoreFields, zoom, status));
grids.addAll(Grid.fromCsv(csvFile, latField2, lonField2, ignoreFields, zoom, status));
}

return grids;
Expand All @@ -503,14 +502,14 @@ private List<Grid> createGridsFromCsv(FileItem csvFileItem,
* Create a grid from an input stream containing a binary grid file.
* For those in the know, we can upload manually created binary grid files.
*/
private List<Grid> createGridsFromBinaryGridFiles(List<FileItem> uploadedFiles,
private List<Grid> createGridsFromBinaryGridFiles(List<File> uploadedFiles,
OpportunityDatasetUploadStatus status) throws Exception {

List<Grid> grids = new ArrayList<>();
status.totalFeatures = uploadedFiles.size();
for (FileItem fileItem : uploadedFiles) {
Grid grid = Grid.read(fileItem.getInputStream());
String name = fileItem.getName();
for (File file : uploadedFiles) {
Grid grid = Grid.read(FileUtils.getInputStream(file));
String name = file.getName();
// Remove ".grid" from the name
if (name.contains(".grid")) name = name.split(".grid")[0];
grid.name = name;
Expand All @@ -522,37 +521,37 @@ private List<Grid> createGridsFromBinaryGridFiles(List<FileItem> uploadedFiles,
}

/**
* Preconditions: fileItems must contain SHP, DBF, and PRJ files, and optionally SHX. All files should have the
* Preconditions: files must contain SHP, DBF, and PRJ files, and optionally SHX. All files should have the
* same base name, and should not contain any other files but these three or four.
*/
private List<Grid> createGridsFromShapefile(List<FileItem> fileItems,
private List<Grid> createGridsFromShapefile(List<File> files,
int zoom,
OpportunityDatasetUploadStatus status) throws Exception {

// In the caller, we should have already verified that all files have the same base name and have an extension.
// Extract the relevant files: .shp, .prj, .dbf, and .shx.
// We need the SHX even though we're looping over every feature as they might be sparse.
Map<String, FileItem> filesByExtension = new HashMap<>();
for (FileItem fileItem : fileItems) {
filesByExtension.put(FilenameUtils.getExtension(fileItem.getName()).toUpperCase(), fileItem);
Map<String, File> filesByExtension = new HashMap<>();
for (File file : files) {
filesByExtension.put(FilenameUtils.getExtension(file.getName()).toUpperCase(), file);
}

// Copy the shapefile component files into a temporary directory with a fixed base name.
File tempDir = Files.createTempDir();
File tempDir = FileUtils.createScratchDirectory();

File shpFile = new File(tempDir, "grid.shp");
filesByExtension.get("SHP").write(shpFile);
Files.copy(filesByExtension.get("SHP"), shpFile);

File prjFile = new File(tempDir, "grid.prj");
filesByExtension.get("PRJ").write(prjFile);
Files.copy(filesByExtension.get("PRJ"), prjFile);

File dbfFile = new File(tempDir, "grid.dbf");
filesByExtension.get("DBF").write(dbfFile);
Files.copy(filesByExtension.get("DBF"), dbfFile);

// The .shx file is an index. It is optional, and not needed for dense shapefiles.
if (filesByExtension.containsKey("SHX")) {
File shxFile = new File(tempDir, "grid.shx");
filesByExtension.get("SHX").write(shxFile);
Files.copy(filesByExtension.get("SHX"), shxFile);
}

List<Grid> grids = Grid.fromShapefile(shpFile, zoom, status);
Expand Down Expand Up @@ -625,6 +624,7 @@ public static class OpportunityDatasetUploadStatus implements ProgressListener {
public Status status = Status.PROCESSING;
public String name;
public String message;
public String stackTrace;
public Date createdAt;
public Date completedAt;

Expand All @@ -641,7 +641,8 @@ private void completed (Status status) {
}

public void completeWithError (Exception e) {
message = "Unable to create opportunity dataset. " + ExceptionUtils.stackTraceString(e);
stackTrace = ExceptionUtils.stackTraceString(e);
message = "Unable to create opportunity dataset. " + e.getMessage();
completed(Status.ERROR);
}

Expand Down
Loading