feat(client): Table.load_columns() REST client for LOAD COLUMNS

Geneva Table.load_columns() parity on the REST-only client. Fills existing
columns from an external Parquet/Lance/IPC source by primary-key join.

- BaseTable::load_columns default (NotSupported) + public Table::load_columns,
  taking a LoadColumnsRequest (source uris/format/storage_options, target/source
  key, (target, source?) column mappings, on_missing, worker/batch/commit knobs).
- Remote impl POSTs to /v1/table/{id}/load_columns with the matching body;
  mock test asserts the request shape.
- PyO3 binding + Python remote Table.load_columns(source, pk, columns, *,
  source_format, source_pk, on_missing, ...) accepting a column list or
  {target: source} dict.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Wyatt Alt
2026-06-14 19:44:51 -07:00
committed by Jack Ye
parent 8b38500b07
commit 5810974b37
4 changed files with 247 additions and 2 deletions

View File

@@ -932,6 +932,70 @@ class RemoteTable(Table):
)
)
def load_columns(
self,
source: Union[str, Iterable[str]],
pk: str,
columns: Union[Iterable[str], Dict[str, str]],
*,
source_format: str = "parquet",
source_pk: Optional[str] = None,
on_missing: str = "carry",
source_storage_options: Optional[Dict[str, str]] = None,
num_workers: Optional[int] = None,
max_workers: Optional[int] = None,
batch_size: Optional[int] = None,
commit_granularity: Optional[int] = None,
priority: Optional[str] = None,
) -> str:
"""Fill existing columns from an external source by primary-key join.
The distributed-job equivalent of Geneva's ``Table.load_columns()``:
imports precomputed values (e.g. embeddings) from Parquet/Lance/IPC into
this table, matching on a primary key. Returns the load job id.
Server-backed feature (LanceDB Enterprise / Cloud).
Parameters
----------
source: str | list[str]
One source URI or a list of URIs.
pk: str
Destination primary-key column. Also the source key unless
``source_pk`` is given.
columns: list[str] | dict[str, str]
Value columns to load. A list loads same-named columns; a dict maps
``{target: source}``.
source_format: str
``"parquet"`` (default), ``"lance"``, or ``"ipc"``.
source_pk: str, optional
Source primary-key column when it differs from ``pk``.
on_missing: str
Behavior for destination rows with no source match:
``"carry"`` (default, keep existing), ``"null"``, or ``"error"``.
"""
if isinstance(source, str):
source = [source]
if isinstance(columns, dict):
mappings = [(target, src) for target, src in columns.items()]
else:
mappings = [(c, None) for c in columns]
return LOOP.run(
self._table.load_columns(
list(source),
source_format,
pk,
mappings,
source_key=source_pk,
source_storage_options=source_storage_options,
on_missing=on_missing,
num_workers=num_workers,
max_workers=max_workers,
batch_size=batch_size,
commit_granularity=commit_granularity,
priority=priority,
)
)
def alter_columns(
self, *alterations: Iterable[Dict[str, str]]
) -> AlterColumnsResult:

View File

@@ -17,8 +17,8 @@ use arrow::{
pyarrow::{FromPyArrow, PyArrowType, ToPyArrow},
};
use lancedb::table::{
AddDataMode, ColumnAlteration, Duration, FieldMetadataUpdate, NewColumnTransform,
OptimizeAction, OptimizeOptions, Ref, Table as LanceDbTable,
AddDataMode, ColumnAlteration, Duration, FieldMetadataUpdate, LoadColumnsRequest,
NewColumnTransform, OptimizeAction, OptimizeOptions, Ref, Table as LanceDbTable,
};
use pyo3::{
Bound, FromPyObject, Py, PyAny, PyRef, PyResult, Python,
@@ -1100,6 +1100,43 @@ impl Table {
})
}
#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (source_uris, source_format, target_key, columns, source_key=None, source_storage_options=None, on_missing=None, num_workers=None, max_workers=None, batch_size=None, commit_granularity=None, priority=None))]
pub fn load_columns(
self_: PyRef<'_, Self>,
source_uris: Vec<String>,
source_format: String,
target_key: String,
columns: Vec<(String, Option<String>)>,
source_key: Option<String>,
source_storage_options: Option<std::collections::HashMap<String, String>>,
on_missing: Option<String>,
num_workers: Option<u32>,
max_workers: Option<u32>,
batch_size: Option<u32>,
commit_granularity: Option<u32>,
priority: Option<String>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
let request = LoadColumnsRequest {
source_uris,
source_format,
source_storage_options,
target_key,
source_key,
columns,
on_missing,
num_workers,
max_workers,
batch_size,
commit_granularity,
priority,
};
future_into_py(self_.py(), async move {
inner.load_columns(request).await.infer_error()
})
}
pub fn add_columns(
self_: PyRef<'_, Self>,
definitions: Vec<(String, String)>,