feat: SDK surface for functions, materialized views, jobs, refresh_column

Adds the derived-compute interface to the SDK:

- Database trait: create/list/drop_function, create/refresh/alter/
  drop/list_materialized_view, list_jobs -- default implementations
  return Error::NotSupported (NotImplementedError in python), so
  existing Database impls are unaffected; local single-node
  implementations are planned. BaseTable gains refresh_column with
  the same default.
- RemoteDatabase/RemoteTable implement them against the server REST
  routes (/v1/function/*, /v1/materialized_view/*, /v1/job/list,
  /v1/table/{id}/refresh_column), with mock-HTTP unit tests.
- Connection/Table public methods, pyo3 bindings (FunctionInfo,
  MaterializedViewInfo, JobInfo pyclasses), and python wrappers:
  sync on the DBConnection base (shared by local and remote
  connections), async on AsyncConnection; refresh_column on
  LanceTable, RemoteTable, and AsyncTable.
This commit is contained in:
Wyatt Alt
2026-06-12 10:00:07 -07:00
committed by Jack Ye
parent 10fecdf051
commit ff3c7111b9
11 changed files with 1182 additions and 6 deletions

View File

@@ -563,6 +563,101 @@ class DBConnection(EnforceOverrides):
raise NotImplementedError("serialize is not supported for this connection type")
# -- Derived compute: functions, materialized views, jobs -------------
# Server-backed features (LanceDB Enterprise / Cloud); local
# connections raise NotImplementedError for now.
def create_function(
self,
name: str,
language: str,
return_type: str,
body: str,
options: Optional[Dict[str, str]] = None,
):
"""Register a UDF (CREATE FUNCTION).
Parameters
----------
name: str
Function name.
language: str
Implementation language (currently "python").
return_type: str
SQL return type, e.g. "FLOAT", "FLOAT[1536]",
"STRUCT(a FLOAT, b VARCHAR)", "TABLE(chunk VARCHAR, idx INT)".
body: str
Function body: source text, or base64 cloudpickle bytes when
options["body_format"] == "cloudpickle".
options: dict, optional
input_columns, pip, num_gpus, batch_size, timeout,
error_policy, docker_image, body_format, ...
"""
LOOP.run(self._conn.create_function(name, language, return_type, body, options))
def list_functions(self):
"""List registered functions (SHOW FUNCTIONS)."""
return LOOP.run(self._conn.list_functions())
def drop_function(self, name: str):
"""Drop a registered function (DROP FUNCTION)."""
LOOP.run(self._conn.drop_function(name))
def create_materialized_view(
self,
name: str,
query: str,
*,
auto_refresh: bool = False,
with_no_data: bool = False,
) -> Optional[str]:
"""Create a materialized view (CREATE MATERIALIZED VIEW).
`query` is the view's SELECT statement, e.g.
"SELECT id, embed(body) AS vec FROM articles WHERE id > 1".
Returns the initial-population job id, or None when
with_no_data=True.
"""
return LOOP.run(
self._conn.create_materialized_view(
name, query, auto_refresh=auto_refresh, with_no_data=with_no_data
)
)
def refresh_materialized_view(
self,
name: str,
*,
src_version: Optional[int] = None,
num_workers: Optional[int] = None,
max_workers: Optional[int] = None,
) -> str:
"""Refresh a materialized view; returns the refresh job id."""
return LOOP.run(
self._conn.refresh_materialized_view(
name,
src_version=src_version,
num_workers=num_workers,
max_workers=max_workers,
)
)
def alter_materialized_view(self, name: str, *, auto_refresh: bool):
"""Update a materialized view's options (ALTER MATERIALIZED VIEW)."""
LOOP.run(self._conn.alter_materialized_view(name, auto_refresh=auto_refresh))
def drop_materialized_view(self, name: str):
"""Drop a materialized view definition (DROP MATERIALIZED VIEW)."""
LOOP.run(self._conn.drop_materialized_view(name))
def list_materialized_views(self):
"""List registered materialized view definitions."""
return LOOP.run(self._conn.list_materialized_views())
def list_jobs(self):
"""List inflight server-side jobs across the database's tables."""
return LOOP.run(self._conn.list_jobs())
class LanceDBConnection(DBConnection):
"""
A connection to a LanceDB database.
@@ -1787,6 +1882,75 @@ class AsyncConnection(object):
)
return AsyncTable(table)
# -- Derived compute: functions, materialized views, jobs -------------
# Server-backed features (LanceDB Enterprise / Cloud); local
# connections raise NotImplementedError for now.
async def create_function(
self,
name: str,
language: str,
return_type: str,
body: str,
options: Optional[Dict[str, str]] = None,
):
"""Register a UDF (CREATE FUNCTION)."""
await self._inner.create_function(name, language, return_type, body, options)
async def list_functions(self):
"""List registered functions (SHOW FUNCTIONS)."""
return await self._inner.list_functions()
async def drop_function(self, name: str):
"""Drop a registered function (DROP FUNCTION)."""
await self._inner.drop_function(name)
async def create_materialized_view(
self,
name: str,
query: str,
*,
auto_refresh: bool = False,
with_no_data: bool = False,
) -> Optional[str]:
"""Create a materialized view; returns the initial-population
job id, or None when with_no_data=True."""
return await self._inner.create_materialized_view(
name, query, auto_refresh=auto_refresh, with_no_data=with_no_data
)
async def refresh_materialized_view(
self,
name: str,
*,
src_version: Optional[int] = None,
num_workers: Optional[int] = None,
max_workers: Optional[int] = None,
) -> str:
"""Refresh a materialized view; returns the refresh job id."""
return await self._inner.refresh_materialized_view(
name,
src_version=src_version,
num_workers=num_workers,
max_workers=max_workers,
)
async def alter_materialized_view(self, name: str, *, auto_refresh: bool):
"""Update a materialized view's options."""
await self._inner.alter_materialized_view(name, auto_refresh)
async def drop_materialized_view(self, name: str):
"""Drop a materialized view definition."""
await self._inner.drop_materialized_view(name)
async def list_materialized_views(self):
"""List registered materialized view definitions."""
return await self._inner.list_materialized_views()
async def list_jobs(self):
"""List inflight server-side jobs across the database's tables."""
return await self._inner.list_jobs()
async def rename_table(
self,
cur_name: str,

View File

@@ -887,6 +887,33 @@ class RemoteTable(Table):
def add_columns(self, transforms: Dict[str, str]) -> AddColumnsResult:
return LOOP.run(self._table.add_columns(transforms))
def refresh_column(
self,
columns,
*,
where: Optional[str] = None,
num_workers: Optional[int] = None,
max_workers: Optional[int] = None,
) -> str:
"""Trigger recompute of computed columns (REFRESH COLUMN).
The expression is resolved server-side from each column's stored
binding; columns bound to the same struct-returning function
refresh together. Returns the refresh job id. Server-backed
feature (LanceDB Enterprise / Cloud).
"""
if isinstance(columns, str):
columns = [columns]
return LOOP.run(
self._table.refresh_column(
list(columns),
where=where,
num_workers=num_workers,
max_workers=max_workers,
)
)
def alter_columns(
self, *alterations: Iterable[Dict[str, str]]
) -> AlterColumnsResult:

View File

@@ -3714,6 +3714,33 @@ class LanceTable(Table):
) -> AddColumnsResult:
return LOOP.run(self._table.add_columns(transforms))
def refresh_column(
self,
columns,
*,
where: Optional[str] = None,
num_workers: Optional[int] = None,
max_workers: Optional[int] = None,
) -> str:
"""Trigger recompute of computed columns (REFRESH COLUMN).
The expression is resolved server-side from each column's stored
binding; columns bound to the same struct-returning function
refresh together. Returns the refresh job id. Server-backed
feature (LanceDB Enterprise / Cloud).
"""
if isinstance(columns, str):
columns = [columns]
return LOOP.run(
self._table.refresh_column(
list(columns),
where=where,
num_workers=num_workers,
max_workers=max_workers,
)
)
def alter_columns(
self, *alterations: Iterable[Dict[str, str]]
) -> AlterColumnsResult:
@@ -5390,6 +5417,25 @@ class AsyncTable:
return await self._inner.update(updates_sql, where)
async def refresh_column(
self,
columns,
*,
where: Optional[str] = None,
num_workers: Optional[int] = None,
max_workers: Optional[int] = None,
) -> str:
"""Trigger recompute of computed columns (REFRESH COLUMN).
Returns the refresh job id. Server-backed feature."""
if isinstance(columns, str):
columns = [columns]
return await self._inner.refresh_column(
list(columns),
where_clause=where,
num_workers=num_workers,
max_workers=max_workers,
)
async def add_columns(
self, transforms: dict[str, str] | pa.field | List[pa.field] | pa.Schema
) -> AddColumnsResult:

View File

@@ -18,7 +18,10 @@ use lancedb::{
connection::Connection as LanceConnection,
connection::NamespaceClientPushdownOperation,
database::namespace::LanceNamespaceDatabase,
database::{CreateTableMode, Database, ReadConsistency},
database::{
CreateFunctionRequest, CreateMaterializedViewRequest, CreateTableMode, Database,
ReadConsistency, RefreshMaterializedViewRequest,
},
};
use pyo3::{
Bound, FromPyObject, Py, PyAny, PyRef, PyResult, Python,
@@ -27,6 +30,46 @@ use pyo3::{
types::{PyDict, PyDictMethods},
};
/// A registered function, as returned by `list_functions`.
#[pyclass(get_all)]
#[derive(Clone)]
pub struct FunctionInfo {
pub name: String,
pub language: String,
pub return_type: String,
pub description: String,
}
/// A registered materialized view definition.
#[pyclass(get_all)]
#[derive(Clone)]
pub struct MaterializedViewInfo {
pub name: String,
pub source_table: String,
pub projection: Vec<String>,
pub udf_columns: Vec<String>,
pub filter: Option<String>,
pub auto_refresh: bool,
}
/// One inflight server-side job.
#[pyclass(get_all)]
#[derive(Clone)]
pub struct JobInfo {
pub table: String,
pub job_id: String,
pub job_type: String,
pub state: String,
pub column: Option<String>,
pub age_seconds: Option<i64>,
pub command: Option<String>,
pub units_done: Option<i64>,
pub units_total: Option<i64>,
pub committed: bool,
pub rows_skipped: u64,
pub error: Option<String>,
}
#[pyclass]
pub struct Connection {
inner: Option<LanceConnection>,
@@ -310,6 +353,163 @@ impl Connection {
})
}
#[pyo3(signature = (name, language, return_type, body, options=None))]
pub fn create_function(
self_: PyRef<'_, Self>,
name: String,
language: String,
return_type: String,
body: String,
options: Option<HashMap<String, String>>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
future_into_py(self_.py(), async move {
inner
.create_function(CreateFunctionRequest {
name,
language,
return_type,
body,
options: options.unwrap_or_default(),
})
.await
.infer_error()
})
}
pub fn list_functions(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
future_into_py(self_.py(), async move {
let functions = inner.list_functions().await.infer_error()?;
Ok(functions
.into_iter()
.map(|f| FunctionInfo {
name: f.name,
language: f.language,
return_type: f.return_type,
description: f.description,
})
.collect::<Vec<_>>())
})
}
pub fn drop_function(self_: PyRef<'_, Self>, name: String) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
future_into_py(self_.py(), async move {
inner.drop_function(&name).await.infer_error()
})
}
#[pyo3(signature = (name, query, auto_refresh=false, with_no_data=false))]
pub fn create_materialized_view(
self_: PyRef<'_, Self>,
name: String,
query: String,
auto_refresh: bool,
with_no_data: bool,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
future_into_py(self_.py(), async move {
inner
.create_materialized_view(CreateMaterializedViewRequest {
name,
query,
auto_refresh,
with_no_data,
})
.await
.infer_error()
})
}
#[pyo3(signature = (name, src_version=None, num_workers=None, max_workers=None))]
pub fn refresh_materialized_view(
self_: PyRef<'_, Self>,
name: String,
src_version: Option<u64>,
num_workers: Option<u32>,
max_workers: Option<u32>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
future_into_py(self_.py(), async move {
inner
.refresh_materialized_view(RefreshMaterializedViewRequest {
name,
src_version,
num_workers,
max_workers,
})
.await
.infer_error()
})
}
pub fn alter_materialized_view(
self_: PyRef<'_, Self>,
name: String,
auto_refresh: bool,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
future_into_py(self_.py(), async move {
inner
.alter_materialized_view(&name, auto_refresh)
.await
.infer_error()
})
}
pub fn drop_materialized_view(
self_: PyRef<'_, Self>,
name: String,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
future_into_py(self_.py(), async move {
inner.drop_materialized_view(&name).await.infer_error()
})
}
pub fn list_materialized_views(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
future_into_py(self_.py(), async move {
let views = inner.list_materialized_views().await.infer_error()?;
Ok(views
.into_iter()
.map(|v| MaterializedViewInfo {
name: v.name,
source_table: v.source_table,
projection: v.projection,
udf_columns: v.udf_columns,
filter: v.filter,
auto_refresh: v.auto_refresh,
})
.collect::<Vec<_>>())
})
}
pub fn list_jobs(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.get_inner()?.clone();
future_into_py(self_.py(), async move {
let jobs = inner.list_jobs().await.infer_error()?;
Ok(jobs
.into_iter()
.map(|j| JobInfo {
table: j.table,
job_id: j.job_id,
job_type: j.job_type,
state: j.state,
column: j.column,
age_seconds: j.age_seconds,
command: j.command,
units_done: j.units_done,
units_total: j.units_total,
committed: j.committed,
rows_skipped: j.rows_skipped,
error: j.error,
})
.collect::<Vec<_>>())
})
}
#[pyo3(signature = (cur_name, new_name, cur_namespace_path=None, new_namespace_path=None))]
pub fn rename_table(
self_: PyRef<'_, Self>,

View File

@@ -41,6 +41,9 @@ pub fn _lancedb(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
.write_style("LANCEDB_LOG_STYLE");
env_logger::init_from_env(env);
m.add_class::<Connection>()?;
m.add_class::<connection::FunctionInfo>()?;
m.add_class::<connection::MaterializedViewInfo>()?;
m.add_class::<connection::JobInfo>()?;
m.add_class::<Session>()?;
m.add_class::<Table>()?;
m.add_class::<IndexConfig>()?;

View File

@@ -1060,6 +1060,23 @@ impl Table {
})
}
#[pyo3(signature = (columns, where_clause=None, num_workers=None, max_workers=None))]
pub fn refresh_column(
self_: PyRef<'_, Self>,
columns: Vec<String>,
where_clause: Option<String>,
num_workers: Option<u32>,
max_workers: Option<u32>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
inner
.refresh_column(&columns, where_clause, num_workers, max_workers)
.await
.infer_error()
})
}
pub fn add_columns(
self_: PyRef<'_, Self>,
definitions: Vec<(String, String)>,

View File

@@ -23,8 +23,9 @@ use crate::connection::create_table::CreateTableBuilder;
use crate::data::scannable::Scannable;
use crate::database::listing::ListingDatabase;
use crate::database::{
CloneTableRequest, Database, DatabaseOptions, OpenTableRequest, ReadConsistency,
TableNamesRequest,
CloneTableRequest, CreateFunctionRequest, CreateMaterializedViewRequest, Database,
DatabaseOptions, FunctionInfo, JobInfo, MaterializedViewInfo, OpenTableRequest,
ReadConsistency, RefreshMaterializedViewRequest, TableNamesRequest,
};
use crate::embeddings::{EmbeddingRegistry, MemoryRegistry};
use crate::error::{Error, Result};
@@ -488,6 +489,64 @@ impl Connection {
)
}
// -- Derived compute: functions, materialized views, jobs -------------
// Server-backed features (LanceDB Enterprise / Cloud); local
// databases return NotSupported for now.
/// Register a UDF (CREATE FUNCTION).
pub async fn create_function(&self, request: CreateFunctionRequest) -> Result<()> {
self.internal.create_function(request).await
}
/// List registered functions (SHOW FUNCTIONS).
pub async fn list_functions(&self) -> Result<Vec<FunctionInfo>> {
self.internal.list_functions().await
}
/// Drop a registered function (DROP FUNCTION).
pub async fn drop_function(&self, name: &str) -> Result<()> {
self.internal.drop_function(name).await
}
/// Create a materialized view (CREATE MATERIALIZED VIEW). Returns
/// the initial-population job id, absent when `with_no_data`.
pub async fn create_materialized_view(
&self,
request: CreateMaterializedViewRequest,
) -> Result<Option<String>> {
self.internal.create_materialized_view(request).await
}
/// Refresh a materialized view; returns the refresh job id.
pub async fn refresh_materialized_view(
&self,
request: RefreshMaterializedViewRequest,
) -> Result<String> {
self.internal.refresh_materialized_view(request).await
}
/// Update a materialized view's options (ALTER MATERIALIZED VIEW).
pub async fn alter_materialized_view(&self, name: &str, auto_refresh: bool) -> Result<()> {
self.internal
.alter_materialized_view(name, auto_refresh)
.await
}
/// Drop a materialized view definition (DROP MATERIALIZED VIEW).
pub async fn drop_materialized_view(&self, name: &str) -> Result<()> {
self.internal.drop_materialized_view(name).await
}
/// List registered materialized view definitions.
pub async fn list_materialized_views(&self) -> Result<Vec<MaterializedViewInfo>> {
self.internal.list_materialized_views().await
}
/// List inflight server-side jobs across the database's tables.
pub async fn list_jobs(&self) -> Result<Vec<JobInfo>> {
self.internal.list_jobs().await
}
/// Rename a table in the database.
///
/// This is only supported in LanceDB Cloud.

View File

@@ -27,7 +27,7 @@ use lance_namespace::models::{
};
use crate::data::scannable::Scannable;
use crate::error::Result;
use crate::error::{Error, Result};
use crate::table::{BaseTable, WriteOptions};
pub mod listing;
@@ -200,6 +200,129 @@ pub enum ReadConsistency {
Strong,
}
/// A request to register a UDF (CREATE FUNCTION).
///
/// Functions are first-class database objects, decoupled from any
/// column; computed columns and materialized views reference them by
/// name. Server-backed feature (LanceDB Enterprise / Cloud).
#[derive(Debug, Clone)]
pub struct CreateFunctionRequest {
/// Function name.
pub name: String,
/// Implementation language (currently "python").
pub language: String,
/// SQL return type, e.g. `FLOAT`, `FLOAT[1536]`,
/// `STRUCT(a FLOAT, b VARCHAR)`, `TABLE(chunk VARCHAR, idx INT)`.
pub return_type: String,
/// Function body: source text, or base64 cloudpickle bytes when
/// `options["body_format"] = "cloudpickle"`.
pub body: String,
/// Options: input_columns, pip, num_gpus, batch_size, timeout,
/// error_policy, docker_image, body_format, ...
pub options: HashMap<String, String>,
}
/// A registered function, as returned by `list_functions`.
#[derive(Debug, Clone)]
pub struct FunctionInfo {
pub name: String,
pub language: String,
pub return_type: String,
pub description: String,
}
/// A request to create a materialized view (CREATE MATERIALIZED VIEW).
#[derive(Debug, Clone)]
pub struct CreateMaterializedViewRequest {
/// View name.
pub name: String,
/// The view's SELECT statement, e.g.
/// `SELECT id, embed(body) AS vec FROM articles WHERE id > 1`.
/// Bare columns project through; function-call columns compute via
/// registered UDFs (a RETURNS TABLE function makes a row-expanding
/// chunker view).
pub query: String,
/// Refresh automatically when the source table changes.
pub auto_refresh: bool,
/// Register the definition only; skip the initial population.
pub with_no_data: bool,
}
impl CreateMaterializedViewRequest {
pub fn new(name: impl Into<String>, query: impl Into<String>) -> Self {
Self {
name: name.into(),
query: query.into(),
auto_refresh: false,
with_no_data: false,
}
}
}
/// A request to refresh a materialized view.
#[derive(Debug, Clone)]
pub struct RefreshMaterializedViewRequest {
/// View name.
pub name: String,
/// Pin the refresh to a source-table version; latest when absent.
pub src_version: Option<u64>,
/// Initial worker count.
pub num_workers: Option<u32>,
/// Elastic worker ceiling.
pub max_workers: Option<u32>,
}
impl RefreshMaterializedViewRequest {
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
src_version: None,
num_workers: None,
max_workers: None,
}
}
}
/// A registered materialized view definition, as returned by
/// `list_materialized_views`.
#[derive(Debug, Clone)]
pub struct MaterializedViewInfo {
pub name: String,
pub source_table: String,
/// Source columns projected through.
pub projection: Vec<String>,
/// `alias=expression` per UDF-computed column.
pub udf_columns: Vec<String>,
pub filter: Option<String>,
pub auto_refresh: bool,
}
/// A row from `list_jobs`: one inflight server-side job (index build,
/// compaction, column refresh, view refresh, ...).
#[derive(Debug, Clone)]
pub struct JobInfo {
pub table: String,
pub job_id: String,
pub job_type: String,
/// Lifecycle state: "running", "cancelling", or "stale".
pub state: String,
pub column: Option<String>,
pub age_seconds: Option<i64>,
pub command: Option<String>,
pub units_done: Option<i64>,
pub units_total: Option<i64>,
/// Whether the job's final commit has completed (output visible).
pub committed: bool,
pub rows_skipped: u64,
pub error: Option<String>,
}
fn not_supported<T>(what: &str) -> Result<T> {
Err(Error::NotSupported {
message: format!("{} is not supported by this database", what),
})
}
/// The `Database` trait defines the interface for database implementations.
///
/// A database is responsible for managing tables and their metadata.
@@ -245,6 +368,57 @@ pub trait Database:
///
/// See [`CloneTableRequest`] for detailed documentation and examples.
async fn clone_table(&self, request: CloneTableRequest) -> Result<Arc<dyn BaseTable>>;
// -- Derived compute: functions, materialized views, jobs -------------
//
// Server-backed features (LanceDB Enterprise / Cloud). The defaults
// return NotSupported; the remote database overrides them. Local
// single-node implementations are planned.
/// Register a UDF (CREATE FUNCTION).
async fn create_function(&self, _request: CreateFunctionRequest) -> Result<()> {
not_supported("create_function")
}
/// List registered functions (SHOW FUNCTIONS).
async fn list_functions(&self) -> Result<Vec<FunctionInfo>> {
not_supported("list_functions")
}
/// Drop a registered function (DROP FUNCTION).
async fn drop_function(&self, _name: &str) -> Result<()> {
not_supported("drop_function")
}
/// Create a materialized view (CREATE MATERIALIZED VIEW). Returns
/// the initial-population job id, absent when `with_no_data`.
async fn create_materialized_view(
&self,
_request: CreateMaterializedViewRequest,
) -> Result<Option<String>> {
not_supported("create_materialized_view")
}
/// Refresh a materialized view; returns the refresh job id.
async fn refresh_materialized_view(
&self,
_request: RefreshMaterializedViewRequest,
) -> Result<String> {
not_supported("refresh_materialized_view")
}
/// Update a materialized view's options (ALTER MATERIALIZED VIEW).
async fn alter_materialized_view(&self, _name: &str, _auto_refresh: bool) -> Result<()> {
not_supported("alter_materialized_view")
}
/// Drop a materialized view definition (DROP MATERIALIZED VIEW).
async fn drop_materialized_view(&self, _name: &str) -> Result<()> {
not_supported("drop_materialized_view")
}
/// List registered materialized view definitions.
async fn list_materialized_views(&self) -> Result<Vec<MaterializedViewInfo>> {
not_supported("list_materialized_views")
}
/// List inflight server-side jobs across the database's tables.
async fn list_jobs(&self) -> Result<Vec<JobInfo>> {
not_supported("list_jobs")
}
/// Open a table in the database
async fn open_table(&self, request: OpenTableRequest) -> Result<Arc<dyn BaseTable>>;
/// Rename a table in the database

View File

@@ -19,8 +19,9 @@ use lance_namespace::models::{
use crate::Error;
use crate::database::{
CloneTableRequest, CreateTableMode, CreateTableRequest, Database, DatabaseOptions,
OpenTableRequest, ReadConsistency, TableNamesRequest,
CloneTableRequest, CreateFunctionRequest, CreateMaterializedViewRequest, CreateTableMode,
CreateTableRequest, Database, DatabaseOptions, FunctionInfo, JobInfo, MaterializedViewInfo,
OpenTableRequest, ReadConsistency, RefreshMaterializedViewRequest, TableNamesRequest,
};
use crate::error::Result;
use crate::remote::util::stream_as_body;
@@ -33,6 +34,111 @@ use super::client::{
use super::table::RemoteTable;
use super::util::parse_server_version;
// Wire types for the derived-compute routes (functions, materialized
// views, jobs). Field shapes mirror the server's REST contract.
#[derive(serde::Serialize)]
struct RemoteCreateFunctionRequest {
language: String,
return_type: String,
body: String,
options: std::collections::HashMap<String, String>,
}
#[derive(serde::Deserialize)]
struct RemoteFunctionEntry {
name: String,
language: String,
return_type: String,
#[serde(default)]
description: String,
}
#[derive(serde::Deserialize)]
struct RemoteListFunctionsResponse {
functions: Vec<RemoteFunctionEntry>,
}
#[derive(serde::Serialize)]
struct RemoteCreateMaterializedViewRequest {
query: String,
auto_refresh: bool,
with_no_data: bool,
}
#[derive(serde::Deserialize)]
struct RemoteCreateMaterializedViewResponse {
#[serde(default)]
job_id: Option<String>,
}
#[derive(serde::Serialize)]
struct RemoteRefreshMaterializedViewRequest {
#[serde(skip_serializing_if = "Option::is_none")]
src_version: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
num_workers: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
max_workers: Option<u32>,
}
#[derive(serde::Deserialize)]
struct RemoteRefreshMaterializedViewResponse {
job_id: String,
}
#[derive(serde::Serialize)]
struct RemoteAlterMaterializedViewRequest {
auto_refresh: bool,
}
#[derive(serde::Deserialize)]
struct RemoteMaterializedViewEntry {
name: String,
source_table: String,
#[serde(default)]
projection: Vec<String>,
#[serde(default)]
udf_columns: Vec<String>,
#[serde(default)]
filter: Option<String>,
#[serde(default)]
auto_refresh: bool,
}
#[derive(serde::Deserialize)]
struct RemoteListMaterializedViewsResponse {
views: Vec<RemoteMaterializedViewEntry>,
}
#[derive(serde::Deserialize)]
struct RemoteJobEntry {
table: String,
job_id: String,
job_type: String,
state: String,
#[serde(default)]
column: Option<String>,
#[serde(default)]
age_seconds: Option<i64>,
#[serde(default)]
command: Option<String>,
#[serde(default)]
units_done: Option<i64>,
#[serde(default)]
units_total: Option<i64>,
#[serde(default)]
committed: bool,
#[serde(default)]
rows_skipped: u64,
#[serde(default)]
error: Option<String>,
}
#[derive(serde::Deserialize)]
struct RemoteListJobsResponse {
jobs: Vec<RemoteJobEntry>,
}
// Request structure for the remote clone table API
#[derive(serde::Serialize)]
struct RemoteCloneTableRequest {
@@ -641,6 +747,149 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
Ok(table)
}
async fn create_function(&self, request: CreateFunctionRequest) -> Result<()> {
let body = RemoteCreateFunctionRequest {
language: request.language,
return_type: request.return_type,
body: request.body,
options: request.options,
};
let req = self
.client
.post(&format!("/v1/function/{}/create", request.name))
.json(&body);
let (request_id, rsp) = self.client.send(req).await?;
self.client.check_response(&request_id, rsp).await?;
Ok(())
}
async fn list_functions(&self) -> Result<Vec<FunctionInfo>> {
let req = self.client.get("/v1/function/list");
let (request_id, rsp) = self.client.send(req).await?;
let rsp = self.client.check_response(&request_id, rsp).await?;
let body: RemoteListFunctionsResponse = rsp.json().await.err_to_http(request_id)?;
Ok(body
.functions
.into_iter()
.map(|f| FunctionInfo {
name: f.name,
language: f.language,
return_type: f.return_type,
description: f.description,
})
.collect())
}
async fn drop_function(&self, name: &str) -> Result<()> {
let req = self.client.post(&format!("/v1/function/{}/drop", name));
let (request_id, rsp) = self.client.send(req).await?;
self.client.check_response(&request_id, rsp).await?;
Ok(())
}
async fn create_materialized_view(
&self,
request: CreateMaterializedViewRequest,
) -> Result<Option<String>> {
let body = RemoteCreateMaterializedViewRequest {
query: request.query,
auto_refresh: request.auto_refresh,
with_no_data: request.with_no_data,
};
let req = self
.client
.post(&format!("/v1/materialized_view/{}/create", request.name))
.json(&body);
let (request_id, rsp) = self.client.send(req).await?;
let rsp = self.client.check_response(&request_id, rsp).await?;
let body: RemoteCreateMaterializedViewResponse =
rsp.json().await.err_to_http(request_id)?;
Ok(body.job_id)
}
async fn refresh_materialized_view(
&self,
request: RefreshMaterializedViewRequest,
) -> Result<String> {
let body = RemoteRefreshMaterializedViewRequest {
src_version: request.src_version,
num_workers: request.num_workers,
max_workers: request.max_workers,
};
let req = self
.client
.post(&format!("/v1/materialized_view/{}/refresh", request.name))
.json(&body);
let (request_id, rsp) = self.client.send(req).await?;
let rsp = self.client.check_response(&request_id, rsp).await?;
let body: RemoteRefreshMaterializedViewResponse =
rsp.json().await.err_to_http(request_id)?;
Ok(body.job_id)
}
async fn alter_materialized_view(&self, name: &str, auto_refresh: bool) -> Result<()> {
let req = self
.client
.post(&format!("/v1/materialized_view/{}/alter", name))
.json(&RemoteAlterMaterializedViewRequest { auto_refresh });
let (request_id, rsp) = self.client.send(req).await?;
self.client.check_response(&request_id, rsp).await?;
Ok(())
}
async fn drop_materialized_view(&self, name: &str) -> Result<()> {
let req = self
.client
.post(&format!("/v1/materialized_view/{}/drop", name));
let (request_id, rsp) = self.client.send(req).await?;
self.client.check_response(&request_id, rsp).await?;
Ok(())
}
async fn list_materialized_views(&self) -> Result<Vec<MaterializedViewInfo>> {
let req = self.client.get("/v1/materialized_view/list");
let (request_id, rsp) = self.client.send(req).await?;
let rsp = self.client.check_response(&request_id, rsp).await?;
let body: RemoteListMaterializedViewsResponse = rsp.json().await.err_to_http(request_id)?;
Ok(body
.views
.into_iter()
.map(|v| MaterializedViewInfo {
name: v.name,
source_table: v.source_table,
projection: v.projection,
udf_columns: v.udf_columns,
filter: v.filter,
auto_refresh: v.auto_refresh,
})
.collect())
}
async fn list_jobs(&self) -> Result<Vec<JobInfo>> {
let req = self.client.get("/v1/job/list");
let (request_id, rsp) = self.client.send(req).await?;
let rsp = self.client.check_response(&request_id, rsp).await?;
let body: RemoteListJobsResponse = rsp.json().await.err_to_http(request_id)?;
Ok(body
.jobs
.into_iter()
.map(|j| JobInfo {
table: j.table,
job_id: j.job_id,
job_type: j.job_type,
state: j.state,
column: j.column,
age_seconds: j.age_seconds,
command: j.command,
units_done: j.units_done,
units_total: j.units_total,
committed: j.committed,
rows_skipped: j.rows_skipped,
error: j.error,
})
.collect())
}
async fn open_table(&self, request: OpenTableRequest) -> Result<Arc<dyn BaseTable>> {
let identifier = build_table_identifier(
&request.name,
@@ -1580,6 +1829,156 @@ mod tests {
}
}
#[tokio::test]
async fn test_derived_compute_routes() {
// create_function
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/function/embed/create");
let body: serde_json::Value =
serde_json::from_slice(request.body().unwrap().as_bytes().unwrap()).unwrap();
assert_eq!(body["language"], "python");
assert_eq!(body["return_type"], "FLOAT[4]");
assert_eq!(body["body"], "def embed(x): ...");
assert_eq!(body["options"]["pip"], "torch");
http::Response::builder()
.status(200)
.body(r#"{"name":"embed","status":"OK"}"#)
.unwrap()
});
conn.create_function(crate::database::CreateFunctionRequest {
name: "embed".into(),
language: "python".into(),
return_type: "FLOAT[4]".into(),
body: "def embed(x): ...".into(),
options: [("pip".to_string(), "torch".to_string())].into(),
})
.await
.unwrap();
// list_functions
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::GET);
assert_eq!(request.url().path(), "/v1/function/list");
http::Response::builder()
.status(200)
.body(
r#"{"functions":[{"name":"embed","language":"python","return_type":"Float32","description":""}]}"#,
)
.unwrap()
});
let functions = conn.list_functions().await.unwrap();
assert_eq!(functions.len(), 1);
assert_eq!(functions[0].name, "embed");
// drop_function
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/function/embed/drop");
http::Response::builder()
.status(200)
.body(r#"{"name":"embed","status":"OK"}"#)
.unwrap()
});
conn.drop_function("embed").await.unwrap();
// create_materialized_view
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/materialized_view/mv1/create");
let body: serde_json::Value =
serde_json::from_slice(request.body().unwrap().as_bytes().unwrap()).unwrap();
assert_eq!(body["query"], "SELECT id, embed(body) AS vec FROM docs");
assert_eq!(body["auto_refresh"], true);
assert_eq!(body["with_no_data"], false);
http::Response::builder()
.status(200)
.body(r#"{"name":"mv1","job_id":"j-1"}"#)
.unwrap()
});
let mut request = crate::database::CreateMaterializedViewRequest::new(
"mv1",
"SELECT id, embed(body) AS vec FROM docs",
);
request.auto_refresh = true;
let job_id = conn.create_materialized_view(request).await.unwrap();
assert_eq!(job_id.as_deref(), Some("j-1"));
// refresh_materialized_view
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/materialized_view/mv1/refresh");
let body: serde_json::Value =
serde_json::from_slice(request.body().unwrap().as_bytes().unwrap()).unwrap();
assert_eq!(body["num_workers"], 2);
assert!(body.get("src_version").is_none());
http::Response::builder()
.status(202)
.body(r#"{"job_id":"j-2"}"#)
.unwrap()
});
let mut request = crate::database::RefreshMaterializedViewRequest::new("mv1");
request.num_workers = Some(2);
let job_id = conn.refresh_materialized_view(request).await.unwrap();
assert_eq!(job_id, "j-2");
// alter_materialized_view
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::POST);
assert_eq!(request.url().path(), "/v1/materialized_view/mv1/alter");
let body: serde_json::Value =
serde_json::from_slice(request.body().unwrap().as_bytes().unwrap()).unwrap();
assert_eq!(body["auto_refresh"], false);
http::Response::builder()
.status(200)
.body(r#"{"name":"mv1","status":"OK"}"#)
.unwrap()
});
conn.alter_materialized_view("mv1", false).await.unwrap();
// drop_materialized_view
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.url().path(), "/v1/materialized_view/mv1/drop");
http::Response::builder()
.status(200)
.body(r#"{"name":"mv1","status":"OK"}"#)
.unwrap()
});
conn.drop_materialized_view("mv1").await.unwrap();
// list_materialized_views
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::GET);
assert_eq!(request.url().path(), "/v1/materialized_view/list");
http::Response::builder()
.status(200)
.body(
r#"{"views":[{"name":"mv1","source_table":"docs","projection":["id"],"udf_columns":["vec=embed(body)"],"filter":null,"auto_refresh":true}]}"#,
)
.unwrap()
});
let views = conn.list_materialized_views().await.unwrap();
assert_eq!(views.len(), 1);
assert_eq!(views[0].source_table, "docs");
assert!(views[0].auto_refresh);
// list_jobs
let conn = Connection::new_with_handler(|request| {
assert_eq!(request.method(), &reqwest::Method::GET);
assert_eq!(request.url().path(), "/v1/job/list");
http::Response::builder()
.status(200)
.body(
r#"{"jobs":[{"table":"docs","job_id":"j-3","job_type":"udf_virtual_column_backfill","state":"running","column":"vec","age_seconds":4,"command":null,"units_done":1,"units_total":2,"committed":false,"rows_skipped":0,"error":null}]}"#,
)
.unwrap()
});
let jobs = conn.list_jobs().await.unwrap();
assert_eq!(jobs.len(), 1);
assert_eq!(jobs[0].state, "running");
assert_eq!(jobs[0].units_total, Some(2));
}
#[tokio::test]
async fn test_clone_table() {
let conn = Connection::new_with_handler(|request| {

View File

@@ -2309,6 +2309,37 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
message: "optimize is not supported on LanceDB cloud.".into(),
})
}
async fn refresh_column(
&self,
columns: &[String],
where_clause: Option<String>,
num_workers: Option<u32>,
max_workers: Option<u32>,
) -> Result<String> {
let mut body = serde_json::json!({ "columns": columns });
if let Some(w) = where_clause {
body["where_clause"] = serde_json::Value::String(w);
}
if let Some(n) = num_workers {
body["num_workers"] = n.into();
}
if let Some(n) = max_workers {
body["max_workers"] = n.into();
}
let request = self
.client
.post(&format!("/v1/table/{}/refresh_column", self.identifier))
.json(&body);
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
#[derive(serde::Deserialize)]
struct RefreshColumnResponse {
job_id: String,
}
let body: RefreshColumnResponse = response.json().await.err_to_http(request_id)?;
Ok(body.job_id)
}
async fn add_columns(
&self,
transforms: NewColumnTransform,
@@ -2801,6 +2832,30 @@ mod tests {
}
}
#[tokio::test]
async fn test_refresh_column() {
let table = Table::new_with_handler("my_table", |request| {
assert_eq!(request.method(), "POST");
assert_eq!(request.url().path(), "/v1/table/my_table/refresh_column");
let body: serde_json::Value =
serde_json::from_slice(request.body().unwrap().as_bytes().unwrap()).unwrap();
assert_eq!(body["columns"], serde_json::json!(["vec"]));
assert_eq!(body["num_workers"], 2);
assert!(body.get("where_clause").is_none());
http::Response::builder()
.status(202)
.body(r#"{"job_id":"j-9"}"#)
.unwrap()
});
let job_id = table
.refresh_column(&["vec".to_string()], None, Some(2), None)
.await
.unwrap();
assert_eq!(job_id, "j-9");
}
#[tokio::test]
async fn test_version() {
let table = Table::new_with_handler("my_table", |request| {

View File

@@ -620,6 +620,22 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
transforms: NewColumnTransform,
read_columns: Option<Vec<String>>,
) -> Result<AddColumnsResult>;
/// Trigger recompute of computed columns. The expression is
/// resolved server-side from each column's stored binding; columns
/// bound to the same struct-returning function refresh together.
/// Returns the refresh job id. Server-backed feature (LanceDB
/// Enterprise / Cloud); the default returns NotSupported.
async fn refresh_column(
&self,
_columns: &[String],
_where_clause: Option<String>,
_num_workers: Option<u32>,
_max_workers: Option<u32>,
) -> Result<String> {
Err(Error::NotSupported {
message: "refresh_column is not supported by this table".into(),
})
}
/// Alter columns in the table.
async fn alter_columns(&self, alterations: &[ColumnAlteration]) -> Result<AlterColumnsResult>;
/// Drop columns from the table.
@@ -1461,6 +1477,22 @@ impl Table {
self.inner.add_columns(transforms, read_columns).await
}
/// Trigger recompute of computed columns (REFRESH COLUMN). The
/// expression comes from each column's stored binding; columns
/// bound to the same struct-returning function refresh together.
/// Returns the refresh job id. Server-backed feature.
pub async fn refresh_column(
&self,
columns: &[String],
where_clause: Option<String>,
num_workers: Option<u32>,
max_workers: Option<u32>,
) -> Result<String> {
self.inner
.refresh_column(columns, where_clause, num_workers, max_workers)
.await
}
/// Change a column's name or nullability.
pub async fn alter_columns(
&self,