From d4f4fef3baf0dff887cb7612d0c841cdcde53fcc Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Sun, 14 Jun 2026 06:30:08 -0700 Subject: [PATCH] feat(refresh): batch_size is a per-refresh knob (refresh_column), not a function-only option batch_size / num_workers / max_workers are invocation concerns (how to schedule THIS refresh), so expose batch_size on refresh_column through every layer (Python sync+async -> pyo3 -> Rust client -> the REST RefreshColumnRequest.batch_size, which the handler already forwards into the backfill). num_workers/max_workers were already invocation- placed; batch_size was the gap. The function may still carry a default; the refresh override wins (extends the batch_size_override model). Both crates cargo-check clean. Co-Authored-By: Claude Opus 4.8 (1M context) --- python/python/lancedb/table.py | 14 +++++++++++++- python/src/table.rs | 5 +++-- rust/lancedb/src/remote/table.rs | 6 +++++- rust/lancedb/src/table.rs | 4 +++- 4 files changed, 24 insertions(+), 5 deletions(-) diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py index 4c23b6e21..bcaf70bf8 100644 --- a/python/python/lancedb/table.py +++ b/python/python/lancedb/table.py @@ -3808,6 +3808,7 @@ class LanceTable(Table): where: Optional[str] = None, num_workers: Optional[int] = None, max_workers: Optional[int] = None, + batch_size: Optional[int] = None, ) -> str: """Trigger recompute of computed columns (REFRESH COLUMN). @@ -3815,6 +3816,10 @@ class LanceTable(Table): binding; columns bound to the same struct-returning function refresh together. Returns the refresh job id. Server-backed feature (LanceDB Enterprise / Cloud). + + num_workers / max_workers / batch_size are per-refresh scheduling + knobs (how to run THIS refresh) and override any default the + function carries. """ if isinstance(columns, str): columns = [columns] @@ -3824,6 +3829,7 @@ class LanceTable(Table): where=where, num_workers=num_workers, max_workers=max_workers, + batch_size=batch_size, ) ) @@ -5511,9 +5517,14 @@ class AsyncTable: where: Optional[str] = None, num_workers: Optional[int] = None, max_workers: Optional[int] = None, + batch_size: Optional[int] = None, ) -> str: """Trigger recompute of computed columns (REFRESH COLUMN). - Returns the refresh job id. Server-backed feature.""" + Returns the refresh job id. Server-backed feature. + + num_workers / max_workers / batch_size are per-refresh scheduling + knobs (how to run THIS refresh); they override any default the + function carries.""" if isinstance(columns, str): columns = [columns] return await self._inner.refresh_column( @@ -5521,6 +5532,7 @@ class AsyncTable: where_clause=where, num_workers=num_workers, max_workers=max_workers, + batch_size=batch_size, ) async def add_columns( diff --git a/python/src/table.rs b/python/src/table.rs index fdec38237..1ef09e7b6 100644 --- a/python/src/table.rs +++ b/python/src/table.rs @@ -1074,18 +1074,19 @@ impl Table { }) } - #[pyo3(signature = (columns, where_clause=None, num_workers=None, max_workers=None))] + #[pyo3(signature = (columns, where_clause=None, num_workers=None, max_workers=None, batch_size=None))] pub fn refresh_column( self_: PyRef<'_, Self>, columns: Vec, where_clause: Option, num_workers: Option, max_workers: Option, + batch_size: Option, ) -> PyResult> { let inner = self_.inner_ref()?.clone(); future_into_py(self_.py(), async move { inner - .refresh_column(&columns, where_clause, num_workers, max_workers) + .refresh_column(&columns, where_clause, num_workers, max_workers, batch_size) .await .infer_error() }) diff --git a/rust/lancedb/src/remote/table.rs b/rust/lancedb/src/remote/table.rs index c52f6d393..cfbd82d31 100644 --- a/rust/lancedb/src/remote/table.rs +++ b/rust/lancedb/src/remote/table.rs @@ -2338,6 +2338,7 @@ impl BaseTable for RemoteTable { where_clause: Option, num_workers: Option, max_workers: Option, + batch_size: Option, ) -> Result { let mut body = serde_json::json!({ "columns": columns }); if let Some(w) = where_clause { @@ -2349,6 +2350,9 @@ impl BaseTable for RemoteTable { if let Some(n) = max_workers { body["max_workers"] = n.into(); } + if let Some(n) = batch_size { + body["batch_size"] = n.into(); + } let request = self .client .post(&format!("/v1/table/{}/refresh_column", self.identifier)) @@ -2873,7 +2877,7 @@ mod tests { }); let job_id = table - .refresh_column(&["vec".to_string()], None, Some(2), None) + .refresh_column(&["vec".to_string()], None, Some(2), None, None) .await .unwrap(); assert_eq!(job_id, "j-9"); diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index 61e18a18d..90b605b90 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -646,6 +646,7 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync { _where_clause: Option, _num_workers: Option, _max_workers: Option, + _batch_size: Option, ) -> Result { Err(Error::NotSupported { message: "refresh_column is not supported by this table".into(), @@ -1513,9 +1514,10 @@ impl Table { where_clause: Option, num_workers: Option, max_workers: Option, + batch_size: Option, ) -> Result { self.inner - .refresh_column(columns, where_clause, num_workers, max_workers) + .refresh_column(columns, where_clause, num_workers, max_workers, batch_size) .await }