From 5af74b5acadcf78f7b9e43c28ea0e4177f646706 Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Mon, 26 Feb 2024 11:04:53 -0800
Subject: [PATCH] feat: `{add|alter|drop}_columns` APIs (#1015)

Initial work for #959. This exposes the basic functionality for each in
all of the APIs. Will add user guide documentation in a later PR.
---
 node/src/index.ts              |  70 +++++++++++++++++++-
 node/src/remote/index.ts       |  15 ++++-
 node/src/test/test.ts          |  64 +++++++++++++++++-
 nodejs/__test__/table.test.ts  |  67 ++++++++++++++++++-
 nodejs/lancedb/native.d.ts     |  35 ++++++++++
 nodejs/lancedb/table.ts        |  40 +++++++++++-
 nodejs/src/table.rs            | 107 ++++++++++++++++++++++++++++++-
 python/lancedb/remote/table.py |  17 ++++-
 python/lancedb/table.py        |  66 +++++++++++++++++++
 python/pyproject.toml          |   2 +-
 python/tests/test_table.py     |  26 ++++++++
 rust/ffi/node/src/lib.rs       |   3 +
 rust/ffi/node/src/table.rs     | 114 ++++++++++++++++++++++++++++++++-
 rust/lancedb/src/table.rs      |  45 ++++++++++++-
 14 files changed, 660 insertions(+), 11 deletions(-)

diff --git a/node/src/index.ts b/node/src/index.ts
index c5e2b7f2..8dd48788 100644
--- a/node/src/index.ts
+++ b/node/src/index.ts
@@ -42,7 +42,10 @@ const {
   tableCompactFiles,
   tableListIndices,
   tableIndexStats,
-  tableSchema
+  tableSchema,
+  tableAddColumns,
+  tableAlterColumns,
+  tableDropColumns
   // eslint-disable-next-line @typescript-eslint/no-var-requires
 } = require('../native.js')
 
@@ -500,6 +503,59 @@ export interface Table {
   filter(value: string): Query
 
   schema: Promise<Schema>
+
+  // TODO: Support BatchUDF
+  /**
+   * Add new columns with defined values.
+   *
+   * @param newColumnTransforms pairs of column names and the SQL expression to use
+   *                            to calculate the value of the new column. These
+   *                            expressions will be evaluated for each row in the
+   *                            table, and can reference existing columns in the table.
+   */
+  addColumns(newColumnTransforms: Array<{ name: string, valueSql: string }>): Promise<void>
+
+  /**
+   * Alter the name or nullability of columns.
+   *
+   * @param columnAlterations One or more alterations to apply to columns.
+   */
+  alterColumns(columnAlterations: ColumnAlteration[]): Promise<void>
+
+  /**
+   * Drop one or more columns from the dataset
+   *
+   * This is a metadata-only operation and does not remove the data from the
+   * underlying storage. In order to remove the data, you must subsequently
+   * call ``compact_files`` to rewrite the data without the removed columns and
+   * then call ``cleanup_files`` to remove the old files.
+   *
+   * @param columnNames The names of the columns to drop. These can be nested
+   *                    column references (e.g. "a.b.c") or top-level column
+   *                    names (e.g. "a").
+   */
+  dropColumns(columnNames: string[]): Promise<void>
+}
+
+/**
+ * A definition of a column alteration. The alteration changes the column at
+ * `path` to have the new name given by `rename`, or to become nullable if
+ * `nullable` is true. At least one of `rename` or `nullable`
+ * must be provided.
+ */
+export interface ColumnAlteration {
+  /**
+   * The path to the column to alter. This is a dot-separated path to the column.
+   * If it is a top-level column then it is just the name of the column. If it is
+   * a nested column then it is the path to the column, e.g. "a.b.c" for a column
+   * `c` nested inside a column `b` nested inside a column `a`.
+   */
+  path: string
+  rename?: string
+  /**
+   * Set the new nullability. Note that a nullable column cannot be made non-nullable.
+   */
+  nullable?: boolean
 }
 
 export interface UpdateArgs {
@@ -1028,6 +1084,18 @@ export class LocalTable implements Table {
       return false
     }
   }
+
+  async addColumns (newColumnTransforms: Array<{ name: string, valueSql: string }>): Promise<void> {
+    return tableAddColumns.call(this._tbl, newColumnTransforms)
+  }
+
+  async alterColumns (columnAlterations: ColumnAlteration[]): Promise<void> {
+    return tableAlterColumns.call(this._tbl, columnAlterations)
+  }
+
+  async dropColumns (columnNames: string[]): Promise<void> {
+    return tableDropColumns.call(this._tbl, columnNames)
+  }
 }
 
 export interface CleanupStats {
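A minimal usage sketch of the Table methods added above, written against the node `vectordb` API. The connection URI, table name, and column names are placeholders, and the sketch assumes an existing table with an `id` column; it is illustrative only, not part of the patch.

import * as lancedb from 'vectordb'

async function evolveSchema (): Promise<void> {
  const db = await lancedb.connect('data/sample-lancedb') // placeholder path
  const table = await db.openTable('my_table')            // assumed existing table

  // Add a computed column; the SQL expression is evaluated per row and may
  // reference existing columns.
  await table.addColumns([{ name: 'id_doubled', valueSql: 'id * 2' }])

  // Rename a column and relax nullability. Each alteration needs `path` plus
  // at least one of `rename` or `nullable`.
  await table.alterColumns([
    { path: 'id', rename: 'row_id' },
    { path: 'id_doubled', nullable: true }
  ])

  // Metadata-only removal; see the dropColumns doc comment above for the
  // follow-up compaction and cleanup steps.
  await table.dropColumns(['id_doubled'])
}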
diff --git a/node/src/remote/index.ts b/node/src/remote/index.ts
index 9255f31f..56c3181d 100644
--- a/node/src/remote/index.ts
+++ b/node/src/remote/index.ts
@@ -25,7 +25,8 @@ import {
   type UpdateArgs,
   type UpdateSqlArgs,
   makeArrowTable,
-  type MergeInsertArgs
+  type MergeInsertArgs,
+  type ColumnAlteration
 } from '../index'
 
 import { Query } from '../query'
@@ -474,4 +475,16 @@ export class RemoteTable implements Table {
       numUnindexedRows: results.data.num_unindexed_rows
     }
   }
+
+  async addColumns (newColumnTransforms: Array<{ name: string, valueSql: string }>): Promise<void> {
+    throw new Error('Add columns is not yet supported in LanceDB Cloud.')
+  }
+
+  async alterColumns (columnAlterations: ColumnAlteration[]): Promise<void> {
+    throw new Error('Alter columns is not yet supported in LanceDB Cloud.')
+  }
+
+  async dropColumns (columnNames: string[]): Promise<void> {
+    throw new Error('Drop columns is not yet supported in LanceDB Cloud.')
+  }
 }
diff --git a/node/src/test/test.ts b/node/src/test/test.ts
index 20b05087..3f8e5161 100644
--- a/node/src/test/test.ts
+++ b/node/src/test/test.ts
@@ -37,8 +37,10 @@ import {
   Utf8,
   Table as ArrowTable,
   vectorFromArray,
+  Float64,
   Float32,
-  Float16
+  Float16,
+  Int64
 } from 'apache-arrow'
 
 const expect = chai.expect
@@ -1057,3 +1059,63 @@ describe('Compact and cleanup', function () {
     assert.equal(await table.countRows(), 3)
   })
 })
+
+describe('schema evolution', function () {
+  // Create a new sample table
+  it('can add a new column to the schema', async function () {
+    const dir = await track().mkdir('lancejs')
+    const con = await lancedb.connect(dir)
+    const table = await con.createTable('vectors', [
+      { id: 1n, vector: [0.1, 0.2] }
+    ])
+
+    await table.addColumns([{ name: 'price', valueSql: 'cast(10.0 as float)' }])
+
+    const expectedSchema = new Schema([
+      new Field('id', new Int64()),
+      new Field('vector', new FixedSizeList(2, new Field('item', new Float32(), true))),
+      new Field('price', new Float32())
+    ])
+    expect(await table.schema).to.deep.equal(expectedSchema)
+  })
+
+  it('can alter the columns in the schema', async function () {
+    const dir = await track().mkdir('lancejs')
+    const con = await lancedb.connect(dir)
+    const schema = new Schema([
+      new Field('id', new Int64(), false),
+      new Field('vector', new FixedSizeList(2, new Field('item', new Float32(), true))),
+      new Field('price', new Float64(), false)
+    ])
+    const table = await con.createTable('vectors', [
+      { id: 1n, vector: [0.1, 0.2], price: 10.0 }
+    ])
+    expect(await table.schema).to.deep.equal(schema)
+
+    await table.alterColumns([
+      { path: 'id', rename: 'new_id' },
+      { path: 'price', nullable: true }
+    ])
+
+    const expectedSchema = new Schema([
+      new Field('new_id', new Int64(), false),
+      new Field('vector', new FixedSizeList(2, new Field('item', new Float32(), true))),
+      new Field('price', new Float64(), true)
+    ])
+    expect(await table.schema).to.deep.equal(expectedSchema)
+  })
+
+  it('can drop a column from the schema', async function () {
+    const dir = await track().mkdir('lancejs')
+    const con = await lancedb.connect(dir)
+    const table = await con.createTable('vectors', [
+      { id: 1n, vector: [0.1, 0.2] }
+    ])
+    await table.dropColumns(['vector'])
+
+    const expectedSchema = new Schema([
+      new Field('id', new Int64(), false)
+    ])
+    expect(await table.schema).to.deep.equal(expectedSchema)
+  })
+})
diff --git a/nodejs/__test__/table.test.ts b/nodejs/__test__/table.test.ts
index c58eb5ea..50a15247 100644
--- a/nodejs/__test__/table.test.ts
+++ b/nodejs/__test__/table.test.ts
@@ -17,7 +17,7 @@ import * as path from "path";
 import * as fs from "fs";
 
 import { connect } from "../dist";
-import { Schema, Field, Float32, Int32, FixedSizeList } from "apache-arrow";
+import { Schema, Field, Float32, Int32, FixedSizeList, Int64, Float64 } from "apache-arrow";
 import { makeArrowTable } from "../dist/arrow";
 
 describe("Test creating index", () => {
@@ -214,4 +214,69 @@ describe("Read consistency interval", () => {
       expect(await table2.countRows()).toEqual(2n);
     }
   });
+});
+
+
+describe('schema evolution', function () {
+  let tmpDir: string;
+  beforeEach(() => {
+    tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "schema-evolution-"));
+  });
+
+  // Create a new sample table
+  it('can add a new column to the schema', async function () {
+    const con = await connect(tmpDir)
+    const table = await con.createTable('vectors', [
+      { id: 1n, vector: [0.1, 0.2] }
+    ])
+
+    await table.addColumns([{ name: 'price', valueSql: 'cast(10.0 as float)' }])
+
+    const expectedSchema = new Schema([
+      new Field('id', new Int64(), true),
+      new Field('vector', new FixedSizeList(2, new Field('item', new Float32(), true)), true),
+      new Field('price', new Float32(), false)
+    ])
+    expect(await table.schema()).toEqual(expectedSchema)
+  });
+
+  it('can alter the columns in the schema', async function () {
+    const con = await connect(tmpDir)
+    const schema = new Schema([
+      new Field('id', new Int64(), true),
+      new Field('vector', new FixedSizeList(2, new Field('item', new Float32(), true)), true),
+      new Field('price', new Float64(), false)
+    ])
+    const table = await con.createTable('vectors', [
+      { id: 1n, vector: [0.1, 0.2] }
+    ])
+    // Can create a non-nullable column only through addColumns at the moment.
+    await table.addColumns([{ name: 'price', valueSql: 'cast(10.0 as double)' }])
+    expect(await table.schema()).toEqual(schema)
+
+    await table.alterColumns([
+      { path: 'id', rename: 'new_id' },
+      { path: 'price', nullable: true }
+    ])
+
+    const expectedSchema = new Schema([
+      new Field('new_id', new Int64(), true),
+      new Field('vector', new FixedSizeList(2, new Field('item', new Float32(), true)), true),
+      new Field('price', new Float64(), true)
+    ])
+    expect(await table.schema()).toEqual(expectedSchema)
+  });
+
+  it('can drop a column from the schema', async function () {
+    const con = await connect(tmpDir)
+    const table = await con.createTable('vectors', [
+      { id: 1n, vector: [0.1, 0.2] }
+    ])
+    await table.dropColumns(['vector'])
+
+    const expectedSchema = new Schema([
+      new Field('id', new Int64(), true)
+    ])
+    expect(await table.schema()).toEqual(expectedSchema)
+  });
 });
\ No newline at end of file
diff --git a/nodejs/lancedb/native.d.ts b/nodejs/lancedb/native.d.ts
index d3896594..573b70c7 100644
--- a/nodejs/lancedb/native.d.ts
+++ b/nodejs/lancedb/native.d.ts
@@ -12,6 +12,38 @@ export const enum MetricType {
   Cosine = 1,
   Dot = 2
 }
+/**
+ * A definition of a column alteration. The alteration changes the column at
+ * `path` to have the new name given by `rename`, or to become nullable if
+ * `nullable` is true. At least one of `rename` or `nullable`
+ * must be provided.
+ */
+export interface ColumnAlteration {
+  /**
+   * The path to the column to alter. This is a dot-separated path to the column.
+   * If it is a top-level column then it is just the name of the column. If it is
+   * a nested column then it is the path to the column, e.g. "a.b.c" for a column
+   * `c` nested inside a column `b` nested inside a column `a`.
+   */
+  path: string
+  /**
+   * The new name of the column. If not provided then the name will not be changed.
+   * This must be distinct from the names of all other columns in the table.
+   */
+  rename?: string
+  /** Set the new nullability. Note that a nullable column cannot be made non-nullable. */
+  nullable?: boolean
+}
+/** A definition of a new column to add to a table. */
+export interface AddColumnsSql {
+  /** The name of the new column. */
+  name: string
+  /**
+   * The values to populate the new column with, as a SQL expression.
+   * The expression can reference other columns in the table.
+   */
+  valueSql: string
+}
 export interface ConnectionOptions {
   uri: string
   apiKey?: string
@@ -89,4 +121,7 @@ export class Table {
   delete(predicate: string): Promise<void>
   createIndex(): IndexBuilder
   query(): Query
+  addColumns(transforms: Array<AddColumnsSql>): Promise<void>
+  alterColumns(alterations: Array<ColumnAlteration>): Promise<void>
+  dropColumns(columns: Array<string>): Promise<void>
 }
diff --git a/nodejs/lancedb/table.ts b/nodejs/lancedb/table.ts
index 819e3fc7..d3b9ea37 100644
--- a/nodejs/lancedb/table.ts
+++ b/nodejs/lancedb/table.ts
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 import { Schema, tableFromIPC } from "apache-arrow";
-import { Table as _NativeTable } from "./native";
+import { AddColumnsSql, ColumnAlteration, Table as _NativeTable } from "./native";
 import { toBuffer, Data } from "./arrow";
 import { Query } from "./query";
 import { IndexBuilder } from "./indexer";
@@ -150,4 +150,42 @@ export class Table {
     }
     return q;
   }
+
+  // TODO: Support BatchUDF
+  /**
+   * Add new columns with defined values.
+   *
+   * @param newColumnTransforms pairs of column names and the SQL expression to use
+   *                            to calculate the value of the new column. These
+   *                            expressions will be evaluated for each row in the
+   *                            table, and can reference existing columns in the table.
+   */
+  async addColumns(newColumnTransforms: AddColumnsSql[]): Promise<void> {
+    await this.inner.addColumns(newColumnTransforms);
+  }
+
+  /**
+   * Alter the name or nullability of columns.
+   *
+   * @param columnAlterations One or more alterations to apply to columns.
+   */
+  async alterColumns(columnAlterations: ColumnAlteration[]): Promise<void> {
+    await this.inner.alterColumns(columnAlterations);
+  }
+
+  /**
+   * Drop one or more columns from the dataset
+   *
+   * This is a metadata-only operation and does not remove the data from the
+   * underlying storage. In order to remove the data, you must subsequently
+   * call ``compact_files`` to rewrite the data without the removed columns and
+   * then call ``cleanup_files`` to remove the old files.
+   *
+   * @param columnNames The names of the columns to drop. These can be nested
+   *                    column references (e.g. "a.b.c") or top-level column
+   *                    names (e.g. "a").
+   */
+  async dropColumns(columnNames: string[]): Promise<void> {
+    await this.inner.dropColumns(columnNames);
+  }
 }
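The AddColumnsSql and ColumnAlteration shapes above translate directly into calls on the new Table wrapper. A short sketch follows, under a few assumptions not stated in this diff: the package is consumed as `@lancedb/lancedb`, the connection exposes `openTable`, and the table already has `price` and `quantity` columns.

import { connect } from '@lancedb/lancedb'

async function addDerivedColumns (): Promise<void> {
  const db = await connect('data/sample-lancedb') // placeholder path
  const tbl = await db.openTable('orders')        // assumed existing table

  // Each entry is an AddColumnsSql; the expression may combine existing
  // columns and SQL literals.
  await tbl.addColumns([
    { name: 'total', valueSql: 'price * quantity' },
    { name: 'currency', valueSql: "'USD'" }
  ])

  // A ColumnAlteration must set `path` and at least one of `rename` / `nullable`.
  await tbl.alterColumns([{ path: 'total', nullable: true }])
}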
diff --git a/nodejs/src/table.rs b/nodejs/src/table.rs
index c1882c29..74d73cbd 100644
--- a/nodejs/src/table.rs
+++ b/nodejs/src/table.rs
@@ -13,8 +13,11 @@
 // limitations under the License.
 
 use arrow_ipc::writer::FileWriter;
-use lancedb::table::AddDataOptions;
-use lancedb::{ipc::ipc_file_to_batches, table::TableRef};
+use lance::dataset::ColumnAlteration as LanceColumnAlteration;
+use lancedb::{
+    ipc::ipc_file_to_batches,
+    table::{AddDataOptions, TableRef},
+};
 use napi::bindgen_prelude::*;
 use napi_derive::napi;
 
@@ -93,4 +96,104 @@ impl Table {
     pub fn query(&self) -> Query {
         Query::new(self)
     }
+
+    #[napi]
+    pub async fn add_columns(&self, transforms: Vec<AddColumnsSql>) -> napi::Result<()> {
+        let transforms = transforms
+            .into_iter()
+            .map(|sql| (sql.name, sql.value_sql))
+            .collect::<Vec<(String, String)>>();
+        let transforms = lance::dataset::NewColumnTransform::SqlExpressions(transforms);
+        self.table
+            .add_columns(transforms, None)
+            .await
+            .map_err(|err| {
+                napi::Error::from_reason(format!(
+                    "Failed to add columns to table {}: {}",
+                    self.table, err
+                ))
+            })?;
+        Ok(())
+    }
+
+    #[napi]
+    pub async fn alter_columns(&self, alterations: Vec<ColumnAlteration>) -> napi::Result<()> {
+        for alteration in &alterations {
+            if alteration.rename.is_none() && alteration.nullable.is_none() {
+                return Err(napi::Error::from_reason(
+                    "Alteration must have a 'rename' or 'nullable' field.",
+                ));
+            }
+        }
+        let alterations = alterations
+            .into_iter()
+            .map(LanceColumnAlteration::from)
+            .collect::<Vec<_>>();
+
+        self.table
+            .alter_columns(&alterations)
+            .await
+            .map_err(|err| {
+                napi::Error::from_reason(format!(
+                    "Failed to alter columns in table {}: {}",
+                    self.table, err
+                ))
+            })?;
+        Ok(())
+    }
+
+    #[napi]
+    pub async fn drop_columns(&self, columns: Vec<String>) -> napi::Result<()> {
+        let col_refs = columns.iter().map(String::as_str).collect::<Vec<&str>>();
+        self.table.drop_columns(&col_refs).await.map_err(|err| {
+            napi::Error::from_reason(format!(
+                "Failed to drop columns from table {}: {}",
+                self.table, err
+            ))
+        })?;
+        Ok(())
+    }
+}
+
+/// A definition of a column alteration. The alteration changes the column at
+/// `path` to have the new name given by `rename`, or to become nullable if
+/// `nullable` is true. At least one of `rename` or `nullable`
+/// must be provided.
+#[napi(object)]
+pub struct ColumnAlteration {
+    /// The path to the column to alter. This is a dot-separated path to the column.
+    /// If it is a top-level column then it is just the name of the column. If it is
+    /// a nested column then it is the path to the column, e.g. "a.b.c" for a column
+    /// `c` nested inside a column `b` nested inside a column `a`.
+    pub path: String,
+    /// The new name of the column. If not provided then the name will not be changed.
+    /// This must be distinct from the names of all other columns in the table.
+    pub rename: Option<String>,
+    /// Set the new nullability. Note that a nullable column cannot be made non-nullable.
+    pub nullable: Option<bool>,
+}
+
+impl From<ColumnAlteration> for LanceColumnAlteration {
+    fn from(js: ColumnAlteration) -> Self {
+        let ColumnAlteration {
+            path,
+            rename,
+            nullable,
+        } = js;
+        Self {
+            path,
+            rename,
+            nullable,
+        }
+    }
+}
+
+/// A definition of a new column to add to a table.
+#[napi(object)]
+pub struct AddColumnsSql {
+    /// The name of the new column.
+    pub name: String,
+    /// The values to populate the new column with, as a SQL expression.
+    /// The expression can reference other columns in the table.
+    pub value_sql: String,
 }
diff --git a/python/lancedb/remote/table.py b/python/lancedb/remote/table.py
index a8766eef..147a0b08 100644
--- a/python/lancedb/remote/table.py
+++ b/python/lancedb/remote/table.py
@@ -15,7 +15,7 @@ import logging
 import uuid
 from concurrent.futures import Future
 from functools import cached_property
-from typing import Dict, Optional, Union
+from typing import Dict, Iterable, Optional, Union
 
 import pyarrow as pa
 from lance import json_to_schema
@@ -473,6 +473,21 @@ class RemoteTable(Table):
             "count_rows() is not yet supported on the LanceDB cloud"
         )
 
+    def add_columns(self, transforms: Dict[str, str]):
+        raise NotImplementedError(
+            "add_columns() is not yet supported on the LanceDB cloud"
+        )
+
+    def alter_columns(self, alterations: Iterable[Dict[str, str]]):
+        raise NotImplementedError(
+            "alter_columns() is not yet supported on the LanceDB cloud"
+        )
+
+    def drop_columns(self, columns: Iterable[str]):
+        raise NotImplementedError(
+            "drop_columns() is not yet supported on the LanceDB cloud"
+        )
+
 
 def add_index(tbl: pa.Table, i: int) -> pa.Table:
     return tbl.add_column(
diff --git a/python/lancedb/table.py b/python/lancedb/table.py
index 04bad713..b654b6d8 100644
--- a/python/lancedb/table.py
+++ b/python/lancedb/table.py
@@ -660,6 +660,56 @@ class Table(ABC):
         For most cases, the default should be fine.
         """
 
+    @abstractmethod
+    def add_columns(self, transforms: Dict[str, str]):
+        """
+        Add new columns with defined values.
+
+        This is not yet available in LanceDB Cloud.
+
+        Parameters
+        ----------
+        transforms: Dict[str, str]
+            A map of column name to a SQL expression to use to calculate the
+            value of the new column. These expressions will be evaluated for
+            each row in the table, and can reference existing columns.
+        """
+
+    @abstractmethod
+    def alter_columns(self, alterations: Iterable[Dict[str, str]]):
+        """
+        Alter column names and nullability.
+
+        This is not yet available in LanceDB Cloud.
+
+        Parameters
+        ----------
+        alterations : Iterable[Dict[str, Any]]
+            A sequence of dictionaries, each with the following keys:
+            - "path": str
+                The column path to alter. For a top-level column, this is the name.
+                For a nested column, this is the dot-separated path, e.g. "a.b.c".
+            - "name": str, optional
+                The new name of the column. If not specified, the column name is
+                not changed.
+            - "nullable": bool, optional
+                Whether the column should be nullable. If not specified, the column
+                nullability is not changed. Only non-nullable columns can be changed
+                to nullable. Currently, you cannot change a nullable column to
+                non-nullable.
+        """
+
+    @abstractmethod
+    def drop_columns(self, columns: Iterable[str]):
+        """
+        Drop columns from the table.
+
+        This is not yet available in LanceDB Cloud.
+
+        Parameters
+        ----------
+        columns : Iterable[str]
+            The names of the columns to drop.
+        """
+
 
 class _LanceDatasetRef(ABC):
     @property
@@ -1536,6 +1586,22 @@ class LanceTable(Table):
         """
         return self.to_lance().optimize.compact_files(*args, **kwargs)
 
+    def add_columns(self, transforms: Dict[str, str]):
+        self._dataset_mut.add_columns(transforms)
+
+    def alter_columns(self, *alterations: Iterable[Dict[str, str]]):
+        modified = []
+        # I called this name in pylance, but I think I regret that now. So we
+        # allow both name and rename.
+        for alter in alterations:
+            if "rename" in alter:
+                alter["name"] = alter.pop("rename")
+            modified.append(alter)
+        self._dataset_mut.alter_columns(*modified)
+
+    def drop_columns(self, columns: Iterable[str]):
+        self._dataset_mut.drop_columns(columns)
+
 
 def _sanitize_schema(
     data: pa.Table,
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 59d05ebc..3fedcaf6 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -62,7 +62,7 @@ lancedb = "lancedb.cli.cli:cli"
 requires = ["setuptools", "wheel"]
 build-backend = "setuptools.build_meta"
 
-[tool.ruff]
+[tool.ruff.lint]
 select = ["F", "E", "W", "I", "G", "TCH", "PERF"]
 
 [tool.pytest.ini_options]
diff --git a/python/tests/test_table.py b/python/tests/test_table.py
index 9282a5c6..6b2eac92 100644
--- a/python/tests/test_table.py
+++ b/python/tests/test_table.py
@@ -898,3 +898,29 @@ def test_restore_consistency(tmp_path):
     table.add([{"id": 2}])
     assert table_fixed.version == table.version - 1
     assert table_ref_latest.version == table.version
+
+
+# Schema evolution
+def test_add_columns(tmp_path):
+    db = lancedb.connect(tmp_path)
+    data = pa.table({"id": [0, 1]})
+    table = LanceTable.create(db, "my_table", data=data)
+    table.add_columns({"new_col": "id + 2"})
+    assert table.to_arrow().column_names == ["id", "new_col"]
+    assert table.to_arrow()["new_col"].to_pylist() == [2, 3]
+
+
+def test_alter_columns(tmp_path):
+    db = lancedb.connect(tmp_path)
+    data = pa.table({"id": [0, 1]})
+    table = LanceTable.create(db, "my_table", data=data)
+    table.alter_columns({"path": "id", "rename": "new_id"})
+    assert table.to_arrow().column_names == ["new_id"]
+
+
+def test_drop_columns(tmp_path):
+    db = lancedb.connect(tmp_path)
+    data = pa.table({"id": [0, 1], "category": ["a", "b"]})
+    table = LanceTable.create(db, "my_table", data=data)
+    table.drop_columns(["category"])
+    assert table.to_arrow().column_names == ["id"]
diff --git a/rust/ffi/node/src/lib.rs b/rust/ffi/node/src/lib.rs
index 33c8e457..070e2afc 100644
--- a/rust/ffi/node/src/lib.rs
+++ b/rust/ffi/node/src/lib.rs
@@ -286,5 +286,8 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> {
         index::vector::table_create_vector_index,
     )?;
     cx.export_function("tableSchema", JsTable::js_schema)?;
+    cx.export_function("tableAddColumns", JsTable::js_add_columns)?;
+    cx.export_function("tableAlterColumns", JsTable::js_alter_columns)?;
+    cx.export_function("tableDropColumns", JsTable::js_drop_columns)?;
     Ok(())
 }
diff --git a/rust/ffi/node/src/table.rs b/rust/ffi/node/src/table.rs
index cbc6fc3e..5aa38727 100644
--- a/rust/ffi/node/src/table.rs
+++ b/rust/ffi/node/src/table.rs
@@ -16,7 +16,7 @@ use std::ops::Deref;
 
 use arrow_array::{RecordBatch, RecordBatchIterator};
 use lance::dataset::optimize::CompactionOptions;
-use lance::dataset::{WriteMode, WriteParams};
+use lance::dataset::{ColumnAlteration, NewColumnTransform, WriteMode, WriteParams};
 use lance::io::ObjectStoreParams;
 use lancedb::table::{AddDataOptions, OptimizeAction, WriteOptions};
 
@@ -544,4 +544,116 @@ impl JsTable {
         });
         Ok(promise)
     }
+
+    pub(crate) fn js_add_columns(mut cx: FunctionContext) -> JsResult<JsPromise> {
+        let expressions = cx
+            .argument::<JsArray>(0)?
+            .to_vec(&mut cx)?
+            .into_iter()
+            .map(|val| {
+                let obj = val.downcast_or_throw::<JsObject, _>(&mut cx)?;
+                let name = obj.get::<JsString, _>(&mut cx, "name")?.value(&mut cx);
+                let sql = obj
+                    .get::<JsString, _>(&mut cx, "valueSql")?
+                    .value(&mut cx);
+                Ok((name, sql))
+            })
+            .collect::<NeonResult<Vec<(String, String)>>>()?;
+
+        let transforms = NewColumnTransform::SqlExpressions(expressions);
+
+        let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;
+        let rt = runtime(&mut cx)?;
+
+        let (deferred, promise) = cx.promise();
+        let channel = cx.channel();
+        let table = js_table.table.clone();
+
+        rt.spawn(async move {
+            let result = table.add_columns(transforms, None).await;
+            deferred.settle_with(&channel, move |mut cx| {
+                result.or_throw(&mut cx)?;
+                Ok(cx.undefined())
+            })
+        });
+
+        Ok(promise)
+    }
+
+    pub(crate) fn js_alter_columns(mut cx: FunctionContext) -> JsResult<JsPromise> {
+        let alterations = cx
+            .argument::<JsArray>(0)?
+            .to_vec(&mut cx)?
+            .into_iter()
+            .map(|val| {
+                let obj = val.downcast_or_throw::<JsObject, _>(&mut cx)?;
+                let path = obj.get::<JsString, _>(&mut cx, "path")?.value(&mut cx);
+                let rename = obj
+                    .get_opt::<JsString, _>(&mut cx, "rename")?
+                    .map(|val| val.value(&mut cx));
+                let nullable = obj
+                    .get_opt::<JsBoolean, _>(&mut cx, "nullable")?
+                    .map(|val| val.value(&mut cx));
+                // TODO: support data type here. Will need to do some serialization/deserialization
+
+                if rename.is_none() && nullable.is_none() {
+                    return cx.throw_error("At least one of 'rename' or 'nullable' must be provided");
+                }
+
+                Ok(ColumnAlteration {
+                    path,
+                    rename,
+                    nullable,
+                })
+            })
+            .collect::<NeonResult<Vec<ColumnAlteration>>>()?;
+
+        let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;
+        let rt = runtime(&mut cx)?;
+
+        let (deferred, promise) = cx.promise();
+        let channel = cx.channel();
+        let table = js_table.table.clone();
+
+        rt.spawn(async move {
+            let result = table.alter_columns(&alterations).await;
+            deferred.settle_with(&channel, move |mut cx| {
+                result.or_throw(&mut cx)?;
+                Ok(cx.undefined())
+            })
+        });
+
+        Ok(promise)
+    }
+
+    pub(crate) fn js_drop_columns(mut cx: FunctionContext) -> JsResult<JsPromise> {
+        let columns = cx
+            .argument::<JsArray>(0)?
+            .to_vec(&mut cx)?
+            .into_iter()
+            .map(|val| {
+                Ok(val
+                    .downcast_or_throw::<JsString, _>(&mut cx)?
+                    .value(&mut cx))
+            })
+            .collect::<NeonResult<Vec<String>>>()?;
+
+        let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;
+        let rt = runtime(&mut cx)?;
+
+        let (deferred, promise) = cx.promise();
+        let channel = cx.channel();
+        let table = js_table.table.clone();
+
+        rt.spawn(async move {
+            let col_refs = columns.iter().map(|s| s.as_str()).collect::<Vec<&str>>();
+            let result = table.drop_columns(&col_refs).await;
+            deferred.settle_with(&channel, move |mut cx| {
+                result.or_throw(&mut cx)?;
+                Ok(cx.undefined())
+            })
+        });
+
+        Ok(promise)
+    }
 }
diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs
index 1646af8a..c9638f08 100644
--- a/rust/lancedb/src/table.rs
+++ b/rust/lancedb/src/table.rs
@@ -27,7 +27,10 @@ use lance::dataset::optimize::{
     compact_files, CompactionMetrics, CompactionOptions, IndexRemapperOptions,
 };
 pub use lance::dataset::ReadParams;
-use lance::dataset::{Dataset, UpdateBuilder, WhenMatched, WriteMode, WriteParams};
+use lance::dataset::{
+    ColumnAlteration, Dataset, NewColumnTransform, UpdateBuilder, WhenMatched, WriteMode,
+    WriteParams,
+};
 use lance::dataset::{MergeInsertBuilder as LanceMergeInsertBuilder, WhenNotMatchedBySource};
 use lance::io::WrappingObjectStore;
 use lance_index::{optimize::OptimizeOptions, DatasetIndexExt};
@@ -376,6 +379,19 @@ pub trait Table: std::fmt::Display + Send + Sync {
     /// Modeled after ``VACUUM`` in PostgreSQL.
     /// Not all implementations support explicit optimization.
     async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats>;
+
+    /// Add new columns to the table, providing values to fill in.
+    async fn add_columns(
+        &self,
+        transforms: NewColumnTransform,
+        read_columns: Option<Vec<String>>,
+    ) -> Result<()>;
+
+    /// Change a column's name or nullability.
+    async fn alter_columns(&self, alterations: &[ColumnAlteration]) -> Result<()>;
+
+    /// Remove columns from the table.
+    async fn drop_columns(&self, columns: &[&str]) -> Result<()>;
 }
 
 /// Reference to a Table pointer.
@@ -902,6 +918,33 @@ impl Table for NativeTable {
         }
         Ok(stats)
     }
+
+    async fn add_columns(
+        &self,
+        transforms: NewColumnTransform,
+        read_columns: Option<Vec<String>>,
+    ) -> Result<()> {
+        self.dataset
+            .get_mut()
+            .await?
+            .add_columns(transforms, read_columns)
+            .await?;
+        Ok(())
+    }
+
+    async fn alter_columns(&self, alterations: &[ColumnAlteration]) -> Result<()> {
+        self.dataset
+            .get_mut()
+            .await?
+            .alter_columns(alterations)
+            .await?;
+        Ok(())
+    }
+
+    async fn drop_columns(&self, columns: &[&str]) -> Result<()> {
+        self.dataset.get_mut().await?.drop_columns(columns).await?;
+        Ok(())
+    }
 }
 
 #[cfg(test)]
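As the doc comments above note, dropping columns only rewrites metadata. A sketch of the full space-reclamation workflow follows; the `compactFiles` and `cleanupOldVersions` calls refer to the pre-existing node table API rather than anything introduced in this patch, so their names and signatures should be treated as assumptions.

import * as lancedb from 'vectordb'

async function reclaimSpaceAfterDrop (): Promise<void> {
  const db = await lancedb.connect('data/sample-lancedb') // placeholder path
  const table = await db.openTable('my_table')            // assumed existing table

  // Dropping a column only updates metadata; the underlying data files still
  // contain the old column at this point.
  await table.dropColumns(['old_embedding'])

  // Rewrite the data without the dropped column, then delete the superseded
  // file versions. These methods are assumed from the existing API surface.
  await table.compactFiles()
  await table.cleanupOldVersions()
}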