Files
lancedb/nodejs/src/table.rs
BubbleCal 7ff6ec7fe3 feat: upgrade to lance v0.25.0-beta.5 (#2248)
- adds `loss` into the index stats for vector index
- now `optimize` can retrain the vector index

---------

Signed-off-by: BubbleCal <bubble-cal@outlook.com>
2025-03-21 10:12:23 -07:00

524 lines
17 KiB
Rust

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::collections::HashMap;
use arrow_ipc::writer::FileWriter;
use lancedb::ipc::ipc_file_to_batches;
use lancedb::table::{
AddDataMode, ColumnAlteration as LanceColumnAlteration, Duration, NewColumnTransform,
OptimizeAction, OptimizeOptions, Table as LanceDbTable,
};
use napi::bindgen_prelude::*;
use napi_derive::napi;
use crate::error::NapiErrorExt;
use crate::index::Index;
use crate::merge::NativeMergeInsertBuilder;
use crate::query::{Query, VectorQuery};
#[napi]
pub struct Table {
// We keep a duplicate of the table name so we can use it for error
// messages even if the table has been closed
pub name: String,
pub(crate) inner: Option<LanceDbTable>,
}
impl Table {
fn inner_ref(&self) -> napi::Result<&LanceDbTable> {
self.inner
.as_ref()
.ok_or_else(|| napi::Error::from_reason(format!("Table {} is closed", self.name)))
}
}
#[napi]
impl Table {
pub(crate) fn new(table: LanceDbTable) -> Self {
Self {
name: table.name().to_string(),
inner: Some(table),
}
}
#[napi]
pub fn display(&self) -> String {
match &self.inner {
None => format!("ClosedTable({})", self.name),
Some(inner) => inner.to_string(),
}
}
#[napi]
pub fn is_open(&self) -> bool {
self.inner.is_some()
}
#[napi]
pub fn close(&mut self) {
self.inner.take();
}
/// Return Schema as empty Arrow IPC file.
#[napi(catch_unwind)]
pub async fn schema(&self) -> napi::Result<Buffer> {
let schema = self.inner_ref()?.schema().await.default_error()?;
let mut writer = FileWriter::try_new(vec![], &schema)
.map_err(|e| napi::Error::from_reason(format!("Failed to create IPC file: {}", e)))?;
writer
.finish()
.map_err(|e| napi::Error::from_reason(format!("Failed to finish IPC file: {}", e)))?;
Ok(Buffer::from(writer.into_inner().map_err(|e| {
napi::Error::from_reason(format!("Failed to get IPC file: {}", e))
})?))
}
#[napi(catch_unwind)]
pub async fn add(&self, buf: Buffer, mode: String) -> napi::Result<()> {
let batches = ipc_file_to_batches(buf.to_vec())
.map_err(|e| napi::Error::from_reason(format!("Failed to read IPC file: {}", e)))?;
let mut op = self.inner_ref()?.add(batches);
op = if mode == "append" {
op.mode(AddDataMode::Append)
} else if mode == "overwrite" {
op.mode(AddDataMode::Overwrite)
} else {
return Err(napi::Error::from_reason(format!("Invalid mode: {}", mode)));
};
op.execute().await.default_error()
}
#[napi(catch_unwind)]
pub async fn count_rows(&self, filter: Option<String>) -> napi::Result<i64> {
self.inner_ref()?
.count_rows(filter)
.await
.map(|val| val as i64)
.default_error()
}
#[napi(catch_unwind)]
pub async fn delete(&self, predicate: String) -> napi::Result<()> {
self.inner_ref()?.delete(&predicate).await.default_error()
}
#[napi(catch_unwind)]
pub async fn create_index(
&self,
index: Option<&Index>,
column: String,
replace: Option<bool>,
) -> napi::Result<()> {
let lancedb_index = if let Some(index) = index {
index.consume()?
} else {
lancedb::index::Index::Auto
};
let mut builder = self.inner_ref()?.create_index(&[column], lancedb_index);
if let Some(replace) = replace {
builder = builder.replace(replace);
}
builder.execute().await.default_error()
}
#[napi(catch_unwind)]
pub async fn drop_index(&self, index_name: String) -> napi::Result<()> {
self.inner_ref()?
.drop_index(&index_name)
.await
.default_error()
}
#[napi(catch_unwind)]
pub async fn update(
&self,
only_if: Option<String>,
columns: Vec<(String, String)>,
) -> napi::Result<u64> {
let mut op = self.inner_ref()?.update();
if let Some(only_if) = only_if {
op = op.only_if(only_if);
}
for (column_name, value) in columns {
op = op.column(column_name, value);
}
op.execute().await.default_error()
}
#[napi(catch_unwind)]
pub fn query(&self) -> napi::Result<Query> {
Ok(Query::new(self.inner_ref()?.query()))
}
#[napi(catch_unwind)]
pub fn vector_search(&self, vector: Float32Array) -> napi::Result<VectorQuery> {
self.query()?.nearest_to(vector)
}
#[napi(catch_unwind)]
pub async fn add_columns(&self, transforms: Vec<AddColumnsSql>) -> napi::Result<()> {
let transforms = transforms
.into_iter()
.map(|sql| (sql.name, sql.value_sql))
.collect::<Vec<_>>();
let transforms = NewColumnTransform::SqlExpressions(transforms);
self.inner_ref()?
.add_columns(transforms, None)
.await
.default_error()?;
Ok(())
}
#[napi(catch_unwind)]
pub async fn alter_columns(&self, alterations: Vec<ColumnAlteration>) -> napi::Result<()> {
for alteration in &alterations {
if alteration.rename.is_none()
&& alteration.nullable.is_none()
&& alteration.data_type.is_none()
{
return Err(napi::Error::from_reason(
"Alteration must have a 'rename', 'dataType', or 'nullable' field.",
));
}
}
let alterations = alterations
.into_iter()
.map(LanceColumnAlteration::try_from)
.collect::<std::result::Result<Vec<_>, String>>()
.map_err(napi::Error::from_reason)?;
self.inner_ref()?
.alter_columns(&alterations)
.await
.default_error()?;
Ok(())
}
#[napi(catch_unwind)]
pub async fn drop_columns(&self, columns: Vec<String>) -> napi::Result<()> {
let col_refs = columns.iter().map(String::as_str).collect::<Vec<_>>();
self.inner_ref()?
.drop_columns(&col_refs)
.await
.default_error()?;
Ok(())
}
#[napi(catch_unwind)]
pub async fn version(&self) -> napi::Result<i64> {
self.inner_ref()?
.version()
.await
.map(|val| val as i64)
.default_error()
}
#[napi(catch_unwind)]
pub async fn checkout(&self, version: i64) -> napi::Result<()> {
self.inner_ref()?
.checkout(version as u64)
.await
.default_error()
}
#[napi(catch_unwind)]
pub async fn checkout_latest(&self) -> napi::Result<()> {
self.inner_ref()?.checkout_latest().await.default_error()
}
#[napi(catch_unwind)]
pub async fn list_versions(&self) -> napi::Result<Vec<Version>> {
self.inner_ref()?
.list_versions()
.await
.map(|versions| {
versions
.iter()
.map(|version| Version {
version: version.version as i64,
timestamp: version.timestamp.timestamp_micros(),
metadata: version
.metadata
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect(),
})
.collect()
})
.default_error()
}
#[napi(catch_unwind)]
pub async fn restore(&self) -> napi::Result<()> {
self.inner_ref()?.restore().await.default_error()
}
#[napi(catch_unwind)]
pub async fn optimize(
&self,
older_than_ms: Option<i64>,
delete_unverified: Option<bool>,
) -> napi::Result<OptimizeStats> {
let inner = self.inner_ref()?;
let older_than = if let Some(ms) = older_than_ms {
if ms == i64::MIN {
return Err(napi::Error::from_reason(format!(
"older_than_ms can not be {}",
i32::MIN,
)));
}
Duration::try_milliseconds(ms)
} else {
None
};
let compaction_stats = inner
.optimize(OptimizeAction::Compact {
options: lancedb::table::CompactionOptions::default(),
remap_options: None,
})
.await
.default_error()?
.compaction
.unwrap();
let prune_stats = inner
.optimize(OptimizeAction::Prune {
older_than,
delete_unverified,
error_if_tagged_old_versions: None,
})
.await
.default_error()?
.prune
.unwrap();
inner
.optimize(lancedb::table::OptimizeAction::Index(
OptimizeOptions::default(),
))
.await
.default_error()?;
Ok(OptimizeStats {
compaction: CompactionStats {
files_added: compaction_stats.files_added as i64,
files_removed: compaction_stats.files_removed as i64,
fragments_added: compaction_stats.fragments_added as i64,
fragments_removed: compaction_stats.fragments_removed as i64,
},
prune: RemovalStats {
bytes_removed: prune_stats.bytes_removed as i64,
old_versions_removed: prune_stats.old_versions as i64,
},
})
}
#[napi(catch_unwind)]
pub async fn list_indices(&self) -> napi::Result<Vec<IndexConfig>> {
Ok(self
.inner_ref()?
.list_indices()
.await
.default_error()?
.into_iter()
.map(IndexConfig::from)
.collect::<Vec<_>>())
}
#[napi(catch_unwind)]
pub async fn index_stats(&self, index_name: String) -> napi::Result<Option<IndexStatistics>> {
let tbl = self.inner_ref()?;
let stats = tbl.index_stats(&index_name).await.default_error()?;
Ok(stats.map(IndexStatistics::from))
}
#[napi(catch_unwind)]
pub fn merge_insert(&self, on: Vec<String>) -> napi::Result<NativeMergeInsertBuilder> {
let on: Vec<_> = on.iter().map(String::as_str).collect();
Ok(self.inner_ref()?.merge_insert(on.as_slice()).into())
}
#[napi(catch_unwind)]
pub async fn uses_v2_manifest_paths(&self) -> napi::Result<bool> {
self.inner_ref()?
.as_native()
.ok_or_else(|| napi::Error::from_reason("This cannot be run on a remote table"))?
.uses_v2_manifest_paths()
.await
.default_error()
}
#[napi(catch_unwind)]
pub async fn migrate_manifest_paths_v2(&self) -> napi::Result<()> {
self.inner_ref()?
.as_native()
.ok_or_else(|| napi::Error::from_reason("This cannot be run on a remote table"))?
.migrate_manifest_paths_v2()
.await
.default_error()
}
}
#[napi(object)]
/// A description of an index currently configured on a column
pub struct IndexConfig {
/// The name of the index
pub name: String,
/// The type of the index
pub index_type: String,
/// The columns in the index
///
/// Currently this is always an array of size 1. In the future there may
/// be more columns to represent composite indices.
pub columns: Vec<String>,
}
impl From<lancedb::index::IndexConfig> for IndexConfig {
fn from(value: lancedb::index::IndexConfig) -> Self {
let index_type = format!("{:?}", value.index_type);
Self {
index_type,
columns: value.columns,
name: value.name,
}
}
}
/// Statistics about a compaction operation.
#[napi(object)]
#[derive(Clone, Debug)]
pub struct CompactionStats {
/// The number of fragments removed
pub fragments_removed: i64,
/// The number of new, compacted fragments added
pub fragments_added: i64,
/// The number of data files removed
pub files_removed: i64,
/// The number of new, compacted data files added
pub files_added: i64,
}
/// Statistics about a cleanup operation
#[napi(object)]
#[derive(Clone, Debug)]
pub struct RemovalStats {
/// The number of bytes removed
pub bytes_removed: i64,
/// The number of old versions removed
pub old_versions_removed: i64,
}
/// Statistics about an optimize operation
#[napi(object)]
#[derive(Clone, Debug)]
pub struct OptimizeStats {
/// Statistics about the compaction operation
pub compaction: CompactionStats,
/// Statistics about the removal operation
pub prune: RemovalStats,
}
/// A definition of a column alteration. The alteration changes the column at
/// `path` to have the new name `name`, to be nullable if `nullable` is true,
/// and to have the data type `data_type`. At least one of `rename` or `nullable`
/// must be provided.
#[napi(object)]
pub struct ColumnAlteration {
/// The path to the column to alter. This is a dot-separated path to the column.
/// If it is a top-level column then it is just the name of the column. If it is
/// a nested column then it is the path to the column, e.g. "a.b.c" for a column
/// `c` nested inside a column `b` nested inside a column `a`.
pub path: String,
/// The new name of the column. If not provided then the name will not be changed.
/// This must be distinct from the names of all other columns in the table.
pub rename: Option<String>,
/// A new data type for the column. If not provided then the data type will not be changed.
/// Changing data types is limited to casting to the same general type. For example, these
/// changes are valid:
/// * `int32` -> `int64` (integers)
/// * `double` -> `float` (floats)
/// * `string` -> `large_string` (strings)
/// But these changes are not:
/// * `int32` -> `double` (mix integers and floats)
/// * `string` -> `int32` (mix strings and integers)
pub data_type: Option<String>,
/// Set the new nullability. Note that a nullable column cannot be made non-nullable.
pub nullable: Option<bool>,
}
impl TryFrom<ColumnAlteration> for LanceColumnAlteration {
type Error = String;
fn try_from(js: ColumnAlteration) -> std::result::Result<Self, Self::Error> {
let ColumnAlteration {
path,
rename,
nullable,
data_type,
} = js;
let data_type = if let Some(data_type) = data_type {
Some(
lancedb::utils::string_to_datatype(&data_type)
.ok_or_else(|| format!("Invalid data type: {}", data_type))?,
)
} else {
None
};
Ok(Self {
path,
rename,
nullable,
data_type,
})
}
}
/// A definition of a new column to add to a table.
#[napi(object)]
pub struct AddColumnsSql {
/// The name of the new column.
pub name: String,
/// The values to populate the new column with, as a SQL expression.
/// The expression can reference other columns in the table.
pub value_sql: String,
}
#[napi(object)]
pub struct IndexStatistics {
/// The number of rows indexed by the index
pub num_indexed_rows: f64,
/// The number of rows not indexed
pub num_unindexed_rows: f64,
/// The type of the index
pub index_type: String,
/// The type of the distance function used by the index. This is only
/// present for vector indices. Scalar and full text search indices do
/// not have a distance function.
pub distance_type: Option<String>,
/// The number of parts this index is split into.
pub num_indices: Option<u32>,
/// The KMeans loss value of the index,
/// it is only present for vector indices.
pub loss: Option<f64>,
}
impl From<lancedb::index::IndexStatistics> for IndexStatistics {
fn from(value: lancedb::index::IndexStatistics) -> Self {
Self {
num_indexed_rows: value.num_indexed_rows as f64,
num_unindexed_rows: value.num_unindexed_rows as f64,
index_type: value.index_type.to_string(),
distance_type: value.distance_type.map(|d| d.to_string()),
num_indices: value.num_indices,
loss: value.loss,
}
}
}
#[napi(object)]
pub struct Version {
pub version: i64,
pub timestamp: i64,
pub metadata: HashMap<String, String>,
}