Files
proxmox-infra/scripts/cleanup/organize-media.py
kavren 96d413602f docs: Update Sonarr IP to 10.4.2.20, Jellyseerr to 10.4.2.25
Fixed outdated IP addresses across documentation and scripts.
Sonarr LXC 105 is at 10.4.2.20, not 10.4.2.15.
Jellyseerr LXC 115 is at 10.4.2.25, not 10.4.2.20.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 18:24:17 -05:00

410 lines
14 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Media Organization Script
Compares media files against Radarr/Sonarr managed files and moves unmanaged files to processing folder.
"""
import argparse
import json
import os
import shutil
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Set, Tuple
import urllib.request
import urllib.error
# Configuration
RADARR_URL = "http://10.4.2.16:7878"
RADARR_API_KEY = "5e6796988abf4d6d819a2b506a44f422"
SONARR_URL = "http://10.4.2.20:8989"
SONARR_API_KEY = "b331fe18ec2144148a41645d9ce8b249"
MEDIA_DIRS = {
"movies": "/mnt/pve/elantris-media/movies",
"tv": "/mnt/pve/elantris-media/tv",
"anime": "/mnt/pve/elantris-media/anime"
}
# Path translation: Radarr/Sonarr see /media/* but files are at /mnt/pve/elantris-media/*
PATH_MAPPING = {
"/media/movies": "/mnt/pve/elantris-media/movies",
"/media/tv": "/mnt/pve/elantris-media/tv",
"/media/anime": "/mnt/pve/elantris-media/anime"
}
PROCESSING_DIR = "/mnt/pve/elantris-media/processing"
VIDEO_EXTENSIONS = {'.mkv', '.mp4', '.avi', '.m4v', '.ts', '.wmv', '.flv', '.webm'}
class MediaOrganizer:
def __init__(self, dry_run: bool = True, verbose: bool = True):
self.dry_run = dry_run
self.verbose = verbose
self.managed_files: Set[str] = set()
self.unmanaged_files: Dict[str, List[Path]] = {
"movies": [],
"tv": [],
"anime": []
}
self.stats = {
"total_scanned": 0,
"managed": 0,
"unmanaged": 0,
"moved": 0,
"errors": 0
}
self.log_entries: List[str] = []
def log(self, message: str, level: str = "INFO"):
"""Log a message to console and internal log"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"[{timestamp}] [{level}] {message}"
self.log_entries.append(log_entry)
if self.verbose:
print(log_entry)
def translate_path(self, path: str) -> str:
"""Translate Radarr/Sonarr paths to actual filesystem paths"""
for api_path, real_path in PATH_MAPPING.items():
if path.startswith(api_path):
return path.replace(api_path, real_path, 1)
return path
def api_request(self, url: str, api_key: str, endpoint: str) -> dict:
"""Make an API request to Radarr or Sonarr"""
full_url = f"{url}/api/v3/{endpoint}"
headers = {"X-Api-Key": api_key}
try:
req = urllib.request.Request(full_url, headers=headers)
with urllib.request.urlopen(req, timeout=30) as response:
return json.loads(response.read().decode())
except urllib.error.URLError as e:
self.log(f"API request failed for {full_url}: {e}", "ERROR")
return None
except json.JSONDecodeError as e:
self.log(f"Failed to decode JSON response from {full_url}: {e}", "ERROR")
return None
def get_radarr_files(self) -> Set[str]:
"""Get all file paths managed by Radarr"""
self.log("Querying Radarr for managed movie files...")
managed_files = set()
movies = self.api_request(RADARR_URL, RADARR_API_KEY, "movie")
if not movies:
self.log("Failed to retrieve movies from Radarr", "ERROR")
return managed_files
for movie in movies:
# Get the movie file path if it exists
if movie.get("hasFile") and "movieFile" in movie:
file_path = movie["movieFile"].get("path")
if file_path:
# Translate API path to real filesystem path
real_path = self.translate_path(file_path)
managed_files.add(real_path)
self.log(f" Radarr manages: {file_path} -> {real_path}", "DEBUG")
self.log(f"Found {len(managed_files)} files managed by Radarr")
return managed_files
def get_sonarr_files(self) -> Set[str]:
"""Get all file paths managed by Sonarr"""
self.log("Querying Sonarr for managed TV series files...")
managed_files = set()
series = self.api_request(SONARR_URL, SONARR_API_KEY, "series")
if not series:
self.log("Failed to retrieve series from Sonarr", "ERROR")
return managed_files
for show in series:
series_id = show.get("id")
if not series_id:
continue
# Get episode files for this series
episode_files = self.api_request(
SONARR_URL,
SONARR_API_KEY,
f"episodefile?seriesId={series_id}"
)
if episode_files:
for episode_file in episode_files:
file_path = episode_file.get("path")
if file_path:
# Translate API path to real filesystem path
real_path = self.translate_path(file_path)
managed_files.add(real_path)
self.log(f" Sonarr manages: {file_path} -> {real_path}", "DEBUG")
self.log(f"Found {len(managed_files)} files managed by Sonarr")
return managed_files
def scan_directory(self, directory: Path, media_type: str) -> List[Path]:
"""Scan a directory recursively for video files"""
self.log(f"Scanning {directory} for video files...")
video_files = []
if not directory.exists():
self.log(f"Directory does not exist: {directory}", "WARNING")
return video_files
try:
for root, dirs, files in os.walk(directory):
for file in files:
file_path = Path(root) / file
if file_path.suffix.lower() in VIDEO_EXTENSIONS:
video_files.append(file_path)
self.stats["total_scanned"] += 1
except PermissionError as e:
self.log(f"Permission denied accessing {directory}: {e}", "ERROR")
self.stats["errors"] += 1
except Exception as e:
self.log(f"Error scanning {directory}: {e}", "ERROR")
self.stats["errors"] += 1
self.log(f"Found {len(video_files)} video files in {directory}")
return video_files
def categorize_files(self):
"""Scan media directories and categorize files as managed or unmanaged"""
self.log("\n" + "="*80)
self.log("STEP 1: Querying Radarr and Sonarr for managed files")
self.log("="*80)
# Get managed files from Radarr and Sonarr
radarr_files = self.get_radarr_files()
sonarr_files = self.get_sonarr_files()
self.managed_files = radarr_files | sonarr_files
self.log(f"\nTotal managed files: {len(self.managed_files)}")
self.log("\n" + "="*80)
self.log("STEP 2: Scanning media directories")
self.log("="*80)
# Scan each media directory
for media_type, directory in MEDIA_DIRS.items():
dir_path = Path(directory)
video_files = self.scan_directory(dir_path, media_type)
# Categorize each file
for file_path in video_files:
file_str = str(file_path)
if file_str in self.managed_files:
self.stats["managed"] += 1
self.log(f" MANAGED: {file_path}", "DEBUG")
else:
self.stats["unmanaged"] += 1
self.unmanaged_files[media_type].append(file_path)
self.log(f" UNMANAGED: {file_path}", "DEBUG")
def create_processing_structure(self):
"""Create processing directory structure"""
self.log("\n" + "="*80)
self.log("STEP 3: Creating processing directory structure")
self.log("="*80)
processing_path = Path(PROCESSING_DIR)
for media_type in MEDIA_DIRS.keys():
subdir = processing_path / f"from-{media_type}"
if self.dry_run:
self.log(f"[DRY RUN] Would create directory: {subdir}")
else:
try:
subdir.mkdir(parents=True, exist_ok=True)
self.log(f"Created directory: {subdir}")
except Exception as e:
self.log(f"Failed to create directory {subdir}: {e}", "ERROR")
self.stats["errors"] += 1
def move_unmanaged_files(self):
"""Move unmanaged files to processing folder"""
self.log("\n" + "="*80)
self.log("STEP 4: Moving unmanaged files to processing folder")
self.log("="*80)
processing_path = Path(PROCESSING_DIR)
for media_type, files in self.unmanaged_files.items():
if not files:
self.log(f"No unmanaged files found in {media_type}")
continue
self.log(f"\nProcessing {len(files)} unmanaged files from {media_type}...")
source_dir = Path(MEDIA_DIRS[media_type])
dest_base = processing_path / f"from-{media_type}"
for file_path in files:
try:
# Preserve relative path structure
relative_path = file_path.relative_to(source_dir)
dest_path = dest_base / relative_path
if self.dry_run:
self.log(f"[DRY RUN] Would move: {file_path}")
self.log(f" To: {dest_path}")
else:
# Create destination directory if needed
dest_path.parent.mkdir(parents=True, exist_ok=True)
# Move the file
shutil.move(str(file_path), str(dest_path))
self.log(f"Moved: {file_path} -> {dest_path}")
self.stats["moved"] += 1
except Exception as e:
self.log(f"Failed to move {file_path}: {e}", "ERROR")
self.stats["errors"] += 1
def find_empty_directories(self) -> List[Path]:
"""Find directories that would be empty after moving files"""
self.log("\n" + "="*80)
self.log("STEP 5: Identifying empty directories")
self.log("="*80)
empty_dirs = []
for media_type, directory in MEDIA_DIRS.items():
dir_path = Path(directory)
if not dir_path.exists():
continue
try:
for root, dirs, files in os.walk(dir_path, topdown=False):
root_path = Path(root)
# Skip if this is the root media directory
if root_path == dir_path:
continue
# Check if directory is empty or would be empty
try:
contents = list(root_path.iterdir())
if not contents:
empty_dirs.append(root_path)
self.log(f"Empty directory: {root_path}")
except PermissionError:
self.log(f"Permission denied checking {root_path}", "WARNING")
except Exception as e:
self.log(f"Error finding empty directories in {directory}: {e}", "ERROR")
return empty_dirs
def write_log_file(self):
"""Write log file to processing directory"""
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
log_path = Path(PROCESSING_DIR) / f"cleanup-log-{timestamp}.txt"
try:
if self.dry_run:
self.log(f"\n[DRY RUN] Would write log file to: {log_path}")
else:
with open(log_path, 'w') as f:
f.write('\n'.join(self.log_entries))
self.log(f"\nLog file written to: {log_path}")
except Exception as e:
self.log(f"Failed to write log file: {e}", "ERROR")
def print_summary(self, empty_dirs: List[Path]):
"""Print summary report"""
self.log("\n" + "="*80)
self.log("SUMMARY REPORT")
self.log("="*80)
mode = "DRY RUN MODE" if self.dry_run else "EXECUTION MODE"
self.log(f"\nMode: {mode}")
self.log(f"\nTotal files scanned: {self.stats['total_scanned']}")
self.log(f"Files managed by Radarr/Sonarr: {self.stats['managed']}")
self.log(f"Unmanaged files found: {self.stats['unmanaged']}")
if not self.dry_run:
self.log(f"Files successfully moved: {self.stats['moved']}")
if self.stats['errors'] > 0:
self.log(f"Errors encountered: {self.stats['errors']}", "WARNING")
self.log("\nUnmanaged files by category:")
for media_type, files in self.unmanaged_files.items():
self.log(f" {media_type}: {len(files)} files")
if empty_dirs:
self.log(f"\nEmpty directories found: {len(empty_dirs)}")
self.log("(These directories can be manually removed if desired)")
self.log("\n" + "="*80)
def run(self):
"""Main execution method"""
self.log("="*80)
self.log("MEDIA ORGANIZATION SCRIPT")
self.log("="*80)
self.log(f"Mode: {'DRY RUN' if self.dry_run else 'EXECUTE'}")
self.log(f"Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# Step 1 & 2: Categorize files
self.categorize_files()
# Step 3: Create processing structure
self.create_processing_structure()
# Step 4: Move unmanaged files
self.move_unmanaged_files()
# Step 5: Find empty directories
empty_dirs = self.find_empty_directories()
# Print summary
self.print_summary(empty_dirs)
# Write log file
self.write_log_file()
return self.stats
def main():
parser = argparse.ArgumentParser(
description="Organize media files by comparing against Radarr/Sonarr managed files"
)
parser.add_argument(
"--execute",
action="store_true",
help="Actually move files (default is dry run mode)"
)
parser.add_argument(
"--quiet",
action="store_true",
help="Reduce verbosity (only show summary)"
)
args = parser.parse_args()
# Create organizer instance
organizer = MediaOrganizer(
dry_run=not args.execute,
verbose=not args.quiet
)
# Run the organization
stats = organizer.run()
# Exit with appropriate code
if stats["errors"] > 0:
sys.exit(1)
else:
sys.exit(0)
if __name__ == "__main__":
main()