Skip to content
Snippets Groups Projects
Commit 1ff2fed6 authored by Felix Agner's avatar Felix Agner
Browse files

initial commit. Added code along with a build so that it can hopefully be easily installed

parent 5b712627
No related branches found
No related tags found
No related merge requests found
LICENSE 0 → 100644
MIT License
Copyright (c) 2022 Department of Automatic Control, Lund University
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
File added
File added
File added
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "omnibot_client"
version = "0.0.1"
authors = [
{ name="Felix Agner", email="felix.agner@control.lth.se" },
]
description = "A python package for connecting to omnibots via TCP."
readme = "README.md"
requires-python = ">=3.7"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
[project.urls]
"Homepage" = "https://gitlab.control.lth.se/processes/omnibot/omnibotclient.py"
"Bug Tracker" = "https://gitlab.control.lth.se/processes/omnibot/omnibotclient.py/issues"
# general
import numpy as np
import sys
from time import time, sleep
# threads
import threading
# Controller
from inputs import get_gamepad
from Omnibot import OmniBot
class ControllerState:
"""
Class that logs the controller state
"""
def __init__(self):
""" Initialize the controller """
self.x = 128
self.y = 128
self.r = 128
self.on = True
def set_x(self,x):
self.x = x
def set_y(self,y):
self.y = y
def set_r(self,r):
self.r = r
def state_data(self):
return 'x'+str(self.x)+'y'+str(self.y)+'r'+str(self.r)
def set_state(self,data):
s = str(data)
ind = [s.find(i) for i in ['x','y','r']]
self.set_x(int(s[ind[0]+1:ind[1]]))
self.set_y(int(s[ind[1]+1:ind[2]]))
self.set_r(int(s[ind[2]+1:-1]))
def log_controller(state):
"""
Function that continuously listens to any event from the controller, and logs the change in
the controller state "state"
"""
while 1:
events = get_gamepad()
for event in events:
if event.code == "ABS_X":
state.set_x(event.state)
elif event.code == "ABS_Y":
state.set_y(event.state)
elif event.code == "ABS_Z":
state.set_r(event.state)
elif event.code == "BTN_PINKIE" and event.state == 1:
# The "BTN_PINKIE" (right index trigger)
# is currently used as an off-button.
state.on = False
def run_omnibot_with_controller(HOST,verbose=False):
"""
Connect to an omnibot at ip HOST and proceed to steer it with a controller and print current position of omnibot.
"""
# Robot parameters
R = 0.16 # Distance between center of robot and wheels
a1 = 2*np.pi/3 # Angle between x axis and first wheel
a2 = 4*np.pi/3 # Angle between x axis and second wheel
r = 0.028*0.45/18 # Wheel radius. Has been fudge-factored because the actual velocity of the wheels did not align with the set-points.
vel_factor = 6 # Fudge factor that changes the relative velocity of the bot
def phidot(xdot,ang):
"""Returns reference velocities for the wheels, given current system state and reference velocities"""
M = -1/r*np.array([[-np.sin(ang), np.cos(ang), R ],[-np.sin(ang+a1), np.cos(ang+a1), R],[-np.sin(ang+a2), np.cos(ang+a2), R]])
return M.dot(xdot)
# Start loggin controller
state = ControllerState()
t = threading.Thread(target=log_controller, args=(state,), daemon=True)
t.start()
ts = 0.1 # sampling time
with OmniBot(HOST,verbose=verbose) as bot:
no_error = True
while state.on and no_error:
t0 = time()
# Get gamepad updates.
# Use some scaling to turn the controller state values into
# reasonable velocities.
w = (128-state.r)*np.pi/1000
vy = (128-state.x)/1500
vx = (128-state.y)/1500
dphi = vel_factor*phidot([vx,vy,w],0.0)
for i,v in enumerate(dphi):
bot.set_speed(i,round(v))
print('x:'+bot.get_x())
print('y:'+bot.get_y())
sleep(max(0,t0+ts-time()))
if __name__ == '__main__':
# Server settings
HOST = "localhost"
if len(sys.argv) > 1:
# If an input is given to the script, it will be interpreted as the intended
# IP-address. Baseline is localhost.
HOST = sys.argv[1]
print(f"HOST: \t {HOST}")
run_omnibot_with_controller(HOST)
\ No newline at end of file
# general
import sys
from time import time, sleep
from Omnibot import OmniBot
def run_dummybot(HOST,verbose=True):
"""
Runs a dummybot at host HOST that rotates a bit while printing x and y coordinates.
"""
ts = 0.1 # sampling time
tstart = time()
with OmniBot(HOST,verbose=verbose) as bot:
print("Connected")
while time() < tstart + 5:
t0 = time()
bot.set_speed([])
print('x:'+bot.get_x())
print('y:'+bot.get_y())
sleep(max(0,t0+ts-time()))
if __name__ == '__main__':
# Server settings
HOST = "localhost"
if len(sys.argv) > 1:
# If an input is given to the script, it will be interpreted as the intended
# IP-address. Baseline is localhost.
HOST = sys.argv[1]
print(f"HOST: \t {HOST}")
run_dummybot(HOST)
\ No newline at end of file
import socket
class OmniBot(object):
"""
A class that implements a servo-client which is capable of sending servo speed-settings
to an omnibot.
HOST: Host name of server (string)
PORT: Port to connect to (int) (default 9998)
verbose: choose level of chit-chat (boolean)
"""
def __init__(self,HOST,PORT=9998,verbose=False):
self.HOST = HOST
self.PORT = PORT
self.verbose=verbose
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
def __enter__(self):
"""
Boot up the socket and connect to the omnibot server
"""
if self.verbose:
print("Connecting to HOST: " + str(self.HOST) + ", PORT: " + str(self.PORT))
self.sock.connect((self.HOST, self.PORT)) # Connect to the robot
if self.verbose:
print("Connection successful.")
return self
def __exit__(self, exc_type, exc_value, exc_tb):
"""
Close down the socket after closing the client
"""
if self.verbose:
print("Closing down socket")
self.sock.close()
def send_and_receive(self,message):
"""Send a message to the omnibot and return the answer that the omnibot gave."""
# Send the package
self.sock.sendall(bytes(message,'utf-8'))
# Receive confirmation
ret=self.sock.recv(1024).decode('utf-8')
return ret
def set_speed(self,i,v):
"""
Set the speed of servo i to v
Communicates with messages on the form "wivvvv"
where w stands for "write"self.
i stands for index of desired servo
vvvv stands for requested target speed for the servo
"""
package = 'w'+str(i)+str(v)
if self.verbose:
print("Sending " + package)
ret = self.send_and_receive(package)
if ret[0] == "e":
raise Exception(ret)
elif self.verbose:
print(ret)
def set_speeds(self,v):
"""
Set the speed of the servos to the values in vector v
"""
for (i,vi) in enumerate(v):
self.set_speed(i,vi)
def get_x(self):
"""Get x coordinate
Sends a message "rx"
where "r" stands for "read" and "x" stands for "x"
"""
if self.verbose:
print("Requesting x")
ret = self.send_and_receive("rx")
if ret[0] == "e":
raise Exception(ret)
if self.verbose:
print("x: "+ret)
return float(ret)
def get_y(self):
"""Get y coordinate
Sends a message "ry"
where "r" stands for "read" and "y" stands for "y"
"""
if self.verbose:
print("Requesting y")
ret = self.send_and_receive("ry")
if ret[0] == "e":
raise Exception(ret)
if self.verbose:
print("y: "+ret)
return float(ret)
def get_z(self):
"""Get z coordinate
Sends a message "rz"
where "r" stands for "read" and "z" stands for "z"
"""
if self.verbose:
print("Requesting z")
ret = self.send_and_receive("rz")
if ret[0] == "e":
raise Exception(ret)
if self.verbose:
print("z: "+ret)
return float(ret)
def get_theta(self):
"""Get x coordinate
Sends a message "rtheta"
where "r" stands for "read" and "theta" stands for "theta"
"""
if self.verbose:
print("Requesting theta")
ret = self.send_and_receive("rtheta")
if ret[0] == "e":
raise Exception(ret)
if self.verbose:
print("theta: "+ret)
return float(ret)
def get_state(self):
""""Get state vector of [x y theta]^T."""
x = self.get_x()
y = self.get_y()
theta = self.get_theta()
return [x,y,theta]
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment