diff --git a/python/python/lancedb/db.py b/python/python/lancedb/db.py index 034e42566..72c141dea 100644 --- a/python/python/lancedb/db.py +++ b/python/python/lancedb/db.py @@ -802,6 +802,20 @@ class DBConnection(EnforceOverrides): """ return LOOP.run(self._conn.cancel_job(job_id)) + def job_history(self, job_id: "str | None" = None): + """Durable history of completed server-side jobs (SHOW JOB HISTORY). + + Pass ``job_id`` to narrow to a single job. Unlike :meth:`list_jobs` + (live, inflight) these are the terminal records. + """ + return LOOP.run(self._conn.job_history(job_id)) + + def errors(self, job_id: "str | None" = None, table: "str | None" = None): + """Per-row UDF errors recorded by ``error_policy=skip`` (SHOW ERRORS), + optionally filtered by ``job_id`` and/or ``table``. + """ + return LOOP.run(self._conn.errors(job_id, table)) + class LanceDBConnection(DBConnection): """ @@ -2205,6 +2219,22 @@ class AsyncConnection(object): """ return await self._inner.cancel_job(job_id) + async def job_history(self, job_id: "str | None" = None): + """Durable history of completed server-side jobs (SHOW JOB HISTORY). + + Reads each table's durable job-history store. Pass ``job_id`` to narrow + to a single job. Unlike :meth:`list_jobs` (live, inflight) these are the + terminal records, with created/updated/completed timestamps. + """ + return await self._inner.job_history(job_id) + + async def errors(self, job_id: "str | None" = None, table: "str | None" = None): + """Per-row UDF errors recorded by ``error_policy=skip`` (SHOW ERRORS). + + Optionally filtered by ``job_id`` and/or ``table``. + """ + return await self._inner.errors(job_id, table) + async def rename_table( self, cur_name: str, diff --git a/python/src/connection.rs b/python/src/connection.rs index b8407a705..fde3f724b 100644 --- a/python/src/connection.rs +++ b/python/src/connection.rs @@ -70,6 +70,39 @@ pub struct JobInfo { pub error: Option, } +/// One durable, completed/terminal server-side job record (SHOW JOB HISTORY). +#[pyclass(get_all)] +#[derive(Clone)] +pub struct JobHistoryEntry { + pub table: String, + pub job_id: String, + pub job_type: String, + pub state: String, + pub column: Option, + pub created_ms: i64, + pub updated_ms: i64, + pub completed_ms: Option, + pub rows_processed: Option, + pub rows_skipped: Option, + pub error: Option, + pub events: Option, +} + +/// One per-row UDF error recorded by `error_policy=skip` (SHOW ERRORS). +#[pyclass(get_all)] +#[derive(Clone)] +pub struct JobErrorEntry { + pub job_id: String, + pub table: String, + pub column: String, + pub error_type: String, + pub error_message: String, + pub fragment_id: Option, + pub source_row_id: Option, + pub table_version: Option, + pub age_seconds: Option, +} + /// The plan a REFRESH MATERIALIZED VIEW would execute (EXPLAIN REFRESH). #[pyclass(get_all)] #[derive(Clone)] @@ -611,6 +644,63 @@ impl Connection { }) } + #[pyo3(signature = (job_id=None))] + pub fn job_history( + self_: PyRef<'_, Self>, + job_id: Option, + ) -> PyResult> { + let inner = self_.get_inner()?.clone(); + future_into_py(self_.py(), async move { + let rows = inner.job_history(job_id.as_deref()).await.infer_error()?; + Ok(rows + .into_iter() + .map(|r| JobHistoryEntry { + table: r.table, + job_id: r.job_id, + job_type: r.job_type, + state: r.state, + column: r.column, + created_ms: r.created_ms, + updated_ms: r.updated_ms, + completed_ms: r.completed_ms, + rows_processed: r.rows_processed, + rows_skipped: r.rows_skipped, + error: r.error, + events: r.events, + }) + .collect::>()) + }) + } + + #[pyo3(signature = (job_id=None, table=None))] + pub fn errors( + self_: PyRef<'_, Self>, + job_id: Option, + table: Option, + ) -> PyResult> { + let inner = self_.get_inner()?.clone(); + future_into_py(self_.py(), async move { + let rows = inner + .errors(job_id.as_deref(), table.as_deref()) + .await + .infer_error()?; + Ok(rows + .into_iter() + .map(|e| JobErrorEntry { + job_id: e.job_id, + table: e.table, + column: e.column, + error_type: e.error_type, + error_message: e.error_message, + fragment_id: e.fragment_id, + source_row_id: e.source_row_id, + table_version: e.table_version, + age_seconds: e.age_seconds, + }) + .collect::>()) + }) + } + #[pyo3(signature = (cur_name, new_name, cur_namespace_path=None, new_namespace_path=None))] pub fn rename_table( self_: PyRef<'_, Self>, diff --git a/python/src/lib.rs b/python/src/lib.rs index 7956e58b1..fbac8d415 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -44,6 +44,8 @@ pub fn _lancedb(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/rust/lancedb/src/connection.rs b/rust/lancedb/src/connection.rs index 84b8d1776..fba72cd7c 100644 --- a/rust/lancedb/src/connection.rs +++ b/rust/lancedb/src/connection.rs @@ -24,8 +24,9 @@ use crate::data::scannable::Scannable; use crate::database::listing::ListingDatabase; use crate::database::{ CloneTableRequest, CreateFunctionRequest, CreateMaterializedViewRequest, Database, - DatabaseOptions, FunctionInfo, JobInfo, MaterializedViewInfo, MvRefreshPlan, OpenTableRequest, - ReadConsistency, RefreshMaterializedViewRequest, TableLineageRequest, TableNamesRequest, + DatabaseOptions, FunctionInfo, JobErrorInfo, JobHistoryInfo, JobInfo, MaterializedViewInfo, + MvRefreshPlan, OpenTableRequest, ReadConsistency, RefreshMaterializedViewRequest, + TableLineageRequest, TableNamesRequest, }; use crate::embeddings::{EmbeddingRegistry, MemoryRegistry}; use crate::error::{Error, Result}; @@ -580,6 +581,22 @@ impl Connection { self.internal.get_job(job_id, table_hint).await } + /// Durable job history (SHOW JOB HISTORY) across the database's tables. + /// Pass `job_id` to narrow to a single job. + pub async fn job_history(&self, job_id: Option<&str>) -> Result> { + self.internal.job_history(job_id).await + } + + /// Per-row UDF errors (SHOW ERRORS) across the database's tables, optionally + /// filtered by `job_id` and/or `table`. + pub async fn errors( + &self, + job_id: Option<&str>, + table: Option<&str>, + ) -> Result> { + self.internal.errors(job_id, table).await + } + /// Rename a table in the database. /// /// This is only supported in LanceDB Cloud. diff --git a/rust/lancedb/src/database.rs b/rust/lancedb/src/database.rs index 151c3e95c..1bc12c4fc 100644 --- a/rust/lancedb/src/database.rs +++ b/rust/lancedb/src/database.rs @@ -341,6 +341,42 @@ pub struct JobInfo { pub error: Option, } +/// A row from `job_history`: one durable, completed/terminal server-side job +/// record (SHOW JOB HISTORY), read from a table's `_job_history` store. Unlike +/// `JobInfo` (live, inflight jobs) this carries created/updated/completed +/// timestamps and the lifecycle event log. +#[derive(Debug, Clone)] +pub struct JobHistoryInfo { + pub table: String, + pub job_id: String, + pub job_type: String, + pub state: String, + pub column: Option, + pub created_ms: i64, + pub updated_ms: i64, + pub completed_ms: Option, + pub rows_processed: Option, + pub rows_skipped: Option, + pub error: Option, + /// Newline-joined lifecycle event log, oldest first. + pub events: Option, +} + +/// A row from `errors`: one per-row UDF failure recorded by `error_policy=skip` +/// (SHOW ERRORS). +#[derive(Debug, Clone)] +pub struct JobErrorInfo { + pub job_id: String, + pub table: String, + pub column: String, + pub error_type: String, + pub error_message: String, + pub fragment_id: Option, + pub source_row_id: Option, + pub table_version: Option, + pub age_seconds: Option, +} + /// The plan a `REFRESH MATERIALIZED VIEW` would execute, as returned by /// `explain_refresh_materialized_view` (EXPLAIN REFRESH). No work is run. #[derive(Debug, Clone)] @@ -486,6 +522,20 @@ pub trait Database: async fn get_job(&self, _job_id: &str, _table_hint: Option<&str>) -> Result> { not_supported("get_job") } + /// Durable job history (SHOW JOB HISTORY) across the database's tables, + /// optionally narrowed to a single `job_id`. + async fn job_history(&self, _job_id: Option<&str>) -> Result> { + not_supported("job_history") + } + /// Per-row UDF errors (SHOW ERRORS) recorded by `error_policy=skip` across + /// the database's tables, optionally filtered by `job_id` and/or `table`. + async fn errors( + &self, + _job_id: Option<&str>, + _table: Option<&str>, + ) -> Result> { + not_supported("errors") + } /// Open a table in the database async fn open_table(&self, request: OpenTableRequest) -> Result>; diff --git a/rust/lancedb/src/remote/db.rs b/rust/lancedb/src/remote/db.rs index adab42447..727496149 100644 --- a/rust/lancedb/src/remote/db.rs +++ b/rust/lancedb/src/remote/db.rs @@ -20,9 +20,9 @@ use lance_namespace::models::{ use crate::Error; use crate::database::{ CloneTableRequest, CreateFunctionRequest, CreateMaterializedViewRequest, CreateTableMode, - CreateTableRequest, Database, DatabaseOptions, FunctionInfo, JobInfo, MaterializedViewInfo, - MvRefreshPlan, OpenTableRequest, ReadConsistency, RefreshMaterializedViewRequest, - TableLineageRequest, TableNamesRequest, + CreateTableRequest, Database, DatabaseOptions, FunctionInfo, JobErrorInfo, JobHistoryInfo, + JobInfo, MaterializedViewInfo, MvRefreshPlan, OpenTableRequest, ReadConsistency, + RefreshMaterializedViewRequest, TableLineageRequest, TableNamesRequest, }; use crate::error::Result; use crate::remote::util::stream_as_body; @@ -193,6 +193,90 @@ impl From for JobInfo { } } +#[derive(serde::Deserialize)] +struct RemoteJobHistoryEntry { + table: String, + job_id: String, + job_type: String, + state: String, + #[serde(default)] + column: Option, + created_ms: i64, + updated_ms: i64, + #[serde(default)] + completed_ms: Option, + #[serde(default)] + rows_processed: Option, + #[serde(default)] + rows_skipped: Option, + #[serde(default)] + error: Option, + #[serde(default)] + events: Option, +} + +#[derive(serde::Deserialize)] +struct RemoteJobHistoryResponse { + jobs: Vec, +} + +impl From for JobHistoryInfo { + fn from(j: RemoteJobHistoryEntry) -> Self { + JobHistoryInfo { + table: j.table, + job_id: j.job_id, + job_type: j.job_type, + state: j.state, + column: j.column, + created_ms: j.created_ms, + updated_ms: j.updated_ms, + completed_ms: j.completed_ms, + rows_processed: j.rows_processed, + rows_skipped: j.rows_skipped, + error: j.error, + events: j.events, + } + } +} + +#[derive(serde::Deserialize)] +struct RemoteErrorEntry { + job_id: String, + table: String, + column: String, + error_type: String, + error_message: String, + #[serde(default)] + fragment_id: Option, + #[serde(default)] + source_row_id: Option, + #[serde(default)] + table_version: Option, + #[serde(default)] + age_seconds: Option, +} + +#[derive(serde::Deserialize)] +struct RemoteErrorsResponse { + errors: Vec, +} + +impl From for JobErrorInfo { + fn from(e: RemoteErrorEntry) -> Self { + JobErrorInfo { + job_id: e.job_id, + table: e.table, + column: e.column, + error_type: e.error_type, + error_message: e.error_message, + fragment_id: e.fragment_id, + source_row_id: e.source_row_id, + table_version: e.table_version, + age_seconds: e.age_seconds, + } + } +} + // Request structure for the remote clone table API #[derive(serde::Serialize)] struct RemoteCloneTableRequest { @@ -998,6 +1082,31 @@ impl Database for RemoteDatabase { Ok(body.cancelled) } + async fn job_history(&self, job_id: Option<&str>) -> Result> { + let mut req = self.client.get("/v1/job/history"); + if let Some(j) = job_id { + req = req.query(&[("job", j)]); + } + let (request_id, rsp) = self.client.send(req).await?; + let rsp = self.client.check_response(&request_id, rsp).await?; + let body: RemoteJobHistoryResponse = rsp.json().await.err_to_http(request_id)?; + Ok(body.jobs.into_iter().map(JobHistoryInfo::from).collect()) + } + + async fn errors(&self, job_id: Option<&str>, table: Option<&str>) -> Result> { + let mut req = self.client.get("/v1/job/errors"); + if let Some(j) = job_id { + req = req.query(&[("job", j)]); + } + if let Some(t) = table { + req = req.query(&[("table", t)]); + } + let (request_id, rsp) = self.client.send(req).await?; + let rsp = self.client.check_response(&request_id, rsp).await?; + let body: RemoteErrorsResponse = rsp.json().await.err_to_http(request_id)?; + Ok(body.errors.into_iter().map(JobErrorInfo::from).collect()) + } + async fn open_table(&self, request: OpenTableRequest) -> Result> { let identifier = build_table_identifier( &request.name, @@ -2106,6 +2215,52 @@ mod tests { .unwrap() }); assert!(!conn.cancel_job("gone").await.unwrap()); + + // job_history: GET /v1/job/history, no filter + let conn = Connection::new_with_handler(|request| { + assert_eq!(request.method(), &reqwest::Method::GET); + assert_eq!(request.url().path(), "/v1/job/history"); + assert!(request.url().query().is_none()); + http::Response::builder() + .status(200) + .body( + r#"{"jobs":[{"table":"docs","job_id":"j-1","job_type":"udf_virtual_column_backfill","state":"done","column":"vec","created_ms":1000,"updated_ms":2000,"completed_ms":2000,"rows_processed":42,"rows_skipped":3,"error":null,"events":"created\ndone"}]}"#, + ) + .unwrap() + }); + let hist = conn.job_history(None).await.unwrap(); + assert_eq!(hist.len(), 1); + assert_eq!(hist[0].state, "done"); + assert_eq!(hist[0].rows_processed, Some(42)); + assert_eq!(hist[0].events.as_deref(), Some("created\ndone")); + + // job_history: ?job= narrows to one job + let conn = Connection::new_with_handler(|request| { + assert_eq!(request.url().path(), "/v1/job/history"); + assert_eq!(request.url().query(), Some("job=j-1")); + http::Response::builder() + .status(200) + .body(r#"{"jobs":[]}"#) + .unwrap() + }); + assert!(conn.job_history(Some("j-1")).await.unwrap().is_empty()); + + // errors: GET /v1/job/errors with job + table filters + let conn = Connection::new_with_handler(|request| { + assert_eq!(request.method(), &reqwest::Method::GET); + assert_eq!(request.url().path(), "/v1/job/errors"); + assert_eq!(request.url().query(), Some("job=j-1&table=docs")); + http::Response::builder() + .status(200) + .body( + r#"{"errors":[{"job_id":"j-1","table":"docs","column":"vec","error_type":"ValueError","error_message":"boom","fragment_id":0,"source_row_id":42,"table_version":7,"age_seconds":5}]}"#, + ) + .unwrap() + }); + let errs = conn.errors(Some("j-1"), Some("docs")).await.unwrap(); + assert_eq!(errs.len(), 1); + assert_eq!(errs[0].error_type, "ValueError"); + assert_eq!(errs[0].source_row_id, Some(42)); } #[tokio::test]