From f4d06657345486ea3d56b1906182e0144326d275 Mon Sep 17 00:00:00 2001 From: Pirmin Kalberer Date: Fri, 12 Feb 2021 20:08:32 +0100 Subject: [PATCH] Optimize task queue size for seeding --- Cargo.lock | 1 + t-rex-service/Cargo.toml | 1 + t-rex-service/src/mvt_service.rs | 9 ++++++--- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 07ea2c49..dde165af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2969,6 +2969,7 @@ dependencies = [ "elementtree", "futures-util", "log", + "num_cpus", "pbr", "percent-encoding", "serde", diff --git a/t-rex-service/Cargo.toml b/t-rex-service/Cargo.toml index 95b4212a..a0943380 100644 --- a/t-rex-service/Cargo.toml +++ b/t-rex-service/Cargo.toml @@ -19,6 +19,7 @@ serde_json = "1.0" percent-encoding = "2.1" elementtree = "0.5" log = "0.4" +num_cpus = "1.13" clap = "2.33" pbr = "1.0" tokio = { version = "1.2.0", features = ["full"] } diff --git a/t-rex-service/src/mvt_service.rs b/t-rex-service/src/mvt_service.rs index aa86124e..2727ddb0 100644 --- a/t-rex-service/src/mvt_service.rs +++ b/t-rex-service/src/mvt_service.rs @@ -228,7 +228,7 @@ impl MvtService { progress: bool, overwrite: bool, ) { - let mut rt = tokio::runtime::Runtime::new().expect("Couldn't initialize tokio runtime"); + let rt = tokio::runtime::Runtime::new().expect("Couldn't initialize tokio runtime"); self.init_cache(); let nodes = nodes.unwrap_or(1) as u64; let nodeno = nodeno.unwrap_or(0) as u64; @@ -299,7 +299,10 @@ impl MvtService { progress: bool, overwrite: bool, ) { - let task_queue_size = 128; + // Keep a queue of tasks waiting for parallel async execution (size >= #cores). + // libspatialite has a max connection limit of 64 for now. libspatialite (4.4.0) when + // compiled on top of GEOS 3.5.0 is able to support an arbitrary number of threads + let task_queue_size = cmp::min(num_cpus::get() * 2, 64); let mut tasks = Vec::with_capacity(task_queue_size); let griditer = GridIterator::new(ts_minzoom, ts_maxzoom, limits.clone()); let mut tileno: u64 = 0; @@ -335,7 +338,7 @@ impl MvtService { let tileset_name = tileset_name.clone(); tasks.push(task::spawn(async move { // rust-postgres starts its own Tokio runtime - // without spawn_blocking we get 'Cannot start a runtime from within a runtime' + // without spawn_blocking or block_in_place we get 'Cannot start a runtime from within a runtime' let mvt_tile = task::spawn_blocking(move || { svc.tile(&tileset_name, xtile as u32, ytile as u32, zoom, None) })