Skip to content

Commit f4d6d3a

Browse files
committed
update to lsl client
at the moment, I am able to connect to a stream. I need to test whether I am able to read the data from the stream. opened an issue on pylsl to inquire whether i can both send and receive on same machine for local dev purpose (labstreaminglayer/pylsl#10) once I can confirm the data is coming in properly, i will add some tests
1 parent 5de9e0a commit f4d6d3a

2 files changed

Lines changed: 103 additions & 29 deletions

File tree

mne/realtime/base_client.py

Lines changed: 53 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import time
88
import numpy as np
99

10-
from mne.io.meas_info import create_info
10+
from ..utils import logger, fill_doc
1111

1212
def _buffer_recv_worker(client):
1313
"""Worker thread that constantly receives buffers.
@@ -45,19 +45,18 @@ class _BaseClient(object):
4545
"""
4646

4747
def __init__(self, identifier, port=None, tmin=None, tmax=np.inf,
48-
buffer_size=1000, verbose=None): # noqa: D102
48+
wait_max=10, buffer_size=1000, verbose=None): # noqa: D102
4949
self.identifier = identifier
5050
self.port = port
5151
self.tmin = tmin
5252
self.tmax = tmax
53+
self.wait_max = wait_max
5354
self.buffer_size = buffer_size
5455
self.verbose = verbose
55-
self.create_info()
56-
5756

5857
def __enter__(self): # noqa: D105
5958

60-
# connect to FieldTrip buffer
59+
# connect to buffer
6160
logger.info("Client: Waiting for server to start")
6261
start_time, current_time = time.time(), time.time()
6362
success = False
@@ -72,34 +71,60 @@ def __enter__(self): # noqa: D105
7271
time.sleep(0.1)
7372

7473
if not success:
75-
raise RuntimeError('Could not connect to FieldTrip Buffer')
74+
raise RuntimeError('Could not connect to Buffer')
7675

76+
self.create_info()
7777
self._enter_extra()
7878

7979
return self
8080

81+
def __exit__(self, type, value, traceback):
82+
self.disconnect()
83+
84+
return self
85+
8186
def connect(self):
8287
pass
8388

8489
def create_info(self):
8590
pass
8691

87-
def _enter_extra():
88-
"""For system-specific loading and initializing during the enter
92+
def get_data_as_epoch(self, n_samples=1024, picks=None):
93+
"""Return last n_samples from current time.
94+
95+
Parameters
96+
----------
97+
n_samples : int
98+
Number of samples to fetch.
99+
%(picks_all)s
100+
101+
Returns
102+
-------
103+
epoch : instance of Epochs
104+
The samples fetched as an Epochs object.
105+
106+
See Also
107+
--------
108+
mne.Epochs.iter_evoked
89109
"""
90110
pass
91111

112+
def get_measurement_info(self):
113+
"""Return the measurement info.
114+
115+
Returns
116+
-------
117+
self.info : dict
118+
The measurement info.
119+
"""
120+
return self.info
121+
92122
def iter_raw_buffers(self):
93123
"""Return an iterator over raw buffers.
94124
"""
95125
pass
96126

97-
def _push_raw_buffer(self, raw_buffer):
98-
"""Push raw buffer to clients using callbacks."""
99-
for callback in self._recv_callbacks:
100-
callback(raw_buffer)
101-
102-
def _register_receive_callback(self, callback):
127+
def register_receive_callback(self, callback):
103128
"""Register a raw buffer receive callback.
104129
105130
Parameters
@@ -111,7 +136,7 @@ def _register_receive_callback(self, callback):
111136
if callback not in self._recv_callbacks:
112137
self._recv_callbacks.append(callback)
113138

114-
def _start_receive_thread(self, nchan):
139+
def start_receive_thread(self, nchan):
115140
"""Start the receive thread.
116141
117142
If the measurement has not been started, it will also be started.
@@ -128,7 +153,7 @@ def _start_receive_thread(self, nchan):
128153
self._recv_thread.daemon = True
129154
self._recv_thread.start()
130155

131-
def _stop_receive_thread(self, stop_measurement=False):
156+
def stop_receive_thread(self, stop_measurement=False):
132157
"""Stop the receive thread.
133158
134159
Parameters
@@ -140,7 +165,7 @@ def _stop_receive_thread(self, stop_measurement=False):
140165
self._recv_thread.stop()
141166
self._recv_thread = None
142167

