-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathplot_signal.py
114 lines (96 loc) · 3.66 KB
/
plot_signal.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#!/usr/bin/env python3
"""Plot the live microphone signal(s) with matplotlib.
Matplotlib and NumPy have to be installed.
"""
import argparse
import queue
import sys
def int_or_str(text):
    """Convert a command-line value to ``int`` when possible.

    Helper function for argument parsing: it is used as the ``type`` of
    the ``--device`` option so that the value may be either a numeric ID
    or a substring of a device name.

    Returns ``int(text)`` if that conversion succeeds; otherwise *text*
    is returned unchanged.
    """
    try:
        return int(text)
    except ValueError:
        # Not a number: keep the original string.
        return text
# Command-line interface.  The module docstring doubles as the description
# shown by ``--help``.
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
    '-l', '--list-devices', action='store_true',
    help='show list of audio devices and exit')
parser.add_argument(
    '-d', '--device', type=int_or_str,
    help='input device (numeric ID or substring)')
parser.add_argument(
    '-w', '--window', type=float, default=200, metavar='DURATION',
    help='visible time slot (default: %(default)s ms)')
parser.add_argument(
    '-i', '--interval', type=float, default=30,
    help='minimum time between plot updates (default: %(default)s ms)')
parser.add_argument(
    '-b', '--blocksize', type=int, help='block size (in samples)')
parser.add_argument(
    '-r', '--samplerate', type=float, help='sampling rate of audio device')
parser.add_argument(
    '-n', '--downsample', type=int, default=10, metavar='N',
    help='display every Nth sample (default: %(default)s)')
parser.add_argument(
    'channels', type=int, default=[1], nargs='*', metavar='CHANNEL',
    help='input channels to plot (default: the first)')
args = parser.parse_args()

# Reject values that would only fail later, inside the audio callback.
if any(c < 1 for c in args.channels):
    parser.error('argument CHANNEL: must be >= 1')
if args.downsample < 1:
    # The value is used as a slice step: 0 raises ValueError and a negative
    # step would reverse the samples.
    parser.error('argument -n/--downsample: must be >= 1')

mapping = [c - 1 for c in args.channels]  # Channel numbers start with 1

# Audio blocks are handed from the audio callback to the plot update
# function through this queue.
q = queue.Queue()
def audio_callback(indata, frames, time, status):
    """This is called (from a separate thread) for each audio block.

    Any reported *status* is printed to stderr.  The block is then
    reduced to every ``args.downsample``-th frame and to the channels
    listed in ``mapping``, and the result is put on the queue ``q`` for
    the plot update function.  *frames* and *time* are not used here.
    """
    if status:
        print(status, file=sys.stderr)
    # Fancy indexing with mapping creates a (necessary!) copy:
    q.put(indata[::args.downsample, mapping])
def update_plot(frame):
    """This is called by matplotlib for each plot update.

    Typically, audio callbacks happen more frequently than plot updates,
    therefore the queue tends to contain multiple blocks of audio data.

    Every queued block is shifted into the global ``plotdata`` buffer
    (oldest rows fall off the front, new rows are written at the end),
    then each line's y-data is refreshed from its column.  *frame* is
    not used.  Returns the list of lines that were updated.
    """
    global plotdata
    while True:
        try:
            data = q.get_nowait()
        except queue.Empty:
            break
        # Only the newest rows that fit into the buffer can be shown.  This
        # also covers blocks longer than the buffer, which would otherwise
        # fail with a shape mismatch.
        keep = min(len(data), len(plotdata))
        if keep == 0:
            # Nothing to insert; note that ``plotdata[-0:]`` would address
            # the whole buffer, not an empty tail.
            continue
        plotdata = np.roll(plotdata, -keep, axis=0)
        plotdata[-keep:, :] = data[-keep:]
    for column, line in enumerate(lines):
        line.set_ydata(plotdata[:, column])
    return lines
# Main script body: set up the plot and the input stream, then run the
# matplotlib event loop until the window is closed.
try:
    # Third-party imports live inside the try block so that a missing
    # package is reported through the same one-line error path below.
    from matplotlib.animation import FuncAnimation
    import matplotlib.pyplot as plt
    import numpy as np
    import sounddevice as sd

    if args.list_devices:
        print(sd.query_devices())
        parser.exit(0)
    if args.samplerate is None:
        device_info = sd.query_devices(args.device, 'input')
        args.samplerate = device_info['default_samplerate']

    # Number of (downsampled) samples that fit into the visible window;
    # args.window is given in milliseconds.
    length = int(args.window * args.samplerate / (1000 * args.downsample))
    plotdata = np.zeros((length, len(args.channels)))

    fig, ax = plt.subplots()
    lines = ax.plot(plotdata)
    if len(args.channels) > 1:
        ax.legend(['channel {}'.format(c) for c in args.channels],
                  loc='lower left', ncol=len(args.channels))
    ax.axis((0, len(plotdata), -1, 1))
    ax.set_yticks([0])
    ax.yaxis.grid(True)
    # Hide all ticks and tick labels.  Booleans are required here: the
    # string 'off' is not a valid "false" value for tick_params.
    ax.tick_params(bottom=False, top=False, labelbottom=False,
                   right=False, left=False, labelleft=False)
    fig.tight_layout(pad=0)

    # blocksize=None (option not given) selects sounddevice's default.
    stream = sd.InputStream(
        device=args.device, channels=max(args.channels),
        blocksize=args.blocksize,
        samplerate=args.samplerate, callback=audio_callback)
    # Keep a reference to the animation for as long as the plot is shown.
    ani = FuncAnimation(fig, update_plot, interval=args.interval, blit=True)
    with stream:
        plt.show()
except Exception as e:
    # The text is passed as the exit *status*, so it is printed to stderr
    # and the process ends with a non-zero exit code.
    parser.exit(type(e).__name__ + ': ' + str(e))