Skip to content
Snippets Groups Projects
Commit 4a178a45 authored by m-guberina's avatar m-guberina
Browse files

about to attempt templetizing path following via composition

parent 046ef99b
No related branches found
No related tags found
No related merge requests found
...@@ -41,7 +41,7 @@ def controlLoopClikExternalGoal( ...@@ -41,7 +41,7 @@ def controlLoopClikExternalGoal(
ik_solver, ik_solver,
i, i,
past_data, past_data,
): ) -> tuple[bool, dict[str, np.ndarray], dict[str, np.ndarray]]:
""" """
controlLoop controlLoop
----------------------------- -----------------------------
...@@ -61,11 +61,12 @@ def controlLoopClikExternalGoal( ...@@ -61,11 +61,12 @@ def controlLoopClikExternalGoal(
SEerror = T_w_e.actInv(T_goal) SEerror = T_w_e.actInv(T_goal)
err_vector = pin.log6(SEerror).vector err_vector = pin.log6(SEerror).vector
J = pin.computeFrameJacobian(robot.model, robot.data, q, robot.ee_frame_id) J = pin.computeFrameJacobian(robot.model, robot.data, q, robot.ee_frame_id)
K = np.diag([30] * 3 + [15] * 3)
qd_cmd = ( qd_cmd = (
J.T J.T
@ np.linalg.inv(J @ J.T + np.eye(J.shape[0], J.shape[0]) * args.tikhonov_damp) @ np.linalg.inv(J @ J.T + np.eye(J.shape[0], J.shape[0]) * args.tikhonov_damp)
@ (err_vector + data["v"]) @ (K @ err_vector + data["v"])
) )
if qd_cmd is None: if qd_cmd is None:
print("the controller you chose didn't work, using dampedPseudoinverse instead") print("the controller you chose didn't work, using dampedPseudoinverse instead")
......
from argparse import Namespace
from typing import Any, Callable
import numpy as np
from smc.robots.robotmanager_abstract import AbstractRobotManager
global control_loop_return
control_loop_return = tuple[bool, dict[str, np.ndarray], dict[str, np.ndarray]]
def GenericControlLoop(
X: Any,
control_loop: Callable[
[Any, Namespace, AbstractRobotManager, dict[str, np.ndarray], float],
control_loop_return,
],
args: Namespace,
robot: AbstractRobotManager,
past_data: dict[str, np.ndarray],
t: int, # will be float eventually
) -> control_loop_return:
breakFlag = False
log_item = {}
save_past_item = {}
control_loop(X, args, robot, past_data, t)
return breakFlag, log_item, save_past_item
from smc.robots.robotmanager_abstract import AbstractRobotManager
from smc.multiprocessing.process_manager import ProcessManager
from pinocchio import SE3, log6
from argparse import Namespace
from typing import Any, Callable
import numpy as np
global control_loop_return
control_loop_return = tuple[bool, dict[str, np.ndarray], dict[str, np.ndarray]]
def PathFollowingFromPlannerControlLoop(
path_planner: ProcessManager,
get_position: Callable[[AbstractRobotManager], np.ndarray],
SOLVER: Callable,
control_loop: Callable[
[Any, np.ndarray, Namespace, AbstractRobotManager, dict[str, np.ndarray], int],
np.ndarray,
],
args: Namespace,
robot: AbstractRobotManager,
past_data: dict[str, np.ndarray],
t: int, # will be float eventually
) -> control_loop_return:
"""
Point2PointControlLoop
---------------
generic control loop for point to point motion
(handling error to final point etc).
"""
breakFlag = False
log_item = {}
save_past_item = {}
p = get_position(robot)
path_planner.sendCommand(p)
data = path_planner.getData()
if data == "done":
breakFlag = True
if data == "done" or data is None:
robot.sendVelocityCommand(np.zeros(robot.model.nv))
log_item["qs"] = robot.q
log_item["dqs"] = robot.v
return breakFlag, save_past_item, log_item
_, path2D_untimed = data
path2D_untimed = np.array(path2D_untimed).reshape((-1, 2))
v_cmd, log_item_inner = control_loop(
SOLVER, path2D_untimed, args, robot, past_data, t
)
robot.sendVelocityCommand(v_cmd)
log_item.update(log_item_inner)
log_item["qs"] = robot.q
log_item["dqs"] = robot.v
log_item["dqs_cmd"] = v_cmd.reshape((robot.model.nv,))
return breakFlag, save_past_item, log_item
from smc.robots.interfaces.single_arm_interface import SingleArmInterface
from pinocchio import SE3, log6
from argparse import Namespace
from typing import Any, Callable
import numpy as np
global control_loop_return
control_loop_return = tuple[bool, dict[str, np.ndarray], dict[str, np.ndarray]]
def Point2PointControlLoop(
SOLVER: Callable,
T_w_goal: SE3,
control_loop: Callable[
[Any, SE3, Namespace, SingleArmInterface, dict[str, np.ndarray], int],
np.ndarray,
],
args: Namespace,
robot: SingleArmInterface,
past_data: dict[str, np.ndarray],
t: int, # will be float eventually
) -> control_loop_return:
"""
Point2PointControlLoop
---------------
generic control loop for point to point motion
(handling error to final point etc).
"""
breakFlag = False
log_item = {}
save_past_item = {}
v_cmd = control_loop(SOLVER, T_w_goal, args, robot, past_data, t)
robot.sendVelocityCommand(v_cmd)
T_w_e = robot.T_w_e
SEerror = T_w_e.actInv(T_w_goal)
err_vector = log6(SEerror).vector
if np.linalg.norm(err_vector) < robot.args.goal_error:
breakFlag = True
log_item["qs"] = robot.q
log_item["err_norm"] = np.linalg.norm(err_vector).reshape((1,))
log_item["dqs"] = robot.v
log_item["dqs_cmd"] = v_cmd.reshape((robot.model.nv,))
return breakFlag, save_past_item, log_item
...@@ -305,7 +305,9 @@ def BaseAndEEPathFollowingFromPlannerMPCControlLoop( ...@@ -305,7 +305,9 @@ def BaseAndEEPathFollowingFromPlannerMPCControlLoop(
return breakFlag, save_past_dict, log_item return breakFlag, save_past_dict, log_item
def initializePastData(args: Namespace, T_w_e: SE3, p_base: np.ndarray, max_base_v: float) -> np.ndarray: def initializePastData(
args: Namespace, T_w_e: SE3, p_base: np.ndarray, max_base_v: float
) -> np.ndarray:
# prepopulate past data to make base and cart be on the same path in the past # prepopulate past data to make base and cart be on the same path in the past
# (which didn't actually happen because this just started) # (which didn't actually happen because this just started)
p_ee = T_w_e.translation[:2] p_ee = T_w_e.translation[:2]
......
...@@ -44,7 +44,14 @@ def client_receiver(args: Namespace, init_command, shm_name: str, lock): ...@@ -44,7 +44,14 @@ def client_receiver(args: Namespace, init_command, shm_name: str, lock):
msg_in_bytes = b"" msg_in_bytes = b""
len_size_offset = 0 len_size_offset = 0
while True: while True:
try:
next_pos, pos = _DecodeVarint32(buffer, pos) next_pos, pos = _DecodeVarint32(buffer, pos)
except IndexError:
print(
"NETWORKING CLIENT_RECEIVER: got IndexError while decoding message length. \
this only happens if the sender died. i'll exit as well"
)
return None, -1
# TODO: either save the message chunk, or save how many initial bytes to ignore in the next message # TODO: either save the message chunk, or save how many initial bytes to ignore in the next message
if pos + next_pos > buffer_len: if pos + next_pos > buffer_len:
# print("NETWORKING CLIENT: BUFFER OVERFLOW, DROPPING MSG!") # print("NETWORKING CLIENT: BUFFER OVERFLOW, DROPPING MSG!")
...@@ -81,6 +88,9 @@ def client_receiver(args: Namespace, init_command, shm_name: str, lock): ...@@ -81,6 +88,9 @@ def client_receiver(args: Namespace, init_command, shm_name: str, lock):
msg_raw = s.recv(1024) msg_raw = s.recv(1024)
buffer += msg_raw buffer += msg_raw
msg_in_bytes, pos = parse_message(buffer) msg_in_bytes, pos = parse_message(buffer)
# -1 means an error happened
if pos == -1:
break
buffer = buffer[pos:] buffer = buffer[pos:]
dict_msg = encoder_decoder.serializedPb2MsgToDict(msg_in_bytes, msg_code) dict_msg = encoder_decoder.serializedPb2MsgToDict(msg_in_bytes, msg_code)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment