diff --git a/python/python/lancedb/remote/table.py b/python/python/lancedb/remote/table.py
index 1d6c0be0d..07d04399f 100644
--- a/python/python/lancedb/remote/table.py
+++ b/python/python/lancedb/remote/table.py
@@ -884,8 +884,23 @@ class RemoteTable(Table):
     def count_rows(self, filter: Optional[str] = None) -> int:
         return LOOP.run(self._table.count_rows(filter))
 
-    def add_columns(self, transforms: Dict[str, str]) -> AddColumnsResult:
-        return LOOP.run(self._table.add_columns(transforms))
+    def add_columns(
+        self,
+        transforms: Optional[Dict[str, str]] = None,
+        *,
+        computed: Optional[Dict[str, tuple]] = None,
+    ) -> Optional[AddColumnsResult]:
+        """Add new columns, optionally declaring computed ones.
+
+        ``transforms`` is forwarded unchanged to the wrapped table.
+        ``computed`` maps a column name to ``(sql_type, expression)``.
+
+        Returns the result of applying ``transforms``, or ``None`` when
+        ``transforms`` is not given.
+        """
+        # The wrapped table applies ``transforms`` before ``computed`` and
+        # returns the ``transforms`` result, so a single call suffices.
+        return LOOP.run(self._table.add_columns(transforms, computed=computed))
 
     def refresh_column(
         self,
diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py
index 686ce5003..d2a5a7bdd 100644
--- a/python/python/lancedb/table.py
+++ b/python/python/lancedb/table.py
@@ -702,6 +702,22 @@ def _normalize_progress(progress):
     return progress, False
 
 
+def _computed_groups(computed):
+    """Group ``{column: (sql_type, expression)}`` by expression.
+
+    Groups appear in first-seen expression order and columns keep their
+    declaration order within a group (struct-returning functions need
+    their columns adjacent so schema order matches field order).
+
+    Returns ``[(expression, [(name, sql_type), ...]), ...]``.
+    """
+    # Dicts preserve insertion order, so groups stay in first-seen order.
+    groups = {}
+    for name, (sql_type, expression) in computed.items():
+        groups.setdefault(expression, []).append((name, sql_type))
+    return list(groups.items())
+
+
 class Table(ABC):
     """
     A Table is a collection of Records in a LanceDB Database.
@@ -3710,9 +3726,24 @@ class LanceTable(Table):
         return LOOP.run(self._table.index_stats(index_name))
 
     def add_columns(
-        self, transforms: Dict[str, str] | pa.field | List[pa.field] | pa.Schema
-    ) -> AddColumnsResult:
-        return LOOP.run(self._table.add_columns(transforms))
+        self,
+        transforms: Optional[
+            Dict[str, str] | pa.field | List[pa.field] | pa.Schema
+        ] = None,
+        *,
+        computed: Optional[Dict[str, tuple]] = None,
+    ) -> Optional[AddColumnsResult]:
+        """Add new columns, optionally declaring server-computed ones.
+
+        ``computed`` maps a column name to ``(sql_type, expression)``; it
+        declares the binding only and the server fills the values.
+
+        Returns the result of applying ``transforms``, or ``None`` when
+        ``transforms`` is not given.
+        """
+        # The wrapped table applies ``transforms`` before ``computed`` and
+        # returns the ``transforms`` result, so a single call suffices.
+        return LOOP.run(self._table.add_columns(transforms, computed=computed))
 
     def refresh_column(
         self,
@@ -5437,8 +5468,18 @@ class AsyncTable:
         )
 
     async def add_columns(
-        self, transforms: dict[str, str] | pa.field | List[pa.field] | pa.Schema
-    ) -> AddColumnsResult:
+        self,
+        transforms: Optional[
+            dict[str, str] | pa.field | List[pa.field] | pa.Schema
+        ] = None,
+        *,
+        computed: Optional[Dict[str, tuple]] = None,
+    ) -> Optional[AddColumnsResult]:
         """
         Add new columns with defined values.
+
+        Columns can also be declared as computed via ``computed``, a map of
+        column name to ``(sql_type, expression)``; this stores the binding
+        only and the server fills the values. Returns ``None`` when
+        ``transforms`` is omitted; raises ``ValueError`` if both are omitted.
 
@@ -5457,6 +5498,9 @@ class AsyncTable:
             version: the new version number of the table after adding columns.
 
         """
+        if transforms is None and computed is None:
+            raise ValueError("add_columns requires `transforms` or `computed`")
+        result = None
         if isinstance(transforms, pa.Field):
             transforms = [transforms]
         if isinstance(transforms, list) and all(
@@ -5464,9 +5508,16 @@ class AsyncTable:
         ):
             transforms = pa.schema(transforms)
         if isinstance(transforms, pa.Schema):
-            return await self._inner.add_columns_with_schema(transforms)
-        else:
-            return await self._inner.add_columns(list(transforms.items()))
+            result = await self._inner.add_columns_with_schema(transforms)
+        elif transforms is not None:
+            result = await self._inner.add_columns(list(transforms.items()))
+        if computed:
+            # Declares the binding only; the server fills the values.
+            # NOTE(review): each expression group is its own request, so a
+            # failure part-way presumably leaves earlier groups declared.
+            for expression, cols in _computed_groups(computed):
+                await self._inner.add_computed_columns(cols, expression)
+        return result
 
     async def alter_columns(
         self, *alterations: Iterable[dict[str, Any]]
diff --git a/python/src/table.rs b/python/src/table.rs
index 4da9e2b80..fdec38237 100644
--- a/python/src/table.rs
+++ b/python/src/table.rs
@@ -1060,6 +1060,23 @@ impl Table {
         })
     }
 
+    /// Declare computed columns: `columns` holds `(name, sql_type)` pairs
+    /// bound to `expression`. Returns an awaitable that resolves once the
+    /// inner table has processed the request.
+    pub fn add_computed_columns(
+        self_: PyRef<'_, Self>,
+        columns: Vec<(String, String)>,
+        expression: String,
+    ) -> PyResult<Bound<'_, PyAny>> {
+        let inner = self_.inner_ref()?.clone();
+        future_into_py(self_.py(), async move {
+            inner
+                .add_computed_columns(&columns, &expression)
+                .await
+                .infer_error()
+        })
+    }
+
     #[pyo3(signature = (columns, where_clause=None, num_workers=None, max_workers=None))]
     pub fn refresh_column(
         self_: PyRef<'_, Self>,
diff --git a/rust/lancedb/src/remote/table.rs b/rust/lancedb/src/remote/table.rs
index 47055ab1d..c52f6d393 100644
--- a/rust/lancedb/src/remote/table.rs
+++ b/rust/lancedb/src/remote/table.rs
@@ -2309,6 +2309,30 @@ impl BaseTable for RemoteTable {
             message: "optimize is not supported on LanceDB cloud.".into(),
         })
     }
+    async fn add_computed_columns(
+        &self,
+        columns: &[(String, String)],
+        expression: &str,
+    ) -> Result<()> {
+        // One entry per column; every entry carries the same expression.
+        let new_columns: Vec<serde_json::Value> = columns
+            .iter()
+            .map(|(name, data_type)| {
+                serde_json::json!({
+                    "name": name,
+                    "computed": { "data_type": data_type, "expression": expression },
+                })
+            })
+            .collect();
+        let request = self
+            .client
+            .post(&format!("/v1/table/{}/add_columns/", self.identifier))
+            .json(&serde_json::json!({ "new_columns": new_columns }));
+        let (request_id, response) = self.send(request, true).await?;
+        self.check_table_response(&request_id, response).await?;
+        Ok(())
+    }
+
     async fn refresh_column(
         &self,
         columns: &[String],
diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs
index 7ad896dab..61e18a18d 100644
--- a/rust/lancedb/src/table.rs
+++ b/rust/lancedb/src/table.rs
@@ -620,6 +620,21 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
         transforms: NewColumnTransform,
         read_columns: Option<Vec<String>>,
     ) -> Result<AddColumnsResult>;
+    /// Declare computed columns bound to a registered function: each
+    /// `(name, sql_type)` is added all-null with the expression stored
+    /// as its binding; no compute happens here (the server's lazy
+    /// detector or `refresh_column` fills them). Several columns map a
+    /// struct-returning function's fields positionally. Server-backed
+    /// feature; the default implementation returns `NotSupported`.
+    async fn add_computed_columns(
+        &self,
+        _columns: &[(String, String)],
+        _expression: &str,
+    ) -> Result<()> {
+        Err(Error::NotSupported {
+            message: "computed columns are not supported by this table".into(),
+        })
+    }
     /// Trigger recompute of computed columns. The expression is
     /// resolved server-side from each column's stored binding; columns
     /// bound to the same struct-returning function refresh together.
@@ -1477,6 +1492,17 @@ impl Table {
         self.inner.add_columns(transforms, read_columns).await
     }
 
+    /// Declare computed columns bound to a registered function
+    /// (`(name, sql_type)` pairs plus an `f(args)` expression). No
+    /// compute happens here. Server-backed feature.
+    pub async fn add_computed_columns(
+        &self,
+        columns: &[(String, String)],
+        expression: &str,
+    ) -> Result<()> {
+        self.inner.add_computed_columns(columns, expression).await
+    }
+
     /// Trigger recompute of computed columns (REFRESH COLUMN). The
     /// expression comes from each column's stored binding; columns
     /// bound to the same struct-returning function refresh together.