Skip to content
Snippets Groups Projects

Fixes and improvements to bt and drippy scripts

Merged Elmedin Zildzic requested to merge elmzi904/gps_poll_update into main
4 files
+ 137
69
Compare changes
  • Side-by-side
  • Inline
Files
4
@@ -10,7 +10,8 @@ from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
import string
import random
from dotenv.main import load_dotenv
from dotenv import load_dotenv
import signal
FILE_DIR = os.path.dirname(os.path.abspath(__file__))
BASE_DIR = os.path.join(FILE_DIR, "..")
@@ -42,6 +43,8 @@ g_lat, g_lon = 58.398176, 15.575927
executor = ThreadPoolExecutor(5)
stdout_fd = subprocess.DEVNULL
def get_auth_pages(signature: bytes, auth_type=0x10):
"""
@@ -81,6 +84,7 @@ def location_update(gpsd: GpsPoller, iroha: DripIrohaAccount, flight_id):
# ASTM F3-411 Location message 0x1
print("location_update")
global executor
global stdout_fd
# If gpsd is active, check that it has received values
if gpsd and gpsd.get_current_value() == None:
@@ -122,7 +126,7 @@ def location_update(gpsd: GpsPoller, iroha: DripIrohaAccount, flight_id):
Advertising_Data: 1e 16 fa ff 0d 00 10 + payload
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x03 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d 0x00 0x10 {payload}".format(payload=payload), shell=True)
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x03 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d 0x00 0x10 {payload}".format(payload=payload), shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
else:
"""
OGF: 0x08 (LE controller commands)
@@ -131,12 +135,13 @@ def location_update(gpsd: GpsPoller, iroha: DripIrohaAccount, flight_id):
Advertising_Data: 1e 16 fa ff 0d 00 10 + payload
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 10 {payload}".format(payload=payload), shell=True)
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 10 {payload}".format(payload=payload), shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
def basic_id_update(hhit):
# ASTM F3-411 Basic ID message 0x0 with HHIT.
print("basic_id_update")
global stdout_fd
# Set (Extended) Advertising Command
if is_wifi:
beacon_wifi(hhit)
@@ -152,7 +157,7 @@ def basic_id_update(hhit):
Advertising_Data: 1e 16 fa ff 0d 00 10 + hhit
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x03 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d 0x00 0x00 0x42 {} 0x00 0x00 0x00 0x00 0x00 0x00 0x00".format(hhit), shell=True)
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x03 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d 0x00 0x00 0x42 {} 0x00 0x00 0x00 0x00 0x00 0x00 0x00".format(hhit), shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
else:
"""
OGF: 0x08 (LE controller commands)
@@ -161,12 +166,13 @@ def basic_id_update(hhit):
Advertising_Data: 1e 16 fa ff 0d 00 00 42 + hhit
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 00 42 {} 00 00 00 00 00 00 00".format(hhit), shell=True)
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 00 42 {} 00 00 00 00 00 00 00".format(hhit), shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
time.sleep(0.5)
def auth_update(auth_pages, msg_number):
print("auth_update")
global stdout_fd
for page in auth_pages:
msg = separate_bytes(hexlify(page).decode("utf-8"))
if is_wifi:
@@ -185,10 +191,10 @@ def auth_update(auth_pages, msg_number):
"""
if msg_number == 1:
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x01 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d {} 0x20 {}".format("00", msg), shell=True)
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x01 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d {} 0x20 {}".format("00", msg), shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
elif msg_number == 2:
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x02 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d {} 0x20 {}".format("00", msg), shell=True)
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x02 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d {} 0x20 {}".format("00", msg), shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
else:
"""
OGF: 0x08 (LE controller commands)
@@ -197,7 +203,7 @@ def auth_update(auth_pages, msg_number):
Advertising_Data: 1e 16 fa ff 0d 00 20 + msg
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d {} 20 {}".format("00", msg), shell=True)
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d {} 20 {}".format("00", msg), shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
time.sleep(0.2)
@@ -273,6 +279,13 @@ def main():
type=str,
help="Specify expiration date for the attestation in the format YYYY-mm-ddTHH:MM"
)
parser.add_argument(
"-v", "--verbose",
dest="verbose",
default=False,
action="store_true",
help="Use verbose output."
)
args = parser.parse_args(sys.argv[1:])
hi_file = args.host_identity_file
@@ -280,6 +293,10 @@ def main():
is_bt5 = True if args.bluetooth_version == 5 else False
is_wifi= args.activate_wifi
global stdout_fd
if args.verbose:
stdout_fd = None # stdout
exp_date = datetime.strptime(args.exp_date, "%Y-%m-%dT%H:%M")
if (exp_date - datetime.utcnow()).total_seconds() <= 0:
print("Expiration date cannot come before current date.")
@@ -319,7 +336,7 @@ def main():
if not is_wifi:
# Reset bt
subprocess.run("hcitool -i hci0 cmd 0x03 0x0003", shell=True)
subprocess.run("hcitool -i hci0 cmd 0x03 0x0003", shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
# Set (Extended) Advertising Parameters
if is_bt5:
@@ -343,7 +360,7 @@ def main():
Scan_Request_Notification_Enable: 0x00 (Scan request notifications disabled)
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0036 0x00 0x00 0x00 0xa0 0x00 0x00 0xa0 0x00 0x00 0x07 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x02 0x7f 0x03 0x00 0x03 0x00 0x00", shell=True)
"hcitool -i hci0 cmd 0x08 0x0036 0x00 0x00 0x00 0xa0 0x00 0x00 0xa0 0x00 0x00 0x07 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x02 0x7f 0x03 0x00 0x03 0x00 0x00", shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
else:
"""
OGF: 0x08 (LE controller commands)
@@ -358,7 +375,7 @@ def main():
Advertising_Filter_Policy: 0x02 (Process scan requests: all devices, Connection requests: only Filter Accept List)
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0006 a0 00 a0 00 03 00 00 00 00 00 00 00 00 07 02", shell=True)
"hcitool -i hci0 cmd 0x08 0x0006 a0 00 a0 00 03 00 00 00 00 00 00 00 00 07 02", shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
# Enable (Extended) Bluetooth Advertisement
if is_bt5:
@@ -371,52 +388,67 @@ def main():
Duration[i]: 0x0000 (Advertise until the Host disables it)
Max_Extended_Advertising_Events[i]: 0x00 (No maximum)
"""
subprocess.run("hcitool -i hci0 cmd 0x08 0x0039 0x01 0x01 0x00 0x00 0x00 0x00", shell=True)
subprocess.run("hcitool -i hci0 cmd 0x08 0x0039 0x01 0x01 0x00 0x00 0x00 0x00", shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
else:
"""
OGF: 0x08 (LE controller commands)
OCF: 0x000a (LE Set Advertising Enable command)
Enable: 0x01 (True)
"""
subprocess.run("hcitool -i hci0 cmd 0x08 0x000a 01", shell=True)
subprocess.run("hcitool -i hci0 cmd 0x08 0x000a 01", shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
gpsd = None
if not args.run_scenario and args.gps:
gpsd = GpsPoller()
gpsd.start()
i = 0
try:
while True:
os.system('clear')
print("The flight_id is {}".format(flight_id))
# Send basic_id and auth at least once every 3 seconds.
# Send basic_id first second
if i % 3 == 0:
basic_id_update(hhit)
# Send first half of auth pages second second
elif i % 3 == 1:
auth_update(auth_pages[:2], 1)
# Send last half of auth pages last second
elif i % 3 == 2:
auth_update(auth_pages[2:], 2)
# Location updates at least once every second.
location_update(gpsd, iroha, flight_id)
if args.run_scenario:
g_lat += random.uniform(0.0001, 0.001) * \
random.choice([-1, 0, 1])
g_lon += random.uniform(0.0001, 0.001) * \
random.choice([-1, 0, 1])
i += 1
time.sleep(1 - time.monotonic() % 1)
except KeyboardInterrupt:
def signal_handler(_signo, _stack_frame):
if gpsd:
gpsd.stop_thread()
gpsd.join()
# Reset bt
if not is_wifi:
# Reset bt
subprocess.run("hcitool -i hci0 cmd 0x03 0x0003", shell=True)
sys.exit(0)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
i = 0
while True:
os.system('clear')
print("The flight_id is {}".format(flight_id))
if hi and not hi.is_hhit_verified():
print("HHIT verification failed")
if gpsd and gpsd.has_error():
print(f'GpsPoller received an error: {gpsd.get_error_msg()}')
# Send basic_id and auth at least once every 3 seconds.
# Send basic_id first second
if i % 3 == 0:
basic_id_update(hhit)
# Send first half of auth pages second second
elif i % 3 == 1:
auth_update(auth_pages[:2], 1)
# Send last half of auth pages last second
elif i % 3 == 2:
auth_update(auth_pages[2:], 2)
# Location updates at least once every second.
location_update(gpsd, iroha, flight_id)
if args.run_scenario:
g_lat += random.uniform(0.0001, 0.001) * \
random.choice([-1, 0, 1])
g_lon += random.uniform(0.0001, 0.001) * \
random.choice([-1, 0, 1])
i += 1
time.sleep(1 - time.monotonic() % 1)
if __name__ == '__main__':
Loading