Skip to content
Snippets Groups Projects
Commit d37e5246 authored by Mawio's avatar Mawio
Browse files

Implemented location message broadcast

parent 693cb6f5
No related branches found
No related tags found
No related merge requests found
...@@ -37,7 +37,6 @@ location_info = "" #location info such lat & long, it is saved to send later in ...@@ -37,7 +37,6 @@ location_info = "" #location info such lat & long, it is saved to send later in
def separate_bytes(hex_str, delim=' '): def separate_bytes(hex_str, delim=' '):
return re.sub(r'.{2}', r'\g<0>{}'.format(delim), hex_str).strip() return re.sub(r'.{2}', r'\g<0>{}'.format(delim), hex_str).strip()
AUTH_PAGE_0_LEN = 17 AUTH_PAGE_0_LEN = 17
AUTH_PAGE_LEN = 23 AUTH_PAGE_LEN = 23
...@@ -45,6 +44,117 @@ g_lat, g_lon = 58.398176, 15.575927 ...@@ -45,6 +44,117 @@ g_lat, g_lon = 58.398176, 15.575927
executor = ThreadPoolExecutor(5) executor = ThreadPoolExecutor(5)
def broadcast(payload):
"""Utility function to broadcast data"""
headers = "0x1e 0x16 0xfa 0xff 0x0d 0x00 0x00 0x42" #unknown meaning
num_bytes = len(payload.split(" ")) + len(headers.split(" "))
padding = "0x00 " * (31-num_bytes)
padding = padding.rstrip()
if is_wifi:
beacon_wifi(f"{payload} {padding}")
else:
if is_bt5:
"""
OGF: 0x08 (LE controller commands)
OCF: 0x0037 (LE Set Extended Advertising Data command)
Advertising handle: 0x00 (Advertising set number)
Operation: 0x03 (Complete extended advertising data)
Fragment_Preference: 0x01 (The Controller should not fragment or should minimize fragmentation of Host advertising data)
Advertising_Data_Length: 0x1f
"""
settings = "0x08 0x0037 0x00 0x03 0x01 0x1f"
subprocess.run(
f"hcitool -i hci0 cmd {settings} {headers} {payload} {padding}", shell=True)
else:
"""
OGF: 0x08 (LE controller commands)
OCF: 0x0008 (LE Set Advertising Data command)
Advertising_Data_Length: 0x1f
"""
settings = "0x08 0x0008 0x1f"
print(f"hcitool -i hci0 cmd {settings} {headers} {payload} {padding}")
subprocess.run(
f"hcitool -i hci0 cmd {settings} {headers} {payload} {padding}", shell=True)
# Basic ID (ASTM Message 0x0) Type 1
# Updated to draft-ietf-drip-auth-37
def broadcast_basic_id_1(hhit):
print(f"basic_id_1 with hhit: {hhit}")
"""
Serial Number = MFR Code | Length Code | MFR SN
MFR Code = ICAO MFR Code
Length Code = 0xF
MFR SN = HHIT Suite ID | ORCHID hash left-padded with 3 bits of zeros and encoded into 15 characters
"""
MFR_code = "8653"
# Extract HHIT Suite ID and ORCHID hash from the hhit bytes
hhit_bytes = hhit.split(' ')
hhit_suite_id = hhit_bytes[-9]
ORCHID_hash = "".join(hhit_bytes[-8:])
# Convert MFR SN to binary and add left padding of 3 bits of zeros
MFR_SN = bin(int(hhit_suite_id + ORCHID_hash, 16))[2:].zfill(75)
# Encoding of MFR_SN according to RFC9374
encoding_dictionary = [
"0", "1", "2", "3", "4", "5", "6", "7",
"8", "9", "A", "B", "C", "D", "E", "F",
"G", "H", "J", "K", "L", "M", "N", "P",
"Q", "R", "T", "U", "V", "W", "X", "Y"
]
MFR_SN_encoded = ""
for i in range(0, len(MFR_SN), 5):
bit_sequence = MFR_SN[i: i + 5]
MFR_SN_encoded += encoding_dictionary[int(bit_sequence, 2)]
serial_number = f"{MFR_code}F{MFR_SN_encoded}"
# Convert serial number to hex
hex_serial_number = ""
for c in serial_number:
hex_serial_number += hex(ord(c))
if c != serial_number[-1]:
hex_serial_number += " "
"""
ID Type and UA Type: 0x1f
"""
broadcast(f"0x1f {hex_serial_number}")
def broadcast_location(gpsd: GpsPoller):
# If gpsd is active, check that it has received values
if gpsd and gpsd.get_current_value() == None:
return
if gpsd:
lat = separate_bytes(astm_float_to_int_hex(gpsd.get_lat()))
lon = separate_bytes(astm_float_to_int_hex(gpsd.get_lon()))
else:
lat = separate_bytes(astm_float_to_int_hex(g_lat))
lon = separate_bytes(astm_float_to_int_hex(g_lon))
"""
ASTM Standard F3411, TABLE 6 Location/Vector Message Details (page 14)
Status (Bits [7..4]): 1 (Ground)
"""
location_info = f"0x10 0x00 0x00 0x00 {lat} {lon} 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00"
broadcast(f"{location_info}")
def get_auth_pages(signature: bytes, auth_type=0x10): def get_auth_pages(signature: bytes, auth_type=0x10):
""" """
...@@ -85,8 +195,6 @@ def Drip_Wrapper(gpsd: GpsPoller, iroha: DripIrohaAccount, flight_id, HostID: Ho ...@@ -85,8 +195,6 @@ def Drip_Wrapper(gpsd: GpsPoller, iroha: DripIrohaAccount, flight_id, HostID: Ho
print("location_update") print("location_update")
global executor global executor
# If gpsd is active, check that it has received values # If gpsd is active, check that it has received values
if gpsd and gpsd.get_current_value() == None: if gpsd and gpsd.get_current_value() == None:
return return
...@@ -175,107 +283,6 @@ def manifest_broadcast(HostID: HostIdentity, flight_id, exp_date, prev_loc = "" ...@@ -175,107 +283,6 @@ def manifest_broadcast(HostID: HostIdentity, flight_id, exp_date, prev_loc = ""
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 10 {payload}".format(payload=payload), shell=True) "hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 10 {payload}".format(payload=payload), shell=True)
# Basic ID (ASTM Message 0x0) Type 1
# Updated to draft-ietf-drip-auth-37
def broadcast_basic_id_1(hhit):
print(f"basic_id_1 with hhit: {hhit}")
"""
Serial Number = MFR Code | Length Code | MFR SN
MFR Code = ICAO MFR Code
Length Code = 0xF
MFR SN = HHIT Suite ID | ORCHID hash left-padded with 3 bits of zeros and encoded into 15 characters
"""
MFR_code = "8653"
hhit_bytes = hhit.split(' ')
hhit_suite_id = hhit_bytes[-9]
ORCHID_hash = "".join(hhit_bytes[-8:])
MFR_SN = bin(int(hhit_suite_id + ORCHID_hash, 16))[2:].zfill(75)
# Encoding of MFR_SN according to RFC9374
encoding_dictionary = [
"0", "1", "2", "3", "4", "5", "6", "7",
"8", "9", "A", "B", "C", "D", "E", "F",
"G", "H", "J", "K", "L", "M", "N", "P",
"Q", "R", "T", "U", "V", "W", "X", "Y"
]
MFR_SN_encoded = ""
for i in range(0, len(MFR_SN), 5):
bit_sequence = MFR_SN[i: i + 5]
MFR_SN_encoded += encoding_dictionary[int(bit_sequence, 2)]
serial_number = f"{MFR_code}F{MFR_SN_encoded}"
# Set (Extended) Advertising Command
if is_wifi:
beacon_wifi("{} 0x00 0x00 0x00 0x00 0x00 0x00 0x00".format('0x01', hhit))
else:
if is_bt5:
"""
OGF: 0x08 (LE controller commands)
OCF: 0x0037 (LE Set Extended Advertising Data command)
Advertising handle: 0x00 (Advertising set number)
Operation: 0x03 (Complete extended advertising data)
Fragment_Preference: 0x01 (The Controller should not fragment or should minimize fragmentation of Host advertising data)
Advertising_Data_Length: 0x1f
"""
settings = "0x08 0x0037 0x00 0x03 0x01 0x1f"
"""
ID Type and UA Type: 0x1f
"""
subprocess.run(
f"hcitool -i hci0 cmd {settings} 0x1f {serial_number} 0x00 0x00 0x00 0x00 0x00 0x00 0x00", shell=True)
else:
"""
OGF: 0x08 (LE controller commands)
OCF: 0x0008 (LE Set Advertising Data command)
Advertising_Data_Length: 0x1f
Advertising_Data: 1e 16 fa ff 0d 00 00 42 + hhit
"""
subprocess.run(
f"hcitool -i hci0 cmd 0x08 0x0008 0x1f 0x1e 0x16 0xfa 0xff 0x0d 0x00 0x00 0x42 {hhit} 0x00 0x00 0x00 0x00 0x00 0x00 0x00", shell=True)
# session id / hhit should be passed in the basic_id_update function
def basic_id_update(hhit):
# ASTM F3-411 Basic ID message 0x0 with HHIT.
print("basic_id_update")
# Set (Extended) Advertising Command
if is_wifi:
beacon_wifi("{} 0x00 0x00 0x00 0x00 0x00 0x00 0x00".format('0x01', hhit))
else:
if is_bt5:
"""
OGF: 0x08 (LE controller commands)
OCF: 0x0037 (LE Set Extended Advertising Data command)
Advertising handle: 0x00 (Advertising set number)
Operation: 0x03 (Complete extended advertising data)
Fragment_Preference: 0x01 (The Controller should not fragment or should minimize fragmentation of Host advertising data)
Advertising_Data_Length: 0x1f
Advertising_Data: 1e 16 fa ff 0d 00 10 + hhit
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x03 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d 0x00 0x00 0x42 {} 0x00 0x00 0x00 0x00 0x00 0x00 0x00".format('0x01', hhit), shell=True)
else:
"""
OGF: 0x08 (LE controller commands)
OCF: 0x0008 (LE Set Advertising Data command)
Advertising_Data_Length: 0x1f
Advertising_Data: 1e 16 fa ff 0d 00 00 42 + hhit
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 00 42 {} 00 00 00 00 00 00 00".format('0x01', hhit), shell=True)
time.sleep(0.5)
def auth_update(auth_pages, msg_number): def auth_update(auth_pages, msg_number):
print("auth_update") print("auth_update")
for page in auth_pages: for page in auth_pages:
...@@ -524,41 +531,8 @@ def main(): ...@@ -524,41 +531,8 @@ def main():
i = 0 i = 0
try: try:
broadcast_basic_id_1(hhit) broadcast_basic_id_1(hhit)
""" broadcast_location(gpsd)
while True:
os.system('clear')
print("The flight_id is {}".format(flight_id))
# Send basic_id and auth at least once every 3 seconds.
# Send basic_id first second
if i % 3 == 0:
broadcast_basic_id_1(hhit)
# Send first half of auth pages second second
elif i % 3 == 1:
auth_update(auth_pages[:2], 1)
# Send last half of auth pages last second
elif i % 3 == 2:
auth_update(auth_pages[2:], 2)
# saving the hash_location information such as lon & lat, to use as previous location_hash later in Manifests
prev_location_info = location_info
# Location updates at least once every second.
Drip_Wrapper(gpsd, iroha, flight_id, hi, exp_date)
# Manifest broadcast
manifest_broadcast(hi, flight_id, exp_date, prev_location_info)
if args.run_scenario:
g_lat += random.uniform(0.0001, 0.001) * \
random.choice([-1, 0, 1])
g_lon += random.uniform(0.0001, 0.001) * \
random.choice([-1, 0, 1])
print(g_lat, g_lon)
i += 1
time.sleep(1 - time.monotonic() % 1)
"""
except KeyboardInterrupt: except KeyboardInterrupt:
# Reset bt # Reset bt
subprocess.run("hcitool -i hci0 cmd 0x03 0x0003", shell=True) subprocess.run("hcitool -i hci0 cmd 0x03 0x0003", shell=True)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment