fix(udf): JobHandle.wait() terminates on failed jobs

wait() (sync + async) only stopped on finished/stale/committed, so a job the
server already reported as state=failed was polled until the (default 3600s)
timeout, then raised a misleading TimeoutError instead of the real cause. A
doomed backfill -- e.g. a multi-column REFRESH COLUMN of a scalar UDF -- hung
the client even though get_job surfaced the failure within ~3s.

Add a terminal failed branch that raises JobFailedError carrying the server
error, exported from the package. Verified end-to-end against the cluster:
raises in 3.6s instead of hanging. Unit-tested with a mock conn (sync+async,
failure + success + committed paths).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Wyatt Alt
2026-06-19 14:54:33 -07:00
committed by Jack Ye
parent 142ac835d3
commit 3a4cdb7aff
3 changed files with 118 additions and 0 deletions

View File

@@ -22,6 +22,7 @@ from .udf import (
table_udf,
Udf,
JobHandle,
JobFailedError,
MaterializedView,
AsyncJobHandle,
AsyncMaterializedView,
@@ -462,6 +463,7 @@ __all__ = [
"table_udf",
"Udf",
"JobHandle",
"JobFailedError",
"MaterializedView",
"AsyncJobHandle",
"AsyncMaterializedView",

View File

@@ -569,6 +569,20 @@ class MaterializedView:
_PROGRESS = re.compile(r"(\d+)/(\d+)")
class JobFailedError(RuntimeError):
"""Raised by ``JobHandle.wait()`` when the server reports the job ``failed``.
Carries the server-side error so a doomed backfill (e.g. a multi-column
``REFRESH COLUMN`` of a scalar UDF) surfaces its real cause promptly,
instead of the caller blocking until ``wait()``'s timeout.
"""
def __init__(self, job_id: str, error: "str | None"):
self.job_id = job_id
self.error = error
super().__init__(f"job {job_id} failed: {error or 'unknown error'}")
class JobHandle:
"""A reference to an inflight server-side job, with polling helpers."""
@@ -615,6 +629,11 @@ class JobHandle:
state = self.status()
if state in ("finished", "stale"):
return state
if state == "failed":
# Terminal failure -- surface the server error now, don't block
# until `timeout`. `finalize` wrote it to the job's status node.
job = self._job()
raise JobFailedError(self.id, job.error if job is not None else None)
if state == "pending":
time.sleep(min(poll, 0.5))
continue
@@ -713,6 +732,11 @@ class AsyncJobHandle:
state = await self.status()
if state in ("finished", "stale"):
return state
if state == "failed":
# Terminal failure -- surface the server error now, don't block
# until `timeout`. `finalize` wrote it to the job's status node.
job = await self._job()
raise JobFailedError(self.id, job.error if job is not None else None)
if state == "pending":
await asyncio.sleep(min(poll, 0.5))
continue

View File

@@ -0,0 +1,92 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
"""JobHandle.wait() terminal-state handling.
Regression coverage for the cluster backfill-failure hang: the server reports a
doomed job as ``state="failed"`` within seconds, but ``wait()`` used to ignore
``failed`` and block until its (default 3600s) timeout. These tests pin that a
``failed`` job raises ``JobFailedError`` promptly, carrying the server error.
"""
import asyncio
import time
import pytest
from lancedb.udf import JobHandle, AsyncJobHandle, JobFailedError
class FakeJobInfo:
"""Mirror of the pyo3 builtins.JobInfo fields wait()/status() read."""
def __init__(self, state, error=None, committed=False, units_total=None):
self.state = state
self.error = error
self.committed = committed
self.units_total = units_total
self.units_done = None
self.job_id = "job-1"
class FakeConn:
"""get_job() walks a scripted list of JobInfo (or None) snapshots, holding
the last one once exhausted, so wait() polls a deterministic timeline."""
def __init__(self, snapshots):
self._snaps = list(snapshots)
self.calls = 0
def get_job(self, job_id, table=None):
snap = self._snaps[min(self.calls, len(self._snaps) - 1)]
self.calls += 1
return snap
class AsyncFakeConn(FakeConn):
async def get_job(self, job_id, table=None):
return FakeConn.get_job(self, job_id, table)
def test_wait_raises_on_failed_promptly():
# pending -> failed: wait() must raise the server error, not TimeoutError.
conn = FakeConn(
[None, FakeJobInfo("failed", error="multi-column backfill needs a STRUCT")]
)
jh = JobHandle(conn, "job-1", table="t")
t0 = time.monotonic()
with pytest.raises(JobFailedError) as exc:
jh.wait(timeout=30, poll=0.01)
assert time.monotonic() - t0 < 5 # prompt, nowhere near the 30s timeout
assert "STRUCT" in str(exc.value)
assert exc.value.error == "multi-column backfill needs a STRUCT"
assert exc.value.job_id == "job-1"
def test_wait_returns_finished_on_success():
# running -> finished (job left the inflight listing) returns normally.
conn = FakeConn([FakeJobInfo("running", units_total=2), None])
jh = JobHandle(conn, "job-1", table="t")
jh._seen = True # already observed, so a None now means "finished" not grace
assert jh.wait(timeout=30, poll=0.01) == "finished"
def test_wait_returns_finished_on_committed():
# A committed job that is still listed resolves to finished.
conn = FakeConn([FakeJobInfo("running", committed=True, units_total=2)])
jh = JobHandle(conn, "job-1", table="t")
jh._seen = True
assert jh.wait(timeout=30, poll=0.01) == "finished"
def test_async_wait_raises_on_failed_promptly():
conn = AsyncFakeConn([None, FakeJobInfo("failed", error="boom")])
jh = AsyncJobHandle(conn, "job-1", table="t")
async def run():
t0 = time.monotonic()
with pytest.raises(JobFailedError) as exc:
await jh.wait(timeout=30, poll=0.01)
assert time.monotonic() - t0 < 5
assert exc.value.error == "boom"
asyncio.run(run())