|
| 1 | +# Authors: Teon Brooks <teon.brooks@gmail.com> |
| 2 | +# Mainak Jas <mainakjas@gmail.com> |
| 3 | +# |
| 4 | +# License: BSD (3-clause) |
| 5 | + |
| 6 | +import threading |
| 7 | +import time |
| 8 | +import numpy as np |
| 9 | + |
| 10 | +from ..utils import logger, fill_doc |
| 11 | + |
| 12 | + |
| 13 | +def _buffer_recv_worker(client): |
| 14 | + """Worker thread that constantly receives buffers.""" |
| 15 | + try: |
| 16 | + for raw_buffer in client.iter_raw_buffers(): |
| 17 | + client._push_raw_buffer(raw_buffer) |
| 18 | + except RuntimeError as err: |
| 19 | + # something is wrong, the server stopped (or something) |
| 20 | + client._recv_thread = None |
| 21 | + print('Buffer receive thread stopped: %s' % err) |
| 22 | + |
| 23 | + |
| 24 | +class _BaseClient(object): |
| 25 | + """Base Realtime Client. |
| 26 | +
|
| 27 | + Parameters |
| 28 | + ---------- |
| 29 | + info : instance of mne.Info | None |
| 30 | + The measurement info read in from a file. If None, it is generated from |
| 31 | + the realtime stream. This method may result in less info than expected. |
| 32 | + host : str |
| 33 | + The identifier of the server. IP address, LSL id, or raw filename. |
| 34 | + port : int | None |
| 35 | + Port to use for the connection. |
| 36 | + wait_max : float |
| 37 | + Maximum time (in seconds) to wait for real-time buffer to start. |
| 38 | + tmin : float | None |
| 39 | + Time instant to start receiving buffers. If None, start from the latest |
| 40 | + samples available. |
| 41 | + tmax : float |
| 42 | + Time instant to stop receiving buffers. |
| 43 | + buffer_size : int |
| 44 | + Size of each buffer in terms of number of samples. |
| 45 | + verbose : bool, str, int, or None |
| 46 | + If not None, override default verbose level (see :func:`mne.verbose` |
| 47 | + and :ref:`Logging documentation <tut_logging>` for more). |
| 48 | + """ |
| 49 | + |
| 50 | + def __init__(self, info=None, host='localhost', port=None, |
| 51 | + wait_max=10., tmin=None, tmax=np.inf, |
| 52 | + buffer_size=1000, verbose=None): # noqa: D102 |
| 53 | + self.info = info |
| 54 | + self.host = host |
| 55 | + self.port = port |
| 56 | + self.wait_max = wait_max |
| 57 | + self.tmin = tmin |
| 58 | + self.tmax = tmax |
| 59 | + self.buffer_size = buffer_size |
| 60 | + self.verbose = verbose |
| 61 | + |
| 62 | + def __enter__(self): # noqa: D105 |
| 63 | + |
| 64 | + # connect to buffer |
| 65 | + logger.info("Client: Waiting for server to start") |
| 66 | + start_time = time.time() |
| 67 | + while time.time() < (start_time + self.wait_max): |
| 68 | + try: |
| 69 | + self._connect() |
| 70 | + logger.info("Client: Connected") |
| 71 | + break |
| 72 | + except Exception: |
| 73 | + time.sleep(0.1) |
| 74 | + else: |
| 75 | + raise RuntimeError('Could not connect to Client.') |
| 76 | + |
| 77 | + if not self.info: |
| 78 | + self.info = self._create_info() |
| 79 | + self._enter_extra() |
| 80 | + |
| 81 | + return self |
| 82 | + |
| 83 | + def __exit__(self, type, value, traceback): |
| 84 | + self._disconnect() |
| 85 | + |
| 86 | + return self |
| 87 | + |
| 88 | + @fill_doc |
| 89 | + def get_data_as_epoch(self, n_samples=1024, picks=None): |
| 90 | + """Return last n_samples from current time. |
| 91 | +
|
| 92 | + Parameters |
| 93 | + ---------- |
| 94 | + n_samples : int |
| 95 | + Number of samples to fetch. |
| 96 | + %(picks_all)s |
| 97 | +
|
| 98 | + Returns |
| 99 | + ------- |
| 100 | + epoch : instance of Epochs |
| 101 | + The samples fetched as an Epochs object. |
| 102 | +
|
| 103 | + See Also |
| 104 | + -------- |
| 105 | + mne.Epochs.iter_evoked |
| 106 | + """ |
| 107 | + pass |
| 108 | + |
| 109 | + def get_measurement_info(self): |
| 110 | + """Return the measurement info. |
| 111 | +
|
| 112 | + Returns |
| 113 | + ------- |
| 114 | + self.info : dict |
| 115 | + The measurement info. |
| 116 | + """ |
| 117 | + return self.info |
| 118 | + |
| 119 | + def iter_raw_buffers(self): |
| 120 | + """Return an iterator over raw buffers.""" |
| 121 | + pass |
| 122 | + |
| 123 | + def register_receive_callback(self, callback): |
| 124 | + """Register a raw buffer receive callback. |
| 125 | +
|
| 126 | + Parameters |
| 127 | + ---------- |
| 128 | + callback : callable |
| 129 | + The callback. The raw buffer is passed as the first parameter |
| 130 | + to callback. |
| 131 | + """ |
| 132 | + if callback not in self._recv_callbacks: |
| 133 | + self._recv_callbacks.append(callback) |
| 134 | + |
| 135 | + def start_receive_thread(self, nchan): |
| 136 | + """Start the receive thread. |
| 137 | +
|
| 138 | + If the measurement has not been started, it will also be started. |
| 139 | +
|
| 140 | + Parameters |
| 141 | + ---------- |
| 142 | + nchan : int |
| 143 | + The number of channels in the data. |
| 144 | + """ |
| 145 | + if self._recv_thread is None: |
| 146 | + |
| 147 | + self._recv_thread = threading.Thread(target=_buffer_recv_worker, |
| 148 | + args=(self, )) |
| 149 | + self._recv_thread.daemon = True |
| 150 | + self._recv_thread.start() |
| 151 | + |
| 152 | + def stop_receive_thread(self, stop_measurement=False): |
| 153 | + """Stop the receive thread. |
| 154 | +
|
| 155 | + Parameters |
| 156 | + ---------- |
| 157 | + stop_measurement : bool |
| 158 | + Also stop the measurement. |
| 159 | + """ |
| 160 | + if self._recv_thread is not None: |
| 161 | + self._recv_thread.stop() |
| 162 | + self._recv_thread = None |
| 163 | + |
| 164 | + def unregister_receive_callback(self, callback): |
| 165 | + """Unregister a raw buffer receive callback. |
| 166 | +
|
| 167 | + Parameters |
| 168 | + ---------- |
| 169 | + callback : callable |
| 170 | + The callback to unregister. |
| 171 | + """ |
| 172 | + if callback in self._recv_callbacks: |
| 173 | + self._recv_callbacks.remove(callback) |
| 174 | + |
| 175 | + def _connect(self): |
| 176 | + """Connect to client device.""" |
| 177 | + pass |
| 178 | + |
| 179 | + def _create_info(self): |
| 180 | + """Create an mne.Info class for connection to client.""" |
| 181 | + pass |
| 182 | + |
| 183 | + def _disconnect(self): |
| 184 | + """Disconnect the client device.""" |
| 185 | + pass |
| 186 | + |
| 187 | + def _enter_extra(self): |
| 188 | + """Run additional commands in __enter__. |
| 189 | +
|
| 190 | + For system-specific loading and initializing after connect but |
| 191 | + during the enter. |
| 192 | +
|
| 193 | + """ |
| 194 | + pass |
| 195 | + |
| 196 | + def _push_raw_buffer(self, raw_buffer): |
| 197 | + """Push raw buffer to clients using callbacks.""" |
| 198 | + for callback in self._recv_callbacks: |
| 199 | + callback(raw_buffer) |
0 commit comments