"""Smoke E2E para POST /ffmpeg/sequence-clip-preparation (Fase 2.5.e).

Verifica el primitivo de preparación de clips all-intra:

  1. 401 sin token.
  2. Validation: clips vacíos → 422 Pydantic.
  3. Validation: out_ms <= in_ms → error stage=validation.
  4. Validation: src no existe → error stage=validation.
  5. Real: 3 clips de un mismo video → 3 mezzanines con phases
     starting → (clip_starting → clip_progress* → clip_done)×3 → done.
     Verifica que los 3 dst existan y sean all-intra (keyint=1).
  6. Cancel: lanza un job con 3 clips, cancela durante el primero,
     espera phase=cancelled con processed<total.

Uso:
    python companion/scripts/smoke_sequence_clip_preparation.py
"""
from __future__ import annotations

import json
import os
import shutil
import socket
import subprocess
import sys
import tempfile
import threading
import time
import urllib.error
import urllib.request
from pathlib import Path

REPO_ROOT = Path(__file__).resolve().parents[2]
COMPANION_SRC = REPO_ROOT / "companion" / "src"
TOKEN = "smoke-seqprep-token-1234567890"
PORT = 7492


def _free_port_or_die(port: int) -> None:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(0.3)
        if s.connect_ex(("127.0.0.1", port)) == 0:
            sys.exit(f"FATAL: puerto {port} en uso")


def _wait_companion(timeout_s: float = 15.0) -> None:
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(f"http://127.0.0.1:{PORT}/health", timeout=1.0) as r:
                if r.status == 200:
                    return
        except (urllib.error.URLError, ConnectionError, OSError):
            time.sleep(0.3)
    raise TimeoutError("companion no respondió a /health")


def _req(method: str, path: str, body: dict | None = None, *, with_token: bool = True) -> tuple[int, bytes]:
    data = json.dumps(body).encode("utf-8") if body is not None else None
    headers: dict[str, str] = {}
    if with_token:
        headers["X-Companion-Token"] = TOKEN
    if data is not None:
        headers["Content-Type"] = "application/json"
    req = urllib.request.Request(
        f"http://127.0.0.1:{PORT}{path}",
        data=data,
        headers=headers,
        method=method,
    )
    try:
        with urllib.request.urlopen(req, timeout=30) as r:
            return r.status, r.read()
    except urllib.error.HTTPError as exc:
        return exc.code, exc.read()


def _stream_sse(body: dict, *, timeout_s: float = 120.0) -> list[dict]:
    """POST blocking, devuelve eventos parseados."""
    req = urllib.request.Request(
        f"http://127.0.0.1:{PORT}/ffmpeg/sequence-clip-preparation",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json", "X-Companion-Token": TOKEN},
        method="POST",
    )
    events: list[dict] = []
    with urllib.request.urlopen(req, timeout=timeout_s) as r:
        for raw in r:
            line = raw.decode("utf-8", errors="replace").rstrip("\r\n")
            if line.startswith("data: "):
                try:
                    events.append(json.loads(line[6:]))
                except Exception:
                    pass
    return events


