Source code for pyfrc.test_support.controller

import contextlib
import typing
import wpilib
import threading
import pytest

from wpilib.simulation import DriverStationSim, stepTiming, stepTimingAsync


[docs] class TestController: """ Use this object to control the robot's state during tests """ def __init__(self, reraise, robot: wpilib.RobotBase): self._reraise = reraise self._thread: typing.Optional[threading.Thread] = None self._robot = robot self._cond = threading.Condition() self._robot_started = False self._robot_initialized = False self._robot_finished = False def _on_robot_initialized(self): with self._cond: self._robot_initialized = True self._cond.notify_all() def _robot_thread(self, robot): with self._cond: self._robot_started = True self._cond.notify_all() with self._reraise(catch=True): assert robot is not None # shouldn't happen... robot._TestRobot__robotInitialized = self._on_robot_initialized try: robot.startCompetition() assert self._robot_finished finally: # always call endCompetition or python hangs robot.endCompetition() del robot
[docs] @contextlib.contextmanager def run_robot(self): """ Use this in a "with" statement to start your robot code at the beginning of the with block, and end your robot code at the end of the with block. Your `robotInit` function will not be called until this function is called. """ # remove robot reference so it gets cleaned up when gc.collect() is called robot = self._robot self._robot = None self._thread = th = threading.Thread( target=self._robot_thread, args=(robot,), daemon=True ) th.start() with self._cond: # make sure the thread didn't die assert self._cond.wait_for(lambda: self._robot_started, timeout=1) # If your robotInit is taking more than 2 seconds in simulation, you're # probably doing something wrong... but if not, please report a bug! assert self._cond.wait_for(lambda: self._robot_initialized, timeout=2) try: # in this block you should tell the sim to do sim things yield finally: self._robot_finished = True robot.endCompetition() if isinstance(self._reraise.exception, RuntimeError): if str(self._reraise.exception).startswith( "HAL: A handle parameter was passed incorrectly" ): msg = ( "Do not reuse HAL objects in tests! This error may occur if you" " stored a motor/sensor as a global or as a class variable" " outside of a method." ) if hasattr(Exception, "add_note"): self._reraise.exception.add_note(f"*** {msg}") else: e = self._reraise.exception self._reraise.reset() raise RuntimeError(msg) from e # Increment time by 1 second to ensure that any notifiers fire stepTimingAsync(1.0) # the robot thread should exit quickly th.join(timeout=1) if th.is_alive(): pytest.fail("robot did not exit within 2 seconds") self._robot = None self._thread = None
@property def robot_is_alive(self) -> bool: """ True if the robot code is alive """ th = self._thread if not th: return False return th.is_alive()
[docs] def step_timing( self, *, seconds: float, autonomous: bool, enabled: bool, assert_alive: bool = True, ) -> float: """ This utility will increment simulated time, while pretending that there's a driver station connected and delivering new packets every 0.2 seconds. :param seconds: Number of seconds to run (will step in increments of 0.2) :param autonomous: Tell the robot that it is in autonomous mode :param enabled: Tell the robot that it is enabled :returns: Number of seconds time was incremented """ assert self.robot_is_alive, "did you call control.run_robot()?" assert seconds > 0 DriverStationSim.setDsAttached(True) DriverStationSim.setAutonomous(autonomous) DriverStationSim.setEnabled(enabled) tm = 0.0 while tm < seconds + 0.01: DriverStationSim.notifyNewData() stepTiming(0.2) if assert_alive: assert self.robot_is_alive tm += 0.2 return tm