Files
lancedb/python/src/table.rs
Will Jones c0230f91d2 feat(rust)!: accept RecordBatch, Vec<RecordBatch> in create_table() and Table.add() (#2948)
BREAKING CHANGE: Arbitrary `impl RecordBatchReader` is no longer
accepted, it must be made into `Box<dyn RecordBatchReader>`.

This PR replaces `IntoArrow` with a new trait `Scannable` to define
input row data. This provides the following advantages:

1. **We can implement `Scannable` for more types than `IntoArrow`, such
as `RecordBatch` and `Vec<RecordBatch>`.** The `IntoArrow` trait was
implemented for arbitrary `T: RecordBatchReader`, and the Rust compiler
would prevent us from implementing it for foreign types like
`RecordBatch` because (theoretically) those types might implement
`RecordBatchReader` in the future. That's why we implement `Scannable`
for `Box<dyn RecordBatchReader>` instead; since it's a concrete type it
doesn't block implementing for other foreign types.
2. **We can potentially replay `Scannable` values**. Previously, we had
to choose between buffering all data in memory and supporting retries of
writes. But because `Scannable` things can optionally support
re-scanning, we now have a way of supporting retries while also
streaming.
3. **`Scannable` can provide hints like `num_rows`, which can be used to
schedule parallel writers.** Without knowing the total number of rows,
it's difficult to know whether it's worth writing multiple files in
parallel.

We don't yet fully take advantage of (2) and (3) yet, but will in future
PRs. For (2), in order to be ready to leverage this, we need to hook the
`Scannable` implementation up to Python and NodeJS bindings. Right now
they always pass down a stream, but we want to make sure they support
retries when possible. And for (3), this will need to be hooked up to
#2939 and to a pipeline for running pre-processing steps (like embedding
generation).

## Other changes

* Moved `create_table` and `add_data` into their own modules. I've
created a follow up issue to split up `table.rs` further, as it's by far
the largest file: https://github.com/lancedb/lancedb/issues/2949
* Eliminated the `HAS_DATA` generic for `CreateTableBuilder`. I didn't
see any public-facing places where we differentiated methods, which is
why I felt this simplification was okay.
* Added an `Error::External` variant and integrated some conversions to
allow certain errors to pass through transparently. This will fully work
once we upgrade Lance and get to take advantage of changes in
https://github.com/lance-format/lance/pull/5606
* Added LZ4 compression support for write requests to remote endpoints.
I checked and this has been supported on the server for > 1 year.

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-13 14:18:36 -08:00

