feat: add the optimize function to nodejs and async python (#1257)

The optimize function is pretty crucial for getting good performance
when building a large scale dataset but it was only exposed in rust
(many sync python users are probably doing this via to_lance today)

This PR adds the optimize function to nodejs and to python.

I left the function marked experimental because I think there will
likely be changes to optimization (e.g. if we add features like
"optimize on write"). I also only exposed the `cleanup_older_than`
configuration parameter since this one is very commonly used and the
rest have sensible defaults and we don't really know why we would
recommend different values for these defaults anyways.
This commit is contained in:
Weston Pace
2024-05-20 07:09:31 -07:00
committed by GitHub
parent 5349e8b1db
commit 4f512af024
9 changed files with 407 additions and 20 deletions

View File

@@ -419,3 +419,31 @@ describe("when dealing with versioning", () => {
);
});
});
describe("when optimizing a dataset", () => {
let tmpDir: tmp.DirResult;
let table: Table;
beforeEach(async () => {
tmpDir = tmp.dirSync({ unsafeCleanup: true });
const con = await connect(tmpDir.name);
table = await con.createTable("vectors", [{ id: 1 }]);
await table.add([{ id: 2 }]);
});
afterEach(() => {
tmpDir.removeCallback();
});
it("compacts files", async () => {
const stats = await table.optimize();
expect(stats.compaction.filesAdded).toBe(1);
expect(stats.compaction.filesRemoved).toBe(2);
expect(stats.compaction.fragmentsAdded).toBe(1);
expect(stats.compaction.fragmentsRemoved).toBe(2);
});
it("cleanups old versions", async () => {
const stats = await table.optimize({ cleanupOlderThan: new Date() });
expect(stats.prune.bytesRemoved).toBeGreaterThan(0);
expect(stats.prune.oldVersionsRemoved).toBe(3);
});
});

View File

@@ -19,6 +19,7 @@ import {
AddColumnsSql,
ColumnAlteration,
IndexConfig,
OptimizeStats,
Table as _NativeTable,
} from "./native";
import { Query, VectorQuery } from "./query";
@@ -50,6 +51,23 @@ export interface UpdateOptions {
where: string;
}
export interface OptimizeOptions {
/**
* If set then all versions older than the given date
* be removed. The current version will never be removed.
* The default is 7 days
* @example
* // Delete all versions older than 1 day
* const olderThan = new Date();
* olderThan.setDate(olderThan.getDate() - 1));
* tbl.cleanupOlderVersions(olderThan);
*
* // Delete all versions except the current version
* tbl.cleanupOlderVersions(new Date());
*/
cleanupOlderThan: Date;
}
/**
* A Table is a collection of Records in a LanceDB Database.
*
@@ -352,6 +370,48 @@ export class Table {
await this.inner.restore();
}
/**
* Optimize the on-disk data and indices for better performance.
*
* Modeled after ``VACUUM`` in PostgreSQL.
*
* Optimization covers three operations:
*
* - Compaction: Merges small files into larger ones
* - Prune: Removes old versions of the dataset
* - Index: Optimizes the indices, adding new data to existing indices
*
*
* Experimental API
* ----------------
*
* The optimization process is undergoing active development and may change.
* Our goal with these changes is to improve the performance of optimization and
* reduce the complexity.
*
* That being said, it is essential today to run optimize if you want the best
* performance. It should be stable and safe to use in production, but it our
* hope that the API may be simplified (or not even need to be called) in the
* future.
*
* The frequency an application shoudl call optimize is based on the frequency of
* data modifications. If data is frequently added, deleted, or updated then
* optimize should be run frequently. A good rule of thumb is to run optimize if
* you have added or modified 100,000 or more records or run more than 20 data
* modification operations.
*/
async optimize(options?: Partial<OptimizeOptions>): Promise<OptimizeStats> {
let cleanupOlderThanMs;
if (
options?.cleanupOlderThan !== undefined &&
options?.cleanupOlderThan !== null
) {
cleanupOlderThanMs =
new Date().getTime() - options.cleanupOlderThan.getTime();
}
return await this.inner.optimize(cleanupOlderThanMs);
}
/** List all indices that have been created with {@link Table.createIndex} */
async listIndices(): Promise<IndexConfig[]> {
return await this.inner.listIndices();

View File

@@ -15,8 +15,8 @@
use arrow_ipc::writer::FileWriter;
use lancedb::ipc::ipc_file_to_batches;
use lancedb::table::{
AddDataMode, ColumnAlteration as LanceColumnAlteration, NewColumnTransform,
Table as LanceDbTable,
AddDataMode, ColumnAlteration as LanceColumnAlteration, Duration, NewColumnTransform,
OptimizeAction, OptimizeOptions, Table as LanceDbTable,
};
use napi::bindgen_prelude::*;
use napi_derive::napi;
@@ -263,6 +263,49 @@ impl Table {
self.inner_ref()?.restore().await.default_error()
}
#[napi]
pub async fn optimize(&self, older_than_ms: Option<i64>) -> napi::Result<OptimizeStats> {
let inner = self.inner_ref()?;
let compaction_stats = inner
.optimize(OptimizeAction::Compact {
options: lancedb::table::CompactionOptions::default(),
remap_options: None,
})
.await
.default_error()?
.compaction
.unwrap();
let older_than = older_than_ms.map(Duration::milliseconds);
let prune_stats = inner
.optimize(OptimizeAction::Prune {
older_than,
delete_unverified: None,
})
.await
.default_error()?
.prune
.unwrap();
inner
.optimize(lancedb::table::OptimizeAction::Index(
OptimizeOptions::default(),
))
.await
.default_error()?;
Ok(OptimizeStats {
compaction: CompactionStats {
files_added: compaction_stats.files_added as i64,
files_removed: compaction_stats.files_removed as i64,
fragments_added: compaction_stats.fragments_added as i64,
fragments_removed: compaction_stats.fragments_removed as i64,
},
prune: RemovalStats {
bytes_removed: prune_stats.bytes_removed as i64,
old_versions_removed: prune_stats.old_versions as i64,
},
})
}
#[napi]
pub async fn list_indices(&self) -> napi::Result<Vec<IndexConfig>> {
Ok(self
@@ -298,6 +341,40 @@ impl From<lancedb::index::IndexConfig> for IndexConfig {
}
}
/// Statistics about a compaction operation.
#[napi(object)]
#[derive(Clone, Debug)]
pub struct CompactionStats {
/// The number of fragments removed
pub fragments_removed: i64,
/// The number of new, compacted fragments added
pub fragments_added: i64,
/// The number of data files removed
pub files_removed: i64,
/// The number of new, compacted data files added
pub files_added: i64,
}
/// Statistics about a cleanup operation
#[napi(object)]
#[derive(Clone, Debug)]
pub struct RemovalStats {
/// The number of bytes removed
pub bytes_removed: i64,
/// The number of old versions removed
pub old_versions_removed: i64,
}
/// Statistics about an optimize operation
#[napi(object)]
#[derive(Clone, Debug)]
pub struct OptimizeStats {
/// Statistics about the compaction operation
pub compaction: CompactionStats,
/// Statistics about the removal operation
pub prune: RemovalStats,
}
/// A definition of a column alteration. The alteration changes the column at
/// `path` to have the new name `name`, to be nullable if `nullable` is true,
/// and to have the data type `data_type`. At least one of `rename` or `nullable`

View File

@@ -86,3 +86,17 @@ class VectorQuery:
def refine_factor(self, refine_factor: int): ...
def nprobes(self, nprobes: int): ...
def bypass_vector_index(self): ...
class CompactionStats:
fragments_removed: int
fragments_added: int
files_removed: int
files_added: int
class RemovalStats:
bytes_removed: int
old_versions_removed: int
class OptimizeStats:
compaction: CompactionStats
prune: RemovalStats

View File

@@ -58,7 +58,7 @@ if TYPE_CHECKING:
import PIL
from lance.dataset import CleanupStats, ReaderLike
from ._lancedb import Table as LanceDBTable
from ._lancedb import Table as LanceDBTable, OptimizeStats
from .db import LanceDBConnection
from .index import BTree, IndexConfig, IvfPq
@@ -2377,6 +2377,49 @@ class AsyncTable:
"""
await self._inner.restore()
async def optimize(
self, *, cleanup_older_than: Optional[timedelta] = None
) -> OptimizeStats:
"""
Optimize the on-disk data and indices for better performance.
Modeled after ``VACUUM`` in PostgreSQL.
Optimization covers three operations:
* Compaction: Merges small files into larger ones
* Prune: Removes old versions of the dataset
* Index: Optimizes the indices, adding new data to existing indices
Parameters
----------
cleanup_older_than: timedelta, optional default 7 days
All files belonging to versions older than this will be removed. Set
to 0 days to remove all versions except the latest. The latest version
is never removed.
Experimental API
----------------
The optimization process is undergoing active development and may change.
Our goal with these changes is to improve the performance of optimization and
reduce the complexity.
That being said, it is essential today to run optimize if you want the best
performance. It should be stable and safe to use in production, but it our
hope that the API may be simplified (or not even need to be called) in the
future.
The frequency an application shoudl call optimize is based on the frequency of
data modifications. If data is frequently added, deleted, or updated then
optimize should be run frequently. A good rule of thumb is to run optimize if
you have added or modified 100,000 or more records or run more than 20 data
modification operations.
"""
if cleanup_older_than is not None:
cleanup_older_than = round(cleanup_older_than.total_seconds() * 1000)
return await self._inner.optimize(cleanup_older_than)
async def list_indices(self) -> IndexConfig:
"""
List all indices that have been created with Self::create_index

View File

@@ -1025,3 +1025,29 @@ async def test_time_travel(db_async: AsyncConnection):
# Can't use restore if not checked out
with pytest.raises(ValueError, match="checkout before running restore"):
await table.restore()
@pytest.mark.asyncio
async def test_optimize(db_async: AsyncConnection):
table = await db_async.create_table(
"test",
data=[{"x": [1]}],
)
await table.add(
data=[
{"x": [2]},
],
)
stats = await table.optimize()
assert stats.compaction.files_removed == 2
assert stats.compaction.files_added == 1
assert stats.compaction.fragments_added == 1
assert stats.compaction.fragments_removed == 2
assert stats.prune.bytes_removed == 0
assert stats.prune.old_versions_removed == 0
stats = await table.optimize(cleanup_older_than=timedelta(seconds=0))
assert stats.prune.bytes_removed > 0
assert stats.prune.old_versions_removed == 3
assert await table.query().to_arrow() == pa.table({"x": [[1], [2]]})

View File

@@ -2,7 +2,9 @@ use arrow::{
ffi_stream::ArrowArrayStreamReader,
pyarrow::{FromPyArrow, ToPyArrow},
};
use lancedb::table::{AddDataMode, Table as LanceDbTable};
use lancedb::table::{
AddDataMode, Duration, OptimizeAction, OptimizeOptions, Table as LanceDbTable,
};
use pyo3::{
exceptions::{PyRuntimeError, PyValueError},
pyclass, pymethods,
@@ -17,6 +19,40 @@ use crate::{
query::Query,
};
/// Statistics about a compaction operation.
#[pyclass(get_all)]
#[derive(Clone, Debug)]
pub struct CompactionStats {
/// The number of fragments removed
pub fragments_removed: u64,
/// The number of new, compacted fragments added
pub fragments_added: u64,
/// The number of data files removed
pub files_removed: u64,
/// The number of new, compacted data files added
pub files_added: u64,
}
/// Statistics about a cleanup operation
#[pyclass(get_all)]
#[derive(Clone, Debug)]
pub struct RemovalStats {
/// The number of bytes removed
pub bytes_removed: u64,
/// The number of old versions removed
pub old_versions_removed: u64,
}
/// Statistics about an optimize operation
#[pyclass(get_all)]
#[derive(Clone, Debug)]
pub struct OptimizeStats {
/// Statistics about the compaction operation
pub compaction: CompactionStats,
/// Statistics about the removal operation
pub prune: RemovalStats,
}
#[pyclass]
pub struct Table {
// We keep a copy of the name to use if the inner table is dropped
@@ -191,4 +227,47 @@ impl Table {
pub fn query(&self) -> Query {
Query::new(self.inner_ref().unwrap().query())
}
pub fn optimize(self_: PyRef<'_, Self>, cleanup_since_ms: Option<u64>) -> PyResult<&PyAny> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
let compaction_stats = inner
.optimize(OptimizeAction::Compact {
options: lancedb::table::CompactionOptions::default(),
remap_options: None,
})
.await
.infer_error()?
.compaction
.unwrap();
let older_than = cleanup_since_ms.map(|since| Duration::milliseconds(since as i64));
let prune_stats = inner
.optimize(OptimizeAction::Prune {
older_than,
delete_unverified: None,
})
.await
.infer_error()?
.prune
.unwrap();
inner
.optimize(lancedb::table::OptimizeAction::Index(
OptimizeOptions::default(),
))
.await
.infer_error()?;
Ok(OptimizeStats {
compaction: CompactionStats {
files_added: compaction_stats.files_added as u64,
files_removed: compaction_stats.files_removed as u64,
fragments_added: compaction_stats.fragments_added as u64,
fragments_removed: compaction_stats.fragments_removed as u64,
},
prune: RemovalStats {
bytes_removed: prune_stats.bytes_removed,
old_versions_removed: prune_stats.old_versions,
},
})
})
}
}

View File

@@ -324,7 +324,7 @@ impl JsTable {
rt.spawn(async move {
let stats = table
.optimize(OptimizeAction::Prune {
older_than,
older_than: Some(older_than),
delete_unverified,
})
.await;

View File

@@ -23,12 +23,9 @@ use arrow::datatypes::Float32Type;
use arrow_array::{RecordBatchIterator, RecordBatchReader};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;
use chrono::Duration;
use lance::dataset::builder::DatasetBuilder;
use lance::dataset::cleanup::RemovalStats;
use lance::dataset::optimize::{
compact_files, CompactionMetrics, CompactionOptions, IndexRemapperOptions,
};
use lance::dataset::optimize::{compact_files, CompactionMetrics, IndexRemapperOptions};
use lance::dataset::scanner::{DatasetRecordBatchStream, Scanner};
pub use lance::dataset::ColumnAlteration;
pub use lance::dataset::NewColumnTransform;
@@ -41,8 +38,8 @@ use lance::io::WrappingObjectStore;
use lance_index::vector::hnsw::builder::HnswBuildParams;
use lance_index::vector::ivf::IvfBuildParams;
use lance_index::vector::sq::builder::SQBuildParams;
use lance_index::DatasetIndexExt;
use lance_index::IndexType;
use lance_index::{optimize::OptimizeOptions, DatasetIndexExt};
use log::info;
use serde::{Deserialize, Serialize};
use snafu::whatever;
@@ -70,6 +67,10 @@ use self::merge::MergeInsertBuilder;
pub(crate) mod dataset;
pub mod merge;
pub use chrono::Duration;
pub use lance::dataset::optimize::CompactionOptions;
pub use lance_index::optimize::OptimizeOptions;
/// Defines the type of column
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ColumnKind {
@@ -150,22 +151,58 @@ impl TableDefinition {
///
/// By default, it optimizes everything, as [`OptimizeAction::All`].
pub enum OptimizeAction {
/// Run optimization on every, with default options.
/// Run all optimizations with default values
All,
/// Compact files in the dataset
/// Compacts files in the dataset
///
/// LanceDb uses a readonly filesystem for performance and safe concurrency. Every time
/// new data is added it will be added into new files. Small files
/// can hurt both read and write performance. Compaction will merge small files
/// into larger ones.
///
/// All operations that modify data (add, delete, update, merge insert, etc.) will create
/// new files. If these operations are run frequently then compaction should run frequently.
///
/// If these operations are never run (search only) then compaction is not necessary.
Compact {
options: CompactionOptions,
remap_options: Option<Arc<dyn IndexRemapperOptions>>,
},
/// Prune old version of datasets.
/// Prune old version of datasets
///
/// Every change in LanceDb is additive. When data is removed from a dataset a new version is
/// created that doesn't contain the removed data. However, the old version, which does contain
/// the removed data, is left in place. This is necessary for consistency and concurrency and
/// also enables time travel functionality like the ability to checkout an older version of the
/// dataset to undo changes.
///
/// Over time, these old versions can consume a lot of disk space. The prune operation will
/// remove versions of the dataset that are older than a certain age. This will free up the
/// space used by that old data.
///
/// Once a version is pruned it can no longer be checked out.
Prune {
/// The duration of time to keep versions of the dataset.
older_than: Duration,
older_than: Option<Duration>,
/// Because they may be part of an in-progress transaction, files newer than 7 days old are not deleted by default.
/// If you are sure that there are no in-progress transactions, then you can set this to True to delete all files older than `older_than`.
delete_unverified: Option<bool>,
},
/// Optimize index.
/// Optimize the indices
///
/// This operation optimizes all indices in the table. When new data is added to LanceDb
/// it is not added to the indices. However, it can still turn up in searches because the search
/// function will scan both the indexed data and the unindexed data in parallel. Over time, the
/// unindexed data can become large enough that the search performance is slow. This operation
/// will add the unindexed data to the indices without rerunning the full index creation process.
///
/// Optimizing an index is faster than re-training the index but it does not typically adjust the
/// underlying model relied upon by the index. This can eventually lead to poor search accuracy
/// and so users may still want to occasionally retrain the index after adding a large amount of
/// data.
///
/// For example, when using IVF, an index will create clusters. Optimizing an index assigns unindexed
/// data to the existing clusters, but it does not move the clusters or create new clusters.
Index(OptimizeOptions),
}
@@ -757,10 +794,30 @@ impl Table {
/// Optimize the on-disk data and indices for better performance.
///
/// Modeled after ``VACUUM`` in PostgreSQL.
///
/// Optimization is discussed in more detail in the [OptimizeAction] documentation
/// and covers three operations:
///
/// * Compaction: Merges small files into larger ones
/// * Prune: Removes old versions of the dataset
/// * Index: Optimizes the indices, adding new data to existing indices
///
/// <section class="warning">Experimental API</section>
///
/// Modeled after ``VACUUM`` in PostgreSQL.
/// Not all implementations support explicit optimization.
/// The optimization process is undergoing active development and may change.
/// Our goal with these changes is to improve the performance of optimization and
/// reduce the complexity.
///
/// That being said, it is essential today to run optimize if you want the best
/// performance. It should be stable and safe to use in production, but it our
/// hope that the API may be simplified (or not even need to be called) in the future.
///
/// The frequency an application shoudl call optimize is based on the frequency of
/// data modifications. If data is frequently added, deleted, or updated then
/// optimize should be run frequently. A good rule of thumb is to run optimize if
/// you have added or modified 100,000 or more records or run more than 20 data
/// modification operations.
pub async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats> {
self.inner.optimize(action).await
}
@@ -1654,7 +1711,7 @@ impl TableInternal for NativeTable {
.compaction;
stats.prune = self
.optimize(OptimizeAction::Prune {
older_than: Duration::try_days(7).unwrap(),
older_than: None,
delete_unverified: None,
})
.await?
@@ -1673,8 +1730,11 @@ impl TableInternal for NativeTable {
delete_unverified,
} => {
stats.prune = Some(
self.cleanup_old_versions(older_than, delete_unverified)
.await?,
self.cleanup_old_versions(
older_than.unwrap_or(Duration::days(7)),
delete_unverified,
)
.await?,
);
}
OptimizeAction::Index(options) => {