job wait(): poll by id via get_job (point access) instead of list_jobs

JobHandle/AsyncJobHandle now poll conn.get_job(id, table) -- one job -- instead
of list_jobs() + client-side filter over every active job. The job's table is
threaded in from refresh_column / MV refresh as an O(1) lookup hint. Plumbs
get_job through the Database trait (default not_supported), RemoteDatabase
(GET /v1/job/{id}?table=...), the Connection wrapper, and the pyo3 binding +
db.py.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Wyatt Alt
2026-06-17 14:08:03 -07:00
committed by Jack Ye
parent 9dfa43a9de
commit 3f44f93e92
7 changed files with 117 additions and 36 deletions

View File

@@ -784,6 +784,15 @@ class DBConnection(EnforceOverrides):
"""List inflight server-side jobs across the database's tables."""
return LOOP.run(self._conn.list_jobs())
def get_job(self, job_id: str, table: "str | None" = None):
"""Look up one server-side job by id (the wait()/status poll path).
Passing ``table`` (the job's table) lets the server answer with an O(1)
single-node read instead of scanning the database's active jobs.
Returns the job's status, or None if it's unknown or no longer active.
"""
return LOOP.run(self._conn.get_job(job_id, table))
def cancel_job(self, job_id: str) -> bool:
"""Cancel an inflight server-side job by id (CANCEL JOB).
@@ -2182,6 +2191,12 @@ class AsyncConnection(object):
"""List inflight server-side jobs across the database's tables."""
return await self._inner.list_jobs()
async def get_job(self, job_id: str, table: "str | None" = None):
"""Look up one server-side job by id (the wait()/status poll path).
``table`` (the job's table) enables an O(1) server-side lookup.
Returns the job's status, or None if unknown / no longer active."""
return await self._inner.get_job(job_id, table)
async def cancel_job(self, job_id: str) -> bool:
"""Cancel an inflight server-side job by id (CANCEL JOB).

View File

@@ -3842,7 +3842,7 @@ class LanceTable(Table):
priority=priority,
)
)
return JobHandle(self._conn, job_id)
return JobHandle(self._conn, job_id, table=self.name)
def alter_columns(
self, *alterations: Iterable[Dict[str, str]]

View File

@@ -510,7 +510,9 @@ class MaterializedView:
A no-op when the view was created with no data."""
if self.job_id is None:
return "finished"
return JobHandle(self.conn, self.job_id).wait(timeout=timeout, poll=poll)
return JobHandle(self.conn, self.job_id, table=self.name).wait(
timeout=timeout, poll=poll
)
def refresh(self, full: bool = False) -> "JobHandle":
"""Refresh the materialized view; returns a `JobHandle` to wait on,
@@ -521,7 +523,7 @@ class MaterializedView:
the view's indexes -- they are reindexed by the distributed indexer.
"""
job_id = self.conn._refresh_materialized_view(self.name, full=full)
return JobHandle(self.conn, job_id)
return JobHandle(self.conn, job_id, table=self.name)
def explain_refresh(self, full: bool = False):
"""Plan a refresh without running it (EXPLAIN REFRESH)."""
@@ -574,20 +576,20 @@ class JobHandle:
#: -> agent cycle -> manifest write is async).
GRACE_SECONDS = 20.0
def __init__(self, conn, job_id: str):
def __init__(self, conn, job_id: str, table: "str | None" = None):
self.conn = conn
self.id = job_id
#: The job's table, when known (refresh_column / MV refresh). Lets the
#: server resolve this job with an O(1) single-node read; without it the
#: lookup scans the database's active jobs (still correct).
self.table = table
self._created = time.monotonic()
self._seen = False
def _matches(self, listed_id: str) -> bool:
return _job_id_matches(self.id, listed_id)
def _job(self):
for j in self.conn.list_jobs():
if self._matches(j.job_id):
return j
return None
# Poll by id (one job), not list_jobs (every active job): the server
# matches the submission/manifest id and reads just this table's node.
return self.conn.get_job(self.id, self.table)
def status(self) -> str:
"""pending / running / cancelling / stale, or 'finished' once the
@@ -643,7 +645,7 @@ class AsyncMaterializedView:
A no-op when the view was created with no data."""
if self.job_id is None:
return "finished"
return await AsyncJobHandle(self.conn, self.job_id).wait(
return await AsyncJobHandle(self.conn, self.job_id, table=self.name).wait(
timeout=timeout, poll=poll
)
@@ -655,7 +657,7 @@ class AsyncMaterializedView:
(indexes are preserved and reindexed by the distributed indexer).
"""
job_id = await self.conn._refresh_materialized_view(self.name, full=full)
return AsyncJobHandle(self.conn, job_id)
return AsyncJobHandle(self.conn, job_id, table=self.name)
async def explain_refresh(self, full: bool = False):
return await self.conn.explain_refresh_materialized_view(self.name, full=full)
@@ -678,17 +680,17 @@ class AsyncJobHandle:
GRACE_SECONDS = 20.0
def __init__(self, conn, job_id: str):
def __init__(self, conn, job_id: str, table: "str | None" = None):
self.conn = conn
self.id = job_id
#: See JobHandle.table -- enables an O(1) by-id lookup when known.
self.table = table
self._created = time.monotonic()
self._seen = False
async def _job(self):
for j in await self.conn.list_jobs():
if _job_id_matches(self.id, j.job_id):
return j
return None
# Poll by id, not list_jobs (see JobHandle._job).
return await self.conn.get_job(self.id, self.table)
async def status(self) -> str:
job = await self._job()

View File

@@ -582,6 +582,35 @@ impl Connection {
})
}
#[pyo3(signature = (job_id, table=None))]
pub fn get_job(
self_: PyRef<'_, Self>,
job_id: String,
table: Option<String>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
future_into_py(self_.py(), async move {
let job = inner
.get_job(&job_id, table.as_deref())
.await
.infer_error()?;
Ok(job.map(|j| JobInfo {
table: j.table,
job_id: j.job_id,
job_type: j.job_type,
state: j.state,
column: j.column,
age_seconds: j.age_seconds,
command: j.command,
units_done: j.units_done,
units_total: j.units_total,
committed: j.committed,
rows_skipped: j.rows_skipped,
error: j.error,
}))
})
}
#[pyo3(signature = (cur_name, new_name, cur_namespace_path=None, new_namespace_path=None))]
pub fn rename_table(
self_: PyRef<'_, Self>,

View File

@@ -572,6 +572,14 @@ impl Connection {
self.internal.cancel_job(job_id).await
}
/// Look up a single server-side job by id -- the `wait()`/status poll path.
/// `table_hint` (the job's table) enables an O(1) server-side lookup; `None`
/// scans the database's active jobs. A `None` result means unknown / not
/// active.
pub async fn get_job(&self, job_id: &str, table_hint: Option<&str>) -> Result<Option<JobInfo>> {
self.internal.get_job(job_id, table_hint).await
}
/// Rename a table in the database.
///
/// This is only supported in LanceDB Cloud.

View File

@@ -480,6 +480,12 @@ pub trait Database:
async fn cancel_job(&self, _job_id: &str) -> Result<bool> {
not_supported("cancel_job")
}
/// Point-access for a single job by id -- the `wait()`/status poll path.
/// `table_hint` (the job's table, which `wait()` callers know) enables an
/// O(1) server-side lookup. `None` if the job is unknown or not active.
async fn get_job(&self, _job_id: &str, _table_hint: Option<&str>) -> Result<Option<JobInfo>> {
not_supported("get_job")
}
/// Open a table in the database
async fn open_table(&self, request: OpenTableRequest) -> Result<Arc<dyn BaseTable>>;

View File

@@ -163,11 +163,36 @@ struct RemoteListJobsResponse {
jobs: Vec<RemoteJobEntry>,
}
#[derive(serde::Deserialize)]
struct RemoteGetJobResponse {
#[serde(default)]
job: Option<RemoteJobEntry>,
}
#[derive(serde::Deserialize)]
struct RemoteCancelJobResponse {
cancelled: bool,
}
impl From<RemoteJobEntry> for JobInfo {
fn from(j: RemoteJobEntry) -> Self {
JobInfo {
table: j.table,
job_id: j.job_id,
job_type: j.job_type,
state: j.state,
column: j.column,
age_seconds: j.age_seconds,
command: j.command,
units_done: j.units_done,
units_total: j.units_total,
committed: j.committed,
rows_skipped: j.rows_skipped,
error: j.error,
}
}
}
// Request structure for the remote clone table API
#[derive(serde::Serialize)]
struct RemoteCloneTableRequest {
@@ -949,24 +974,20 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
let (request_id, rsp) = self.client.send(req).await?;
let rsp = self.client.check_response(&request_id, rsp).await?;
let body: RemoteListJobsResponse = rsp.json().await.err_to_http(request_id)?;
Ok(body
.jobs
.into_iter()
.map(|j| JobInfo {
table: j.table,
job_id: j.job_id,
job_type: j.job_type,
state: j.state,
column: j.column,
age_seconds: j.age_seconds,
command: j.command,
units_done: j.units_done,
units_total: j.units_total,
committed: j.committed,
rows_skipped: j.rows_skipped,
error: j.error,
})
.collect())
Ok(body.jobs.into_iter().map(JobInfo::from).collect())
}
async fn get_job(&self, job_id: &str, table: Option<&str>) -> Result<Option<JobInfo>> {
// Point-access poll path: GET /v1/job/{id}, with the table as the O(1)
// hint when known. `query` handles URL-encoding the table name.
let mut req = self.client.get(&format!("/v1/job/{job_id}"));
if let Some(t) = table {
req = req.query(&[("table", t)]);
}
let (request_id, rsp) = self.client.send(req).await?;
let rsp = self.client.check_response(&request_id, rsp).await?;
let body: RemoteGetJobResponse = rsp.json().await.err_to_http(request_id)?;
Ok(body.job.map(JobInfo::from))
}
async fn cancel_job(&self, job_id: &str) -> Result<bool> {