-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy path detector.py
344 lines (258 loc) · 10.1 KB
/
detector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
#! /usr/bin/env python3
from abc import ABC, abstractmethod
import argparse
import datetime
import dotenv
import logging
import logging.handlers
import numpy as np
from typing import Optional, Tuple
import os
import pathlib
import pyaudio
from scipy.signal import argrelmax
import subprocess
import shlex
import time
import wave
RATE: int = 8000  # Sampling rate (frames per second)
CHUNK: int = int(RATE / 2)  # Samples fetched per PyAudio read: half the rate, i.e. 0.5 seconds
FORMAT = pyaudio.paInt16  # Sample format (16-bit signed integers)
CHANNELS: int = 1  # Number of channels (mono)
FREQ_1ST: float = 849.0  # Frequency of the first ("ding") tone of the door chime
FREQ_2ND: float = 680.0  # Frequency of the second ("dong") tone of the door chime
WINDOW_IN_SECONDS: float = 1.5  # Length in seconds of the window over which chime detection is attempted
WINDOW_IN_FRAMES: int = int(WINDOW_IN_SECONDS * RATE)  # The same detection window expressed in frames
logger: Optional[logging.Logger] = None  # Module-wide logger; initialized in main()
# Interface for audio frame sources.
class FrameReader(ABC):
    """Abstract interface for reading raw audio frames from some source."""

    @abstractmethod
    def open(self) -> bool:
        """Open the source. Return True on success, False on failure."""
        pass

    @abstractmethod
    def should_open_again(self) -> bool:
        """Return True if open() should be retried after a failure."""
        pass

    @abstractmethod
    def read(self, chunk=CHUNK) -> Optional[bytes]:
        """Read up to *chunk* frames; return None when no more are available."""
        pass

    @abstractmethod
    def close(self) -> bool:
        """Release any resources held by the source."""
        pass
# FrameReader that reads from a wav file instead of the mic. For debugging.
class WavFrameReader(FrameReader):
    """FrameReader that replays a pre-recorded wav file (debug helper)."""

    def __init__(self, wavFilePath: pathlib.Path) -> None:
        super().__init__()
        self._wavFilePath = wavFilePath
        self._wavFile = None

    def open(self) -> bool:
        """Open the wav file and validate that its format matches the
        constants the detector expects (RATE, CHANNELS, 16-bit samples).

        Returns False on open failure or any format mismatch.
        """
        # BUG FIX: wave.open() raises on failure rather than returning a
        # falsy object, so the old "if not self._wavFile" check was dead.
        try:
            self._wavFile = wave.open(str(self._wavFilePath), 'rb')
        except (OSError, wave.Error) as e:
            logger.error(f"Failed to open the wav file. {self._wavFilePath} ({e})")
            return False
        if self._wavFile.getframerate() != RATE:
            logger.error(f"Invalid framerate . {self._wavFile.getframerate()}")
            return False
        if self._wavFile.getnchannels() != CHANNELS:
            logger.error(f"Invalid channels . {self._wavFile.getnchannels()}")
            return False
        if self._wavFile.getsampwidth() != 2:
            # BUG FIX: was self._wavFile.getsamwidth() (misspelled attribute,
            # would raise AttributeError instead of logging the problem).
            logger.error(f"Invalid width . {self._wavFile.getsampwidth()}")
            return False
        return True

    def should_open_again(self) -> bool:
        """A wav file is finite; never retry once it is exhausted."""
        return False

    def read(self, chunk=CHUNK) -> Optional[bytes]:
        """Return up to *chunk* frames, or None at end of file."""
        frames = self._wavFile.readframes(chunk)
        if not frames:
            return None
        return frames

    def close(self) -> bool:
        self._wavFile.close()
        self._wavFile = None
        return True
# FrameReader that captures live audio from the microphone.
class MicFrameReader(FrameReader):
    """FrameReader that reads from the input device whose name matches the
    AUDIO_DEVICE environment variable."""

    def __init__(self) -> None:
        super().__init__()
        self._p = None
        self._stream = None

    def open(self) -> bool:
        """Locate the configured input device and open a capture stream.

        Returns False (after releasing PyAudio) if the device is not found
        or the stream cannot be opened.
        """
        self._p = pyaudio.PyAudio()
        input_device_index = -1
        # Hoisted out of the loop: the target name never changes.
        target_name = os.environ.get("AUDIO_DEVICE")
        for host_index in range(0, self._p.get_host_api_count()):
            logger.info(f"host: {self._p.get_host_api_info_by_index(host_index)}")
            for device_index in range(0, self._p.get_host_api_info_by_index(host_index)['deviceCount']):
                device_info = self._p.get_device_info_by_host_api_device_index(host_index, device_index)
                logger.info(f"device: {device_info}")
                if device_info['name'] == target_name:
                    input_device_index = device_info["index"]
                    break
            else:
                continue
            break  # inner break reached: device found, stop scanning hosts
        if input_device_index < 0:
            self.close()
            return False
        logger.info(f"========= {input_device_index}")
        try:
            self._stream = self._p.open(
                format=FORMAT,
                channels=CHANNELS,
                rate=RATE,
                input=True,
                input_device_index=input_device_index,
                frames_per_buffer=CHUNK
            )
        except Exception as e:
            logger.error(e)
            self.close()
            return False
        return True

    def should_open_again(self) -> bool:
        """The mic may come back (e.g. USB re-plug); always retry."""
        return True

    def read(self, chunk=CHUNK) -> Optional[bytes]:
        """Read *chunk* frames from the stream; None once it goes inactive."""
        if not self._stream.is_active():
            return None
        # BUG FIX: honor the chunk parameter (previously always read CHUNK,
        # ignoring the argument).
        frames = self._stream.read(chunk)
        return frames

    def close(self) -> bool:
        if self._stream:
            self._stream.stop_stream()
            self._stream.close()
            self._stream = None
        if self._p:
            self._p.terminate()
            self._p = None
        return True
def setup_logger(name, console=False, level=logging.INFO, logfile='LOGFILENAME.txt') -> logging.Logger:
    """Build and return a named logger.

    The logger itself accepts everything (DEBUG); each attached handler
    filters at *level*.  A rotating file handler is always added; a console
    (stderr) handler is added only when *console* is True.
    """
    log = logging.getLogger(name)
    log.setLevel(logging.DEBUG)
    formatter = logging.Formatter(
        "%(asctime)s %(thread)d %(levelname)s %(name)s :%(message)s")
    # File handler: rotate at ~1 MB, keep up to 10 backups.
    file_handler = logging.handlers.RotatingFileHandler(
        logfile, maxBytes=1000000, backupCount=10)
    file_handler.setLevel(level)
    file_handler.setFormatter(formatter)
    log.addHandler(file_handler)
    if console:
        console_handler = logging.StreamHandler()
        console_handler.setLevel(level)
        console_handler.setFormatter(formatter)
        log.addHandler(console_handler)
    return log
# Compute the single-sided amplitude spectrum of the samples.
def fft(frames: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """Return (amplitudes, frequencies) for the positive half of the FFT."""
    spectrum = np.fft.fft(frames)
    freqs = np.fft.fftfreq(len(frames), d=1.0 / RATE)
    # Keep only the non-negative-frequency half of the spectrum.
    half = len(spectrum) // 2
    spectrum = spectrum[:half]
    freqs = freqs[:half]
    amplitudes = np.abs(spectrum)  # sqrt(re^2 + im^2)
    return (amplitudes, freqs)
# Locate spectral peaks.
def findpeaks(x: np.ndarray, y: np.ndarray, n: int = 50, w: int = 100) -> Tuple[np.ndarray, np.ndarray]:
    """Return up to *n* relative maxima of *y* as (positions, values).

    Peak positions are converted from array indices to physical units by
    multiplying with the x-axis resolution x[1].  *w* is the comparison
    window half-width handed to scipy's argrelmax.
    """
    candidates = argrelmax(y, order=w)[0]       # strict relative maxima (SciPy)
    top = candidates[:min(len(candidates), n)]  # keep at most n of them
    index = np.array(top.tolist()) * x[1]       # indices -> physical axis
    peaks = np.array([y[i] for i in top])       # corresponding peak values
    return index, peaks
# Check whether the peaks contain the target frequency with enough power.
def has_freq(freqs: np.ndarray, peaks: np.ndarray, target: float,
             tolerance: float = 2.0, threshold: float = 20000.0) -> bool:
    """Return True if some peak lies within *tolerance* Hz of *target*
    and its amplitude exceeds *threshold*.

    The tolerance (+/-2 Hz) and amplitude threshold (20000) were previously
    hard-coded; they are now parameters with the same defaults, so existing
    callers are unaffected.
    """
    for freq, peak in zip(freqs.tolist(), peaks.tolist()):
        if target - tolerance <= freq <= target + tolerance:
            logger.info(f"peak {target}, {freq} {peak}")
            if peak > threshold:
                return True
    return False
# Make the Alexa Echo announce that the chime rang.
def speak_alexa() -> None:
    """Run alexa_remote_control.sh so the Echo device named by the
    ALEXA_DEVICE environment variable announces the chime."""
    device = os.environ.get("ALEXA_DEVICE")
    cmd = f'./alexa_remote_control.sh -d "{device}" -e "speak: チャイムが鳴りました"'
    # BUG FIX: use the module logger instead of the root logger (the root
    # logger has no handler configured here, so the message was lost).
    logger.debug(cmd)
    subprocess.run(shlex.split(cmd))
def save_wav(folder: pathlib.Path, frames) -> None:
    """Write *frames* (a list of raw 16-bit sample byte chunks) to a
    timestamped 'detect-YYYYmmdd_HHMMSS.wav' file inside *folder*."""
    now = datetime.datetime.now()
    filename = "detect-" + now.strftime('%Y%m%d_%H%M%S') + '.wav'
    path = pathlib.Path(folder, filename)
    # BUG FIX: use the module logger instead of the root logger.
    logger.info(f"save wav file. {path}")
    # Context manager guarantees the file is closed even if a write fails.
    with wave.open(str(path), "wb") as wav_file:
        wav_file.setnchannels(CHANNELS)
        wav_file.setsampwidth(2)  # 2 bytes per sample = 16-bit
        wav_file.setframerate(RATE)
        wav_file.writeframes(b"".join(frames))
def main() -> None:
    """Entry point: open the audio source, slide a 1.5-second window over
    the incoming frames, and when both chime frequencies show up in the
    window's spectrum, announce via Alexa and save the audio."""
    dotenv.load_dotenv(verbose=True)
    parser = argparse.ArgumentParser()
    parser.add_argument("-v", "--verbose", help="verbose", action="count", default=0)
    parser.add_argument("-c", "--console", help="console output", action='store_true')
    parser.add_argument("-w", "--wav_folder", help="Output folder to store detected wav files", default=".")
    args = parser.parse_args()
    logging_level = logging.INFO if args.verbose == 0 else logging.DEBUG
    global logger
    logger = setup_logger(__name__, args.console, logging_level, "intercom.log")
    wav_folder = pathlib.Path(args.wav_folder)
    # exist_ok avoids the racy exists()-then-mkdir pattern.
    wav_folder.mkdir(parents=True, exist_ok=True)
    # frame_reader: FrameReader = WavFrameReader('test-data/sample1.wav')
    frame_reader: FrameReader = MicFrameReader()
    # Open the FrameReader, retrying once a second while the source allows it.
    while True:
        logger.info("Try to open")
        if frame_reader.open():
            logger.info("Success")
            break
        if not frame_reader.should_open_again():
            frame_reader.close()
            return
        time.sleep(1)
    counter: int = 0      # heartbeat: log "." roughly every 10 reads
    skip_count: int = 0   # cooldown so one chime is not announced repeatedly
    frames_list = []      # raw byte chunks in the current window
    np_frames_list = []   # same chunks decoded as int16 arrays
    while True:
        # Read the next 0.5 s of audio.
        frames = frame_reader.read()
        if not frames:
            # NOTE: this also covers frames is None, so the original second
            # "if frames is None" check below was unreachable and is removed.
            logger.info("End of frames")
            break
        # BUG FIX: the old f-string "{type(frames), {len(frames)}}" built a
        # tuple containing a set instead of formatting two values.
        logger.debug(f"frames: {type(frames)}, {len(frames)}")
        if counter == 0:
            logger.info(".")
        counter = counter + 1 if counter < 10 else 0
        skip_count = max(0, skip_count - 1)
        frames_list.append(frames)
        np_frames_list.append(np.frombuffer(frames, dtype='int16'))
        # Wait until the detection window is full.
        if len(frames_list) * CHUNK < WINDOW_IN_FRAMES:
            continue
        # Slide the window: drop the oldest chunk once it overflows.
        if len(frames_list) * CHUNK > WINDOW_IN_FRAMES:
            frames_list = frames_list[1:]
            np_frames_list = np_frames_list[1:]
        # FFT over the whole window.
        amp, freq = fft(np.concatenate(np_frames_list))
        # Find the spectral peaks.
        index, peaks = findpeaks(freq, amp)
        # Detect: both chime tones must be present with enough power.
        if has_freq(index, peaks, FREQ_1ST) and has_freq(index, peaks, FREQ_2ND):
            logger.info(f"detect!!! {skip_count}")
            if skip_count == 0:
                speak_alexa()
                save_wav(wav_folder, frames_list)
            skip_count = 5
    frame_reader.close()
if __name__ == "__main__":
    # Removed two lines of commented-out debugging code that duplicated
    # calls already made inside main().
    main()