Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 124 additions & 76 deletions scripts/artifacts/geodMapTiles.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,38 @@
""" geodMapTiles """
__artifacts_v2__ = {
"geodMapTiles": {
"name": "GeoD Maptiles",
"description": "Parses Map Tile Records from Apple geod Cache",
"author": "@ydkhatri",
"version": "0.0.2",
"date": "2024-10-17",
"creation_date": "2024-10-17",
"last_update_date": "2026-06-17",
"requirements": "none",
"category": "Location",
"notes": "",
"paths": ('**/MapTiles.sqlitedb*'),
"output_types": ['lava', 'tsv', 'timeline']
"output_types": "standard"
}
}

import base64
import gzip
import struct
import sqlite3
import zlib
from io import BytesIO

from scripts.artifact_report import ArtifactHtmlReport
from scripts.ilapfuncs import logfunc, open_sqlite_db_readonly, does_table_exist_in_db, artifact_processor
from PIL import Image
from pillow_heif import register_heif_opener
from scripts.filetype import guess_mime
from scripts.ilapfuncs import (
artifact_processor,
check_in_embedded_media,
does_table_exist_in_db,
logfunc,
open_sqlite_db_readonly
)


def ReadVLOC(data):
def read_vloc(data):
names = []
total_len = len(data)
pos = 8
Expand All @@ -43,31 +52,40 @@ def ReadVLOC(data):
return names


def ParseTCOL(data):
def parse_tcol(data):
'''returns tuple (VMP4 places, VLOC places)'''
tcol_places = []
data_size = len(data)
if data_size >=8:
tcol_data_offset = struct.unpack('<I', data[4:8])[0]
tcol_compressed_data = data[tcol_data_offset:]
if tcol_compressed_data:
try:
tcol_places = gzip.decompress(tcol_compressed_data)
#print("VLOC ->", tcol_places)
except (OSError, EOFError, zlib.error) as ex:
logfunc('Gzip decompression error from ParseTCOL() - ' + str(ex))
tcol_places = ''
vmp4_places = ParseVMP4(data[8:tcol_data_offset])
return vmp4_places, ReadVLOC(tcol_places)


def ParseVMP4(data):
if data_size < 8:
return [], []

tcol_data_offset = struct.unpack('<I', data[4:8])[0]
tcol_compressed_data = data[tcol_data_offset:]
if tcol_compressed_data:
try:
tcol_places = gzip.decompress(tcol_compressed_data)
#print("VLOC ->", tcol_places)
except (OSError, EOFError, zlib.error) as ex:
logfunc('Gzip decompression error from ParseTCOL() - ' + str(ex))
tcol_places = ''
vmp4_places = parse_vmp4(data[8:tcol_data_offset])
return vmp4_places, read_vloc(tcol_places)


def parse_vmp4(data):
if len(data) < 8:
return []

num_items = struct.unpack('<H', data[6:8])[0]
pos = 8
for x in range(num_items):
if pos + 10 > len(data):
break
item_type, offset, size = struct.unpack("<HII", data[pos:pos + 10])
if item_type == 10:
item_data = data[offset:offset + size]
if not item_data:
return []
if item_data[0] == 1:
compressed_data = item_data[5:]
try:
Expand All @@ -89,53 +107,106 @@ def get_hex(num):
return ''


@artifact_processor
def geodMapTiles(files_found, report_folder, seeker, wrap_text, timezone_offset):
def check_in_tile_image(source_file, data, name):
mime_type = guess_mime(data)
if not (mime_type and mime_type.startswith('image/')):
return None

if mime_type == 'image/heic':
try:
register_heif_opener()
with Image.open(BytesIO(data)) as image:
converted = BytesIO()
image.convert('RGB').save(converted, format='JPEG')
data = converted.getvalue()
mime_type = 'image/jpeg'
force_extension = 'jpg'
except (OSError, ValueError) as ex:
logfunc(f'Failed to convert HEIC map tile image, using original. Error was: {ex}')
force_extension = None
else:
force_extension = None

return check_in_embedded_media(
source_file, data, name, force_type=mime_type, force_extension=force_extension)

report_file = 'Unknown'

for file_found in files_found:
@artifact_processor
def geodMapTiles(context):
""" see artifact description """
file_found = ''
data_headers = (
("Timestamp", "datetime"), "Places_from_VLOC", "Labels_in_tile",
("Image", "media", "max-height:240px; max-width:320px;"), "Tileset",
"Key A", "Key B", "Key C", "Key D"
)#, "Size", "ETAG")

for file_found in context.get_files_found():
file_found = str(file_found)

if file_found.endswith('.sqlitedb'):
break
else:
logfunc('No MapTiles.sqlitedb file found.')
return (), [], ''

#os.chmod(file_found, 0o0777)
db = open_sqlite_db_readonly(file_found)
if not db:
return (), [], file_found

db.row_factory = sqlite3.Row
cursor = db.cursor()
usesDataTable = True


