Add CSV mapping rule and Undo last rename feature
Made-with: Cursor
This commit is contained in:
+2
-1
@@ -1,4 +1,4 @@
|
||||
from .rules import Rule, ReplaceRule, InsertRule, RemoveRule, CaseRule, NumberingRule, EpisodeRenumberRule, RegexRule, PrefixSuffixRule
|
||||
from .rules import Rule, ReplaceRule, InsertRule, RemoveRule, CaseRule, NumberingRule, EpisodeRenumberRule, RegexRule, PrefixSuffixRule, CsvMappingRule
|
||||
from .pipeline import apply_pipeline, compute_preview
|
||||
|
||||
__all__ = [
|
||||
@@ -11,6 +11,7 @@ __all__ = [
|
||||
"EpisodeRenumberRule",
|
||||
"RegexRule",
|
||||
"PrefixSuffixRule",
|
||||
"CsvMappingRule",
|
||||
"apply_pipeline",
|
||||
"compute_preview",
|
||||
]
|
||||
|
||||
+57
-1
@@ -1,12 +1,16 @@
|
||||
"""
|
||||
Apply a list of rules to a list of filenames; produce preview and perform renames.
|
||||
Undo: save a log after renames; undo reverses renames from that log.
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
from .rules import Rule
|
||||
|
||||
UNDO_FILENAME = ".bulk-renamer-undo.json"
|
||||
|
||||
|
||||
def _split_name(name: str) -> tuple[str, str]:
|
||||
if "." in name and not name.startswith("."):
|
||||
@@ -26,7 +30,7 @@ def apply_pipeline(
|
||||
for r in rules:
|
||||
if not r.enabled:
|
||||
continue
|
||||
stem, ext = r.apply(stem, ext, index, total)
|
||||
stem, ext = r.apply(stem, ext, index, total, original_name=name)
|
||||
return stem + ext
|
||||
|
||||
|
||||
@@ -104,3 +108,55 @@ def perform_renames(
|
||||
pass
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def undo_log_path(base_dir: str) -> Path:
|
||||
return Path(base_dir) / UNDO_FILENAME
|
||||
|
||||
|
||||
def save_undo_log(base_dir: str, renames: List[tuple[str, str]]) -> None:
|
||||
"""Save (old_name, new_name) list so undo can reverse it. Overwrites any existing log for this folder."""
|
||||
path = undo_log_path(base_dir)
|
||||
data = {"renames": renames}
|
||||
try:
|
||||
path.write_text(json.dumps(data, indent=0), encoding="utf-8")
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def load_undo_log(base_dir: str) -> Optional[List[tuple[str, str]]]:
|
||||
"""Load the last rename log for this folder. Returns list of (old_name, new_name) or None."""
|
||||
path = undo_log_path(base_dir)
|
||||
if not path.is_file():
|
||||
return None
|
||||
try:
|
||||
data = json.loads(path.read_text(encoding="utf-8"))
|
||||
return data.get("renames")
|
||||
except (OSError, json.JSONDecodeError, TypeError):
|
||||
return None
|
||||
|
||||
|
||||
def remove_undo_log(base_dir: str) -> None:
|
||||
"""Remove the undo log file for this folder."""
|
||||
path = undo_log_path(base_dir)
|
||||
try:
|
||||
path.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def perform_undo(base_dir: str) -> List[tuple[str, str, Optional[str]]]:
|
||||
"""
|
||||
Undo the last rename in base_dir using the saved log.
|
||||
Returns same format as perform_renames. Does not remove the log on failure.
|
||||
"""
|
||||
renames = load_undo_log(base_dir)
|
||||
if not renames:
|
||||
return []
|
||||
# Reverse: rename current (new) back to original (old). Order must be reversed for chains.
|
||||
undo_list = [(new_name, old_name) for (old_name, new_name) in renames]
|
||||
undo_list.reverse()
|
||||
results = perform_renames(base_dir, undo_list, dry_run=False)
|
||||
if not any(r[2] for r in results):
|
||||
remove_undo_log(base_dir)
|
||||
return results
|
||||
|
||||
+128
-10
@@ -2,9 +2,11 @@
|
||||
Rename rules: each rule transforms a filename (stem + extension) and returns a new name.
|
||||
Rules are applied in sequence; extension can be changed by a rule that includes it.
|
||||
"""
|
||||
import csv
|
||||
import re
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
|
||||
@@ -13,8 +15,15 @@ class Rule(ABC):
|
||||
enabled: bool = True
|
||||
|
||||
@abstractmethod
|
||||
def apply(self, stem: str, ext: str, index: int, total: int) -> tuple[str, str]:
|
||||
"""Return (new_stem, new_ext). Index is 0-based."""
|
||||
def apply(
|
||||
self,
|
||||
stem: str,
|
||||
ext: str,
|
||||
index: int,
|
||||
total: int,
|
||||
original_name: Optional[str] = None,
|
||||
) -> tuple[str, str]:
|
||||
"""Return (new_stem, new_ext). Index is 0-based. original_name is the filename at pipeline start (for CSV lookup)."""
|
||||
pass
|
||||
|
||||
def _split_name(self, name: str) -> tuple[str, str]:
|
||||
@@ -33,7 +42,14 @@ class ReplaceRule(Rule):
|
||||
case_sensitive: bool = False
|
||||
whole_word: bool = False
|
||||
|
||||
def apply(self, stem: str, ext: str, index: int, total: int) -> tuple[str, str]:
|
||||
def apply(
|
||||
self,
|
||||
stem: str,
|
||||
ext: str,
|
||||
index: int,
|
||||
total: int,
|
||||
original_name: Optional[str] = None,
|
||||
) -> tuple[str, str]:
|
||||
if not self.find:
|
||||
return stem, ext
|
||||
flags = 0 if self.case_sensitive else re.IGNORECASE
|
||||
@@ -50,7 +66,14 @@ class RegexRule(Rule):
|
||||
pattern: str = ""
|
||||
replacement: str = ""
|
||||
|
||||
def apply(self, stem: str, ext: str, index: int, total: int) -> tuple[str, str]:
|
||||
def apply(
|
||||
self,
|
||||
stem: str,
|
||||
ext: str,
|
||||
index: int,
|
||||
total: int,
|
||||
original_name: Optional[str] = None,
|
||||
) -> tuple[str, str]:
|
||||
if not self.pattern:
|
||||
return stem, ext
|
||||
try:
|
||||
@@ -65,7 +88,14 @@ class InsertRule(Rule):
|
||||
text: str = ""
|
||||
position: int = 0 # 0 = start, -1 = end, or character index
|
||||
|
||||
def apply(self, stem: str, ext: str, index: int, total: int) -> tuple[str, str]:
|
||||
def apply(
|
||||
self,
|
||||
stem: str,
|
||||
ext: str,
|
||||
index: int,
|
||||
total: int,
|
||||
original_name: Optional[str] = None,
|
||||
) -> tuple[str, str]:
|
||||
if not self.text:
|
||||
return stem, ext
|
||||
if self.position <= 0:
|
||||
@@ -82,7 +112,14 @@ class RemoveRule(Rule):
|
||||
from_start: str = ""
|
||||
from_end: str = ""
|
||||
|
||||
def apply(self, stem: str, ext: str, index: int, total: int) -> tuple[str, str]:
|
||||
def apply(
|
||||
self,
|
||||
stem: str,
|
||||
ext: str,
|
||||
index: int,
|
||||
total: int,
|
||||
original_name: Optional[str] = None,
|
||||
) -> tuple[str, str]:
|
||||
if self.remove_type == "chars" and self.value:
|
||||
return stem.replace(self.value, ""), ext
|
||||
if self.remove_type == "digits":
|
||||
@@ -114,7 +151,14 @@ class RemoveRule(Rule):
|
||||
class CaseRule(Rule):
|
||||
case_type: str = "title" # upper, lower, title, sentence
|
||||
|
||||
def apply(self, stem: str, ext: str, index: int, total: int) -> tuple[str, str]:
|
||||
def apply(
|
||||
self,
|
||||
stem: str,
|
||||
ext: str,
|
||||
index: int,
|
||||
total: int,
|
||||
original_name: Optional[str] = None,
|
||||
) -> tuple[str, str]:
|
||||
if self.case_type == "upper":
|
||||
return stem.upper(), ext
|
||||
if self.case_type == "lower":
|
||||
@@ -137,7 +181,14 @@ class NumberingRule(Rule):
|
||||
insert_at: int = 0
|
||||
separator: str = " "
|
||||
|
||||
def apply(self, stem: str, ext: str, index: int, total: int) -> tuple[str, str]:
|
||||
def apply(
|
||||
self,
|
||||
stem: str,
|
||||
ext: str,
|
||||
index: int,
|
||||
total: int,
|
||||
original_name: Optional[str] = None,
|
||||
) -> tuple[str, str]:
|
||||
num = self.start + index * self.step
|
||||
num_str = str(num).zfill(max(1, self.padding))
|
||||
if self.where == "prefix":
|
||||
@@ -159,7 +210,14 @@ class EpisodeRenumberRule(Rule):
|
||||
# Pattern: group 1 = prefix (e.g. "S01E"), group 2 = episode digits, group 3 = rest (title)
|
||||
pattern: str = r"(.*?[Ss]\d+[Ee])(\d+)(.*)"
|
||||
|
||||
def apply(self, stem: str, ext: str, index: int, total: int) -> tuple[str, str]:
|
||||
def apply(
|
||||
self,
|
||||
stem: str,
|
||||
ext: str,
|
||||
index: int,
|
||||
total: int,
|
||||
original_name: Optional[str] = None,
|
||||
) -> tuple[str, str]:
|
||||
try:
|
||||
m = re.match(self.pattern, stem)
|
||||
except re.error:
|
||||
@@ -178,5 +236,65 @@ class PrefixSuffixRule(Rule):
|
||||
prefix: str = ""
|
||||
suffix: str = ""
|
||||
|
||||
def apply(self, stem: str, ext: str, index: int, total: int) -> tuple[str, str]:
|
||||
def apply(
|
||||
self,
|
||||
stem: str,
|
||||
ext: str,
|
||||
index: int,
|
||||
total: int,
|
||||
original_name: Optional[str] = None,
|
||||
) -> tuple[str, str]:
|
||||
return (self.prefix + stem + self.suffix), ext
|
||||
|
||||
|
||||
@dataclass
|
||||
class CsvMappingRule(Rule):
|
||||
"""Rename using a CSV with columns 'Original Name' and 'Target Name'. Lookup by original filename (as on disk)."""
|
||||
csv_path: str = ""
|
||||
_mapping: dict = field(default_factory=dict, repr=False) # original_name -> target_name, loaded on first use
|
||||
|
||||
def _load_mapping(self) -> dict:
|
||||
if getattr(self, "_mapping_loaded", False):
|
||||
return self._mapping
|
||||
self._mapping = {}
|
||||
path = Path(self.csv_path)
|
||||
if not path.is_file():
|
||||
setattr(self, "_mapping_loaded", True)
|
||||
return self._mapping
|
||||
try:
|
||||
with open(path, newline="", encoding="utf-8") as f:
|
||||
reader = csv.DictReader(f)
|
||||
if not reader.fieldnames:
|
||||
setattr(self, "_mapping_loaded", True)
|
||||
return self._mapping
|
||||
# Normalize header: strip spaces, accept "Original Name" and "Target Name"
|
||||
orig_key = next((k for k in reader.fieldnames if k.strip().lower() == "original name"), None)
|
||||
target_key = next((k for k in reader.fieldnames if k.strip().lower() == "target name"), None)
|
||||
if orig_key is None or target_key is None:
|
||||
setattr(self, "_mapping_loaded", True)
|
||||
return self._mapping
|
||||
for row in reader:
|
||||
orig = row.get(orig_key, "").strip()
|
||||
target = row.get(target_key, "").strip()
|
||||
if orig:
|
||||
self._mapping[orig] = target
|
||||
except (OSError, csv.Error):
|
||||
pass
|
||||
setattr(self, "_mapping_loaded", True)
|
||||
return self._mapping
|
||||
|
||||
def apply(
|
||||
self,
|
||||
stem: str,
|
||||
ext: str,
|
||||
index: int,
|
||||
total: int,
|
||||
original_name: Optional[str] = None,
|
||||
) -> tuple[str, str]:
|
||||
if not self.csv_path or original_name is None:
|
||||
return stem, ext
|
||||
mapping = self._load_mapping()
|
||||
target = mapping.get(original_name)
|
||||
if target is None:
|
||||
return stem, ext
|
||||
return self._split_name(target)
|
||||
|
||||
Reference in New Issue
Block a user