def _stream_sse_with_cancel(body: dict, cancel_after_s: float, *, timeout_s: float = 120.0) -> tuple[list[dict], str | None]:
    events: list[dict] = []
    captured: list[str] = []
    done = threading.Event()

    def run() -> None:
        req = urllib.request.Request(
            f"http://127.0.0.1:{PORT}/ffmpeg/sequence-clip-preparation",
            data=json.dumps(body).encode("utf-8"),
            headers={"Content-Type": "application/json", "X-Companion-Token": TOKEN},
            method="POST",
        )
        try:
            with urllib.request.urlopen(req, timeout=timeout_s) as r:
                for raw in r:
                    line = raw.decode("utf-8", errors="replace").rstrip("\r\n")
                    if line.startswith("data: "):
                        try:
                            evt = json.loads(line[6:])
                        except Exception:
                            continue
                        events.append(evt)
                        if evt.get("phase") == "starting" and evt.get("job_id"):
                            captured.append(evt["job_id"])
                        if evt.get("phase") in ("done", "cancelled", "error"):
                            break
        except (ConnectionResetError, urllib.error.URLError, OSError) as exc:
            # Companion puede cerrar el socket abruptamente al cancelar.
            print(f"[debug] SSE thread excepción: {type(exc).__name__}: {exc}")
        finally:
            done.set()

    t = threading.Thread(target=run, daemon=True)
    t.start()
    deadline = time.monotonic() + 5.0
    while time.monotonic() < deadline and not captured:
        time.sleep(0.05)
    job_id = captured[0] if captured else None
    if job_id:
        time.sleep(cancel_after_s)
        try:
            _req("POST", "/ffmpeg/cancel", {"job_id": job_id})
        except Exception as exc:  # noqa: BLE001
            print(f"[debug] cancel POST excepción: {type(exc).__name__}: {exc}")
    done.wait(timeout=timeout_s)
    return events, job_id


def _generate_video(dst: Path, duration_s: int = 30, *, size: str = "320x240") -> None:
    cmd = [
        "ffmpeg", "-hide_banner", "-loglevel", "error", "-y",
        "-f", "lavfi", "-i", f"testsrc2=duration={duration_s}:size={size}:rate=30",
        "-f", "lavfi", "-i", f"sine=frequency=440:duration={duration_s}",
        "-c:v", "libx264", "-preset", "ultrafast", "-pix_fmt", "yuv420p",
        "-c:a", "aac", "-shortest",
        str(dst),
    ]
    subprocess.run(cmd, check=True, capture_output=True)


def _is_all_intra(path: Path) -> bool:
    """Comprueba con ffprobe que TODOS los frames del primer segundo son keyframes."""
    cmd = [
        "ffprobe", "-v", "error", "-select_streams", "v:0",
        "-read_intervals", "%+#30",  # primeros 30 frames
        "-show_entries", "frame=pict_type",
        "-of", "csv=p=0",
        str(path),
    ]
    out = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
    if out.returncode != 0:
        print(f"[debug] ffprobe rc={out.returncode}: {out.stderr[:200]}")
        return False
    types = [line.strip().rstrip(",").strip() for line in out.stdout.splitlines() if line.strip()]
    if not types:
        print(f"[debug] ffprobe stdout vacío para {path}")
        return False
    is_all_i = all(t == "I" for t in types)
    if not is_all_i:
        print(f"[debug] {path.name}: tipos={types[:10]}... (count={len(types)})")
    return is_all_i


