chore(rust): simplified version of optimize (#869)

Consolidate the various optimize() methods into a single method, similar to PostgreSQL's
VACUUM, as part of preparing the Rust API for public use.
Author: Lei Xu
Date: 2024-01-26 11:36:04 -08:00
Committed by: GitHub
Parent: a6cf24b359
Commit: e01ef63488
6 changed files with 134 additions and 66 deletions
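
Before this change, callers invoked compact_files, cleanup_old_versions, and optimize_indices as separate entry points; after it, a single Table::optimize dispatches on an OptimizeAction. A minimal sketch of the new call style, based on the enum and stats types added below; the wrapper function and the vectordb::error::Result path are illustrative assumptions, not part of this diff:

use vectordb::table::{OptimizeAction, OptimizeStats};
use vectordb::Table;

// Hypothetical helper: run the consolidated optimize() on any Table impl.
async fn routine_maintenance(table: &impl Table) -> vectordb::error::Result<()> {
    // OptimizeAction::All (also the Default) compacts files, prunes old
    // versions, and optimizes indices, like a plain VACUUM in PostgreSQL.
    let stats: OptimizeStats = table.optimize(OptimizeAction::All).await?;
    if let Some(prune) = &stats.prune {
        println!(
            "pruned {} old versions ({} bytes removed)",
            prune.old_versions, prune.bytes_removed
        );
    }
    Ok(())
}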


@@ -391,24 +391,6 @@ describe('LanceDB client', function () {
})
}).timeout(120000)
-it('fails to create a new table when the vector column is missing', async function () {
-const dir = await track().mkdir('lancejs')
-const con = await lancedb.connect(dir)
-const data = [
-{
-id: 1,
-price: 10
-}
-]
-const create = con.createTable('missing_vector', data)
-await expect(create).to.be.rejectedWith(
-Error,
-"column 'vector' is missing"
-)
-})
it('use overwrite flag to overwrite existing table', async function () {
const dir = await track().mkdir('lancejs')
const con = await lancedb.connect(dir)


@@ -1,4 +1,4 @@
-// Copyright 2023 Lance Developers.
+// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -19,19 +19,8 @@ use arrow_array::RecordBatch;
use arrow_ipc::reader::FileReader;
use arrow_ipc::writer::FileWriter;
use arrow_schema::SchemaRef;
-use vectordb::table::VECTOR_COLUMN_NAME;
-use crate::error::{MissingColumnSnafu, Result};
-use snafu::prelude::*;
-fn validate_vector_column(record_batch: &RecordBatch) -> Result<()> {
-record_batch
-.column_by_name(VECTOR_COLUMN_NAME)
-.map(|_| ())
-.context(MissingColumnSnafu {
-name: VECTOR_COLUMN_NAME,
-})
-}
+use crate::error::Result;
pub(crate) fn arrow_buffer_to_record_batch(slice: &[u8]) -> Result<(Vec<RecordBatch>, SchemaRef)> {
let mut batches: Vec<RecordBatch> = Vec::new();
@@ -39,7 +28,6 @@ pub(crate) fn arrow_buffer_to_record_batch(slice: &[u8]) -> Result<(Vec<RecordBa
let schema = file_reader.schema();
for b in file_reader {
let record_batch = b?;
-validate_vector_column(&record_batch)?;
batches.push(record_batch);
}
Ok((batches, schema))


@@ -19,6 +19,7 @@ use neon::{
};
use crate::{error::ResultExt, runtime, table::JsTable};
+use vectordb::Table;
pub(crate) fn table_create_scalar_index(mut cx: FunctionContext) -> JsResult<JsPromise> {
let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;
@@ -35,7 +36,9 @@ pub(crate) fn table_create_scalar_index(mut cx: FunctionContext) -> JsResult<JsP
let idx_result = table
.as_native()
.unwrap()
-.create_scalar_index(&column, replace)
+.create_index(&[&column])
+.replace(replace)
+.build()
.await;
deferred.settle_with(&channel, move |mut cx| {


@@ -16,6 +16,7 @@ use arrow_array::{RecordBatch, RecordBatchIterator};
use lance::dataset::optimize::CompactionOptions;
use lance::dataset::{WriteMode, WriteParams};
use lance::io::ObjectStoreParams;
+use vectordb::table::OptimizeAction;
use crate::arrow::{arrow_buffer_to_record_batch, record_batch_to_buffer};
use neon::prelude::*;
@@ -245,27 +246,30 @@ impl JsTable {
.map(|val| val.value(&mut cx) as i64)
.unwrap_or_else(|| 2 * 7 * 24 * 60); // 2 weeks
let older_than = chrono::Duration::minutes(older_than);
-let delete_unverified: bool = cx
-.argument_opt(1)
-.and_then(|val| val.downcast::<JsBoolean, _>(&mut cx).ok())
-.map(|val| val.value(&mut cx))
-.unwrap_or_default();
+let delete_unverified: Option<bool> = Some(
+cx.argument_opt(1)
+.and_then(|val| val.downcast::<JsBoolean, _>(&mut cx).ok())
+.map(|val| val.value(&mut cx))
+.unwrap_or_default(),
+);
rt.spawn(async move {
let stats = table
.as_native()
.unwrap()
-.cleanup_old_versions(older_than, Some(delete_unverified))
+.optimize(OptimizeAction::Prune {
+older_than,
+delete_unverified,
+})
.await;
deferred.settle_with(&channel, move |mut cx| {
let stats = stats.or_throw(&mut cx)?;
+let prune_stats = stats.prune.as_ref().expect("Prune stats missing");
let output_metrics = JsObject::new(&mut cx);
-let bytes_removed = cx.number(stats.bytes_removed as f64);
+let bytes_removed = cx.number(prune_stats.bytes_removed as f64);
output_metrics.set(&mut cx, "bytesRemoved", bytes_removed)?;
-let old_versions = cx.number(stats.old_versions as f64);
+let old_versions = cx.number(prune_stats.old_versions as f64);
output_metrics.set(&mut cx, "oldVersions", old_versions)?;
let output_table = cx.boxed(JsTable::from(table));
@@ -317,13 +321,15 @@ impl JsTable {
rt.spawn(async move {
let stats = table
.as_native()
.unwrap()
-.compact_files(options, None)
+.optimize(OptimizeAction::Compact {
+options,
+remap_options: None,
+})
.await;
deferred.settle_with(&channel, move |mut cx| {
let stats = stats.or_throw(&mut cx)?;
+let stats = stats.compaction.as_ref().expect("Compact stats missing");
let output_metrics = JsObject::new(&mut cx);
let fragments_removed = cx.number(stats.fragments_removed as f64);


@@ -14,6 +14,7 @@
use std::{cmp::max, sync::Arc};
+use lance::index::scalar::ScalarIndexParams;
use lance_index::{DatasetIndexExt, IndexType};
pub use lance_linalg::distance::MetricType;
@@ -232,10 +233,14 @@ impl IndexBuilder {
let mut dataset = tbl.clone_inner_dataset();
match params {
IndexParams::Scalar { replace } => {
-self.table
-.as_native()
-.unwrap()
-.create_scalar_index(column, replace)
+dataset
+.create_index(
+&[&column],
+IndexType::Scalar,
+None,
+&ScalarIndexParams::default(),
+replace,
+)
.await?
}
IndexParams::IvfPq {


@@ -27,9 +27,9 @@ use lance::dataset::optimize::{
};
pub use lance::dataset::ReadParams;
use lance::dataset::{Dataset, UpdateBuilder, WriteParams};
-use lance::index::scalar::ScalarIndexParams;
use lance::io::WrappingObjectStore;
-use lance_index::{optimize::OptimizeOptions, DatasetIndexExt, IndexType};
+use lance_index::{optimize::OptimizeOptions, DatasetIndexExt};
use log::info;
use crate::error::{Error, Result};
use crate::index::vector::{VectorIndex, VectorIndexStatistics};
@@ -38,7 +38,46 @@ use crate::query::Query;
use crate::utils::{PatchReadParam, PatchWriteParam};
use crate::WriteMode;
pub const VECTOR_COLUMN_NAME: &str = "vector";
+/// Optimize the dataset.
+///
+/// Similar to `VACUUM` in PostgreSQL, it offers different options to
+/// optimize different parts of the table on disk.
+///
+/// By default, it optimizes everything, as [`OptimizeAction::All`].
+pub enum OptimizeAction {
+/// Run all optimizations, with default options.
+All,
+/// Compact files in the dataset.
+Compact {
+options: CompactionOptions,
+remap_options: Option<Arc<dyn IndexRemapperOptions>>,
+},
+/// Prune old versions of the dataset.
+Prune {
+/// The duration of time to keep versions of the dataset.
+older_than: Duration,
+/// Because they may be part of an in-progress transaction, files newer
+/// than 7 days old are not deleted by default. If you are sure that
+/// there are no in-progress transactions, you can set this to `true`
+/// to delete all files older than `older_than`.
+delete_unverified: Option<bool>,
+},
+/// Optimize the indices.
+Index(OptimizeOptions),
+}
+impl Default for OptimizeAction {
+fn default() -> Self {
+Self::All
+}
+}
+/// Statistics about the optimization.
+pub struct OptimizeStats {
+/// Stats of the file compaction.
+pub compaction: Option<CompactionMetrics>,
+/// Stats of the version pruning.
+pub prune: Option<RemovalStats>,
+}
/// A Table is a collection of strongly typed Rows.
///
@@ -194,6 +233,14 @@ pub trait Table: std::fmt::Display + Send + Sync {
/// # });
/// ```
fn query(&self) -> Query;
+/// Optimize the on-disk data and indices for better performance.
+///
+/// <section class="warning">Experimental API</section>
+///
+/// Modeled after `VACUUM` in PostgreSQL.
+/// Not all implementations support explicit optimization.
+async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats>;
}
/// Reference to a Table pointer.
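
Each action can also be run on its own, which is exactly what the Node bindings above do for pruning and compaction. A hedged usage sketch; the 30-day window, the helper function, and the Result path are illustrative choices, not defaults from this commit:

use chrono::Duration;
use lance_index::optimize::OptimizeOptions;
use vectordb::table::{OptimizeAction, Table};

// Hypothetical targeted maintenance: prune old versions, then re-optimize indices.
async fn targeted_maintenance(table: &impl Table) -> vectordb::error::Result<()> {
    // Keep only versions from the last 30 days; None leaves the
    // conservative protection for unverified recent files in place.
    table
        .optimize(OptimizeAction::Prune {
            older_than: Duration::days(30),
            delete_unverified: None,
        })
        .await?;

    // Merge/rebuild indices with default OptimizeOptions.
    table
        .optimize(OptimizeAction::Index(OptimizeOptions::default()))
        .await?;

    Ok(())
}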
@@ -396,17 +443,8 @@ impl NativeTable {
self.dataset.lock().expect("lock poison").version().version
}
-/// Create a scalar index on the table
-pub async fn create_scalar_index(&self, column: &str, replace: bool) -> Result<()> {
-let mut dataset = self.clone_inner_dataset();
-let params = ScalarIndexParams::default();
-dataset
-.create_index(&[column], IndexType::Scalar, None, &params, replace)
-.await?;
-Ok(())
-}
-pub async fn optimize_indices(&mut self, options: &OptimizeOptions) -> Result<()> {
+async fn optimize_indices(&self, options: &OptimizeOptions) -> Result<()> {
info!("LanceDB: optimizing indices: {:?}", options);
let mut dataset = self.clone_inner_dataset();
dataset.optimize_indices(options).await?;
@@ -463,7 +501,7 @@ impl NativeTable {
///
/// This calls into [lance::dataset::Dataset::cleanup_old_versions] and
/// returns the result.
-pub async fn cleanup_old_versions(
+async fn cleanup_old_versions(
&self,
older_than: Duration,
delete_unverified: Option<bool>,
@@ -480,7 +518,7 @@ impl NativeTable {
/// for faster reads.
///
/// This calls into [lance::dataset::optimize::compact_files].
-pub async fn compact_files(
+async fn compact_files(
&self,
options: CompactionOptions,
remap_options: Option<Arc<dyn IndexRemapperOptions>>,
@@ -614,6 +652,52 @@ impl Table for NativeTable {
self.reset_dataset(dataset);
Ok(())
}
+async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats> {
+let mut stats = OptimizeStats {
+compaction: None,
+prune: None,
+};
+match action {
+OptimizeAction::All => {
+stats.compaction = self
+.optimize(OptimizeAction::Compact {
+options: CompactionOptions::default(),
+remap_options: None,
+})
+.await?
+.compaction;
+stats.prune = self
+.optimize(OptimizeAction::Prune {
+older_than: Duration::days(7),
+delete_unverified: None,
+})
+.await?
+.prune;
+self.optimize(OptimizeAction::Index(OptimizeOptions::default()))
+.await?;
+}
+OptimizeAction::Compact {
+options,
+remap_options,
+} => {
+stats.compaction = Some(self.compact_files(options, remap_options).await?);
+}
+OptimizeAction::Prune {
+older_than,
+delete_unverified,
+} => {
+stats.prune = Some(
+self.cleanup_old_versions(older_than, delete_unverified)
+.await?,
+);
+}
+OptimizeAction::Index(options) => {
+self.optimize_indices(&options).await?;
+}
+}
+Ok(stats)
+}
}
#[cfg(test)]