mirror of
https://github.com/lancedb/lancedb.git
synced 2026-07-02 18:40:40 +00:00
feat(refresh): priority as a per-refresh knob; fix batch_size on RemoteTable
Thread priority (the Kueue tier) through refresh_column at every layer (Python sync + async + RemoteTable -> pyo3 -> Rust client trait/public/remote -> REST body), mirroring num_workers/batch_size. The function keeps its own priority as a default; a per-refresh value overrides it. Also adds the previously missed batch_size to RemoteTable.refresh_column (the REST sync path). Verified with cargo check (lancedb --features remote --tests, lancedb-python) and a clean ruff run. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -904,6 +904,8 @@ class RemoteTable(Table):
|
||||
where: Optional[str] = None,
|
||||
num_workers: Optional[int] = None,
|
||||
max_workers: Optional[int] = None,
|
||||
batch_size: Optional[int] = None,
|
||||
priority: Optional[str] = None,
|
||||
) -> str:
|
||||
"""Trigger recompute of computed columns (REFRESH COLUMN).
|
||||
|
||||
@@ -911,6 +913,11 @@ class RemoteTable(Table):
|
||||
binding; columns bound to the same struct-returning function
|
||||
refresh together. Returns the refresh job id. Server-backed
|
||||
feature (LanceDB Enterprise / Cloud).
|
||||
|
||||
num_workers / max_workers / batch_size / priority are per-refresh
|
||||
scheduling knobs (how to run THIS refresh) and override any default
|
||||
the function carries. `priority` is a Kueue tier
|
||||
(training | interactive | backfill).
|
||||
"""
|
||||
if isinstance(columns, str):
|
||||
columns = [columns]
|
||||
@@ -920,10 +927,11 @@ class RemoteTable(Table):
|
||||
where=where,
|
||||
num_workers=num_workers,
|
||||
max_workers=max_workers,
|
||||
batch_size=batch_size,
|
||||
priority=priority,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def alter_columns(
|
||||
self, *alterations: Iterable[Dict[str, str]]
|
||||
) -> AlterColumnsResult:
|
||||
|
||||
@@ -702,7 +702,6 @@ def _normalize_progress(progress):
|
||||
return progress, False
|
||||
|
||||
|
||||
|
||||
def _computed_groups(computed):
|
||||
"""Group computed columns by expression, preserving declaration order
|
||||
(struct-returning functions need their columns adjacent so schema order
|
||||
@@ -846,7 +845,7 @@ class Table(ABC):
|
||||
import warnings
|
||||
|
||||
warnings.warn(
|
||||
'add_computed_column is deprecated; use add_columns(computed='
|
||||
"add_computed_column is deprecated; use add_columns(computed="
|
||||
'{"vec": embed("data")}).',
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
@@ -3783,7 +3782,11 @@ class LanceTable(Table):
|
||||
|
||||
def add_columns(
|
||||
self,
|
||||
transforms: Dict[str, str] | pa.field | List[pa.field] | pa.Schema | None = None,
|
||||
transforms: Dict[str, str]
|
||||
| pa.field
|
||||
| List[pa.field]
|
||||
| pa.Schema
|
||||
| None = None,
|
||||
*,
|
||||
computed: Optional[Dict] = None,
|
||||
) -> Optional[AddColumnsResult]:
|
||||
@@ -3809,6 +3812,7 @@ class LanceTable(Table):
|
||||
num_workers: Optional[int] = None,
|
||||
max_workers: Optional[int] = None,
|
||||
batch_size: Optional[int] = None,
|
||||
priority: Optional[str] = None,
|
||||
) -> str:
|
||||
"""Trigger recompute of computed columns (REFRESH COLUMN).
|
||||
|
||||
@@ -3817,9 +3821,10 @@ class LanceTable(Table):
|
||||
refresh together. Returns the refresh job id. Server-backed
|
||||
feature (LanceDB Enterprise / Cloud).
|
||||
|
||||
num_workers / max_workers / batch_size are per-refresh scheduling
|
||||
knobs (how to run THIS refresh) and override any default the
|
||||
function carries.
|
||||
num_workers / max_workers / batch_size / priority are per-refresh
|
||||
scheduling knobs (how to run THIS refresh) and override any default
|
||||
the function carries. `priority` is a Kueue tier
|
||||
(training | interactive | backfill).
|
||||
"""
|
||||
if isinstance(columns, str):
|
||||
columns = [columns]
|
||||
@@ -3830,10 +3835,10 @@ class LanceTable(Table):
|
||||
num_workers=num_workers,
|
||||
max_workers=max_workers,
|
||||
batch_size=batch_size,
|
||||
priority=priority,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def alter_columns(
|
||||
self, *alterations: Iterable[Dict[str, str]]
|
||||
) -> AlterColumnsResult:
|
||||
@@ -5518,13 +5523,15 @@ class AsyncTable:
|
||||
num_workers: Optional[int] = None,
|
||||
max_workers: Optional[int] = None,
|
||||
batch_size: Optional[int] = None,
|
||||
priority: Optional[str] = None,
|
||||
) -> str:
|
||||
"""Trigger recompute of computed columns (REFRESH COLUMN).
|
||||
Returns the refresh job id. Server-backed feature.
|
||||
|
||||
num_workers / max_workers / batch_size are per-refresh scheduling
|
||||
knobs (how to run THIS refresh); they override any default the
|
||||
function carries."""
|
||||
num_workers / max_workers / batch_size / priority are per-refresh
|
||||
scheduling knobs (how to run THIS refresh); they override any default
|
||||
the function carries. `priority` is a Kueue tier
|
||||
(training | interactive | backfill)."""
|
||||
if isinstance(columns, str):
|
||||
columns = [columns]
|
||||
return await self._inner.refresh_column(
|
||||
@@ -5533,11 +5540,16 @@ class AsyncTable:
|
||||
num_workers=num_workers,
|
||||
max_workers=max_workers,
|
||||
batch_size=batch_size,
|
||||
priority=priority,
|
||||
)
|
||||
|
||||
async def add_columns(
|
||||
self,
|
||||
transforms: dict[str, str] | pa.field | List[pa.field] | pa.Schema | None = None,
|
||||
transforms: dict[str, str]
|
||||
| pa.field
|
||||
| List[pa.field]
|
||||
| pa.Schema
|
||||
| None = None,
|
||||
*,
|
||||
computed: Optional[Dict] = None,
|
||||
) -> Optional[AddColumnsResult]:
|
||||
@@ -5598,7 +5610,7 @@ class AsyncTable:
|
||||
import warnings
|
||||
|
||||
warnings.warn(
|
||||
'add_computed_column is deprecated; use add_columns(computed='
|
||||
"add_computed_column is deprecated; use add_columns(computed="
|
||||
'{"col": fn("input_col")}).',
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
|
||||
Reference in New Issue
Block a user