-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy path__init__.py
More file actions
103 lines (81 loc) · 3.05 KB
/
__init__.py
File metadata and controls
103 lines (81 loc) · 3.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
"""
Shell interaction extension for riotctrl
Defines classes to abstract interactions with RIOT shell commands
"""
import abc
import functools

import pexpect
import pexpect.replwrap
# pylint: disable=R0903
class ShellInteractionParser(abc.ABC):
    """Base class for parsers that interpret the output of a ShellInteraction"""

    @abc.abstractmethod
    def parse(self, cmd_output):
        """
        Parses the output of a shell command. Concrete parsers have to
        override this method for the command they handle.

        :param cmd_output (str): Output of ShellInteraction::cmd()
        """
class ShellInteraction:
    """
    Base class for shell interactions

    Commands are sent through a pexpect ``REPLWrapper`` that is attached
    lazily to the terminal of the given RIOTCtrl object.

    :param riotctrl: a RIOTCtrl object
    :param prompt: the prompt of the shell (default: '> ')
    """

    # Timeout (passed to pexpect's read_nonblocking()) used while flushing
    # stale terminal output before the REPLWrapper is attached.
    PROMPT_TIMEOUT = 0.5

    def __init__(self, riotctrl, prompt="> "):
        self.riotctrl = riotctrl
        self.prompt = prompt
        # created on demand by _start_replwrap()
        self.replwrap = None
        # set by start_term() so that __del__() only stops terminals that
        # were started through this object
        self.term_was_started = False

    def __del__(self):
        # __del__() also runs for instances whose __init__() never completed
        # (e.g. a subclass raised before calling it), in which case the
        # attribute does not exist; treat that like "not started".
        if getattr(self, "term_was_started", False):
            self.stop_term()

    def _start_replwrap(self):
        """
        Creates the REPLWrapper if there is none yet or if the existing one
        is bound to a different terminal than the current `riotctrl.term`
        """
        if self.replwrap is None or self.replwrap.child != self.riotctrl.term:
            # flush pexpect up to 10000 characters (arbitrarily chosen value)
            # to start with an empty buffer. This fixes an issue where the
            # REPLWrapper would capture an empty output for the first command
            # and on subsequent commands always captures the output of the
            # previous command on some RIOT-supported boards such as
            # nucleo-f411re, b-l072z-lrwan1, and b-l475e-iot01a.
            try:
                self.riotctrl.term.read_nonblocking(
                    10000, timeout=self.PROMPT_TIMEOUT
                )
            except pexpect.TIMEOUT:
                # nothing (more) to flush within the timeout
                pass
            # enforce prompt to be shown by sending newline
            self.riotctrl.term.sendline("")
            self.replwrap = pexpect.replwrap.REPLWrapper(
                self.riotctrl.term,
                orig_prompt=self.prompt,
                prompt_change=None,
            )

    def start_term(self):
        """
        Starts the terminal of the RIOTCtrl object
        """
        # flag is set before starting so that __del__() still calls
        # stop_term() if starting the terminal fails half-way
        self.term_was_started = True
        self.riotctrl.start_term()

    def stop_term(self):
        """
        Stops the terminal of the RIOTCtrl object
        """
        self.term_was_started = False
        self.riotctrl.stop_term()

    @staticmethod
    def check_term(func, reset=True, **startkwargs):
        """
        Decorator to ensure the terminal is running and stopped

        When the decorated method is called while `riotctrl.term` is None,
        the call is made inside the `riotctrl.run_term()` context manager;
        otherwise the method is called directly on the existing terminal.

        :param func: the method to decorate. Its first argument must be an
                     object providing a `riotctrl` attribute
        :param reset: passed as first argument to `riotctrl.run_term()`
        :param startkwargs: keyword arguments passed on to
                            `riotctrl.run_term()`
        :return: the decorated method
        """

        # keep __name__, __doc__, etc. of the decorated method intact
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            if self.riotctrl.term is None:
                with self.riotctrl.run_term(reset, **startkwargs):
                    return func(self, *args, **kwargs)
            return func(self, *args, **kwargs)

        return wrapper

    def cmd(self, cmd, timeout=-1, async_=False):
        """
        Sends a command via the ShellInteraction's `riotctrl`

        :param cmd: A shell command as string.
        :param timeout: passed on to pexpect's `REPLWrapper.run_command()`
                        (default: -1; see the pexpect documentation for the
                        meaning of -1 and None)
        :param async_: passed on to pexpect's `REPLWrapper.run_command()`
                       (default: False)
        :return: Output of the command as a string
        """
        self._start_replwrap()
        return self.replwrap.run_command(cmd, timeout=timeout, async_=async_)