feat: {add|alter|drop}_columns APIs (#1015)

Initial work for #959. This exposes the basic functionality for each in
all of the APIs. Will add user guide documentation in a later PR.
This commit is contained in:
Will Jones
2024-02-26 11:04:53 -08:00
committed by GitHub
parent 8a52619bc0
commit 5af74b5aca
14 changed files with 660 additions and 11 deletions

View File

@@ -42,7 +42,10 @@ const {
tableCompactFiles,
tableListIndices,
tableIndexStats,
tableSchema
tableSchema,
tableAddColumns,
tableAlterColumns,
tableDropColumns
// eslint-disable-next-line @typescript-eslint/no-var-requires
} = require('../native.js')
@@ -500,6 +503,59 @@ export interface Table<T = number[]> {
filter(value: string): Query<T>
schema: Promise<Schema>
// TODO: Support BatchUDF
/**
* Add new columns with defined values.
*
* @param newColumnTransforms pairs of column names and the SQL expression to use
* to calculate the value of the new column. These
* expressions will be evaluated for each row in the
* table, and can reference existing columns in the table.
*/
addColumns(newColumnTransforms: Array<{ name: string, valueSql: string }>): Promise<void>
/**
* Alter the name or nullability of columns.
*
* @param columnAlterations One or more alterations to apply to columns.
*/
alterColumns(columnAlterations: ColumnAlteration[]): Promise<void>
/**
* Drop one or more columns from the dataset
*
* This is a metadata-only operation and does not remove the data from the
* underlying storage. In order to remove the data, you must subsequently
* call ``compact_files`` to rewrite the data without the removed columns and
* then call ``cleanup_files`` to remove the old files.
*
* @param columnNames The names of the columns to drop. These can be nested
* column references (e.g. "a.b.c") or top-level column
* names (e.g. "a").
*/
dropColumns(columnNames: string[]): Promise<void>
}
/**
* A definition of a column alteration. The alteration changes the column at
* `path` to have the new name `name`, to be nullable if `nullable` is true,
* and to have the data type `data_type`. At least one of `rename` or `nullable`
* must be provided.
*/
export interface ColumnAlteration {
/**
* The path to the column to alter. This is a dot-separated path to the column.
* If it is a top-level column then it is just the name of the column. If it is
* a nested column then it is the path to the column, e.g. "a.b.c" for a column
* `c` nested inside a column `b` nested inside a column `a`.
*/
path: string
rename?: string
/**
* Set the new nullability. Note that a nullable column cannot be made non-nullable.
*/
nullable?: boolean
}
export interface UpdateArgs {
@@ -1028,6 +1084,18 @@ export class LocalTable<T = number[]> implements Table<T> {
return false
}
}
async addColumns (newColumnTransforms: Array<{ name: string, valueSql: string }>): Promise<void> {
return tableAddColumns.call(this._tbl, newColumnTransforms)
}
async alterColumns (columnAlterations: ColumnAlteration[]): Promise<void> {
return tableAlterColumns.call(this._tbl, columnAlterations)
}
async dropColumns (columnNames: string[]): Promise<void> {
return tableDropColumns.call(this._tbl, columnNames)
}
}
export interface CleanupStats {

View File

@@ -25,7 +25,8 @@ import {
type UpdateArgs,
type UpdateSqlArgs,
makeArrowTable,
type MergeInsertArgs
type MergeInsertArgs,
type ColumnAlteration
} from '../index'
import { Query } from '../query'
@@ -474,4 +475,16 @@ export class RemoteTable<T = number[]> implements Table<T> {
numUnindexedRows: results.data.num_unindexed_rows
}
}
async addColumns (newColumnTransforms: Array<{ name: string, valueSql: string }>): Promise<void> {
throw new Error('Add columns is not yet supported in LanceDB Cloud.')
}
async alterColumns (columnAlterations: ColumnAlteration[]): Promise<void> {
throw new Error('Alter columns is not yet supported in LanceDB Cloud.')
}
async dropColumns (columnNames: string[]): Promise<void> {
throw new Error('Drop columns is not yet supported in LanceDB Cloud.')
}
}

