From 3a4cdb7aff0ea7189a66141e7dcc8fc35e30ca80 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Fri, 19 Jun 2026 14:54:33 -0700 Subject: [PATCH] fix(udf): JobHandle.wait() terminates on failed jobs wait() (sync + async) only stopped on finished/stale/committed, so a job the server already reported as state=failed was polled until the (default 3600s) timeout, then raised a misleading TimeoutError instead of the real cause. A doomed backfill -- e.g. a multi-column REFRESH COLUMN of a scalar UDF -- hung the client even though get_job surfaced the failure within ~3s. Add a terminal failed branch that raises JobFailedError carrying the server error, exported from the package. Verified end-to-end against the cluster: raises in 3.6s instead of hanging. Unit-tested with a mock conn (sync+async, failure + success + committed paths). Co-Authored-By: Claude Opus 4.8 (1M context) --- python/python/lancedb/__init__.py | 2 + python/python/lancedb/udf.py | 24 +++++++ python/python/tests/test_job_handle.py | 92 ++++++++++++++++++++++++++ 3 files changed, 118 insertions(+) create mode 100644 python/python/tests/test_job_handle.py diff --git a/python/python/lancedb/__init__.py b/python/python/lancedb/__init__.py index c34196e1d..4dc692d81 100644 --- a/python/python/lancedb/__init__.py +++ b/python/python/lancedb/__init__.py @@ -22,6 +22,7 @@ from .udf import ( table_udf, Udf, JobHandle, + JobFailedError, MaterializedView, AsyncJobHandle, AsyncMaterializedView, @@ -462,6 +463,7 @@ __all__ = [ "table_udf", "Udf", "JobHandle", + "JobFailedError", "MaterializedView", "AsyncJobHandle", "AsyncMaterializedView", diff --git a/python/python/lancedb/udf.py b/python/python/lancedb/udf.py index ef46094a5..c4f91f893 100644 --- a/python/python/lancedb/udf.py +++ b/python/python/lancedb/udf.py @@ -569,6 +569,20 @@ class MaterializedView: _PROGRESS = re.compile(r"(\d+)/(\d+)") +class JobFailedError(RuntimeError): + """Raised by ``JobHandle.wait()`` when the server reports the job ``failed``. + + Carries the server-side error so a doomed backfill (e.g. a multi-column + ``REFRESH COLUMN`` of a scalar UDF) surfaces its real cause promptly, + instead of the caller blocking until ``wait()``'s timeout. + """ + + def __init__(self, job_id: str, error: "str | None"): + self.job_id = job_id + self.error = error + super().__init__(f"job {job_id} failed: {error or 'unknown error'}") + + class JobHandle: """A reference to an inflight server-side job, with polling helpers.""" @@ -615,6 +629,11 @@ class JobHandle: state = self.status() if state in ("finished", "stale"): return state + if state == "failed": + # Terminal failure -- surface the server error now, don't block + # until `timeout`. `finalize` wrote it to the job's status node. + job = self._job() + raise JobFailedError(self.id, job.error if job is not None else None) if state == "pending": time.sleep(min(poll, 0.5)) continue @@ -713,6 +732,11 @@ class AsyncJobHandle: state = await self.status() if state in ("finished", "stale"): return state + if state == "failed": + # Terminal failure -- surface the server error now, don't block + # until `timeout`. `finalize` wrote it to the job's status node. + job = await self._job() + raise JobFailedError(self.id, job.error if job is not None else None) if state == "pending": await asyncio.sleep(min(poll, 0.5)) continue diff --git a/python/python/tests/test_job_handle.py b/python/python/tests/test_job_handle.py new file mode 100644 index 000000000..1cb194a1d --- /dev/null +++ b/python/python/tests/test_job_handle.py @@ -0,0 +1,92 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright The LanceDB Authors +"""JobHandle.wait() terminal-state handling. + +Regression coverage for the cluster backfill-failure hang: the server reports a +doomed job as ``state="failed"`` within seconds, but ``wait()`` used to ignore +``failed`` and block until its (default 3600s) timeout. These tests pin that a +``failed`` job raises ``JobFailedError`` promptly, carrying the server error. +""" + +import asyncio +import time + +import pytest + +from lancedb.udf import JobHandle, AsyncJobHandle, JobFailedError + + +class FakeJobInfo: + """Mirror of the pyo3 builtins.JobInfo fields wait()/status() read.""" + + def __init__(self, state, error=None, committed=False, units_total=None): + self.state = state + self.error = error + self.committed = committed + self.units_total = units_total + self.units_done = None + self.job_id = "job-1" + + +class FakeConn: + """get_job() walks a scripted list of JobInfo (or None) snapshots, holding + the last one once exhausted, so wait() polls a deterministic timeline.""" + + def __init__(self, snapshots): + self._snaps = list(snapshots) + self.calls = 0 + + def get_job(self, job_id, table=None): + snap = self._snaps[min(self.calls, len(self._snaps) - 1)] + self.calls += 1 + return snap + + +class AsyncFakeConn(FakeConn): + async def get_job(self, job_id, table=None): + return FakeConn.get_job(self, job_id, table) + + +def test_wait_raises_on_failed_promptly(): + # pending -> failed: wait() must raise the server error, not TimeoutError. + conn = FakeConn( + [None, FakeJobInfo("failed", error="multi-column backfill needs a STRUCT")] + ) + jh = JobHandle(conn, "job-1", table="t") + t0 = time.monotonic() + with pytest.raises(JobFailedError) as exc: + jh.wait(timeout=30, poll=0.01) + assert time.monotonic() - t0 < 5 # prompt, nowhere near the 30s timeout + assert "STRUCT" in str(exc.value) + assert exc.value.error == "multi-column backfill needs a STRUCT" + assert exc.value.job_id == "job-1" + + +def test_wait_returns_finished_on_success(): + # running -> finished (job left the inflight listing) returns normally. + conn = FakeConn([FakeJobInfo("running", units_total=2), None]) + jh = JobHandle(conn, "job-1", table="t") + jh._seen = True # already observed, so a None now means "finished" not grace + assert jh.wait(timeout=30, poll=0.01) == "finished" + + +def test_wait_returns_finished_on_committed(): + # A committed job that is still listed resolves to finished. + conn = FakeConn([FakeJobInfo("running", committed=True, units_total=2)]) + jh = JobHandle(conn, "job-1", table="t") + jh._seen = True + assert jh.wait(timeout=30, poll=0.01) == "finished" + + +def test_async_wait_raises_on_failed_promptly(): + conn = AsyncFakeConn([None, FakeJobInfo("failed", error="boom")]) + jh = AsyncJobHandle(conn, "job-1", table="t") + + async def run(): + t0 = time.monotonic() + with pytest.raises(JobFailedError) as exc: + await jh.wait(timeout=30, poll=0.01) + assert time.monotonic() - t0 < 5 + assert exc.value.error == "boom" + + asyncio.run(run())