#!/usr/bin/env python3
"""
Media Organization Script

Compares media files against Radarr/Sonarr managed files and moves
unmanaged files to processing folder.
"""

import argparse
import http.client
import json
import os
import shutil
import sys
import urllib.error
import urllib.request
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple

# Configuration
# NOTE(review): the API keys below are committed in source. They are kept as
# defaults so existing deployments keep working, but every value can be
# overridden through an environment variable of the same name. Prefer that,
# and consider rotating the committed keys.
RADARR_URL = os.environ.get("RADARR_URL", "http://10.4.2.16:7878")
RADARR_API_KEY = os.environ.get(
    "RADARR_API_KEY", "5e6796988abf4d6d819a2b506a44f422"
)
SONARR_URL = os.environ.get("SONARR_URL", "http://10.4.2.20:8989")
SONARR_API_KEY = os.environ.get(
    "SONARR_API_KEY", "b331fe18ec2144148a41645d9ce8b249"
)

MEDIA_DIRS = {
    "movies": "/mnt/pve/elantris-media/movies",
    "tv": "/mnt/pve/elantris-media/tv",
    "anime": "/mnt/pve/elantris-media/anime",
}

# Path translation: Radarr/Sonarr see /media/* but files are at
# /mnt/pve/elantris-media/*
PATH_MAPPING = {
    "/media/movies": "/mnt/pve/elantris-media/movies",
    "/media/tv": "/mnt/pve/elantris-media/tv",
    "/media/anime": "/mnt/pve/elantris-media/anime",
}

PROCESSING_DIR = "/mnt/pve/elantris-media/processing"

VIDEO_EXTENSIONS = {
    '.mkv', '.mp4', '.avi', '.m4v', '.ts', '.wmv', '.flv', '.webm',
}


