Skip to content
Snippets Groups Projects
Commit d0e29425 authored by Elmedin Zildzic's avatar Elmedin Zildzic :alien: Committed by Andrei Gurtov
Browse files

Fixes and improvements to bt and drippy scripts

parent 3b2920e0
No related branches found
No related tags found
1 merge request!1Fixes and improvements to bt and drippy scripts
......@@ -10,7 +10,8 @@ from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
import string
import random
from dotenv.main import load_dotenv
from dotenv import load_dotenv
import signal
FILE_DIR = os.path.dirname(os.path.abspath(__file__))
BASE_DIR = os.path.join(FILE_DIR, "..")
......@@ -42,6 +43,8 @@ g_lat, g_lon = 58.398176, 15.575927
executor = ThreadPoolExecutor(5)
stdout_fd = subprocess.DEVNULL
def get_auth_pages(signature: bytes, auth_type=0x10):
"""
......@@ -81,6 +84,7 @@ def location_update(gpsd: GpsPoller, iroha: DripIrohaAccount, flight_id):
# ASTM F3-411 Location message 0x1
print("location_update")
global executor
global stdout_fd
# If gpsd is active, check that it has received values
if gpsd and gpsd.get_current_value() == None:
......@@ -122,7 +126,7 @@ def location_update(gpsd: GpsPoller, iroha: DripIrohaAccount, flight_id):
Advertising_Data: 1e 16 fa ff 0d 00 10 + payload
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x03 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d 0x00 0x10 {payload}".format(payload=payload), shell=True)
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x03 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d 0x00 0x10 {payload}".format(payload=payload), shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
else:
"""
OGF: 0x08 (LE controller commands)
......@@ -131,12 +135,13 @@ def location_update(gpsd: GpsPoller, iroha: DripIrohaAccount, flight_id):
Advertising_Data: 1e 16 fa ff 0d 00 10 + payload
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 10 {payload}".format(payload=payload), shell=True)
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 10 {payload}".format(payload=payload), shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
def basic_id_update(hhit):
# ASTM F3-411 Basic ID message 0x0 with HHIT.
print("basic_id_update")
global stdout_fd
# Set (Extended) Advertising Command
if is_wifi:
beacon_wifi(hhit)
......@@ -152,7 +157,7 @@ def basic_id_update(hhit):
Advertising_Data: 1e 16 fa ff 0d 00 10 + hhit
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x03 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d 0x00 0x00 0x42 {} 0x00 0x00 0x00 0x00 0x00 0x00 0x00".format(hhit), shell=True)
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x03 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d 0x00 0x00 0x42 {} 0x00 0x00 0x00 0x00 0x00 0x00 0x00".format(hhit), shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
else:
"""
OGF: 0x08 (LE controller commands)
......@@ -161,12 +166,13 @@ def basic_id_update(hhit):
Advertising_Data: 1e 16 fa ff 0d 00 00 42 + hhit
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 00 42 {} 00 00 00 00 00 00 00".format(hhit), shell=True)
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 00 42 {} 00 00 00 00 00 00 00".format(hhit), shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
time.sleep(0.5)
def auth_update(auth_pages, msg_number):
print("auth_update")
global stdout_fd
for page in auth_pages:
msg = separate_bytes(hexlify(page).decode("utf-8"))
if is_wifi:
......@@ -185,10 +191,10 @@ def auth_update(auth_pages, msg_number):
"""
if msg_number == 1:
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x01 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d {} 0x20 {}".format("00", msg), shell=True)
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x01 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d {} 0x20 {}".format("00", msg), shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
elif msg_number == 2:
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x02 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d {} 0x20 {}".format("00", msg), shell=True)
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x02 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d {} 0x20 {}".format("00", msg), shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
else:
"""
OGF: 0x08 (LE controller commands)
......@@ -197,7 +203,7 @@ def auth_update(auth_pages, msg_number):
Advertising_Data: 1e 16 fa ff 0d 00 20 + msg
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d {} 20 {}".format("00", msg), shell=True)
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d {} 20 {}".format("00", msg), shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
time.sleep(0.2)
......@@ -273,6 +279,13 @@ def main():
type=str,
help="Specify expiration date for the attestation in the format YYYY-mm-ddTHH:MM"
)
parser.add_argument(
"-v", "--verbose",
dest="verbose",
default=False,
action="store_true",
help="Use verbose output."
)
args = parser.parse_args(sys.argv[1:])
hi_file = args.host_identity_file
......@@ -280,6 +293,10 @@ def main():
is_bt5 = True if args.bluetooth_version == 5 else False
is_wifi= args.activate_wifi
global stdout_fd
if args.verbose:
stdout_fd = None # stdout
exp_date = datetime.strptime(args.exp_date, "%Y-%m-%dT%H:%M")
if (exp_date - datetime.utcnow()).total_seconds() <= 0:
print("Expiration date cannot come before current date.")
......@@ -319,7 +336,7 @@ def main():
if not is_wifi:
# Reset bt
subprocess.run("hcitool -i hci0 cmd 0x03 0x0003", shell=True)
subprocess.run("hcitool -i hci0 cmd 0x03 0x0003", shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
# Set (Extended) Advertising Parameters
if is_bt5:
......@@ -343,7 +360,7 @@ def main():
Scan_Request_Notification_Enable: 0x00 (Scan request notifications disabled)
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0036 0x00 0x00 0x00 0xa0 0x00 0x00 0xa0 0x00 0x00 0x07 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x02 0x7f 0x03 0x00 0x03 0x00 0x00", shell=True)
"hcitool -i hci0 cmd 0x08 0x0036 0x00 0x00 0x00 0xa0 0x00 0x00 0xa0 0x00 0x00 0x07 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x02 0x7f 0x03 0x00 0x03 0x00 0x00", shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
else:
"""
OGF: 0x08 (LE controller commands)
......@@ -358,7 +375,7 @@ def main():
Advertising_Filter_Policy: 0x02 (Process scan requests: all devices, Connection requests: only Filter Accept List)
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0006 a0 00 a0 00 03 00 00 00 00 00 00 00 00 07 02", shell=True)
"hcitool -i hci0 cmd 0x08 0x0006 a0 00 a0 00 03 00 00 00 00 00 00 00 00 07 02", shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
# Enable (Extended) Bluetooth Advertisement
if is_bt5:
......@@ -371,52 +388,67 @@ def main():
Duration[i]: 0x0000 (Advertise until the Host disables it)
Max_Extended_Advertising_Events[i]: 0x00 (No maximum)
"""
subprocess.run("hcitool -i hci0 cmd 0x08 0x0039 0x01 0x01 0x00 0x00 0x00 0x00", shell=True)
subprocess.run("hcitool -i hci0 cmd 0x08 0x0039 0x01 0x01 0x00 0x00 0x00 0x00", shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
else:
"""
OGF: 0x08 (LE controller commands)
OCF: 0x000a (LE Set Advertising Enable command)
Enable: 0x01 (True)
"""
subprocess.run("hcitool -i hci0 cmd 0x08 0x000a 01", shell=True)
subprocess.run("hcitool -i hci0 cmd 0x08 0x000a 01", shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
gpsd = None
if not args.run_scenario and args.gps:
gpsd = GpsPoller()
gpsd.start()
i = 0
try:
while True:
os.system('clear')
print("The flight_id is {}".format(flight_id))
# Send basic_id and auth at least once every 3 seconds.
# Send basic_id first second
if i % 3 == 0:
basic_id_update(hhit)
# Send first half of auth pages second second
elif i % 3 == 1:
auth_update(auth_pages[:2], 1)
# Send last half of auth pages last second
elif i % 3 == 2:
auth_update(auth_pages[2:], 2)
# Location updates at least once every second.
location_update(gpsd, iroha, flight_id)
if args.run_scenario:
g_lat += random.uniform(0.0001, 0.001) * \
random.choice([-1, 0, 1])
g_lon += random.uniform(0.0001, 0.001) * \
random.choice([-1, 0, 1])
i += 1
time.sleep(1 - time.monotonic() % 1)
except KeyboardInterrupt:
def signal_handler(_signo, _stack_frame):
if gpsd:
gpsd.stop_thread()
gpsd.join()
# Reset bt
if not is_wifi:
# Reset bt
subprocess.run("hcitool -i hci0 cmd 0x03 0x0003", shell=True)
sys.exit(0)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
i = 0
while True:
os.system('clear')
print("The flight_id is {}".format(flight_id))
if hi and not hi.is_hhit_verified():
print("HHIT verification failed")
if gpsd and gpsd.has_error():
print(f'GpsPoller received an error: {gpsd.get_error_msg()}')
# Send basic_id and auth at least once every 3 seconds.
# Send basic_id first second
if i % 3 == 0:
basic_id_update(hhit)
# Send first half of auth pages second second
elif i % 3 == 1:
auth_update(auth_pages[:2], 1)
# Send last half of auth pages last second
elif i % 3 == 2:
auth_update(auth_pages[2:], 2)
# Location updates at least once every second.
location_update(gpsd, iroha, flight_id)
if args.run_scenario:
g_lat += random.uniform(0.0001, 0.001) * \
random.choice([-1, 0, 1])
g_lon += random.uniform(0.0001, 0.001) * \
random.choice([-1, 0, 1])
i += 1
time.sleep(1 - time.monotonic() % 1)
if __name__ == '__main__':
......
import threading
from gps import *
import socket
import json
GPSD_PORT = 2947
WATCH_CMD = b'?WATCH={"enable":true,"json":true}'
STOP_WATCH_CMD = b'?WATCH={"enable":false}'
class GpsPoller(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self._session = gps(mode=WATCH_ENABLE)
self.current_value = None
def __init__(self):
threading.Thread.__init__(self)
self.current_value = None
self.stop = False
self.error = False
def stop_thread(self):
self.stop = True
def get_current_value(self):
return self.current_value
def get_current_value(self):
return self.current_value
def get_lat(self):
return self.current_value["lat"]
def get_lat(self):
return self.current_value["lat"]
def get_lon(self):
return self.current_value["lon"]
def has_error(self):
return self.error
def get_error_msg(self):
return self.error_msg
def get_lon(self):
return self.current_value["lon"]
def run(self):
self._socket = socket.socket()
try:
self._socket.connect(('localhost', GPSD_PORT))
except ConnectionRefusedError as e:
self.error = True
self.error_msg = e
return
self._socket.send(WATCH_CMD)
while not self.stop:
data = self._socket.recvmsg(8192)
msg = str(data[0], 'utf-8')
if 'TPV' in msg:
try:
self.current_value = json.loads(msg)
except json.JSONDecodeError:
pass
def run(self):
try:
while True:
gps_val = self._session.next()
if gps_val["class"] == "TPV":
self.current_value = gps_val
except StopIteration:
pass
\ No newline at end of file
print("Stopping GpsPoller.")
self._socket.send(STOP_WATCH_CMD)
self._socket.close()
print("GpsPoller stopped!")
\ No newline at end of file
......@@ -18,7 +18,9 @@ class HostIdentity:
self._priv_key = priv_key
self._pub_key = pub_key
self._is_hhit_verified = True
if not self.verify_hhit():
self._is_hhit_verified = False
print("The public key does not produce the same hash in hhit.")
else:
print("hhit verified")
......@@ -28,22 +30,25 @@ class HostIdentity:
bs = None
try:
with open(file_name, 'r') as f:
bs = BeautifulSoup(f.read(), 'lxml')
bs = BeautifulSoup(f.read(), features='xml')
except PermissionError as e:
print(e)
sys.exit(1)
host_id = bs.find("host_identity") # use first host id
priv_key = ''.join(host_id.find("priv").string.split(':'))
pub_key = ''.join(host_id.find("pub").string.split(':'))
hid = host_id.find("hid").string
priv_key = ''.join(host_id.find("PRIV").string.split(':'))
pub_key = ''.join(host_id.find("PUB").string.split(':'))
hid = host_id.find("HID").string
hhit = ''.join(["{:>}".format(byte_group) for byte_group in host_id.find("hit").string.split(':')])
hhit = ''.join(["{:>}".format(byte_group) for byte_group in host_id.find("HIT").string.split(':')])
return HostIdentity(hhit, hid, priv_key, pub_key)
def get_hhit(self):
return self._hhit
def is_hhit_verified(self):
return self._is_hhit_verified
def get_hid(self):
return self._hid
......
beautifulsoup4==4.11.1
gps==3.19
protobuf
protobuf==3.20.1
psutil==5.9.4
PyNaCl==1.5.0
python-dotenv==0.21.0
python-dotenv==1.0.0
scapy==2.4.5
iroha==1.4.1.1
lxml==4.9.1
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment