-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathopen_bci.py
64 lines (53 loc) · 1.86 KB
/
open_bci.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
"""Core OpenBCI objects for handling connections and samples from the board.
"""
import serial
class OpenBCIBoard(object):
    """Handle a connection to an OpenBCI board.

    Opening the connection immediately consumes the board's start-up output
    (see ``dump_registry_data``), so the constructor blocks until the board
    has announced that it is ready to stream.

    Args:
      port: The port to connect to.
      baud: The baud of the serial connection.

    Attributes:
      ser: The underlying ``serial.Serial`` connection.
      streaming: True once the start-streaming command has been sent.
      filter_data: Set to True before calling ``start_streaming`` to send
        the board the filter command as well.
    """

    def __init__(self, port, baud):
        self.streaming = False
        self.filter_data = False
        self.ser = serial.Serial(port, baud)
        self.dump_registry_data()

    def _read_line(self):
        """Read one line from the serial port and return it as text.

        pySerial hands back ``bytes``; on Python 3 those must be decoded
        before they can be compared with or split by ``str`` values. On
        Python 2 ``bytes`` is ``str`` and the line is returned untouched.
        """
        raw = self.ser.readline()
        if isinstance(raw, bytes) and not isinstance(raw, str):
            raw = raw.decode('ascii', 'replace')
        return raw

    def dump_registry_data(self):
        """Dump all the debug data until we get to a line with something
        about streaming data.
        """
        line = ''
        while 'begin streaming data' not in line:
            line = self._read_line()

    def start_streaming(self, callback):
        """Start handling streaming data from the board. Call a provided
        callback for every single sample that is processed.

        This method loops forever; it only returns control to the caller if
        reading, parsing or the callback raises.

        Args:
          callback: A callback function that will receive a single argument
            of the OpenBCISample object captured.
        """
        if not self.streaming:
            # Send an 'x' to the board to tell it to start streaming us text.
            self.ser.write(b'x')
            # Dump the first line that says "Arduino: Starting..."
            self._read_line()
            # Remember that the board was told to stream so a later call
            # does not send the start command (and drop a line) again.
            self.streaming = True

        if self.filter_data:
            print('Enabling filter')
            self.ser.write(b'f')
            # The filter command is followed by two lines that are not
            # samples; discard them.
            self._read_line()
            self._read_line()

        while True:
            callback(OpenBCISample(self._read_line()))
class OpenBCISample(object):
    """Object encapsulating a single sample from the OpenBCI board.

    A sample line is a comma-separated list: the first field is the sample
    id and every following field is one integer channel value, e.g.
    ``"12, 100, -200, 300,"``.

    Args:
      data: One line of text (``str`` or ``bytes``) as read from the board.

    Attributes:
      id: The first field of the line, kept as a string.
      channels: List of the remaining fields converted to ``int``.

    Raises:
      ValueError: If the line is blank or a channel field is not an integer.
    """

    def __init__(self, data):
        # Serial reads return bytes on Python 3; work on text throughout.
        if isinstance(data, bytes) and not isinstance(data, str):
            data = data.decode('ascii', 'replace')

        # The board terminates each line with a stray trailing comma. Strip
        # it only when it is actually there, so that a line without one does
        # not lose the last digit of its final channel.
        text = data.strip().rstrip(',')
        if not text:
            raise ValueError('empty sample line')

        fields = [field.strip() for field in text.split(',')]
        self.id = fields[0]
        self.channels = [int(value) for value in fields[1:]]