Skip to content
Snippets Groups Projects
Commit 139b0ea6 authored by Elmedin Zildzic's avatar Elmedin Zildzic :alien:
Browse files

Fixes and improvements to bt and drippy scripts

* Fix bs4 constructor args, now uses xml
* Add verbosity option for hcitool commands
* Output errors
* Implement graceful shutdown for GpsPoller
* GpsPoller now opens UNIX socket instead of using GPS lib
parent 3b2920e0
No related branches found
No related tags found
1 merge request!1Fixes and improvements to bt and drippy scripts
This commit is part of merge request !1. Comments created here will be created in the context of that merge request.
......@@ -10,7 +10,8 @@ from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
import string
import random
from dotenv.main import load_dotenv
from dotenv import load_dotenv
import signal
FILE_DIR = os.path.dirname(os.path.abspath(__file__))
BASE_DIR = os.path.join(FILE_DIR, "..")
......@@ -42,6 +43,8 @@ g_lat, g_lon = 58.398176, 15.575927
executor = ThreadPoolExecutor(5)
stdout_fd = subprocess.DEVNULL
def get_auth_pages(signature: bytes, auth_type=0x10):
"""
......@@ -81,6 +84,7 @@ def location_update(gpsd: GpsPoller, iroha: DripIrohaAccount, flight_id):
# ASTM F3-411 Location message 0x1
print("location_update")
global executor
global stdout_fd
# If gpsd is active, check that it has received values
if gpsd and gpsd.get_current_value() == None:
......@@ -122,7 +126,7 @@ def location_update(gpsd: GpsPoller, iroha: DripIrohaAccount, flight_id):
Advertising_Data: 1e 16 fa ff 0d 00 10 + payload
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x03 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d 0x00 0x10 {payload}".format(payload=payload), shell=True)
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x03 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d 0x00 0x10 {payload}".format(payload=payload), shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
else:
"""
OGF: 0x08 (LE controller commands)
......@@ -131,12 +135,13 @@ def location_update(gpsd: GpsPoller, iroha: DripIrohaAccount, flight_id):
Advertising_Data: 1e 16 fa ff 0d 00 10 + payload
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 10 {payload}".format(payload=payload), shell=True)
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 10 {payload}".format(payload=payload), shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
def basic_id_update(hhit):
# ASTM F3-411 Basic ID message 0x0 with HHIT.
print("basic_id_update")
global stdout_fd
# Set (Extended) Advertising Command
if is_wifi:
beacon_wifi(hhit)
......@@ -152,7 +157,7 @@ def basic_id_update(hhit):
Advertising_Data: 1e 16 fa ff 0d 00 10 + hhit
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x03 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d 0x00 0x00 0x42 {} 0x00 0x00 0x00 0x00 0x00 0x00 0x00".format(hhit), shell=True)
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x03 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d 0x00 0x00 0x42 {} 0x00 0x00 0x00 0x00 0x00 0x00 0x00".format(hhit), shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
else:
"""
OGF: 0x08 (LE controller commands)
......@@ -161,12 +166,13 @@ def basic_id_update(hhit):
Advertising_Data: 1e 16 fa ff 0d 00 00 42 + hhit
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 00 42 {} 00 00 00 00 00 00 00".format(hhit), shell=True)
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 00 42 {} 00 00 00 00 00 00 00".format(hhit), shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
time.sleep(0.5)
def auth_update(auth_pages, msg_number):
print("auth_update")
global stdout_fd
for page in auth_pages:
msg = separate_bytes(hexlify(page).decode("utf-8"))
if is_wifi:
......@@ -185,10 +191,10 @@ def auth_update(auth_pages, msg_number):
"""
if msg_number == 1:
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x01 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d {} 0x20 {}".format("00", msg), shell=True)
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x01 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d {} 0x20 {}".format("00", msg), shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
elif msg_number == 2:
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x02 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d {} 0x20 {}".format("00", msg), shell=True)
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x02 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d {} 0x20 {}".format("00", msg), shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
else:
"""
OGF: 0x08 (LE controller commands)
......@@ -197,7 +203,7 @@ def auth_update(auth_pages, msg_number):
Advertising_Data: 1e 16 fa ff 0d 00 20 + msg
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d {} 20 {}".format("00", msg), shell=True)
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d {} 20 {}".format("00", msg), shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
time.sleep(0.2)
......@@ -273,6 +279,13 @@ def main():
type=str,
help="Specify expiration date for the attestation in the format YYYY-mm-ddTHH:MM"
)
parser.add_argument(
"-v", "--verbose",
dest="verbose",
default=False,
action="store_true",
help="Use verbose output."
)
args = parser.parse_args(sys.argv[1:])
hi_file = args.host_identity_file
......@@ -280,6 +293,10 @@ def main():
is_bt5 = True if args.bluetooth_version == 5 else False
is_wifi= args.activate_wifi
global stdout_fd
if args.verbose:
stdout_fd = None # stdout
exp_date = datetime.strptime(args.exp_date, "%Y-%m-%dT%H:%M")
if (exp_date - datetime.utcnow()).total_seconds() <= 0:
print("Expiration date cannot come before current date.")
......@@ -319,7 +336,7 @@ def main():
if not is_wifi:
# Reset bt
subprocess.run("hcitool -i hci0 cmd 0x03 0x0003", shell=True)
subprocess.run("hcitool -i hci0 cmd 0x03 0x0003", shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
# Set (Extended) Advertising Parameters
if is_bt5:
......@@ -343,7 +360,7 @@ def main():
Scan_Request_Notification_Enable: 0x00 (Scan request notifications disabled)
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0036 0x00 0x00 0x00 0xa0 0x00 0x00 0xa0 0x00 0x00 0x07 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x02 0x7f 0x03 0x00 0x03 0x00 0x00", shell=True)
"hcitool -i hci0 cmd 0x08 0x0036 0x00 0x00 0x00 0xa0 0x00 0x00 0xa0 0x00 0x00 0x07 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x02 0x7f 0x03 0x00 0x03 0x00 0x00", shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
else:
"""
OGF: 0x08 (LE controller commands)
......@@ -358,7 +375,7 @@ def main():
Advertising_Filter_Policy: 0x02 (Process scan requests: all devices, Connection requests: only Filter Accept List)
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0006 a0 00 a0 00 03 00 00 00 00 00 00 00 00 07 02", shell=True)
"hcitool -i hci0 cmd 0x08 0x0006 a0 00 a0 00 03 00 00 00 00 00 00 00 00 07 02", shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
# Enable (Extended) Bluetooth Advertisement
if is_bt5:
......@@ -371,52 +388,67 @@ def main():
Duration[i]: 0x0000 (Advertise until the Host disables it)
Max_Extended_Advertising_Events[i]: 0x00 (No maximum)
"""
subprocess.run("hcitool -i hci0 cmd 0x08 0x0039 0x01 0x01 0x00 0x00 0x00 0x00", shell=True)
subprocess.run("hcitool -i hci0 cmd 0x08 0x0039 0x01 0x01 0x00 0x00 0x00 0x00", shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
else:
"""
OGF: 0x08 (LE controller commands)
OCF: 0x000a (LE Set Advertising Enable command)
Enable: 0x01 (True)
"""
subprocess.run("hcitool -i hci0 cmd 0x08 0x000a 01", shell=True)
subprocess.run("hcitool -i hci0 cmd 0x08 0x000a 01", shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
gpsd = None
if not args.run_scenario and args.gps:
gpsd = GpsPoller()
gpsd.start()
i = 0
try:
while True:
os.system('clear')
print("The flight_id is {}".format(flight_id))
# Send basic_id and auth at least once every 3 seconds.
# Send basic_id first second
if i % 3 == 0:
basic_id_update(hhit)
# Send first half of auth pages second second
elif i % 3 == 1:
auth_update(auth_pages[:2], 1)
# Send last half of auth pages last second
elif i % 3 == 2:
auth_update(auth_pages[2:], 2)
# Location updates at least once every second.
location_update(gpsd, iroha, flight_id)
if args.run_scenario:
g_lat += random.uniform(0.0001, 0.001) * \
random.choice([-1, 0, 1])
g_lon += random.uniform(0.0001, 0.001) * \
random.choice([-1, 0, 1])
i += 1
time.sleep(1 - time.monotonic() % 1)
except KeyboardInterrupt:
def signal_handler(_signo, _stack_frame):
if gpsd:
gpsd.stop_thread()
gpsd.join()
# Reset bt
if not is_wifi:
# Reset bt
subprocess.run("hcitool -i hci0 cmd 0x03 0x0003", shell=True)
sys.exit(0)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
i = 0
while True:
os.system('clear')
print("The flight_id is {}".format(flight_id))
if hi and not hi.is_hhit_verified():
print("HHIT verification failed")
if gpsd and gpsd.has_error():
print(f'GpsPoller received an error: {gpsd.get_error_msg()}')
# Send basic_id and auth at least once every 3 seconds.
# Send basic_id first second
if i % 3 == 0:
basic_id_update(hhit)
# Send first half of auth pages second second
elif i % 3 == 1:
auth_update(auth_pages[:2], 1)
# Send last half of auth pages last second
elif i % 3 == 2:
auth_update(auth_pages[2:], 2)
# Location updates at least once every second.
location_update(gpsd, iroha, flight_id)
if args.run_scenario:
g_lat += random.uniform(0.0001, 0.001) * \
random.choice([-1, 0, 1])
g_lon += random.uniform(0.0001, 0.001) * \
random.choice([-1, 0, 1])
i += 1
time.sleep(1 - time.monotonic() % 1)
if __name__ == '__main__':
......
import threading
from gps import *
import socket
import json
GPSD_PORT = 2947
WATCH_CMD = b'?WATCH={"enable":true,"json":true}'
STOP_WATCH_CMD = b'?WATCH={"enable":false}'
class GpsPoller(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self._session = gps(mode=WATCH_ENABLE)
self.current_value = None
def __init__(self):
threading.Thread.__init__(self)
self.current_value = None
self.stop = False
self.error = False
def stop_thread(self):
self.stop = True
def get_current_value(self):
return self.current_value
def get_current_value(self):
return self.current_value
def get_lat(self):
return self.current_value["lat"]
def get_lat(self):
return self.current_value["lat"]
def get_lon(self):
return self.current_value["lon"]
def has_error(self):
return self.error
def get_error_msg(self):
return self.error_msg
def get_lon(self):
return self.current_value["lon"]
def run(self):
self._socket = socket.socket()
try:
self._socket.connect(('localhost', GPSD_PORT))
except ConnectionRefusedError as e:
self.error = True
self.error_msg = e
return
self._socket.send(WATCH_CMD)
while not self.stop:
data = self._socket.recvmsg(8192)
msg = str(data[0], 'utf-8')
if 'TPV' in msg:
try:
self.current_value = json.loads(msg)
except json.JSONDecodeError:
pass
def run(self):
try:
while True:
gps_val = self._session.next()
if gps_val["class"] == "TPV":
self.current_value = gps_val
except StopIteration:
pass
\ No newline at end of file
print("Stopping GpsPoller.")
self._socket.send(STOP_WATCH_CMD)
self._socket.close()
print("GpsPoller stopped!")
\ No newline at end of file
......@@ -18,7 +18,9 @@ class HostIdentity:
self._priv_key = priv_key
self._pub_key = pub_key
self._is_hhit_verified = True
if not self.verify_hhit():
self._is_hhit_verified = False
print("The public key does not produce the same hash in hhit.")
else:
print("hhit verified")
......@@ -28,22 +30,25 @@ class HostIdentity:
bs = None
try:
with open(file_name, 'r') as f:
bs = BeautifulSoup(f.read(), 'lxml')
bs = BeautifulSoup(f.read(), features='xml')
except PermissionError as e:
print(e)
sys.exit(1)
host_id = bs.find("host_identity") # use first host id
priv_key = ''.join(host_id.find("priv").string.split(':'))
pub_key = ''.join(host_id.find("pub").string.split(':'))
hid = host_id.find("hid").string
priv_key = ''.join(host_id.find("PRIV").string.split(':'))
pub_key = ''.join(host_id.find("PUB").string.split(':'))
hid = host_id.find("HID").string
hhit = ''.join(["{:>}".format(byte_group) for byte_group in host_id.find("hit").string.split(':')])
hhit = ''.join(["{:>}".format(byte_group) for byte_group in host_id.find("HIT").string.split(':')])
return HostIdentity(hhit, hid, priv_key, pub_key)
def get_hhit(self):
return self._hhit
def is_hhit_verified(self):
return self._is_hhit_verified
def get_hid(self):
return self._hid
......
beautifulsoup4==4.11.1
gps==3.19
protobuf
protobuf==3.20.1
psutil==5.9.4
PyNaCl==1.5.0
python-dotenv==0.21.0
python-dotenv==1.0.0
scapy==2.4.5
iroha==1.4.1.1
lxml==4.9.1
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment