76574e2bee
Made-with: Cursor
327 lines
9.7 KiB
Python
327 lines
9.7 KiB
Python
"""
|
|
Rename rules: each rule transforms a filename (stem + extension) and returns a new name.
|
|
Rules are applied in sequence; extension can be changed by a rule that includes it.
|
|
"""
|
|
import csv
|
|
import re
|
|
from abc import ABC, abstractmethod
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import List, Optional
|
|
|
|
|
|
@dataclass
|
|
class Rule(ABC):
|
|
enabled: bool = True
|
|
|
|
@abstractmethod
|
|
def apply(
|
|
self,
|
|
stem: str,
|
|
ext: str,
|
|
index: int,
|
|
total: int,
|
|
original_name: Optional[str] = None,
|
|
) -> tuple[str, str]:
|
|
"""Return (new_stem, new_ext). Index is 0-based. original_name is the filename at pipeline start (for CSV lookup)."""
|
|
pass
|
|
|
|
def _split_name(self, name: str) -> tuple[str, str]:
|
|
if "." in name and not name.startswith("."):
|
|
idx = name.rfind(".")
|
|
return name[:idx], name[idx:]
|
|
return name, ""
|
|
|
|
|
|
# --- Text rules ---
|
|
|
|
@dataclass
|
|
class ReplaceRule(Rule):
|
|
find: str = ""
|
|
replace: str = ""
|
|
case_sensitive: bool = False
|
|
whole_word: bool = False
|
|
|
|
def apply(
|
|
self,
|
|
stem: str,
|
|
ext: str,
|
|
index: int,
|
|
total: int,
|
|
original_name: Optional[str] = None,
|
|
) -> tuple[str, str]:
|
|
if not self.find:
|
|
return stem, ext
|
|
flags = 0 if self.case_sensitive else re.IGNORECASE
|
|
if self.whole_word:
|
|
pattern = r"\b" + re.escape(self.find) + r"\b"
|
|
else:
|
|
pattern = re.escape(self.find)
|
|
new_stem = re.sub(pattern, self.replace, stem, flags=flags)
|
|
return new_stem, ext
|
|
|
|
|
|
@dataclass
|
|
class RegexRule(Rule):
|
|
pattern: str = ""
|
|
replacement: str = ""
|
|
|
|
def apply(
|
|
self,
|
|
stem: str,
|
|
ext: str,
|
|
index: int,
|
|
total: int,
|
|
original_name: Optional[str] = None,
|
|
) -> tuple[str, str]:
|
|
if not self.pattern:
|
|
return stem, ext
|
|
try:
|
|
new_stem = re.sub(self.pattern, self.replacement, stem)
|
|
except re.error:
|
|
return stem, ext
|
|
return new_stem, ext
|
|
|
|
|
|
@dataclass
|
|
class InsertRule(Rule):
|
|
text: str = ""
|
|
position: int = 0 # 0 = start, -1 = end, or character index
|
|
|
|
def apply(
|
|
self,
|
|
stem: str,
|
|
ext: str,
|
|
index: int,
|
|
total: int,
|
|
original_name: Optional[str] = None,
|
|
) -> tuple[str, str]:
|
|
if not self.text:
|
|
return stem, ext
|
|
if self.position <= 0:
|
|
return self.text + stem, ext # start
|
|
if self.position >= len(stem) or self.position == -1:
|
|
return stem + self.text, ext # end
|
|
return stem[: self.position] + self.text + stem[self.position :], ext
|
|
|
|
|
|
@dataclass
|
|
class RemoveRule(Rule):
|
|
remove_type: str = "chars" # chars, digits, first_n, last_n, from_start, from_end
|
|
value: str = "" # chars to remove, or n for first_n/last_n
|
|
from_start: str = ""
|
|
from_end: str = ""
|
|
|
|
def apply(
|
|
self,
|
|
stem: str,
|
|
ext: str,
|
|
index: int,
|
|
total: int,
|
|
original_name: Optional[str] = None,
|
|
) -> tuple[str, str]:
|
|
if self.remove_type == "chars" and self.value:
|
|
return stem.replace(self.value, ""), ext
|
|
if self.remove_type == "digits":
|
|
return re.sub(r"\d+", "", stem), ext
|
|
if self.remove_type == "first_n":
|
|
try:
|
|
n = int(self.value)
|
|
return stem[n:], ext
|
|
except ValueError:
|
|
return stem, ext
|
|
if self.remove_type == "last_n":
|
|
try:
|
|
n = int(self.value)
|
|
return stem[:-n] if n > 0 else stem, ext
|
|
except ValueError:
|
|
return stem, ext
|
|
if self.remove_type == "from_start" and self.from_start:
|
|
idx = stem.find(self.from_start)
|
|
if idx >= 0:
|
|
return stem[idx + len(self.from_start) :], ext
|
|
if self.remove_type == "from_end" and self.from_end:
|
|
idx = stem.rfind(self.from_end)
|
|
if idx >= 0:
|
|
return stem[:idx], ext
|
|
return stem, ext
|
|
|
|
|
|
@dataclass
|
|
class CaseRule(Rule):
|
|
case_type: str = "title" # upper, lower, title, sentence
|
|
|
|
def apply(
|
|
self,
|
|
stem: str,
|
|
ext: str,
|
|
index: int,
|
|
total: int,
|
|
original_name: Optional[str] = None,
|
|
) -> tuple[str, str]:
|
|
if self.case_type == "upper":
|
|
return stem.upper(), ext
|
|
if self.case_type == "lower":
|
|
return stem.lower(), ext
|
|
if self.case_type == "title":
|
|
return stem.title(), ext
|
|
if self.case_type == "sentence":
|
|
if stem:
|
|
return stem[0].upper() + stem[1:].lower(), ext
|
|
return stem, ext
|
|
return stem, ext
|
|
|
|
|
|
@dataclass
|
|
class NumberingRule(Rule):
|
|
start: int = 1
|
|
step: int = 1
|
|
padding: int = 2 # zero-pad width
|
|
where: str = "prefix" # prefix, suffix, insert_at
|
|
insert_at: int = 0
|
|
separator: str = " "
|
|
|
|
def apply(
|
|
self,
|
|
stem: str,
|
|
ext: str,
|
|
index: int,
|
|
total: int,
|
|
original_name: Optional[str] = None,
|
|
) -> tuple[str, str]:
|
|
num = self.start + index * self.step
|
|
num_str = str(num).zfill(max(1, self.padding))
|
|
if self.where == "prefix":
|
|
return f"{num_str}{self.separator}{stem}", ext
|
|
if self.where == "suffix":
|
|
return f"{stem}{self.separator}{num_str}", ext
|
|
if self.where == "insert_at":
|
|
pos = max(0, min(self.insert_at, len(stem)))
|
|
return stem[:pos] + num_str + self.separator + stem[pos:], ext
|
|
return stem, ext
|
|
|
|
|
|
# Episode renumber: match SxxExx or SxxExx-Eyy, replace episode block; keep title
|
|
@dataclass
|
|
class EpisodeRenumberRule(Rule):
|
|
start: int = 1
|
|
step: int = 1
|
|
padding: int = 2
|
|
# Group 1 = prefix (e.g. "S01E"), 2 = first ep digits, 3 = full "-E06" or None, 4 = second ep if range, 5 = rest (title)
|
|
pattern: str = r"(.*?[Ss]\d+[Ee])(\d+)(-[Ee](\d+))?(.*)"
|
|
|
|
def apply(
|
|
self,
|
|
stem: str,
|
|
ext: str,
|
|
index: int,
|
|
total: int,
|
|
original_name: Optional[str] = None,
|
|
) -> tuple[str, str]:
|
|
try:
|
|
m = re.match(self.pattern, stem)
|
|
except re.error:
|
|
return stem, ext
|
|
if not m:
|
|
return stem, ext
|
|
prefix, old_first_s, range_dash_e, old_second_s, rest = (
|
|
m.group(1),
|
|
m.group(2),
|
|
m.group(3),
|
|
m.group(4),
|
|
m.group(5),
|
|
)
|
|
try:
|
|
old_first = int(old_first_s)
|
|
except ValueError:
|
|
return stem, ext
|
|
span = 1
|
|
if old_second_s is not None:
|
|
try:
|
|
old_second = int(old_second_s)
|
|
except ValueError:
|
|
return stem, ext
|
|
span = old_second - old_first + 1
|
|
if span < 1:
|
|
span = 1
|
|
ep_new_first = self.start + index * self.step
|
|
pad = max(1, self.padding)
|
|
e1 = str(ep_new_first).zfill(pad)
|
|
if span <= 1:
|
|
new_stem = f"{prefix}{e1}{rest}"
|
|
else:
|
|
ep_new_last = ep_new_first + span - 1
|
|
e2 = str(ep_new_last).zfill(pad)
|
|
# Match common style: S01E01-E02 (second block uses E again)
|
|
new_stem = f"{prefix}{e1}-E{e2}{rest}"
|
|
return new_stem, ext
|
|
|
|
|
|
@dataclass
|
|
class PrefixSuffixRule(Rule):
|
|
prefix: str = ""
|
|
suffix: str = ""
|
|
|
|
def apply(
|
|
self,
|
|
stem: str,
|
|
ext: str,
|
|
index: int,
|
|
total: int,
|
|
original_name: Optional[str] = None,
|
|
) -> tuple[str, str]:
|
|
return (self.prefix + stem + self.suffix), ext
|
|
|
|
|
|
@dataclass
|
|
class CsvMappingRule(Rule):
|
|
"""Rename using a CSV with columns 'Original Name' and 'Target Name'. Lookup by original filename (as on disk)."""
|
|
csv_path: str = ""
|
|
_mapping: dict = field(default_factory=dict, repr=False) # original_name -> target_name, loaded on first use
|
|
|
|
def _load_mapping(self) -> dict:
|
|
if getattr(self, "_mapping_loaded", False):
|
|
return self._mapping
|
|
self._mapping = {}
|
|
path = Path(self.csv_path)
|
|
if not path.is_file():
|
|
setattr(self, "_mapping_loaded", True)
|
|
return self._mapping
|
|
try:
|
|
with open(path, newline="", encoding="utf-8") as f:
|
|
reader = csv.DictReader(f)
|
|
if not reader.fieldnames:
|
|
setattr(self, "_mapping_loaded", True)
|
|
return self._mapping
|
|
# Normalize header: strip spaces, accept "Original Name" and "Target Name"
|
|
orig_key = next((k for k in reader.fieldnames if k.strip().lower() == "original name"), None)
|
|
target_key = next((k for k in reader.fieldnames if k.strip().lower() == "target name"), None)
|
|
if orig_key is None or target_key is None:
|
|
setattr(self, "_mapping_loaded", True)
|
|
return self._mapping
|
|
for row in reader:
|
|
orig = row.get(orig_key, "").strip()
|
|
target = row.get(target_key, "").strip()
|
|
if orig:
|
|
self._mapping[orig] = target
|
|
except (OSError, csv.Error):
|
|
pass
|
|
setattr(self, "_mapping_loaded", True)
|
|
return self._mapping
|
|
|
|
def apply(
|
|
self,
|
|
stem: str,
|
|
ext: str,
|
|
index: int,
|
|
total: int,
|
|
original_name: Optional[str] = None,
|
|
) -> tuple[str, str]:
|
|
if not self.csv_path or original_name is None:
|
|
return stem, ext
|
|
mapping = self._load_mapping()
|
|
target = mapping.get(original_name)
|
|
if target is None:
|
|
return stem, ext
|
|
return self._split_name(target)
|