View File

@@ -37,8 +37,10 @@ import {
Utf8,
Table as ArrowTable,
vectorFromArray,
Float64,
Float32,
Float16
Float16,
Int64
} from 'apache-arrow'
const expect = chai.expect
@@ -1057,3 +1059,63 @@ describe('Compact and cleanup', function () {
assert.equal(await table.countRows(), 3)
})
})
describe('schema evolution', function () {
// Create a new sample table
it('can add a new column to the schema', async function () {
const dir = await track().mkdir('lancejs')
const con = await lancedb.connect(dir)
const table = await con.createTable('vectors', [
{ id: 1n, vector: [0.1, 0.2] }
])
await table.addColumns([{ name: 'price', valueSql: 'cast(10.0 as float)' }])
const expectedSchema = new Schema([
new Field('id', new Int64()),
new Field('vector', new FixedSizeList(2, new Field('item', new Float32(), true))),
new Field('price', new Float32())
])
expect(await table.schema).to.deep.equal(expectedSchema)
})
it('can alter the columns in the schema', async function () {
const dir = await track().mkdir('lancejs')
const con = await lancedb.connect(dir)
const schema = new Schema([
new Field('id', new Int64(), false),
new Field('vector', new FixedSizeList(2, new Field('item', new Float32(), true))),
new Field('price', new Float64(), false)
])
const table = await con.createTable('vectors', [
{ id: 1n, vector: [0.1, 0.2], price: 10.0 }
])
expect(await table.schema).to.deep.equal(schema)
await table.alterColumns([
{ path: 'id', rename: 'new_id' },
{ path: 'price', nullable: true }
])
const expectedSchema = new Schema([
new Field('new_id', new Int64(), false),
new Field('vector', new FixedSizeList(2, new Field('item', new Float32(), true))),
new Field('price', new Float64(), true)
])
expect(await table.schema).to.deep.equal(expectedSchema)
})
it('can drop a column from the schema', async function () {
const dir = await track().mkdir('lancejs')
const con = await lancedb.connect(dir)
const table = await con.createTable('vectors', [
{ id: 1n, vector: [0.1, 0.2] }
])
await table.dropColumns(['vector'])
const expectedSchema = new Schema([
new Field('id', new Int64(), false)
])
expect(await table.schema).to.deep.equal(expectedSchema)
})
})

View File

@@ -17,7 +17,7 @@ import * as path from "path";
import * as fs from "fs";
import { connect } from "../dist";
import { Schema, Field, Float32, Int32, FixedSizeList } from "apache-arrow";
import { Schema, Field, Float32, Int32, FixedSizeList, Int64, Float64 } from "apache-arrow";
import { makeArrowTable } from "../dist/arrow";
describe("Test creating index", () => {
@@ -214,4 +214,69 @@ describe("Read consistency interval", () => {
expect(await table2.countRows()).toEqual(2n);
}
});
});
describe('schema evolution', function () {
let tmpDir: string;
beforeEach(() => {
tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "schema-evolution-"));
});
// Create a new sample table
it('can add a new column to the schema', async function () {
const con = await connect(tmpDir)
const table = await con.createTable('vectors', [
{ id: 1n, vector: [0.1, 0.2] }
])
await table.addColumns([{ name: 'price', valueSql: 'cast(10.0 as float)' }])
const expectedSchema = new Schema([
new Field('id', new Int64(), true),
new Field('vector', new FixedSizeList(2, new Field('item', new Float32(), true)), true),
new Field('price', new Float32(), false)
])
expect(await table.schema()).toEqual(expectedSchema)
});
it('can alter the columns in the schema', async function () {
const con = await connect(tmpDir)
const schema = new Schema([
new Field('id', new Int64(), true),
new Field('vector', new FixedSizeList(2, new Field('item', new Float32(), true)), true),
new Field('price', new Float64(), false)
])
const table = await con.createTable('vectors', [
{ id: 1n, vector: [0.1, 0.2] }
])
// Can create a non-nullable column only through addColumns at the moment.
await table.addColumns([{ name: 'price', valueSql: 'cast(10.0 as double)' }])
expect(await table.schema()).toEqual(schema)
await table.alterColumns([
{ path: 'id', rename: 'new_id' },
{ path: 'price', nullable: true }
])
const expectedSchema = new Schema([
new Field('new_id', new Int64(), true),
new Field('vector', new FixedSizeList(2, new Field('item', new Float32(), true)), true),
new Field('price', new Float64(), true)
])
expect(await table.schema()).toEqual(expectedSchema)
});
it('can drop a column from the schema', async function () {
const con = await connect(tmpDir)
const table = await con.createTable('vectors', [
{ id: 1n, vector: [0.1, 0.2] }
])
await table.dropColumns(['vector'])
const expectedSchema = new Schema([
new Field('id', new Int64(), true)
])
expect(await table.schema()).toEqual(expectedSchema)
});
});

