diff --git a/python/python/lancedb/remote/table.py b/python/python/lancedb/remote/table.py index 07d04399f..4a39ce6f9 100644 --- a/python/python/lancedb/remote/table.py +++ b/python/python/lancedb/remote/table.py @@ -904,6 +904,8 @@ class RemoteTable(Table): where: Optional[str] = None, num_workers: Optional[int] = None, max_workers: Optional[int] = None, + batch_size: Optional[int] = None, + priority: Optional[str] = None, ) -> str: """Trigger recompute of computed columns (REFRESH COLUMN). @@ -911,6 +913,11 @@ class RemoteTable(Table): binding; columns bound to the same struct-returning function refresh together. Returns the refresh job id. Server-backed feature (LanceDB Enterprise / Cloud). + + num_workers / max_workers / batch_size / priority are per-refresh + scheduling knobs (how to run THIS refresh) and override any default + the function carries. `priority` is a Kueue tier + (training | interactive | backfill). """ if isinstance(columns, str): columns = [columns] @@ -920,10 +927,11 @@ class RemoteTable(Table): where=where, num_workers=num_workers, max_workers=max_workers, + batch_size=batch_size, + priority=priority, ) ) - def alter_columns( self, *alterations: Iterable[Dict[str, str]] ) -> AlterColumnsResult: diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py index bcaf70bf8..62b84deff 100644 --- a/python/python/lancedb/table.py +++ b/python/python/lancedb/table.py @@ -702,7 +702,6 @@ def _normalize_progress(progress): return progress, False - def _computed_groups(computed): """Group computed columns by expression, preserving declaration order (struct-returning functions need their columns adjacent so schema order @@ -846,7 +845,7 @@ class Table(ABC): import warnings warnings.warn( - 'add_computed_column is deprecated; use add_columns(computed=' + "add_computed_column is deprecated; use add_columns(computed=" '{"vec": embed("data")}).', DeprecationWarning, stacklevel=2, @@ -3783,7 +3782,11 @@ class LanceTable(Table): def add_columns( self, - transforms: Dict[str, str] | pa.field | List[pa.field] | pa.Schema | None = None, + transforms: Dict[str, str] + | pa.field + | List[pa.field] + | pa.Schema + | None = None, *, computed: Optional[Dict] = None, ) -> Optional[AddColumnsResult]: @@ -3809,6 +3812,7 @@ class LanceTable(Table): num_workers: Optional[int] = None, max_workers: Optional[int] = None, batch_size: Optional[int] = None, + priority: Optional[str] = None, ) -> str: """Trigger recompute of computed columns (REFRESH COLUMN). @@ -3817,9 +3821,10 @@ class LanceTable(Table): refresh together. Returns the refresh job id. Server-backed feature (LanceDB Enterprise / Cloud). - num_workers / max_workers / batch_size are per-refresh scheduling - knobs (how to run THIS refresh) and override any default the - function carries. + num_workers / max_workers / batch_size / priority are per-refresh + scheduling knobs (how to run THIS refresh) and override any default + the function carries. `priority` is a Kueue tier + (training | interactive | backfill). """ if isinstance(columns, str): columns = [columns] @@ -3830,10 +3835,10 @@ class LanceTable(Table): num_workers=num_workers, max_workers=max_workers, batch_size=batch_size, + priority=priority, ) ) - def alter_columns( self, *alterations: Iterable[Dict[str, str]] ) -> AlterColumnsResult: @@ -5518,13 +5523,15 @@ class AsyncTable: num_workers: Optional[int] = None, max_workers: Optional[int] = None, batch_size: Optional[int] = None, + priority: Optional[str] = None, ) -> str: """Trigger recompute of computed columns (REFRESH COLUMN). Returns the refresh job id. Server-backed feature. - num_workers / max_workers / batch_size are per-refresh scheduling - knobs (how to run THIS refresh); they override any default the - function carries.""" + num_workers / max_workers / batch_size / priority are per-refresh + scheduling knobs (how to run THIS refresh); they override any default + the function carries. `priority` is a Kueue tier + (training | interactive | backfill).""" if isinstance(columns, str): columns = [columns] return await self._inner.refresh_column( @@ -5533,11 +5540,16 @@ class AsyncTable: num_workers=num_workers, max_workers=max_workers, batch_size=batch_size, + priority=priority, ) async def add_columns( self, - transforms: dict[str, str] | pa.field | List[pa.field] | pa.Schema | None = None, + transforms: dict[str, str] + | pa.field + | List[pa.field] + | pa.Schema + | None = None, *, computed: Optional[Dict] = None, ) -> Optional[AddColumnsResult]: @@ -5598,7 +5610,7 @@ class AsyncTable: import warnings warnings.warn( - 'add_computed_column is deprecated; use add_columns(computed=' + "add_computed_column is deprecated; use add_columns(computed=" '{"col": fn("input_col")}).', DeprecationWarning, stacklevel=2, diff --git a/python/src/table.rs b/python/src/table.rs index 1ef09e7b6..ff203e86e 100644 --- a/python/src/table.rs +++ b/python/src/table.rs @@ -1074,7 +1074,7 @@ impl Table { }) } - #[pyo3(signature = (columns, where_clause=None, num_workers=None, max_workers=None, batch_size=None))] + #[pyo3(signature = (columns, where_clause=None, num_workers=None, max_workers=None, batch_size=None, priority=None))] pub fn refresh_column( self_: PyRef<'_, Self>, columns: Vec, @@ -1082,11 +1082,19 @@ impl Table { num_workers: Option, max_workers: Option, batch_size: Option, + priority: Option, ) -> PyResult> { let inner = self_.inner_ref()?.clone(); future_into_py(self_.py(), async move { inner - .refresh_column(&columns, where_clause, num_workers, max_workers, batch_size) + .refresh_column( + &columns, + where_clause, + num_workers, + max_workers, + batch_size, + priority, + ) .await .infer_error() }) diff --git a/rust/lancedb/src/remote/table.rs b/rust/lancedb/src/remote/table.rs index cfbd82d31..9d95e3930 100644 --- a/rust/lancedb/src/remote/table.rs +++ b/rust/lancedb/src/remote/table.rs @@ -2339,6 +2339,7 @@ impl BaseTable for RemoteTable { num_workers: Option, max_workers: Option, batch_size: Option, + priority: Option, ) -> Result { let mut body = serde_json::json!({ "columns": columns }); if let Some(w) = where_clause { @@ -2353,6 +2354,9 @@ impl BaseTable for RemoteTable { if let Some(n) = batch_size { body["batch_size"] = n.into(); } + if let Some(p) = priority { + body["priority"] = serde_json::Value::String(p); + } let request = self .client .post(&format!("/v1/table/{}/refresh_column", self.identifier)) @@ -2877,7 +2881,7 @@ mod tests { }); let job_id = table - .refresh_column(&["vec".to_string()], None, Some(2), None, None) + .refresh_column(&["vec".to_string()], None, Some(2), None, None, None) .await .unwrap(); assert_eq!(job_id, "j-9"); diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index 90b605b90..b12f70a4e 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -647,6 +647,7 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync { _num_workers: Option, _max_workers: Option, _batch_size: Option, + _priority: Option, ) -> Result { Err(Error::NotSupported { message: "refresh_column is not supported by this table".into(), @@ -1515,9 +1516,17 @@ impl Table { num_workers: Option, max_workers: Option, batch_size: Option, + priority: Option, ) -> Result { self.inner - .refresh_column(columns, where_clause, num_workers, max_workers, batch_size) + .refresh_column( + columns, + where_clause, + num_workers, + max_workers, + batch_size, + priority, + ) .await }