Initial commit: KavCorp infrastructure documentation
- CLAUDE.md: Project configuration for Claude Code - docs/: Infrastructure documentation - INFRASTRUCTURE.md: Service map, storage, network - CONFIGURATIONS.md: Service configs and credentials - CHANGELOG.md: Change history - DECISIONS.md: Architecture decisions - TASKS.md: Task tracking - scripts/: Automation scripts 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
409
scripts/cleanup/organize-media.py
Executable file
409
scripts/cleanup/organize-media.py
Executable file
@@ -0,0 +1,409 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Media Organization Script
|
||||
Compares media files against Radarr/Sonarr managed files and moves unmanaged files to processing folder.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Set, Tuple
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
|
||||
|
||||
# Configuration
|
||||
RADARR_URL = "http://10.4.2.16:7878"
|
||||
RADARR_API_KEY = "5e6796988abf4d6d819a2b506a44f422"
|
||||
SONARR_URL = "http://10.4.2.15:8989"
|
||||
SONARR_API_KEY = "b331fe18ec2144148a41645d9ce8b249"
|
||||
|
||||
MEDIA_DIRS = {
|
||||
"movies": "/mnt/pve/elantris-media/movies",
|
||||
"tv": "/mnt/pve/elantris-media/tv",
|
||||
"anime": "/mnt/pve/elantris-media/anime"
|
||||
}
|
||||
|
||||
# Path translation: Radarr/Sonarr see /media/* but files are at /mnt/pve/elantris-media/*
|
||||
PATH_MAPPING = {
|
||||
"/media/movies": "/mnt/pve/elantris-media/movies",
|
||||
"/media/tv": "/mnt/pve/elantris-media/tv",
|
||||
"/media/anime": "/mnt/pve/elantris-media/anime"
|
||||
}
|
||||
|
||||
PROCESSING_DIR = "/mnt/pve/elantris-media/processing"
|
||||
VIDEO_EXTENSIONS = {'.mkv', '.mp4', '.avi', '.m4v', '.ts', '.wmv', '.flv', '.webm'}
|
||||
|
||||
|
||||
class MediaOrganizer:
|
||||
def __init__(self, dry_run: bool = True, verbose: bool = True):
|
||||
self.dry_run = dry_run
|
||||
self.verbose = verbose
|
||||
self.managed_files: Set[str] = set()
|
||||
self.unmanaged_files: Dict[str, List[Path]] = {
|
||||
"movies": [],
|
||||
"tv": [],
|
||||
"anime": []
|
||||
}
|
||||
self.stats = {
|
||||
"total_scanned": 0,
|
||||
"managed": 0,
|
||||
"unmanaged": 0,
|
||||
"moved": 0,
|
||||
"errors": 0
|
||||
}
|
||||
self.log_entries: List[str] = []
|
||||
|
||||
def log(self, message: str, level: str = "INFO"):
|
||||
"""Log a message to console and internal log"""
|
||||
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
log_entry = f"[{timestamp}] [{level}] {message}"
|
||||
self.log_entries.append(log_entry)
|
||||
if self.verbose:
|
||||
print(log_entry)
|
||||
|
||||
def translate_path(self, path: str) -> str:
|
||||
"""Translate Radarr/Sonarr paths to actual filesystem paths"""
|
||||
for api_path, real_path in PATH_MAPPING.items():
|
||||
if path.startswith(api_path):
|
||||
return path.replace(api_path, real_path, 1)
|
||||
return path
|
||||
|
||||
def api_request(self, url: str, api_key: str, endpoint: str) -> dict:
|
||||
"""Make an API request to Radarr or Sonarr"""
|
||||
full_url = f"{url}/api/v3/{endpoint}"
|
||||
headers = {"X-Api-Key": api_key}
|
||||
|
||||
try:
|
||||
req = urllib.request.Request(full_url, headers=headers)
|
||||
with urllib.request.urlopen(req, timeout=30) as response:
|
||||
return json.loads(response.read().decode())
|
||||
except urllib.error.URLError as e:
|
||||
self.log(f"API request failed for {full_url}: {e}", "ERROR")
|
||||
return None
|
||||
except json.JSONDecodeError as e:
|
||||
self.log(f"Failed to decode JSON response from {full_url}: {e}", "ERROR")
|
||||
return None
|
||||
|
||||
def get_radarr_files(self) -> Set[str]:
|
||||
"""Get all file paths managed by Radarr"""
|
||||
self.log("Querying Radarr for managed movie files...")
|
||||
managed_files = set()
|
||||
|
||||
movies = self.api_request(RADARR_URL, RADARR_API_KEY, "movie")
|
||||
if not movies:
|
||||
self.log("Failed to retrieve movies from Radarr", "ERROR")
|
||||
return managed_files
|
||||
|
||||
for movie in movies:
|
||||
# Get the movie file path if it exists
|
||||
if movie.get("hasFile") and "movieFile" in movie:
|
||||
file_path = movie["movieFile"].get("path")
|
||||
if file_path:
|
||||
# Translate API path to real filesystem path
|
||||
real_path = self.translate_path(file_path)
|
||||
managed_files.add(real_path)
|
||||
self.log(f" Radarr manages: {file_path} -> {real_path}", "DEBUG")
|
||||
|
||||
self.log(f"Found {len(managed_files)} files managed by Radarr")
|
||||
return managed_files
|
||||
|
||||
def get_sonarr_files(self) -> Set[str]:
|
||||
"""Get all file paths managed by Sonarr"""
|
||||
self.log("Querying Sonarr for managed TV series files...")
|
||||
managed_files = set()
|
||||
|
||||
series = self.api_request(SONARR_URL, SONARR_API_KEY, "series")
|
||||
if not series:
|
||||
self.log("Failed to retrieve series from Sonarr", "ERROR")
|
||||
return managed_files
|
||||
|
||||
for show in series:
|
||||
series_id = show.get("id")
|
||||
if not series_id:
|
||||
continue
|
||||
|
||||
# Get episode files for this series
|
||||
episode_files = self.api_request(
|
||||
SONARR_URL,
|
||||
SONARR_API_KEY,
|
||||
f"episodefile?seriesId={series_id}"
|
||||
)
|
||||
|
||||
if episode_files:
|
||||
for episode_file in episode_files:
|
||||
file_path = episode_file.get("path")
|
||||
if file_path:
|
||||
# Translate API path to real filesystem path
|
||||
real_path = self.translate_path(file_path)
|
||||
managed_files.add(real_path)
|
||||
self.log(f" Sonarr manages: {file_path} -> {real_path}", "DEBUG")
|
||||
|
||||
self.log(f"Found {len(managed_files)} files managed by Sonarr")
|
||||
return managed_files
|
||||
|
||||
def scan_directory(self, directory: Path, media_type: str) -> List[Path]:
|
||||
"""Scan a directory recursively for video files"""
|
||||
self.log(f"Scanning {directory} for video files...")
|
||||
video_files = []
|
||||
|
||||
if not directory.exists():
|
||||
self.log(f"Directory does not exist: {directory}", "WARNING")
|
||||
return video_files
|
||||
|
||||
try:
|
||||
for root, dirs, files in os.walk(directory):
|
||||
for file in files:
|
||||
file_path = Path(root) / file
|
||||
if file_path.suffix.lower() in VIDEO_EXTENSIONS:
|
||||
video_files.append(file_path)
|
||||
self.stats["total_scanned"] += 1
|
||||
except PermissionError as e:
|
||||
self.log(f"Permission denied accessing {directory}: {e}", "ERROR")
|
||||
self.stats["errors"] += 1
|
||||
except Exception as e:
|
||||
self.log(f"Error scanning {directory}: {e}", "ERROR")
|
||||
self.stats["errors"] += 1
|
||||
|
||||
self.log(f"Found {len(video_files)} video files in {directory}")
|
||||
return video_files
|
||||
|
||||
def categorize_files(self):
|
||||
"""Scan media directories and categorize files as managed or unmanaged"""
|
||||
self.log("\n" + "="*80)
|
||||
self.log("STEP 1: Querying Radarr and Sonarr for managed files")
|
||||
self.log("="*80)
|
||||
|
||||
# Get managed files from Radarr and Sonarr
|
||||
radarr_files = self.get_radarr_files()
|
||||
sonarr_files = self.get_sonarr_files()
|
||||
self.managed_files = radarr_files | sonarr_files
|
||||
|
||||
self.log(f"\nTotal managed files: {len(self.managed_files)}")
|
||||
|
||||
self.log("\n" + "="*80)
|
||||
self.log("STEP 2: Scanning media directories")
|
||||
self.log("="*80)
|
||||
|
||||
# Scan each media directory
|
||||
for media_type, directory in MEDIA_DIRS.items():
|
||||
dir_path = Path(directory)
|
||||
video_files = self.scan_directory(dir_path, media_type)
|
||||
|
||||
# Categorize each file
|
||||
for file_path in video_files:
|
||||
file_str = str(file_path)
|
||||
if file_str in self.managed_files:
|
||||
self.stats["managed"] += 1
|
||||
self.log(f" MANAGED: {file_path}", "DEBUG")
|
||||
else:
|
||||
self.stats["unmanaged"] += 1
|
||||
self.unmanaged_files[media_type].append(file_path)
|
||||
self.log(f" UNMANAGED: {file_path}", "DEBUG")
|
||||
|
||||
def create_processing_structure(self):
|
||||
"""Create processing directory structure"""
|
||||
self.log("\n" + "="*80)
|
||||
self.log("STEP 3: Creating processing directory structure")
|
||||
self.log("="*80)
|
||||
|
||||
processing_path = Path(PROCESSING_DIR)
|
||||
|
||||
for media_type in MEDIA_DIRS.keys():
|
||||
subdir = processing_path / f"from-{media_type}"
|
||||
|
||||
if self.dry_run:
|
||||
self.log(f"[DRY RUN] Would create directory: {subdir}")
|
||||
else:
|
||||
try:
|
||||
subdir.mkdir(parents=True, exist_ok=True)
|
||||
self.log(f"Created directory: {subdir}")
|
||||
except Exception as e:
|
||||
self.log(f"Failed to create directory {subdir}: {e}", "ERROR")
|
||||
self.stats["errors"] += 1
|
||||
|
||||
def move_unmanaged_files(self):
|
||||
"""Move unmanaged files to processing folder"""
|
||||
self.log("\n" + "="*80)
|
||||
self.log("STEP 4: Moving unmanaged files to processing folder")
|
||||
self.log("="*80)
|
||||
|
||||
processing_path = Path(PROCESSING_DIR)
|
||||
|
||||
for media_type, files in self.unmanaged_files.items():
|
||||
if not files:
|
||||
self.log(f"No unmanaged files found in {media_type}")
|
||||
continue
|
||||
|
||||
self.log(f"\nProcessing {len(files)} unmanaged files from {media_type}...")
|
||||
source_dir = Path(MEDIA_DIRS[media_type])
|
||||
dest_base = processing_path / f"from-{media_type}"
|
||||
|
||||
for file_path in files:
|
||||
try:
|
||||
# Preserve relative path structure
|
||||
relative_path = file_path.relative_to(source_dir)
|
||||
dest_path = dest_base / relative_path
|
||||
|
||||
if self.dry_run:
|
||||
self.log(f"[DRY RUN] Would move: {file_path}")
|
||||
self.log(f" To: {dest_path}")
|
||||
else:
|
||||
# Create destination directory if needed
|
||||
dest_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Move the file
|
||||
shutil.move(str(file_path), str(dest_path))
|
||||
self.log(f"Moved: {file_path} -> {dest_path}")
|
||||
self.stats["moved"] += 1
|
||||
|
||||
except Exception as e:
|
||||
self.log(f"Failed to move {file_path}: {e}", "ERROR")
|
||||
self.stats["errors"] += 1
|
||||
|
||||
def find_empty_directories(self) -> List[Path]:
|
||||
"""Find directories that would be empty after moving files"""
|
||||
self.log("\n" + "="*80)
|
||||
self.log("STEP 5: Identifying empty directories")
|
||||
self.log("="*80)
|
||||
|
||||
empty_dirs = []
|
||||
|
||||
for media_type, directory in MEDIA_DIRS.items():
|
||||
dir_path = Path(directory)
|
||||
if not dir_path.exists():
|
||||
continue
|
||||
|
||||
try:
|
||||
for root, dirs, files in os.walk(dir_path, topdown=False):
|
||||
root_path = Path(root)
|
||||
|
||||
# Skip if this is the root media directory
|
||||
if root_path == dir_path:
|
||||
continue
|
||||
|
||||
# Check if directory is empty or would be empty
|
||||
try:
|
||||
contents = list(root_path.iterdir())
|
||||
if not contents:
|
||||
empty_dirs.append(root_path)
|
||||
self.log(f"Empty directory: {root_path}")
|
||||
except PermissionError:
|
||||
self.log(f"Permission denied checking {root_path}", "WARNING")
|
||||
|
||||
except Exception as e:
|
||||
self.log(f"Error finding empty directories in {directory}: {e}", "ERROR")
|
||||
|
||||
return empty_dirs
|
||||
|
||||
def write_log_file(self):
|
||||
"""Write log file to processing directory"""
|
||||
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
|
||||
log_path = Path(PROCESSING_DIR) / f"cleanup-log-{timestamp}.txt"
|
||||
|
||||
try:
|
||||
if self.dry_run:
|
||||
self.log(f"\n[DRY RUN] Would write log file to: {log_path}")
|
||||
else:
|
||||
with open(log_path, 'w') as f:
|
||||
f.write('\n'.join(self.log_entries))
|
||||
self.log(f"\nLog file written to: {log_path}")
|
||||
except Exception as e:
|
||||
self.log(f"Failed to write log file: {e}", "ERROR")
|
||||
|
||||
def print_summary(self, empty_dirs: List[Path]):
|
||||
"""Print summary report"""
|
||||
self.log("\n" + "="*80)
|
||||
self.log("SUMMARY REPORT")
|
||||
self.log("="*80)
|
||||
|
||||
mode = "DRY RUN MODE" if self.dry_run else "EXECUTION MODE"
|
||||
self.log(f"\nMode: {mode}")
|
||||
self.log(f"\nTotal files scanned: {self.stats['total_scanned']}")
|
||||
self.log(f"Files managed by Radarr/Sonarr: {self.stats['managed']}")
|
||||
self.log(f"Unmanaged files found: {self.stats['unmanaged']}")
|
||||
|
||||
if not self.dry_run:
|
||||
self.log(f"Files successfully moved: {self.stats['moved']}")
|
||||
|
||||
if self.stats['errors'] > 0:
|
||||
self.log(f"Errors encountered: {self.stats['errors']}", "WARNING")
|
||||
|
||||
self.log("\nUnmanaged files by category:")
|
||||
for media_type, files in self.unmanaged_files.items():
|
||||
self.log(f" {media_type}: {len(files)} files")
|
||||
|
||||
if empty_dirs:
|
||||
self.log(f"\nEmpty directories found: {len(empty_dirs)}")
|
||||
self.log("(These directories can be manually removed if desired)")
|
||||
|
||||
self.log("\n" + "="*80)
|
||||
|
||||
def run(self):
|
||||
"""Main execution method"""
|
||||
self.log("="*80)
|
||||
self.log("MEDIA ORGANIZATION SCRIPT")
|
||||
self.log("="*80)
|
||||
self.log(f"Mode: {'DRY RUN' if self.dry_run else 'EXECUTE'}")
|
||||
self.log(f"Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
||||
|
||||
# Step 1 & 2: Categorize files
|
||||
self.categorize_files()
|
||||
|
||||
# Step 3: Create processing structure
|
||||
self.create_processing_structure()
|
||||
|
||||
# Step 4: Move unmanaged files
|
||||
self.move_unmanaged_files()
|
||||
|
||||
# Step 5: Find empty directories
|
||||
empty_dirs = self.find_empty_directories()
|
||||
|
||||
# Print summary
|
||||
self.print_summary(empty_dirs)
|
||||
|
||||
# Write log file
|
||||
self.write_log_file()
|
||||
|
||||
return self.stats
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Organize media files by comparing against Radarr/Sonarr managed files"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--execute",
|
||||
action="store_true",
|
||||
help="Actually move files (default is dry run mode)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--quiet",
|
||||
action="store_true",
|
||||
help="Reduce verbosity (only show summary)"
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Create organizer instance
|
||||
organizer = MediaOrganizer(
|
||||
dry_run=not args.execute,
|
||||
verbose=not args.quiet
|
||||
)
|
||||
|
||||
# Run the organization
|
||||
stats = organizer.run()
|
||||
|
||||
# Exit with appropriate code
|
||||
if stats["errors"] > 0:
|
||||
sys.exit(1)
|
||||
else:
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user