Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 36 additions & 44 deletions scripts/artifacts/parsecdCache.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
""" parsecdCache """
__artifacts_v2__ = {
"get_parseCDCache": {
"parse_cd_cache": {
"name": "ParseCD Cache",
"description": "Parses Spotlight search completion from ParseCD Cache database. If completion is a bundle ID it's likely that application opened as a result of completing the search",
"description": (
"Parses Spotlight search completion from ParseCD Cache "
"database. If completion is a bundle ID it's likely that "
"application opened as a result of completing the search"
),
"author": "@JohnHyla",
"version": "0.0.1",
"date": "2024-10-17",
"creation_date": "2024-10-17",
"last_update_date": "2026-06-18",
"requirements": "none",
"category": "Spotlight Searches",
"notes": "",
Expand All @@ -14,65 +19,52 @@
}


from scripts.ilapfuncs import open_sqlite_db_readonly, convert_ts_human_to_utc
from scripts.ilapfuncs import artifact_processor, convert_utc_human_to_timezone
from scripts.ilapfuncs import (
artifact_processor,
convert_cocoa_core_data_ts_to_utc,
does_column_exist_in_db,
get_sqlite_db_records,
)


@artifact_processor
def get_parseCDCache(files_found, report_folder, seeker, wrap_text, timezone_offset):
def parse_cd_cache(context):
""" see artifact description """
data_list = []
report_file = "Unknown"
has_score = False
for file_found in files_found:

for file_found in context.get_files_found():
file_found = str(file_found)
if not file_found.endswith(".db"):
continue # Skip all other files
report_file = file_found
db = open_sqlite_db_readonly(file_found)

cursor = db.cursor()

cursor.execute("pragma table_info(completion_cache_engagement)")
cols = [row[1] for row in cursor.fetchall()]
has_score = "score" in cols

select_cols = [
"datetime(engagement_date + 978307200,'unixepoch') as engagement_date",
"input",
"completion",
"transformed",
]
if has_score:
select_cols.append("score")
report_file = context.get_relative_path(file_found)
score_column = "score"
if not does_column_exist_in_db(file_found, "completion_cache_engagement", "score"):
score_column = "NULL AS score"

qry = f"""
select {", ".join(select_cols)}
select
engagement_date,
input,
completion,
transformed,
{score_column}
from completion_cache_engagement
"""

cursor.execute(qry)
all_rows = cursor.fetchall()
all_rows = get_sqlite_db_records(file_found, qry)

for row in all_rows:
timestamp = convert_ts_human_to_utc(row[0])
timestamp = convert_utc_human_to_timezone(timestamp, timezone_offset)

# Save data to temp variable to check whether has "score" column or no
temp = [timestamp, row[1], row[2], row[3]]
if has_score:
temp.append(row[4])

data_list.append(tuple(temp))
timestamp = convert_cocoa_core_data_ts_to_utc(row[0])

db.close()
data_list.append((timestamp, row[1], row[2], row[3], row[4]))

data_headers = [
data_headers = (
("Engagement Date", "datetime"),
"Input",
"Completion",
"Transformed",
]
if has_score:
data_headers.append("Score")
"Score",
)

return tuple(data_headers), data_list, report_file
return data_headers, data_list, report_file
Loading