Skip to content
Snippets Groups Projects
Commit ed88aa80 authored by Tommy Persson's avatar Tommy Persson
Browse files

Work on doc

parent 15fa1c53
No related branches found
No related tags found
No related merge requests found
import serial, time
from pynmeagps import NMEAReader
SERIALPORT = "/dev/ttyUSB1"
BAUDRATE = 57600
import serial
ser = serial.Serial(port=SERIALPORT, baudrate=BAUDRATE, bytesize=8, parity='N', stopbits=1)
print(ser.name) # check which port was really used
str = b""
while True:
ch = ser.read()
print("CH:", ch)
str += ch
print("STR:", str)
if ch == b"\n":
print("STR:", str)
msg = NMEAReader.parse(str)
print("MSG:", msg)
str = b""
import os
import collections.abc
import collections
collections.MutableMapping = collections.abc.MutableMapping
import math
import rclpy
from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor
from rclpy.callback_groups import ReentrantCallbackGroup
from geometry_msgs.msg import PoseStamped
from haversine import haversine
import serial, time
from pynmeagps import NMEAMessage, GET
from lrs_util.util import euler_from_quaternion
from lrs_util.coordtrans import CoordTrans
from datetime import datetime, timezone
from dronekit import connect, VehicleMode, LocationGlobal
class GGA() :
def __init__(self, lat, lon, alt, geoid_height):
utc = datetime.now(timezone.utc).time()
self.gga = NMEAMessage('GP','GGA', GET, time=utc,
lat=lat, lon=lon, alt=alt, numSV=24, HDOP=0.5,
altUnit="M", sepUnit="M", quality=4, sep=geoid_height,
diffAge="", diffStation="")
def serialize(self):
return self.gga.serialize()
class GSA() :
def __init__(self):
self.gsa = NMEAMessage('GN','GSA', GET, opMode="A", navMode=3,
svid_01=0,
PDOP=0.5, HDOP=0.5, VDOP=0.5, systemId=""
)
def serialize(self):
return self.gsa.serialize()
class HDM() :
def __init__(self, heading_deg):
self.hdm = NMEAMessage('GP','HDM', GET,
headingM=heading_deg, deviation=6.0
)
def serialize(self):
return self.hdm.serialize()
class HDT() :
def __init__(self, heading_deg):
self.hdt = NMEAMessage('GP','HDT', GET,
headingT=heading_deg,
headingTu="T"
)
def serialize(self):
return self.hdt.serialize()
class RMC() :
def __init__(self, lat, lon, speed, track_angle):
knots = speed*1.943844
utc = datetime.now(timezone.utc).time()
date = datetime.now(timezone.utc).date()
magnetic_variation = 0.0 #degreed
self.rmc = NMEAMessage('GP','RMC', GET, time=utc, status="A",
lat=lat, lon=lon, spd=knots, cog=track_angle,
date=date, mv=magnetic_variation,
mvEW="", posMode="", navStatus=""
)
def serialize(self):
return self.rmc.serialize()
class ViconToNmea(Node):
def __init__(self):
super().__init__('vicon_to_nmea')
self.group = ReentrantCallbackGroup()
self.coordtrans = CoordTrans(location=os.environ.get("LRS_LOCATION"))
self.timer_period = 1/5.0 # seconds
namespace = self.get_namespace()
self.pose_sub = self.create_subscription(PoseStamped, f"/vicon{namespace}", self.pose_callback, 10, callback_group=self.group)
self.timer = self.create_timer(self.timer_period, self.timer_callback, callback_group=self.group)
self.timer_counter = 0
self.current_pose = None
self.n_sum = 5
self.distance_index = 0
self.distance = []
self.x0 = 0.0
self.y0 = 0.0
self.have_position = False
self.have_speed = False
self.lat0 = 0.0
self.lon0 = 0.0
self.heading = None
self.port = "/dev/ttyUSB0"
self.baudrate = 57600
self.ser = serial.Serial(port=self.port, baudrate=self.baudrate, bytesize=8, parity='N', stopbits=1) # open serial port
print("SERIAL PORT:", self.ser.name)
#self.drone = drone = System()
#await self.drone.connect(system_address=f"tcp:{self.host}:{self.port}")
self.host = "10.8.0.64" # miniugv 7.203
self.port = 14553
connection_string = f"tcp:{self.host}:{self.port}"
# self.vehicle = connect(connection_string, wait_ready=True)
def send_vision_position_estimate(self, x, y, z, roll, pitch, yaw):
try:
usec = 0
roll = 0.0
pitch = 0.0
yaw = 0.0
cov = [math.nan]*21
reset_counter = 1
#"Row-major representation of pose 6x6 cross-covariance
#matrix upper right triangle (states: x, y, z, roll, pitch,
# yaw; first six entries are the first ROW, next five
# entries are the second ROW, etc.). If unknown, assign NaN value to first element in the array
# Estimate reset counter. This should be incremented when the estimate resets in any of the dimensions (position, velocity, attitude, angular speed). This is designed to be used when e.g an external SLAM system detects a loop-closure and the estimate jumps.
# https://mavlink.io/en/messages/common.html
mvmsg = self.vehicle.message_factory.vision_position_estimate_encode(
usec, # uint64_t us Timestamp (UNIX time or time since system boot)
x, y, z,
roll, pitch, yaw,
cov,
reset_counter
)
self.vehicle.send_mavlink(mvmsg)
except Exception as exc:
print("EXCEPTION send_vision_position_estimate:", type(exc), exc)
def pose_callback(self, msg):
try:
### print("pose_callback MSG:", msg)
self.current_pose = msg
pose = self.current_pose.pose
p = pose.position
q = pose.orientation
if self.have_position:
dx = p.x-self.x0
dy = p.y-self.y0
delta = math.sqrt(dx*dx+dy*dy)
if len(self.distance) == self.n_sum:
self.distance[self.distance_index] = delta
# print("DISTANCE:", self.distance)
self.dist = sum(self.distance)
self.have_speed = True
else:
self.distance.append(delta)
self.distance_index += 1
self.distance_index %= self.n_sum
self.x0 = p.x
self.y0 = p.y
self.have_position = True
(roll, pitch, yaw) = euler_from_quaternion(q.x, q.y, q.z, q.w)
self.heading = 180.0 - math.degrees(yaw)
if self.heading < 0.0:
self.heading += 360.0
if self.heading > 360.0:
self.heading -= 360.0
### self.send_vision_position_estimate(p.x, p.y, p.z, 0, 0, 0)
except Exception as exc:
print("EXCEPTION pose_callback:", type(exc))
print(exc)
def timer_callback(self):
# default lon = x, lat = y, LAT=NORTH
### self.send_vision_position_estimate(0, 0, 0, 0, 0, 0)
self.timer_counter += 1
if not self.current_pose:
return
pose = self.current_pose.pose
p = pose.position
(lon, lat, alt) = self.coordtrans.world_to_wgs84(p.x, p.y, p.z, angle_deg=-90.0)
## print("P:", p)
##(x, y, z) = self.coordtrans.wgs84_to_world(lon, lat, angle_deg=-90.0)
##print("PP:", x, y, z)
track_angle = 0.0
track_angle_flag = False
if self.timer_counter > 1:
dist = haversine((self.lat0, self.lon0), (lat, lon), unit='m')
dnorth = haversine((self.lat0, self.lon0), (lat, self.lon0), unit='m')
deast = haversine((self.lat0, self.lon0), (self.lat0, lon), unit='m')
if deast > 0.01 or dnorth > 0.01:
track_angle_flag = True
if lat < self.lat0:
dnorth = math.copysign(dnorth, -1.0)
if lon < self.lon0:
deast = math.copysign(deast, -1.0)
# print(f"HAVERSINE DIST: {dist:4.2f} {dnorth:4.2f} {deast:4.2f}")
if track_angle_flag:
track_angle = round(math.degrees(math.atan2(deast, dnorth)), 1)
self.lat0 = lat
self.lon0 = lon
# (lon1, lat1, alt1) = self.coordtrans.world_to_wgs84(p.x-100.0, p.y, p.z, angle_deg=-90.0)
##print("LON LAT ALT :", lon, lat, alt)
##print("LON1 LAT1 ALT1:", lon1, lat1, alt1)
# compute rotated x y and then got to lon lat alt
above_sea = round(alt - self.coordtrans.geoid_height, 3)
gga = GGA(lat, lon, above_sea, self.coordtrans.geoid_height)
##gga = GGA(lat, lon, above_sea, self.coordtrans.geoid_height)
##print("GGA:", gga.gga)
#print("GGA serialize:", gga.serialize())
self.ser.write(gga.serialize())
gsa = GSA()
#print("GSA:", gsa.gsa)
#print("GSA serialize:", gsa.serialize())
self.ser.write(gsa.serialize())
# hdm = HDM(self.heading)
#hdt = HDT(self.heading)
#print("HDT:", hdt.hdt)
# print("HDT serialize:", hdt.serialize())
#self.ser.write(hdt.serialize())
if self.have_speed:
speed = round(self.dist/(self.n_sum*self.timer_period), 3)
#print("SPEED:", speed)
#print("ANGLE:", track_angle)
rmc = RMC(lat, lon, speed, track_angle)
#print("RMC:", rmc.rmc, type(rmc.rmc.date))
#print("RMCserialize:", rmc.serialize())
self.ser.write(rmc.serialize())
def main(args=None):
print('Hi from tbot_connect.')
rclpy.init(args=args)
executor = MultiThreadedExecutor(num_threads=8)
coordtrans = CoordTrans(location=os.environ.get("LRS_LOCATION"))
(lon, lat, alt) = coordtrans.world_to_wgs84(0, 0, 0.0)
above_sea = round(alt - coordtrans.geoid_height, 3)
gga = GGA(lat, lon, above_sea, coordtrans.geoid_height)
utc = datetime.now(timezone.utc).time()
print("UTC:", utc, type(utc))
print("GGA:", gga.gga)
print("GGA serialize:", gga.serialize())
gsa = GSA()
print("GSA:", gsa.gsa)
print("GSA serialize:", gsa.serialize())
speed = 2.3 # m/s
track_angle = 97.0 #degrees
rmc = RMC(lat, lon, speed, track_angle)
print("RMC:", rmc.rmc, type(rmc.rmc.date))
print("RMCserialize:", rmc.serialize())
vicon = ViconToNmea()
executor.add_node(vicon)
try:
executor.spin()
except KeyboardInterrupt:
pass
executor.shutdown()
rclpy.shutdown()
if __name__ == '__main__':
main()
......@@ -25,7 +25,6 @@ setup(
tests_require=['pytest'],
entry_points={
'console_scripts': [
'vicon_to_nmea = lrs_ardupilot.vicon_to_nmea:main',
'ardu_manager = lrs_ardupilot.ardu_manager:main',
'ardu_command = lrs_ardupilot.ardu_command:main',
'ardupilot_connect = lrs_ardupilot.ardupilot_connect:main',
......
......@@ -68,7 +68,6 @@ def main_unit(ns):
if options.vicon:
ros2("vicon", "ros2 launch lrs_vicon vicon.launch.py rate:=10")
ros2("vicon_to_nmea", f"ros2 run lrs_ardupilot vicon_to_nmea --ros-args -r __ns:={ns}")
if options.jazzy:
ros2("mavproxy", f"cd; source venv/bin/activate; mavproxy.py --master=tcp:localhost:14552 --out=tcpin:mavproxy:14553", run=False)
else:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment