Forked from
Hampus Rosenquist / TDDE21 DRIP 2022
11 commits behind the upstream repository.
-
Abdullah Bin Zubair authored
Beacon (sender) code following the draft-ietf-drip-auth-17 DRIP Authentication Formats structure. No Wi-Fi part is merged with this file it contains bt4, bt5 and updated Authentication Formats structure.
Abdullah Bin Zubair authoredBeacon (sender) code following the draft-ietf-drip-auth-17 DRIP Authentication Formats structure. No Wi-Fi part is merged with this file it contains bt4, bt5 and updated Authentication Formats structure.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
beacon_1.py 18.46 KiB
import argparse
import os
import sys
import re
import subprocess
import time
from base64 import b64encode
from binascii import unhexlify, hexlify
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
import string
import random
from dotenv.main import load_dotenv
FILE_DIR = os.path.dirname(os.path.abspath(__file__))
BASE_DIR = os.path.join(FILE_DIR, "..")
sys.path.append(BASE_DIR)
from drippy.drip_iroha_account import DripIrohaAccount
from drippy.gps_poll import GpsPoller
from drippy.util import astm_float_to_int_hex, get_timestamp, astm_time
from drippy.host_identity import HostIdentity
from drippy.cSHAKE import cSHAKE128
# File with flight_id that will be incremented and used if no other
# flight_id is specified.
FLIGHT_ID_FILE = os.path.join(FILE_DIR, "default_flight_id.txt")
load_dotenv()
location_info = "" #location info such lat & long, it is saved to send later in manifests
def separate_bytes(hex_str, delim=' '):
return re.sub(r'.{2}', r'\g<0>{}'.format(delim), hex_str).strip()
AUTH_PAGE_0_LEN = 17
AUTH_PAGE_LEN = 23
g_lat, g_lon = 58.398176, 15.575927
executor = ThreadPoolExecutor(5)
def get_auth_pages(signature: bytes, auth_type=0x10):
"""
Build a list of ASTM F3411-19 authentication pages with the
signature provided.
"""
auth_pages = []
auth_len = len(signature)
# Pad signature with 0s so all pages are even length.
signature += b'\x00' * \
(AUTH_PAGE_LEN - ((len(signature)-AUTH_PAGE_0_LEN) % AUTH_PAGE_LEN))
pages = 1 + (len(signature) - AUTH_PAGE_0_LEN)//AUTH_PAGE_LEN
# First page is 7 bytes header + 17 bytes from signature
auth_page_0 = b'\x10' + pages.to_bytes(1, 'big') + auth_len.to_bytes(
1, 'big') + int(astm_time()).to_bytes(4, 'little')
auth_page_0 += signature[:AUTH_PAGE_0_LEN]
auth_pages.append(auth_page_0)
# Remaining pages are 1 byte header + 23 bytes from signature
i = AUTH_PAGE_0_LEN
page = 1
bytes_to_encode = len(signature) - AUTH_PAGE_0_LEN
while i < bytes_to_encode:
auth_page = (auth_type | page).to_bytes(1, 'big')
auth_page += signature[i:i+AUTH_PAGE_LEN]
auth_pages.append(auth_page)
i += AUTH_PAGE_LEN
page += 1
return auth_pages
# Placing a single location message For Now
def Drip_Wrapper(gpsd: GpsPoller, iroha: DripIrohaAccount, flight_id, HostID: HostIdentity, exp_date):
# ASTM F3-411 Location message 0x1
print("location_update")
global executor
# If gpsd is active, check that it has received values
if gpsd and gpsd.get_current_value() == None:
return
if gpsd:
lat = separate_bytes(astm_float_to_int_hex(gpsd.get_lat()))
lon = separate_bytes(astm_float_to_int_hex(gpsd.get_lon()))
else:
lat = separate_bytes(astm_float_to_int_hex(g_lat))
lon = separate_bytes(astm_float_to_int_hex(g_lon))
location_info = "10 00 00 00 {lat} {lon} 00 00 00 00 00 00 00 00 {ts} 00 00".format(
lat=lat, lon=lon, ts="00 00")
if(location_info):
hash_location = cSHAKE128( location_info.encode('utf-8'),25,"", "") # hash of location_info, 25 bytes
if iroha: # if iroha private supplied, send location transactions on separate thread.
# location will be flight id concatenated with the bluetooth payload
# In order to save space we can b64encode the bytes of the payload,
# instead of sending a hexstring which would result in location message
# taking up double the bytes (2 characters per byte).
iroha_msg = b64encode(
unhexlify(flight_id + ''.join(location_info.split(' ')))).decode("utf-8")
executor.submit(
iroha.set_account_details, "location", iroha_msg
)
payload = HostID.generate_self_attestation_Wrapper(hash_location, get_timestamp(exp_date))
# Set (Extended) Advertising command
if is_bt5:
"""
OGF: 0x08 (LE controller commands)
OCF: 0x0037 (LE Set Extended Advertising Data command)
Advertising handle: 0x00 (Advertising set number)
Operation: 0x03 (Complete extended advertising data)
Fragment_Preference: 0x01 (The Controller should not fragment or should minimize fragmentation of Host advertising data)
Advertising_Data_Length: 0x1f
Advertising_Data: 1e 16 fa ff 0d 00 10 + payload
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x03 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d 0x00 0x10 {hash_location}{payload}".format(hash_location=hash_location,payload=payload), shell=True)
else:
"""
OGF: 0x08 (LE controller commands)
OCF: 0x0008 (LE Set Advertising Data command)
Advertising_Data_Length: 0x1f
Advertising_Data: 1e 16 fa ff 0d 00 10 + payload
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 10 {hash_location}{payload}".format(hash_location=hash_location, payload=payload), shell=True)
def manifest_broadcast(HostID: HostIdentity, flight_id, exp_date, prev_loc = "" ): ## Manifest broadcast can contain max 11 hashes (at 8-bytes each). For now we are sending one message.
# ASTM F3-411 Location message 0x1
print("manifest_broadcast")
# If gpsd is active, check that it has received values
if location_info == None:
print("payload is empty, location information missing from gps")
payload = HostID.generate_self_attestation_Manifests(cSHAKE128(prev_loc.encode('utf8'),8, "",""),cSHAKE128(location_info.encode('utf8'),8,"",""), get_timestamp(exp_date))
# Set (Extended) Advertising command
if is_bt5:
"""
OGF: 0x08 (LE controller commands)
OCF: 0x0037 (LE Set Extended Advertising Data command)
Advertising handle: 0x00 (Advertising set number)
Operation: 0x03 (Complete extended advertising data)
Fragment_Preference: 0x01 (The Controller should not fragment or should minimize fragmentation of Host advertising data)
Advertising_Data_Length: 0x1f
Advertising_Data: 1e 16 fa ff 0d 00 10 + payload
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x03 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d 0x00 0x10 {payload}".format(payload=payload), shell=True) #0x03
else:
"""
OGF: 0x08 (LE controller commands)
OCF: 0x0008 (LE Set Advertising Data command)
Advertising_Data_Length: 0x1f
Advertising_Data: 1e 16 fa ff 0d 00 10 + payload
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 10 {payload}".format(payload=payload), shell=True)
# session id / hhit should be passed in the basic_id_update function
def basic_id_update(hhit):
# ASTM F3-411 Basic ID message 0x0 with HHIT.
print("basic_id_update")
# Set (Extended) Advertising Command
if is_bt5:
"""
OGF: 0x08 (LE controller commands)
OCF: 0x0037 (LE Set Extended Advertising Data command)
Advertising handle: 0x00 (Advertising set number)
Operation: 0x03 (Complete extended advertising data)
Fragment_Preference: 0x01 (The Controller should not fragment or should minimize fragmentation of Host advertising data)
Advertising_Data_Length: 0x1f
Advertising_Data: 1e 16 fa ff 0d 00 10 + hhit
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x03 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d 0x00 0x00 0x42 {} 0x00 0x00 0x00 0x00 0x00 0x00 0x00".format('0x01', hhit), shell=True)
else:
"""
OGF: 0x08 (LE controller commands)
OCF: 0x0008 (LE Set Advertising Data command)
Advertising_Data_Length: 0x1f
Advertising_Data: 1e 16 fa ff 0d 00 00 42 + hhit
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 00 42 {} 00 00 00 00 00 00 00".format('0x01', hhit), shell=True)
time.sleep(0.5)
def auth_update(auth_pages, msg_number):
print("auth_update")
for page in auth_pages:
msg = separate_bytes(hexlify(page).decode("utf-8"))
# Set (Extended) Advertising Command
if is_bt5:
"""
OGF: 0x08 (LE controller commands)
OCF: 0x0037 (LE Set Extended Advertising Data command)
Advertising handle: 0x00 (Advertising set number)
Operation: 0x01 / 0x02 (First and last part of advertising data)
Fragment_Preference: 0x01 (The Controller should not fragment or should minimize fragmentation of Host advertising data)
Advertising_Data_Length: 0x1f
Advertising_Data: 1e 16 fa ff 0d 00 20 + msg
"""
if msg_number == 1:
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x01 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d {} 0x20 {}".format("00", msg), shell=True)
elif msg_number == 2:
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x02 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d {} 0x20 {}".format("00", msg), shell=True)
else:
"""
OGF: 0x08 (LE controller commands)
OCF: 0x0008 (LE Set Advertising Data command)
Advertising_Data_Length: 0x1f
Advertising_Data: 1e 16 fa ff 0d 00 20 + msg
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d {} 20 {}".format("00", msg), shell=True)
time.sleep(0.2)
DESCRIPTION = """Send ble advertisements using the specified Host Identity,
in accordance to ASTM F3411-19.
If iroha option with the private key file is specified,
location updates will be sent to Iroha blockchain as well.
The private key should belong to the same host specificed by
the host identity.
If no flight_id is specified, then a random flight id will be generated.
flight_id is only used if the --iroha option is specified.
"""
def main():
global g_lat
global g_lon
global is_bt5
parser = argparse.ArgumentParser(description=DESCRIPTION)
parser.add_argument(
"-f", "--file",
dest="host_identity_file",
default="/usr/local/etc/hip/my_host_identities.xml",
type=str,
help="Specify the host identity xml file."
)
parser.add_argument(
"-r", "--registry_file", # this is for registry_identity_file
dest="registry_identity_file",
default="/usr/local/etc/hip/my_host_identities.xml",
type=str,
help="Specify the registry identity xml file."
)
parser.add_argument(
"-i", "--iroha",
dest="iroha_priv_key_file",
default=None,
type=str,
help="Specify the iroha private key file to enable location updates to Iroha."
)
parser.add_argument(
"-b", "--bluetooth",
dest="bluetooth_version",
default=4,
type=int,
help="Specify the bluetooth version, 4 or 5 (Default: 4)."
)
parser.add_argument(
"--gps",
help="Turn on gps updates",
action="store_true"
)
parser.add_argument(
"--flight_id",
type=int,
help="Specify a flight_id for the location updates. Used to identify the flight path in the blockchain. This value is only used if the --iroha option is specified."
)
parser.add_argument(
"--run-scenario",
dest="run_scenario",
action="store_true",
help="Run a flight scenario with randomized coordinates originating at ({}, {}). This will disregard the gps option.".format(
g_lat, g_lon)
)
parser.add_argument(
"--expiration-date",
dest="exp_date",
default="2030-01-01T13:00",
type=str,
help="Specify expiration date for the attestation in the format YYYY-mm-ddTHH:MM"
)
#
args = parser.parse_args(sys.argv[1:])
# hi_file is read from identities/drone
hi_file = args.host_identity_file
# registry hi_file (actually identities/admin)
r_hi_file = args.registry_identity_file
iroha_priv_key_file = args.iroha_priv_key_file
is_bt5 = True if args.bluetooth_version == 5 else False
exp_date = datetime.strptime(args.exp_date, "%Y-%m-%dT%H:%M")
if (exp_date - datetime.utcnow()).total_seconds() <= 0:
print("Expiration date cannot come before current date.")
sys.exit(-1)
if not os.path.isfile(hi_file) \
or hi_file[-3:] != "xml":
print("The specified file is not an xml file")
sys.exit(-1)
#HostIdentity object containing Drone DRIP credentials
hi = HostIdentity.from_file(hi_file)
# registry host identity
r_hi = HostIdentity.from_file(r_hi_file)
iroha = None
if iroha_priv_key_file:
if not os.path.isfile(iroha_priv_key_file) \
or iroha_priv_key_file[-4:] != "priv":
print("The specified iroha private key file is not a .priv file")
sys.exit(-1)
key = open(iroha_priv_key_file, 'r').read()
iroha = DripIrohaAccount(key, hi.get_hhit(), hi.get_hid(
), os.environ.get("IROHA_HOST"), os.environ.get("IROHA_PORT"))
if args.flight_id:
flight_id = args.flight_id
else:
flight_id = ''.join(random.SystemRandom().choice(
string.hexdigits) for _ in range(64))
hhit = separate_bytes(hi.get_hhit())
hid = separate_bytes(hi.get_hid())
# this endorsement should be generated by the registry,
# but just for now, we are generating it according draft-ietf-drip-auth-17
attest = r_hi.generate_attestation_Endorsement_Broadcast(hhit,hid,get_timestamp(exp_date))
#attest = r_hi.generate_self_attestation(get_timestamp(exp_date))
if attest is None:
print("Expiration date has expired.")
sys.exit(-1)
auth_pages = get_auth_pages(attest)
# Reset bt
subprocess.run("hcitool -i hci0 cmd 0x03 0x0003", shell=True)
# Set (Extended) Advertising Parameters
if is_bt5:
"""
OGF: 0x08 (LE controller commands)
OCF: 0x0036 (LE Set Extended Advertising Parameters command)
Advertising handle: 0x00 (Advertising set number)
Advertising_Event_Properties: 0x0000 (Non-connectable, non-scannable undirected and not legacy PDU)
Primary_Advertising_Interval_Min: 0xa00000 (100 ms)
Primary_Advertising_Interval_Max: 0xa00000 (100 ms)
Primary_Advertising_Channel_Map: 0x07 (All three channels enabled)
Own_Address_Type: 0x00 (Public device address)
Peer_Address_Type: 0x00 (Public Device Address or Public Identity Address)
Peer_Address: 0x000000000000
Advertising_Filter_Policy: 0x02 (Process scan requests: all devices, Connection requests: only Filter Accept List)
Advertising_TX_Power: 0x7f (Host has no preference)
Primary_Advertising_PHY: 0x03 (Primary advertisement PHY is LE Coded)
Secondary_Advertising_Max_Skip: 0x00 (AUX_ADV_IND shall be sent prior to the next advertising event)
Secondary_Advertising_PHY: 0x03 (Secondary advertisement PHY is LE Coded)
Advertising_SID: 0x00 (Value of the Advertising SID subfield in the ADI field of the PDU)
Scan_Request_Notification_Enable: 0x00 (Scan request notifications disabled)
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0036 0x00 0x00 0x00 0xa0 0x00 0x00 0xa0 0x00 0x00 0x07 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x02 0x7f 0x03 0x00 0x03 0x00 0x00", shell=True)
else:
"""
OGF: 0x08 (LE controller commands)
OCF: 0x0006 (LE Set Advertising Parameters command)
Advertising_Interval_Min: 0xa000 (100 ms)
Advertising_Interval_Max: 0xa000 (100 ms)
Advertising_Type: 0x03 (Non connectable undirected advertising (ADV_NONCONN_IND))
Own_Address_Type: 0x00 (Public device address)
Peer_Address_Type: 0x00 (Public Device Address or Public Identity Address)
Peer_Address: 0x000000000000
Advertising_Channel_Map: 0x07 (All three channels enabled)
Advertising_Filter_Policy: 0x02 (Process scan requests: all devices, Connection requests: only Filter Accept List)
"""
subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0006 a0 00 a0 00 03 00 00 00 00 00 00 00 00 07 02", shell=True)
# Enable (Extended) Bluetooth Advertisement
if is_bt5:
"""
OGF: 0x08 (LE controller commands)
OCF: 0x0039 (LE Set Extended Advertising Enable command)
Enable: 0x01 (True)
Num_Sets: 0x01 (Number of advertising sets to enable)
Advertising_Handle[i]: 0x00 (Advertising set's handles)
Duration[i]: 0x0000 (Advertise until the Host disables it)
Max_Extended_Advertising_Events[i]: 0x00 (No maximum)
"""
subprocess.run("hcitool -i hci0 cmd 0x08 0x0039 0x01 0x01 0x00 0x00 0x00 0x00", shell=True)
else:
"""
OGF: 0x08 (LE controller commands)
OCF: 0x000a (LE Set Advertising Enable command)
Enable: 0x01 (True)
"""
subprocess.run("hcitool -i hci0 cmd 0x08 0x000a 01", shell=True)
gpsd = None
if not args.run_scenario and args.gps:
gpsd = GpsPoller()
gpsd.start()
i = 0
try:
while True:
os.system('clear')
print("The flight_id is {}".format(flight_id))
# Send basic_id and auth at least once every 3 seconds.
# Send basic_id first second
if i % 3 == 0:
basic_id_update(hhit)
# Send first half of auth pages second second
elif i % 3 == 1:
auth_update(auth_pages[:2], 1)
# Send last half of auth pages last second
elif i % 3 == 2:
auth_update(auth_pages[2:], 2)
# saving the hash_location information such as lon & lat, to use as previous location_hash later in Manifests
prev_location_info = location_info
# Location updates at least once every second.
Drip_Wrapper(gpsd, iroha, flight_id, hi, exp_date)
# Manifest broadcast
manifest_broadcast(hi, flight_id, exp_date, prev_location_info)
if args.run_scenario:
g_lat += random.uniform(0.0001, 0.001) * \
random.choice([-1, 0, 1])
g_lon += random.uniform(0.0001, 0.001) * \
random.choice([-1, 0, 1])
print(g_lat, g_lon)
i += 1
time.sleep(1 - time.monotonic() % 1)
except KeyboardInterrupt:
# Reset bt
subprocess.run("hcitool -i hci0 cmd 0x03 0x0003", shell=True)
if __name__ == '__main__':
main()