diff --git a/python/python/lancedb/db.py b/python/python/lancedb/db.py index 5ec697d4e..640a6c9ff 100644 --- a/python/python/lancedb/db.py +++ b/python/python/lancedb/db.py @@ -563,6 +563,101 @@ class DBConnection(EnforceOverrides): raise NotImplementedError("serialize is not supported for this connection type") + # -- Derived compute: functions, materialized views, jobs ------------- + # Server-backed features (LanceDB Enterprise / Cloud); local + # connections raise NotImplementedError for now. + + def create_function( + self, + name: str, + language: str, + return_type: str, + body: str, + options: Optional[Dict[str, str]] = None, + ): + """Register a UDF (CREATE FUNCTION). + + Parameters + ---------- + name: str + Function name. + language: str + Implementation language (currently "python"). + return_type: str + SQL return type, e.g. "FLOAT", "FLOAT[1536]", + "STRUCT(a FLOAT, b VARCHAR)", "TABLE(chunk VARCHAR, idx INT)". + body: str + Function body: source text, or base64 cloudpickle bytes when + options["body_format"] == "cloudpickle". + options: dict, optional + input_columns, pip, num_gpus, batch_size, timeout, + error_policy, docker_image, body_format, ... + """ + LOOP.run(self._conn.create_function(name, language, return_type, body, options)) + + def list_functions(self): + """List registered functions (SHOW FUNCTIONS).""" + return LOOP.run(self._conn.list_functions()) + + def drop_function(self, name: str): + """Drop a registered function (DROP FUNCTION).""" + LOOP.run(self._conn.drop_function(name)) + + def create_materialized_view( + self, + name: str, + query: str, + *, + auto_refresh: bool = False, + with_no_data: bool = False, + ) -> Optional[str]: + """Create a materialized view (CREATE MATERIALIZED VIEW). + + `query` is the view's SELECT statement, e.g. + "SELECT id, embed(body) AS vec FROM articles WHERE id > 1". + Returns the initial-population job id, or None when + with_no_data=True. + """ + return LOOP.run( + self._conn.create_materialized_view( + name, query, auto_refresh=auto_refresh, with_no_data=with_no_data + ) + ) + + def refresh_materialized_view( + self, + name: str, + *, + src_version: Optional[int] = None, + num_workers: Optional[int] = None, + max_workers: Optional[int] = None, + ) -> str: + """Refresh a materialized view; returns the refresh job id.""" + return LOOP.run( + self._conn.refresh_materialized_view( + name, + src_version=src_version, + num_workers=num_workers, + max_workers=max_workers, + ) + ) + + def alter_materialized_view(self, name: str, *, auto_refresh: bool): + """Update a materialized view's options (ALTER MATERIALIZED VIEW).""" + LOOP.run(self._conn.alter_materialized_view(name, auto_refresh=auto_refresh)) + + def drop_materialized_view(self, name: str): + """Drop a materialized view definition (DROP MATERIALIZED VIEW).""" + LOOP.run(self._conn.drop_materialized_view(name)) + + def list_materialized_views(self): + """List registered materialized view definitions.""" + return LOOP.run(self._conn.list_materialized_views()) + + def list_jobs(self): + """List inflight server-side jobs across the database's tables.""" + return LOOP.run(self._conn.list_jobs()) + class LanceDBConnection(DBConnection): """ A connection to a LanceDB database. @@ -1787,6 +1882,75 @@ class AsyncConnection(object): ) return AsyncTable(table) + # -- Derived compute: functions, materialized views, jobs ------------- + # Server-backed features (LanceDB Enterprise / Cloud); local + # connections raise NotImplementedError for now. + + async def create_function( + self, + name: str, + language: str, + return_type: str, + body: str, + options: Optional[Dict[str, str]] = None, + ): + """Register a UDF (CREATE FUNCTION).""" + await self._inner.create_function(name, language, return_type, body, options) + + async def list_functions(self): + """List registered functions (SHOW FUNCTIONS).""" + return await self._inner.list_functions() + + async def drop_function(self, name: str): + """Drop a registered function (DROP FUNCTION).""" + await self._inner.drop_function(name) + + async def create_materialized_view( + self, + name: str, + query: str, + *, + auto_refresh: bool = False, + with_no_data: bool = False, + ) -> Optional[str]: + """Create a materialized view; returns the initial-population + job id, or None when with_no_data=True.""" + return await self._inner.create_materialized_view( + name, query, auto_refresh=auto_refresh, with_no_data=with_no_data + ) + + async def refresh_materialized_view( + self, + name: str, + *, + src_version: Optional[int] = None, + num_workers: Optional[int] = None, + max_workers: Optional[int] = None, + ) -> str: + """Refresh a materialized view; returns the refresh job id.""" + return await self._inner.refresh_materialized_view( + name, + src_version=src_version, + num_workers=num_workers, + max_workers=max_workers, + ) + + async def alter_materialized_view(self, name: str, *, auto_refresh: bool): + """Update a materialized view's options.""" + await self._inner.alter_materialized_view(name, auto_refresh) + + async def drop_materialized_view(self, name: str): + """Drop a materialized view definition.""" + await self._inner.drop_materialized_view(name) + + async def list_materialized_views(self): + """List registered materialized view definitions.""" + return await self._inner.list_materialized_views() + + async def list_jobs(self): + """List inflight server-side jobs across the database's tables.""" + return await self._inner.list_jobs() + async def rename_table( self, cur_name: str, diff --git a/python/python/lancedb/remote/table.py b/python/python/lancedb/remote/table.py index 329baaa0e..1d6c0be0d 100644 --- a/python/python/lancedb/remote/table.py +++ b/python/python/lancedb/remote/table.py @@ -887,6 +887,33 @@ class RemoteTable(Table): def add_columns(self, transforms: Dict[str, str]) -> AddColumnsResult: return LOOP.run(self._table.add_columns(transforms)) + def refresh_column( + self, + columns, + *, + where: Optional[str] = None, + num_workers: Optional[int] = None, + max_workers: Optional[int] = None, + ) -> str: + """Trigger recompute of computed columns (REFRESH COLUMN). + + The expression is resolved server-side from each column's stored + binding; columns bound to the same struct-returning function + refresh together. Returns the refresh job id. Server-backed + feature (LanceDB Enterprise / Cloud). + """ + if isinstance(columns, str): + columns = [columns] + return LOOP.run( + self._table.refresh_column( + list(columns), + where=where, + num_workers=num_workers, + max_workers=max_workers, + ) + ) + + def alter_columns( self, *alterations: Iterable[Dict[str, str]] ) -> AlterColumnsResult: diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py index 25df6b554..686ce5003 100644 --- a/python/python/lancedb/table.py +++ b/python/python/lancedb/table.py @@ -3714,6 +3714,33 @@ class LanceTable(Table): ) -> AddColumnsResult: return LOOP.run(self._table.add_columns(transforms)) + def refresh_column( + self, + columns, + *, + where: Optional[str] = None, + num_workers: Optional[int] = None, + max_workers: Optional[int] = None, + ) -> str: + """Trigger recompute of computed columns (REFRESH COLUMN). + + The expression is resolved server-side from each column's stored + binding; columns bound to the same struct-returning function + refresh together. Returns the refresh job id. Server-backed + feature (LanceDB Enterprise / Cloud). + """ + if isinstance(columns, str): + columns = [columns] + return LOOP.run( + self._table.refresh_column( + list(columns), + where=where, + num_workers=num_workers, + max_workers=max_workers, + ) + ) + + def alter_columns( self, *alterations: Iterable[Dict[str, str]] ) -> AlterColumnsResult: @@ -5390,6 +5417,25 @@ class AsyncTable: return await self._inner.update(updates_sql, where) + async def refresh_column( + self, + columns, + *, + where: Optional[str] = None, + num_workers: Optional[int] = None, + max_workers: Optional[int] = None, + ) -> str: + """Trigger recompute of computed columns (REFRESH COLUMN). + Returns the refresh job id. Server-backed feature.""" + if isinstance(columns, str): + columns = [columns] + return await self._inner.refresh_column( + list(columns), + where_clause=where, + num_workers=num_workers, + max_workers=max_workers, + ) + async def add_columns( self, transforms: dict[str, str] | pa.field | List[pa.field] | pa.Schema ) -> AddColumnsResult: diff --git a/python/src/connection.rs b/python/src/connection.rs index 1bba3cefc..b32ccfb2e 100644 --- a/python/src/connection.rs +++ b/python/src/connection.rs @@ -18,7 +18,10 @@ use lancedb::{ connection::Connection as LanceConnection, connection::NamespaceClientPushdownOperation, database::namespace::LanceNamespaceDatabase, - database::{CreateTableMode, Database, ReadConsistency}, + database::{ + CreateFunctionRequest, CreateMaterializedViewRequest, CreateTableMode, Database, + ReadConsistency, RefreshMaterializedViewRequest, + }, }; use pyo3::{ Bound, FromPyObject, Py, PyAny, PyRef, PyResult, Python, @@ -27,6 +30,46 @@ use pyo3::{ types::{PyDict, PyDictMethods}, }; +/// A registered function, as returned by `list_functions`. +#[pyclass(get_all)] +#[derive(Clone)] +pub struct FunctionInfo { + pub name: String, + pub language: String, + pub return_type: String, + pub description: String, +} + +/// A registered materialized view definition. +#[pyclass(get_all)] +#[derive(Clone)] +pub struct MaterializedViewInfo { + pub name: String, + pub source_table: String, + pub projection: Vec, + pub udf_columns: Vec, + pub filter: Option, + pub auto_refresh: bool, +} + +/// One inflight server-side job. +#[pyclass(get_all)] +#[derive(Clone)] +pub struct JobInfo { + pub table: String, + pub job_id: String, + pub job_type: String, + pub state: String, + pub column: Option, + pub age_seconds: Option, + pub command: Option, + pub units_done: Option, + pub units_total: Option, + pub committed: bool, + pub rows_skipped: u64, + pub error: Option, +} + #[pyclass] pub struct Connection { inner: Option, @@ -310,6 +353,163 @@ impl Connection { }) } + #[pyo3(signature = (name, language, return_type, body, options=None))] + pub fn create_function( + self_: PyRef<'_, Self>, + name: String, + language: String, + return_type: String, + body: String, + options: Option>, + ) -> PyResult> { + let inner = self_.get_inner()?.clone(); + future_into_py(self_.py(), async move { + inner + .create_function(CreateFunctionRequest { + name, + language, + return_type, + body, + options: options.unwrap_or_default(), + }) + .await + .infer_error() + }) + } + + pub fn list_functions(self_: PyRef<'_, Self>) -> PyResult> { + let inner = self_.get_inner()?.clone(); + future_into_py(self_.py(), async move { + let functions = inner.list_functions().await.infer_error()?; + Ok(functions + .into_iter() + .map(|f| FunctionInfo { + name: f.name, + language: f.language, + return_type: f.return_type, + description: f.description, + }) + .collect::>()) + }) + } + + pub fn drop_function(self_: PyRef<'_, Self>, name: String) -> PyResult> { + let inner = self_.get_inner()?.clone(); + future_into_py(self_.py(), async move { + inner.drop_function(&name).await.infer_error() + }) + } + + #[pyo3(signature = (name, query, auto_refresh=false, with_no_data=false))] + pub fn create_materialized_view( + self_: PyRef<'_, Self>, + name: String, + query: String, + auto_refresh: bool, + with_no_data: bool, + ) -> PyResult> { + let inner = self_.get_inner()?.clone(); + future_into_py(self_.py(), async move { + inner + .create_materialized_view(CreateMaterializedViewRequest { + name, + query, + auto_refresh, + with_no_data, + }) + .await + .infer_error() + }) + } + + #[pyo3(signature = (name, src_version=None, num_workers=None, max_workers=None))] + pub fn refresh_materialized_view( + self_: PyRef<'_, Self>, + name: String, + src_version: Option, + num_workers: Option, + max_workers: Option, + ) -> PyResult> { + let inner = self_.get_inner()?.clone(); + future_into_py(self_.py(), async move { + inner + .refresh_materialized_view(RefreshMaterializedViewRequest { + name, + src_version, + num_workers, + max_workers, + }) + .await + .infer_error() + }) + } + + pub fn alter_materialized_view( + self_: PyRef<'_, Self>, + name: String, + auto_refresh: bool, + ) -> PyResult> { + let inner = self_.get_inner()?.clone(); + future_into_py(self_.py(), async move { + inner + .alter_materialized_view(&name, auto_refresh) + .await + .infer_error() + }) + } + + pub fn drop_materialized_view( + self_: PyRef<'_, Self>, + name: String, + ) -> PyResult> { + let inner = self_.get_inner()?.clone(); + future_into_py(self_.py(), async move { + inner.drop_materialized_view(&name).await.infer_error() + }) + } + + pub fn list_materialized_views(self_: PyRef<'_, Self>) -> PyResult> { + let inner = self_.get_inner()?.clone(); + future_into_py(self_.py(), async move { + let views = inner.list_materialized_views().await.infer_error()?; + Ok(views + .into_iter() + .map(|v| MaterializedViewInfo { + name: v.name, + source_table: v.source_table, + projection: v.projection, + udf_columns: v.udf_columns, + filter: v.filter, + auto_refresh: v.auto_refresh, + }) + .collect::>()) + }) + } + + pub fn list_jobs(self_: PyRef<'_, Self>) -> PyResult> { + let inner = self_.get_inner()?.clone(); + future_into_py(self_.py(), async move { + let jobs = inner.list_jobs().await.infer_error()?; + Ok(jobs + .into_iter() + .map(|j| JobInfo { + table: j.table, + job_id: j.job_id, + job_type: j.job_type, + state: j.state, + column: j.column, + age_seconds: j.age_seconds, + command: j.command, + units_done: j.units_done, + units_total: j.units_total, + committed: j.committed, + rows_skipped: j.rows_skipped, + error: j.error, + }) + .collect::>()) + }) + } + #[pyo3(signature = (cur_name, new_name, cur_namespace_path=None, new_namespace_path=None))] pub fn rename_table( self_: PyRef<'_, Self>, diff --git a/python/src/lib.rs b/python/src/lib.rs index 72043c484..7956e58b1 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -41,6 +41,9 @@ pub fn _lancedb(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { .write_style("LANCEDB_LOG_STYLE"); env_logger::init_from_env(env); m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/python/src/table.rs b/python/src/table.rs index 611f3c9f2..4da9e2b80 100644 --- a/python/src/table.rs +++ b/python/src/table.rs @@ -1060,6 +1060,23 @@ impl Table { }) } + #[pyo3(signature = (columns, where_clause=None, num_workers=None, max_workers=None))] + pub fn refresh_column( + self_: PyRef<'_, Self>, + columns: Vec, + where_clause: Option, + num_workers: Option, + max_workers: Option, + ) -> PyResult> { + let inner = self_.inner_ref()?.clone(); + future_into_py(self_.py(), async move { + inner + .refresh_column(&columns, where_clause, num_workers, max_workers) + .await + .infer_error() + }) + } + pub fn add_columns( self_: PyRef<'_, Self>, definitions: Vec<(String, String)>, diff --git a/rust/lancedb/src/connection.rs b/rust/lancedb/src/connection.rs index e0a4c22fa..218643284 100644 --- a/rust/lancedb/src/connection.rs +++ b/rust/lancedb/src/connection.rs @@ -23,8 +23,9 @@ use crate::connection::create_table::CreateTableBuilder; use crate::data::scannable::Scannable; use crate::database::listing::ListingDatabase; use crate::database::{ - CloneTableRequest, Database, DatabaseOptions, OpenTableRequest, ReadConsistency, - TableNamesRequest, + CloneTableRequest, CreateFunctionRequest, CreateMaterializedViewRequest, Database, + DatabaseOptions, FunctionInfo, JobInfo, MaterializedViewInfo, OpenTableRequest, + ReadConsistency, RefreshMaterializedViewRequest, TableNamesRequest, }; use crate::embeddings::{EmbeddingRegistry, MemoryRegistry}; use crate::error::{Error, Result}; @@ -488,6 +489,64 @@ impl Connection { ) } + // -- Derived compute: functions, materialized views, jobs ------------- + // Server-backed features (LanceDB Enterprise / Cloud); local + // databases return NotSupported for now. + + /// Register a UDF (CREATE FUNCTION). + pub async fn create_function(&self, request: CreateFunctionRequest) -> Result<()> { + self.internal.create_function(request).await + } + + /// List registered functions (SHOW FUNCTIONS). + pub async fn list_functions(&self) -> Result> { + self.internal.list_functions().await + } + + /// Drop a registered function (DROP FUNCTION). + pub async fn drop_function(&self, name: &str) -> Result<()> { + self.internal.drop_function(name).await + } + + /// Create a materialized view (CREATE MATERIALIZED VIEW). Returns + /// the initial-population job id, absent when `with_no_data`. + pub async fn create_materialized_view( + &self, + request: CreateMaterializedViewRequest, + ) -> Result> { + self.internal.create_materialized_view(request).await + } + + /// Refresh a materialized view; returns the refresh job id. + pub async fn refresh_materialized_view( + &self, + request: RefreshMaterializedViewRequest, + ) -> Result { + self.internal.refresh_materialized_view(request).await + } + + /// Update a materialized view's options (ALTER MATERIALIZED VIEW). + pub async fn alter_materialized_view(&self, name: &str, auto_refresh: bool) -> Result<()> { + self.internal + .alter_materialized_view(name, auto_refresh) + .await + } + + /// Drop a materialized view definition (DROP MATERIALIZED VIEW). + pub async fn drop_materialized_view(&self, name: &str) -> Result<()> { + self.internal.drop_materialized_view(name).await + } + + /// List registered materialized view definitions. + pub async fn list_materialized_views(&self) -> Result> { + self.internal.list_materialized_views().await + } + + /// List inflight server-side jobs across the database's tables. + pub async fn list_jobs(&self) -> Result> { + self.internal.list_jobs().await + } + /// Rename a table in the database. /// /// This is only supported in LanceDB Cloud. diff --git a/rust/lancedb/src/database.rs b/rust/lancedb/src/database.rs index 6ce5bdbc9..d6f788945 100644 --- a/rust/lancedb/src/database.rs +++ b/rust/lancedb/src/database.rs @@ -27,7 +27,7 @@ use lance_namespace::models::{ }; use crate::data::scannable::Scannable; -use crate::error::Result; +use crate::error::{Error, Result}; use crate::table::{BaseTable, WriteOptions}; pub mod listing; @@ -200,6 +200,129 @@ pub enum ReadConsistency { Strong, } +/// A request to register a UDF (CREATE FUNCTION). +/// +/// Functions are first-class database objects, decoupled from any +/// column; computed columns and materialized views reference them by +/// name. Server-backed feature (LanceDB Enterprise / Cloud). +#[derive(Debug, Clone)] +pub struct CreateFunctionRequest { + /// Function name. + pub name: String, + /// Implementation language (currently "python"). + pub language: String, + /// SQL return type, e.g. `FLOAT`, `FLOAT[1536]`, + /// `STRUCT(a FLOAT, b VARCHAR)`, `TABLE(chunk VARCHAR, idx INT)`. + pub return_type: String, + /// Function body: source text, or base64 cloudpickle bytes when + /// `options["body_format"] = "cloudpickle"`. + pub body: String, + /// Options: input_columns, pip, num_gpus, batch_size, timeout, + /// error_policy, docker_image, body_format, ... + pub options: HashMap, +} + +/// A registered function, as returned by `list_functions`. +#[derive(Debug, Clone)] +pub struct FunctionInfo { + pub name: String, + pub language: String, + pub return_type: String, + pub description: String, +} + +/// A request to create a materialized view (CREATE MATERIALIZED VIEW). +#[derive(Debug, Clone)] +pub struct CreateMaterializedViewRequest { + /// View name. + pub name: String, + /// The view's SELECT statement, e.g. + /// `SELECT id, embed(body) AS vec FROM articles WHERE id > 1`. + /// Bare columns project through; function-call columns compute via + /// registered UDFs (a RETURNS TABLE function makes a row-expanding + /// chunker view). + pub query: String, + /// Refresh automatically when the source table changes. + pub auto_refresh: bool, + /// Register the definition only; skip the initial population. + pub with_no_data: bool, +} + +impl CreateMaterializedViewRequest { + pub fn new(name: impl Into, query: impl Into) -> Self { + Self { + name: name.into(), + query: query.into(), + auto_refresh: false, + with_no_data: false, + } + } +} + +/// A request to refresh a materialized view. +#[derive(Debug, Clone)] +pub struct RefreshMaterializedViewRequest { + /// View name. + pub name: String, + /// Pin the refresh to a source-table version; latest when absent. + pub src_version: Option, + /// Initial worker count. + pub num_workers: Option, + /// Elastic worker ceiling. + pub max_workers: Option, +} + +impl RefreshMaterializedViewRequest { + pub fn new(name: impl Into) -> Self { + Self { + name: name.into(), + src_version: None, + num_workers: None, + max_workers: None, + } + } +} + +/// A registered materialized view definition, as returned by +/// `list_materialized_views`. +#[derive(Debug, Clone)] +pub struct MaterializedViewInfo { + pub name: String, + pub source_table: String, + /// Source columns projected through. + pub projection: Vec, + /// `alias=expression` per UDF-computed column. + pub udf_columns: Vec, + pub filter: Option, + pub auto_refresh: bool, +} + +/// A row from `list_jobs`: one inflight server-side job (index build, +/// compaction, column refresh, view refresh, ...). +#[derive(Debug, Clone)] +pub struct JobInfo { + pub table: String, + pub job_id: String, + pub job_type: String, + /// Lifecycle state: "running", "cancelling", or "stale". + pub state: String, + pub column: Option, + pub age_seconds: Option, + pub command: Option, + pub units_done: Option, + pub units_total: Option, + /// Whether the job's final commit has completed (output visible). + pub committed: bool, + pub rows_skipped: u64, + pub error: Option, +} + +fn not_supported(what: &str) -> Result { + Err(Error::NotSupported { + message: format!("{} is not supported by this database", what), + }) +} + /// The `Database` trait defines the interface for database implementations. /// /// A database is responsible for managing tables and their metadata. @@ -245,6 +368,57 @@ pub trait Database: /// /// See [`CloneTableRequest`] for detailed documentation and examples. async fn clone_table(&self, request: CloneTableRequest) -> Result>; + + // -- Derived compute: functions, materialized views, jobs ------------- + // + // Server-backed features (LanceDB Enterprise / Cloud). The defaults + // return NotSupported; the remote database overrides them. Local + // single-node implementations are planned. + + /// Register a UDF (CREATE FUNCTION). + async fn create_function(&self, _request: CreateFunctionRequest) -> Result<()> { + not_supported("create_function") + } + /// List registered functions (SHOW FUNCTIONS). + async fn list_functions(&self) -> Result> { + not_supported("list_functions") + } + /// Drop a registered function (DROP FUNCTION). + async fn drop_function(&self, _name: &str) -> Result<()> { + not_supported("drop_function") + } + /// Create a materialized view (CREATE MATERIALIZED VIEW). Returns + /// the initial-population job id, absent when `with_no_data`. + async fn create_materialized_view( + &self, + _request: CreateMaterializedViewRequest, + ) -> Result> { + not_supported("create_materialized_view") + } + /// Refresh a materialized view; returns the refresh job id. + async fn refresh_materialized_view( + &self, + _request: RefreshMaterializedViewRequest, + ) -> Result { + not_supported("refresh_materialized_view") + } + /// Update a materialized view's options (ALTER MATERIALIZED VIEW). + async fn alter_materialized_view(&self, _name: &str, _auto_refresh: bool) -> Result<()> { + not_supported("alter_materialized_view") + } + /// Drop a materialized view definition (DROP MATERIALIZED VIEW). + async fn drop_materialized_view(&self, _name: &str) -> Result<()> { + not_supported("drop_materialized_view") + } + /// List registered materialized view definitions. + async fn list_materialized_views(&self) -> Result> { + not_supported("list_materialized_views") + } + /// List inflight server-side jobs across the database's tables. + async fn list_jobs(&self) -> Result> { + not_supported("list_jobs") + } + /// Open a table in the database async fn open_table(&self, request: OpenTableRequest) -> Result>; /// Rename a table in the database diff --git a/rust/lancedb/src/remote/db.rs b/rust/lancedb/src/remote/db.rs index 25fcd23b1..15bbeb1b5 100644 --- a/rust/lancedb/src/remote/db.rs +++ b/rust/lancedb/src/remote/db.rs @@ -19,8 +19,9 @@ use lance_namespace::models::{ use crate::Error; use crate::database::{ - CloneTableRequest, CreateTableMode, CreateTableRequest, Database, DatabaseOptions, - OpenTableRequest, ReadConsistency, TableNamesRequest, + CloneTableRequest, CreateFunctionRequest, CreateMaterializedViewRequest, CreateTableMode, + CreateTableRequest, Database, DatabaseOptions, FunctionInfo, JobInfo, MaterializedViewInfo, + OpenTableRequest, ReadConsistency, RefreshMaterializedViewRequest, TableNamesRequest, }; use crate::error::Result; use crate::remote::util::stream_as_body; @@ -33,6 +34,111 @@ use super::client::{ use super::table::RemoteTable; use super::util::parse_server_version; +// Wire types for the derived-compute routes (functions, materialized +// views, jobs). Field shapes mirror the server's REST contract. +#[derive(serde::Serialize)] +struct RemoteCreateFunctionRequest { + language: String, + return_type: String, + body: String, + options: std::collections::HashMap, +} + +#[derive(serde::Deserialize)] +struct RemoteFunctionEntry { + name: String, + language: String, + return_type: String, + #[serde(default)] + description: String, +} + +#[derive(serde::Deserialize)] +struct RemoteListFunctionsResponse { + functions: Vec, +} + +#[derive(serde::Serialize)] +struct RemoteCreateMaterializedViewRequest { + query: String, + auto_refresh: bool, + with_no_data: bool, +} + +#[derive(serde::Deserialize)] +struct RemoteCreateMaterializedViewResponse { + #[serde(default)] + job_id: Option, +} + +#[derive(serde::Serialize)] +struct RemoteRefreshMaterializedViewRequest { + #[serde(skip_serializing_if = "Option::is_none")] + src_version: Option, + #[serde(skip_serializing_if = "Option::is_none")] + num_workers: Option, + #[serde(skip_serializing_if = "Option::is_none")] + max_workers: Option, +} + +#[derive(serde::Deserialize)] +struct RemoteRefreshMaterializedViewResponse { + job_id: String, +} + +#[derive(serde::Serialize)] +struct RemoteAlterMaterializedViewRequest { + auto_refresh: bool, +} + +#[derive(serde::Deserialize)] +struct RemoteMaterializedViewEntry { + name: String, + source_table: String, + #[serde(default)] + projection: Vec, + #[serde(default)] + udf_columns: Vec, + #[serde(default)] + filter: Option, + #[serde(default)] + auto_refresh: bool, +} + +#[derive(serde::Deserialize)] +struct RemoteListMaterializedViewsResponse { + views: Vec, +} + +#[derive(serde::Deserialize)] +struct RemoteJobEntry { + table: String, + job_id: String, + job_type: String, + state: String, + #[serde(default)] + column: Option, + #[serde(default)] + age_seconds: Option, + #[serde(default)] + command: Option, + #[serde(default)] + units_done: Option, + #[serde(default)] + units_total: Option, + #[serde(default)] + committed: bool, + #[serde(default)] + rows_skipped: u64, + #[serde(default)] + error: Option, +} + +#[derive(serde::Deserialize)] +struct RemoteListJobsResponse { + jobs: Vec, +} + // Request structure for the remote clone table API #[derive(serde::Serialize)] struct RemoteCloneTableRequest { @@ -641,6 +747,149 @@ impl Database for RemoteDatabase { Ok(table) } + async fn create_function(&self, request: CreateFunctionRequest) -> Result<()> { + let body = RemoteCreateFunctionRequest { + language: request.language, + return_type: request.return_type, + body: request.body, + options: request.options, + }; + let req = self + .client + .post(&format!("/v1/function/{}/create", request.name)) + .json(&body); + let (request_id, rsp) = self.client.send(req).await?; + self.client.check_response(&request_id, rsp).await?; + Ok(()) + } + + async fn list_functions(&self) -> Result> { + let req = self.client.get("/v1/function/list"); + let (request_id, rsp) = self.client.send(req).await?; + let rsp = self.client.check_response(&request_id, rsp).await?; + let body: RemoteListFunctionsResponse = rsp.json().await.err_to_http(request_id)?; + Ok(body + .functions + .into_iter() + .map(|f| FunctionInfo { + name: f.name, + language: f.language, + return_type: f.return_type, + description: f.description, + }) + .collect()) + } + + async fn drop_function(&self, name: &str) -> Result<()> { + let req = self.client.post(&format!("/v1/function/{}/drop", name)); + let (request_id, rsp) = self.client.send(req).await?; + self.client.check_response(&request_id, rsp).await?; + Ok(()) + } + + async fn create_materialized_view( + &self, + request: CreateMaterializedViewRequest, + ) -> Result> { + let body = RemoteCreateMaterializedViewRequest { + query: request.query, + auto_refresh: request.auto_refresh, + with_no_data: request.with_no_data, + }; + let req = self + .client + .post(&format!("/v1/materialized_view/{}/create", request.name)) + .json(&body); + let (request_id, rsp) = self.client.send(req).await?; + let rsp = self.client.check_response(&request_id, rsp).await?; + let body: RemoteCreateMaterializedViewResponse = + rsp.json().await.err_to_http(request_id)?; + Ok(body.job_id) + } + + async fn refresh_materialized_view( + &self, + request: RefreshMaterializedViewRequest, + ) -> Result { + let body = RemoteRefreshMaterializedViewRequest { + src_version: request.src_version, + num_workers: request.num_workers, + max_workers: request.max_workers, + }; + let req = self + .client + .post(&format!("/v1/materialized_view/{}/refresh", request.name)) + .json(&body); + let (request_id, rsp) = self.client.send(req).await?; + let rsp = self.client.check_response(&request_id, rsp).await?; + let body: RemoteRefreshMaterializedViewResponse = + rsp.json().await.err_to_http(request_id)?; + Ok(body.job_id) + } + + async fn alter_materialized_view(&self, name: &str, auto_refresh: bool) -> Result<()> { + let req = self + .client + .post(&format!("/v1/materialized_view/{}/alter", name)) + .json(&RemoteAlterMaterializedViewRequest { auto_refresh }); + let (request_id, rsp) = self.client.send(req).await?; + self.client.check_response(&request_id, rsp).await?; + Ok(()) + } + + async fn drop_materialized_view(&self, name: &str) -> Result<()> { + let req = self + .client + .post(&format!("/v1/materialized_view/{}/drop", name)); + let (request_id, rsp) = self.client.send(req).await?; + self.client.check_response(&request_id, rsp).await?; + Ok(()) + } + + async fn list_materialized_views(&self) -> Result> { + let req = self.client.get("/v1/materialized_view/list"); + let (request_id, rsp) = self.client.send(req).await?; + let rsp = self.client.check_response(&request_id, rsp).await?; + let body: RemoteListMaterializedViewsResponse = rsp.json().await.err_to_http(request_id)?; + Ok(body + .views + .into_iter() + .map(|v| MaterializedViewInfo { + name: v.name, + source_table: v.source_table, + projection: v.projection, + udf_columns: v.udf_columns, + filter: v.filter, + auto_refresh: v.auto_refresh, + }) + .collect()) + } + + async fn list_jobs(&self) -> Result> { + let req = self.client.get("/v1/job/list"); + let (request_id, rsp) = self.client.send(req).await?; + let rsp = self.client.check_response(&request_id, rsp).await?; + let body: RemoteListJobsResponse = rsp.json().await.err_to_http(request_id)?; + Ok(body + .jobs + .into_iter() + .map(|j| JobInfo { + table: j.table, + job_id: j.job_id, + job_type: j.job_type, + state: j.state, + column: j.column, + age_seconds: j.age_seconds, + command: j.command, + units_done: j.units_done, + units_total: j.units_total, + committed: j.committed, + rows_skipped: j.rows_skipped, + error: j.error, + }) + .collect()) + } + async fn open_table(&self, request: OpenTableRequest) -> Result> { let identifier = build_table_identifier( &request.name, @@ -1580,6 +1829,156 @@ mod tests { } } + #[tokio::test] + async fn test_derived_compute_routes() { + // create_function + let conn = Connection::new_with_handler(|request| { + assert_eq!(request.method(), &reqwest::Method::POST); + assert_eq!(request.url().path(), "/v1/function/embed/create"); + let body: serde_json::Value = + serde_json::from_slice(request.body().unwrap().as_bytes().unwrap()).unwrap(); + assert_eq!(body["language"], "python"); + assert_eq!(body["return_type"], "FLOAT[4]"); + assert_eq!(body["body"], "def embed(x): ..."); + assert_eq!(body["options"]["pip"], "torch"); + http::Response::builder() + .status(200) + .body(r#"{"name":"embed","status":"OK"}"#) + .unwrap() + }); + conn.create_function(crate::database::CreateFunctionRequest { + name: "embed".into(), + language: "python".into(), + return_type: "FLOAT[4]".into(), + body: "def embed(x): ...".into(), + options: [("pip".to_string(), "torch".to_string())].into(), + }) + .await + .unwrap(); + + // list_functions + let conn = Connection::new_with_handler(|request| { + assert_eq!(request.method(), &reqwest::Method::GET); + assert_eq!(request.url().path(), "/v1/function/list"); + http::Response::builder() + .status(200) + .body( + r#"{"functions":[{"name":"embed","language":"python","return_type":"Float32","description":""}]}"#, + ) + .unwrap() + }); + let functions = conn.list_functions().await.unwrap(); + assert_eq!(functions.len(), 1); + assert_eq!(functions[0].name, "embed"); + + // drop_function + let conn = Connection::new_with_handler(|request| { + assert_eq!(request.method(), &reqwest::Method::POST); + assert_eq!(request.url().path(), "/v1/function/embed/drop"); + http::Response::builder() + .status(200) + .body(r#"{"name":"embed","status":"OK"}"#) + .unwrap() + }); + conn.drop_function("embed").await.unwrap(); + + // create_materialized_view + let conn = Connection::new_with_handler(|request| { + assert_eq!(request.method(), &reqwest::Method::POST); + assert_eq!(request.url().path(), "/v1/materialized_view/mv1/create"); + let body: serde_json::Value = + serde_json::from_slice(request.body().unwrap().as_bytes().unwrap()).unwrap(); + assert_eq!(body["query"], "SELECT id, embed(body) AS vec FROM docs"); + assert_eq!(body["auto_refresh"], true); + assert_eq!(body["with_no_data"], false); + http::Response::builder() + .status(200) + .body(r#"{"name":"mv1","job_id":"j-1"}"#) + .unwrap() + }); + let mut request = crate::database::CreateMaterializedViewRequest::new( + "mv1", + "SELECT id, embed(body) AS vec FROM docs", + ); + request.auto_refresh = true; + let job_id = conn.create_materialized_view(request).await.unwrap(); + assert_eq!(job_id.as_deref(), Some("j-1")); + + // refresh_materialized_view + let conn = Connection::new_with_handler(|request| { + assert_eq!(request.method(), &reqwest::Method::POST); + assert_eq!(request.url().path(), "/v1/materialized_view/mv1/refresh"); + let body: serde_json::Value = + serde_json::from_slice(request.body().unwrap().as_bytes().unwrap()).unwrap(); + assert_eq!(body["num_workers"], 2); + assert!(body.get("src_version").is_none()); + http::Response::builder() + .status(202) + .body(r#"{"job_id":"j-2"}"#) + .unwrap() + }); + let mut request = crate::database::RefreshMaterializedViewRequest::new("mv1"); + request.num_workers = Some(2); + let job_id = conn.refresh_materialized_view(request).await.unwrap(); + assert_eq!(job_id, "j-2"); + + // alter_materialized_view + let conn = Connection::new_with_handler(|request| { + assert_eq!(request.method(), &reqwest::Method::POST); + assert_eq!(request.url().path(), "/v1/materialized_view/mv1/alter"); + let body: serde_json::Value = + serde_json::from_slice(request.body().unwrap().as_bytes().unwrap()).unwrap(); + assert_eq!(body["auto_refresh"], false); + http::Response::builder() + .status(200) + .body(r#"{"name":"mv1","status":"OK"}"#) + .unwrap() + }); + conn.alter_materialized_view("mv1", false).await.unwrap(); + + // drop_materialized_view + let conn = Connection::new_with_handler(|request| { + assert_eq!(request.url().path(), "/v1/materialized_view/mv1/drop"); + http::Response::builder() + .status(200) + .body(r#"{"name":"mv1","status":"OK"}"#) + .unwrap() + }); + conn.drop_materialized_view("mv1").await.unwrap(); + + // list_materialized_views + let conn = Connection::new_with_handler(|request| { + assert_eq!(request.method(), &reqwest::Method::GET); + assert_eq!(request.url().path(), "/v1/materialized_view/list"); + http::Response::builder() + .status(200) + .body( + r#"{"views":[{"name":"mv1","source_table":"docs","projection":["id"],"udf_columns":["vec=embed(body)"],"filter":null,"auto_refresh":true}]}"#, + ) + .unwrap() + }); + let views = conn.list_materialized_views().await.unwrap(); + assert_eq!(views.len(), 1); + assert_eq!(views[0].source_table, "docs"); + assert!(views[0].auto_refresh); + + // list_jobs + let conn = Connection::new_with_handler(|request| { + assert_eq!(request.method(), &reqwest::Method::GET); + assert_eq!(request.url().path(), "/v1/job/list"); + http::Response::builder() + .status(200) + .body( + r#"{"jobs":[{"table":"docs","job_id":"j-3","job_type":"udf_virtual_column_backfill","state":"running","column":"vec","age_seconds":4,"command":null,"units_done":1,"units_total":2,"committed":false,"rows_skipped":0,"error":null}]}"#, + ) + .unwrap() + }); + let jobs = conn.list_jobs().await.unwrap(); + assert_eq!(jobs.len(), 1); + assert_eq!(jobs[0].state, "running"); + assert_eq!(jobs[0].units_total, Some(2)); + } + #[tokio::test] async fn test_clone_table() { let conn = Connection::new_with_handler(|request| { diff --git a/rust/lancedb/src/remote/table.rs b/rust/lancedb/src/remote/table.rs index f3976d4af..47055ab1d 100644 --- a/rust/lancedb/src/remote/table.rs +++ b/rust/lancedb/src/remote/table.rs @@ -2309,6 +2309,37 @@ impl BaseTable for RemoteTable { message: "optimize is not supported on LanceDB cloud.".into(), }) } + async fn refresh_column( + &self, + columns: &[String], + where_clause: Option, + num_workers: Option, + max_workers: Option, + ) -> Result { + let mut body = serde_json::json!({ "columns": columns }); + if let Some(w) = where_clause { + body["where_clause"] = serde_json::Value::String(w); + } + if let Some(n) = num_workers { + body["num_workers"] = n.into(); + } + if let Some(n) = max_workers { + body["max_workers"] = n.into(); + } + let request = self + .client + .post(&format!("/v1/table/{}/refresh_column", self.identifier)) + .json(&body); + let (request_id, response) = self.send(request, true).await?; + let response = self.check_table_response(&request_id, response).await?; + #[derive(serde::Deserialize)] + struct RefreshColumnResponse { + job_id: String, + } + let body: RefreshColumnResponse = response.json().await.err_to_http(request_id)?; + Ok(body.job_id) + } + async fn add_columns( &self, transforms: NewColumnTransform, @@ -2801,6 +2832,30 @@ mod tests { } } + #[tokio::test] + async fn test_refresh_column() { + let table = Table::new_with_handler("my_table", |request| { + assert_eq!(request.method(), "POST"); + assert_eq!(request.url().path(), "/v1/table/my_table/refresh_column"); + let body: serde_json::Value = + serde_json::from_slice(request.body().unwrap().as_bytes().unwrap()).unwrap(); + assert_eq!(body["columns"], serde_json::json!(["vec"])); + assert_eq!(body["num_workers"], 2); + assert!(body.get("where_clause").is_none()); + + http::Response::builder() + .status(202) + .body(r#"{"job_id":"j-9"}"#) + .unwrap() + }); + + let job_id = table + .refresh_column(&["vec".to_string()], None, Some(2), None) + .await + .unwrap(); + assert_eq!(job_id, "j-9"); + } + #[tokio::test] async fn test_version() { let table = Table::new_with_handler("my_table", |request| { diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index 156a07821..7ad896dab 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -620,6 +620,22 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync { transforms: NewColumnTransform, read_columns: Option>, ) -> Result; + /// Trigger recompute of computed columns. The expression is + /// resolved server-side from each column's stored binding; columns + /// bound to the same struct-returning function refresh together. + /// Returns the refresh job id. Server-backed feature (LanceDB + /// Enterprise / Cloud); the default returns NotSupported. + async fn refresh_column( + &self, + _columns: &[String], + _where_clause: Option, + _num_workers: Option, + _max_workers: Option, + ) -> Result { + Err(Error::NotSupported { + message: "refresh_column is not supported by this table".into(), + }) + } /// Alter columns in the table. async fn alter_columns(&self, alterations: &[ColumnAlteration]) -> Result; /// Drop columns from the table. @@ -1461,6 +1477,22 @@ impl Table { self.inner.add_columns(transforms, read_columns).await } + /// Trigger recompute of computed columns (REFRESH COLUMN). The + /// expression comes from each column's stored binding; columns + /// bound to the same struct-returning function refresh together. + /// Returns the refresh job id. Server-backed feature. + pub async fn refresh_column( + &self, + columns: &[String], + where_clause: Option, + num_workers: Option, + max_workers: Option, + ) -> Result { + self.inner + .refresh_column(columns, where_clause, num_workers, max_workers) + .await + } + /// Change a column's name or nullability. pub async fn alter_columns( &self,