"""
Apply a list of rules to a list of filenames; produce preview and perform renames.
Undo: save a log after renames; undo reverses renames from that log.
"""
import json
import os
from pathlib import Path
from typing import List, Optional

from .rules import Rule

UNDO_FILENAME = ".hsrename-undo.json"


def _split_name(name: str) -> tuple[str, str]:
    """Split a filename into ``(stem, extension)`` at its last dot.

    The extension keeps its leading dot. A name that starts with a dot or
    contains no dot at all is returned whole with an empty extension.
    """
    if "." in name and not name.startswith("."):
        idx = name.rfind(".")
        return name[:idx], name[idx:]
    return name, ""


def apply_pipeline(
    name: str,
    rules: List[Rule],
    index: int,
    total: int,
) -> str:
    """Apply all enabled rules to a single filename (no path).

    Each enabled rule receives the current stem and extension together with
    ``index``, ``total`` and the untouched ``original_name``, and returns the
    stem and extension handed to the next rule. Disabled rules are skipped.

    Returns the new filename (stem + extension).
    """
    stem, ext = _split_name(name)
    for r in rules:
        if not r.enabled:
            continue
        stem, ext = r.apply(stem, ext, index, total, original_name=name)
    return stem + ext


def compute_preview(
    names: List[str],
    rules: List[Rule],
) -> List[tuple[str, str]]:
    """Compute the new name of every filename after the rules are applied.

    Returns a list of ``(original_name, new_name)`` in input order. Each name
    is processed with its position in ``names`` as index and ``len(names)``
    as total.

    This function does NOT detect collisions: two inputs may map to the same
    new name, and callers have to check for that themselves.
    """
    total = len(names)
    return [
        (name, apply_pipeline(name, rules, i, total))
        for i, name in enumerate(names)
    ]


def perform_renames(
    base_dir: str,
    renames: List[tuple[str, str]],
    dry_run: bool = False,
) -> List[tuple[str, str, Optional[str]]]:
    """Perform renames in base_dir. Each item is ``(old_name, new_name)``.

    Returns a list of ``(old_path, new_path, error_message or None)``.
    Entries whose old and new names are equal are skipped and produce no
    result. A missing source produces ``(old_path, "", "File not found")``.

    Uses two passes: every file is first moved to a temporary name and only
    then to its final name, so chains and swaps (a -> b, b -> a) work.

    An existing target is never overwritten. If the final name is still
    occupied after the first pass (by a file that is not part of the batch,
    or by an earlier entry with the same target), the entry is reported as
    an error and the file is moved back to its original name when that name
    is free.

    With ``dry_run=True`` nothing on disk is touched; the planned renames are
    returned with ``None`` as error.
    """
    base = Path(base_dir)
    results: List[tuple[str, str, Optional[str]]] = []
    temp_suffix = ".bru_tmp"

    # Collect the entries that actually need work.
    step1 = []
    for old_name, new_name in renames:
        if old_name == new_name:
            continue
        old_path = base / old_name
        if not old_path.exists():
            results.append((str(old_path), "", "File not found"))
            continue
        step1.append((old_path, new_name))

    # Assign temp names that clash neither with a final name, nor with a file
    # on disk, nor with a temp name already handed out in this batch.
    final_names = {n for _, n in step1}
    used_temp_names = set()
    temp_map = []
    for i, (old_path, new_name) in enumerate(step1):
        counter = i
        temp_name = f"__temp_{counter}_{old_path.name}{temp_suffix}"
        while (
            temp_name in final_names
            or temp_name in used_temp_names
            or (base / temp_name).exists()
        ):
            counter += 1
            temp_name = f"__temp_{counter}_{old_path.name}{temp_suffix}"
        used_temp_names.add(temp_name)
        temp_map.append((old_path, base / temp_name, base / new_name))

    if dry_run:
        for old_p, _temp_p, new_p in temp_map:
            results.append((str(old_p), str(new_p), None))
        return results

    # Pass 1: move everything out of the way to temporary names.
    for old_p, temp_p, new_p in temp_map:
        try:
            old_p.rename(temp_p)
        except OSError as e:
            results.append((str(old_p), str(new_p), str(e)))

    # Pass 2: move from the temporary names to the final names.
    for old_p, temp_p, new_p in temp_map:
        if not temp_p.exists():
            # Pass 1 failed for this entry; its error is already recorded.
            continue

        # Path.rename() silently replaces an existing file on POSIX, so the
        # target has to be checked explicitly. lexists() also catches a
        # dangling symlink sitting at the target name.
        if os.path.lexists(new_p):
            error = "Target already exists"
        else:
            try:
                temp_p.rename(new_p)
            except OSError as e:
                error = str(e)
            else:
                results.append((str(old_p), str(new_p), None))
                continue

        # Failure: put the file back under its original name, but never on
        # top of something that has taken that name in the meantime.
        if os.path.lexists(old_p):
            error += f" (original name is taken; file left as {temp_p.name})"
        else:
            try:
                temp_p.rename(old_p)
            except OSError as rollback_err:
                error += (
                    f" (rollback failed, file left as {temp_p.name}: "
                    f"{rollback_err})"
                )
        results.append((str(old_p), str(new_p), error))

    return results


def undo_log_path(base_dir: str) -> Path:
    """Return the path of the undo log file inside base_dir."""
    return Path(base_dir) / UNDO_FILENAME


def save_undo_log(base_dir: str, renames: List[tuple[str, str]]) -> None:
    """Save (old_name, new_name) list so undo can reverse it.

    Overwrites any existing log for this folder. Writing is best-effort: an
    OSError while writing is ignored, in which case no undo is available.
    """
    path = undo_log_path(base_dir)
    data = {"renames": renames}
    try:
        path.write_text(json.dumps(data, indent=0), encoding="utf-8")
    except OSError:
        pass


def load_undo_log(base_dir: str) -> Optional[List[tuple[str, str]]]:
    """Load the last rename log for this folder.

    Returns a list of ``(old_name, new_name)`` tuples, or None when the log
    is missing, unreadable, not valid UTF-8/JSON, or does not have the
    expected shape (an object whose "renames" value is a list of
    two-string pairs).
    """
    path = undo_log_path(base_dir)
    if not path.is_file():
        return None
    try:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        return None

    if not isinstance(data, dict):
        return None
    entries = data.get("renames")
    if not isinstance(entries, list):
        return None

    # JSON stores the pairs as lists; validate and hand back real tuples so
    # callers can unpack every entry safely.
    renames: List[tuple[str, str]] = []
    for entry in entries:
        if (
            not isinstance(entry, (list, tuple))
            or len(entry) != 2
            or not all(isinstance(part, str) for part in entry)
        ):
            return None
        renames.append((entry[0], entry[1]))
    return renames


def remove_undo_log(base_dir: str) -> None:
    """Remove the undo log file for this folder; a failure to do so is ignored."""
    path = undo_log_path(base_dir)
    try:
        path.unlink()
    except OSError:
        pass


def perform_undo(base_dir: str) -> List[tuple[str, str, Optional[str]]]:
    """Undo the last rename in base_dir using the saved log.

    Returns the same format as perform_renames, or an empty list when there
    is no usable log. The log is removed only when no result carries an
    error, so a failed or partial undo can be inspected or retried.
    """
    renames = load_undo_log(base_dir)
    if not renames:
        return []
    # Reverse: rename current (new) back to original (old).
    # Order must be reversed for chains.
    undo_list = [(new_name, old_name) for (old_name, new_name) in renames]
    undo_list.reverse()
    results = perform_renames(base_dir, undo_list, dry_run=False)
    if not any(r[2] for r in results):
        remove_undo_log(base_dir)
    return results