View File

@@ -12,6 +12,38 @@ export const enum MetricType {
Cosine = 1,
Dot = 2
}
/**
* A definition of a column alteration. The alteration changes the column at
* `path` to have the new name `name`, to be nullable if `nullable` is true,
* and to have the data type `data_type`. At least one of `rename` or `nullable`
* must be provided.
*/
export interface ColumnAlteration {
/**
* The path to the column to alter. This is a dot-separated path to the column.
* If it is a top-level column then it is just the name of the column. If it is
* a nested column then it is the path to the column, e.g. "a.b.c" for a column
* `c` nested inside a column `b` nested inside a column `a`.
*/
path: string
/**
* The new name of the column. If not provided then the name will not be changed.
* This must be distinct from the names of all other columns in the table.
*/
rename?: string
/** Set the new nullability. Note that a nullable column cannot be made non-nullable. */
nullable?: boolean
}
/** A definition of a new column to add to a table. */
export interface AddColumnsSql {
/** The name of the new column. */
name: string
/**
* The values to populate the new column with, as a SQL expression.
* The expression can reference other columns in the table.
*/
valueSql: string
}
export interface ConnectionOptions {
uri: string
apiKey?: string
@@ -89,4 +121,7 @@ export class Table {
delete(predicate: string): Promise<void>
createIndex(): IndexBuilder
query(): Query
addColumns(transforms: Array<AddColumnsSql>): Promise<void>
alterColumns(alterations: Array<ColumnAlteration>): Promise<void>
dropColumns(columns: Array<string>): Promise<void>
}

View File

@@ -13,7 +13,7 @@
// limitations under the License.
import { Schema, tableFromIPC } from "apache-arrow";
import { Table as _NativeTable } from "./native";
import { AddColumnsSql, ColumnAlteration, Table as _NativeTable } from "./native";
import { toBuffer, Data } from "./arrow";
import { Query } from "./query";
import { IndexBuilder } from "./indexer";
@@ -150,4 +150,42 @@ export class Table {
}
return q;
}
// TODO: Support BatchUDF
/**
* Add new columns with defined values.
*
* @param newColumnTransforms pairs of column names and the SQL expression to use
* to calculate the value of the new column. These
* expressions will be evaluated for each row in the
* table, and can reference existing columns in the table.
*/
async addColumns(newColumnTransforms: AddColumnsSql[]): Promise<void> {
await this.inner.addColumns(newColumnTransforms);
}
/**
* Alter the name or nullability of columns.
*
* @param columnAlterations One or more alterations to apply to columns.
*/
async alterColumns(columnAlterations: ColumnAlteration[]): Promise<void> {
await this.inner.alterColumns(columnAlterations);
}
/**
* Drop one or more columns from the dataset
*
* This is a metadata-only operation and does not remove the data from the
* underlying storage. In order to remove the data, you must subsequently
* call ``compact_files`` to rewrite the data without the removed columns and
* then call ``cleanup_files`` to remove the old files.
*
* @param columnNames The names of the columns to drop. These can be nested
* column references (e.g. "a.b.c") or top-level column
* names (e.g. "a").
*/
async dropColumns(columnNames: string[]): Promise<void> {
await this.inner.dropColumns(columnNames);
}
}

