Files
HS-Rename/engine/pipeline.py
T
2026-03-03 22:49:26 -06:00

163 lines
5.0 KiB
Python

"""
Apply a list of rules to a list of filenames; produce preview and perform renames.
Undo: save a log after renames; undo reverses renames from that log.
"""
import json
import os
from pathlib import Path
from typing import List, Optional
from .rules import Rule
UNDO_FILENAME = ".hsrename-undo.json"
def _split_name(name: str) -> tuple[str, str]:
if "." in name and not name.startswith("."):
idx = name.rfind(".")
return name[:idx], name[idx:]
return name, ""
def apply_pipeline(
name: str,
rules: List[Rule],
index: int,
total: int,
) -> str:
"""Apply all enabled rules to a single filename (no path). Returns new filename."""
stem, ext = _split_name(name)
for r in rules:
if not r.enabled:
continue
stem, ext = r.apply(stem, ext, index, total, original_name=name)
return stem + ext
def compute_preview(
names: List[str],
rules: List[Rule],
) -> List[tuple[str, str]]:
"""
For each name (filename only), compute the new name after rules.
Returns list of (original_name, new_name). Detects collisions.
"""
result = []
for i, name in enumerate(names):
new_name = apply_pipeline(name, rules, i, len(names))
result.append((name, new_name))
return result
def perform_renames(
base_dir: str,
renames: List[tuple[str, str]],
dry_run: bool = False,
) -> List[tuple[str, str, Optional[str]]]:
"""
Perform renames in base_dir. Each item is (old_name, new_name).
Returns list of (old_path, new_path, error_message or None).
Uses two-pass: first rename to temp names to avoid collisions.
"""
base = Path(base_dir)
results = []
# Two-pass to avoid overwriting: first move to temporary names, then to final
temp_suffix = ".bru_tmp"
step1 = []
for old_name, new_name in renames:
if old_name == new_name:
continue
old_path = base / old_name
if not old_path.exists():
results.append((str(old_path), "", "File not found"))
continue
step1.append((old_path, new_name))
# Assign temp names that don't clash with each other or final names
final_names = {n for _, n in step1}
temp_map = []
for i, (old_path, new_name) in enumerate(step1):
temp_name = f"__temp_{i}_{old_path.name}{temp_suffix}"
while temp_name in final_names or (base / temp_name).exists():
i += 1
temp_name = f"__temp_{i}_{old_path.name}{temp_suffix}"
temp_map.append((old_path, base / temp_name, base / new_name))
if dry_run:
for old_p, temp_p, new_p in temp_map:
results.append((str(old_p), str(new_p), None))
return results
# Execute: first to temp, then to final
for old_p, temp_p, new_p in temp_map:
try:
old_p.rename(temp_p)
except OSError as e:
results.append((str(old_p), str(new_p), str(e)))
for old_p, temp_p, new_p in temp_map:
if not temp_p.exists():
continue
try:
temp_p.rename(new_p)
results.append((str(old_p), str(new_p), None))
except OSError as e:
results.append((str(old_p), str(new_p), str(e)))
try:
temp_p.rename(old_p)
except OSError:
pass
return results
def undo_log_path(base_dir: str) -> Path:
return Path(base_dir) / UNDO_FILENAME
def save_undo_log(base_dir: str, renames: List[tuple[str, str]]) -> None:
"""Save (old_name, new_name) list so undo can reverse it. Overwrites any existing log for this folder."""
path = undo_log_path(base_dir)
data = {"renames": renames}
try:
path.write_text(json.dumps(data, indent=0), encoding="utf-8")
except OSError:
pass
def load_undo_log(base_dir: str) -> Optional[List[tuple[str, str]]]:
"""Load the last rename log for this folder. Returns list of (old_name, new_name) or None."""
path = undo_log_path(base_dir)
if not path.is_file():
return None
try:
data = json.loads(path.read_text(encoding="utf-8"))
return data.get("renames")
except (OSError, json.JSONDecodeError, TypeError):
return None
def remove_undo_log(base_dir: str) -> None:
"""Remove the undo log file for this folder."""
path = undo_log_path(base_dir)
try:
path.unlink()
except OSError:
pass
def perform_undo(base_dir: str) -> List[tuple[str, str, Optional[str]]]:
"""
Undo the last rename in base_dir using the saved log.
Returns same format as perform_renames. Does not remove the log on failure.
"""
renames = load_undo_log(base_dir)
if not renames:
return []
# Reverse: rename current (new) back to original (old). Order must be reversed for chains.
undo_list = [(new_name, old_name) for (old_name, new_name) in renames]
undo_list.reverse()
results = perform_renames(base_dir, undo_list, dry_run=False)
if not any(r[2] for r in results):
remove_undo_log(base_dir)
return results