From 4f512af024e1691a022842ad8bbb69e2ba72e80c Mon Sep 17 00:00:00 2001
From: Weston Pace
Date: Mon, 20 May 2024 07:09:31 -0700
Subject: [PATCH] feat: add the optimize function to nodejs and async python (#1257)

The optimize function is crucial for getting good performance when building a
large-scale dataset, but it was only exposed in Rust (many sync Python users are
probably doing this via to_lance today). This PR adds the optimize function to
nodejs and to async Python.

I left the function marked experimental because there will likely be changes to
optimization (e.g. if we add features like "optimize on write"). I also only
exposed the `cleanup_older_than` configuration parameter since it is very
commonly used; the rest have sensible defaults and we don't have a strong reason
to recommend different values for them anyway.
---
 nodejs/__test__/table.test.ts      | 28 ++++++++++
 nodejs/lancedb/table.ts            | 60 ++++++++++++++++++++
 nodejs/src/table.rs                | 81 ++++++++++++++++++++++++++-
 python/python/lancedb/_lancedb.pyi | 14 +++++
 python/python/lancedb/table.py     | 45 ++++++++++++++-
 python/python/tests/test_table.py  | 26 +++++++++
 python/src/table.rs                | 81 ++++++++++++++++++++++++++-
 rust/ffi/node/src/table.rs         |  2 +-
 rust/lancedb/src/table.rs          | 90 +++++++++++++++++++++++++-----
 9 files changed, 407 insertions(+), 20 deletions(-)

diff --git a/nodejs/__test__/table.test.ts b/nodejs/__test__/table.test.ts
index 5c770fbe..cccb62d5 100644
--- a/nodejs/__test__/table.test.ts
+++ b/nodejs/__test__/table.test.ts
@@ -419,3 +419,31 @@ describe("when dealing with versioning", () => {
     );
   });
 });
+
+describe("when optimizing a dataset", () => {
+  let tmpDir: tmp.DirResult;
+  let table: Table;
+  beforeEach(async () => {
+    tmpDir = tmp.dirSync({ unsafeCleanup: true });
+    const con = await connect(tmpDir.name);
+    table = await con.createTable("vectors", [{ id: 1 }]);
+    await table.add([{ id: 2 }]);
+  });
+  afterEach(() => {
+    tmpDir.removeCallback();
+  });
+
+  it("compacts files", async () => {
+    const stats = await table.optimize();
+    expect(stats.compaction.filesAdded).toBe(1);
+    expect(stats.compaction.filesRemoved).toBe(2);
+    expect(stats.compaction.fragmentsAdded).toBe(1);
+    expect(stats.compaction.fragmentsRemoved).toBe(2);
+  });
+
+  it("cleans up old versions", async () => {
+    const stats = await table.optimize({ cleanupOlderThan: new Date() });
+    expect(stats.prune.bytesRemoved).toBeGreaterThan(0);
+    expect(stats.prune.oldVersionsRemoved).toBe(3);
+  });
+});
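
For orientation, here is a minimal usage sketch of the API this patch adds, mirroring the test above (TypeScript; the package name `@lancedb/lancedb` and the local path are assumptions for the example, not part of the patch):

    // Sketch only: exercises the optimize() API added by this patch.
    import { connect } from "@lancedb/lancedb"; // assumed package name

    async function main() {
      const db = await connect("/tmp/lancedb-optimize-demo"); // assumed path
      const tbl = await db.createTable("vectors", [{ id: 1 }]);
      // Each add() writes new files and creates a new version of the table.
      await tbl.add([{ id: 2 }]);
      await tbl.add([{ id: 3 }]);

      // Compact small files, prune versions older than one day, and update indices.
      const olderThan = new Date();
      olderThan.setDate(olderThan.getDate() - 1);
      const stats = await tbl.optimize({ cleanupOlderThan: olderThan });

      console.log(stats.compaction.filesRemoved, stats.compaction.filesAdded);
      console.log(stats.prune.oldVersionsRemoved, stats.prune.bytesRemoved);
    }

    main().catch(console.error);
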
diff --git a/nodejs/lancedb/table.ts b/nodejs/lancedb/table.ts
index 12e73648..acb58f38 100644
--- a/nodejs/lancedb/table.ts
+++ b/nodejs/lancedb/table.ts
@@ -19,6 +19,7 @@ import {
   AddColumnsSql,
   ColumnAlteration,
   IndexConfig,
+  OptimizeStats,
   Table as _NativeTable,
 } from "./native";
 import { Query, VectorQuery } from "./query";
@@ -50,6 +51,23 @@ export interface UpdateOptions {
   where: string;
 }
 
+export interface OptimizeOptions {
+  /**
+   * If set, then all versions older than the given date
+   * will be removed. The current version will never be removed.
+   * The default is 7 days.
+   * @example
+   * // Delete all versions older than 1 day
+   * const olderThan = new Date();
+   * olderThan.setDate(olderThan.getDate() - 1);
+   * tbl.optimize({ cleanupOlderThan: olderThan });
+   *
+   * // Delete all versions except the current version
+   * tbl.optimize({ cleanupOlderThan: new Date() });
+   */
+  cleanupOlderThan: Date;
+}
+
 /**
  * A Table is a collection of Records in a LanceDB Database.
  *
@@ -352,6 +370,48 @@ export class Table {
     await this.inner.restore();
   }
 
+  /**
+   * Optimize the on-disk data and indices for better performance.
+   *
+   * Modeled after ``VACUUM`` in PostgreSQL.
+   *
+   * Optimization covers three operations:
+   *
+   * - Compaction: Merges small files into larger ones
+   * - Prune: Removes old versions of the dataset
+   * - Index: Optimizes the indices, adding new data to existing indices
+   *
+   * Experimental API
+   * ----------------
+   *
+   * The optimization process is undergoing active development and may change.
+   * Our goal with these changes is to improve the performance of optimization and
+   * reduce the complexity.
+   *
+   * That being said, it is essential today to run optimize if you want the best
+   * performance. It should be stable and safe to use in production, but it is our
+   * hope that the API may be simplified (or may not even need to be called) in the
+   * future.
+   *
+   * How frequently an application should call optimize depends on how frequently
+   * data is modified. If data is frequently added, deleted, or updated then
+   * optimize should be run frequently. A good rule of thumb is to run optimize if
+   * you have added or modified 100,000 or more records or run more than 20 data
+   * modification operations.
+   */
+  async optimize(options?: Partial<OptimizeOptions>): Promise<OptimizeStats> {
+    let cleanupOlderThanMs;
+    if (
+      options?.cleanupOlderThan !== undefined &&
+      options?.cleanupOlderThan !== null
+    ) {
+      cleanupOlderThanMs =
+        new Date().getTime() - options.cleanupOlderThan.getTime();
+    }
+    return await this.inner.optimize(cleanupOlderThanMs);
+  }
+
   /** List all indices that have been created with {@link Table.createIndex} */
   async listIndices(): Promise<IndexConfig[]> {
     return await this.inner.listIndices();
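
The doc comment above gives a rule of thumb (optimize after roughly 100,000 modified records or about 20 data modification operations). A hypothetical helper, not part of this patch, sketches one way an application could follow it; the `Table` import path is an assumption:

    // Hypothetical wrapper: runs optimize() after every N data modifications.
    import { Table } from "@lancedb/lancedb"; // assumed package name

    export class AutoOptimizingWriter {
      private modifications = 0;

      constructor(
        private readonly table: Table,
        private readonly optimizeEvery: number = 20,
      ) {}

      async add(rows: Record<string, unknown>[]): Promise<void> {
        await this.table.add(rows);
        this.modifications += 1;
        if (this.modifications >= this.optimizeEvery) {
          this.modifications = 0;
          // Compacts files, prunes old versions, and updates indices.
          await this.table.optimize();
        }
      }
    }
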
diff --git a/nodejs/src/table.rs b/nodejs/src/table.rs
index c1cf7d46..69c4e67d 100644
--- a/nodejs/src/table.rs
+++ b/nodejs/src/table.rs
@@ -15,8 +15,8 @@ use arrow_ipc::writer::FileWriter;
 use lancedb::ipc::ipc_file_to_batches;
 use lancedb::table::{
-    AddDataMode, ColumnAlteration as LanceColumnAlteration, NewColumnTransform,
-    Table as LanceDbTable,
+    AddDataMode, ColumnAlteration as LanceColumnAlteration, Duration, NewColumnTransform,
+    OptimizeAction, OptimizeOptions, Table as LanceDbTable,
 };
 use napi::bindgen_prelude::*;
 use napi_derive::napi;
@@ -263,6 +263,49 @@ impl Table {
         self.inner_ref()?.restore().await.default_error()
     }
 
+    #[napi]
+    pub async fn optimize(&self, older_than_ms: Option<i64>) -> napi::Result<OptimizeStats> {
+        let inner = self.inner_ref()?;
+
+        let compaction_stats = inner
+            .optimize(OptimizeAction::Compact {
+                options: lancedb::table::CompactionOptions::default(),
+                remap_options: None,
+            })
+            .await
+            .default_error()?
+            .compaction
+            .unwrap();
+        let older_than = older_than_ms.map(Duration::milliseconds);
+        let prune_stats = inner
+            .optimize(OptimizeAction::Prune {
+                older_than,
+                delete_unverified: None,
+            })
+            .await
+            .default_error()?
+            .prune
+            .unwrap();
+        inner
+            .optimize(lancedb::table::OptimizeAction::Index(
+                OptimizeOptions::default(),
+            ))
+            .await
+            .default_error()?;
+        Ok(OptimizeStats {
+            compaction: CompactionStats {
+                files_added: compaction_stats.files_added as i64,
+                files_removed: compaction_stats.files_removed as i64,
+                fragments_added: compaction_stats.fragments_added as i64,
+                fragments_removed: compaction_stats.fragments_removed as i64,
+            },
+            prune: RemovalStats {
+                bytes_removed: prune_stats.bytes_removed as i64,
+                old_versions_removed: prune_stats.old_versions as i64,
+            },
+        })
+    }
+
     #[napi]
     pub async fn list_indices(&self) -> napi::Result<Vec<IndexConfig>> {
         Ok(self
@@ -298,6 +341,40 @@ impl From for IndexConfig {
     }
 }
 
+/// Statistics about a compaction operation.
+#[napi(object)]
+#[derive(Clone, Debug)]
+pub struct CompactionStats {
+    /// The number of fragments removed
+    pub fragments_removed: i64,
+    /// The number of new, compacted fragments added
+    pub fragments_added: i64,
+    /// The number of data files removed
+    pub files_removed: i64,
+    /// The number of new, compacted data files added
+    pub files_added: i64,
+}
+
+/// Statistics about a cleanup operation
+#[napi(object)]
+#[derive(Clone, Debug)]
+pub struct RemovalStats {
+    /// The number of bytes removed
+    pub bytes_removed: i64,
+    /// The number of old versions removed
+    pub old_versions_removed: i64,
+}
+
+/// Statistics about an optimize operation
+#[napi(object)]
+#[derive(Clone, Debug)]
+pub struct OptimizeStats {
+    /// Statistics about the compaction operation
+    pub compaction: CompactionStats,
+    /// Statistics about the removal operation
+    pub prune: RemovalStats,
+}
+
 /// A definition of a column alteration. The alteration changes the column at
 /// `path` to have the new name `name`, to be nullable if `nullable` is true,
 /// and to have the data type `data_type`. At least one of `rename` or `nullable`
diff --git a/python/python/lancedb/_lancedb.pyi b/python/python/lancedb/_lancedb.pyi
index 7ee7b7da..0c6bd893 100644
--- a/python/python/lancedb/_lancedb.pyi
+++ b/python/python/lancedb/_lancedb.pyi
@@ -86,3 +86,17 @@ class VectorQuery:
     def refine_factor(self, refine_factor: int): ...
     def nprobes(self, nprobes: int): ...
     def bypass_vector_index(self): ...
+
+class CompactionStats:
+    fragments_removed: int
+    fragments_added: int
+    files_removed: int
+    files_added: int
+
+class RemovalStats:
+    bytes_removed: int
+    old_versions_removed: int
+
+class OptimizeStats:
+    compaction: CompactionStats
+    prune: RemovalStats
diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py
index 02593c8f..1fc9ca4d 100644
--- a/python/python/lancedb/table.py
+++ b/python/python/lancedb/table.py
@@ -58,7 +58,7 @@ if TYPE_CHECKING:
     import PIL
     from lance.dataset import CleanupStats, ReaderLike
 
-    from ._lancedb import Table as LanceDBTable
+    from ._lancedb import Table as LanceDBTable, OptimizeStats
     from .db import LanceDBConnection
     from .index import BTree, IndexConfig, IvfPq
 
@@ -2377,6 +2377,49 @@ class AsyncTable:
         """
         await self._inner.restore()
 
+    async def optimize(
+        self, *, cleanup_older_than: Optional[timedelta] = None
+    ) -> OptimizeStats:
+        """
+        Optimize the on-disk data and indices for better performance.
+
+        Modeled after ``VACUUM`` in PostgreSQL.
+
+        Optimization covers three operations:
+
+        * Compaction: Merges small files into larger ones
+        * Prune: Removes old versions of the dataset
+        * Index: Optimizes the indices, adding new data to existing indices
+
+        Parameters
+        ----------
+        cleanup_older_than: timedelta, optional, default 7 days
+            All files belonging to versions older than this will be removed. Set
+            to 0 days to remove all versions except the latest. The latest version
+            is never removed.
+
+        Experimental API
+        ----------------
+
+        The optimization process is undergoing active development and may change.
+        Our goal with these changes is to improve the performance of optimization
+        and reduce the complexity.
+
+        That being said, it is essential today to run optimize if you want the best
+        performance. It should be stable and safe to use in production, but it is
+        our hope that the API may be simplified (or may not even need to be called)
+        in the future.
+
+        How frequently an application should call optimize depends on how
+        frequently data is modified. If data is frequently added, deleted, or
+        updated then optimize should be run frequently. A good rule of thumb is to
+        run optimize if you have added or modified 100,000 or more records or run
+        more than 20 data modification operations.
+        """
+        if cleanup_older_than is not None:
+            cleanup_older_than = round(cleanup_older_than.total_seconds() * 1000)
+        return await self._inner.optimize(cleanup_older_than)
+
     async def list_indices(self) -> IndexConfig:
         """
         List all indices that have been created with Self::create_index
diff --git a/python/python/tests/test_table.py b/python/python/tests/test_table.py
index 518b19e1..e951290d 100644
--- a/python/python/tests/test_table.py
+++ b/python/python/tests/test_table.py
@@ -1025,3 +1025,29 @@ async def test_time_travel(db_async: AsyncConnection):
     # Can't use restore if not checked out
     with pytest.raises(ValueError, match="checkout before running restore"):
         await table.restore()
+
+
+@pytest.mark.asyncio
+async def test_optimize(db_async: AsyncConnection):
+    table = await db_async.create_table(
+        "test",
+        data=[{"x": [1]}],
+    )
+    await table.add(
+        data=[
+            {"x": [2]},
+        ],
+    )
+    stats = await table.optimize()
+    assert stats.compaction.files_removed == 2
+    assert stats.compaction.files_added == 1
+    assert stats.compaction.fragments_added == 1
+    assert stats.compaction.fragments_removed == 2
+    assert stats.prune.bytes_removed == 0
+    assert stats.prune.old_versions_removed == 0
+
+    stats = await table.optimize(cleanup_older_than=timedelta(seconds=0))
+    assert stats.prune.bytes_removed > 0
+    assert stats.prune.old_versions_removed == 3
+
+    assert await table.query().to_arrow() == pa.table({"x": [[1], [2]]})
diff --git a/python/src/table.rs b/python/src/table.rs
index d959100b..7b1fa632 100644
--- a/python/src/table.rs
+++ b/python/src/table.rs
@@ -2,7 +2,9 @@ use arrow::{
     ffi_stream::ArrowArrayStreamReader,
     pyarrow::{FromPyArrow, ToPyArrow},
 };
-use lancedb::table::{AddDataMode, Table as LanceDbTable};
+use lancedb::table::{
+    AddDataMode, Duration, OptimizeAction, OptimizeOptions, Table as LanceDbTable,
+};
 use pyo3::{
     exceptions::{PyRuntimeError, PyValueError},
     pyclass, pymethods,
@@ -17,6 +19,40 @@ use crate::{
     query::Query,
 };
 
+/// Statistics about a compaction operation.
+#[pyclass(get_all)]
+#[derive(Clone, Debug)]
+pub struct CompactionStats {
+    /// The number of fragments removed
+    pub fragments_removed: u64,
+    /// The number of new, compacted fragments added
+    pub fragments_added: u64,
+    /// The number of data files removed
+    pub files_removed: u64,
+    /// The number of new, compacted data files added
+    pub files_added: u64,
+}
+
+/// Statistics about a cleanup operation
+#[pyclass(get_all)]
+#[derive(Clone, Debug)]
+pub struct RemovalStats {
+    /// The number of bytes removed
+    pub bytes_removed: u64,
+    /// The number of old versions removed
+    pub old_versions_removed: u64,
+}
+
+/// Statistics about an optimize operation
+#[pyclass(get_all)]
+#[derive(Clone, Debug)]
+pub struct OptimizeStats {
+    /// Statistics about the compaction operation
+    pub compaction: CompactionStats,
+    /// Statistics about the removal operation
+    pub prune: RemovalStats,
+}
+
 #[pyclass]
 pub struct Table {
     // We keep a copy of the name to use if the inner table is dropped
@@ -191,4 +227,47 @@ impl Table {
     pub fn query(&self) -> Query {
         Query::new(self.inner_ref().unwrap().query())
     }
+
+    pub fn optimize(self_: PyRef<'_, Self>, cleanup_since_ms: Option<u64>) -> PyResult<&PyAny> {
+        let inner = self_.inner_ref()?.clone();
+        future_into_py(self_.py(), async move {
+            let compaction_stats = inner
+                .optimize(OptimizeAction::Compact {
+                    options: lancedb::table::CompactionOptions::default(),
+                    remap_options: None,
+                })
+                .await
+                .infer_error()?
+                .compaction
+                .unwrap();
+            let older_than = cleanup_since_ms.map(|since| Duration::milliseconds(since as i64));
+            let prune_stats = inner
+                .optimize(OptimizeAction::Prune {
+                    older_than,
+                    delete_unverified: None,
+                })
+                .await
+                .infer_error()?
+                .prune
+                .unwrap();
+            inner
+                .optimize(lancedb::table::OptimizeAction::Index(
+                    OptimizeOptions::default(),
+                ))
+                .await
+                .infer_error()?;
+            Ok(OptimizeStats {
+                compaction: CompactionStats {
+                    files_added: compaction_stats.files_added as u64,
+                    files_removed: compaction_stats.files_removed as u64,
+                    fragments_added: compaction_stats.fragments_added as u64,
+                    fragments_removed: compaction_stats.fragments_removed as u64,
+                },
+                prune: RemovalStats {
+                    bytes_removed: prune_stats.bytes_removed,
+                    old_versions_removed: prune_stats.old_versions,
+                },
+            })
+        })
+    }
 }
diff --git a/rust/ffi/node/src/table.rs b/rust/ffi/node/src/table.rs
index cf4c4322..1555526b 100644
--- a/rust/ffi/node/src/table.rs
+++ b/rust/ffi/node/src/table.rs
@@ -324,7 +324,7 @@ impl JsTable {
         rt.spawn(async move {
             let stats = table
                 .optimize(OptimizeAction::Prune {
-                    older_than,
+                    older_than: Some(older_than),
                     delete_unverified,
                 })
                 .await;
diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs
index 3e45aeff..c7b56197 100644
--- a/rust/lancedb/src/table.rs
+++ b/rust/lancedb/src/table.rs
@@ -23,12 +23,9 @@ use arrow::datatypes::Float32Type;
 use arrow_array::{RecordBatchIterator, RecordBatchReader};
 use arrow_schema::{DataType, Field, Schema, SchemaRef};
 use async_trait::async_trait;
-use chrono::Duration;
 use lance::dataset::builder::DatasetBuilder;
 use lance::dataset::cleanup::RemovalStats;
-use lance::dataset::optimize::{
-    compact_files, CompactionMetrics, CompactionOptions, IndexRemapperOptions,
-};
+use lance::dataset::optimize::{compact_files, CompactionMetrics, IndexRemapperOptions};
 use lance::dataset::scanner::{DatasetRecordBatchStream, Scanner};
 pub use lance::dataset::ColumnAlteration;
 pub use lance::dataset::NewColumnTransform;
@@ -41,8 +38,8 @@ use lance::io::WrappingObjectStore;
 use lance_index::vector::hnsw::builder::HnswBuildParams;
 use lance_index::vector::ivf::IvfBuildParams;
 use lance_index::vector::sq::builder::SQBuildParams;
+use lance_index::DatasetIndexExt;
 use lance_index::IndexType;
-use lance_index::{optimize::OptimizeOptions, DatasetIndexExt};
 use log::info;
 use serde::{Deserialize, Serialize};
 use snafu::whatever;
@@ -70,6 +67,10 @@ use self::merge::MergeInsertBuilder;
 pub(crate) mod dataset;
 pub mod merge;
 
+pub use chrono::Duration;
+pub use lance::dataset::optimize::CompactionOptions;
+pub use lance_index::optimize::OptimizeOptions;
+
 /// Defines the type of column
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub enum ColumnKind {
@@ -150,22 +151,58 @@ impl TableDefinition {
 ///
 /// By default, it optimizes everything, as [`OptimizeAction::All`].
 pub enum OptimizeAction {
-    /// Run optimization on every, with default options.
+    /// Run all optimizations with default values
     All,
-    /// Compact files in the dataset
+    /// Compacts files in the dataset
+    ///
+    /// LanceDb never modifies existing data files (for performance and safe concurrency). Every
+    /// time new data is added it will be added into new files. Small files
+    /// can hurt both read and write performance. Compaction will merge small files
+    /// into larger ones.
+    ///
+    /// All operations that modify data (add, delete, update, merge insert, etc.) will create
+    /// new files. If these operations are run frequently then compaction should run frequently.
+    ///
+    /// If these operations are never run (search only) then compaction is not necessary.
     Compact {
         options: CompactionOptions,
         remap_options: Option<Arc<dyn IndexRemapperOptions>>,
    },
-    /// Prune old version of datasets.
+    /// Prune old versions of the dataset
+    ///
+    /// Every change in LanceDb is additive. When data is removed from a dataset a new version is
+    /// created that doesn't contain the removed data. However, the old version, which does contain
+    /// the removed data, is left in place. This is necessary for consistency and concurrency and
+    /// also enables time travel functionality like the ability to check out an older version of
+    /// the dataset to undo changes.
+    ///
+    /// Over time, these old versions can consume a lot of disk space. The prune operation will
+    /// remove versions of the dataset that are older than a certain age. This will free up the
+    /// space used by that old data.
+    ///
+    /// Once a version is pruned it can no longer be checked out.
     Prune {
         /// The duration of time to keep versions of the dataset.
-        older_than: Duration,
+        older_than: Option<Duration>,
         /// Because they may be part of an in-progress transaction, files newer than 7 days old are not deleted by default.
         /// If you are sure that there are no in-progress transactions, then you can set this to True to delete all files older than `older_than`.
         delete_unverified: Option<bool>,
     },
-    /// Optimize index.
+    /// Optimize the indices
+    ///
+    /// This operation optimizes all indices in the table. When new data is added to LanceDb
+    /// it is not added to the indices. However, it can still turn up in searches because the search
+    /// function will scan both the indexed data and the unindexed data in parallel. Over time, the
+    /// unindexed data can become large enough that search performance suffers. This operation
+    /// will add the unindexed data to the indices without rerunning the full index creation process.
+    ///
+    /// Optimizing an index is faster than re-training the index but it does not typically adjust the
+    /// underlying model relied upon by the index. This can eventually lead to poor search accuracy
+    /// and so users may still want to occasionally retrain the index after adding a large amount of
+    /// data.
+    ///
+    /// For example, when using IVF, an index will create clusters. Optimizing an index assigns unindexed
+    /// data to the existing clusters, but it does not move the clusters or create new clusters.
     Index(OptimizeOptions),
 }
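
As the Prune documentation above notes, a pruned version can no longer be checked out, so aggressive cleanup trades time travel for disk space. A small sketch of that trade-off through the Node bindings added in this PR (the package name and the `checkout` versioning API from the existing test suite are assumptions of the example):

    // Sketch: pruning old versions frees space but disables time travel to them.
    import { connect } from "@lancedb/lancedb"; // assumed package name

    async function pruneDemo() {
      const db = await connect("/tmp/lancedb-prune-demo"); // assumed path
      const tbl = await db.createTable("vectors", [{ id: 1 }]);
      await tbl.add([{ id: 2 }]); // a newer version now exists

      // Remove every version except the current one.
      await tbl.optimize({ cleanupOlderThan: new Date() });

      // Checking out a pruned version is expected to fail.
      try {
        await tbl.checkout(1); // assumed versioning API
      } catch (e) {
        console.log("old version no longer available:", e);
      }
    }

    pruneDemo().catch(console.error);
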
@@ -757,10 +794,30 @@ impl Table {
     /// Optimize the on-disk data and indices for better performance.
     ///
+    /// Modeled after ``VACUUM`` in PostgreSQL.
+    ///
+    /// Optimization is discussed in more detail in the [OptimizeAction] documentation
+    /// and covers three operations:
+    ///
+    /// * Compaction: Merges small files into larger ones
+    /// * Prune: Removes old versions of the dataset
+    /// * Index: Optimizes the indices, adding new data to existing indices
+    ///
     /// Experimental API
     ///
-    /// Modeled after ``VACUUM`` in PostgreSQL.
-    /// Not all implementations support explicit optimization.
+    /// The optimization process is undergoing active development and may change.
+    /// Our goal with these changes is to improve the performance of optimization and
+    /// reduce the complexity.
+    ///
+    /// That being said, it is essential today to run optimize if you want the best
+    /// performance. It should be stable and safe to use in production, but it is our
+    /// hope that the API may be simplified (or may not even need to be called) in the future.
+    ///
+    /// How frequently an application should call optimize depends on how frequently
+    /// data is modified. If data is frequently added, deleted, or updated then
+    /// optimize should be run frequently. A good rule of thumb is to run optimize if
+    /// you have added or modified 100,000 or more records or run more than 20 data
+    /// modification operations.
     pub async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats> {
         self.inner.optimize(action).await
     }
@@ -1654,7 +1711,7 @@ impl TableInternal for NativeTable {
             .compaction;
         stats.prune = self
             .optimize(OptimizeAction::Prune {
-                older_than: Duration::try_days(7).unwrap(),
+                older_than: None,
                 delete_unverified: None,
             })
             .await?
@@ -1673,8 +1730,11 @@ impl TableInternal for NativeTable {
                 delete_unverified,
             } => {
                 stats.prune = Some(
-                    self.cleanup_old_versions(older_than, delete_unverified)
-                        .await?,
+                    self.cleanup_old_versions(
+                        older_than.unwrap_or(Duration::days(7)),
+                        delete_unverified,
+                    )
+                    .await?,
                 );
             }
             OptimizeAction::Index(options) => {