class MediaOrganizer:
    """Find video files not tracked by Radarr/Sonarr and move them aside.

    The organizer asks Radarr and Sonarr for every file path they manage,
    scans the directories in ``MEDIA_DIRS`` for video files, and moves any
    file that is not in the managed set under ``PROCESSING_DIR``. In dry-run
    mode (the default) nothing on disk is changed.
    """

    def __init__(self, dry_run: bool = True, verbose: bool = True):
        """Create an organizer.

        Args:
            dry_run: When true, only report what would be done.
            verbose: When true, print every log entry as it is recorded.
        """
        self.dry_run = dry_run
        self.verbose = verbose
        self.managed_files: Set[str] = set()
        # One bucket per configured media directory, so adding an entry to
        # MEDIA_DIRS cannot cause a KeyError during categorization.
        self.unmanaged_files: Dict[str, List[Path]] = {
            media_type: [] for media_type in MEDIA_DIRS
        }
        self.stats = {
            "total_scanned": 0,
            "managed": 0,
            "unmanaged": 0,
            "moved": 0,
            "errors": 0,
        }
        self.log_entries: List[str] = []
        # Set as soon as any Radarr/Sonarr query fails. The managed set is
        # then incomplete, so moving "unmanaged" files would be unsafe.
        self.api_failed = False

    def log(self, message: str, level: str = "INFO", always: bool = False):
        """Log a message to console and internal log.

        Args:
            message: Text to record.
            level: Severity label placed in the entry.
            always: Print the entry even when ``verbose`` is off.
        """
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        log_entry = f"[{timestamp}] [{level}] {message}"
        self.log_entries.append(log_entry)
        if self.verbose or always:
            print(log_entry)

    def _report(self, message: str, level: str = "INFO") -> None:
        """Log a message that must be shown even in quiet mode."""
        self.log(message, level, always=True)

    def _record_api_failure(self, message: str) -> None:
        """Log an API failure, count it, and mark the managed set unreliable."""
        self.log(message, "ERROR")
        self.stats["errors"] += 1
        self.api_failed = True

    def translate_path(self, path: str) -> str:
        """Translate Radarr/Sonarr paths to actual filesystem paths.

        A mapping applies only when ``path`` equals the mapped prefix or
        continues with a path separator, so ``/media/tv2/x`` is not treated
        as living under ``/media/tv``. Paths without a matching prefix are
        returned unchanged.
        """
        for api_path, real_path in PATH_MAPPING.items():
            boundary = api_path if api_path.endswith("/") else api_path + "/"
            if path == api_path or path.startswith(boundary):
                return real_path + path[len(api_path):]
        return path

    def api_request(self, url: str, api_key: str,
                    endpoint: str) -> Optional[Any]:
        """Make an API request to Radarr or Sonarr.

        Args:
            url: Base URL of the service.
            api_key: Value sent in the ``X-Api-Key`` header.
            endpoint: Path below ``/api/v3/``, including any query string.

        Returns:
            The decoded JSON payload, or ``None`` when the request or the
            decoding failed (the failure is logged).
        """
        full_url = f"{url.rstrip('/')}/api/v3/{endpoint}"
        headers = {"X-Api-Key": api_key}

        try:
            req = urllib.request.Request(full_url, headers=headers)
            with urllib.request.urlopen(req, timeout=30) as response:
                return json.loads(response.read().decode())
        except (OSError, http.client.HTTPException) as e:
            # URLError/HTTPError are OSError subclasses; plain OSError also
            # covers socket timeouts raised while reading the body.
            self.log(f"API request failed for {full_url}: {e}", "ERROR")
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            self.log(
                f"Failed to decode JSON response from {full_url}: {e}",
                "ERROR",
            )
        return None

    def get_radarr_files(self) -> Set[str]:
        """Get all file paths managed by Radarr.

        Returns:
            Normalized filesystem paths. On failure the set is empty and
            ``api_failed`` is set. An empty movie list is not a failure.
        """
        self.log("Querying Radarr for managed movie files...")
        managed_files: Set[str] = set()

        movies = self.api_request(RADARR_URL, RADARR_API_KEY, "movie")
        if not isinstance(movies, list):
            self._record_api_failure("Failed to retrieve movies from Radarr")
            return managed_files

        for movie in movies:
            if not movie.get("hasFile"):
                continue
            file_path = (movie.get("movieFile") or {}).get("path")
            if not file_path:
                continue
            # Translate API path to real filesystem path
            real_path = os.path.normpath(self.translate_path(file_path))
            managed_files.add(real_path)
            self.log(f"  Radarr manages: {file_path} -> {real_path}", "DEBUG")

        self.log(f"Found {len(managed_files)} files managed by Radarr")
        return managed_files

    def get_sonarr_files(self) -> Set[str]:
        """Get all file paths managed by Sonarr.

        Episode files are fetched with one request per series.

        Returns:
            Normalized filesystem paths. If the series list or any
            per-series request fails, ``api_failed`` is set because the
            result is incomplete.
        """
        self.log("Querying Sonarr for managed TV series files...")
        managed_files: Set[str] = set()

        series = self.api_request(SONARR_URL, SONARR_API_KEY, "series")
        if not isinstance(series, list):
            self._record_api_failure("Failed to retrieve series from Sonarr")
            return managed_files

        for show in series:
            series_id = show.get("id")
            if not series_id:
                continue

            # Get episode files for this series
            episode_files = self.api_request(
                SONARR_URL,
                SONARR_API_KEY,
                f"episodefile?seriesId={series_id}",
            )
            if not isinstance(episode_files, list):
                self._record_api_failure(
                    "Failed to retrieve episode files from Sonarr "
                    f"for series {series_id}"
                )
                continue

            for episode_file in episode_files:
                file_path = episode_file.get("path")
                if not file_path:
                    continue
                # Translate API path to real filesystem path
                real_path = os.path.normpath(self.translate_path(file_path))
                managed_files.add(real_path)
                self.log(
                    f"  Sonarr manages: {file_path} -> {real_path}", "DEBUG"
                )

        self.log(f"Found {len(managed_files)} files managed by Sonarr")
        return managed_files

    def scan_directory(self, directory: Path, media_type: str) -> List[Path]:
        """Scan a directory recursively for video files.

        Args:
            directory: Root directory to walk.
            media_type: Category label; accepted for interface
                compatibility and not used by the scan itself.

        Returns:
            Every file below ``directory`` whose suffix is in
            ``VIDEO_EXTENSIONS`` (case-insensitive).
        """
        self.log(f"Scanning {directory} for video files...")
        video_files: List[Path] = []

        if not directory.exists():
            self.log(f"Directory does not exist: {directory}", "WARNING")
            return video_files

        def on_walk_error(err: OSError) -> None:
            # os.walk ignores unreadable directories unless told otherwise.
            self.log(f"Cannot access {err.filename}: {err}", "ERROR")
            self.stats["errors"] += 1

        try:
            for root, _dirs, files in os.walk(directory,
                                              onerror=on_walk_error):
                for file in files:
                    file_path = Path(root) / file
                    if file_path.suffix.lower() in VIDEO_EXTENSIONS:
                        video_files.append(file_path)
                        self.stats["total_scanned"] += 1
        except OSError as e:
            self.log(f"Error scanning {directory}: {e}", "ERROR")
            self.stats["errors"] += 1

        self.log(f"Found {len(video_files)} video files in {directory}")
        return video_files

    def categorize_files(self):
        """Scan media directories and categorize files as managed or unmanaged."""
        self.log("\n" + "="*80)
        self.log("STEP 1: Querying Radarr and Sonarr for managed files")
        self.log("="*80)

        # Get managed files from Radarr and Sonarr
        radarr_files = self.get_radarr_files()
        sonarr_files = self.get_sonarr_files()
        self.managed_files = radarr_files | sonarr_files

        self.log(f"\nTotal managed files: {len(self.managed_files)}")

        self.log("\n" + "="*80)
        self.log("STEP 2: Scanning media directories")
        self.log("="*80)

        # Scan each media directory
        for media_type, directory in MEDIA_DIRS.items():
            video_files = self.scan_directory(Path(directory), media_type)

            # Categorize each file
            for file_path in video_files:
                if str(file_path) in self.managed_files:
                    self.stats["managed"] += 1
                    self.log(f"  MANAGED: {file_path}", "DEBUG")
                else:
                    self.stats["unmanaged"] += 1
                    self.unmanaged_files[media_type].append(file_path)
                    self.log(f"  UNMANAGED: {file_path}", "DEBUG")

    def create_processing_structure(self):
        """Create processing directory structure."""
        self.log("\n" + "="*80)
        self.log("STEP 3: Creating processing directory structure")
        self.log("="*80)

        processing_path = Path(PROCESSING_DIR)

        for media_type in MEDIA_DIRS:
            subdir = processing_path / f"from-{media_type}"

            if self.dry_run:
                self.log(f"[DRY RUN] Would create directory: {subdir}")
                continue

            try:
                subdir.mkdir(parents=True, exist_ok=True)
                self.log(f"Created directory: {subdir}")
            except OSError as e:
                self.log(f"Failed to create directory {subdir}: {e}", "ERROR")
                self.stats["errors"] += 1

    def move_unmanaged_files(self):
        """Move unmanaged files to processing folder.

        The path relative to the media directory is preserved below
        ``PROCESSING_DIR/from-<media_type>``. An existing destination is
        never overwritten; the file is skipped and counted as an error.
        Nothing is moved if a Radarr/Sonarr query failed.
        """
        self.log("\n" + "="*80)
        self.log("STEP 4: Moving unmanaged files to processing folder")
        self.log("="*80)

        if self.api_failed:
            # An incomplete managed set would classify managed files as
            # unmanaged, so moving anything here risks breaking the library.
            self.log(
                "Refusing to move files: the managed-file list is incomplete "
                "because a Radarr/Sonarr query failed",
                "ERROR",
                always=True,
            )
            return

        processing_path = Path(PROCESSING_DIR)

        for media_type, files in self.unmanaged_files.items():
            if not files:
                self.log(f"No unmanaged files found in {media_type}")
                continue

            self.log(
                f"\nProcessing {len(files)} unmanaged files "
                f"from {media_type}..."
            )

            source_dir = Path(MEDIA_DIRS[media_type])
            dest_base = processing_path / f"from-{media_type}"

            for file_path in files:
                try:
                    # Preserve relative path structure
                    relative_path = file_path.relative_to(source_dir)
                    dest_path = dest_base / relative_path

                    # shutil.move would silently replace an existing file.
                    if dest_path.exists():
                        self.log(
                            "Destination already exists, skipping: "
                            f"{file_path} -> {dest_path}",
                            "ERROR",
                        )
                        self.stats["errors"] += 1
                        continue

                    if self.dry_run:
                        self.log(f"[DRY RUN] Would move: {file_path}")
                        self.log(f"           To: {dest_path}")
                        continue

                    # Create destination directory if needed
                    dest_path.parent.mkdir(parents=True, exist_ok=True)

                    # Move the file
                    shutil.move(str(file_path), str(dest_path))
                    self.log(f"Moved: {file_path} -> {dest_path}")
                    self.stats["moved"] += 1
                except (OSError, shutil.Error, ValueError) as e:
                    self.log(f"Failed to move {file_path}: {e}", "ERROR")
                    self.stats["errors"] += 1

    def find_empty_directories(self) -> List[Path]:
        """Find directories that are empty, or would be after moving files.

        In execute mode a directory is reported when it has no entries.
        In dry-run mode a directory is also reported when it has no
        subdirectories and every file in it is an unmanaged file that
        would be moved. The root media directories are never reported.
        """
        self.log("\n" + "="*80)
        self.log("STEP 5: Identifying empty directories")
        self.log("="*80)

        # Files that are still on disk only because this is a dry run.
        pending: Set[Path] = set()
        if self.dry_run and not self.api_failed:
            pending = {
                path
                for files in self.unmanaged_files.values()
                for path in files
            }

        def on_walk_error(err: OSError) -> None:
            self.log(f"Permission denied checking {err.filename}", "WARNING")

        empty_dirs: List[Path] = []

        for directory in MEDIA_DIRS.values():
            dir_path = Path(directory)

            if not dir_path.exists():
                continue

            try:
                for root, dirs, files in os.walk(
                    dir_path, topdown=False, onerror=on_walk_error
                ):
                    root_path = Path(root)

                    # Skip if this is the root media directory
                    if root_path == dir_path or dirs:
                        continue

                    if not files:
                        empty_dirs.append(root_path)
                        self.log(f"Empty directory: {root_path}")
                    elif all(root_path / name in pending for name in files):
                        empty_dirs.append(root_path)
                        self.log(
                            "[DRY RUN] Would be empty after move: "
                            f"{root_path}"
                        )
            except OSError as e:
                self.log(
                    f"Error finding empty directories in {directory}: {e}",
                    "ERROR",
                )

        return empty_dirs

    def write_log_file(self):
        """Write log file to processing directory.

        In dry-run mode only the intended location is logged.
        """
        timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        log_path = Path(PROCESSING_DIR) / f"cleanup-log-{timestamp}.txt"

        if self.dry_run:
            self.log(f"\n[DRY RUN] Would write log file to: {log_path}")
            return

        try:
            # The directory may be missing when earlier steps were skipped.
            log_path.parent.mkdir(parents=True, exist_ok=True)
            # File names can contain bytes that are not valid UTF-8;
            # backslashreplace keeps the log writable in that case.
            with open(log_path, 'w', encoding="utf-8",
                      errors="backslashreplace") as f:
                f.write('\n'.join(self.log_entries))
            self.log(f"\nLog file written to: {log_path}")
        except OSError as e:
            self.log(f"Failed to write log file: {e}", "ERROR")

    def print_summary(self, empty_dirs: List[Path]):
        """Print summary report.

        The summary is printed even in quiet mode.

        Args:
            empty_dirs: Directories returned by ``find_empty_directories``.
        """
        self._report("\n" + "="*80)
        self._report("SUMMARY REPORT")
        self._report("="*80)

        mode = "DRY RUN MODE" if self.dry_run else "EXECUTION MODE"
        self._report(f"\nMode: {mode}")
        self._report(f"\nTotal files scanned: {self.stats['total_scanned']}")
        self._report(
            f"Files managed by Radarr/Sonarr: {self.stats['managed']}"
        )
        self._report(f"Unmanaged files found: {self.stats['unmanaged']}")

        if not self.dry_run:
            self._report(f"Files successfully moved: {self.stats['moved']}")

        if self.stats['errors'] > 0:
            self._report(
                f"Errors encountered: {self.stats['errors']}", "WARNING"
            )

        if self.api_failed:
            self._report(
                "Radarr/Sonarr could not be fully queried; the counts above "
                "are unreliable and no files were moved",
                "WARNING",
            )

        self._report("\nUnmanaged files by category:")
        for media_type, files in self.unmanaged_files.items():
            self._report(f"  {media_type}: {len(files)} files")

        if empty_dirs:
            self._report(f"\nEmpty directories found: {len(empty_dirs)}")
            self._report(
                "(These directories can be manually removed if desired)"
            )

        self._report("\n" + "="*80)

    def run(self):
        """Main execution method.

        Returns:
            The statistics dictionary (``total_scanned``, ``managed``,
            ``unmanaged``, ``moved``, ``errors``).
        """
        self.log("="*80)
        self.log("MEDIA ORGANIZATION SCRIPT")
        self.log("="*80)
        self.log(f"Mode: {'DRY RUN' if self.dry_run else 'EXECUTE'}")
        self.log(
            f"Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        )

        # Step 1 & 2: Categorize files
        self.categorize_files()

        if self.api_failed:
            self.log(
                "One or more Radarr/Sonarr queries failed, so the "
                "managed-file list is incomplete. Skipping steps 3 and 4 "
                "to avoid moving managed files.",
                "ERROR",
                always=True,
            )
        else:
            # Step 3: Create processing structure
            self.create_processing_structure()

            # Step 4: Move unmanaged files
            self.move_unmanaged_files()

        # Step 5: Find empty directories
        empty_dirs = self.find_empty_directories()

        # Print summary
        self.print_summary(empty_dirs)

        # Write log file
        self.write_log_file()

        return self.stats


def main():
    """Parse command-line arguments, run the organizer, and exit.

    The exit status is 1 when any error was counted, otherwise 0.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Organize media files by comparing against "
            "Radarr/Sonarr managed files"
        )
    )
    parser.add_argument(
        "--execute",
        action="store_true",
        help="Actually move files (default is dry run mode)",
    )
    parser.add_argument(
        "--quiet",
        action="store_true",
        help="Reduce verbosity (only show summary)",
    )

    args = parser.parse_args()

    # Create organizer instance
    organizer = MediaOrganizer(
        dry_run=not args.execute,
        verbose=not args.quiet,
    )

    # Run the organization
    stats = organizer.run()

    # Exit with appropriate code
    sys.exit(1 if stats["errors"] > 0 else 0)


if __name__ == "__main__":
    main()