Skip to content
Snippets Groups Projects
Commit dabdf97a authored by Hampus Rosenquist's avatar Hampus Rosenquist
Browse files

Merge branch 'main' of gitlab.liu.se:hamro777/tdde21-drip-2022

parents 9cf2aa87 d4fb1b29
No related branches found
No related tags found
No related merge requests found
...@@ -32,7 +32,7 @@ FLIGHT_ID_FILE = os.path.join(FILE_DIR, "default_flight_id.txt") ...@@ -32,7 +32,7 @@ FLIGHT_ID_FILE = os.path.join(FILE_DIR, "default_flight_id.txt")
load_dotenv() load_dotenv()
payload = "" hash_location = "" #hash of location info such lat & long, it is saved to send later in manifests
def separate_bytes(hex_str, delim=' '): def separate_bytes(hex_str, delim=' '):
return re.sub(r'.{2}', r'\g<0>{}'.format(delim), hex_str).strip() return re.sub(r'.{2}', r'\g<0>{}'.format(delim), hex_str).strip()
...@@ -80,7 +80,7 @@ def get_auth_pages(signature: bytes, auth_type=0x10): ...@@ -80,7 +80,7 @@ def get_auth_pages(signature: bytes, auth_type=0x10):
return auth_pages return auth_pages
# Placing a single location message For Now # Placing a single location message For Now
def Drip_Wrapper(gpsd: GpsPoller, iroha: DripIrohaAccount, flight_id): def Drip_Wrapper(gpsd: GpsPoller, iroha: DripIrohaAccount, flight_id, HostID: HostIdentity, exp_date):
# ASTM F3-411 Location message 0x1 # ASTM F3-411 Location message 0x1
print("location_update") print("location_update")
global executor global executor
...@@ -98,20 +98,22 @@ def Drip_Wrapper(gpsd: GpsPoller, iroha: DripIrohaAccount, flight_id): ...@@ -98,20 +98,22 @@ def Drip_Wrapper(gpsd: GpsPoller, iroha: DripIrohaAccount, flight_id):
lat = separate_bytes(astm_float_to_int_hex(g_lat)) lat = separate_bytes(astm_float_to_int_hex(g_lat))
lon = separate_bytes(astm_float_to_int_hex(g_lon)) lon = separate_bytes(astm_float_to_int_hex(g_lon))
payload = "10 00 00 00 {lat} {lon} 00 00 00 00 00 00 00 00 {ts} 00 00".format( location_info = "10 00 00 00 {lat} {lon} 00 00 00 00 00 00 00 00 {ts} 00 00".format(
lat=lat, lon=lon, ts="00 00") lat=lat, lon=lon, ts="00 00")
if(location_info):
hash_location = cSHAKE128( location_info,25,"", flight_id) # hash of location_info, 25 bytes
if iroha: # if iroha private supplied, send location transactions on separate thread. if iroha: # if iroha private supplied, send location transactions on separate thread.
# location will be flight id concatenated with the bluetooth payload # location will be flight id concatenated with the bluetooth payload
# In order to save space we can b64encode the bytes of the payload, # In order to save space we can b64encode the bytes of the payload,
# instead of sending a hexstring which would result in location message # instead of sending a hexstring which would result in location message
# taking up double the bytes (2 characters per byte). # taking up double the bytes (2 characters per byte).
iroha_msg = b64encode( iroha_msg = b64encode(
unhexlify(flight_id + ''.join(payload.split(' ')))).decode("utf-8") unhexlify(flight_id + ''.join(location_info.split(' ')))).decode("utf-8")
executor.submit( executor.submit(
iroha.set_account_details, "location", iroha_msg iroha.set_account_details, "location", iroha_msg
) )
payload = HostID.generate_self_attestation_Wrapper(hash_location, get_timestamp(exp_date))
# Set (Extended) Advertising command # Set (Extended) Advertising command
if is_bt5: if is_bt5:
""" """
...@@ -124,7 +126,7 @@ def Drip_Wrapper(gpsd: GpsPoller, iroha: DripIrohaAccount, flight_id): ...@@ -124,7 +126,7 @@ def Drip_Wrapper(gpsd: GpsPoller, iroha: DripIrohaAccount, flight_id):
Advertising_Data: 1e 16 fa ff 0d 00 10 + payload Advertising_Data: 1e 16 fa ff 0d 00 10 + payload
""" """
subprocess.run( subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x03 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d 0x00 0x10 {payload}".format(payload=payload), shell=True) "hcitool -i hci0 cmd 0x08 0x0037 0x00 0x03 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d 0x00 0x10 {hash_location}{payload}".format(hash_location=hash_location,payload=payload), shell=True)
else: else:
""" """
OGF: 0x08 (LE controller commands) OGF: 0x08 (LE controller commands)
...@@ -133,30 +135,17 @@ def Drip_Wrapper(gpsd: GpsPoller, iroha: DripIrohaAccount, flight_id): ...@@ -133,30 +135,17 @@ def Drip_Wrapper(gpsd: GpsPoller, iroha: DripIrohaAccount, flight_id):
Advertising_Data: 1e 16 fa ff 0d 00 10 + payload Advertising_Data: 1e 16 fa ff 0d 00 10 + payload
""" """
subprocess.run( subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 10 {payload}".format(payload=payload), shell=True) "hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 10 {hash_location}{payload}".format(hash_location=hash_location, payload=payload), shell=True)
def manifest_broadcast(payload, flight_id): ## Manifest broadcast can contain max 11 hashes (at 8-bytes each). For now we are sending one message. def manifest_broadcast(HostID: HostIdentity, flight_id, prev_hash_loc, exp_date): ## Manifest broadcast can contain max 11 hashes (at 8-bytes each). For now we are sending one message.
# ASTM F3-411 Location message 0x1 # ASTM F3-411 Location message 0x1
print("manifest_broadcast") print("manifest_broadcast")
hash_payload = " "
# If gpsd is active, check that it has received values # If gpsd is active, check that it has received values
if payload == None: if hash_location == None:
print("payload is empty, cannot generate hash_payload") print("payload is empty, cannot generate hash_payload for gps information")
if(payload):
hash_payload = cSHAKE128(
payload,
8,
"",
flight_id
)
print(hash_payload + "aaaaaaaaaa")
payload = HostID.generate_self_attestation_Manifests(cSHAKE128(prev_hash_loc,8, "",flight_id),cSHAKE128(hash_location,8,"",flight_id), get_timestamp(exp_date))
# Set (Extended) Advertising command # Set (Extended) Advertising command
if is_bt5: if is_bt5:
""" """
...@@ -169,7 +158,7 @@ def manifest_broadcast(payload, flight_id): ## Manifest broadcast can contain ma ...@@ -169,7 +158,7 @@ def manifest_broadcast(payload, flight_id): ## Manifest broadcast can contain ma
Advertising_Data: 1e 16 fa ff 0d 00 10 + payload Advertising_Data: 1e 16 fa ff 0d 00 10 + payload
""" """
subprocess.run( subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0037 0x00 0x03 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d 0x00 0x10 {payload}".format(payload=hash_payload), shell=True) #0x03 "hcitool -i hci0 cmd 0x08 0x0037 0x00 0x03 0x01 0x1f 0x1e 0x16 0xfa 0xff 0x0d 0x00 0x10 {payload}".format(payload=payload), shell=True) #0x03
else: else:
""" """
OGF: 0x08 (LE controller commands) OGF: 0x08 (LE controller commands)
...@@ -178,7 +167,7 @@ def manifest_broadcast(payload, flight_id): ## Manifest broadcast can contain ma ...@@ -178,7 +167,7 @@ def manifest_broadcast(payload, flight_id): ## Manifest broadcast can contain ma
Advertising_Data: 1e 16 fa ff 0d 00 10 + payload Advertising_Data: 1e 16 fa ff 0d 00 10 + payload
""" """
subprocess.run( subprocess.run(
"hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 10 {payload}".format(payload=hash_payload), shell=True) "hcitool -i hci0 cmd 0x08 0x0008 1f 1e 16 fa ff 0d 00 10 {payload}".format(payload=payload), shell=True)
...@@ -281,9 +270,6 @@ def main(): ...@@ -281,9 +270,6 @@ def main():
type=str, type=str,
help="Specify the registry identity xml file." help="Specify the registry identity xml file."
) )
parser.add_argument( parser.add_argument(
"-i", "--iroha", "-i", "--iroha",
dest="iroha_priv_key_file", dest="iroha_priv_key_file",
...@@ -344,6 +330,7 @@ def main(): ...@@ -344,6 +330,7 @@ def main():
print("The specified file is not an xml file") print("The specified file is not an xml file")
sys.exit(-1) sys.exit(-1)
#HostIdentity object containing Drone DRIP credentials
hi = HostIdentity.from_file(hi_file) hi = HostIdentity.from_file(hi_file)
# registry host identity # registry host identity
...@@ -371,13 +358,10 @@ def main(): ...@@ -371,13 +358,10 @@ def main():
hhit = separate_bytes(hi.get_hhit()) hhit = separate_bytes(hi.get_hhit())
hid = separate_bytes(hi.get_hid()) hid = separate_bytes(hi.get_hid())
# hhit and signature of the registry # this endorsement should be generated by the registry,
r_hhit = separate_bytes(r_hi.get_hhit()) # but just for now, we are generating it according draft-ietf-drip-auth-17
r_signature = HostIdentity.generate_self_attestation(HostIdentity,astm_time()) # this signature should be generated by the registry, attest = r_hi.generate_attestation_Endorsement_Broadcast(hhit,hid,get_timestamp(exp_date))
# but just for now, we are generating the r_signature using the same function used for drone signature generate function
attest = hi.generate_self_concise_attestation(r_hhit,r_signature,get_timestamp(exp_date))
if attest is None: if attest is None:
print("Expiration date has expired.") print("Expiration date has expired.")
sys.exit(-1) sys.exit(-1)
...@@ -469,10 +453,13 @@ def main(): ...@@ -469,10 +453,13 @@ def main():
elif i % 3 == 2: elif i % 3 == 2:
auth_update(auth_pages[2:], 2) auth_update(auth_pages[2:], 2)
# saving the hash_location information such as lon & lat, to use as previous location_hash later in Manifests
prev_hash_location = hash_location
# Location updates at least once every second. # Location updates at least once every second.
Drip_Wrapper(gpsd, iroha, flight_id) Drip_Wrapper(gpsd, iroha, flight_id, hi, exp_date)
# Manifest broadcast # Manifest broadcast
manifest_broadcast(payload, flight_id) manifest_broadcast(hi, flight_id, prev_hash_location, exp_date)
if args.run_scenario: if args.run_scenario:
g_lat += random.uniform(0.0001, 0.001) * \ g_lat += random.uniform(0.0001, 0.001) * \
random.choice([-1, 0, 1]) random.choice([-1, 0, 1])
......
...@@ -74,13 +74,37 @@ class HostIdentity: ...@@ -74,13 +74,37 @@ class HostIdentity:
sign_key = SigningKey(unhexlify(self._priv_key)) sign_key = SigningKey(unhexlify(self._priv_key))
return hexlify(sign_key.sign(unhexlify(message))) return hexlify(sign_key.sign(unhexlify(message)))
## Function for DRIP_Wrapper
def generate_self_attestation_Wrapper(self,payload, expiration):
if expiration <= astm_time():
print("expiration timestamp has already expired.")
return None
#hash of SAM Type for Wrapper, which is 0x02
hash_SAM = cSHAKE128( '0x02', 1,"","")
message = hash_SAM + self._hhit + payload + "{:>08x}".format(convert_be_to_le(astm_time())) + "{:>08x}".format(convert_be_to_le(expiration))
sign_key = SigningKey(unhexlify(self._priv_key))
return hexlify(sign_key.sign(unhexlify(message)))
## Function for DRIP_Manifests
def generate_self_attestation_Manifests(self, prev_hashed_loc, hashed_loc, expiration): #hashed_loc is hashes of location for now its single, but it can be upto 11(Each 8 byte)
if expiration <= astm_time():
print("expiration timestamp has already expired.")
return None
#hash of SAM Type for Manifests, which is 0x03
hash_SAM = cSHAKE128( '0x03', 1,"","")
message = hash_SAM + self._hhit + prev_hashed_loc + hashed_loc + "{:>08x}".format(convert_be_to_le(astm_time())) + "{:>08x}".format(convert_be_to_le(expiration))
sign_key = SigningKey(unhexlify(self._priv_key))
return hexlify(sign_key.sign(unhexlify(message)))
def generate_self_concise_attestation(self, expiration): def generate_attestation_Endorsement_Broadcast(self, Drone_hhit, Drone_hid, expiration):
if expiration <= astm_time(): if expiration <= astm_time():
print("expiration timestamp has already expired.") print("expiration timestamp has already expired.")
return None return None
message = self._hhit + "{:>08x}".format(convert_be_to_le(expiration)) message = self._hhit + Drone_hhit + Drone_hid +"{:>08x}".format(convert_be_to_le(astm_time())) + "{:>08x}".format(convert_be_to_le(expiration))
sign_key = SigningKey(unhexlify(self._priv_key)) sign_key = SigningKey(unhexlify(self._priv_key))
return sign_key.sign(unhexlify(message)) return sign_key.sign(unhexlify(message))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment