Skip to content
Snippets Groups Projects
Forked from Hampus Rosenquist / TDDE21 DRIP 2022
11 commits behind the upstream repository.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
beacon_1.py 18.46 KiB
import argparse
import os
import sys
import re
import subprocess
import time
from base64 import b64encode
from binascii import unhexlify, hexlify
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
import string
import random
from dotenv.main import load_dotenv

FILE_DIR = os.path.dirname(os.path.abspath(__file__))
BASE_DIR = os.path.join(FILE_DIR, "..")
sys.path.append(BASE_DIR)

from drippy.drip_iroha_account import DripIrohaAccount
from drippy.gps_poll import GpsPoller
from drippy.util import astm_float_to_int_hex, get_timestamp, astm_time
from drippy.host_identity import HostIdentity
from drippy.cSHAKE import cSHAKE128




# File with flight_id that will be incremented and used if no other
# flight_id is specified.
FLIGHT_ID_FILE = os.path.join(FILE_DIR, "default_flight_id.txt")


load_dotenv()

location_info = "" #location info such lat & long, it is saved to send later in manifests

def separate_bytes(hex_str, delim=' '):
    return re.sub(r'.{2}', r'\g<0>{}'.format(delim), hex_str).strip()


AUTH_PAGE_0_LEN = 17
AUTH_PAGE_LEN = 23

g_lat, g_lon = 58.398176, 15.575927

executor = ThreadPoolExecutor(5)


def get_auth_pages(signature: bytes, auth_type=0x10):
    """
    Build a list of ASTM F3411-19 authentication pages with the
    signature provided.
    """
    auth_pages = []
    auth_len = len(signature)

    # Pad signature with 0s so all pages are even length.
    signature += b'\x00' * \
        (AUTH_PAGE_LEN - ((len(signature)-AUTH_PAGE_0_LEN) % AUTH_PAGE_LEN))
    pages = 1 + (len(signature) - AUTH_PAGE_0_LEN)//AUTH_PAGE_LEN

    # First page is 7 bytes header + 17 bytes from signature
    auth_page_0 = b'\x10' + pages.to_bytes(1, 'big') + auth_len.to_bytes(
        1, 'big') + int(astm_time()).to_bytes(4, 'little')
    auth_page_0 += signature[:AUTH_PAGE_0_LEN]
    auth_pages.append(auth_page_0)

    # Remaining pages are 1 byte header + 23 bytes from signature
    i = AUTH_PAGE_0_LEN
    page = 1
    bytes_to_encode = len(signature) - AUTH_PAGE_0_LEN
    while i < bytes_to_encode:
        auth_page = (auth_type | page).to_bytes(1, 'big')
        auth_page += signature[i:i+AUTH_PAGE_LEN]
        auth_pages.append(auth_page)

        i += AUTH_PAGE_LEN
        page += 1

    return auth_pages

