From 3f44f93e92963ff61e7fdab86609cb50e8e67cdf Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Wed, 17 Jun 2026 14:08:03 -0700 Subject: [PATCH] job wait(): poll by id via get_job (point access) instead of list_jobs JobHandle/AsyncJobHandle now poll conn.get_job(id, table) -- one job -- instead of list_jobs() + client-side filter over every active job. The job's table is threaded in from refresh_column / MV refresh as an O(1) lookup hint. Plumbs get_job through the Database trait (default not_supported), RemoteDatabase (GET /v1/job/{id}?table=...), the Connection wrapper, and the pyo3 binding + db.py. Co-Authored-By: Claude Opus 4.8 (1M context) --- python/python/lancedb/db.py | 15 +++++++++ python/python/lancedb/table.py | 2 +- python/python/lancedb/udf.py | 36 +++++++++++---------- python/src/connection.rs | 29 +++++++++++++++++ rust/lancedb/src/connection.rs | 8 +++++ rust/lancedb/src/database.rs | 6 ++++ rust/lancedb/src/remote/db.rs | 57 +++++++++++++++++++++++----------- 7 files changed, 117 insertions(+), 36 deletions(-) diff --git a/python/python/lancedb/db.py b/python/python/lancedb/db.py index d7d937164..034e42566 100644 --- a/python/python/lancedb/db.py +++ b/python/python/lancedb/db.py @@ -784,6 +784,15 @@ class DBConnection(EnforceOverrides): """List inflight server-side jobs across the database's tables.""" return LOOP.run(self._conn.list_jobs()) + def get_job(self, job_id: str, table: "str | None" = None): + """Look up one server-side job by id (the wait()/status poll path). + + Passing ``table`` (the job's table) lets the server answer with an O(1) + single-node read instead of scanning the database's active jobs. + Returns the job's status, or None if it's unknown or no longer active. + """ + return LOOP.run(self._conn.get_job(job_id, table)) + def cancel_job(self, job_id: str) -> bool: """Cancel an inflight server-side job by id (CANCEL JOB). @@ -2182,6 +2191,12 @@ class AsyncConnection(object): """List inflight server-side jobs across the database's tables.""" return await self._inner.list_jobs() + async def get_job(self, job_id: str, table: "str | None" = None): + """Look up one server-side job by id (the wait()/status poll path). + ``table`` (the job's table) enables an O(1) server-side lookup. + Returns the job's status, or None if unknown / no longer active.""" + return await self._inner.get_job(job_id, table) + async def cancel_job(self, job_id: str) -> bool: """Cancel an inflight server-side job by id (CANCEL JOB). diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py index b6bd6f370..e00e32686 100644 --- a/python/python/lancedb/table.py +++ b/python/python/lancedb/table.py @@ -3842,7 +3842,7 @@ class LanceTable(Table): priority=priority, ) ) - return JobHandle(self._conn, job_id) + return JobHandle(self._conn, job_id, table=self.name) def alter_columns( self, *alterations: Iterable[Dict[str, str]] diff --git a/python/python/lancedb/udf.py b/python/python/lancedb/udf.py index 00a35f6b9..ef46094a5 100644 --- a/python/python/lancedb/udf.py +++ b/python/python/lancedb/udf.py @@ -510,7 +510,9 @@ class MaterializedView: A no-op when the view was created with no data.""" if self.job_id is None: return "finished" - return JobHandle(self.conn, self.job_id).wait(timeout=timeout, poll=poll) + return JobHandle(self.conn, self.job_id, table=self.name).wait( + timeout=timeout, poll=poll + ) def refresh(self, full: bool = False) -> "JobHandle": """Refresh the materialized view; returns a `JobHandle` to wait on, @@ -521,7 +523,7 @@ class MaterializedView: the view's indexes -- they are reindexed by the distributed indexer. """ job_id = self.conn._refresh_materialized_view(self.name, full=full) - return JobHandle(self.conn, job_id) + return JobHandle(self.conn, job_id, table=self.name) def explain_refresh(self, full: bool = False): """Plan a refresh without running it (EXPLAIN REFRESH).""" @@ -574,20 +576,20 @@ class JobHandle: #: -> agent cycle -> manifest write is async). GRACE_SECONDS = 20.0 - def __init__(self, conn, job_id: str): + def __init__(self, conn, job_id: str, table: "str | None" = None): self.conn = conn self.id = job_id + #: The job's table, when known (refresh_column / MV refresh). Lets the + #: server resolve this job with an O(1) single-node read; without it the + #: lookup scans the database's active jobs (still correct). + self.table = table self._created = time.monotonic() self._seen = False - def _matches(self, listed_id: str) -> bool: - return _job_id_matches(self.id, listed_id) - def _job(self): - for j in self.conn.list_jobs(): - if self._matches(j.job_id): - return j - return None + # Poll by id (one job), not list_jobs (every active job): the server + # matches the submission/manifest id and reads just this table's node. + return self.conn.get_job(self.id, self.table) def status(self) -> str: """pending / running / cancelling / stale, or 'finished' once the @@ -643,7 +645,7 @@ class AsyncMaterializedView: A no-op when the view was created with no data.""" if self.job_id is None: return "finished" - return await AsyncJobHandle(self.conn, self.job_id).wait( + return await AsyncJobHandle(self.conn, self.job_id, table=self.name).wait( timeout=timeout, poll=poll ) @@ -655,7 +657,7 @@ class AsyncMaterializedView: (indexes are preserved and reindexed by the distributed indexer). """ job_id = await self.conn._refresh_materialized_view(self.name, full=full) - return AsyncJobHandle(self.conn, job_id) + return AsyncJobHandle(self.conn, job_id, table=self.name) async def explain_refresh(self, full: bool = False): return await self.conn.explain_refresh_materialized_view(self.name, full=full) @@ -678,17 +680,17 @@ class AsyncJobHandle: GRACE_SECONDS = 20.0 - def __init__(self, conn, job_id: str): + def __init__(self, conn, job_id: str, table: "str | None" = None): self.conn = conn self.id = job_id + #: See JobHandle.table -- enables an O(1) by-id lookup when known. + self.table = table self._created = time.monotonic() self._seen = False async def _job(self): - for j in await self.conn.list_jobs(): - if _job_id_matches(self.id, j.job_id): - return j - return None + # Poll by id, not list_jobs (see JobHandle._job). + return await self.conn.get_job(self.id, self.table) async def status(self) -> str: job = await self._job() diff --git a/python/src/connection.rs b/python/src/connection.rs index 2335c4f6e..b8407a705 100644 --- a/python/src/connection.rs +++ b/python/src/connection.rs @@ -582,6 +582,35 @@ impl Connection { }) } + #[pyo3(signature = (job_id, table=None))] + pub fn get_job( + self_: PyRef<'_, Self>, + job_id: String, + table: Option, + ) -> PyResult> { + let inner = self_.get_inner()?.clone(); + future_into_py(self_.py(), async move { + let job = inner + .get_job(&job_id, table.as_deref()) + .await + .infer_error()?; + Ok(job.map(|j| JobInfo { + table: j.table, + job_id: j.job_id, + job_type: j.job_type, + state: j.state, + column: j.column, + age_seconds: j.age_seconds, + command: j.command, + units_done: j.units_done, + units_total: j.units_total, + committed: j.committed, + rows_skipped: j.rows_skipped, + error: j.error, + })) + }) + } + #[pyo3(signature = (cur_name, new_name, cur_namespace_path=None, new_namespace_path=None))] pub fn rename_table( self_: PyRef<'_, Self>, diff --git a/rust/lancedb/src/connection.rs b/rust/lancedb/src/connection.rs index 58128c023..84b8d1776 100644 --- a/rust/lancedb/src/connection.rs +++ b/rust/lancedb/src/connection.rs @@ -572,6 +572,14 @@ impl Connection { self.internal.cancel_job(job_id).await } + /// Look up a single server-side job by id -- the `wait()`/status poll path. + /// `table_hint` (the job's table) enables an O(1) server-side lookup; `None` + /// scans the database's active jobs. A `None` result means unknown / not + /// active. + pub async fn get_job(&self, job_id: &str, table_hint: Option<&str>) -> Result> { + self.internal.get_job(job_id, table_hint).await + } + /// Rename a table in the database. /// /// This is only supported in LanceDB Cloud. diff --git a/rust/lancedb/src/database.rs b/rust/lancedb/src/database.rs index 6907aefd3..151c3e95c 100644 --- a/rust/lancedb/src/database.rs +++ b/rust/lancedb/src/database.rs @@ -480,6 +480,12 @@ pub trait Database: async fn cancel_job(&self, _job_id: &str) -> Result { not_supported("cancel_job") } + /// Point-access for a single job by id -- the `wait()`/status poll path. + /// `table_hint` (the job's table, which `wait()` callers know) enables an + /// O(1) server-side lookup. `None` if the job is unknown or not active. + async fn get_job(&self, _job_id: &str, _table_hint: Option<&str>) -> Result> { + not_supported("get_job") + } /// Open a table in the database async fn open_table(&self, request: OpenTableRequest) -> Result>; diff --git a/rust/lancedb/src/remote/db.rs b/rust/lancedb/src/remote/db.rs index ce9760391..adab42447 100644 --- a/rust/lancedb/src/remote/db.rs +++ b/rust/lancedb/src/remote/db.rs @@ -163,11 +163,36 @@ struct RemoteListJobsResponse { jobs: Vec, } +#[derive(serde::Deserialize)] +struct RemoteGetJobResponse { + #[serde(default)] + job: Option, +} + #[derive(serde::Deserialize)] struct RemoteCancelJobResponse { cancelled: bool, } +impl From for JobInfo { + fn from(j: RemoteJobEntry) -> Self { + JobInfo { + table: j.table, + job_id: j.job_id, + job_type: j.job_type, + state: j.state, + column: j.column, + age_seconds: j.age_seconds, + command: j.command, + units_done: j.units_done, + units_total: j.units_total, + committed: j.committed, + rows_skipped: j.rows_skipped, + error: j.error, + } + } +} + // Request structure for the remote clone table API #[derive(serde::Serialize)] struct RemoteCloneTableRequest { @@ -949,24 +974,20 @@ impl Database for RemoteDatabase { let (request_id, rsp) = self.client.send(req).await?; let rsp = self.client.check_response(&request_id, rsp).await?; let body: RemoteListJobsResponse = rsp.json().await.err_to_http(request_id)?; - Ok(body - .jobs - .into_iter() - .map(|j| JobInfo { - table: j.table, - job_id: j.job_id, - job_type: j.job_type, - state: j.state, - column: j.column, - age_seconds: j.age_seconds, - command: j.command, - units_done: j.units_done, - units_total: j.units_total, - committed: j.committed, - rows_skipped: j.rows_skipped, - error: j.error, - }) - .collect()) + Ok(body.jobs.into_iter().map(JobInfo::from).collect()) + } + + async fn get_job(&self, job_id: &str, table: Option<&str>) -> Result> { + // Point-access poll path: GET /v1/job/{id}, with the table as the O(1) + // hint when known. `query` handles URL-encoding the table name. + let mut req = self.client.get(&format!("/v1/job/{job_id}")); + if let Some(t) = table { + req = req.query(&[("table", t)]); + } + let (request_id, rsp) = self.client.send(req).await?; + let rsp = self.client.check_response(&request_id, rsp).await?; + let body: RemoteGetJobResponse = rsp.json().await.err_to_http(request_id)?; + Ok(body.job.map(JobInfo::from)) } async fn cancel_job(&self, job_id: &str) -> Result {