diff --git a/node/src/test/test.ts b/node/src/test/test.ts index a2d446b1..7cf2b1ae 100644 --- a/node/src/test/test.ts +++ b/node/src/test/test.ts @@ -391,24 +391,6 @@ describe('LanceDB client', function () { }) }).timeout(120000) - it('fails to create a new table when the vector column is missing', async function () { - const dir = await track().mkdir('lancejs') - const con = await lancedb.connect(dir) - - const data = [ - { - id: 1, - price: 10 - } - ] - - const create = con.createTable('missing_vector', data) - await expect(create).to.be.rejectedWith( - Error, - "column 'vector' is missing" - ) - }) - it('use overwrite flag to overwrite existing table', async function () { const dir = await track().mkdir('lancejs') const con = await lancedb.connect(dir) diff --git a/rust/ffi/node/src/arrow.rs b/rust/ffi/node/src/arrow.rs index 0e1054ce..2c90abf7 100644 --- a/rust/ffi/node/src/arrow.rs +++ b/rust/ffi/node/src/arrow.rs @@ -1,4 +1,4 @@ -// Copyright 2023 Lance Developers. +// Copyright 2024 Lance Developers. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -19,19 +19,8 @@ use arrow_array::RecordBatch; use arrow_ipc::reader::FileReader; use arrow_ipc::writer::FileWriter; use arrow_schema::SchemaRef; -use vectordb::table::VECTOR_COLUMN_NAME; -use crate::error::{MissingColumnSnafu, Result}; -use snafu::prelude::*; - -fn validate_vector_column(record_batch: &RecordBatch) -> Result<()> { - record_batch - .column_by_name(VECTOR_COLUMN_NAME) - .map(|_| ()) - .context(MissingColumnSnafu { - name: VECTOR_COLUMN_NAME, - }) -} +use crate::error::Result; pub(crate) fn arrow_buffer_to_record_batch(slice: &[u8]) -> Result<(Vec<RecordBatch>, SchemaRef)> { let mut batches: Vec<RecordBatch> = Vec::new(); @@ -39,7 +28,6 @@ pub(crate) fn arrow_buffer_to_record_batch(slice: &[u8]) -> Result<(Vec JsResult { let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?; @@ -35,7 +36,9 @@ pub(crate) fn table_create_scalar_index(mut cx: FunctionContext) -> JsResult(&mut cx).ok()) - .map(|val| val.value(&mut cx)) - .unwrap_or_default(); + let delete_unverified: Option<bool> = Some( + cx.argument_opt(1) + .and_then(|val| val.downcast::<JsBoolean, _>(&mut cx).ok()) + .map(|val| val.value(&mut cx)) + .unwrap_or_default(), + ); rt.spawn(async move { let stats = table - .as_native() - .unwrap() - .cleanup_old_versions(older_than, Some(delete_unverified)) + .optimize(OptimizeAction::Prune { + older_than, + delete_unverified, + }) .await; deferred.settle_with(&channel, move |mut cx| { let stats = stats.or_throw(&mut cx)?; + let prune_stats = stats.prune.as_ref().expect("Prune stats missing"); let output_metrics = JsObject::new(&mut cx); - let bytes_removed = cx.number(stats.bytes_removed as f64); + let bytes_removed = cx.number(prune_stats.bytes_removed as f64); output_metrics.set(&mut cx, "bytesRemoved", bytes_removed)?; - let old_versions = cx.number(stats.old_versions as f64); + let old_versions = cx.number(prune_stats.old_versions as f64); output_metrics.set(&mut cx, "oldVersions", old_versions)?; let output_table = cx.boxed(JsTable::from(table)); @@ -317,13 +321,15 @@ impl JsTable { 
rt.spawn(async move { let stats = table - .as_native() - .unwrap() - .compact_files(options, None) + .optimize(OptimizeAction::Compact { + options, + remap_options: None, + }) .await; deferred.settle_with(&channel, move |mut cx| { let stats = stats.or_throw(&mut cx)?; + let stats = stats.compaction.as_ref().expect("Compact stats missing"); let output_metrics = JsObject::new(&mut cx); let fragments_removed = cx.number(stats.fragments_removed as f64); diff --git a/rust/vectordb/src/index.rs b/rust/vectordb/src/index.rs index 66f2c92c..78225fb9 100644 --- a/rust/vectordb/src/index.rs +++ b/rust/vectordb/src/index.rs @@ -14,6 +14,7 @@ use std::{cmp::max, sync::Arc}; +use lance::index::scalar::ScalarIndexParams; use lance_index::{DatasetIndexExt, IndexType}; pub use lance_linalg::distance::MetricType; @@ -232,10 +233,14 @@ impl IndexBuilder { let mut dataset = tbl.clone_inner_dataset(); match params { IndexParams::Scalar { replace } => { - self.table - .as_native() - .unwrap() - .create_scalar_index(column, replace) + dataset + .create_index( + &[&column], + IndexType::Scalar, + None, + &ScalarIndexParams::default(), + replace, + ) .await? 
} IndexParams::IvfPq { diff --git a/rust/vectordb/src/table.rs b/rust/vectordb/src/table.rs index 266c125c..49035fca 100644 --- a/rust/vectordb/src/table.rs +++ b/rust/vectordb/src/table.rs @@ -27,9 +27,9 @@ use lance::dataset::optimize::{ }; pub use lance::dataset::ReadParams; use lance::dataset::{Dataset, UpdateBuilder, WriteParams}; -use lance::index::scalar::ScalarIndexParams; use lance::io::WrappingObjectStore; -use lance_index::{optimize::OptimizeOptions, DatasetIndexExt, IndexType}; +use lance_index::{optimize::OptimizeOptions, DatasetIndexExt}; +use log::info; use crate::error::{Error, Result}; use crate::index::vector::{VectorIndex, VectorIndexStatistics}; @@ -38,7 +38,46 @@ use crate::query::Query; use crate::utils::{PatchReadParam, PatchWriteParam}; use crate::WriteMode; -pub const VECTOR_COLUMN_NAME: &str = "vector"; +/// Optimize the dataset. +/// +/// Similar to `VACUUM` in PostgreSQL, it offers different options to +/// optimize different parts of the table on disk. +/// +/// By default, it optimizes everything, as [`OptimizeAction::All`]. +pub enum OptimizeAction { + /// Run optimization on everything, with default options. + All, + /// Compact files in the dataset + Compact { + options: CompactionOptions, + remap_options: Option<Arc<dyn IndexRemapperOptions>>, + }, + /// Prune old versions of datasets. + Prune { + /// The duration of time to keep versions of the dataset. + older_than: Duration, + /// Because they may be part of an in-progress transaction, files newer than 7 days old are not deleted by default. + /// If you are sure that there are no in-progress transactions, then you can set this to `true` to delete all files older than `older_than`. + delete_unverified: Option<bool>, + }, + /// Optimize index. + Index(OptimizeOptions), +} + +impl Default for OptimizeAction { + fn default() -> Self { + Self::All + } +} + +/// Statistics about the optimization. +pub struct OptimizeStats { + /// Stats of the file compaction. 
+ pub compaction: Option<CompactionMetrics>, + + /// Stats of the version pruning + pub prune: Option<RemovalStats>, +} /// A Table is a collection of strong typed Rows. /// @@ -194,6 +233,14 @@ pub trait Table: std::fmt::Display + Send + Sync { /// # }); /// ``` fn query(&self) -> Query; + + /// Optimize the on-disk data and indices for better performance. + + ///
Experimental API
+ /// + /// Modeled after ``VACUUM`` in PostgreSQL. + /// Not all implementations support explicit optimization. + async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats>; } /// Reference to a Table pointer. @@ -396,17 +443,8 @@ impl NativeTable { self.dataset.lock().expect("lock poison").version().version } - /// Create a scalar index on the table - pub async fn create_scalar_index(&self, column: &str, replace: bool) -> Result<()> { - let mut dataset = self.clone_inner_dataset(); - let params = ScalarIndexParams::default(); - dataset - .create_index(&[column], IndexType::Scalar, None, &params, replace) - .await?; - Ok(()) - } - - pub async fn optimize_indices(&mut self, options: &OptimizeOptions) -> Result<()> { + async fn optimize_indices(&self, options: &OptimizeOptions) -> Result<()> { + info!("LanceDB: optimizing indices: {:?}", options); let mut dataset = self.clone_inner_dataset(); dataset.optimize_indices(options).await?; @@ -463,7 +501,7 @@ impl NativeTable { /// /// This calls into [lance::dataset::Dataset::cleanup_old_versions] and /// returns the result. - pub async fn cleanup_old_versions( + async fn cleanup_old_versions( &self, older_than: Duration, delete_unverified: Option<bool>, @@ -480,7 +518,7 @@ impl NativeTable { /// for faster reads. /// /// This calls into [lance::dataset::optimize::compact_files]. - pub async fn compact_files( + async fn compact_files( &self, options: CompactionOptions, remap_options: Option<Arc<dyn IndexRemapperOptions>>, @@ -614,6 +652,52 @@ impl Table for NativeTable { self.reset_dataset(dataset); Ok(()) } + + async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats> { + let mut stats = OptimizeStats { + compaction: None, + prune: None, + }; + match action { + OptimizeAction::All => { + stats.compaction = self + .optimize(OptimizeAction::Compact { + options: CompactionOptions::default(), + remap_options: None, + }) + .await? 
+ .compaction; + stats.prune = self + .optimize(OptimizeAction::Prune { + older_than: Duration::days(7), + delete_unverified: None, + }) + .await? + .prune; + self.optimize(OptimizeAction::Index(OptimizeOptions::default())) + .await?; + } + OptimizeAction::Compact { + options, + remap_options, + } => { + stats.compaction = Some(self.compact_files(options, remap_options).await?); + } + OptimizeAction::Prune { + older_than, + delete_unverified, + } => { + stats.prune = Some( + self.cleanup_old_versions(older_than, delete_unverified) + .await?, + ); + } + OptimizeAction::Index(options) => { + self.optimize_indices(&options).await?; + } + } + Ok(stats) + } } #[cfg(test)]