# Placing a single location message For Now 
def Drip_Wrapper(gpsd: GpsPoller, iroha: DripIrohaAccount, flight_id, HostID: HostIdentity, exp_date):
    # ASTM F3-411 Location message 0x1
    print("location_update")
    global executor

   

    # If gpsd is active, check that it has received values
    if gpsd and gpsd.get_current_value() == None:
        return

    if gpsd:
        lat = separate_bytes(astm_float_to_int_hex(gpsd.get_lat()))
        lon = separate_bytes(astm_float_to_int_hex(gpsd.get_lon()))
    else:
        lat = separate_bytes(astm_float_to_int_hex(g_lat))
        lon = separate_bytes(astm_float_to_int_hex(g_lon))

    location_info = "10 00 00 00 {lat} {lon} 00 00 00 00 00 00 00 00 {ts} 00 00".format(
        lat=lat, lon=lon, ts="00 00")

    if(location_info):
        hash_location = cSHAKE128( location_info.encode('utf-8'),25,"", "") # hash of location_info, 25 bytes
    if iroha:  # if iroha private supplied, send location transactions on separate thread.
        # location will be flight id concatenated with the bluetooth payload
        # In order to save space we can b64encode the bytes of the payload,
        # instead of sending a hexstring which would result in location message
        # taking up double the bytes (2 characters per byte).
        iroha_msg = b64encode(
            unhexlify(flight_id + ''.join(location_info.split(' ')))).decode("utf-8")
        executor.submit(
            iroha.set_account_details, "location", iroha_msg
        )
    
    payload = HostID.generate_self_attestation_Wrapper(hash_location, get_timestamp(exp_date))
    # Set (Extended) Advertising command
    if is_bt5:
        """
        OGF: 0x08  (LE controller commands)
        OCF: 0x0037 (LE Set Extended Advertising Data command)
        Advertising handle: 0x00 (Advertising set number)
        Operation: 0x03 (Complete extended advertising data)
        Fragment_Preference: 0x01 (The Controller should not fragment or should minimize fragmentation of Host advertising data)
        Advertising_Data_Length: 0x1f 
        Advertising_Data: 1e 16 fa ff 0d 00 10 + payload 
        """
        subprocess.run(
            "hcitool -i hci0 cmd 0x08 0x0037 0x00 0x03 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d 0x00 0x10 {hash_location}{payload}".format(hash_location=hash_location,payload=payload), shell=True)
    else:
        """
        OGF: 0x08  (LE controller commands)
        OCF: 0x0008 (LE Set Advertising Data command)
        Advertising_Data_Length: 0x1f 
        Advertising_Data: 1e 16 fa ff 0d 00 10 + payload
        """
        subprocess.run(
            "hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 10 {hash_location}{payload}".format(hash_location=hash_location, payload=payload), shell=True)


def manifest_broadcast(HostID: HostIdentity, flight_id, exp_date, prev_loc = "" ): ## Manifest broadcast can contain max  11 hashes (at 8-bytes each). For now we are sending one message.
    # ASTM F3-411 Location message 0x1
    print("manifest_broadcast")
    # If gpsd is active, check that it has received values
    if location_info == None:
        print("payload is empty, location information missing from gps")

    payload = HostID.generate_self_attestation_Manifests(cSHAKE128(prev_loc.encode('utf8'),8, "",""),cSHAKE128(location_info.encode('utf8'),8,"",""), get_timestamp(exp_date))
    # Set (Extended) Advertising command
    if is_bt5:
        """
        OGF: 0x08  (LE controller commands)
        OCF: 0x0037 (LE Set Extended Advertising Data command)
        Advertising handle: 0x00 (Advertising set number)
        Operation: 0x03 (Complete extended advertising data)
        Fragment_Preference: 0x01 (The Controller should not fragment or should minimize fragmentation of Host advertising data)
        Advertising_Data_Length: 0x1f 
        Advertising_Data: 1e 16 fa ff 0d 00 10 + payload 
        """
        subprocess.run(
            "hcitool -i hci0 cmd 0x08 0x0037 0x00 0x03 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d 0x00 0x10 {payload}".format(payload=payload), shell=True) #0x03
    else:
        """
        OGF: 0x08  (LE controller commands)
        OCF: 0x0008 (LE Set Advertising Data command)
        Advertising_Data_Length: 0x1f 
        Advertising_Data: 1e 16 fa ff 0d 00 10 + payload
        """
        subprocess.run(
            "hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 10 {payload}".format(payload=payload), shell=True)




# session id / hhit should be passed in the basic_id_update function

