#!/usr/bin/env python3
"""
Clamp VerifiedBuild literals to signed MEDIUMINT range for DBs that still use
MEDIUMINT on VerifiedBuild (see data/sql/old/db_world/8.x/2022_10_30_01.sql).

Processes INSERT (explicit column lists) and simple
UPDATE ... VerifiedBuild = N. Out-of-range literals are replaced with 0.
"""

from __future__ import annotations

import re
import sys
from pathlib import Path

MAX_SIGNED_MEDIUMINT = 8388607
MIN_SIGNED_MEDIUMINT = -8388608

INSERT_RE = re.compile(
    r"INSERT\s+INTO\s+`(?P<table>\w+)`\s*\((?P<cols>[^)]+)\)\s*VALUES\s*",
    re.IGNORECASE,
)
# Base dumps: full row order ends with VerifiedBuild (6- or 8-column layouts).
INSERT_SPELL_TP_VALUES_RE = re.compile(
    r"INSERT\s+INTO\s+`spell_target_position`\s*VALUES\s*",
    re.IGNORECASE,
)
# `VerifiedBuild` = <integer literal>, as found in UPDATE ... SET clauses.
_UPDATE_VB_RE = re.compile(r"`VerifiedBuild`\s*=\s*(-?[0-9]+)", re.IGNORECASE)

STMT_START = (
    "INSERT ",
    "DELETE ",
    "UPDATE ",
    "ALTER ",
    "DROP ",
    "CREATE ",
    "SET ",
    "REPLACE ",
)
_STMT_START_MAXLEN = max(len(p) for p in STMT_START)


def _in_range(n: int) -> bool:
    """Return True when *n* fits in a signed MEDIUMINT."""
    return MIN_SIGNED_MEDIUMINT <= n <= MAX_SIGNED_MEDIUMINT


def _tuple_end(segment: str, start: int) -> int:
    """Return the index just past the ")" closing the "(" at *start*.

    Parentheses inside single- or double-quoted strings are ignored, using
    the same quoting rules as ``split_fields`` (backslash escapes the next
    character). If the tuple is not closed within *segment*, ``len(segment)``
    is returned.
    """
    depth = 0
    quote = ""
    escaped = False
    n = len(segment)
    j = start
    while j < n:
        ch = segment[j]
        j += 1
        if quote:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == quote:
                quote = ""
            continue
        if ch in "'\"":
            quote = ch
        elif ch == "(":
            depth += 1
        elif ch == ")":
            depth -= 1
            if depth == 0:
                return j
    return n


def split_fields(inner: str) -> list[str]:
    """Split the inside of a VALUES tuple on top-level commas.

    Commas inside quoted strings or nested parentheses do not split. Each
    returned field is stripped of surrounding whitespace; an empty *inner*
    yields ``[""]``.
    """
    fields: list[str] = []
    cur: list[str] = []
    depth = 0
    in_str = False
    str_ch = ""
    esc = False
    for ch in inner:
        if in_str:
            cur.append(ch)
            if esc:
                esc = False
            elif ch == "\\":
                esc = True
            elif ch == str_ch:
                in_str = False
            continue
        if ch in "'\"":
            in_str = True
            str_ch = ch
            cur.append(ch)
            continue
        if ch == "(":
            depth += 1
        elif ch == ")":
            depth -= 1
        if ch == "," and depth == 0:
            fields.append("".join(cur).strip())
            cur = []
        else:
            cur.append(ch)
    fields.append("".join(cur).strip())
    return fields


def join_fields(fields: list[str]) -> str:
    """Join tuple fields back together with ``", "``."""
    return ", ".join(fields)


def clamp_tuple(tuple_str: str, vb_idx: int) -> str:
    """Replace an out-of-range integer at field *vb_idx* with ``0``.

    *tuple_str* is returned untouched when it is not a parenthesized tuple,
    has too few fields, the field is not a plain integer literal, or the
    value already fits in a signed MEDIUMINT. A rewritten tuple is re-joined
    with ``", "`` between fields.
    """
    t = tuple_str.strip()
    if not (t.startswith("(") and t.endswith(")")):
        return tuple_str
    fields = split_fields(t[1:-1])
    if vb_idx >= len(fields):
        return tuple_str
    raw = fields[vb_idx].strip()
    if not re.fullmatch(r"-?\d+", raw):
        return tuple_str
    if _in_range(int(raw)):
        return tuple_str
    fields[vb_idx] = "0"
    return "(" + join_fields(fields) + ")"


def process_value_segment(segment: str, vb_idx: int) -> tuple[str, int]:
    """Clamp field *vb_idx* in every top-level tuple found in *segment*.

    Returns the rewritten text and the number of tuples that changed. Text
    between tuples is copied through verbatim.
    """
    out: list[str] = []
    changes = 0
    i, n = 0, len(segment)
    while i < n:
        if segment[i] != "(":
            # Copy everything up to the next tuple start in one slice.
            nxt = segment.find("(", i)
            if nxt == -1:
                nxt = n
            out.append(segment[i:nxt])
            i = nxt
            continue
        # Quote-aware scan, so a ")" inside a string does not end the tuple.
        end = _tuple_end(segment, i)
        tup = segment[i:end]
        new_tup = clamp_tuple(tup, vb_idx)
        if new_tup != tup:
            changes += 1
        out.append(new_tup)
        i = end
    return "".join(out), changes


def clamp_update_verifiedbuild(line: str) -> tuple[str, int]:
    """Rewrite out-of-range ```VerifiedBuild` = N`` assignments to ``= 0``.

    Returns the rewritten line and the number of assignments changed.
    Negative literals are recognized too, so values below the MEDIUMINT
    minimum are also reset.
    """
    changes = 0

    def repl(m: re.Match[str]) -> str:
        nonlocal changes
        if _in_range(int(m.group(1))):
            return m.group(0)
        changes += 1
        return "`VerifiedBuild` = 0"

    new_line = _UPDATE_VB_RE.sub(repl, line)
    return new_line, changes


def first_tuple_vb_index(segment: str) -> int | None:
    """Return the index of the last field of the first tuple in *segment*.

    Returns ``None`` when *segment* contains no ``"("`` at all.
    """
    start = segment.find("(")
    if start == -1:
        return None
    tup = segment[start:_tuple_end(segment, start)]
    return len(split_fields(tup[1:-1])) - 1


def _starts_new_statement(line: str) -> bool:
    """Return True when *line* begins a new SQL statement.

    Blank lines, value rows (starting with "(") and "--" comments never count.
    Keywords are matched case-insensitively, like the INSERT patterns.
    """
    stripped = line.lstrip()
    if not stripped or stripped.startswith(("(", "--")):
        return False
    return stripped[:_STMT_START_MAXLEN].upper().startswith(STMT_START)


def process_sql_text(text: str) -> tuple[str, int]:
    """Clamp VerifiedBuild values throughout a SQL script.

    The text is processed line by line. UPDATE-style assignments are fixed on
    every line; INSERT value rows are fixed on the INSERT line and on the
    continuation lines that follow, until a line containing ``");"`` or the
    start of another statement.

    Returns the rewritten text and the total number of changes made.
    """
    out: list[str] = []
    mode: str | None = None
    vb_idx = -1  # negative: column index not resolved yet
    total_changes = 0

    for raw_line in text.splitlines(keepends=True):
        line, changed = clamp_update_verifiedbuild(raw_line)
        total_changes += changed

        if mode is not None and _starts_new_statement(line):
            mode = None

        if mode is None:
            m = INSERT_RE.search(line)
            if m and "`VerifiedBuild`" in m.group("cols"):
                cols = [c.strip().strip("`") for c in m.group("cols").split(",")]
                vb_idx = cols.index("VerifiedBuild")
                mode = "insert_values"
                split_at = m.end()
            else:
                m2 = INSERT_SPELL_TP_VALUES_RE.search(line)
                if m2 is None:
                    out.append(line)
                    continue
                vb_idx = -1
                mode = "spell_tp_values"
                split_at = m2.end()
            prefix, segment = line[:split_at], line[split_at:]
        else:
            prefix, segment = "", line

        if mode == "spell_tp_values" and vb_idx < 0:
            # No column list: VerifiedBuild is the last field of a row. Keep
            # the index unresolved until a line actually contains a tuple, so
            # blank or comment lines before the first row cannot pin a wrong
            # index.
            found = first_tuple_vb_index(segment)
            if found is not None:
                vb_idx = found

        if vb_idx >= 0:
            segment, changed = process_value_segment(segment, vb_idx)
            total_changes += changed
        out.append(prefix + segment)

        # Only the values part decides whether the statement has ended; text
        # before the INSERT on the same line is irrelevant.
        if ");" in segment:
            mode = None

    return "".join(out), total_changes


def main() -> int:
    """Rewrite the db_world ``*.sql`` files under the SQL root in place.

    The SQL root is the parent of this script's directory. Only files with at
    least one change are written back. Returns the process exit code (0).
    """
    sql_root = Path(__file__).resolve().parents[1]
    roots = [
        sql_root / "archive" / "db_world",
        sql_root / "updates" / "db_world",
        sql_root / "base" / "db_world",
    ]
    file_count = 0
    change_lines = 0
    for root in roots:
        if not root.exists():
            continue
        for path in sorted(root.glob("*.sql")):
            text = path.read_text(encoding="utf-8", errors="strict")
            new_text, ch = process_sql_text(text)
            if ch:
                path.write_text(new_text, encoding="utf-8")
                change_lines += ch
                file_count += 1
                print(f"{path.relative_to(sql_root)}: {ch} change(s)")
    print(f"Done. {file_count} file(s) written, {change_lines} change(s).")
    return 0


if __name__ == "__main__":
    sys.exit(main())