@@ -1,11 +1,5 @@
# WhatsApp Chat Viewer
#
# This script reads a WhatsApp ChatStorage.sqlite database and associated media files
# to generate a browsable HTML archive of chat conversations.
#
# Author: Gemini
# Date: September 7, 2025
# Version: 1.3 - Improved name resolution to avoid displaying encoded strings.
#! /usr/bin/env python3
# WhatsApp Chat Archiver
import sqlite3
import os
@@ -266,6 +260,7 @@ def generate_html_chat(db_path, media_path, output_dir, chat_id, chat_name, is_g
def process_iphone_backup(backup_path, output_dir):
    """
    Import the WhatsApp shared files of an iPhone backup into output_dir.

    Reads Manifest.db inside backup_path, selects every entry of the WhatsApp
    shared app-group domain, and recreates each entry's relative path below
    output_dir. Existing data is accumulated rather than discarded:

    - Known SQLite databases that already exist in output_dir are handled
      specially: ChatStorage.sqlite is merged via merge_chat_database(); the
      others are backed up with a timestamp suffix and then replaced.
    - Existing media files are never overwritten; a differing copy is stored
      next to the original with a timestamp suffix.
    - Other existing files are overwritten with the copy from the backup.
    - Files that do not exist yet are simply copied.

    Args:
        backup_path: Directory of the iPhone backup (contains Manifest.db and
            the blobs stored as <fileID[:2]>/<fileID>).
        output_dir: Directory in which the file structure is recreated.

    Returns:
        None. Progress and a final summary are printed to stdout.
    """
    manifest_db_path = os.path.join(backup_path, 'Manifest.db')
    if not os.path.exists(manifest_db_path):
        print(f"Manifest.db not found at {manifest_db_path}")
        return

    # Connect to manifest.db and extract WhatsApp shared files.
    # try/finally guarantees the connection is released even if the query fails.
    backup_conn = sqlite3.connect(manifest_db_path)
    try:
        backup_cursor = backup_conn.cursor()
        backup_cursor.execute(
            "SELECT fileID, domain, relativePath FROM Files WHERE domain = ?",
            ('AppDomainGroup-group.net.whatsapp.WhatsApp.shared',),
        )
        files = backup_cursor.fetchall()
    finally:
        backup_conn.close()
    print(f"Found {len(files)} WhatsApp shared files in manifest.db.")

    # Count for statistics
    new_files = 0
    updated_files = 0
    skipped_files = 0
    special_db_files = 0

    # SQLite database files that need special handling when they already exist
    db_files_to_merge = [
        'ChatStorage.sqlite',
        'CallHistory.sqlite',
        'DeviceAgents.sqlite',
        'Labels.sqlite',
        'Ranking.sqlite',
        'Sticker.sqlite'
    ]
    # Path prefixes whose files are accumulated instead of overwritten
    media_prefixes = ('Media/', 'Message/', 'ProfilePictures/', 'Avatar/')

    # Recreate the file structure
    for fileID, domain, relativePath in files:
        src_file = os.path.join(backup_path, fileID[:2], fileID)
        dest_file = os.path.join(output_dir, relativePath)
        os.makedirs(os.path.dirname(dest_file), exist_ok=True)

        if not os.path.exists(src_file):
            # NOTE(review): manifest entries without a blob in the backup end
            # up here; presumably this includes directory entries - confirm.
            print(f"Source file missing: {src_file}")
            skipped_files += 1
            continue

        # Handle SQLite database files specially - merge data instead of overwriting
        file_basename = os.path.basename(dest_file)
        if file_basename in db_files_to_merge and os.path.exists(dest_file):
            special_db_files += 1
            try:
                if file_basename == 'ChatStorage.sqlite':
                    merge_chat_database(src_file, dest_file)
                else:
                    # For other SQLite databases, make a backup and then replace
                    # Future enhancement: implement proper merging for all database types
                    backup_file = f"{dest_file}.backup_{datetime.now().strftime('%Y%m%d%H%M%S')}"
                    shutil.copy2(dest_file, backup_file)
                    print(f"Created backup of {file_basename} as {os.path.basename(backup_file)}")
                    shutil.copy2(src_file, dest_file)
            except Exception as e:
                # Best effort: one broken database must not abort the whole import
                print(f"Error processing database {dest_file}: {e}")
            continue

        # Regular (non-database, or not yet existing) files
        if not os.path.exists(dest_file):
            # If file doesn't exist, copy it
            try:
                shutil.copy2(src_file, dest_file)
                new_files += 1
            except Exception as e:
                print(f"Error copying {src_file} to {dest_file}: {e}")
                skipped_files += 1
            continue

        if relativePath.startswith(media_prefixes):
            # For media files, don't overwrite but create a version with timestamp if different
            if files_are_identical(src_file, dest_file):
                skipped_files += 1
                continue
            filename, ext = os.path.splitext(dest_file)
            timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
            new_dest_file = f"{filename}_{timestamp}{ext}"
            try:
                shutil.copy2(src_file, new_dest_file)
                print(f"Saved additional version of media file: {os.path.relpath(new_dest_file, output_dir)}")
                new_files += 1
            except Exception as e:
                print(f"Error copying alternate version {src_file}: {e}")
                skipped_files += 1
        else:
            # For non-media files the copy from the backup replaces the existing
            # file unconditionally (no timestamp comparison is performed)
            try:
                shutil.copy2(src_file, dest_file)
                updated_files += 1
            except Exception as e:
                print(f"Error updating {dest_file}: {e}")
                skipped_files += 1

    print("\nBackup import summary:")
    print(f"- Added {new_files} new files")
    print(f"- Updated {updated_files} existing files")
    print(f"- Special handling for {special_db_files} database files")
    print(f"- Skipped {skipped_files} files")
def files_are_identical(file1, file2):
    """Return True when both files have byte-for-byte identical content.

    The sizes are compared first as a cheap rejection test. If they match,
    both files are streamed in fixed-size chunks and compared completely, so
    memory use stays bounded regardless of file size and a difference
    anywhere in the file (not only at its start or end) is detected.

    Args:
        file1: Path of the first file.
        file2: Path of the second file.

    Returns:
        True if the contents are identical, False otherwise.

    Raises:
        OSError: If either file cannot be stat'ed or read.
    """
    if os.path.getsize(file1) != os.path.getsize(file2):
        return False

    chunk_size = 1024 * 1024  # 1MB per read keeps memory use small and I/O efficient
    with open(file1, 'rb') as f1, open(file2, 'rb') as f2:
        while True:
            chunk1 = f1.read(chunk_size)
            chunk2 = f2.read(chunk_size)
            if chunk1 != chunk2:
                return False
            if not chunk1:
                # Both files reached EOF without any differing chunk
                return True
def _table_columns(conn, table):
    """Return the column names of *table* in declaration order ([] if the table does not exist)."""
    return [row[1] for row in conn.execute(f'PRAGMA table_info("{table}")')]


def merge_chat_database(src_file, dest_file):
    """
    Merge WhatsApp chat databases to combine messages from multiple backups.

    All rows already present in dest_file are preserved. For each merged
    table, rows of src_file whose Z_PK primary key is not yet present in the
    same table of dest_file are added. The work is done on a temporary copy
    (dest_file + ".temp") which replaces dest_file only after a successful
    commit, so a failed merge leaves dest_file untouched.

    If the source contains no message key that is missing from the
    destination, nothing is changed. Errors are reported on stdout and not
    propagated to the caller.

    Args:
        src_file: Path of the database to import rows from.
        dest_file: Path of the existing database to merge into.

    Returns:
        None.
    """
    print("Merging chat databases to preserve existing messages...")

    # Create a temporary copy for processing
    temp_file = f"{dest_file}.temp"
    shutil.copy2(dest_file, temp_file)

    src_conn = None
    dest_conn = None
    try:
        # Connect to both databases
        src_conn = sqlite3.connect(src_file)
        dest_conn = sqlite3.connect(temp_file)

        # Foreign key enforcement is switched off so rows can be inserted
        # table by table without regard to reference order
        src_conn.execute("PRAGMA foreign_keys = OFF")
        dest_conn.execute("PRAGMA foreign_keys = OFF")

        # Determine whether the source holds messages the destination lacks
        src_message_ids = {row[0] for row in src_conn.execute("SELECT Z_PK FROM ZWAMESSAGE")}
        dest_message_ids = {row[0] for row in dest_conn.execute("SELECT Z_PK FROM ZWAMESSAGE")}
        new_message_ids = src_message_ids - dest_message_ids

        if not new_message_ids:
            print("No new messages to import")
            return  # the finally clause closes connections and removes the temp copy

        print(f"Found {len(new_message_ids)} new messages to import")

        # Tables that need to be merged (simplified for this example)
        tables_to_check = [
            "ZWAMESSAGE", "ZWAMEDIAITEM", "ZWAGROUPMEMBER",
            "ZWACHATSESSION", "ZWAPROFILEPUSHNAME"
        ]

        for table in tables_to_check:
            src_columns = _table_columns(src_conn, table)
            if not src_columns:
                print(f"Table {table} doesn't exist in source database, skipping...")
                continue

            # Copy only columns both schemas share so differing schema
            # versions do not abort the merge
            dest_columns = set(_table_columns(dest_conn, table))
            columns = [name for name in src_columns if name in dest_columns]
            if "Z_PK" not in columns:
                print(f"Table {table} has no Z_PK column in both databases, skipping...")
                continue

            # Each table has its own Z_PK sequence, so new rows are detected
            # per table rather than by reusing the message keys
            pk_index = columns.index("Z_PK")
            existing_keys = {row[0] for row in dest_conn.execute(f"SELECT Z_PK FROM {table}")}

            column_str = ", ".join('"' + name.replace('"', '""') + '"' for name in columns)
            placeholders = ", ".join("?" for _ in columns)
            src_rows = src_conn.execute(f"SELECT {column_str} FROM {table}")
            new_rows = (row for row in src_rows if row[pk_index] not in existing_keys)
            dest_conn.executemany(
                f"INSERT OR IGNORE INTO {table} ({column_str}) VALUES ({placeholders})",
                new_rows,
            )

        # Commit changes
        dest_conn.commit()

        # Close connections before moving the file (required on Windows)
        src_conn.close()
        src_conn = None
        dest_conn.close()
        dest_conn = None

        # Replace destination file with merged file; os.replace overwrites
        # an existing target on every platform
        os.replace(temp_file, dest_file)
        print("Successfully merged chat databases")

    except Exception as e:
        print(f"Error merging databases: {e}")
    finally:
        for conn in (src_conn, dest_conn):
            if conn is not None:
                conn.close()
        # Still present only when the merge was skipped or failed
        if os.path.exists(temp_file):
            os.remove(temp_file)
def main():