View File

@@ -13,8 +13,11 @@
// limitations under the License.
use arrow_ipc::writer::FileWriter;
use lancedb::table::AddDataOptions;
use lancedb::{ipc::ipc_file_to_batches, table::TableRef};
use lance::dataset::ColumnAlteration as LanceColumnAlteration;
use lancedb::{
ipc::ipc_file_to_batches,
table::{AddDataOptions, TableRef},
};
use napi::bindgen_prelude::*;
use napi_derive::napi;
@@ -93,4 +96,104 @@ impl Table {
pub fn query(&self) -> Query {
Query::new(self)
}
#[napi]
pub async fn add_columns(&self, transforms: Vec<AddColumnsSql>) -> napi::Result<()> {
let transforms = transforms
.into_iter()
.map(|sql| (sql.name, sql.value_sql))
.collect::<Vec<_>>();
let transforms = lance::dataset::NewColumnTransform::SqlExpressions(transforms);
self.table
.add_columns(transforms, None)
.await
.map_err(|err| {
napi::Error::from_reason(format!(
"Failed to add columns to table {}: {}",
self.table, err
))
})?;
Ok(())
}
#[napi]
pub async fn alter_columns(&self, alterations: Vec<ColumnAlteration>) -> napi::Result<()> {
for alteration in &alterations {
if alteration.rename.is_none() && alteration.nullable.is_none() {
return Err(napi::Error::from_reason(
"Alteration must have a 'rename' or 'nullable' field.",
));
}
}
let alterations = alterations
.into_iter()
.map(LanceColumnAlteration::from)
.collect::<Vec<_>>();
self.table
.alter_columns(&alterations)
.await
.map_err(|err| {
napi::Error::from_reason(format!(
"Failed to alter columns in table {}: {}",
self.table, err
))
})?;
Ok(())
}
#[napi]
pub async fn drop_columns(&self, columns: Vec<String>) -> napi::Result<()> {
let col_refs = columns.iter().map(String::as_str).collect::<Vec<_>>();
self.table.drop_columns(&col_refs).await.map_err(|err| {
napi::Error::from_reason(format!(
"Failed to drop columns from table {}: {}",
self.table, err
))
})?;
Ok(())
}
}
/// A definition of a column alteration. The alteration changes the column at
/// `path` to have the new name `name`, to be nullable if `nullable` is true,
/// and to have the data type `data_type`. At least one of `rename` or `nullable`
/// must be provided.
#[napi(object)]
pub struct ColumnAlteration {
/// The path to the column to alter. This is a dot-separated path to the column.
/// If it is a top-level column then it is just the name of the column. If it is
/// a nested column then it is the path to the column, e.g. "a.b.c" for a column
/// `c` nested inside a column `b` nested inside a column `a`.
pub path: String,
/// The new name of the column. If not provided then the name will not be changed.
/// This must be distinct from the names of all other columns in the table.
pub rename: Option<String>,
/// Set the new nullability. Note that a nullable column cannot be made non-nullable.
pub nullable: Option<bool>,
}
impl From<ColumnAlteration> for LanceColumnAlteration {
fn from(js: ColumnAlteration) -> Self {
let ColumnAlteration {
path,
rename,
nullable,
} = js;
Self {
path,
rename,
nullable,
}
}
}
/// A definition of a new column to add to a table.
#[napi(object)]
pub struct AddColumnsSql {
/// The name of the new column.
pub name: String,
/// The values to populate the new column with, as a SQL expression.
/// The expression can reference other columns in the table.
pub value_sql: String,
}

