client: job_history() and errors() over REST (SHOW JOB HISTORY / SHOW ERRORS)

The client exposed list_jobs/get_job/cancel_job but not the durable job
history or the per-row UDF errors, so those SQL/REST surfaces had no SDK
equivalent. Add job_history(job_id=None) and errors(job_id=None, table=None)
through every layer:

- Database trait + Connection API (JobHistoryInfo, JobErrorInfo types).
- Remote REST impl: GET /v1/job/history (?job=) and GET /v1/job/errors
  (?job=&table=), with serde response types + From mappings.
- pyo3 bindings + pyclasses JobHistoryEntry / JobErrorEntry, registered.
- Python sync + async db.py wrappers.

Mirrors the existing list_jobs plumbing exactly. Remote-handler test asserts
the GET paths, query filters, and response parsing for both.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Wyatt Alt
2026-06-19 09:50:16 -07:00
committed by Jack Ye
parent 3f44f93e92
commit 142ac835d3
6 changed files with 349 additions and 5 deletions

View File

@@ -802,6 +802,20 @@ class DBConnection(EnforceOverrides):
"""
return LOOP.run(self._conn.cancel_job(job_id))
def job_history(self, job_id: "str | None" = None):
"""Durable history of completed server-side jobs (SHOW JOB HISTORY).
Pass ``job_id`` to narrow to a single job. Unlike :meth:`list_jobs`
(live, inflight) these are the terminal records.
"""
return LOOP.run(self._conn.job_history(job_id))
def errors(self, job_id: "str | None" = None, table: "str | None" = None):
"""Per-row UDF errors recorded by ``error_policy=skip`` (SHOW ERRORS),
optionally filtered by ``job_id`` and/or ``table``.
"""
return LOOP.run(self._conn.errors(job_id, table))
class LanceDBConnection(DBConnection):
"""
@@ -2205,6 +2219,22 @@ class AsyncConnection(object):
"""
return await self._inner.cancel_job(job_id)
async def job_history(self, job_id: "str | None" = None):
"""Durable history of completed server-side jobs (SHOW JOB HISTORY).
Reads each table's durable job-history store. Pass ``job_id`` to narrow
to a single job. Unlike :meth:`list_jobs` (live, inflight) these are the
terminal records, with created/updated/completed timestamps.
"""
return await self._inner.job_history(job_id)
async def errors(self, job_id: "str | None" = None, table: "str | None" = None):
"""Per-row UDF errors recorded by ``error_policy=skip`` (SHOW ERRORS).
Optionally filtered by ``job_id`` and/or ``table``.
"""
return await self._inner.errors(job_id, table)
async def rename_table(
self,
cur_name: str,

View File

@@ -70,6 +70,39 @@ pub struct JobInfo {
pub error: Option<String>,
}
/// One durable, completed/terminal server-side job record (SHOW JOB HISTORY).
#[pyclass(get_all)]
#[derive(Clone)]
pub struct JobHistoryEntry {
pub table: String,
pub job_id: String,
pub job_type: String,
pub state: String,
pub column: Option<String>,
pub created_ms: i64,
pub updated_ms: i64,
pub completed_ms: Option<i64>,
pub rows_processed: Option<i64>,
pub rows_skipped: Option<i64>,
pub error: Option<String>,
pub events: Option<String>,
}
/// One per-row UDF error recorded by `error_policy=skip` (SHOW ERRORS).
#[pyclass(get_all)]
#[derive(Clone)]
pub struct JobErrorEntry {
pub job_id: String,
pub table: String,
pub column: String,
pub error_type: String,
pub error_message: String,
pub fragment_id: Option<i64>,
pub source_row_id: Option<i64>,
pub table_version: Option<i64>,
pub age_seconds: Option<i64>,
}
/// The plan a REFRESH MATERIALIZED VIEW would execute (EXPLAIN REFRESH).
#[pyclass(get_all)]
#[derive(Clone)]
@@ -611,6 +644,63 @@ impl Connection {
})
}
#[pyo3(signature = (job_id=None))]
pub fn job_history(
self_: PyRef<'_, Self>,
job_id: Option<String>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
future_into_py(self_.py(), async move {
let rows = inner.job_history(job_id.as_deref()).await.infer_error()?;
Ok(rows
.into_iter()
.map(|r| JobHistoryEntry {
table: r.table,
job_id: r.job_id,
job_type: r.job_type,
state: r.state,
column: r.column,
created_ms: r.created_ms,
updated_ms: r.updated_ms,
completed_ms: r.completed_ms,
rows_processed: r.rows_processed,
rows_skipped: r.rows_skipped,
error: r.error,
events: r.events,
})
.collect::<Vec<_>>())
})
}
#[pyo3(signature = (job_id=None, table=None))]
pub fn errors(
self_: PyRef<'_, Self>,
job_id: Option<String>,
table: Option<String>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
future_into_py(self_.py(), async move {
let rows = inner
.errors(job_id.as_deref(), table.as_deref())
.await
.infer_error()?;
Ok(rows
.into_iter()
.map(|e| JobErrorEntry {
job_id: e.job_id,
table: e.table,
column: e.column,
error_type: e.error_type,
error_message: e.error_message,
fragment_id: e.fragment_id,
source_row_id: e.source_row_id,
table_version: e.table_version,
age_seconds: e.age_seconds,
})
.collect::<Vec<_>>())
})
}
#[pyo3(signature = (cur_name, new_name, cur_namespace_path=None, new_namespace_path=None))]
pub fn rename_table(
self_: PyRef<'_, Self>,

View File

@@ -44,6 +44,8 @@ pub fn _lancedb(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<connection::FunctionInfo>()?;
m.add_class::<connection::MaterializedViewInfo>()?;
m.add_class::<connection::JobInfo>()?;
m.add_class::<connection::JobHistoryEntry>()?;
m.add_class::<connection::JobErrorEntry>()?;
m.add_class::<Session>()?;
m.add_class::<Table>()?;
m.add_class::<IndexConfig>()?;

View File

@@ -24,8 +24,9 @@ use crate::data::scannable::Scannable;
use crate::database::listing::ListingDatabase;
use crate::database::{
CloneTableRequest, CreateFunctionRequest, CreateMaterializedViewRequest, Database,
DatabaseOptions, FunctionInfo, JobInfo, MaterializedViewInfo, MvRefreshPlan, OpenTableRequest,
ReadConsistency, RefreshMaterializedViewRequest, TableLineageRequest, TableNamesRequest,
DatabaseOptions, FunctionInfo, JobErrorInfo, JobHistoryInfo, JobInfo, MaterializedViewInfo,
MvRefreshPlan, OpenTableRequest, ReadConsistency, RefreshMaterializedViewRequest,
TableLineageRequest, TableNamesRequest,
};
use crate::embeddings::{EmbeddingRegistry, MemoryRegistry};
use crate::error::{Error, Result};
@@ -580,6 +581,22 @@ impl Connection {
self.internal.get_job(job_id, table_hint).await
}
/// Durable job history (SHOW JOB HISTORY) across the database's tables.
/// Pass `job_id` to narrow to a single job.
pub async fn job_history(&self, job_id: Option<&str>) -> Result<Vec<JobHistoryInfo>> {
self.internal.job_history(job_id).await
}
/// Per-row UDF errors (SHOW ERRORS) across the database's tables, optionally
/// filtered by `job_id` and/or `table`.
pub async fn errors(
&self,
job_id: Option<&str>,
table: Option<&str>,
) -> Result<Vec<JobErrorInfo>> {
self.internal.errors(job_id, table).await
}
/// Rename a table in the database.
///
/// This is only supported in LanceDB Cloud.

View File

@@ -341,6 +341,42 @@ pub struct JobInfo {
pub error: Option<String>,
}
/// A row from `job_history`: one durable, completed/terminal server-side job
/// record (SHOW JOB HISTORY), read from a table's `_job_history` store. Unlike
/// `JobInfo` (live, inflight jobs) this carries created/updated/completed
/// timestamps and the lifecycle event log.
#[derive(Debug, Clone)]
pub struct JobHistoryInfo {
pub table: String,
pub job_id: String,
pub job_type: String,
pub state: String,
pub column: Option<String>,
pub created_ms: i64,
pub updated_ms: i64,
pub completed_ms: Option<i64>,
pub rows_processed: Option<i64>,
pub rows_skipped: Option<i64>,
pub error: Option<String>,
/// Newline-joined lifecycle event log, oldest first.
pub events: Option<String>,
}
/// A row from `errors`: one per-row UDF failure recorded by `error_policy=skip`
/// (SHOW ERRORS).
#[derive(Debug, Clone)]
pub struct JobErrorInfo {
pub job_id: String,
pub table: String,
pub column: String,
pub error_type: String,
pub error_message: String,
pub fragment_id: Option<i64>,
pub source_row_id: Option<i64>,
pub table_version: Option<i64>,
pub age_seconds: Option<i64>,
}
/// The plan a `REFRESH MATERIALIZED VIEW` would execute, as returned by
/// `explain_refresh_materialized_view` (EXPLAIN REFRESH). No work is run.
#[derive(Debug, Clone)]
@@ -486,6 +522,20 @@ pub trait Database:
async fn get_job(&self, _job_id: &str, _table_hint: Option<&str>) -> Result<Option<JobInfo>> {
not_supported("get_job")
}
/// Durable job history (SHOW JOB HISTORY) across the database's tables,
/// optionally narrowed to a single `job_id`.
async fn job_history(&self, _job_id: Option<&str>) -> Result<Vec<JobHistoryInfo>> {
not_supported("job_history")
}
/// Per-row UDF errors (SHOW ERRORS) recorded by `error_policy=skip` across
/// the database's tables, optionally filtered by `job_id` and/or `table`.
async fn errors(
&self,
_job_id: Option<&str>,
_table: Option<&str>,
) -> Result<Vec<JobErrorInfo>> {
not_supported("errors")
}
/// Open a table in the database
async fn open_table(&self, request: OpenTableRequest) -> Result<Arc<dyn BaseTable>>;

View File

@@ -20,9 +20,9 @@ use lance_namespace::models::{
use crate::Error;
use crate::database::{
CloneTableRequest, CreateFunctionRequest, CreateMaterializedViewRequest, CreateTableMode,
CreateTableRequest, Database, DatabaseOptions, FunctionInfo, JobInfo, MaterializedViewInfo,
MvRefreshPlan, OpenTableRequest, ReadConsistency, RefreshMaterializedViewRequest,
TableLineageRequest, TableNamesRequest,
CreateTableRequest, Database, DatabaseOptions, FunctionInfo, JobErrorInfo, JobHistoryInfo,
JobInfo, MaterializedViewInfo, MvRefreshPlan, OpenTableRequest, ReadConsistency,
RefreshMaterializedViewRequest, TableLineageRequest, TableNamesRequest,
};
use crate::error::Result;
use crate::remote::util::stream_as_body;
@@ -193,6 +193,90 @@ impl From<RemoteJobEntry> for JobInfo {
}
}
#[derive(serde::Deserialize)]
struct RemoteJobHistoryEntry {
table: String,
job_id: String,
job_type: String,
state: String,
#[serde(default)]
column: Option<String>,
created_ms: i64,
updated_ms: i64,
#[serde(default)]
completed_ms: Option<i64>,
#[serde(default)]
rows_processed: Option<i64>,
#[serde(default)]
rows_skipped: Option<i64>,
#[serde(default)]
error: Option<String>,
#[serde(default)]
events: Option<String>,
}
#[derive(serde::Deserialize)]
struct RemoteJobHistoryResponse {
jobs: Vec<RemoteJobHistoryEntry>,
}
impl From<RemoteJobHistoryEntry> for JobHistoryInfo {
fn from(j: RemoteJobHistoryEntry) -> Self {
JobHistoryInfo {
table: j.table,
job_id: j.job_id,
job_type: j.job_type,
state: j.state,
column: j.column,
created_ms: j.created_ms,
updated_ms: j.updated_ms,
completed_ms: j.completed_ms,
rows_processed: j.rows_processed,
rows_skipped: j.rows_skipped,
error: j.error,
events: j.events,
}
}
}
#[derive(serde::Deserialize)]
struct RemoteErrorEntry {
job_id: String,
table: String,
column: String,
error_type: String,
error_message: String,
#[serde(default)]
fragment_id: Option<i64>,
#[serde(default)]
source_row_id: Option<i64>,
#[serde(default)]
table_version: Option<i64>,
#[serde(default)]
age_seconds: Option<i64>,
}
#[derive(serde::Deserialize)]
struct RemoteErrorsResponse {
errors: Vec<RemoteErrorEntry>,
}
impl From<RemoteErrorEntry> for JobErrorInfo {
fn from(e: RemoteErrorEntry) -> Self {
JobErrorInfo {
job_id: e.job_id,
table: e.table,
column: e.column,
error_type: e.error_type,
error_message: e.error_message,
fragment_id: e.fragment_id,
source_row_id: e.source_row_id,
table_version: e.table_version,
age_seconds: e.age_seconds,
}
}
}
// Request structure for the remote clone table API
#[derive(serde::Serialize)]
struct RemoteCloneTableRequest {
@@ -998,6 +1082,31 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
Ok(body.cancelled)
}
async fn job_history(&self, job_id: Option<&str>) -> Result<Vec<JobHistoryInfo>> {
let mut req = self.client.get("/v1/job/history");
if let Some(j) = job_id {
req = req.query(&[("job", j)]);
}
let (request_id, rsp) = self.client.send(req).await?;
let rsp = self.client.check_response(&request_id, rsp).await?;
let body: RemoteJobHistoryResponse = rsp.json().await.err_to_http(request_id)?;
Ok(body.jobs.into_iter().map(JobHistoryInfo::from).collect())
}
async fn errors(&self, job_id: Option<&str>, table: Option<&str>) -> Result<Vec<JobErrorInfo>> {
let mut req = self.client.get("/v1/job/errors");
if let Some(j) = job_id {
req = req.query(&[("job", j)]);
}
if let Some(t) = table {
req = req.query(&[("table", t)]);
}
let (request_id, rsp) = self.client.send(req).await?;
let rsp = self.client.check_response(&request_id, rsp).await?;
let body: RemoteErrorsResponse = rsp.json().await.err_to_http(request_id)?;
Ok(body.errors.into_iter().map(JobErrorInfo::from).collect())
}
async fn open_table(&self, request: OpenTableRequest) -> Result<Arc<dyn BaseTable>> {
let identifier = build_table_identifier(
&request.name,
@@ -2106,6 +2215,52 @@ mod tests {
.unwrap()
});
assert!(!conn.cancel_job("gone").await.unwrap());
// job_history: GET /v1/job/history, no filter
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::GET);
assert_eq!(request.url().path(), "/v1/job/history");
assert!(request.url().query().is_none());
http::Response::builder()
.status(200)
.body(
r#"{"jobs":[{"table":"docs","job_id":"j-1","job_type":"udf_virtual_column_backfill","state":"done","column":"vec","created_ms":1000,"updated_ms":2000,"completed_ms":2000,"rows_processed":42,"rows_skipped":3,"error":null,"events":"created\ndone"}]}"#,
)
.unwrap()
});
let hist = conn.job_history(None).await.unwrap();
assert_eq!(hist.len(), 1);
assert_eq!(hist[0].state, "done");
assert_eq!(hist[0].rows_processed, Some(42));
assert_eq!(hist[0].events.as_deref(), Some("created\ndone"));
// job_history: ?job= narrows to one job
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.url().path(), "/v1/job/history");
assert_eq!(request.url().query(), Some("job=j-1"));
http::Response::builder()
.status(200)
.body(r#"{"jobs":[]}"#)
.unwrap()
});
assert!(conn.job_history(Some("j-1")).await.unwrap().is_empty());
// errors: GET /v1/job/errors with job + table filters
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::GET);
assert_eq!(request.url().path(), "/v1/job/errors");
assert_eq!(request.url().query(), Some("job=j-1&table=docs"));
http::Response::builder()
.status(200)
.body(
r#"{"errors":[{"job_id":"j-1","table":"docs","column":"vec","error_type":"ValueError","error_message":"boom","fragment_id":0,"source_row_id":42,"table_version":7,"age_seconds":5}]}"#,
)
.unwrap()
});
let errs = conn.errors(Some("j-1"), Some("docs")).await.unwrap();
assert_eq!(errs.len(), 1);
assert_eq!(errs[0].error_type, "ValueError");
assert_eq!(errs[0].source_row_id, Some(42));
}
#[tokio::test]