"""Train a multi-layer network in Theano to classify monophonic instruments.
Contact: <[email protected]>
This script will train a "deep" network to classify monophonic instrument
feature vectors. For this script to execute, you will need to download the
provided dataset:
https://www.dropbox.com/s/b2oaqyl8nmy15dd/instrument_dataset.tgz?dl=0
Sample call:
$ python instrument_classifier.py \
instrument_dataset/uiowa_train_data.npy \
instrument_dataset/uiowa_train_labels.npy \
instrument_dataset/uiowa_test_data.npy \
instrument_dataset/uiowa_test_labels.npy \
--learning_rate=0.02 \
--print_frequency=100 \
--max_iterations=10000
"""
import argparse
import numpy as np
import theano
import theano.tensor as T
import time
from collections import OrderedDict
def data_shuffler(data, labels, batch_size=100):
"""Data shuffler for training online algorithms with mini-batches.
Parameters
----------
data : np.ndarray
Data observations with shape (n_samples, dim0, dim1, ... dimN).
labels : np.ndarray
Integer targets corresponding to the data (x).
    batch_size : int, default=100
        Number of datapoints to yield in each batch.

    Yields
    ------
x_m : np.ndarray
Data with shape (batch_size, dim0, dim1, ... dimN).
y_m : np.ndarray
Targets corresponding to the samples in x_m.
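
    Examples
    --------
    A minimal usage sketch (array shapes are illustrative only):

    >>> x = np.random.randn(1000, 120)
    >>> y = np.random.randint(0, 10, size=1000)
    >>> batches = data_shuffler(x, y, batch_size=100)
    >>> x_m, y_m = next(batches)  # x_m.shape == (100, 120)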
"""
num_samples = len(data)
sample_idx = np.arange(num_samples, dtype=np.int32)
read_ptr = num_samples
while True:
x_m, y_m = [], []
while len(x_m) < batch_size:
if read_ptr >= num_samples:
np.random.shuffle(sample_idx)
read_ptr = 0
x_m.append(data[sample_idx[read_ptr]])
y_m.append(labels[sample_idx[read_ptr]])
read_ptr += 1
yield np.array(x_m), np.array(y_m)
def data_stepper(data, labels, batch_size=100):
"""Generator for stepping through a dataset in batches.
Parameters
----------
data : np.ndarray
Data observations with shape (n_samples, dim0, dim1, ... dimN).
labels : np.ndarray
Integer targets corresponding to the data (x).
    batch_size : int, default=100
        Number of datapoints to yield in each batch.

    Yields
    ------
    x_m : np.ndarray
        Data with shape (batch_size, dim0, dim1, ... dimN); the final
        batch may be smaller.
y_m : np.ndarray
Targets corresponding to the samples in x_m.
"""
num_samples = len(data)
read_ptr = 0
x_m, y_m = [], []
while read_ptr < num_samples:
x_m.append(data[read_ptr])
y_m.append(labels[read_ptr])
read_ptr += 1
if len(x_m) == batch_size:
yield np.array(x_m), np.array(y_m)
x_m, y_m = [], []
if len(x_m) > 0:
yield np.array(x_m), np.array(y_m)
def prepare_training_data(data_file, label_file, batch_size=100):
"""Create a data generator from input data and label files.
Parameters
----------
data_file : str
Path to a numpy file of data observations.
label_file : str
Path to a numpy file of data labels.
batch_size : int, default=100
Number of datapoints to return for each batch.
Returns
-------
    shuffler : generator
        Data generator that returns an (x, y) tuple for each call
        to next().
    stats : dict
        Standardization statistics with keys 'mu' and 'sigma', computed
        over the training data.
"""
data = np.load(data_file)
labels = np.load(label_file)
# Compute statistics for standardizing the data.
stats = {'mu': data.mean(axis=0), 'sigma': data.std(axis=0)}
return data_shuffler(data, labels, batch_size=batch_size), stats
def hwr(x_input):
"""Theano functiom to compute half-wave rectification, i.e. max(x, 0).
Parameters
----------
    x_input : theano symbolic type
Object to half-wave rectify.
Returns
-------
z : theano symbolic type
Result of the function.
"""
return 0.5 * (T.abs_(x_input) + x_input)
def build_network():
"""Build a network for instrument classification.
Returns
-------
objective_fx: compiled theano function
        Callable function that takes (x, y, eta) as arguments, returning the
        classification error (in percent) over the data x; implicitly
        updates the parameters of the network given the learning rate eta.
prediction_fx: compiled theano function
Callable function that takes (x) as an argument; returns the posterior
representation for the input data.
params: dict
All trainable parameters in the network.
"""
# ----------------------------------------------------
# Step 1. Build the network
# ----------------------------------------------------
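    # Architecture: a 120-d feature vector feeds two fully-connected hidden
    # layers of 256 units with half-wave rectification, followed by a 10-way
    # softmax classifier.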
x_input = T.matrix('input')
# Define layer shapes -- (n_in, n_out)
l0_dim = (120, 256)
l1_dim = (256, 256)
l2_dim = (256, 10)
    # Build the input standardization into the graph.
mu_obs = theano.shared(np.zeros(l0_dim[:1]), name='mu')
sigma_obs = theano.shared(np.ones(l0_dim[:1]), name='sigma')
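    # Standardize each feature dimension; dimshuffle('x', 0) adds a leading
    # broadcastable (batch) axis so mu/sigma broadcast over the mini-batch.
    # Assigning to a new variable keeps `x_input` a pure graph input.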
    x_std = ((x_input - mu_obs.dimshuffle('x', 0))
             / sigma_obs.dimshuffle('x', 0))
# Layer 0
weights0 = theano.shared(np.random.normal(scale=0.01, size=l0_dim),
name='weights0')
bias0 = theano.shared(np.zeros(l0_dim[1]), name='bias0')
    z_out0 = hwr(T.dot(x_std, weights0) + bias0)
# Layer 1
weights1 = theano.shared(np.random.normal(scale=0.01, size=l1_dim),
name='weights1')
bias1 = theano.shared(np.zeros(l1_dim[1]), name='bias1')
z_out1 = hwr(T.dot(z_out0, weights1) + bias1)
# Layer 2 - Classifier Layer
weights2 = theano.shared(np.random.normal(scale=0.01, size=l2_dim),
name='weights2')
bias2 = theano.shared(np.zeros(l2_dim[1]), name='bias2')
z_output = T.nnet.softmax(T.dot(z_out1, weights2) + bias2)
# ----------------------------------------------------
# Step 2. Define a loss function
# ----------------------------------------------------
y_target = T.ivector('y_target')
observation_index = T.arange(y_target.shape[0], dtype='int32')
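    # Cross-entropy loss: index each row of the softmax output at its true
    # class and average the negative log-probabilities over the mini-batch.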
scalar_loss = T.mean(-T.log(z_output)[observation_index, y_target])
accuracy = T.mean(T.eq(T.argmax(z_output, axis=1), y_target))
# ----------------------------------------------------
# Step 3. Compute Update rules
# ----------------------------------------------------
eta = T.scalar(name="learning_rate")
updates = OrderedDict()
network_params = OrderedDict()
for param in [weights0, bias0, weights1, bias1, weights2, bias2]:
# Save each parameter for returning later.
network_params[param.name] = param
# Compute the gradient with respect to each parameter.
gparam = T.grad(scalar_loss, param)
# Now, save the update rule for each parameter.
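        # This is plain stochastic gradient descent: param <- param - eta * grad.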
updates[param] = param - eta * gparam
# ----------------------------------------------------
# Step 4. Compile wicked fast theano functions!
# ----------------------------------------------------
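    # Note that objective_fx reports the mini-batch classification error in
    # percent; scalar_loss only enters through the gradient updates above.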
objective_fx = theano.function(inputs=[x_input, y_target, eta],
outputs=100*(1.0 - accuracy),
updates=updates,
allow_input_downcast=True)
prediction_fx = theano.function(inputs=[x_input],
outputs=z_output,
allow_input_downcast=True)
# Add mu and sigma variables now, as we don't want to update them
# during training.
network_params.update({mu_obs.name: mu_obs,
sigma_obs.name: sigma_obs})
return objective_fx, prediction_fx, network_params
def train_network(objective_fx, shuffler, learning_rate, num_iterations,
print_frequency=100):
"""Run the training process for some number of iterations.
Parameters
----------
objective_fx : compiled theano function
First function returned by build network; updates the parameters as
data is passed to it.
shuffler : generator
Data source with a next() method, returning a two-element tuple (x,y).
learning_rate : scalar
Update rate for each gradient step.
num_iterations : int
Number of update iterations to run.
print_frequency : int
Number of iterations between printing information to the console.
Returns
-------
training_error : np.ndarray
        Vector of per-iteration training error values (in percent).
"""
training_error = np.zeros(num_iterations)
n_iter = 0
try:
while n_iter < num_iterations:
            x_m, y_m = next(shuffler)
training_error[n_iter] = objective_fx(x_m, y_m, learning_rate)
if (n_iter % print_frequency) == 0:
print "[%s]\t Iter: %07d \tTraining Error: %0.2f" % \
(time.asctime(), n_iter, training_error[n_iter])
n_iter += 1
except KeyboardInterrupt:
print "Stopping Early."
return training_error[:n_iter]
def main(args):
"""Main routine for training a deep network.
    After training a deep network for some number of iterations, the error over the
last batch update is reported and the total error over the holdout set is
computed.
As a point of reference, sklearn's SVC with a linear kernel achieves train
and test error of approximately 13%/20%, respectively. Here, with 50k
iterations and a learning rate of 0.025, the deep net achieves train and
test error of 2%/6.2%, respectively.
Parameters
----------
args : ArgumentParser
Initialized argument object.
"""
objective_fx, prediction_fx, params = build_network()
shuffler, stats = prepare_training_data(
args.train_data_file, args.train_label_file, args.batch_size)
# Set network's mu and sigma values.
for name in ['mu', 'sigma']:
params[name].set_value(stats[name])
training_error = train_network(objective_fx,
shuffler,
args.learning_rate,
args.max_iterations,
args.print_frequency)
print "Final Training Error: %0.4f" % training_error[-1]
# Prepare testing data to step through in batches.
test_data = data_stepper(
np.load(args.test_data_file), np.load(args.test_label_file), 500)
correct_predictions = [np.equal(prediction_fx(x_m).argmax(axis=1),
y_m) for x_m, y_m in test_data]
test_error = 1.0 - np.concatenate(correct_predictions).mean()
print "Test Error: %0.4f" % (100*test_error)
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description="Learn chroma features from DFT magnitude spectra.")
parser.add_argument("train_data_file",
metavar="train_data_file", type=str,
help="Filepath to train data.")
parser.add_argument("train_label_file",
metavar="train_label_file", type=str,
help="Filepath to train labels.")
parser.add_argument("test_data_file",
metavar="test_data_file", type=str,
help="Filepath to test data.")
parser.add_argument("test_label_file",
metavar="test_label_file", type=str,
help="Filepath to test labels.")
parser.add_argument("--max_iterations",
metavar="max_iterations", type=int,
default=5000, action="store",
help="Maximum number of iterations to train.")
parser.add_argument("--batch_size",
metavar="batch_size", type=int,
default=50, action="store",
help="Size of the mini-batch.")
parser.add_argument("--print_frequency",
metavar="print_frequency", type=int,
default=50, action="store",
help="Number of iterations between console printing.")
parser.add_argument("--learning_rate",
metavar="learning_rate", type=float,
default=0.02, action="store",
help="Learning rate for updating parameters.")
main(parser.parse_args())