Skip to content
Snippets Groups Projects
Commit 26a2926a authored by Andrei Gurtov's avatar Andrei Gurtov
Browse files

Merge branch 'elmzi904/gps_poll_update' into 'main'

Fixes and improvements to bt and drippy scripts

See merge request !1
parents 3b2920e0 d0e29425
No related branches found
No related tags found
No related merge requests found
...@@ -10,7 +10,8 @@ from datetime import datetime ...@@ -10,7 +10,8 @@ from datetime import datetime
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
import string import string
import random import random
from dotenv.main import load_dotenv from dotenv import load_dotenv
import signal
FILE_DIR = os.path.dirname(os.path.abspath(__file__)) FILE_DIR = os.path.dirname(os.path.abspath(__file__))
BASE_DIR = os.path.join(FILE_DIR, "..") BASE_DIR = os.path.join(FILE_DIR, "..")
...@@ -42,6 +43,8 @@ g_lat, g_lon = 58.398176, 15.575927 ...@@ -42,6 +43,8 @@ g_lat, g_lon = 58.398176, 15.575927
executor = ThreadPoolExecutor(5) executor = ThreadPoolExecutor(5)
stdout_fd = subprocess.DEVNULL
def get_auth_pages(signature: bytes, auth_type=0x10): def get_auth_pages(signature: bytes, auth_type=0x10):
""" """
...@@ -81,6 +84,7 @@ def location_update(gpsd: GpsPoller, iroha: DripIrohaAccount, flight_id): ...@@ -81,6 +84,7 @@ def location_update(gpsd: GpsPoller, iroha: DripIrohaAccount, flight_id):
# ASTM F3-411 Location message 0x1 # ASTM F3-411 Location message 0x1
print("location_update") print("location_update")
global executor global executor
global stdout_fd
# If gpsd is active, check that it has received values # If gpsd is active, check that it has received values
if gpsd and gpsd.get_current_value() == None: if gpsd and gpsd.get_current_value() == None:
...@@ -122,7 +126,7 @@ def location_update(gpsd: GpsPoller, iroha: DripIrohaAccount, flight_id): ...@@ -122,7 +126,7 @@ def location_update(gpsd: GpsPoller, iroha: DripIrohaAccount, flight_id):
Advertising_Data: 1e 16 fa ff 0d 00 10 + payload Advertising_Data: 1e 16 fa ff 0d 00 10 + payload
""" """
subprocess.run( subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x03 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d 0x00 0x10 {payload}".format(payload=payload), shell=True) "hcitool -i hci0 cmd 0x08 0x0037 0x00 0x03 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d 0x00 0x10 {payload}".format(payload=payload), shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
else: else:
""" """
OGF: 0x08 (LE controller commands) OGF: 0x08 (LE controller commands)
...@@ -131,12 +135,13 @@ def location_update(gpsd: GpsPoller, iroha: DripIrohaAccount, flight_id): ...@@ -131,12 +135,13 @@ def location_update(gpsd: GpsPoller, iroha: DripIrohaAccount, flight_id):
Advertising_Data: 1e 16 fa ff 0d 00 10 + payload Advertising_Data: 1e 16 fa ff 0d 00 10 + payload
""" """
subprocess.run( subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 10 {payload}".format(payload=payload), shell=True) "hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 10 {payload}".format(payload=payload), shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
def basic_id_update(hhit): def basic_id_update(hhit):
# ASTM F3-411 Basic ID message 0x0 with HHIT. # ASTM F3-411 Basic ID message 0x0 with HHIT.
print("basic_id_update") print("basic_id_update")
global stdout_fd
# Set (Extended) Advertising Command # Set (Extended) Advertising Command
if is_wifi: if is_wifi:
beacon_wifi(hhit) beacon_wifi(hhit)
...@@ -152,7 +157,7 @@ def basic_id_update(hhit): ...@@ -152,7 +157,7 @@ def basic_id_update(hhit):
Advertising_Data: 1e 16 fa ff 0d 00 10 + hhit Advertising_Data: 1e 16 fa ff 0d 00 10 + hhit
""" """
subprocess.run( subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x03 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d 0x00 0x00 0x42 {} 0x00 0x00 0x00 0x00 0x00 0x00 0x00".format(hhit), shell=True) "hcitool -i hci0 cmd 0x08 0x0037 0x00 0x03 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d 0x00 0x00 0x42 {} 0x00 0x00 0x00 0x00 0x00 0x00 0x00".format(hhit), shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
else: else:
""" """
OGF: 0x08 (LE controller commands) OGF: 0x08 (LE controller commands)
...@@ -161,12 +166,13 @@ def basic_id_update(hhit): ...@@ -161,12 +166,13 @@ def basic_id_update(hhit):
Advertising_Data: 1e 16 fa ff 0d 00 00 42 + hhit Advertising_Data: 1e 16 fa ff 0d 00 00 42 + hhit
""" """
subprocess.run( subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 00 42 {} 00 00 00 00 00 00 00".format(hhit), shell=True) "hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 00 42 {} 00 00 00 00 00 00 00".format(hhit), shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
time.sleep(0.5) time.sleep(0.5)
def auth_update(auth_pages, msg_number): def auth_update(auth_pages, msg_number):
print("auth_update") print("auth_update")
global stdout_fd
for page in auth_pages: for page in auth_pages:
msg = separate_bytes(hexlify(page).decode("utf-8")) msg = separate_bytes(hexlify(page).decode("utf-8"))
if is_wifi: if is_wifi:
...@@ -185,10 +191,10 @@ def auth_update(auth_pages, msg_number): ...@@ -185,10 +191,10 @@ def auth_update(auth_pages, msg_number):
""" """
if msg_number == 1: if msg_number == 1:
subprocess.run( subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x01 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d {} 0x20 {}".format("00", msg), shell=True) "hcitool -i hci0 cmd 0x08 0x0037 0x00 0x01 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d {} 0x20 {}".format("00", msg), shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
elif msg_number == 2: elif msg_number == 2:
subprocess.run( subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x02 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d {} 0x20 {}".format("00", msg), shell=True) "hcitool -i hci0 cmd 0x08 0x0037 0x00 0x02 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d {} 0x20 {}".format("00", msg), shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
else: else:
""" """
OGF: 0x08 (LE controller commands) OGF: 0x08 (LE controller commands)
...@@ -197,7 +203,7 @@ def auth_update(auth_pages, msg_number): ...@@ -197,7 +203,7 @@ def auth_update(auth_pages, msg_number):
Advertising_Data: 1e 16 fa ff 0d 00 20 + msg Advertising_Data: 1e 16 fa ff 0d 00 20 + msg
""" """
subprocess.run( subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d {} 20 {}".format("00", msg), shell=True) "hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d {} 20 {}".format("00", msg), shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
time.sleep(0.2) time.sleep(0.2)
...@@ -273,6 +279,13 @@ def main(): ...@@ -273,6 +279,13 @@ def main():
type=str, type=str,
help="Specify expiration date for the attestation in the format YYYY-mm-ddTHH:MM" help="Specify expiration date for the attestation in the format YYYY-mm-ddTHH:MM"
) )
parser.add_argument(
"-v", "--verbose",
dest="verbose",
default=False,
action="store_true",
help="Use verbose output."
)
args = parser.parse_args(sys.argv[1:]) args = parser.parse_args(sys.argv[1:])
hi_file = args.host_identity_file hi_file = args.host_identity_file
...@@ -280,6 +293,10 @@ def main(): ...@@ -280,6 +293,10 @@ def main():
is_bt5 = True if args.bluetooth_version == 5 else False is_bt5 = True if args.bluetooth_version == 5 else False
is_wifi= args.activate_wifi is_wifi= args.activate_wifi
global stdout_fd
if args.verbose:
stdout_fd = None # stdout
exp_date = datetime.strptime(args.exp_date, "%Y-%m-%dT%H:%M") exp_date = datetime.strptime(args.exp_date, "%Y-%m-%dT%H:%M")
if (exp_date - datetime.utcnow()).total_seconds() <= 0: if (exp_date - datetime.utcnow()).total_seconds() <= 0:
print("Expiration date cannot come before current date.") print("Expiration date cannot come before current date.")
...@@ -319,7 +336,7 @@ def main(): ...@@ -319,7 +336,7 @@ def main():
if not is_wifi: if not is_wifi:
# Reset bt # Reset bt
subprocess.run("hcitool -i hci0 cmd 0x03 0x0003", shell=True) subprocess.run("hcitool -i hci0 cmd 0x03 0x0003", shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
# Set (Extended) Advertising Parameters # Set (Extended) Advertising Parameters
if is_bt5: if is_bt5:
...@@ -343,7 +360,7 @@ def main(): ...@@ -343,7 +360,7 @@ def main():
Scan_Request_Notification_Enable: 0x00 (Scan request notifications disabled) Scan_Request_Notification_Enable: 0x00 (Scan request notifications disabled)
""" """
subprocess.run( subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0036 0x00 0x00 0x00 0xa0 0x00 0x00 0xa0 0x00 0x00 0x07 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x02 0x7f 0x03 0x00 0x03 0x00 0x00", shell=True) "hcitool -i hci0 cmd 0x08 0x0036 0x00 0x00 0x00 0xa0 0x00 0x00 0xa0 0x00 0x00 0x07 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x02 0x7f 0x03 0x00 0x03 0x00 0x00", shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
else: else:
""" """
OGF: 0x08 (LE controller commands) OGF: 0x08 (LE controller commands)
...@@ -358,7 +375,7 @@ def main(): ...@@ -358,7 +375,7 @@ def main():
Advertising_Filter_Policy: 0x02 (Process scan requests: all devices, Connection requests: only Filter Accept List) Advertising_Filter_Policy: 0x02 (Process scan requests: all devices, Connection requests: only Filter Accept List)
""" """
subprocess.run( subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0006 a0 00 a0 00 03 00 00 00 00 00 00 00 00 07 02", shell=True) "hcitool -i hci0 cmd 0x08 0x0006 a0 00 a0 00 03 00 00 00 00 00 00 00 00 07 02", shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
# Enable (Extended) Bluetooth Advertisement # Enable (Extended) Bluetooth Advertisement
if is_bt5: if is_bt5:
...@@ -371,52 +388,67 @@ def main(): ...@@ -371,52 +388,67 @@ def main():
Duration[i]: 0x0000 (Advertise until the Host disables it) Duration[i]: 0x0000 (Advertise until the Host disables it)
Max_Extended_Advertising_Events[i]: 0x00 (No maximum) Max_Extended_Advertising_Events[i]: 0x00 (No maximum)
""" """
subprocess.run("hcitool -i hci0 cmd 0x08 0x0039 0x01 0x01 0x00 0x00 0x00 0x00", shell=True) subprocess.run("hcitool -i hci0 cmd 0x08 0x0039 0x01 0x01 0x00 0x00 0x00 0x00", shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
else: else:
""" """
OGF: 0x08 (LE controller commands) OGF: 0x08 (LE controller commands)
OCF: 0x000a (LE Set Advertising Enable command) OCF: 0x000a (LE Set Advertising Enable command)
Enable: 0x01 (True) Enable: 0x01 (True)
""" """
subprocess.run("hcitool -i hci0 cmd 0x08 0x000a 01", shell=True) subprocess.run("hcitool -i hci0 cmd 0x08 0x000a 01", shell=True, stdout=stdout_fd, stderr=subprocess.STDOUT)
gpsd = None gpsd = None
if not args.run_scenario and args.gps: if not args.run_scenario and args.gps:
gpsd = GpsPoller() gpsd = GpsPoller()
gpsd.start() gpsd.start()
i = 0 def signal_handler(_signo, _stack_frame):
try: if gpsd:
while True: gpsd.stop_thread()
os.system('clear') gpsd.join()
print("The flight_id is {}".format(flight_id))
# Reset bt
# Send basic_id and auth at least once every 3 seconds.
# Send basic_id first second
if i % 3 == 0:
basic_id_update(hhit)
# Send first half of auth pages second second
elif i % 3 == 1:
auth_update(auth_pages[:2], 1)
# Send last half of auth pages last second
elif i % 3 == 2:
auth_update(auth_pages[2:], 2)
# Location updates at least once every second.
location_update(gpsd, iroha, flight_id)
if args.run_scenario:
g_lat += random.uniform(0.0001, 0.001) * \
random.choice([-1, 0, 1])
g_lon += random.uniform(0.0001, 0.001) * \
random.choice([-1, 0, 1])
i += 1
time.sleep(1 - time.monotonic() % 1)
except KeyboardInterrupt:
if not is_wifi: if not is_wifi:
# Reset bt # Reset bt
subprocess.run("hcitool -i hci0 cmd 0x03 0x0003", shell=True) subprocess.run("hcitool -i hci0 cmd 0x03 0x0003", shell=True)
sys.exit(0)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
i = 0
while True:
os.system('clear')
print("The flight_id is {}".format(flight_id))
if hi and not hi.is_hhit_verified():
print("HHIT verification failed")
if gpsd and gpsd.has_error():
print(f'GpsPoller received an error: {gpsd.get_error_msg()}')
# Send basic_id and auth at least once every 3 seconds.
# Send basic_id first second
if i % 3 == 0:
basic_id_update(hhit)
# Send first half of auth pages second second
elif i % 3 == 1:
auth_update(auth_pages[:2], 1)
# Send last half of auth pages last second
elif i % 3 == 2:
auth_update(auth_pages[2:], 2)
# Location updates at least once every second.
location_update(gpsd, iroha, flight_id)
if args.run_scenario:
g_lat += random.uniform(0.0001, 0.001) * \
random.choice([-1, 0, 1])
g_lon += random.uniform(0.0001, 0.001) * \
random.choice([-1, 0, 1])
i += 1
time.sleep(1 - time.monotonic() % 1)
if __name__ == '__main__': if __name__ == '__main__':
......
import threading import threading
from gps import * import socket
import json
GPSD_PORT = 2947
WATCH_CMD = b'?WATCH={"enable":true,"json":true}'
STOP_WATCH_CMD = b'?WATCH={"enable":false}'
class GpsPoller(threading.Thread): class GpsPoller(threading.Thread):
def __init__(self): def __init__(self):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self._session = gps(mode=WATCH_ENABLE) self.current_value = None
self.current_value = None self.stop = False
self.error = False
def stop_thread(self):
self.stop = True
def get_current_value(self):
return self.current_value
def get_current_value(self): def get_lat(self):
return self.current_value return self.current_value["lat"]
def get_lat(self): def get_lon(self):
return self.current_value["lat"] return self.current_value["lon"]
def has_error(self):
return self.error
def get_error_msg(self):
return self.error_msg
def get_lon(self): def run(self):
return self.current_value["lon"] self._socket = socket.socket()
try:
self._socket.connect(('localhost', GPSD_PORT))
except ConnectionRefusedError as e:
self.error = True
self.error_msg = e
return
self._socket.send(WATCH_CMD)
while not self.stop:
data = self._socket.recvmsg(8192)
msg = str(data[0], 'utf-8')
if 'TPV' in msg:
try:
self.current_value = json.loads(msg)
except json.JSONDecodeError:
pass
def run(self): print("Stopping GpsPoller.")
try: self._socket.send(STOP_WATCH_CMD)
while True: self._socket.close()
gps_val = self._session.next() print("GpsPoller stopped!")
if gps_val["class"] == "TPV": \ No newline at end of file
self.current_value = gps_val
except StopIteration:
pass
\ No newline at end of file
...@@ -18,7 +18,9 @@ class HostIdentity: ...@@ -18,7 +18,9 @@ class HostIdentity:
self._priv_key = priv_key self._priv_key = priv_key
self._pub_key = pub_key self._pub_key = pub_key
self._is_hhit_verified = True
if not self.verify_hhit(): if not self.verify_hhit():
self._is_hhit_verified = False
print("The public key does not produce the same hash in hhit.") print("The public key does not produce the same hash in hhit.")
else: else:
print("hhit verified") print("hhit verified")
...@@ -28,22 +30,25 @@ class HostIdentity: ...@@ -28,22 +30,25 @@ class HostIdentity:
bs = None bs = None
try: try:
with open(file_name, 'r') as f: with open(file_name, 'r') as f:
bs = BeautifulSoup(f.read(), 'lxml') bs = BeautifulSoup(f.read(), features='xml')
except PermissionError as e: except PermissionError as e:
print(e) print(e)
sys.exit(1) sys.exit(1)
host_id = bs.find("host_identity") # use first host id host_id = bs.find("host_identity") # use first host id
priv_key = ''.join(host_id.find("priv").string.split(':')) priv_key = ''.join(host_id.find("PRIV").string.split(':'))
pub_key = ''.join(host_id.find("pub").string.split(':')) pub_key = ''.join(host_id.find("PUB").string.split(':'))
hid = host_id.find("hid").string hid = host_id.find("HID").string
hhit = ''.join(["{:>}".format(byte_group) for byte_group in host_id.find("hit").string.split(':')]) hhit = ''.join(["{:>}".format(byte_group) for byte_group in host_id.find("HIT").string.split(':')])
return HostIdentity(hhit, hid, priv_key, pub_key) return HostIdentity(hhit, hid, priv_key, pub_key)
def get_hhit(self): def get_hhit(self):
return self._hhit return self._hhit
def is_hhit_verified(self):
return self._is_hhit_verified
def get_hid(self): def get_hid(self):
return self._hid return self._hid
......
beautifulsoup4==4.11.1 beautifulsoup4==4.11.1
gps==3.19 protobuf==3.20.1
protobuf
psutil==5.9.4 psutil==5.9.4
PyNaCl==1.5.0 PyNaCl==1.5.0
python-dotenv==0.21.0 python-dotenv==1.0.0
scapy==2.4.5 scapy==2.4.5
iroha==1.4.1.1 iroha==1.4.1.1
lxml==4.9.1 lxml==4.9.1
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment