diff --git a/README.md b/README.md index de77500..0cb371f 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,92 @@ You are free to tweak the hyperparameters and the network architecture to see ho I used the [MNIST dataset](https://en.wikipedia.org/wiki/MNIST_database) to test the library, but you can use any dataset you want. +## 🚀 Quick examples (more [here](examples/)) + +### Binary Classification + +```python +from neuralnetlib.model import Model +from neuralnetlib.layers import Input, Dense, Activation +from neuralnetlib.activations import Sigmoid +from neuralnetlib.losses import BinaryCrossentropy +from neuralnetlib.optimizers import SGD +from neuralnetlib.metrics import accuracy_score + +# ... Preprocess x_train, y_train, x_test, y_test if necessary (you can use neuralnetlib.preprocess and neuralnetlib.utils) + +# Create a model +model = Model() +model.add(Input(10)) # 10 features +model.add(Dense(8)) +model.add(Dense(1)) +model.add(Activation(Sigmoid())) # there are many ways to tell the model which activation function you'd like, see the next example + +# Compile the model +model.compile(loss_function='bce', optimizer='sgd') + +# Train the model +model.fit(X_train, y_train, epochs=10, batch_size=32, metrics=[accuracy_score]) +``` + +### Multiclass Classification + +```python +from neuralnetlib.activations import Softmax +from neuralnetlib.losses import CategoricalCrossentropy +from neuralnetlib.optimizers import Adam +from neuralnetlib.metrics import accuracy_score + +# ... Preprocess x_train, y_train, x_test, y_test if necessary (you can use neuralnetlib.preprocess and neuralnetlib.utils) + +# Create and compile a model +model = Model() +model.add(Input(28, 28, 1)) # For example, MNIST images +model.add(Conv2D(32, kernel_size=3, padding='same')) +model.add(Activation('relu')) # activation supports both str... +model.add(BatchNormalization()) +model.add(MaxPooling2D(pool_size=2)) +model.add(Dense(64, activation='relu')) +model.add(Dense(10, activation=Softmax())) # ... 
and ActivationFunction objects +model.compile(loss_function='categorical_crossentropy', optimizer=Adam()) + + +model.compile(loss_function='categorical_crossentropy', optimizer=Adam()) # same for loss_function and optimizer + +# Train the model +model.fit(X_train, y_train_ohe, epochs=5, metrics=[accuracy_score]) +``` + +### Regression + +```python +from neuralnetlib.losses import MeanSquaredError +from neuralnetlib.metrics import accuracy_score + +# ... Preprocess x_train, y_train, x_test, y_test if necessary (you can use neuralnetlib.preprocess and neuralnetlib.utils) + +# Create and compile a model +model = Model() +model.add(Input(13)) +model.add(Dense(64, activation='leakyrelu')) +model.add(Dense(1, activation="linear")) + +model.compile(loss_function="mse", optimizer='adam') # you can use either acronyms or full names + +# Train the model +model.fit(X_train, y_train, epochs=100, batch_size=128, metrics=[accuracy_score]) +``` + +You can also save and load models: + +```python +# Save a model +model.save('my_model.json') + +# Load a model +model = Model.load('my_model.json') +``` + ## 📜 Output of the example file Here is an example of a model training on the mnist using the library diff --git a/examples/cnn-classification/simple_cnn_classification_mnist.ipynb b/examples/cnn-classification/simple_cnn_classification_mnist.ipynb index 83c840d..5047135 100644 --- a/examples/cnn-classification/simple_cnn_classification_mnist.ipynb +++ b/examples/cnn-classification/simple_cnn_classification_mnist.ipynb @@ -104,7 +104,9 @@ "outputs": [ { "data": { - "text/plain": "\"\\n Side note: if you set the following:\\n \\n - filters to 8 and 16 (in this order)\\n - padding of the Conv2D layers to 'same'\\n - weights initialization to 'he'\\n \\n you'll get an accuracy of ~0.9975 which is actually pretty cool\\n\"" + "text/plain": [ + "\"\\n Side note: if you set the following:\\n \\n - filters to 8 and 16 (in this order)\\n - padding of the Conv2D layers to 'same'\\n - weights 
initialization to 'he'\\n \\n you'll get an accuracy of ~0.9975 which is actually pretty cool\\n\"" + ] }, "execution_count": 4, "metadata": {}, @@ -123,8 +125,7 @@ "model.add(Flatten())\n", "model.add(Dense(64, random_state=42))\n", "model.add(Activation(ReLU()))\n", - "model.add(Dense(10, random_state=42))\n", - "model.add(Activation(Softmax()))\n", + "model.add(Dense(10, random_state=42, activation=\"softmax\")) # Yeah, you can also use strings for the activation functions, or directly the class\n", "\n", "\"\"\"\n", " Side note: if you set the following:\n", @@ -180,7 +181,7 @@ } ], "source": [ - "model.compile(loss_function=CategoricalCrossentropy(), optimizer=Adam())\n", + "model.compile(loss_function=\"cce\", optimizer=\"adam\") # You can also use strings for the loss function and the optimizer\n", "\n", "model.summary()" ] @@ -327,8 +328,10 @@ "outputs": [ { "data": { - "text/plain": "
", - "image/png": "\n" + "image/png": "", + "text/plain": [ + "
" + ] }, "metadata": {}, "output_type": "display_data" diff --git a/neuralnetlib/callbacks.py b/neuralnetlib/callbacks.py index 5119dd7..ee3d910 100644 --- a/neuralnetlib/callbacks.py +++ b/neuralnetlib/callbacks.py @@ -1,6 +1,5 @@ import numpy as np - class EarlyStopping: def __init__(self, patience: int = 5, min_delta: float = 0.001, restore_best_weights: bool = True, start_from_epoch: int = 0, monitor: list = None, mode: str = 'auto', baseline: float = None): @@ -12,44 +11,44 @@ def __init__(self, patience: int = 5, min_delta: float = 0.001, restore_best_wei self.mode = mode self.baseline = baseline self.best_weights = None - self.best_metrics = None + self.best_metric = None self.patience_counter = 0 self.epoch = 0 self.stop_training = False - def on_epoch_end(self, model, metrics): + def on_epoch_end(self, model, loss, metrics=None): self.epoch += 1 if self.epoch < self.start_from_epoch: return False - if self.best_metrics is None: + if self.best_metric is None: if self.monitor is None: - self.best_metrics = metrics - if np.any(np.isnan(metrics)): - self.mode = 'min' - else: - self.mode = 'auto' + self.best_metric = loss + self.mode = 'min' else: - metric_values = [metric(model.predictions, model.y_true) for metric in self.monitor] - self.best_metrics = [np.inf if m > 0 else -np.inf for m in metric_values] - self.mode = 'max' + if metrics is None: + raise ValueError("Metric to monitor is provided, but no metrics are available.") + metric_value = metrics[self.monitor[0].__name__] + self.best_metric = metric_value + if self.mode == 'auto': + if np.isnan(metric_value): + self.mode = 'min' + else: + self.mode = 'max' improved = False if self.monitor is None: - current_metric = metrics[-1] - best_metric = self.best_metrics[-1] - if (self.mode == 'min' and current_metric < best_metric - self.min_delta) or \ - (self.mode == 'max' and current_metric > best_metric + self.min_delta) or \ - (self.mode == 'auto' and current_metric < best_metric - self.min_delta): - 
self.best_metrics[-1] = current_metric + current_metric = loss + if (self.mode == 'min' and current_metric < self.best_metric - self.min_delta) or \ + (self.mode == 'max' and current_metric > self.best_metric + self.min_delta): + self.best_metric = current_metric improved = True else: - for i, metric in enumerate(metrics): - best_metric = self.best_metrics[i] - if (self.mode == 'max' and metric > best_metric + self.min_delta) or \ - (self.mode == 'min' and metric < best_metric - self.min_delta): - self.best_metrics[i] = metric - improved = True + current_metric = metrics[self.monitor[0].__name__] + if (self.mode == 'max' and current_metric > self.best_metric + self.min_delta) or \ + (self.mode == 'min' and current_metric < self.best_metric - self.min_delta): + self.best_metric = current_metric + improved = True if improved: self.patience_counter = 0 @@ -57,19 +56,17 @@ def on_epoch_end(self, model, metrics): self.best_weights = [layer.weights for layer in model.layers if hasattr(layer, 'weights')] else: self.patience_counter += 1 - - if self.baseline is not None: - if self.mode == 'max' and max(self.best_metrics) < self.baseline: - self.patience_counter = self.patience + 1 - elif self.mode == 'min' and min(self.best_metrics) > self.baseline: - self.patience_counter = self.patience + 1 + if self.baseline is not None: + if self.mode == 'max' and self.best_metric < self.baseline: + self.patience_counter = self.patience + 1 + elif self.mode == 'min' and self.best_metric > self.baseline: + self.patience_counter = self.patience + 1 if self.patience_counter >= self.patience: self.stop_training = True print(f"\nEarly stopping after {self.epoch} epochs.", end='') if self.restore_best_weights and self.best_weights is not None: - for layer, best_weights in zip([layer for layer in model.layers if hasattr(layer, 'weights')], - self.best_weights): + for layer, best_weights in zip([layer for layer in model.layers if hasattr(layer, 'weights')], self.best_weights): layer.weights = 
best_weights return True diff --git a/neuralnetlib/layers.py b/neuralnetlib/layers.py index 66e7dc7..94a3c7d 100644 --- a/neuralnetlib/layers.py +++ b/neuralnetlib/layers.py @@ -75,7 +75,7 @@ def from_config(config: dict): class Dense(Layer): - def __init__(self, units: int, weights_init: str = "default", bias_init: str = "default", random_state: int = None): + def __init__(self, units: int, weights_init: str = "default", bias_init: str = "default", random_state: int = None, **kwargs): self.units = units self.weights = None @@ -87,8 +87,12 @@ def __init__(self, units: int, weights_init: str = "default", bias_init: str = " self.bias_init = bias_init self.random_state = random_state + for key, value in kwargs.items(): + setattr(self, key, value) + def initialize_weights(self, input_size: int): - self.rng = np.random.default_rng(self.random_state if self.random_state is not None else int(time.time_ns())) + self.rng = np.random.default_rng( + self.random_state if self.random_state is not None else int(time.time_ns())) if self.weights_init == "xavier": stddev = np.sqrt(2 / (input_size + self.units)) self.weights = self.rng.normal(0, stddev, (input_size, self.units)) @@ -101,7 +105,8 @@ def initialize_weights(self, input_size: int): stddev = np.sqrt(1 / input_size) self.weights = self.rng.normal(0, stddev, (input_size, self.units)) else: - raise ValueError("Invalid weights_init value. Possible values are 'xavier', 'he', 'lecun' and 'default'.") + raise ValueError( + "Invalid weights_init value. Possible values are 'xavier', 'he', 'lecun' and 'default'.") if self.bias_init == "default": self.bias = np.zeros((1, self.units)) @@ -112,7 +117,8 @@ def initialize_weights(self, input_size: int): elif self.bias_init == "small": self.bias = np.full((1, self.units), 0.01) else: - raise ValueError("Invalid bias_init value. Possible values are 'normal', 'uniform', 'small' and 'default'.") + raise ValueError( + "Invalid bias_init value. 
Possible values are 'normal', 'uniform', 'small' and 'default'.") self.d_weights = np.zeros_like(self.weights) self.d_bias = np.zeros_like(self.bias) @@ -122,7 +128,8 @@ def __str__(self): def forward_pass(self, input_data: np.ndarray) -> np.ndarray: if self.weights is None: - assert len(input_data.shape) == 2, f"Dense input must be 2D (batch_size, features), got {input_data.shape}" + assert len( + input_data.shape) == 2, f"Dense input must be 2D (batch_size, features), got {input_data.shape}" self.initialize_weights(input_data.shape[1]) self.input = input_data @@ -148,7 +155,8 @@ def get_config(self) -> dict: @staticmethod def from_config(config: dict): - layer = Dense(config['units'], config['weights_init'], config['bias_init'], config['random_state']) + layer = Dense(config['units'], config['weights_init'], + config['bias_init'], config['random_state']) if config['weights'] is not None: layer.weights = np.array(config['weights']) layer.bias = np.array(config['bias']) @@ -184,23 +192,36 @@ def get_config(self) -> dict: @staticmethod def from_config(config: dict): - activation_function = ActivationFunction.from_config(config['activation_function']) + activation_function = ActivationFunction.from_config( + config['activation_function']) return Activation(activation_function) + @staticmethod + def from_name(name: str) -> "Activation": + name = name.lower().replace("_", "") + for subclass in ActivationFunction.__subclasses__(): + if subclass.__name__.lower() == name: + return Activation(subclass()) + raise ValueError(f"No activation function found for the name: {name}") + class Dropout(Layer): - def __init__(self, rate: float, seed: int = None): + def __init__(self, rate: float, seed: int = None, **kwargs): self.rate = rate self.mask = None self.seed = seed + for key, value in kwargs.items(): + setattr(self, key, value) + def __str__(self): return f'Dropout(rate={self.rate})' def forward_pass(self, input_data: np.ndarray, training: bool = True) -> np.ndarray: if 
training: rng = np.random.default_rng(self.seed) - self.mask = rng.binomial(1, 1 - self.rate, size=input_data.shape) / (1 - self.rate) + self.mask = rng.binomial( + 1, 1 - self.rate, size=input_data.shape) / (1 - self.rate) return input_data * self.mask else: return input_data @@ -222,9 +243,10 @@ def from_config(config: dict): class Conv2D(Layer): def __init__(self, filters: int, kernel_size: int | tuple, stride: int | tuple = 1, padding: str = 'valid', - weights_init: str = "default", bias_init: str = "default", random_state: int = None): + weights_init: str = "default", bias_init: str = "default", random_state: int = None, **kwargs): self.filters = filters - self.kernel_size = (kernel_size, kernel_size) if isinstance(kernel_size, int) else kernel_size + self.kernel_size = (kernel_size, kernel_size) if isinstance( + kernel_size, int) else kernel_size self.stride = (stride, stride) if isinstance(stride, int) else stride self.padding = padding @@ -237,10 +259,14 @@ def __init__(self, filters: int, kernel_size: int | tuple, stride: int | tuple = self.bias_init = bias_init self.random_state = random_state + for key, value in kwargs.items(): + setattr(self, key, value) + def initialize_weights(self, input_shape: tuple): in_channels, _, _ = input_shape - self.rng = np.random.default_rng(self.random_state if self.random_state is not None else int(time.time_ns())) + self.rng = np.random.default_rng( + self.random_state if self.random_state is not None else int(time.time_ns())) if self.weights_init == "xavier": self.weights = self.rng.normal(0, np.sqrt(2 / (np.prod(self.kernel_size) * in_channels)), (self.filters, in_channels, *self.kernel_size)) @@ -248,9 +274,11 @@ def initialize_weights(self, input_shape: tuple): self.weights = self.rng.normal(0, np.sqrt(2 / (in_channels * np.prod(self.kernel_size))), (self.filters, in_channels, *self.kernel_size)) elif self.weights_init == "default": - self.weights = self.rng.normal(0, 0.01, (self.filters, in_channels, 
*self.kernel_size)) + self.weights = self.rng.normal( + 0, 0.01, (self.filters, in_channels, *self.kernel_size)) else: - raise ValueError("Invalid weights_init value. Possible values are 'xavier', 'he', and 'default'.") + raise ValueError( + "Invalid weights_init value. Possible values are 'xavier', 'he', and 'default'.") if self.bias_init == "default": self.bias = np.zeros((1, self.filters)) @@ -261,7 +289,8 @@ def initialize_weights(self, input_shape: tuple): elif self.bias_init == "small": self.bias = np.full((1, self.filters), 0.01) else: - raise ValueError("Invalid bias_init value. Possible values are 'normal', 'uniform', 'small' and 'default'.") + raise ValueError( + "Invalid bias_init value. Possible values are 'normal', 'uniform', 'small' and 'default'.") self.d_weights = np.zeros_like(self.weights) self.d_bias = np.zeros_like(self.bias) @@ -276,7 +305,8 @@ def forward_pass(self, input_data: np.ndarray) -> np.ndarray: self.initialize_weights(input_data.shape[1:]) self.input = input_data - output = self._convolve(self.input, self.weights, self.bias, self.stride, self.padding) + output = self._convolve(self.input, self.weights, + self.bias, self.stride, self.padding) return output def backward_pass(self, output_error: np.ndarray) -> np.ndarray: @@ -316,19 +346,24 @@ def _convolve(input_data: np.ndarray, weights: np.ndarray, bias: np.ndarray, str assert in_channels == _ if padding == 'same': - pad_height = ((in_height - 1) * stride[0] + kernel_height - in_height) // 2 - pad_width = ((in_width - 1) * stride[1] + kernel_width - in_width) // 2 + pad_height = ((in_height - 1) * + stride[0] + kernel_height - in_height) // 2 + pad_width = ((in_width - 1) * + stride[1] + kernel_width - in_width) // 2 else: pad_height, pad_width = 0, 0 - out_height = (in_height + 2 * pad_height - kernel_height) // stride[0] + 1 + out_height = (in_height + 2 * pad_height - + kernel_height) // stride[0] + 1 out_width = (in_width + 2 * pad_width - kernel_width) // stride[1] + 1 - col = 
im2col_2d(input_data, kernel_height, kernel_width, stride, (pad_height, pad_width)) + col = im2col_2d(input_data, kernel_height, kernel_width, + stride, (pad_height, pad_width)) col_W = weights.reshape(out_channels, -1).T output = np.dot(col, col_W) + bias - output = output.reshape(batch_size, out_height, out_width, -1).transpose(0, 3, 1, 2) + output = output.reshape(batch_size, out_height, + out_width, -1).transpose(0, 3, 1, 2) return output @@ -340,21 +375,26 @@ def _convolve_backward(output_error: np.ndarray, input_data: np.ndarray, weights _, _, kernel_height, kernel_width = weights.shape if padding == 'same': - pad_height = ((in_height - 1) * stride[0] + kernel_height - in_height) // 2 - pad_width = ((in_width - 1) * stride[1] + kernel_width - in_width) // 2 + pad_height = ((in_height - 1) * + stride[0] + kernel_height - in_height) // 2 + pad_width = ((in_width - 1) * + stride[1] + kernel_width - in_width) // 2 else: pad_height, pad_width = 0, 0 - col = im2col_2d(input_data, kernel_height, kernel_width, stride, (pad_height, pad_width)) + col = im2col_2d(input_data, kernel_height, kernel_width, + stride, (pad_height, pad_width)) col_W = weights.reshape(out_channels, -1).T - d_output = output_error.transpose(0, 2, 3, 1).reshape(batch_size * out_height * out_width, -1) + d_output = output_error.transpose(0, 2, 3, 1).reshape( + batch_size * out_height * out_width, -1) d_bias = np.sum(d_output, axis=0) d_weights = np.dot(col.T, d_output) d_weights = d_weights.transpose(1, 0).reshape(weights.shape) d_col = np.dot(d_output, col_W.T) - d_input = col2im_2d(d_col, input_data.shape, kernel_height, kernel_width, stride, (pad_height, pad_width)) + d_input = col2im_2d(d_col, input_data.shape, kernel_height, + kernel_width, stride, (pad_height, pad_width)) return d_input, d_weights, d_bias @@ -375,11 +415,13 @@ def forward_pass(self, input_data: np.ndarray) -> np.ndarray: assert len( input_data.shape) == 4, f"MaxPooling2D input must be 4D (batch_size, channels, height, 
width), got {input_data.shape}" self.input = input_data - output = self._pool(self.input, self.pool_size, self.stride, self.padding) + output = self._pool(self.input, self.pool_size, + self.stride, self.padding) return output def backward_pass(self, output_error: np.ndarray) -> np.ndarray: - input_error = self._pool_backward(output_error, self.input, self.pool_size, self.stride, self.padding) + input_error = self._pool_backward( + output_error, self.input, self.pool_size, self.stride, self.padding) return input_error def get_config(self) -> dict: @@ -399,15 +441,18 @@ def _pool(input_data: np.ndarray, pool_size: tuple, stride: tuple, padding: str) batch_size, channels, in_height, in_width = input_data.shape if padding == 'same': - pad_height = ((in_height - 1) * stride[0] + pool_size[0] - in_height) // 2 - pad_width = ((in_width - 1) * stride[1] + pool_size[1] - in_width) // 2 + pad_height = ((in_height - 1) * + stride[0] + pool_size[0] - in_height) // 2 + pad_width = ((in_width - 1) * + stride[1] + pool_size[1] - in_width) // 2 else: pad_height, pad_width = 0, 0 padded_input = np.pad(input_data, ((0, 0), (0, 0), (pad_height, pad_height), (pad_width, pad_width)), mode='constant') - out_height = (in_height + 2 * pad_height - pool_size[0]) // stride[0] + 1 + out_height = (in_height + 2 * pad_height - + pool_size[0]) // stride[0] + 1 out_width = (in_width + 2 * pad_width - pool_size[1]) // stride[1] + 1 output = np.zeros((batch_size, channels, out_height, out_width)) @@ -415,7 +460,7 @@ def _pool(input_data: np.ndarray, pool_size: tuple, stride: tuple, padding: str) for i in range(out_height): for j in range(out_width): input_slice = padded_input[:, :, i * stride[0]:i * stride[0] + pool_size[0], - j * stride[1]:j * stride[1] + pool_size[1]] + j * stride[1]:j * stride[1] + pool_size[1]] output[:, :, i, j] = np.max(input_slice, axis=(2, 3)) return output @@ -427,8 +472,10 @@ def _pool_backward(output_error: np.ndarray, input_data: np.ndarray, pool_size: _, _, 
out_height, out_width = output_error.shape if padding == 'same': - pad_height = ((in_height - 1) * stride[0] + pool_size[0] - in_height) // 2 - pad_width = ((in_width - 1) * stride[1] + pool_size[1] - in_width) // 2 + pad_height = ((in_height - 1) * + stride[0] + pool_size[0] - in_height) // 2 + pad_width = ((in_width - 1) * + stride[1] + pool_size[1] - in_width) // 2 else: pad_height, pad_width = 0, 0 @@ -440,14 +487,16 @@ def _pool_backward(output_error: np.ndarray, input_data: np.ndarray, pool_size: for i in range(out_height): for j in range(out_width): input_slice = padded_input[:, :, i * stride[0]:i * stride[0] + pool_size[0], - j * stride[1]:j * stride[1] + pool_size[1]] - mask = (input_slice == np.max(input_slice, axis=(2, 3), keepdims=True)) + j * stride[1]:j * stride[1] + pool_size[1]] + mask = (input_slice == np.max( + input_slice, axis=(2, 3), keepdims=True)) d_input[:, :, i * stride[0]:i * stride[0] + pool_size[0], - j * stride[1]:j * stride[1] + pool_size[1]] += output_error[:, :, i, j][:, :, np.newaxis, - np.newaxis] * mask + j * stride[1]:j * stride[1] + pool_size[1]] += output_error[:, :, i, j][:, :, np.newaxis, + np.newaxis] * mask if padding == 'same': - d_input = d_input[:, :, pad_height:-pad_height, pad_width:-pad_width] + d_input = d_input[:, :, pad_height:- + pad_height, pad_width:-pad_width] return d_input @@ -457,7 +506,8 @@ def __str__(self): return 'Flatten' def forward_pass(self, input_data: np.ndarray) -> np.ndarray: - assert len(input_data.shape) >= 2, f"Flatten input must be at least 2D, got {input_data.shape}" + assert len( + input_data.shape) >= 2, f"Flatten input must be at least 2D, got {input_data.shape}" self.input_shape = input_data.shape return input_data.reshape(input_data.shape[0], -1) @@ -474,7 +524,7 @@ def from_config(config: dict): class Conv1D(Layer): def __init__(self, filters: int, kernel_size: int, stride: int = 1, padding: str = 'valid', - weights_init: str = "default", bias_init: str = "default", random_state: int = 
None): + weights_init: str = "default", bias_init: str = "default", random_state: int = None, **kwargs): self.filters = filters self.kernel_size = kernel_size self.stride = stride @@ -489,10 +539,14 @@ def __init__(self, filters: int, kernel_size: int, stride: int = 1, padding: str self.bias_init = bias_init self.random_state = random_state + for key, value in kwargs.items(): + setattr(self, key, value) + def initialize_weights(self, input_shape: tuple): in_channels = input_shape[0] - self.rng = np.random.default_rng(self.random_state if self.random_state is not None else int(time.time_ns())) + self.rng = np.random.default_rng( + self.random_state if self.random_state is not None else int(time.time_ns())) if self.weights_init == "xavier": self.weights = self.rng.normal(0, np.sqrt(2 / (self.kernel_size * in_channels)), (self.filters, in_channels, self.kernel_size)) @@ -500,9 +554,11 @@ def initialize_weights(self, input_shape: tuple): self.weights = self.rng.normal(0, np.sqrt(2 / (in_channels * self.kernel_size)), (self.filters, in_channels, self.kernel_size)) elif self.weights_init == "default": - self.weights = self.rng.normal(0, 0.01, (self.filters, in_channels, self.kernel_size)) + self.weights = self.rng.normal( + 0, 0.01, (self.filters, in_channels, self.kernel_size)) else: - raise ValueError("Invalid weights_init value. Possible values are 'xavier', 'he', and 'default'.") + raise ValueError( + "Invalid weights_init value. Possible values are 'xavier', 'he', and 'default'.") if self.bias_init == "default": self.bias = np.zeros((1, self.filters)) @@ -513,7 +569,8 @@ def initialize_weights(self, input_shape: tuple): elif self.bias_init == "small": self.bias = np.full((1, self.filters), 0.01) else: - raise ValueError("Invalid bias_init value. Possible values are 'normal', 'uniform', 'small' and 'default'.") + raise ValueError( + "Invalid bias_init value. 
Possible values are 'normal', 'uniform', 'small' and 'default'.") self.d_weights = np.zeros_like(self.weights) self.d_bias = np.zeros_like(self.bias) @@ -528,7 +585,8 @@ def forward_pass(self, input_data: np.ndarray) -> np.ndarray: self.initialize_weights(input_data.shape[1:]) self.input = input_data - output = self._convolve(self.input, self.weights, self.bias, self.stride, self.padding) + output = self._convolve(self.input, self.weights, + self.bias, self.stride, self.padding) return output def backward_pass(self, output_error: np.ndarray) -> np.ndarray: @@ -568,7 +626,8 @@ def _convolve(input_data: np.ndarray, weights: np.ndarray, bias: np.ndarray, str assert in_channels == _ if padding == 'same': - pad_length = ((in_length - 1) * stride + kernel_length - in_length) // 2 + pad_length = ((in_length - 1) * stride + + kernel_length - in_length) // 2 else: pad_length = 0 @@ -590,20 +649,23 @@ def _convolve_backward(output_error: np.ndarray, input_data: np.ndarray, weights _, _, kernel_length = weights.shape if padding == 'same': - pad_length = ((in_length - 1) * stride + kernel_length - in_length) // 2 + pad_length = ((in_length - 1) * stride + + kernel_length - in_length) // 2 else: pad_length = 0 col = im2col_1d(input_data, kernel_length, stride, pad_length) col_W = weights.reshape(out_channels, -1).T - d_output = output_error.transpose(0, 2, 1).reshape(batch_size * out_length, -1) + d_output = output_error.transpose( + 0, 2, 1).reshape(batch_size * out_length, -1) d_bias = np.sum(d_output, axis=0) d_weights = np.dot(col.T, d_output) d_weights = d_weights.transpose(1, 0).reshape(weights.shape) d_col = np.dot(d_output, col_W.T) - d_input = col2im_1d(d_col, input_data.shape, kernel_length, stride, pad_length) + d_input = col2im_1d(d_col, input_data.shape, + kernel_length, stride, pad_length) return d_input, d_weights, d_bias @@ -621,11 +683,13 @@ def forward_pass(self, input_data: np.ndarray) -> np.ndarray: assert len( input_data.shape) == 3, f"MaxPooling1D input 
must be 3D (batch_size, steps, features), got {input_data.shape}" self.input = input_data - output = self._pool(self.input, self.pool_size, self.stride, self.padding) + output = self._pool(self.input, self.pool_size, + self.stride, self.padding) return output def backward_pass(self, output_error: np.ndarray) -> np.ndarray: - input_error = self._pool_backward(output_error, self.input, self.pool_size, self.stride, self.padding) + input_error = self._pool_backward( + output_error, self.input, self.pool_size, self.stride, self.padding) return input_error def get_config(self) -> dict: @@ -645,11 +709,13 @@ def _pool(input_data: np.ndarray, pool_size: int, stride: int, padding: str) -> batch_size, channels, in_length = input_data.shape if padding == 'same': - pad_length = ((in_length - 1) * stride + pool_size - in_length) // 2 + pad_length = ((in_length - 1) * stride + + pool_size - in_length) // 2 else: pad_length = 0 - padded_input = np.pad(input_data, ((0, 0), (0, 0), (pad_length, pad_length)), mode='constant') + padded_input = np.pad( + input_data, ((0, 0), (0, 0), (pad_length, pad_length)), mode='constant') out_length = (in_length + 2 * pad_length - pool_size) // stride + 1 @@ -668,18 +734,21 @@ def _pool_backward(output_error: np.ndarray, input_data: np.ndarray, pool_size: _, _, out_length = output_error.shape if padding == 'same': - pad_length = ((in_length - 1) * stride + pool_size - in_length) // 2 + pad_length = ((in_length - 1) * stride + + pool_size - in_length) // 2 else: pad_length = 0 - padded_input = np.pad(input_data, ((0, 0), (0, 0), (pad_length, pad_length)), mode='constant') + padded_input = np.pad( + input_data, ((0, 0), (0, 0), (pad_length, pad_length)), mode='constant') d_input = np.zeros_like(padded_input) for i in range(out_length): input_slice = padded_input[:, :, i * stride:i * stride + pool_size] mask = (input_slice == np.max(input_slice, axis=2, keepdims=True)) - d_input[:, :, i * stride:i * stride + pool_size] += output_error[:, :, i][:, :, 
np.newaxis] * mask + d_input[:, :, i * stride:i * stride + + pool_size] += output_error[:, :, i][:, :, np.newaxis] * mask if padding == 'same': d_input = d_input[:, :, pad_length:-pad_length] @@ -698,16 +767,20 @@ def __init__(self, input_dim: int, output_dim: int, input_length: int = None, we self.random_state = random_state def initialize_weights(self): - self.rng = np.random.default_rng(self.random_state if self.random_state is not None else int(time.time_ns())) + self.rng = np.random.default_rng( + self.random_state if self.random_state is not None else int(time.time_ns())) if self.weights_init == "xavier": self.weights = self.rng.normal(0, np.sqrt(2 / (self.input_dim + self.output_dim)), (self.input_dim, self.output_dim)) elif self.weights_init == "he": - self.weights = self.rng.normal(0, np.sqrt(2 / self.input_dim), (self.input_dim, self.output_dim)) + self.weights = self.rng.normal(0, np.sqrt( + 2 / self.input_dim), (self.input_dim, self.output_dim)) elif self.weights_init == "default": - self.weights = self.rng.normal(0, 0.01, (self.input_dim, self.output_dim)) + self.weights = self.rng.normal( + 0, 0.01, (self.input_dim, self.output_dim)) else: - raise ValueError("Invalid weights_init value. Possible values are 'xavier', 'he', and 'default'.") + raise ValueError( + "Invalid weights_init value. 
Possible values are 'xavier', 'he', and 'default'.") def __str__(self): return f'Embedding(input_dim={self.input_dim}, output_dim={self.output_dim}, input_length={self.input_length})' @@ -723,10 +796,13 @@ def forward_pass(self, input_data: np.ndarray) -> np.ndarray: return output def backward_pass(self, output_error: np.ndarray) -> np.ndarray: - input_error = np.zeros((self.input.shape[0], self.input.shape[1], self.input_dim)) - output_error = output_error.reshape(output_error.shape[0], output_error.shape[1], -1) + input_error = np.zeros( + (self.input.shape[0], self.input.shape[1], self.input_dim)) + output_error = output_error.reshape( + output_error.shape[0], output_error.shape[1], -1) for i, index in enumerate(self.input): - input_error[i, np.arange(index.shape[0]), index] = np.sum(output_error[i], axis=1) + input_error[i, np.arange(index.shape[0]), index] = np.sum( + output_error[i], axis=1) return input_error def get_config(self) -> dict: @@ -750,7 +826,7 @@ def from_config(config: dict): class BatchNormalization(Layer): - def __init__(self, momentum: float = 0.99, epsilon: float = 1e-8): + def __init__(self, momentum: float = 0.99, epsilon: float = 1e-8, **kwargs): self.gamma = None self.beta = None self.d_gamma = None @@ -760,6 +836,9 @@ def __init__(self, momentum: float = 0.99, epsilon: float = 1e-8): self.running_mean = None self.running_var = None + for key, value in kwargs.items(): + setattr(self, key, value) + def initialize_weights(self, input_shape: tuple): self.gamma = np.ones(input_shape) self.beta = np.zeros(input_shape) @@ -778,14 +857,17 @@ def forward_pass(self, input_data: np.ndarray, training: bool = True) -> np.ndar if training: mean = np.mean(input_data, axis=0) var = np.var(input_data, axis=0) - self.running_mean = self.momentum * self.running_mean + (1 - self.momentum) * mean - self.running_var = self.momentum * self.running_var + (1 - self.momentum) * var + self.running_mean = self.momentum * \ + self.running_mean + (1 - self.momentum) 
* mean + self.running_var = self.momentum * \ + self.running_var + (1 - self.momentum) * var else: mean = self.running_mean var = self.running_var self.input_centered = input_data - mean - self.input_normalized = self.input_centered / np.sqrt(var + self.epsilon) + self.input_normalized = self.input_centered / \ + np.sqrt(var + self.epsilon) return self.gamma * self.input_normalized + self.beta def backward_pass(self, output_error: np.ndarray) -> np.ndarray: @@ -795,7 +877,7 @@ def backward_pass(self, output_error: np.ndarray) -> np.ndarray: d_input_normalized = output_error * self.gamma d_var = np.sum(d_input_normalized * self.input_centered, axis=0) * -0.5 * ( - self.input_centered / (self.input_centered.var(axis=0) + self.epsilon) ** 1.5) + self.input_centered / (self.input_centered.var(axis=0) + self.epsilon) ** 1.5) d_mean = np.sum(d_input_normalized, axis=0) * -1 / np.sqrt( self.input_centered.var(axis=0) + self.epsilon) - 2 * d_var * np.mean(self.input_centered, axis=0) d_input = d_input_normalized / np.sqrt( @@ -836,11 +918,13 @@ def forward_pass(self, input_data: np.ndarray) -> np.ndarray: assert len( input_data.shape) == 4, f"AveragePooling2D input must be 4D (batch_size, channels, height, width), got {input_data.shape}" self.input = input_data - output = self._pool(self.input, self.pool_size, self.stride, self.padding) + output = self._pool(self.input, self.pool_size, + self.stride, self.padding) return output def backward_pass(self, output_error: np.ndarray) -> np.ndarray: - input_error = self._pool_backward(output_error, self.input, self.pool_size, self.stride, self.padding) + input_error = self._pool_backward( + output_error, self.input, self.pool_size, self.stride, self.padding) return input_error def get_config(self) -> dict: @@ -860,15 +944,18 @@ def _pool(input_data: np.ndarray, pool_size: tuple, stride: tuple, padding: str) batch_size, channels, in_height, in_width = input_data.shape if padding == 'same': - pad_height = ((in_height - 1) * 
stride[0] + pool_size[0] - in_height) // 2 - pad_width = ((in_width - 1) * stride[1] + pool_size[1] - in_width) // 2 + pad_height = ((in_height - 1) * + stride[0] + pool_size[0] - in_height) // 2 + pad_width = ((in_width - 1) * + stride[1] + pool_size[1] - in_width) // 2 else: pad_height, pad_width = 0, 0 padded_input = np.pad(input_data, ((0, 0), (0, 0), (pad_height, pad_height), (pad_width, pad_width)), mode='constant') - out_height = (in_height + 2 * pad_height - pool_size[0]) // stride[0] + 1 + out_height = (in_height + 2 * pad_height - + pool_size[0]) // stride[0] + 1 out_width = (in_width + 2 * pad_width - pool_size[1]) // stride[1] + 1 output = np.zeros((batch_size, channels, out_height, out_width)) @@ -876,7 +963,7 @@ def _pool(input_data: np.ndarray, pool_size: tuple, stride: tuple, padding: str) for i in range(out_height): for j in range(out_width): input_slice = padded_input[:, :, i * stride[0]:i * stride[0] + pool_size[0], - j * stride[1]:j * stride[1] + pool_size[1]] + j * stride[1]:j * stride[1] + pool_size[1]] output[:, :, i, j] = np.mean(input_slice, axis=(2, 3)) return output @@ -888,8 +975,10 @@ def _pool_backward(output_error: np.ndarray, input_data: np.ndarray, pool_size: _, _, out_height, out_width = output_error.shape if padding == 'same': - pad_height = ((in_height - 1) * stride[0] + pool_size[0] - in_height) // 2 - pad_width = ((in_width - 1) * stride[1] + pool_size[1] - in_width) // 2 + pad_height = ((in_height - 1) * + stride[0] + pool_size[0] - in_height) // 2 + pad_width = ((in_width - 1) * + stride[1] + pool_size[1] - in_width) // 2 else: pad_height, pad_width = 0, 0 @@ -901,11 +990,12 @@ def _pool_backward(output_error: np.ndarray, input_data: np.ndarray, pool_size: for i in range(out_height): for j in range(out_width): d_input[:, :, i * stride[0]:i * stride[0] + pool_size[0], - j * stride[1]:j * stride[1] + pool_size[1]] += output_error[:, :, i, j][:, :, np.newaxis, - np.newaxis] / np.prod(pool_size) + j * stride[1]:j * stride[1] + 
pool_size[1]] += output_error[:, :, i, j][:, :, np.newaxis, + np.newaxis] / np.prod(pool_size) if padding == 'same': - d_input = d_input[:, :, pad_height:-pad_height, pad_width:-pad_width] + d_input = d_input[:, :, pad_height:- + pad_height, pad_width:-pad_width] return d_input @@ -923,11 +1013,13 @@ def forward_pass(self, input_data: np.ndarray) -> np.ndarray: assert len( input_data.shape) == 3, f"AveragePooling1D input must be 3D (batch_size, steps, features), got {input_data.shape}" self.input = input_data - output = self._pool(self.input, self.pool_size, self.stride, self.padding) + output = self._pool(self.input, self.pool_size, + self.stride, self.padding) return output def backward_pass(self, output_error: np.ndarray) -> np.ndarray: - input_error = self._pool_backward(output_error, self.input, self.pool_size, self.stride, self.padding) + input_error = self._pool_backward( + output_error, self.input, self.pool_size, self.stride, self.padding) return input_error def get_config(self) -> dict: @@ -951,7 +1043,8 @@ def _pool(input_data: np.ndarray, pool_size: int, stride: int, padding: str) -> else: pad_steps = 0 - padded_input = np.pad(input_data, ((0, 0), (pad_steps, pad_steps), (0, 0)), mode='constant') + padded_input = np.pad( + input_data, ((0, 0), (pad_steps, pad_steps), (0, 0)), mode='constant') out_steps = (steps + 2 * pad_steps - pool_size) // stride + 1 @@ -974,12 +1067,14 @@ def _pool_backward(output_error: np.ndarray, input_data: np.ndarray, pool_size: else: pad_steps = 0 - padded_input = np.pad(input_data, ((0, 0), (pad_steps, pad_steps), (0, 0)), mode='constant') + padded_input = np.pad( + input_data, ((0, 0), (pad_steps, pad_steps), (0, 0)), mode='constant') d_input = np.zeros_like(padded_input) for i in range(out_steps): - d_input[:, i * stride:i * stride + pool_size, :] += output_error[:, i, :][:, np.newaxis, :] / pool_size + d_input[:, i * stride:i * stride + pool_size, + :] += output_error[:, i, :][:, np.newaxis, :] / pool_size if padding == 
class Permute(Layer):
    """Reorder the axes of the input while keeping the batch axis first.

    ``dims`` is converted to a NumPy axis order by prepending the batch axis
    (0) and subtracting 1 from each entry, e.g. ``dims=(3, 2)`` swaps the two
    trailing axes of a 3-D input.
    NOTE(review): Keras' ``Permute`` spells the same swap ``(2, 1)`` (1-based,
    batch axis excluded); confirm which convention this layer is meant to use.
    """

    # Per-call caches written by forward_pass. They hold NumPy arrays, which
    # are not JSON-serialisable, so they are kept out of get_config().
    _TRANSIENT_STATE = ('input', 'output')

    def __init__(self, dims, **kwargs):
        """Store the permutation pattern.

        Args:
            dims: Sequence of 1-based axis indices (see the class docstring).
            **kwargs: Extra attributes to restore on the instance; this is what
                from_config() passes for any additional keys found in a config
                (same pattern as BatchNormalization.__init__).
        """
        self.dims = dims
        for key, value in kwargs.items():
            setattr(self, key, value)

    def __str__(self):
        return f'Permute(dims={self.dims})'

    def _axis_order(self) -> list:
        """Return the full NumPy axis order, batch axis included."""
        return [0] + [dim - 1 for dim in self.dims]

    def forward_pass(self, input_data: np.ndarray) -> np.ndarray:
        """Transpose ``input_data`` and cache both the input and the output."""
        self.input = input_data
        self.output = np.transpose(self.input, self._axis_order())
        return self.output

    def backward_pass(self, output_error: np.ndarray) -> np.ndarray:
        """Route the error back through the inverse permutation."""
        # argsort of a permutation yields its inverse permutation.
        return np.transpose(output_error, np.argsort(self._axis_order()))

    def get_config(self) -> dict:
        """Return a serialisable description of the layer.

        Contains ``name``, ``dims`` and every other instance attribute except
        the arrays cached by forward_pass().
        """
        config = {'name': self.__class__.__name__, 'dims': self.dims}
        skipped = ('dims',) + self._TRANSIENT_STATE
        config.update({key: value
                       for key, value in self.__dict__.items()
                       if key not in skipped})
        return config

    @staticmethod
    def from_config(config: dict):
        """Rebuild a Permute layer from a dict produced by get_config()."""
        extras = {key: value for key, value in config.items()
                  if key not in ('name', 'dims')}
        return Permute(config['dims'], **extras)
@staticmethod
def from_name(name: str) -> "LossFunction":
    """Build a loss function from an acronym or a class name.

    Matching is case-insensitive and ignores underscores, so ``"mse"``,
    ``"MeanSquaredError"`` and ``"mean_squared_error"`` are equivalent.
    Huber loss accepts an optional delta suffix: ``"huber"``, ``"huber_1.5"``,
    ``"huber_loss_0.5"``. Without a suffix, delta is 1.0.

    Args:
        name: Acronym (``mse``, ``bce``, ``cce``, ``mae``, ``huber[_<delta>]``)
            or the name of a direct ``LossFunction`` subclass.

    Returns:
        A new loss function instance.

    Raises:
        ValueError: If no loss matches, or the Huber delta is not a number.
    """
    key = name.lower().replace("_", "")

    acronyms = {
        "mse": MeanSquaredError,
        "bce": BinaryCrossentropy,
        "cce": CategoricalCrossentropy,
        "mae": MeanAbsoluteError,
    }
    if key in acronyms:
        return acronyms[key]()

    if key.startswith("huber"):
        # Underscores are already stripped, so the delta (if any) is whatever
        # follows the "huber" / "huberloss" prefix. Splitting on "_" here, as
        # the previous version did, could never find it.
        remainder = key[len("huber"):]
        if remainder.startswith("loss"):
            remainder = remainder[len("loss"):]
        if not remainder:
            return HuberLoss(1.0)
        try:
            delta = float(remainder)
        except ValueError:
            raise ValueError(
                f"Invalid delta for Huber loss in name: {name}") from None
        return HuberLoss(delta)

    for subclass in LossFunction.__subclasses__():
        if subclass.__name__.lower() == key:
            return subclass()

    raise ValueError(f"No loss function found for the name: {key}")
compatibility_dict[type(previous_layer)]: - raise ValueError(f"{type(layer).__name__} layer cannot follow {type(previous_layer).__name__} layer.") + raise ValueError( + f"{type(layer).__name__} layer cannot follow {type(previous_layer).__name__} layer.") self.layers.append(layer) - def compile(self, loss_function: LossFunction, optimizer: Optimizer, verbose: bool = False): - self.loss_function = loss_function - self.optimizer = optimizer + activation_attr = getattr(layer, 'activation', getattr( + layer, 'activation_function', None)) + if activation_attr and not isinstance(layer, Activation): + if isinstance(activation_attr, str): + activation = Activation.from_name(activation_attr) + elif isinstance(activation_attr, ActivationFunction): + activation = Activation(activation_attr) + elif isinstance(activation_attr, Activation): + activation = activation_attr + else: + raise ValueError( + f"Invalid activation function: {activation_attr}") + self.layers.append(activation) + + def compile(self, loss_function: LossFunction | str, optimizer: Optimizer | str, verbose: bool = False): + self.loss_function = loss_function if isinstance(loss_function, LossFunction) else LossFunction.from_name( + loss_function) + self.optimizer = optimizer if isinstance(optimizer, Optimizer) else Optimizer.from_name(optimizer) if verbose: print(str(self)) @@ -71,7 +88,8 @@ def backward_pass(self, error: np.ndarray): self.optimizer.update(len(self.layers) - 1 - i, layer.weights, layer.d_weights, layer.bias, layer.d_bias) elif hasattr(layer, 'd_weights'): - self.optimizer.update(len(self.layers) - 1 - i, layer.weights, layer.d_weights) + self.optimizer.update( + len(self.layers) - 1 - i, layer.weights, layer.d_weights) def train_on_batch(self, x_batch: np.ndarray, y_batch: np.ndarray) -> float: self.y_true = y_batch @@ -98,46 +116,35 @@ def fit(self, x_train: np.ndarray, y_train: np.ndarray, epochs: int, batch_size: epochs: Number of epochs to train the model batch_size: Number of samples per 
gradient update verbose: Whether to print training progress - metrics: List of metrics to evaluate the model (functions from neuralnetlib.metrics module) + metrics: List of metric functions to evaluate the model random_state: Random seed for shuffling the data validation_data: Tuple of validation data and labels callbacks: List of callback objects (e.g., EarlyStopping) """ - x_train = np.array(x_train) - y_train = np.array(y_train) - + x_train = np.array(x_train) if not isinstance( + x_train, np.ndarray) else x_train + y_train = np.array(y_train) if not isinstance( + y_train, np.ndarray) else y_train + if validation_data is not None: x_test, y_test = validation_data x_test = np.array(x_test) y_test = np.array(y_test) - - if callbacks: - callback_metrics = set() - for callback in callbacks: - if hasattr(callback, 'monitor') and callback.monitor is not None: - callback_metrics.update(callback.monitor) - - if metrics is None: - metrics = list(callback_metrics) - else: - metrics = set(metrics) - missing_metrics = callback_metrics - metrics - if missing_metrics: - raise ValueError(f"The following metrics to monitor provided in callbacks are not provided in the fit method: {', '.join(str(metric) for metric in missing_metrics)}") - for i in range(epochs): start_time = time.time() # Shuffling the data to avoid overfitting - x_train_shuffled, y_train_shuffled = shuffle(x_train, y_train, random_state=random_state) + x_train_shuffled, y_train_shuffled = shuffle( + x_train, y_train, random_state=random_state) error = 0 predictions_list = [] y_true_list = [] if batch_size is not None: - num_batches = np.ceil(x_train.shape[0] / batch_size).astype(int) + num_batches = np.ceil( + x_train.shape[0] / batch_size).astype(int) for j in range(0, x_train.shape[0], batch_size): x_batch = x_train_shuffled[j:j + batch_size] y_batch = y_train_shuffled[j:j + batch_size] @@ -153,10 +160,11 @@ def fit(self, x_train: np.ndarray, y_train: np.ndarray, epochs: int, batch_size: metrics_str = '' if 
metrics is not None: for metric in metrics: - metric_value = metric(np.vstack(predictions_list), np.vstack(y_true_list)) + metric_value = metric( + np.vstack(predictions_list), np.vstack(y_true_list)) metrics_str += f'{metric.__name__}: {metric_value:.4f} - ' progress_bar(j / batch_size + 1, num_batches, - message=f'Epoch {i + 1}/{epochs} - loss: {error / (j / batch_size + 1):.4f} - {metrics_str[:-3]} - {time.time() - start_time:.2f}s') + message=f'Epoch {i + 1}/{epochs} - loss: {error / (j / batch_size + 1):.4f} - {metrics_str[:-3]} - {time.time() - start_time:.2f}s') error /= num_batches else: @@ -168,32 +176,43 @@ def fit(self, x_train: np.ndarray, y_train: np.ndarray, epochs: int, batch_size: metrics_str = '' if metrics is not None: for metric in metrics: - metric_value = metric(np.vstack(predictions_list), np.vstack(y_true_list)) + metric_value = metric( + np.vstack(predictions_list), np.vstack(y_true_list)) metrics_str += f'{metric.__name__}: {metric_value:.4f} - ' progress_bar(1, 1, - message=f'Epoch {i + 1}/{epochs} - loss: {error:.4f} - {metrics_str[:-3]} - {time.time() - start_time:.2f}s') + message=f'Epoch {i + 1}/{epochs} - loss: {error:.4f} - {metrics_str[:-3]} - {time.time() - start_time:.2f}s') if validation_data is not None: x_test, y_test = validation_data val_predictions = self.predict(x_test) - val_accuracy = accuracy_score(val_predictions, y_test) + val_metrics = [] + if metrics is not None: + for metric in metrics: + # Change extend to append + val_metrics.append(metric(val_predictions, y_test)) if verbose: - print(f' - val_accuracy: {val_accuracy:.4f}', end='') + val_metrics_str = ' - '.join( + f'{metric.__name__}: {val_metric:.4f}' for metric, val_metric in zip(metrics, val_metrics)) + print(f' - {val_metrics_str}', end='') if callbacks: - metrics_values = [] + metrics_values = {} if metrics is not None: - metrics_values.extend( - [metric(np.vstack(predictions_list), np.vstack(y_true_list)) for metric in metrics]) - else: - # If no metrics 
@staticmethod
def from_name(name: str) -> "Optimizer":
    """Instantiate an optimizer from its class name.

    The lookup is case-insensitive and ignores underscores. Only direct
    subclasses of ``Optimizer`` are considered, and the match is built with
    its default constructor arguments.

    Raises:
        ValueError: If no direct subclass has a matching name.
    """
    key = name.replace("_", "").lower()

    # First direct subclass whose lowercased class name equals the key.
    matched = next(
        (candidate for candidate in Optimizer.__subclasses__()
         if candidate.__name__.lower() == key),
        None,
    )
    if matched is None:
        raise ValueError(f"No optimizer found for the name: {key}")
    return matched()
100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name='neuralnetlib', - version='2.4.2', + version='2.5.0', author='Marc Pinet', description='A simple convolutional neural network library with only numpy as dependency', long_description=open('README.md', encoding="utf-8").read(),