def basic_id_update(hhit):
    # ASTM F3-411 Basic ID message 0x0 with HHIT.

    print("basic_id_update")
    # Set (Extended) Advertising Command
    if is_bt5:
        """
        OGF: 0x08  (LE controller commands)
        OCF: 0x0037 (LE Set Extended Advertising Data command)
        Advertising handle: 0x00 (Advertising set number)
        Operation: 0x03 (Complete extended advertising data)
        Fragment_Preference: 0x01 (The Controller should not fragment or should minimize fragmentation of Host advertising data)
        Advertising_Data_Length: 0x1f 
        Advertising_Data: 1e 16 fa ff 0d 00 10 + hhit 
        """
        subprocess.run(
            "hcitool -i hci0 cmd 0x08 0x0037 0x00 0x03 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d 0x00 0x00 0x42 {} 0x00 0x00 0x00 0x00 0x00 0x00 0x00".format('0x01', hhit), shell=True)
    else:
        """
        OGF: 0x08  (LE controller commands)
        OCF: 0x0008 (LE Set Advertising Data command)
        Advertising_Data_Length: 0x1f 
        Advertising_Data: 1e 16 fa ff 0d 00 00 42 + hhit
        """
        subprocess.run(
            "hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 00 42 {} 00 00 00 00 00 00 00".format('0x01', hhit), shell=True)
    time.sleep(0.5)


def auth_update(auth_pages, msg_number):
    print("auth_update")
    for page in auth_pages:
        msg = separate_bytes(hexlify(page).decode("utf-8"))
        # Set (Extended) Advertising Command
        if is_bt5:
            """
            OGF: 0x08  (LE controller commands)
            OCF: 0x0037 (LE Set Extended Advertising Data command)
            Advertising handle: 0x00 (Advertising set number)
            Operation: 0x01 / 0x02 (First and last part of advertising data)
            Fragment_Preference: 0x01 (The Controller should not fragment or should minimize fragmentation of Host advertising data)
            Advertising_Data_Length: 0x1f 
            Advertising_Data: 1e 16 fa ff 0d 00 20 + msg
            """
            if msg_number == 1:
                subprocess.run(
                    "hcitool -i hci0 cmd 0x08 0x0037 0x00 0x01 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d {} 0x20 {}".format("00", msg), shell=True)
            elif msg_number == 2:
                subprocess.run(
                    "hcitool -i hci0 cmd 0x08 0x0037 0x00 0x02 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d {} 0x20 {}".format("00", msg), shell=True)
        else:
            """
            OGF: 0x08  (LE controller commands)
            OCF: 0x0008 (LE Set Advertising Data command)
            Advertising_Data_Length: 0x1f 
            Advertising_Data: 1e 16 fa ff 0d 00 20 + msg
            """
            subprocess.run(
                "hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d {} 20 {}".format("00", msg), shell=True)
        time.sleep(0.2)


DESCRIPTION = """Send ble advertisements using the specified Host Identity,
in accordance to ASTM F3411-19.

If iroha option with the private key file is specified,
location updates will be sent to Iroha blockchain as well.
The private key should belong to the same host specificed by
the host identity.

If no flight_id is specified, then a random flight id will be generated.
flight_id is only used if the --iroha option is specified.
"""