def main() -> int:
    if not shutil.which("ffmpeg") or not shutil.which("ffprobe"):
        sys.exit("FATAL: ffmpeg/ffprobe no en PATH")

    _free_port_or_die(PORT)

    smoke_root = Path(tempfile.mkdtemp(prefix="_smoke_seqprep_"))
    workspace = smoke_root / "ws"
    cert_dir = smoke_root / "certs"
    workspace.mkdir(parents=True)
    cert_dir.mkdir(parents=True)

    sys.path.insert(0, str(COMPANION_SRC))
    from maxeditor_companion.workspace import set_workspace  # type: ignore  # noqa: E402

    set_workspace(cert_dir, workspace)

    src_dir = workspace / "projects" / "smoke_seqprep"
    src_dir.mkdir(parents=True)
    src_video = src_dir / "long.mp4"
    _generate_video(src_video, duration_s=30)
    print(f"[smoke] video fuente: {src_video} ({src_video.stat().st_size} B, 30s)")
    # Para test 6: video 1280x720 60s para que ffmpeg tarde varios segundos
    # encodeando y el cancel mid-clip pueda ejecutarse antes de que termine.
    heavy_video = src_dir / "heavy.mp4"
    _generate_video(heavy_video, duration_s=60, size="1280x720")
    print(f"[smoke] video pesado: {heavy_video} ({heavy_video.stat().st_size} B, 60s, 1280x720)")

    env = os.environ.copy()
    env.update({
        "COMPANION_HOST": "127.0.0.1",
        "COMPANION_PORT": str(PORT),
        "COMPANION_USE_SSL": "0",
        "COMPANION_HEADLESS": "1",
        "COMPANION_TOKEN": TOKEN,
        "COMPANION_LOG_DIR": str(smoke_root / "logs"),
        "COMPANION_CERT_DIR": str(cert_dir),
        "PYTHONPATH": str(COMPANION_SRC) + os.pathsep + env.get("PYTHONPATH", ""),
    })

    print(f"[smoke] arrancando companion en http://127.0.0.1:{PORT} (token={TOKEN})")
    proc = subprocess.Popen(
        [sys.executable, "-u", "-m", "maxeditor_companion"],
        env=env,
        stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True,
    )

    failures: list[str] = []
    passed: list[str] = []

    def fail(name: str, msg: str) -> None:
        failures.append(f"{name}: {msg}")

    def ok(name: str, detail: str = "") -> None:
        passed.append(f"{name}{(' — ' + detail) if detail else ''}")

    try:
        _wait_companion()

        # ── 1) 401 sin token ──────────────────────────────────────
        status, _body = _req(
            "POST", "/ffmpeg/sequence-clip-preparation",
            {"clips": [{"block_id": "a", "src": "smoke_seqprep/long.mp4", "in_ms": 0, "out_ms": 1000}], "output_dir": "smoke_seqprep/out1"},
            with_token=False,
        )
        if status == 401:
            ok("1) 401 sin token")
        else:
            fail("1) 401 sin token", f"status={status}")

        # ── 2) clips vacíos → 422 Pydantic ────────────────────────
        status, body = _req(
            "POST", "/ffmpeg/sequence-clip-preparation",
            {"clips": [], "output_dir": "smoke_seqprep/out2"},
        )
        if status == 422:
            ok("2) clips vacíos → 422")
        else:
            fail("2) clips vacíos → 422", f"status={status}, body={body[:120]!r}")

        # ── 3) out_ms <= in_ms → error stage=validation ──────────
        events = _stream_sse({
            "clips": [{"block_id": "x", "src": "smoke_seqprep/long.mp4", "in_ms": 1000, "out_ms": 1000}],
            "output_dir": "smoke_seqprep/out3",
        })
        err = next((e for e in events if e.get("phase") == "error"), None)
        if err and err.get("stage") == "validation":
            ok("3) out_ms<=in_ms → validation", err.get("error", "")[:80])
        else:
            fail("3) out_ms<=in_ms", f"events={[e.get('phase') for e in events]}")

        # ── 4) src no existe → error stage=validation ────────────
        events = _stream_sse({
            "clips": [{"block_id": "x", "src": "smoke_seqprep/missing.mp4", "in_ms": 0, "out_ms": 1000}],
            "output_dir": "smoke_seqprep/out4",
        })
        err = next((e for e in events if e.get("phase") == "error"), None)
        if err and err.get("stage") == "validation":
            ok("4) src no existe → validation", err.get("error", "")[:80])
        else:
            fail("4) src no existe", f"events={[e.get('phase') for e in events]}")

        # ── 5) Real: 3 clips de 2s cada uno ──────────────────────
        out_dir = "smoke_seqprep/out5"
        events = _stream_sse({
            "clips": [
                {"block_id": "blk1", "label": "Bloque 1", "src": "smoke_seqprep/long.mp4", "in_ms": 0, "out_ms": 2000},
                {"block_id": "blk2", "label": "Bloque 2", "src": "smoke_seqprep/long.mp4", "in_ms": 5000, "out_ms": 7000},
                {"block_id": "blk3", "label": "Bloque 3", "src": "smoke_seqprep/long.mp4", "in_ms": 15000, "out_ms": 17000},
            ],
            "output_dir": out_dir,
        })
        phases = [e.get("phase") for e in events]
        starting = next((e for e in events if e.get("phase") == "starting"), None)
        clip_done = [e for e in events if e.get("phase") == "clip_done"]
        clip_progress = [e for e in events if e.get("phase") == "clip_progress"]
        done = next((e for e in events if e.get("phase") == "done"), None)
        if not done or done.get("succeeded") != 3 or done.get("failed") != 0:
            fail("5) real 3 clips", f"done={done}, phases={phases}")
        elif len(clip_done) != 3:
            fail("5) real 3 clips", f"clip_done={len(clip_done)} esperados 3, phases={phases}")
        elif starting is None or len(starting.get("clips") or []) != 3:
            fail("5) real 3 clips", f"starting.clips len={len(starting.get('clips') or []) if starting else 0}")
        else:
            # Verifica que los 3 dst existan y sean all-intra.
            all_intra_count = 0
            for cd in clip_done:
                p = Path(cd["dst"])
                if p.exists() and _is_all_intra(p):
                    all_intra_count += 1
            if all_intra_count != 3:
                fail("5) real 3 clips", f"all-intra solo {all_intra_count}/3")
            else:
                ok(
                    "5) real 3 clips",
                    f"3 clips ok all-intra, clip_progress_total={len(clip_progress)}, "
                    f"job_id={starting.get('job_id', '')[:12]}.., elapsed={done.get('elapsed_s')}s",
                )

        # ── 6) Cancel mid-clip ───────────────────────────────────
        out_dir = "smoke_seqprep/out6"
        try:
            events, job_id = _stream_sse_with_cancel(
                {
                    "clips": [
                        {"block_id": "c1", "src": "smoke_seqprep/heavy.mp4", "in_ms": 0, "out_ms": 60000},
                        {"block_id": "c2", "src": "smoke_seqprep/heavy.mp4", "in_ms": 0, "out_ms": 60000},
                    ],
                    "output_dir": out_dir,
                },
                cancel_after_s=1.0,
            )
        except Exception as exc:  # noqa: BLE001
            fail("6) cancel mid-clip", f"excepción cliente: {exc}")
            events, job_id = [], None
        cancelled = next((e for e in events if e.get("phase") == "cancelled"), None)
        done_evt = next((e for e in events if e.get("phase") == "done"), None)
        if not job_id:
            fail("6) cancel mid-clip", "no job_id en starting")
        elif done_evt is not None:
            fail("6) cancel mid-clip", f"llegó done en lugar de cancelled: {done_evt}")
        elif not cancelled:
            fail("6) cancel mid-clip", f"no cancelled. phases={[e.get('phase') for e in events]}")
        elif cancelled.get("processed", 99) >= 3:
            fail("6) cancel mid-clip", f"processed={cancelled.get('processed')} >= 3 (no canceló mid-batch)")
        else:
            ok(
                "6) cancel mid-clip",
                f"processed={cancelled.get('processed')}/{cancelled.get('total_clips')}, "
                f"succeeded={cancelled.get('succeeded')}, reason={cancelled.get('reason')}",
            )

    finally:
        proc.terminate()
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()
        # Diagnóstico: si hubo failures, volcar las últimas líneas del companion.
        if failures:
            try:
                if proc.stdout is not None:
                    tail = proc.stdout.read() or ""
                    if tail:
                        print()
                        print("[companion stdout tail]")
                        for line in tail.splitlines()[-40:]:
                            print(f"  | {line}")
            except Exception:  # noqa: BLE001
                pass
        try:
            shutil.rmtree(smoke_root, ignore_errors=True)
        except Exception:
            pass

    total = len(passed) + len(failures)
    print()
    print("─" * 60)
    print(f"PASSED ({len(passed)}/{total}):")
    for p in passed:
        print(f"  ✓ {p}")
    if failures:
        print()
        print(f"FAILED ({len(failures)}/{total}):")
        for f in failures:
            print(f"  ✗ {f}")
        return 1
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