939 lines
30 KiB
Rust

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::{collections::HashMap, sync::Arc};
use crate::{
connection::Connection,
error::PythonErrorExt,
index::{extract_index_params, IndexConfig},
query::{Query, TakeQuery},
};
use arrow::{
datatypes::{DataType, Schema},
ffi_stream::ArrowArrayStreamReader,
pyarrow::{FromPyArrow, PyArrowType, ToPyArrow},
};
use lancedb::table::{
AddDataMode, ColumnAlteration, Duration, NewColumnTransform, OptimizeAction, OptimizeOptions,
Table as LanceDbTable,
};
use pyo3::{
exceptions::{PyKeyError, PyRuntimeError, PyValueError},
pyclass, pymethods,
types::{IntoPyDict, PyAnyMethods, PyDict, PyDictMethods},
Bound, FromPyObject, PyAny, PyRef, PyResult, Python,
};
use pyo3_async_runtimes::tokio::future_into_py;
/// Statistics about a compaction operation.
#[pyclass(get_all)]
#[derive(Clone, Debug)]
pub struct CompactionStats {
/// The number of fragments removed
pub fragments_removed: u64,
/// The number of new, compacted fragments added
pub fragments_added: u64,
/// The number of data files removed
pub files_removed: u64,
/// The number of new, compacted data files added
pub files_added: u64,
}
/// Statistics about a cleanup operation
#[pyclass(get_all)]
#[derive(Clone, Debug)]
pub struct RemovalStats {
/// The number of bytes removed
pub bytes_removed: u64,
/// The number of old versions removed
pub old_versions_removed: u64,
}
/// Statistics about an optimize operation
#[pyclass(get_all)]
#[derive(Clone, Debug)]
pub struct OptimizeStats {
/// Statistics about the compaction operation
pub compaction: CompactionStats,
/// Statistics about the removal operation
pub prune: RemovalStats,
}
#[pyclass(get_all)]
#[derive(Clone, Debug)]
pub struct UpdateResult {
pub rows_updated: u64,
pub version: u64,
}
#[pymethods]
impl UpdateResult {
pub fn __repr__(&self) -> String {
format!(
"UpdateResult(rows_updated={}, version={})",
self.rows_updated, self.version
)
}
}
impl From<lancedb::table::UpdateResult> for UpdateResult {
fn from(result: lancedb::table::UpdateResult) -> Self {
Self {
rows_updated: result.rows_updated,
version: result.version,
}
}
}
#[pyclass(get_all)]
#[derive(Clone, Debug)]
pub struct AddResult {
pub version: u64,
}
#[pymethods]
impl AddResult {
pub fn __repr__(&self) -> String {
format!("AddResult(version={})", self.version)
}
}
impl From<lancedb::table::AddResult> for AddResult {
fn from(result: lancedb::table::AddResult) -> Self {
Self {
version: result.version,
}
}
}
#[pyclass(get_all)]
#[derive(Clone, Debug)]
pub struct DeleteResult {
pub version: u64,
}
#[pymethods]
impl DeleteResult {
pub fn __repr__(&self) -> String {
format!("DeleteResult(version={})", self.version)
}
}
impl From<lancedb::table::DeleteResult> for DeleteResult {
fn from(result: lancedb::table::DeleteResult) -> Self {
Self {
version: result.version,
}
}
}
#[pyclass(get_all)]
#[derive(Clone, Debug)]
pub struct MergeResult {
pub version: u64,
pub num_updated_rows: u64,
pub num_inserted_rows: u64,
pub num_deleted_rows: u64,
pub num_attempts: u32,
}
#[pymethods]
impl MergeResult {
pub fn __repr__(&self) -> String {
format!(
"MergeResult(version={}, num_updated_rows={}, num_inserted_rows={}, num_deleted_rows={}, num_attempts={})",
self.version,
self.num_updated_rows,
self.num_inserted_rows,
self.num_deleted_rows,
self.num_attempts
)
}
}
impl From<lancedb::table::MergeResult> for MergeResult {
fn from(result: lancedb::table::MergeResult) -> Self {
Self {
version: result.version,
num_updated_rows: result.num_updated_rows,
num_inserted_rows: result.num_inserted_rows,
num_deleted_rows: result.num_deleted_rows,
num_attempts: result.num_attempts,
}
}
}
#[pyclass(get_all)]
#[derive(Clone, Debug)]
pub struct AddColumnsResult {
pub version: u64,
}
#[pymethods]
impl AddColumnsResult {
pub fn __repr__(&self) -> String {
format!("AddColumnsResult(version={})", self.version)
}
}
impl From<lancedb::table::AddColumnsResult> for AddColumnsResult {
fn from(result: lancedb::table::AddColumnsResult) -> Self {
Self {
version: result.version,
}
}
}
#[pyclass(get_all)]
#[derive(Clone, Debug)]
pub struct AlterColumnsResult {
pub version: u64,
}
#[pymethods]
impl AlterColumnsResult {
pub fn __repr__(&self) -> String {
format!("AlterColumnsResult(version={})", self.version)
}
}
impl From<lancedb::table::AlterColumnsResult> for AlterColumnsResult {
fn from(result: lancedb::table::AlterColumnsResult) -> Self {
Self {
version: result.version,
}
}
}
#[pyclass(get_all)]
#[derive(Clone, Debug)]
pub struct DropColumnsResult {
pub version: u64,
}
#[pymethods]
impl DropColumnsResult {
pub fn __repr__(&self) -> String {
format!("DropColumnsResult(version={})", self.version)
}
}
impl From<lancedb::table::DropColumnsResult> for DropColumnsResult {
fn from(result: lancedb::table::DropColumnsResult) -> Self {
Self {
version: result.version,
}
}
}
#[pyclass]
pub struct Table {
// We keep a copy of the name to use if the inner table is dropped
name: String,
inner: Option<LanceDbTable>,
}
#[pymethods]
impl OptimizeStats {
pub fn __repr__(&self) -> String {
format!(
"OptimizeStats(compaction={:?}, prune={:?})",
self.compaction, self.prune
)
}
}
impl Table {
pub(crate) fn new(inner: LanceDbTable) -> Self {
Self {
name: inner.name().to_string(),
inner: Some(inner),
}
}
}
impl Table {
pub(crate) fn inner_ref(&self) -> PyResult<&LanceDbTable> {
self.inner
.as_ref()
.ok_or_else(|| PyRuntimeError::new_err(format!("Table {} is closed", self.name)))
}
}
#[pymethods]
impl Table {
pub fn name(&self) -> String {
self.name.clone()
}
/// Returns True if the table is open, False if it is closed.
pub fn is_open(&self) -> bool {
self.inner.is_some()
}
/// Closes the table, releasing any resources associated with it.
pub fn close(&mut self) {
self.inner.take();
}
pub fn database(&self) -> PyResult<Connection> {
let inner = self.inner_ref()?.clone();
let inner_connection =
lancedb::Connection::new(inner.database().clone(), inner.embedding_registry().clone());
Ok(Connection::new(inner_connection))
}
pub fn schema(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
let schema = inner.schema().await.infer_error()?;
Python::attach(|py| schema.to_pyarrow(py).map(|obj| obj.unbind()))
})
}
pub fn add<'a>(
self_: PyRef<'a, Self>,
data: Bound<'_, PyAny>,
mode: String,
) -> PyResult<Bound<'a, PyAny>> {
let batches: Box<dyn arrow::array::RecordBatchReader + Send> =
Box::new(ArrowArrayStreamReader::from_pyarrow_bound(&data)?);
let mut op = self_.inner_ref()?.add(batches);
if mode == "append" {
op = op.mode(AddDataMode::Append);
} else if mode == "overwrite" {
op = op.mode(AddDataMode::Overwrite);
} else {
return Err(PyValueError::new_err(format!("Invalid mode: {}", mode)));
}
future_into_py(self_.py(), async move {
let result = op.execute().await.infer_error()?;
Ok(AddResult::from(result))
})
}
pub fn delete(self_: PyRef<'_, Self>, condition: String) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
let result = inner.delete(&condition).await.infer_error()?;
Ok(DeleteResult::from(result))
})
}
#[pyo3(signature = (updates, r#where=None))]
pub fn update<'a>(
self_: PyRef<'a, Self>,
updates: &Bound<'_, PyDict>,
r#where: Option<String>,
) -> PyResult<Bound<'a, PyAny>> {
let mut op = self_.inner_ref()?.update();
if let Some(only_if) = r#where {
op = op.only_if(only_if);
}
for (column_name, value) in updates.into_iter() {
let column_name: String = column_name.extract()?;
let value: String = value.extract()?;
op = op.column(column_name, value);
}
future_into_py(self_.py(), async move {
let result = op.execute().await.infer_error()?;
Ok(UpdateResult::from(result))
})
}
#[pyo3(signature = (filter=None))]
pub fn count_rows(
self_: PyRef<'_, Self>,
filter: Option<String>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
inner.count_rows(filter).await.infer_error()
})
}
#[pyo3(signature = (column, index=None, replace=None, wait_timeout=None, *, name=None, train=None))]
pub fn create_index<'a>(
self_: PyRef<'a, Self>,
column: String,
index: Option<Bound<'_, PyAny>>,
replace: Option<bool>,
wait_timeout: Option<Bound<'_, PyAny>>,
name: Option<String>,
train: Option<bool>,
) -> PyResult<Bound<'a, PyAny>> {
let index = extract_index_params(&index)?;
let timeout = wait_timeout.map(|t| t.extract::<std::time::Duration>().unwrap());
let mut op = self_
.inner_ref()?
.create_index_with_timeout(&[column], index, timeout);
if let Some(replace) = replace {
op = op.replace(replace);
}
if let Some(name) = name {
op = op.name(name);
}
if let Some(train) = train {
op = op.train(train);
}
future_into_py(self_.py(), async move {
op.execute().await.infer_error()?;
Ok(())
})
}
pub fn drop_index(self_: PyRef<'_, Self>, index_name: String) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
inner.drop_index(&index_name).await.infer_error()?;
Ok(())
})
}
pub fn wait_for_index<'a>(
self_: PyRef<'a, Self>,
index_names: Vec<String>,
timeout: Bound<'_, PyAny>,
) -> PyResult<Bound<'a, PyAny>> {
let inner = self_.inner_ref()?.clone();
let timeout = timeout.extract::<std::time::Duration>()?;
future_into_py(self_.py(), async move {
let index_refs = index_names
.iter()
.map(String::as_str)
.collect::<Vec<&str>>();
inner
.wait_for_index(&index_refs, timeout)
.await
.infer_error()?;
Ok(())
})
}
pub fn prewarm_index(self_: PyRef<'_, Self>, index_name: String) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
inner.prewarm_index(&index_name).await.infer_error()?;
Ok(())
})
}
pub fn list_indices(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
Ok(inner
.list_indices()
.await
.infer_error()?
.into_iter()
.map(IndexConfig::from)
.collect::<Vec<_>>())
})
}
pub fn index_stats(self_: PyRef<'_, Self>, index_name: String) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
let stats = inner.index_stats(&index_name).await.infer_error()?;
if let Some(stats) = stats {
Python::attach(|py| {
let dict = PyDict::new(py);
dict.set_item("num_indexed_rows", stats.num_indexed_rows)?;
dict.set_item("num_unindexed_rows", stats.num_unindexed_rows)?;
dict.set_item("index_type", stats.index_type.to_string())?;
if let Some(distance_type) = stats.distance_type {
dict.set_item("distance_type", distance_type.to_string())?;
}
if let Some(num_indices) = stats.num_indices {
dict.set_item("num_indices", num_indices)?;
}
if let Some(loss) = stats.loss {
dict.set_item("loss", loss)?;
}
Ok(Some(dict.unbind()))
})
} else {
Ok(None)
}
})
}
pub fn stats(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
let stats = inner.stats().await.infer_error()?;
Python::attach(|py| {
let dict = PyDict::new(py);
dict.set_item("total_bytes", stats.total_bytes)?;
dict.set_item("num_rows", stats.num_rows)?;
dict.set_item("num_indices", stats.num_indices)?;
let fragment_stats = PyDict::new(py);
fragment_stats.set_item("num_fragments", stats.fragment_stats.num_fragments)?;
fragment_stats.set_item(
"num_small_fragments",
stats.fragment_stats.num_small_fragments,
)?;
let fragment_lengths = PyDict::new(py);
fragment_lengths.set_item("min", stats.fragment_stats.lengths.min)?;
fragment_lengths.set_item("max", stats.fragment_stats.lengths.max)?;
fragment_lengths.set_item("mean", stats.fragment_stats.lengths.mean)?;
fragment_lengths.set_item("p25", stats.fragment_stats.lengths.p25)?;
fragment_lengths.set_item("p50", stats.fragment_stats.lengths.p50)?;
fragment_lengths.set_item("p75", stats.fragment_stats.lengths.p75)?;
fragment_lengths.set_item("p99", stats.fragment_stats.lengths.p99)?;
fragment_stats.set_item("lengths", fragment_lengths)?;
dict.set_item("fragment_stats", fragment_stats)?;
Ok(Some(dict.unbind()))
})
})
}
pub fn uri(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move { inner.uri().await.infer_error() })
}
pub fn initial_storage_options(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
Ok(inner.initial_storage_options().await)
})
}
pub fn latest_storage_options(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
inner.latest_storage_options().await.infer_error()
})
}
pub fn __repr__(&self) -> String {
match &self.inner {
None => format!("ClosedTable({})", self.name),
Some(inner) => inner.to_string(),
}
}
pub fn version(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(
self_.py(),
async move { inner.version().await.infer_error() },
)
}
pub fn list_versions(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
let versions = inner.list_versions().await.infer_error()?;
let versions_as_dict = Python::attach(|py| {
versions
.iter()
.map(|v| {
let dict = PyDict::new(py);
dict.set_item("version", v.version).unwrap();
dict.set_item(
"timestamp",
v.timestamp.timestamp_nanos_opt().unwrap_or_default(),
)
.unwrap();
let tup: Vec<(&String, &String)> = v.metadata.iter().collect();
dict.set_item("metadata", tup.into_py_dict(py)?).unwrap();
Ok(dict.unbind())
})
.collect::<PyResult<Vec<_>>>()
});
versions_as_dict
})
}
pub fn checkout(self_: PyRef<'_, Self>, version: LanceVersion) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
let py = self_.py();
future_into_py(py, async move {
match version {
LanceVersion::Version(version_num) => {
inner.checkout(version_num).await.infer_error()
}
LanceVersion::Tag(tag) => inner.checkout_tag(&tag).await.infer_error(),
}
})
}
pub fn checkout_latest(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
inner.checkout_latest().await.infer_error()
})
}
#[pyo3(signature = (version=None))]
pub fn restore(
self_: PyRef<'_, Self>,
version: Option<LanceVersion>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
let py = self_.py();
future_into_py(py, async move {
if let Some(version) = version {
match version {
LanceVersion::Version(num) => inner.checkout(num).await.infer_error()?,
LanceVersion::Tag(tag) => inner.checkout_tag(&tag).await.infer_error()?,
}
}
inner.restore().await.infer_error()
})
}
pub fn query(&self) -> Query {
Query::new(self.inner_ref().unwrap().query())
}
#[getter]
pub fn tags(&self) -> PyResult<Tags> {
Ok(Tags::new(self.inner_ref()?.clone()))
}
#[pyo3(signature = (offsets))]
pub fn take_offsets(self_: PyRef<'_, Self>, offsets: Vec<u64>) -> PyResult<TakeQuery> {
Ok(TakeQuery::new(
self_.inner_ref()?.clone().take_offsets(offsets),
))
}
#[pyo3(signature = (row_ids))]
pub fn take_row_ids(self_: PyRef<'_, Self>, row_ids: Vec<u64>) -> PyResult<TakeQuery> {
Ok(TakeQuery::new(
self_.inner_ref()?.clone().take_row_ids(row_ids),
))
}
/// Optimize the on-disk data by compacting and pruning old data, for better performance.
#[pyo3(signature = (cleanup_since_ms=None, delete_unverified=None))]
pub fn optimize(
self_: PyRef<'_, Self>,
cleanup_since_ms: Option<u64>,
delete_unverified: Option<bool>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
let older_than = if let Some(ms) = cleanup_since_ms {
if ms > i64::MAX as u64 {
return Err(PyValueError::new_err(format!(
"cleanup_since_ms must be between {} and -{}",
i32::MAX,
i32::MAX
)));
}
Duration::try_milliseconds(ms as i64)
} else {
None
};
future_into_py(self_.py(), async move {
let compaction_stats = inner
.optimize(OptimizeAction::Compact {
options: lancedb::table::CompactionOptions::default(),
remap_options: None,
})
.await
.infer_error()?
.compaction
.unwrap();
let prune_stats = inner
.optimize(OptimizeAction::Prune {
older_than,
delete_unverified,
error_if_tagged_old_versions: None,
})
.await
.infer_error()?
.prune
.unwrap();
inner
.optimize(lancedb::table::OptimizeAction::Index(
OptimizeOptions::default(),
))
.await
.infer_error()?;
Ok(OptimizeStats {
compaction: CompactionStats {
files_added: compaction_stats.files_added as u64,
files_removed: compaction_stats.files_removed as u64,
fragments_added: compaction_stats.fragments_added as u64,
fragments_removed: compaction_stats.fragments_removed as u64,
},
prune: RemovalStats {
bytes_removed: prune_stats.bytes_removed,
old_versions_removed: prune_stats.old_versions,
},
})
})
}
pub fn execute_merge_insert<'a>(
self_: PyRef<'a, Self>,
data: Bound<'a, PyAny>,
parameters: MergeInsertParams,
) -> PyResult<Bound<'a, PyAny>> {
let batches: ArrowArrayStreamReader = ArrowArrayStreamReader::from_pyarrow_bound(&data)?;
let on = parameters.on.iter().map(|s| s.as_str()).collect::<Vec<_>>();
let mut builder = self_.inner_ref()?.merge_insert(&on);
if parameters.when_matched_update_all {
builder.when_matched_update_all(parameters.when_matched_update_all_condition);
}
if parameters.when_not_matched_insert_all {
builder.when_not_matched_insert_all();
}
if parameters.when_not_matched_by_source_delete {
builder
.when_not_matched_by_source_delete(parameters.when_not_matched_by_source_condition);
}
if let Some(timeout) = parameters.timeout {
builder.timeout(timeout);
}
if let Some(use_index) = parameters.use_index {
builder.use_index(use_index);
}
future_into_py(self_.py(), async move {
let res = builder.execute(Box::new(batches)).await.infer_error()?;
Ok(MergeResult::from(res))
})
}
pub fn uses_v2_manifest_paths(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
inner
.as_native()
.ok_or_else(|| PyValueError::new_err("This cannot be run on a remote table"))?
.uses_v2_manifest_paths()
.await
.infer_error()
})
}
pub fn migrate_manifest_paths_v2(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
inner
.as_native()
.ok_or_else(|| PyValueError::new_err("This cannot be run on a remote table"))?
.migrate_manifest_paths_v2()
.await
.infer_error()
})
}
pub fn add_columns(
self_: PyRef<'_, Self>,
definitions: Vec<(String, String)>,
) -> PyResult<Bound<'_, PyAny>> {
let definitions = NewColumnTransform::SqlExpressions(definitions);
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
let result = inner.add_columns(definitions, None).await.infer_error()?;
Ok(AddColumnsResult::from(result))
})
}
pub fn add_columns_with_schema(
self_: PyRef<'_, Self>,
schema: PyArrowType<Schema>,
) -> PyResult<Bound<'_, PyAny>> {
let arrow_schema = &schema.0;
let transform = NewColumnTransform::AllNulls(Arc::new(arrow_schema.clone()));
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
let result = inner.add_columns(transform, None).await.infer_error()?;
Ok(AddColumnsResult::from(result))
})
}
pub fn alter_columns<'a>(
self_: PyRef<'a, Self>,
alterations: Vec<Bound<PyDict>>,
) -> PyResult<Bound<'a, PyAny>> {
let alterations = alterations
.iter()
.map(|alteration| {
let path = alteration
.get_item("path")?
.ok_or_else(|| PyValueError::new_err("Missing path"))?
.extract()?;
let rename = {
// We prefer rename, but support name for backwards compatibility
let rename = if let Ok(Some(rename)) = alteration.get_item("rename") {
Some(rename)
} else {
alteration.get_item("name")?
};
rename.map(|name| name.extract()).transpose()?
};
let nullable = alteration
.get_item("nullable")?
.map(|val| val.extract())
.transpose()?;
let data_type = alteration
.get_item("data_type")?
.map(|val| DataType::from_pyarrow_bound(&val))
.transpose()?;
Ok(ColumnAlteration {
path,
rename,
nullable,
data_type,
})
})
.collect::<PyResult<Vec<_>>>()?;
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
let result = inner.alter_columns(&alterations).await.infer_error()?;
Ok(AlterColumnsResult::from(result))
})
}
pub fn drop_columns(self_: PyRef<Self>, columns: Vec<String>) -> PyResult<Bound<PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
let column_refs = columns.iter().map(String::as_str).collect::<Vec<&str>>();
let result = inner.drop_columns(&column_refs).await.infer_error()?;
Ok(DropColumnsResult::from(result))
})
}
pub fn replace_field_metadata<'a>(
self_: PyRef<'a, Self>,
field_name: String,
metadata: &Bound<'_, PyDict>,
) -> PyResult<Bound<'a, PyAny>> {
let mut new_metadata = HashMap::<String, String>::new();
for (column_name, value) in metadata.into_iter() {
let key: String = column_name.extract()?;
let value: String = value.extract()?;
new_metadata.insert(key, value);
}
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
let native_tbl = inner
.as_native()
.ok_or_else(|| PyValueError::new_err("This cannot be run on a remote table"))?;
let schema = native_tbl.manifest().await.infer_error()?.schema;
let field = schema
.field(&field_name)
.ok_or_else(|| PyKeyError::new_err(format!("Field {} not found", field_name)))?;
native_tbl
.replace_field_metadata(vec![(field.id as u32, new_metadata)])
.await
.infer_error()?;
Ok(())
})
}
}
#[derive(FromPyObject)]
pub enum LanceVersion {
Version(u64),
Tag(String),
}
#[derive(FromPyObject)]
#[pyo3(from_item_all)]
pub struct MergeInsertParams {
on: Vec<String>,
when_matched_update_all: bool,
when_matched_update_all_condition: Option<String>,
when_not_matched_insert_all: bool,
when_not_matched_by_source_delete: bool,
when_not_matched_by_source_condition: Option<String>,
timeout: Option<std::time::Duration>,
use_index: Option<bool>,
}
#[pyclass]
pub struct Tags {
inner: LanceDbTable,
}
impl Tags {
pub fn new(table: LanceDbTable) -> Self {
Self { inner: table }
}
}
#[pymethods]
impl Tags {
pub fn list(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
let tags = inner.tags().await.infer_error()?;
let res = tags.list().await.infer_error()?;
Python::attach(|py| {
let py_dict = PyDict::new(py);
for (key, contents) in res {
let value_dict = PyDict::new(py);
value_dict.set_item("version", contents.version)?;
value_dict.set_item("manifest_size", contents.manifest_size)?;
py_dict.set_item(key, value_dict)?;
}
Ok(py_dict.unbind())
})
})
}
pub fn get_version(self_: PyRef<'_, Self>, tag: String) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
let tags = inner.tags().await.infer_error()?;
let res = tags.get_version(tag.as_str()).await.infer_error()?;
Ok(res)
})
}
pub fn create(self_: PyRef<Self>, tag: String, version: u64) -> PyResult<Bound<PyAny>> {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
let mut tags = inner.tags().await.infer_error()?;
tags.create(tag.as_str(), version).await.infer_error()?;
Ok(())
})
}
pub fn delete(self_: PyRef<Self>, tag: String) -> PyResult<Bound<PyAny>> {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
let mut tags = inner.tags().await.infer_error()?;
tags.delete(tag.as_str()).await.infer_error()?;
Ok(())
})
}
pub fn update(self_: PyRef<Self>, tag: String, version: u64) -> PyResult<Bound<PyAny>> {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
let mut tags = inner.tags().await.infer_error()?;
tags.update(tag.as_str(), version).await.infer_error()?;
Ok(())
})
}
}