Select Git revision
control_loop_manager.py
m-guberina authored
full body interface and it makes sense. does not break other code by itself. that said it won't work arbitrarily because a lot of logs and so on use robot.model.nq, and now it has to be model.nq to support controlling joint subspaces. those will be addressed on a case-by-case basis, i don't have an hour to fix all of this now, nor a few more to write all the required tests. onto mobile base only path-following mpc
control_loop_manager.py 11.14 KiB
from argparse import Namespace
from smc.robots.robotmanager_abstract import AbstractRobotManager
from smc.multiprocessing.process_manager import ProcessManager
from smc.visualization.plotters import realTimePlotter
from functools import partial
import signal
import time
import numpy as np
from collections import deque
import copy
from os import getpid
class ControlLoopManager:
"""
ControlLoopManager
-------------------
Slightly fancier programming (passing a function as argument and using functools.partial)
to get a wrapper around the control loop.
In other words, it's the book-keeping around the actual control loop.
It's a class because it keeps non-directly-control-loop-related parameters
like max_iterations, what data to save etc.
NOTE: you give this the ready-made control loop.
if it has arguments, bake them in with functools.partial.
Handles short-term data saving and logging.
Details on this are given below.
Short term data saving:
- it's a dictionaries of deques (initialized here), because deque is the most convenient class
for removing the first element and appending a last (it is just a linked list under the hood of course).
- it's a dictionary for modularity's sake, because this way you can save whatever you want
- and it will just work based on dictionary keys.
- it is the user's resposibility to make sure they're providing correct data.
- --> TODO but make an assert for the keys at least
- in the c++ imlementation, make the user write their own struct or something.
- since this is python, you need to give me initial values to infer types.
- you need to provide initial values to populate the deque to start anyway.
Logging data (for analysis and plotting):
- it can only be handled here because the control loop itself only cares about the present/
a small time-window around it.
- saves it all in a dictionary of ndarrays (initialized here), returns that after a run
TODO: it's provided by the user now, make it actually initialize here!!!
- you need to specify which keys you're using to do the initialization
- later, the controlLoop needs to return what's to be save in a small temporary dict.
- NOTE: this is of course a somewhat redundant solution, but it's the simplest
and most flexible way of doing it.
it probably will be done some other way down the line, but it works and is not
a priority right now
Other info:
- relies on RobotManager to handle all the magic numbers
that are not truly only control loop related
"""
def __init__(
self,
robot_manager: AbstractRobotManager,
controlLoop: partial,
args,
save_past_item,
log_item,
):
signal.signal(signal.SIGINT, self.stopHandler)
self.pid = getpid()
self.max_iterations: int = args.max_iterations
self.robot_manager: AbstractRobotManager = robot_manager
self.controlLoop = controlLoop # TODO: declare partial function type
self.final_iteration = -1 # because we didn't even start yet
self.args: Namespace = args
self.iter_n: int = 0
self.past_data: dict[str, deque[np.ndarray]] = {}
# NOTE: viz update rate is a magic number that seems to work fine and i don't have
# any plans to make it smarter
self.viz_update_rate: int = int(np.ceil(self.args.ctrl_freq / 25))
# save_past_dict has to have the key and 1 example of what you're saving
# so that it's type can be inferred (but we're in python so types don't really work).
# the good thing is this way you also immediatelly put in the initial values
for key in save_past_item:
self.past_data[key] = deque()
# immediatelly populate every deque with initial values
for _ in range(self.args.past_window_size):
# deepcopy just in case, better safe than sorry plus it's during initialization,
# not real time
self.past_data[key].append(copy.deepcopy(save_past_item[key]))
# similar story for log_dict as for past_data,
# except this is not used in the control loop,
# we don't predeclare sizes, but instead
# just shove items into linked lists (python lists) in dictionaries (hash-maps)
self.log_dict: dict[str, list[np.ndarray]] = {}
for key in log_item:
self.log_dict[key] = []
if self.args.plotter:
self.plotter_manager: ProcessManager = ProcessManager(
args, realTimePlotter, log_item, 0
)
def run_one_iter(self, i):
"""
run
---
do timing to run at 500Hz.
also handle the number of iterations.
it's the controlLoop's responsibility to break if it achieved it's goals.
this is done via the breakFlag
"""
# NOTE: all required pre-computations are handled here
self.robot_manager._step()
# TODO make the arguments to controlLoop kwargs or whatever
# so that you don't have to declare them on client side if you're not using them
breakFlag, latest_to_save_dict, log_item = self.controlLoop(i, self.past_data)
self.final_iteration = i
# update past rolling window
# TODO: write an assert assuring the keys are what's been promised
# (ideally this is done only once, not every time, so think whether/how that can be avoided)
for key in latest_to_save_dict:
# remove oldest entry
self.past_data[key].popleft()
# add new entry
self.past_data[key].append(latest_to_save_dict[key])
# log the data
# check that you can
# TODO only need to check this once, pls enforce better
# if len(self.log_dict) > 0:
for key in log_item:
# if key not in self.log_dict.keys():
# raise KeyError("you need to provide log items you promised!")
# break
self.log_dict[key].append(log_item[key])
# TODO: do it this way if running on the real robot.
# but if not, we want to control the speed of the simulation,
# and how much we're plotting.
# so this should be an argument that is use ONLY if we're in simulation
if i % self.viz_update_rate == 0:
# don't send what wasn't ready
if self.args.visualizer:
self.robot_manager.visualizer_manager.sendCommand(
{
"q": self.robot_manager._q,
}
)
if self.robot_manager.robot_name != "yumi":
self.robot_manager.visualizer_manager.sendCommand(
{
"T_w_e": self.robot_manager.T_w_e,
}
)
else:
T_w_e_left, T_w_e_right = self.robot_manager.T_w_e
self.robot_manager.visualizer_manager.sendCommand(
{"T_w_e": T_w_e_left}
)
if self.robot_manager.robot_name == "heron":
T_base = self.robot_manager.data.oMi[1]
self.robot_manager.visualizer_manager.sendCommand(
{"T_base": T_base}
)
# if self.robot_manager.robot_name == "yumi":
# T_base = self.robot_manager.data.oMi[1]
# self.robot_manager.visualizer_manager.sendCommand({"T_base" : T_base})
if self.args.visualize_collision_approximation:
raise NotADirectoryError
# TODO: here call robot manager's update ellipses function
if self.args.plotter:
# don't put new stuff in if it didn't handle the previous stuff.
# it's a plotter, who cares if it's late.
# the number 5 is arbitrary
self.plotter_manager.sendCommand(log_item)
return breakFlag
def run(self):
self.final_iteration = 0
for i in range(self.max_iterations):
start = time.time()
breakFlag = self.run_one_iter(i)
# break if done
if breakFlag:
break
# sleep for the rest of the frequency cycle
end = time.time()
diff = end - start
if (self.robot_manager.dt < diff) and (self.args.ctrl_freq > 0):
if self.args.debug_prints:
print("missed deadline by", diff - self.robot_manager.dt)
continue
else:
if self.args.real or self.args.ctrl_freq > 0:
time.sleep(self.robot_manager.dt - diff)
######################################################################
# for over
######################################################################
if self.args.plotter:
self.plotter_manager.terminateProcess()
if self.args.save_log:
self.robot_manager._log_manager.storeControlLoopRun(
self.log_dict, self.controlLoop.func.__name__, self.final_iteration
)
if i < self.max_iterations - 1:
if self.args.debug_prints:
print("success in", i, "iterations!")
else:
print("FAIL: did not succed in", self.max_iterations, "iterations")
# self.stopHandler(None, None)
def stopHandler(self, signum, frame):
"""
stopHandler
-----------
upon receiving SIGINT it sends zeros for speed commands to
stop the robot.
NOTE: apparently this isn't enough,
nor does stopJ do anything, so it goes to freedriveMode
and then exits it, which actually stops ur robots at least.
"""
# when we make a new process, it inherits signal handling.
# which means we call this more than once.
# and that could lead to race conditions.
# but if we exit immediatealy it's fine
if getpid() != self.pid:
return
# TODO: you want this kinda bullshit to be defined on a per-robot basis,
# and put it into robot_manager.stopRobot()
print("sending 300 speedjs full of zeros and exiting")
for _ in range(300):
vel_cmd = np.zeros(self.robot_manager.nv)
self.robot_manager.sendVelocityCommand(vel_cmd)
self.robot_manager.stopRobot()
if self.args.save_log:
print("saving log")
# this does not get run if you exited with ctrl-c
self.robot_manager._log_manager.storeControlLoopRun(
self.log_dict, self.controlLoop.func.__name__, self.final_iteration
)
self.robot_manager._log_manager.saveLog()
if self.args.plotter:
self.plotter_manager.terminateProcess()
if self.args.visualizer:
self.robot_manager.visualizer_manager.terminateProcess()
exit()