def main():
    global g_lat
    global g_lon
    global is_bt5
    parser = argparse.ArgumentParser(description=DESCRIPTION)
    parser.add_argument(
        "-f", "--file",
        dest="host_identity_file",
        default="/usr/local/etc/hip/my_host_identities.xml",
        type=str,
        help="Specify the host identity xml file."
    )
    
    parser.add_argument( 
        "-r", "--registry_file",     # this is for registry_identity_file
        dest="registry_identity_file",
        default="/usr/local/etc/hip/my_host_identities.xml",
        type=str,
        help="Specify the registry identity xml file."
    )
    parser.add_argument(
        "-i", "--iroha",
        dest="iroha_priv_key_file",
        default=None,
        type=str,
        help="Specify the iroha private key file to enable location updates to Iroha."
    )
    parser.add_argument(
        "-b", "--bluetooth",
        dest="bluetooth_version",
        default=4,
        type=int,
        help="Specify the bluetooth version, 4 or 5 (Default: 4)."
    )
    parser.add_argument(
        "--gps",
        help="Turn on gps updates",
        action="store_true"
    )
    parser.add_argument(
        "--flight_id",
        type=int,
        help="Specify a flight_id for the location updates. Used to identify the flight path in the blockchain. This value is only used if the --iroha option is specified."
    )
    parser.add_argument(
        "--run-scenario",
        dest="run_scenario",
        action="store_true",
        help="Run a flight scenario with randomized coordinates originating at ({}, {}). This will disregard the gps option.".format(
            g_lat, g_lon)
    )
    parser.add_argument(
        "--expiration-date",
        dest="exp_date",
        default="2030-01-01T13:00",
        type=str,
        help="Specify expiration date for the attestation in the format YYYY-mm-ddTHH:MM"
    )

    # 
    args = parser.parse_args(sys.argv[1:])

    # hi_file is read from identities/drone
    hi_file = args.host_identity_file
    # registry hi_file (actually identities/admin)
    r_hi_file = args.registry_identity_file

    iroha_priv_key_file = args.iroha_priv_key_file
    is_bt5 = True if args.bluetooth_version == 5 else False

    exp_date = datetime.strptime(args.exp_date, "%Y-%m-%dT%H:%M")
    if (exp_date - datetime.utcnow()).total_seconds() <= 0:
        print("Expiration date cannot come before current date.")
        sys.exit(-1)

    if not os.path.isfile(hi_file) \
            or hi_file[-3:] != "xml":
        print("The specified file is not an xml file")
        sys.exit(-1)

    #HostIdentity object containing Drone DRIP credentials
    hi = HostIdentity.from_file(hi_file)
    
    # registry host identity 
    r_hi = HostIdentity.from_file(r_hi_file)
   


    iroha = None
    if iroha_priv_key_file:
        if not os.path.isfile(iroha_priv_key_file) \
                or iroha_priv_key_file[-4:] != "priv":
            print("The specified iroha private key file is not a .priv file")
            sys.exit(-1)

        key = open(iroha_priv_key_file, 'r').read()
        iroha = DripIrohaAccount(key, hi.get_hhit(), hi.get_hid(
        ), os.environ.get("IROHA_HOST"), os.environ.get("IROHA_PORT"))

    if args.flight_id:
        flight_id = args.flight_id
    else:
        flight_id = ''.join(random.SystemRandom().choice(
            string.hexdigits) for _ in range(64))

    hhit = separate_bytes(hi.get_hhit())
    hid = separate_bytes(hi.get_hid())

    # this endorsement should be generated by the registry, 
    # but just for now, we are generating it according draft-ietf-drip-auth-17
    attest = r_hi.generate_attestation_Endorsement_Broadcast(hhit,hid,get_timestamp(exp_date))
    #attest = r_hi.generate_self_attestation(get_timestamp(exp_date))

    if attest is None:
        print("Expiration date has expired.")
        sys.exit(-1)

    auth_pages = get_auth_pages(attest)

    # Reset bt
    subprocess.run("hcitool -i hci0 cmd 0x03 0x0003", shell=True)

    # Set (Extended) Advertising Parameters
    if is_bt5:
        """
        OGF: 0x08  (LE controller commands)
        OCF: 0x0036 (LE Set Extended Advertising Parameters command)
        Advertising handle: 0x00 (Advertising set number)
        Advertising_Event_Properties: 0x0000 (Non-connectable, non-scannable undirected and not legacy PDU)
        Primary_Advertising_Interval_Min: 0xa00000 (100 ms)
        Primary_Advertising_Interval_Max: 0xa00000 (100 ms)
        Primary_Advertising_Channel_Map: 0x07 (All three channels enabled)
        Own_Address_Type: 0x00 (Public device address)
        Peer_Address_Type: 0x00 (Public Device Address or Public Identity Address)
        Peer_Address: 0x000000000000
        Advertising_Filter_Policy: 0x02 (Process scan requests: all devices, Connection requests: only Filter Accept List)
        Advertising_TX_Power: 0x7f (Host has no preference)
        Primary_Advertising_PHY: 0x03 (Primary advertisement PHY is LE Coded)
        Secondary_Advertising_Max_Skip: 0x00 (AUX_ADV_IND shall be sent prior to the next advertising event)
        Secondary_Advertising_PHY: 0x03 (Secondary advertisement PHY is LE Coded)
        Advertising_SID: 0x00 (Value of the Advertising SID subfield in the ADI field of the PDU)
        Scan_Request_Notification_Enable: 0x00 (Scan request notifications disabled)
        """
        subprocess.run(
            "hcitool -i hci0 cmd 0x08 0x0036 0x00 0x00 0x00 0xa0 0x00 0x00 0xa0 0x00 0x00 0x07 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x02 0x7f 0x03 0x00 0x03 0x00 0x00", shell=True)
    else:
        """
        OGF: 0x08  (LE controller commands)
        OCF: 0x0006 (LE Set Advertising Parameters command)
        Advertising_Interval_Min: 0xa000 (100 ms)
        Advertising_Interval_Max: 0xa000 (100 ms)
        Advertising_Type: 0x03 (Non connectable undirected advertising (ADV_NONCONN_IND))
        Own_Address_Type: 0x00 (Public device address)
        Peer_Address_Type: 0x00 (Public Device Address or Public Identity Address)
        Peer_Address: 0x000000000000
        Advertising_Channel_Map: 0x07 (All three channels enabled)
        Advertising_Filter_Policy: 0x02 (Process scan requests: all devices, Connection requests: only Filter Accept List)
        """
        subprocess.run(
            "hcitool -i hci0 cmd 0x08 0x0006 a0 00 a0 00 03 00 00 00 00 00 00 00 00 07 02", shell=True)

    # Enable (Extended) Bluetooth Advertisement
    if is_bt5:
        """
        OGF: 0x08 (LE controller commands)
        OCF: 0x0039 (LE Set Extended Advertising Enable command)
        Enable: 0x01 (True)
        Num_Sets: 0x01 (Number of advertising sets to enable)
        Advertising_Handle[i]: 0x00 (Advertising set's handles)
        Duration[i]: 0x0000 (Advertise until the Host disables it)
        Max_Extended_Advertising_Events[i]: 0x00 (No maximum)
        """
        subprocess.run("hcitool -i hci0 cmd 0x08 0x0039 0x01 0x01 0x00 0x00 0x00 0x00", shell=True)
    else:
        """
        OGF: 0x08  (LE controller commands)
        OCF: 0x000a (LE Set Advertising Enable command)
        Enable: 0x01 (True)
        """
        subprocess.run("hcitool -i hci0 cmd 0x08 0x000a 01", shell=True)

    gpsd = None
    if not args.run_scenario and args.gps:
        gpsd = GpsPoller()
        gpsd.start()

    i = 0
    try:
        while True:
            os.system('clear')
            print("The flight_id is {}".format(flight_id))

            # Send basic_id and auth at least once every 3 seconds.
            # Send basic_id first second
            if i % 3 == 0:
                basic_id_update(hhit)
    
            # Send first half of auth pages second second
            elif i % 3 == 1:
                auth_update(auth_pages[:2], 1)
            # Send last half of auth pages last second
            elif i % 3 == 2:
                auth_update(auth_pages[2:], 2)

            # saving the hash_location information such as lon & lat, to use as previous location_hash later in Manifests
            prev_location_info = location_info  
            # Location updates at least once every second.
            Drip_Wrapper(gpsd, iroha, flight_id, hi, exp_date)
            # Manifest broadcast 
            manifest_broadcast(hi, flight_id, exp_date, prev_location_info)

            if args.run_scenario:
                g_lat += random.uniform(0.0001, 0.001) * \
                    random.choice([-1, 0, 1])
                g_lon += random.uniform(0.0001, 0.001) * \
                    random.choice([-1, 0, 1])
                
                print(g_lat, g_lon)
                
            i += 1
            time.sleep(1 - time.monotonic() % 1)
    except KeyboardInterrupt:
        # Reset bt
        subprocess.run("hcitool -i hci0 cmd 0x03 0x0003", shell=True)


if __name__ == '__main__':
    main()