143-
def _unregister_receive_callback(self, callback):
168+
def unregister_receive_callback(self, callback):
144169
"""Unregister a raw buffer receive callback.
145170
146171
Parameters
@@ -150,3 +175,14 @@ def _unregister_receive_callback(self, callback):
150175
"""
151176
if callback in self._recv_callbacks:
152177
self._recv_callbacks.remove(callback)
178+
179+
def _enter_extra(self):
180+
"""For system-specific loading and initializing after connect but
181+
during the enter.
182+
"""
183+
pass
184+
185+
def _push_raw_buffer(self, raw_buffer):
186+
"""Push raw buffer to clients using callbacks."""
187+
for callback in self._recv_callbacks:
188+
callback(raw_buffer)

mne/realtime/lsl_client.py

Lines changed: 50 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@
44
# License: BSD (3-clause)
55

66
import numpy as np
7+
78
from .base_client import _BaseClient
9+
from ..epochs import EpochsArray
10+
from ..io.meas_info import create_info
811

912
try:
1013
import pylsl
@@ -34,13 +37,10 @@ class LSLClient(_BaseClient):
3437
and :ref:`Logging documentation <tut_logging>` for more).
3538
"""
3639
def __init__(self, identifier, port=None, tmin=None, tmax=np.inf,
37-
buffer_size=1000, verbose=None):
38-
self.identifier = identifier
39-
self.port = port
40-
self.tmin = tmin
41-
self.tmax = tmax
42-
self.buffer_size = buffer_size
43-
self.verbose = verbose
40+
wait_max=10, buffer_size=1000, verbose=None):
41+
super(LSLClient, self).__init__(identifier, port, tmin, tmax, wait_max,
42+
buffer_size, verbose)
43+
4444

4545
def connect(self):
4646
stream = pylsl.resolve_byprop('source_id', self.identifier,
@@ -56,17 +56,55 @@ def create_info(self):
5656
ch_info = lsl_info.desc().child("channels").child("channel")
5757
ch_names = list()
5858
ch_types = list()
59+
ch_type = lsl_info.type()
5960
for k in range(lsl_info.channel_count()):
60-
ch_names.append(ch_info.child_value("label"))
61-
ch_types.append(ch_info.child_value("type"))
62-
ch_info = ch_info.next_sibling()
61+
ch_names.append(ch_info.child_value("label")
62+
or '{} {:03d}'.format(ch_type, k))
63+
ch_types.append(ch_info.child_value("type")
64+
or ch_type.lower())
65+
ch_info.next_sibling()
6366

6467
info = create_info(ch_names, sfreq, ch_types)
6568

6669
self.info = info
6770

6871
return self
6972

73+
def disconnect(self):
74+
self.client.close_stream()
75+
76+
return self
77+
78+
def get_data_as_epoch(self, n_samples=1024, picks=None):
79+
"""Return last n_samples from current time.
80+
81+
Parameters
82+
----------
83+
n_samples : int
84+
Number of samples to fetch.
85+
%(picks_all)s
86+
87+
Returns
88+
-------
89+
epoch : instance of Epochs
90+
The samples fetched as an Epochs object.
91+
92+
See Also
93+
--------
94+
mne.Epochs.iter_evoked
95+
"""
96+
inlet = pylsl.StreamInlet(self.client, n_samples)
97+
print(inlet)
98+
99+
wait_time = n_samples * 5. / self.info['sfreq']
100+
samples, _ = inlet.pull_chunk(max_samples=n_samples,
101+
timeout=wait_time)
102+
data = np.vstack(samples).T
103+
104+
picks = _picks_to_idx(self.info, picks, 'all', exclude=())
105+
info = pick_info(self.info, picks)
106+
return EpochsArray(data[picks][np.newaxis], info, events)
107+
70108
def iter_raw_buffers(self):
71109
"""Return an iterator over raw buffers.
72110
"""
@@ -75,6 +113,6 @@ def iter_raw_buffers(self):
75113
## add tmin and tmax to this logic
76114

77115
while True:
78-
samples, timestamps = inlet.pull_chunk(max_samples=self.buffer_size)
116+
samples, _ = inlet.pull_chunk(max_samples=self.buffer_size)
79117

80-
yield samples
118+
yield np.vstack(samples).T

0 commit comments

Comments
 (0)