if does_table_exist_in_db(file_found, 'tiles') and does_table_exist_in_db(file_found, 'data'):
logfunc('Parsing Geolocation from data table with tiles table.')
query = '''
SELECT datetime(access_times.timestamp, 'unixepoch') as timestamp, key_a, key_b, key_c, key_d, tileset, data, size, etag
SELECT
datetime(access_times.timestamp, 'unixepoch') as timestamp,
key_a, key_b, key_c, key_d, tileset, data, size, etag
FROM data
INNER JOIN tiles on data.ROWID = tiles.data_pk
INNER JOIN access_times on data.rowid = access_times.data_pk
'''
elif does_table_exist_in_db(file_found, 'data'):
logfunc('Parsing Geolocation from data table.')
logfunc('Parsing Geolocation from data table.')
query = '''
SELECT datetime(access_times.timestamp, 'unixepoch') as timestamp, key_a, key_b, key_c, key_d, tileset, data, size, etag
SELECT
datetime(access_times.timestamp, 'unixepoch') as timestamp,
key_a, key_b, key_c, key_d, tileset, data, size, etag
FROM data
INNER JOIN access_times on data.rowid = access_times.data_pk
'''
else:
elif does_table_exist_in_db(file_found, 'image'):
logfunc('Parsing Geolocation from image table.')
usesDataTable = False
query = '''
SELECT datetime(retrieved, 'unixepoch') as timestamp, a, b, c, d, tileset, data, size, etag
SELECT
datetime(retrieved, 'unixepoch') as timestamp,
a as key_a,
b as key_b,
c as key_c,
d as key_d,
tileset,
data,
size,
etag
FROM image
'''
else:
logfunc('No supported Map Tiles tables found. No data available.')
db.close()
return (), [], file_found

try:
cursor.execute(query)
except Exception as e:
print(e)
logfunc('Table is missing columns. No data available.')
return
except sqlite3.Error as ex:
logfunc(f'Table is missing columns. No data available. Error was: {ex}')
db.close()
return (), [], file_found

all_rows = cursor.fetchall()
db.close()

data_list = []
if len(all_rows) > 0:
for row in all_rows:
Expand All @@ -144,44 +215,21 @@ def geodMapTiles(files_found, report_folder, seeker, wrap_text, timezone_offset)
data_parsed = ''
data = row['data']
if data: # NULL sometimes
if len(data) >= 11 and data[:11] == b'\xff\xd8\xff\xe0\x00\x10\x4a\x46\x49\x46\x00':
img_base64 = base64.b64encode(data).decode('utf-8')
img_html = f'<img src="data:image/jpeg;base64, {img_base64}" alt="Map Tile" />'
data_parsed = img_html
elif len(data) >= 4 and data[:4] == b'TCOL':
vmp4_places, tcol_places = ParseTCOL(data)
name = f"Map Tile {get_hex(row['tileset'])} {get_hex(row['key_a'])}-{get_hex(row['key_b'])}"
data_parsed = check_in_tile_image(file_found, data, name)
if not data_parsed and len(data) >= 4 and data[:4] == b'TCOL':
vmp4_places, tcol_places = parse_tcol(data)
vmp4_places = ", ".join(vmp4_places)
tcol_places = ", ".join(tcol_places)
elif len(data) >=4 and data[:4] == b'VMP4':
vmp4_places = ParseVMP4(data)
elif not data_parsed and len(data) >= 4 and data[:4] == b'VMP4':
vmp4_places = parse_vmp4(data)
vmp4_places = ", ".join(vmp4_places)
#else:
#header_bytes = data[:28]
#hexdump = generate_hexdump(header_bytes, 5) if header_bytes else ''
#data_parsed = hexdump

if usesDataTable:
data_list.append((row['timestamp'], tcol_places, vmp4_places, data_parsed, get_hex(row['tileset']),
get_hex(row['key_a']), get_hex(row['key_b']), get_hex(row['key_c']), get_hex(row['key_d'])) )
# row['size']) , row['etag']))
else:
data_list.append((row['timestamp'], tcol_places, vmp4_places, data_parsed, get_hex(row['tileset']),
get_hex(row['a']), get_hex(row['b']), get_hex(row['c']), get_hex(row['d'])) )
# row['size']) , row['etag']))

description = ''
report = ArtifactHtmlReport('Geolocation')
report.start_artifact_report(report_folder, 'Map Tile Cache', description)
report.add_script()
data_headers = ["Timestamp", "Places_from_VLOC", "Labels_in_tile", "Image", "Tileset", "Key A", "Key B", "Key C", "Key D"]#, "Size", "ETAG")
report.write_artifact_data_table(data_headers, data_list, file_found, html_escape = False)
report.end_artifact_report()

db.close()

data_headers[0] = (data_headers[0], 'datetime')
row_data = (
row['timestamp'], tcol_places, vmp4_places, data_parsed,
get_hex(row['tileset']), get_hex(row['key_a']), get_hex(row['key_b']),
get_hex(row['key_c']), get_hex(row['key_d'])
)
data_list.append(row_data)

# remove the image from lava output until Media Manager is ready
data_list = [(row[0], row[1], row[2], 'See HTML Report', row[4], row[5], row[6], row[7], row[8])
if row[3] else row for row in data_list]
return data_headers, data_list, file_found
return data_headers, data_list, file_found
Loading