diff --git a/src/caf/distribute/gravity_model/core.py b/src/caf/distribute/gravity_model/core.py
index 62cba89..e920748 100644
--- a/src/caf/distribute/gravity_model/core.py
+++ b/src/caf/distribute/gravity_model/core.py
@@ -1,27 +1,22 @@
 # -*- coding: utf-8 -*-
 """Core abstract functionality for gravity model classes to build on."""
 # Built-Ins
-import os
 import abc
+import dataclasses
+import functools
 import logging
+import os
 import warnings
-import functools
-import dataclasses
-
-from typing import Any
-from typing import Optional
+from typing import Any, Optional
 
 # Third Party
 import numpy as np
 import pandas as pd
-
+from caf.toolkit import cost_utils, io, timing
 from scipy import optimize
 
 # Local Imports
 # pylint: disable=import-error,wrong-import-position
-from caf.toolkit import io
-from caf.toolkit import timing
-from caf.toolkit import cost_utils
 from caf.distribute import cost_functions
 
 # pylint: enable=import-error,wrong-import-position
@@ -426,6 +421,7 @@ def _calculate_perceived_factors(
     def _apply_perceived_factors(self, cost_matrix: np.ndarray) -> np.ndarray:
         return cost_matrix * self._perceived_factors
 
+    @abc.abstractmethod
     def _gravity_function(
         self,
         cost_args: list[float],
@@ -433,7 +429,7 @@ def _gravity_function(
         target_cost_distribution: Optional[cost_utils.CostDistribution] = None,
         diff_step: float = 0.0,
         **kwargs,
-    ):
+    ) -> np.ndarray:
         """Calculate residuals to the target cost distribution.
 
         Runs gravity model with given parameters and converts into achieved
@@ -448,57 +444,9 @@ def _gravity_function(
         self.achieved_distribution
         self.optimal_cost_params
         """
-        # TODO(MB) Move definition to single area class and change to ABC method
-        # Not used, but need for compatibility with self._jacobian_function
-        del diff_step
-
-        # Init
-        cost_kwargs = self._cost_params_to_kwargs(cost_args)
-        cost_matrix = self._apply_perceived_factors(self.cost_matrix)
-
-        # Furness trips to trip ends
-        matrix, iters, rmse = self.gravity_furness(
-            seed_matrix=self.cost_function.calculate(cost_matrix, **cost_kwargs),
-            **kwargs,
-        )
-
-        # Evaluate the performance of this run
-        cost_distribution, achieved_residuals, convergence = cost_distribution_stats(
-            achieved_trip_distribution=matrix,
-            cost_matrix=self.cost_matrix,
-            target_cost_distribution=target_cost_distribution,
-        )
-
-        # Log this iteration
-        end_time = timing.current_milli_time()
-        self._log_iteration(
-            log_path=running_log_path,
-            attempt_id=self._attempt_id,
-            loop_num=self._loop_num,
-            loop_time=(end_time - self._loop_start_time) / 1000,
-            cost_kwargs=cost_kwargs,
-            furness_iters=iters,
-            furness_rmse=rmse,
-            convergence=convergence,
-        )
-
-        # Update loop params and return the achieved band shares
-        self._loop_num += 1
-        self._loop_start_time = timing.current_milli_time()
-
-        # Update performance params
-        self.achieved_cost_dist = cost_distribution
-        self.achieved_convergence = convergence
-        self.achieved_distribution = matrix
-
-        # Store the initial values to log later
-        if self.initial_cost_params is None:
-            self.initial_cost_params = cost_kwargs
-        if self.initial_convergence is None:
-            self.initial_convergence = convergence
-
-        return achieved_residuals
+        raise NotImplementedError("abstract method to be implemented in child class")
 
+    @abc.abstractmethod
     def _jacobian_function(
         self,
         cost_args: list[float],
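Both methods made abstract here are the residual and Jacobian callables handed to `scipy.optimize.least_squares`: the first returns residuals between the achieved and target cost distributions, the second a Jacobian of shape (n_cost_bands, n_cost_args); the `_jacobian_function` docstring below says it is used by that function. A minimal, self-contained sketch of how callables of that shape plug into SciPy; the toy functions are stand-ins only and are not part of the patch:

    import numpy as np
    from scipy import optimize

    def residuals(cost_args: np.ndarray) -> np.ndarray:
        # toy stand-in for _gravity_function: residuals against the target band shares
        return np.asarray(cost_args) - np.array([0.5, 0.3])

    def jacobian(cost_args: np.ndarray) -> np.ndarray:
        # toy stand-in for _jacobian_function: shape (n_cost_bands, n_cost_args)
        return np.eye(2)

    result = optimize.least_squares(fun=residuals, x0=np.array([1.0, 1.0]), jac=jacobian)
    # result.x converges to approximately [0.5, 0.3]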
@@ -506,7 +454,7 @@ def _jacobian_function(
         running_log_path: os.PathLike,
         target_cost_distribution: cost_utils.CostDistribution,
         **kwargs,
-    ):
+    ) -> np.ndarray:
         """Calculate the Jacobian for _gravity_function.
 
         The Jacobian is shape of (n_cost_bands, n_cost_args), where each index
@@ -515,66 +463,7 @@ def _jacobian_function(
 
         Used by the `optimize.least_squares` function.
         """
-        # TODO(MB) Move definition to single area class and change to ABC method
-        # pylint: disable=too-many-locals
-        # Not used, but need for compatibility with self._gravity_function
-        del running_log_path
-        del kwargs
-
-        # Initialise the output
-        jacobian = np.zeros((target_cost_distribution.n_bins, len(cost_args)))
-
-        # Initialise running params
-        cost_kwargs = self._cost_params_to_kwargs(cost_args)
-        cost_matrix = self._apply_perceived_factors(self.cost_matrix)
-        row_targets = self.achieved_distribution.sum(axis=1)
-        col_targets = self.achieved_distribution.sum(axis=0)
-
-        # Estimate what the furness does to the matrix
-        base_matrix = self.cost_function.calculate(cost_matrix, **cost_kwargs)
-        furness_factor = np.divide(
-            self.achieved_distribution,
-            base_matrix,
-            where=base_matrix != 0,
-            out=np.zeros_like(base_matrix),
-        )
-
-        # Build the Jacobian matrix.
-        for i, cost_param in enumerate(self.cost_function.kw_order):
-            cost_step = cost_kwargs[cost_param] * diff_step
-
-            # Get slightly adjusted base matrix
-            adj_cost_kwargs = cost_kwargs.copy()
-            adj_cost_kwargs[cost_param] += cost_step
-            adj_base_mat = self.cost_function.calculate(cost_matrix, **adj_cost_kwargs)
-
-            # Estimate the impact of the furness
-            adj_distribution = adj_base_mat * furness_factor
-            if adj_distribution.sum() == 0:
-                raise ValueError("estimated furness matrix total is 0")
-
-            # Convert to weights to estimate impact on output
-            adj_weights = adj_distribution / adj_distribution.sum()
-            adj_final = self.achieved_distribution.sum() * adj_weights
-
-            # Finesse to match row / col targets
-            adj_final, *_ = self.jacobian_furness(
-                seed_matrix=adj_final,
-                row_targets=row_targets,
-                col_targets=col_targets,
-            )
-
-            # Calculate the Jacobian values for this cost param
-            adj_cost_dist = cost_utils.CostDistribution.from_data(
-                matrix=adj_final,
-                cost_matrix=self.cost_matrix,
-                bin_edges=target_cost_distribution.bin_edges,
-            )
-
-            jacobian_residuals = self.achieved_band_share - adj_cost_dist.band_share_vals
-            jacobian[:, i] = jacobian_residuals / cost_step
-
-        return jacobian
+        raise NotImplementedError("abstract method to be implemented in child class")
 
     def _calibrate(
         self,
@@ -1017,79 +906,6 @@ def calibrate_with_perceived_factors(
         )
         return results
 
-    @abc.abstractmethod
-    def gravity_furness(
-        self,
-        seed_matrix: np.ndarray,
-        **kwargs,
-    ) -> tuple[np.ndarray, int, float]:
-        """Run a doubly constrained furness on the seed matrix.
-
-        Wrapper around furness.doubly_constrained_furness, to be used when
-        running the furness withing the gravity model.
-
-        Parameters
-        ----------
-        seed_matrix:
-            Initial values for the furness.
-
-        kwargs:
-            Additional arguments from the caller - allows arguments to be
-            passed to this function.
-
-        Returns
-        -------
-        furnessed_matrix:
-            The final furnessed matrix
-
-        completed_iters:
-            The number of completed iterations before exiting
-
-        achieved_rmse:
-            The Root Mean Squared Error difference achieved before exiting
-        """
-        raise NotImplementedError
-        # TODO(MB) Remove once gravity function method has been moved
-
-    @abc.abstractmethod
-    def jacobian_furness(
-        self,
-        seed_matrix: np.ndarray,
-        row_targets: np.ndarray,
-        col_targets: np.ndarray,
-    ) -> tuple[np.ndarray, int, float]:
-        """Run a doubly constrained furness on the seed matrix.
-
-        Wrapper around furness.doubly_constrained_furness, to be used when
-        running the furness withing the jacobian calculation.
-
-        Parameters
-        ----------
-        seed_matrix:
-            Initial values for the furness.
-
-        row_targets:
-            The target values for the sum of each row.
-            i.e. np.sum(seed_matrix, axis=1)
-
-        col_targets:
-            The target values for the sum of each column
-            i.e. np.sum(seed_matrix, axis=0)
-
-        Returns
-        -------
-        furnessed_matrix:
-            The final furnessed matrix
-
-        completed_iters:
-            The number of completed iterations before exiting
-
-        achieved_rmse:
-            The Root Mean Squared Error difference achieved before exiting
-        """
-        raise NotImplementedError
-        # TODO(MB) Remove once jacobian function method has been moved
-
     def run_with_perceived_factors(
         self,
         cost_params: dict[str, Any],
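Taken together, the core.py changes move the subclass contract up a level: concrete gravity models no longer override `gravity_furness` / `jacobian_furness`, they override `_gravity_function` and `_jacobian_function` directly, as the single-area implementation below now does. A rough skeleton of that contract; the class name is purely illustrative and the enclosing abstract base class is not shown in these hunks, only the method signatures come from the diff:

    import os
    from typing import Optional

    import numpy as np
    from caf.toolkit import cost_utils

    class ExampleGravityModel:  # in the real code this would subclass the abstract core class
        def _gravity_function(
            self,
            cost_args: list[float],
            running_log_path: os.PathLike,
            target_cost_distribution: Optional[cost_utils.CostDistribution] = None,
            diff_step: float = 0.0,
            **kwargs,
        ) -> np.ndarray:
            """Distribute trips, log the iteration, and return residuals to the target."""
            raise NotImplementedError

        def _jacobian_function(
            self,
            cost_args: list[float],
            diff_step: float,
            running_log_path: os.PathLike,
            target_cost_distribution: cost_utils.CostDistribution,
            **kwargs,
        ) -> np.ndarray:
            """Return a Jacobian estimate of shape (n_cost_bands, n_cost_args)."""
            raise NotImplementedError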
diff --git a/src/caf/distribute/gravity_model/single_area.py b/src/caf/distribute/gravity_model/single_area.py
index fe34ab4..89ae832 100644
--- a/src/caf/distribute/gravity_model/single_area.py
+++ b/src/caf/distribute/gravity_model/single_area.py
@@ -2,19 +2,17 @@
 """Implementation of a self-calibrating single area gravity model."""
 # Built-Ins
 import logging
-
-from typing import Any
+import os
+from typing import Any, Optional
 
 # Third Party
 import numpy as np
+from caf.toolkit import cost_utils, timing, toolbox
 from scipy import optimize
 
 # Local Imports
 # pylint: disable=import-error,wrong-import-position
-from caf.toolkit import toolbox
-from caf.toolkit import cost_utils
-from caf.distribute import furness
-from caf.distribute import cost_functions
+from caf.distribute import cost_functions, furness
 from caf.distribute.gravity_model import core
 
 # pylint: enable=import-error,wrong-import-position
@@ -66,87 +64,139 @@ def __init__(
         self.row_targets = row_targets
         self.col_targets = col_targets
 
-    def gravity_furness(
+    def _gravity_function(
         self,
-        seed_matrix: np.ndarray,
+        cost_args: list[float],
+        running_log_path: os.PathLike,
+        target_cost_distribution: Optional[cost_utils.CostDistribution] = None,
+        diff_step: float = 0.0,
         **kwargs,
-    ) -> tuple[np.ndarray, int, float]:
-        """Run a doubly constrained furness on the seed matrix.
-
-        Wrapper around furness.doubly_constrained_furness, using class
-        attributes to set up the function call.
-
-        Parameters
-        ----------
-        seed_matrix:
-            Initial values for the furness.
-
-        kwargs:
-            Additional arguments from the caller to pass to
-            `self.doubly_constrained_furness`.
-
-        Returns
-        -------
-        furnessed_matrix:
-            The final furnessed matrix
+    ) -> np.ndarray:
+        # inherited docstring
+        # Not used, but need for compatibility with self._jacobian_function
+        del diff_step
 
-        completed_iters:
-            The number of completed iterations before exiting
+        # Init
+        cost_kwargs = self._cost_params_to_kwargs(cost_args)
+        cost_matrix = self._apply_perceived_factors(self.cost_matrix)
 
-        achieved_rmse:
-            The Root Mean Squared Error difference achieved before exiting
-        """
-        return furness.doubly_constrained_furness(
-            seed_vals=seed_matrix,
+        # Furness trips to trip ends
+        matrix, iters, rmse = furness.doubly_constrained_furness(
+            seed_vals=self.cost_function.calculate(cost_matrix, **cost_kwargs),
            row_targets=self.row_targets,
            col_targets=self.col_targets,
            **kwargs,
        )
 
-    def jacobian_furness(
-        self,
-        seed_matrix: np.ndarray,
-        row_targets: np.ndarray,
-        col_targets: np.ndarray,
-    ) -> tuple[np.ndarray, int, float]:
-        """Run a doubly constrained furness on the seed matrix.
+        # Evaluate the performance of this run
+        cost_distribution, achieved_residuals, convergence = core.cost_distribution_stats(
+            achieved_trip_distribution=matrix,
+            cost_matrix=self.cost_matrix,
+            target_cost_distribution=target_cost_distribution,
+        )
+
+        # Log this iteration
+        end_time = timing.current_milli_time()
+        self._log_iteration(
+            log_path=running_log_path,
+            attempt_id=self._attempt_id,
+            loop_num=self._loop_num,
+            loop_time=(end_time - self._loop_start_time) / 1000,
+            cost_kwargs=cost_kwargs,
+            furness_iters=iters,
+            furness_rmse=rmse,
+            convergence=convergence,
+        )
 
-        Wrapper around furness.doubly_constrained_furness, to be used when
-        running the furness withing the jacobian calculation.
+        # Update loop params and return the achieved band shares
+        self._loop_num += 1
+        self._loop_start_time = timing.current_milli_time()
 
-        Parameters
-        ----------
-        seed_matrix:
-            Initial values for the furness.
+        # Update performance params
+        self.achieved_cost_dist = cost_distribution
+        self.achieved_convergence = convergence
+        self.achieved_distribution = matrix
 
-        row_targets:
-            The target values for the sum of each row.
-            i.e. np.sum(seed_matrix, axis=1)
+        # Store the initial values to log later
+        if self.initial_cost_params is None:
+            self.initial_cost_params = cost_kwargs
+        if self.initial_convergence is None:
+            self.initial_convergence = convergence
 
-        col_targets:
-            The target values for the sum of each column
-            i.e. np.sum(seed_matrix, axis=0)
+        return achieved_residuals
 
-        Returns
-        -------
-        furnessed_matrix:
-            The final furnessed matrix
+    def _jacobian_function(
+        self,
+        cost_args: list[float],
+        diff_step: float,
+        running_log_path: os.PathLike,
+        target_cost_distribution: cost_utils.CostDistribution,
+        **kwargs,
+    ) -> np.ndarray:
+        # inherited docstring
+        # pylint: disable=too-many-locals
+        # Not used, but need for compatibility with self._gravity_function
+        del running_log_path
+        del kwargs
 
-        completed_iters:
-            The number of completed iterations before exiting
+        # Initialise the output
+        jacobian = np.zeros((target_cost_distribution.n_bins, len(cost_args)))
 
-        achieved_rmse:
-            The Root Mean Squared Error difference achieved before exiting
-        """
-        return furness.doubly_constrained_furness(
-            seed_vals=seed_matrix,
-            row_targets=row_targets,
-            col_targets=col_targets,
-            tol=1e-6,
-            max_iters=20,
-            warning=False,
+        # Initialise running params
+        cost_kwargs = self._cost_params_to_kwargs(cost_args)
+        cost_matrix = self._apply_perceived_factors(self.cost_matrix)
+        row_targets = self.achieved_distribution.sum(axis=1)
+        col_targets = self.achieved_distribution.sum(axis=0)
+
+        # Estimate what the furness does to the matrix
+        base_matrix = self.cost_function.calculate(cost_matrix, **cost_kwargs)
+        furness_factor = np.divide(
+            self.achieved_distribution,
+            base_matrix,
+            where=base_matrix != 0,
+            out=np.zeros_like(base_matrix),
         )
 
+        # Build the Jacobian matrix.
+        for i, cost_param in enumerate(self.cost_function.kw_order):
+            cost_step = cost_kwargs[cost_param] * diff_step
+
+            # Get slightly adjusted base matrix
+            adj_cost_kwargs = cost_kwargs.copy()
+            adj_cost_kwargs[cost_param] += cost_step
+            adj_base_mat = self.cost_function.calculate(cost_matrix, **adj_cost_kwargs)
+
+            # Estimate the impact of the furness
+            adj_distribution = adj_base_mat * furness_factor
+            if adj_distribution.sum() == 0:
+                raise ValueError("estimated furness matrix total is 0")
+
+            # Convert to weights to estimate impact on output
+            adj_weights = adj_distribution / adj_distribution.sum()
+            adj_final = self.achieved_distribution.sum() * adj_weights
+
+            # Finesse to match row / col targets
+            adj_final, *_ = furness.doubly_constrained_furness(
+                seed_vals=adj_final,
+                row_targets=row_targets,
+                col_targets=col_targets,
+                tol=1e-6,
+                max_iters=20,
+                warning=False,
+            )
+
+            # Calculate the Jacobian values for this cost param
+            adj_cost_dist = cost_utils.CostDistribution.from_data(
+                matrix=adj_final,
+                cost_matrix=self.cost_matrix,
+                bin_edges=target_cost_distribution.bin_edges,
+            )
+
+            jacobian_residuals = self.achieved_band_share - adj_cost_dist.band_share_vals
+            jacobian[:, i] = jacobian_residuals / cost_step
+
+        return jacobian
+
     def _guess_init_params(
         self,
         cost_args: list[float],
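The loop above estimates the Jacobian by finite differences on band shares: each cost parameter is nudged by `diff_step`, the effect of the furness is approximated by re-using the factors from the last full run and then only lightly re-furnessed (max_iters=20) to match the row and column targets, and the change in band share per unit of parameter change fills one column. A self-contained toy illustration of that pattern, with made-up numbers, a stand-in cost function, and a crude two-band split in place of `cost_utils.CostDistribution`:

    import numpy as np

    cost = np.array([[1.0, 3.0], [3.0, 1.0]])        # toy cost matrix
    achieved = np.array([[10.0, 5.0], [4.0, 11.0]])  # last furnessed distribution

    def negative_exp(c: np.ndarray, beta: float) -> np.ndarray:
        # stand-in for self.cost_function.calculate
        return np.exp(-beta * c)

    beta, diff_step = 0.5, 0.1
    base = negative_exp(cost, beta)

    # Estimate what the furness did to the base matrix (safe element-wise divide)
    furness_factor = np.divide(achieved, base, where=base != 0, out=np.zeros_like(base))

    # Nudge the cost parameter and re-apply the estimated furness effect
    cost_step = beta * diff_step
    adj = negative_exp(cost, beta + cost_step) * furness_factor
    adj *= achieved.sum() / adj.sum()  # keep the matrix total fixed

    def band_shares(mat: np.ndarray) -> np.ndarray:
        # crude two-band split at cost == 2
        near = mat[cost < 2].sum()
        return np.array([near, mat.sum() - near]) / mat.sum()

    # One column of the Jacobian: change in band-share residuals per unit parameter change
    jacobian_col = (band_shares(achieved) - band_shares(adj)) / cost_step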