View File

@@ -15,7 +15,7 @@ import logging
import uuid
from concurrent.futures import Future
from functools import cached_property
from typing import Dict, Optional, Union
from typing import Dict, Iterable, Optional, Union
import pyarrow as pa
from lance import json_to_schema
@@ -473,6 +473,21 @@ class RemoteTable(Table):
"count_rows() is not yet supported on the LanceDB cloud"
)
def add_columns(self, transforms: Dict[str, str]):
raise NotImplementedError(
"add_columns() is not yet supported on the LanceDB cloud"
)
def alter_columns(self, alterations: Iterable[Dict[str, str]]):
raise NotImplementedError(
"alter_columns() is not yet supported on the LanceDB cloud"
)
def drop_columns(self, columns: Iterable[str]):
raise NotImplementedError(
"drop_columns() is not yet supported on the LanceDB cloud"
)
def add_index(tbl: pa.Table, i: int) -> pa.Table:
return tbl.add_column(

View File

@@ -660,6 +660,56 @@ class Table(ABC):
For most cases, the default should be fine.
"""
@abstractmethod
def add_columns(self, transforms: Dict[str, str]):
"""
Add new columns with defined values.
This is not yet available in LanceDB Cloud.
Parameters
----------
transforms: Dict[str, str]
A map of column name to a SQL expression to use to calculate the
value of the new column. These expressions will be evaluated for
each row in the table, and can reference existing columns.
"""
@abstractmethod
def alter_columns(self, alterations: Iterable[Dict[str, str]]):
"""
Alter column names and nullability.
This is not yet available in LanceDB Cloud.
alterations : Iterable[Dict[str, Any]]
A sequence of dictionaries, each with the following keys:
- "path": str
The column path to alter. For a top-level column, this is the name.
For a nested column, this is the dot-separated path, e.g. "a.b.c".
- "name": str, optional
The new name of the column. If not specified, the column name is
not changed.
- "nullable": bool, optional
Whether the column should be nullable. If not specified, the column
nullability is not changed. Only non-nullable columns can be changed
to nullable. Currently, you cannot change a nullable column to
non-nullable.
"""
@abstractmethod
def drop_columns(self, columns: Iterable[str]):
"""
Drop columns from the table.
This is not yet available in LanceDB Cloud.
Parameters
----------
columns : Iterable[str]
The names of the columns to drop.
"""
class _LanceDatasetRef(ABC):
@property
@@ -1536,6 +1586,22 @@ class LanceTable(Table):
"""
return self.to_lance().optimize.compact_files(*args, **kwargs)
def add_columns(self, transforms: Dict[str, str]):
self._dataset_mut.add_columns(transforms)
def alter_columns(self, *alterations: Iterable[Dict[str, str]]):
modified = []
# I called this name in pylance, but I think I regret that now. So we
# allow both name and rename.
for alter in alterations:
if "rename" in alter:
alter["name"] = alter.pop("rename")
modified.append(alter)
self._dataset_mut.alter_columns(*modified)
def drop_columns(self, columns: Iterable[str]):
self._dataset_mut.drop_columns(columns)
def _sanitize_schema(
data: pa.Table,

View File

@@ -62,7 +62,7 @@ lancedb = "lancedb.cli.cli:cli"
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"
[tool.ruff]
[tool.ruff.lint]
select = ["F", "E", "W", "I", "G", "TCH", "PERF"]
[tool.pytest.ini_options]

View File

@@ -898,3 +898,29 @@ def test_restore_consistency(tmp_path):
table.add([{"id": 2}])
assert table_fixed.version == table.version - 1
assert table_ref_latest.version == table.version
# Schema evolution
def test_add_columns(tmp_path):
db = lancedb.connect(tmp_path)
data = pa.table({"id": [0, 1]})
table = LanceTable.create(db, "my_table", data=data)
table.add_columns({"new_col": "id + 2"})
assert table.to_arrow().column_names == ["id", "new_col"]
assert table.to_arrow()["new_col"].to_pylist() == [2, 3]
def test_alter_columns(tmp_path):
db = lancedb.connect(tmp_path)
data = pa.table({"id": [0, 1]})
table = LanceTable.create(db, "my_table", data=data)
table.alter_columns({"path": "id", "rename": "new_id"})
assert table.to_arrow().column_names == ["new_id"]
def test_drop_columns(tmp_path):
db = lancedb.connect(tmp_path)
data = pa.table({"id": [0, 1], "category": ["a", "b"]})
table = LanceTable.create(db, "my_table", data=data)
table.drop_columns(["category"])
assert table.to_arrow().column_names == ["id"]

View File

@@ -286,5 +286,8 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> {
index::vector::table_create_vector_index,
)?;
cx.export_function("tableSchema", JsTable::js_schema)?;
cx.export_function("tableAddColumns", JsTable::js_add_columns)?;
cx.export_function("tableAlterColumns", JsTable::js_alter_columns)?;
cx.export_function("tableDropColumns", JsTable::js_drop_columns)?;
Ok(())
}

View File

@@ -16,7 +16,7 @@ use std::ops::Deref;
use arrow_array::{RecordBatch, RecordBatchIterator};
use lance::dataset::optimize::CompactionOptions;
use lance::dataset::{WriteMode, WriteParams};
use lance::dataset::{ColumnAlteration, NewColumnTransform, WriteMode, WriteParams};
use lance::io::ObjectStoreParams;
use lancedb::table::{AddDataOptions, OptimizeAction, WriteOptions};
@@ -544,4 +544,116 @@ impl JsTable {
});
Ok(promise)
}
pub(crate) fn js_add_columns(mut cx: FunctionContext) -> JsResult<JsPromise> {
let expressions = cx
.argument::<JsArray>(0)?
.to_vec(&mut cx)?
.into_iter()
.map(|val| {
let obj = val.downcast_or_throw::<JsObject, _>(&mut cx)?;
let name = obj.get::<JsString, _, _>(&mut cx, "name")?.value(&mut cx);
let sql = obj
.get::<JsString, _, _>(&mut cx, "valueSql")?
.value(&mut cx);
Ok((name, sql))
})
.collect::<NeonResult<Vec<(String, String)>>>()?;
let transforms = NewColumnTransform::SqlExpressions(expressions);
let js_table = cx.this().downcast_or_throw::<JsBox<Self>, _>(&mut cx)?;
let rt = runtime(&mut cx)?;
let (deferred, promise) = cx.promise();
let channel = cx.channel();
let table = js_table.table.clone();
rt.spawn(async move {
let result = table.add_columns(transforms, None).await;
deferred.settle_with(&channel, move |mut cx| {
result.or_throw(&mut cx)?;
Ok(cx.undefined())
})
});
Ok(promise)
}
pub(crate) fn js_alter_columns(mut cx: FunctionContext) -> JsResult<JsPromise> {
let alterations = cx
.argument::<JsArray>(0)?
.to_vec(&mut cx)?
.into_iter()
.map(|val| {
let obj = val.downcast_or_throw::<JsObject, _>(&mut cx)?;
let path = obj.get::<JsString, _, _>(&mut cx, "path")?.value(&mut cx);
let rename = obj
.get_opt::<JsString, _, _>(&mut cx, "rename")?
.map(|val| val.value(&mut cx));
let nullable = obj
.get_opt::<JsBoolean, _, _>(&mut cx, "nullable")?
.map(|val| val.value(&mut cx));
// TODO: support data type here. Will need to do some serialization/deserialization
if rename.is_none() && nullable.is_none() {
return cx.throw_error("At least one of 'name' or 'nullable' must be provided");
}
Ok(ColumnAlteration {
path,
rename,
nullable,
})
})
.collect::<NeonResult<Vec<ColumnAlteration>>>()?;
let js_table = cx.this().downcast_or_throw::<JsBox<Self>, _>(&mut cx)?;
let rt = runtime(&mut cx)?;
let (deferred, promise) = cx.promise();
let channel = cx.channel();
let table = js_table.table.clone();
rt.spawn(async move {
let result = table.alter_columns(&alterations).await;
deferred.settle_with(&channel, move |mut cx| {
result.or_throw(&mut cx)?;
Ok(cx.undefined())
})
});
Ok(promise)
}
pub(crate) fn js_drop_columns(mut cx: FunctionContext) -> JsResult<JsPromise> {
let columns = cx
.argument::<JsArray>(0)?
.to_vec(&mut cx)?
.into_iter()
.map(|val| {
Ok(val
.downcast_or_throw::<JsString, _>(&mut cx)?
.value(&mut cx))
})
.collect::<NeonResult<Vec<String>>>()?;
let js_table = cx.this().downcast_or_throw::<JsBox<Self>, _>(&mut cx)?;
let rt = runtime(&mut cx)?;
let (deferred, promise) = cx.promise();
let channel = cx.channel();
let table = js_table.table.clone();
rt.spawn(async move {
let col_refs = columns.iter().map(|s| s.as_str()).collect::<Vec<_>>();
let result = table.drop_columns(&col_refs).await;
deferred.settle_with(&channel, move |mut cx| {
result.or_throw(&mut cx)?;
Ok(cx.undefined())
})
});
Ok(promise)
}
}

