From 2b41fce033b8f67c514a1ae6205276e5153c53ba Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Sat, 13 Jun 2026 07:09:45 -0700 Subject: [PATCH] feat: cancel_job over REST (Database::cancel_job + remote impl + pyo3 + python) Exposes the existing server-side CANCEL JOB (CoordinatorCatalog::cancel_job) as a REST-backed SDK method: Database trait default NotSupported, RemoteDatabase POSTs /v1/job/{id}/cancel, pyo3 binding, sync+async python wrappers. Best-effort: a missing job returns false, not an error. Mock-HTTP unit test in test_derived_compute_routes. Co-Authored-By: Claude Opus 4.8 (1M context) --- python/python/lancedb/db.py | 17 +++++++++++++++++ python/src/connection.rs | 7 +++++++ rust/lancedb/src/connection.rs | 6 ++++++ rust/lancedb/src/database.rs | 6 ++++++ rust/lancedb/src/remote/db.rs | 34 ++++++++++++++++++++++++++++++++++ 5 files changed, 70 insertions(+) diff --git a/python/python/lancedb/db.py b/python/python/lancedb/db.py index 640a6c9ff..c3c683319 100644 --- a/python/python/lancedb/db.py +++ b/python/python/lancedb/db.py @@ -658,6 +658,15 @@ class DBConnection(EnforceOverrides): """List inflight server-side jobs across the database's tables.""" return LOOP.run(self._conn.list_jobs()) + def cancel_job(self, job_id: str) -> bool: + """Cancel an inflight server-side job by id (CANCEL JOB). + + Returns True if a matching inflight job was found and flagged for + cancellation, False if none was inflight (already finished or + unknown id) -- cancellation is best-effort. + """ + return LOOP.run(self._conn.cancel_job(job_id)) + class LanceDBConnection(DBConnection): """ A connection to a LanceDB database. @@ -1951,6 +1960,14 @@ class AsyncConnection(object): """List inflight server-side jobs across the database's tables.""" return await self._inner.list_jobs() + async def cancel_job(self, job_id: str) -> bool: + """Cancel an inflight server-side job by id (CANCEL JOB). + + Returns True if a matching inflight job was found and flagged for + cancellation, False otherwise (best-effort). + """ + return await self._inner.cancel_job(job_id) + async def rename_table( self, cur_name: str, diff --git a/python/src/connection.rs b/python/src/connection.rs index b32ccfb2e..42bbdfd7e 100644 --- a/python/src/connection.rs +++ b/python/src/connection.rs @@ -510,6 +510,13 @@ impl Connection { }) } + pub fn cancel_job(self_: PyRef<'_, Self>, job_id: String) -> PyResult> { + let inner = self_.get_inner()?.clone(); + future_into_py(self_.py(), async move { + inner.cancel_job(&job_id).await.infer_error() + }) + } + #[pyo3(signature = (cur_name, new_name, cur_namespace_path=None, new_namespace_path=None))] pub fn rename_table( self_: PyRef<'_, Self>, diff --git a/rust/lancedb/src/connection.rs b/rust/lancedb/src/connection.rs index 218643284..7af407d74 100644 --- a/rust/lancedb/src/connection.rs +++ b/rust/lancedb/src/connection.rs @@ -547,6 +547,12 @@ impl Connection { self.internal.list_jobs().await } + /// Cancel an inflight server-side job by id. Returns true if a + /// matching inflight job was flagged for cancellation. + pub async fn cancel_job(&self, job_id: &str) -> Result { + self.internal.cancel_job(job_id).await + } + /// Rename a table in the database. /// /// This is only supported in LanceDB Cloud. diff --git a/rust/lancedb/src/database.rs b/rust/lancedb/src/database.rs index d6f788945..2d20bb283 100644 --- a/rust/lancedb/src/database.rs +++ b/rust/lancedb/src/database.rs @@ -418,6 +418,12 @@ pub trait Database: async fn list_jobs(&self) -> Result> { not_supported("list_jobs") } + /// Cancel an inflight server-side job by id. Returns true if a + /// matching inflight job was found and flagged for cancellation, + /// false if none was inflight (best-effort, like SQL `CANCEL JOB`). + async fn cancel_job(&self, _job_id: &str) -> Result { + not_supported("cancel_job") + } /// Open a table in the database async fn open_table(&self, request: OpenTableRequest) -> Result>; diff --git a/rust/lancedb/src/remote/db.rs b/rust/lancedb/src/remote/db.rs index 15bbeb1b5..c2234b663 100644 --- a/rust/lancedb/src/remote/db.rs +++ b/rust/lancedb/src/remote/db.rs @@ -139,6 +139,11 @@ struct RemoteListJobsResponse { jobs: Vec, } +#[derive(serde::Deserialize)] +struct RemoteCancelJobResponse { + cancelled: bool, +} + // Request structure for the remote clone table API #[derive(serde::Serialize)] struct RemoteCloneTableRequest { @@ -890,6 +895,14 @@ impl Database for RemoteDatabase { .collect()) } + async fn cancel_job(&self, job_id: &str) -> Result { + let req = self.client.post(&format!("/v1/job/{}/cancel", job_id)); + let (request_id, rsp) = self.client.send(req).await?; + let rsp = self.client.check_response(&request_id, rsp).await?; + let body: RemoteCancelJobResponse = rsp.json().await.err_to_http(request_id)?; + Ok(body.cancelled) + } + async fn open_table(&self, request: OpenTableRequest) -> Result> { let identifier = build_table_identifier( &request.name, @@ -1977,6 +1990,27 @@ mod tests { assert_eq!(jobs.len(), 1); assert_eq!(jobs[0].state, "running"); assert_eq!(jobs[0].units_total, Some(2)); + + // cancel_job + let conn = Connection::new_with_handler(|request| { + assert_eq!(request.method(), &reqwest::Method::POST); + assert_eq!(request.url().path(), "/v1/job/j-3/cancel"); + http::Response::builder() + .status(200) + .body(r#"{"cancelled":true}"#) + .unwrap() + }); + assert!(conn.cancel_job("j-3").await.unwrap()); + + // cancel_job: no such inflight job -> false, not an error + let conn = Connection::new_with_handler(|request| { + assert_eq!(request.url().path(), "/v1/job/gone/cancel"); + http::Response::builder() + .status(200) + .body(r#"{"cancelled":false}"#) + .unwrap() + }); + assert!(!conn.cancel_job("gone").await.unwrap()); } #[tokio::test]