mirror of
https://github.com/lancedb/lancedb.git
synced 2026-07-03 11:00:40 +00:00
feat: cancel_job over REST (Database::cancel_job + remote impl + pyo3 + python)
Exposes the existing server-side CANCEL JOB (CoordinatorCatalog::cancel_job)
as a REST-backed SDK method: Database trait default NotSupported,
RemoteDatabase POSTs /v1/job/{id}/cancel, pyo3 binding, sync+async python
wrappers. Best-effort: a missing job returns false, not an error. Mock-HTTP
unit test in test_derived_compute_routes.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -658,6 +658,15 @@ class DBConnection(EnforceOverrides):
|
||||
"""List inflight server-side jobs across the database's tables."""
|
||||
return LOOP.run(self._conn.list_jobs())
|
||||
|
||||
def cancel_job(self, job_id: str) -> bool:
|
||||
"""Cancel an inflight server-side job by id (CANCEL JOB).
|
||||
|
||||
Returns True if a matching inflight job was found and flagged for
|
||||
cancellation, False if none was inflight (already finished or
|
||||
unknown id) -- cancellation is best-effort.
|
||||
"""
|
||||
return LOOP.run(self._conn.cancel_job(job_id))
|
||||
|
||||
class LanceDBConnection(DBConnection):
|
||||
"""
|
||||
A connection to a LanceDB database.
|
||||
@@ -1951,6 +1960,14 @@ class AsyncConnection(object):
|
||||
"""List inflight server-side jobs across the database's tables."""
|
||||
return await self._inner.list_jobs()
|
||||
|
||||
async def cancel_job(self, job_id: str) -> bool:
|
||||
"""Cancel an inflight server-side job by id (CANCEL JOB).
|
||||
|
||||
Returns True if a matching inflight job was found and flagged for
|
||||
cancellation, False otherwise (best-effort).
|
||||
"""
|
||||
return await self._inner.cancel_job(job_id)
|
||||
|
||||
async def rename_table(
|
||||
self,
|
||||
cur_name: str,
|
||||
|
||||
@@ -510,6 +510,13 @@ impl Connection {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn cancel_job(self_: PyRef<'_, Self>, job_id: String) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.get_inner()?.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
inner.cancel_job(&job_id).await.infer_error()
|
||||
})
|
||||
}
|
||||
|
||||
#[pyo3(signature = (cur_name, new_name, cur_namespace_path=None, new_namespace_path=None))]
|
||||
pub fn rename_table(
|
||||
self_: PyRef<'_, Self>,
|
||||
|
||||
@@ -547,6 +547,12 @@ impl Connection {
|
||||
self.internal.list_jobs().await
|
||||
}
|
||||
|
||||
/// Cancel an inflight server-side job by id. Returns true if a
|
||||
/// matching inflight job was flagged for cancellation.
|
||||
pub async fn cancel_job(&self, job_id: &str) -> Result<bool> {
|
||||
self.internal.cancel_job(job_id).await
|
||||
}
|
||||
|
||||
/// Rename a table in the database.
|
||||
///
|
||||
/// This is only supported in LanceDB Cloud.
|
||||
|
||||
@@ -418,6 +418,12 @@ pub trait Database:
|
||||
async fn list_jobs(&self) -> Result<Vec<JobInfo>> {
|
||||
not_supported("list_jobs")
|
||||
}
|
||||
/// Cancel an inflight server-side job by id. Returns true if a
|
||||
/// matching inflight job was found and flagged for cancellation,
|
||||
/// false if none was inflight (best-effort, like SQL `CANCEL JOB`).
|
||||
async fn cancel_job(&self, _job_id: &str) -> Result<bool> {
|
||||
not_supported("cancel_job")
|
||||
}
|
||||
|
||||
/// Open a table in the database
|
||||
async fn open_table(&self, request: OpenTableRequest) -> Result<Arc<dyn BaseTable>>;
|
||||
|
||||
@@ -139,6 +139,11 @@ struct RemoteListJobsResponse {
|
||||
jobs: Vec<RemoteJobEntry>,
|
||||
}
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
struct RemoteCancelJobResponse {
|
||||
cancelled: bool,
|
||||
}
|
||||
|
||||
// Request structure for the remote clone table API
|
||||
#[derive(serde::Serialize)]
|
||||
struct RemoteCloneTableRequest {
|
||||
@@ -890,6 +895,14 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
|
||||
.collect())
|
||||
}
|
||||
|
||||
async fn cancel_job(&self, job_id: &str) -> Result<bool> {
|
||||
let req = self.client.post(&format!("/v1/job/{}/cancel", job_id));
|
||||
let (request_id, rsp) = self.client.send(req).await?;
|
||||
let rsp = self.client.check_response(&request_id, rsp).await?;
|
||||
let body: RemoteCancelJobResponse = rsp.json().await.err_to_http(request_id)?;
|
||||
Ok(body.cancelled)
|
||||
}
|
||||
|
||||
async fn open_table(&self, request: OpenTableRequest) -> Result<Arc<dyn BaseTable>> {
|
||||
let identifier = build_table_identifier(
|
||||
&request.name,
|
||||
@@ -1977,6 +1990,27 @@ mod tests {
|
||||
assert_eq!(jobs.len(), 1);
|
||||
assert_eq!(jobs[0].state, "running");
|
||||
assert_eq!(jobs[0].units_total, Some(2));
|
||||
|
||||
// cancel_job
|
||||
let conn = Connection::new_with_handler(|request| {
|
||||
assert_eq!(request.method(), &reqwest::Method::POST);
|
||||
assert_eq!(request.url().path(), "/v1/job/j-3/cancel");
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body(r#"{"cancelled":true}"#)
|
||||
.unwrap()
|
||||
});
|
||||
assert!(conn.cancel_job("j-3").await.unwrap());
|
||||
|
||||
// cancel_job: no such inflight job -> false, not an error
|
||||
let conn = Connection::new_with_handler(|request| {
|
||||
assert_eq!(request.url().path(), "/v1/job/gone/cancel");
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body(r#"{"cancelled":false}"#)
|
||||
.unwrap()
|
||||
});
|
||||
assert!(!conn.cancel_job("gone").await.unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
Reference in New Issue
Block a user