View File

@@ -27,7 +27,10 @@ use lance::dataset::optimize::{
compact_files, CompactionMetrics, CompactionOptions, IndexRemapperOptions,
};
pub use lance::dataset::ReadParams;
use lance::dataset::{Dataset, UpdateBuilder, WhenMatched, WriteMode, WriteParams};
use lance::dataset::{
ColumnAlteration, Dataset, NewColumnTransform, UpdateBuilder, WhenMatched, WriteMode,
WriteParams,
};
use lance::dataset::{MergeInsertBuilder as LanceMergeInsertBuilder, WhenNotMatchedBySource};
use lance::io::WrappingObjectStore;
use lance_index::{optimize::OptimizeOptions, DatasetIndexExt};
@@ -376,6 +379,19 @@ pub trait Table: std::fmt::Display + Send + Sync {
/// Modeled after ``VACUUM`` in PostgreSQL.
/// Not all implementations support explicit optimization.
async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats>;
/// Add new columns to the table, providing values to fill in.
async fn add_columns(
&self,
transforms: NewColumnTransform,
read_columns: Option<Vec<String>>,
) -> Result<()>;
/// Change a column's name or nullability.
async fn alter_columns(&self, alterations: &[ColumnAlteration]) -> Result<()>;
/// Remove columns from the table.
async fn drop_columns(&self, columns: &[&str]) -> Result<()>;
}
/// Reference to a Table pointer.
@@ -902,6 +918,33 @@ impl Table for NativeTable {
}
Ok(stats)
}
async fn add_columns(
&self,
transforms: NewColumnTransform,
read_columns: Option<Vec<String>>,
) -> Result<()> {
self.dataset
.get_mut()
.await?
.add_columns(transforms, read_columns)
.await?;
Ok(())
}
async fn alter_columns(&self, alterations: &[ColumnAlteration]) -> Result<()> {
self.dataset
.get_mut()
.await?
.alter_columns(alterations)
.await?;
Ok(())
}
async fn drop_columns(&self, columns: &[&str]) -> Result<()> {
self.dataset.get_mut().await?.drop_columns(columns).await?;
Ok(())
}
}
#[cfg(test)]