From db7bdefe7713a63a69e0ec4d55ad7aa5933c0782 Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Wed, 11 Oct 2023 12:49:12 -0700
Subject: [PATCH] feat: cleanup and compaction (#518)

#488
---
 Cargo.toml                           |   1 +
 node/src/index.ts                    | 107 ++++++++++++++++++++++++-
 node/src/test/test.ts                |  44 ++++++++++-
 python/lancedb/table.py              |  56 ++++++++++++-
 python/tests/test_table.py           |  29 +++++++
 rust/ffi/node/Cargo.toml             |   1 +
 rust/ffi/node/src/index/vector.rs    |   8 +-
 rust/ffi/node/src/lib.rs             |   2 +
 rust/ffi/node/src/table.rs           | 113 +++++++++++++++++++++++++++
 rust/vectordb/Cargo.toml             |   1 +
 rust/vectordb/src/io/object_store.rs |   1 -
 rust/vectordb/src/table.rs           |  38 +++++++++
 12 files changed, 394 insertions(+), 7 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 3c325441..7f5e198a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,6 +17,7 @@ arrow-ord = "43.0"
 arrow-schema = "43.0"
 arrow-arith = "43.0"
 arrow-cast = "43.0"
+chrono = "0.4.23"
 half = { "version" = "=2.2.1", default-features = false, features = [
     "num-traits"
 ] }

diff --git a/node/src/index.ts b/node/src/index.ts
index 30dff810..35085de0 100644
--- a/node/src/index.ts
+++ b/node/src/index.ts
@@ -23,7 +23,7 @@ import { Query } from './query'
 import { isEmbeddingFunction } from './embedding/embedding_function'
 
 // eslint-disable-next-line @typescript-eslint/no-var-requires
-const { databaseNew, databaseTableNames, databaseOpenTable, databaseDropTable, tableCreate, tableAdd, tableCreateVectorIndex, tableCountRows, tableDelete } = require('../native.js')
+const { databaseNew, databaseTableNames, databaseOpenTable, databaseDropTable, tableCreate, tableAdd, tableCreateVectorIndex, tableCountRows, tableDelete, tableCleanupOldVersions, tableCompactFiles } = require('../native.js')
 
 export { Query }
 export type { EmbeddingFunction }
@@ -459,6 +459,111 @@ export class LocalTable implements Table {
   async delete (filter: string): Promise<void> {
     return tableDelete.call(this._tbl, filter).then((newTable: any) => { this._tbl = newTable })
   }
+
+  /**
+   * Clean up old versions of the table, freeing disk space.
+   *
+   * @param olderThan The minimum age in minutes of the versions to delete. If not
+   *                  provided, defaults to two weeks.
+   * @param deleteUnverified Because they may be part of an in-progress
+   *                         transaction, uncommitted files less than 7 days old are
+   *                         not deleted by default. This means that failed transactions
+   *                         can leave behind data that takes up disk space for up to
+   *                         7 days. You can override this safety mechanism by setting
+   *                         this option to `true`, but only if you can promise there are no
+   *                         in-progress writes while you run this operation. Failure to
+   *                         uphold this promise can lead to corrupted tables.
+   * @returns Metrics about the cleanup operation, including how many bytes were freed.
+   */
+  async cleanupOldVersions (olderThan?: number, deleteUnverified?: boolean): Promise<CleanupStats> {
+    return tableCleanupOldVersions.call(this._tbl, olderThan, deleteUnverified)
+      .then((res: { newTable: any, metrics: CleanupStats }) => {
+        this._tbl = res.newTable
+        return res.metrics
+      })
+  }
+
+  /**
+   * Run the compaction process on the table.
+   *
+   * This can be run after making several small appends to optimize the table
+   * for faster reads.
+   *
+   * @param options Advanced options configuring compaction. In most cases, you
+   *                can omit this argument, as the default options are sensible
+   *                for most tables.
+   * @returns Metrics about the compaction operation.
+   */
+  async compactFiles (options?: CompactionOptions): Promise<CompactionMetrics> {
+    const optionsArg = options ?? {}
+    return tableCompactFiles.call(this._tbl, optionsArg)
+      .then((res: { newTable: any, metrics: CompactionMetrics }) => {
+        this._tbl = res.newTable
+        return res.metrics
+      })
+  }
 }
+
+export interface CleanupStats {
+  /**
+   * The number of bytes removed from disk.
+   */
+  bytesRemoved: number
+  /**
+   * The number of old table versions removed.
+   */
+  oldVersions: number
+}
+
+export interface CompactionOptions {
+  /**
+   * The number of rows per fragment to target. Fragments that have fewer rows
+   * will be compacted into adjacent fragments to produce larger fragments.
+   * Defaults to 1024 * 1024.
+   */
+  targetRowsPerFragment?: number
+  /**
+   * The maximum number of rows per group. Defaults to 1024.
+   */
+  maxRowsPerGroup?: number
+  /**
+   * If true, fragments that have rows that are deleted may be compacted to
+   * remove the deleted rows. This can improve the performance of queries.
+   * Default is true.
+   */
+  materializeDeletions?: boolean
+  /**
+   * A number between 0 and 1, representing the proportion of rows that must be
+   * marked deleted before a fragment is a candidate for compaction to remove
+   * the deleted rows. Default is 0.1 (10%).
+   */
+  materializeDeletionsThreshold?: number
+  /**
+   * The number of threads to use for compaction. If not provided, defaults to
+   * the number of cores on the machine.
+   */
+  numThreads?: number
+}
+
+export interface CompactionMetrics {
+  /**
+   * The number of fragments that were removed.
+   */
+  fragmentsRemoved: number
+  /**
+   * The number of new fragments that were created.
+   */
+  fragmentsAdded: number
+  /**
+   * The number of files that were removed. Each fragment may have more than one
+   * file.
+   */
+  filesRemoved: number
+  /**
+   * The number of files added. This is typically equal to the number of
+   * fragments added.
+   */
+  filesAdded: number
+}
 
 /// Config to build IVF_PQ index.

diff --git a/node/src/test/test.ts b/node/src/test/test.ts
index c0d43543..28830589 100644
--- a/node/src/test/test.ts
+++ b/node/src/test/test.ts
@@ -18,7 +18,7 @@ import * as chai from 'chai'
 import * as chaiAsPromised from 'chai-as-promised'
 
 import * as lancedb from '../index'
-import { type AwsCredentials, type EmbeddingFunction, MetricType, Query, WriteMode, DefaultWriteOptions, isWriteOptions } from '../index'
+import { type AwsCredentials, type EmbeddingFunction, MetricType, Query, WriteMode, DefaultWriteOptions, isWriteOptions, type LocalTable } from '../index'
 import { FixedSizeList, Field, Int32, makeVector, Schema, Utf8, Table as ArrowTable, vectorFromArray, Float32 } from 'apache-arrow'
 
 const expect = chai.expect
@@ -446,3 +446,45 @@ describe('WriteOptions', function () {
     })
   })
 })
+
+describe('Compact and cleanup', function () {
+  it('can cleanup after compaction', async function () {
+    const dir = await track().mkdir('lancejs')
+    const con = await lancedb.connect(dir)
+
+    const data = [
+      { price: 10, name: 'foo', vector: [1, 2, 3] },
+      { price: 50, name: 'bar', vector: [4, 5, 6] }
+    ]
+    const table = await con.createTable('t1', data) as LocalTable
+
+    const newData = [
+      { price: 30, name: 'baz', vector: [7, 8, 9] }
+    ]
+    await table.add(newData)
+
+    const compactionMetrics = await table.compactFiles({
+      numThreads: 2
+    })
+    assert.equal(compactionMetrics.fragmentsRemoved, 2)
+    assert.equal(compactionMetrics.fragmentsAdded, 1)
+    assert.equal(await table.countRows(), 3)
+
+    await table.cleanupOldVersions()
+    assert.equal(await table.countRows(), 3)
+
+    // should have no effect, but this validates that the arguments are parsed.
+    await table.compactFiles({
+      targetRowsPerFragment: 1024 * 10,
+      maxRowsPerGroup: 1024,
+      materializeDeletions: true,
+      materializeDeletionsThreshold: 0.5,
+      numThreads: 2
+    })
+
+    const cleanupMetrics = await table.cleanupOldVersions(0, true)
+    assert.isAtLeast(cleanupMetrics.bytesRemoved, 1)
+    assert.isAtLeast(cleanupMetrics.oldVersions, 1)
+    assert.equal(await table.countRows(), 3)
+  })
+})

diff --git a/python/lancedb/table.py b/python/lancedb/table.py
index 03da59ae..a20cdb28 100644
--- a/python/lancedb/table.py
+++ b/python/lancedb/table.py
@@ -16,6 +16,7 @@ from __future__ import annotations
 import inspect
 import os
 from abc import ABC, abstractmethod
+from datetime import timedelta
 from functools import cached_property
 from typing import Any, Iterable, List, Optional, Union
 
@@ -24,7 +25,7 @@ import numpy as np
 import pyarrow as pa
 import pyarrow.compute as pc
 from lance import LanceDataset
-from lance.dataset import ReaderLike
+from lance.dataset import CleanupStats, ReaderLike
 from lance.vector import vec_to_table
 
 from .common import DATA, VEC, VECTOR_COLUMN_NAME
@@ -395,6 +396,17 @@ class LanceTable(Table):
             raise ValueError(f"Invalid version {version}")
         self._reset_dataset(version=version)
 
+        try:
+            # Accessing the property updates the cached value
+            _ = self._dataset
+        except Exception as e:
+            if "not found" in str(e):
+                raise ValueError(
+                    f"Version {version} no longer exists. Was it cleaned up?"
+                )
+            else:
+                raise e
+
     def restore(self, version: int = None):
         """Restore a version of the table. This is an in-place operation.
@@ -878,6 +890,48 @@ class LanceTable(Table):
             },
         )
 
+    def cleanup_old_versions(
+        self,
+        older_than: Optional[timedelta] = None,
+        *,
+        delete_unverified: bool = False,
+    ) -> CleanupStats:
+        """
+        Clean up old versions of the table, freeing disk space.
+
+        Parameters
+        ----------
+        older_than: timedelta, default None
+            The minimum age of the versions to delete. If None, then this defaults
+            to two weeks.
+        delete_unverified: bool, default False
+            Because they may be part of an in-progress transaction, files less
+            than 7 days old are not deleted by default. If you are sure that
+            there are no in-progress transactions, then you can set this to True
+            to delete all files older than `older_than`.
+
+        Returns
+        -------
+        CleanupStats
+            The stats of the cleanup operation, including how many bytes were
+            freed.
+        """
+        return self.to_lance().cleanup_old_versions(
+            older_than, delete_unverified=delete_unverified
+        )
+
+    def compact_files(self, *args, **kwargs):
+        """
+        Run the compaction process on the table.
+
+        This can be run after making several small appends to optimize the table
+        for faster reads.
+
+        Arguments are passed onto :meth:`lance.dataset.DatasetOptimizer.compact_files`.
+        For most cases, the defaults should be fine.
+        """
+        return self.to_lance().optimize.compact_files(*args, **kwargs)
+
 
 def _sanitize_schema(
     data: pa.Table,

diff --git a/python/tests/test_table.py b/python/tests/test_table.py
index 0c001bda..0024bf25 100644
--- a/python/tests/test_table.py
+++ b/python/tests/test_table.py
@@ -12,6 +12,7 @@
 # limitations under the License.
 
 import functools
+from datetime import timedelta
 from pathlib import Path
 from typing import List
 from unittest.mock import PropertyMock, patch
@@ -442,3 +443,31 @@ def test_empty_query(db):
     df = table.search().select(["id"]).where("text='bar'").limit(1).to_pandas()
     val = df.id.iloc[0]
     assert val == 1
+
+
+def test_compact_cleanup(db):
+    table = LanceTable.create(
+        db,
+        "my_table",
+        data=[{"text": "foo", "id": 0}, {"text": "bar", "id": 1}],
+    )
+
+    table.add([{"text": "baz", "id": 2}])
+    assert len(table) == 3
+    assert table.version == 3
+
+    stats = table.compact_files()
+    assert len(table) == 3
+    assert table.version == 4
+    assert stats.fragments_removed > 0
+    assert stats.fragments_added == 1
+
+    stats = table.cleanup_old_versions()
+    assert stats.bytes_removed == 0
+
+    stats = table.cleanup_old_versions(older_than=timedelta(0), delete_unverified=True)
+    assert stats.bytes_removed > 0
+    assert table.version == 4
+
+    with pytest.raises(Exception, match="Version 3 no longer exists"):
+        table.checkout(3)

diff --git a/rust/ffi/node/Cargo.toml b/rust/ffi/node/Cargo.toml
index d7b1338d..11302291 100644
--- a/rust/ffi/node/Cargo.toml
+++ b/rust/ffi/node/Cargo.toml
@@ -13,6 +13,7 @@ crate-type = ["cdylib"]
 arrow-array = { workspace = true }
 arrow-ipc = { workspace = true }
 arrow-schema = { workspace = true }
+chrono = { workspace = true }
 conv = "0.3.3"
 once_cell = "1"
 futures = "0.3"

diff --git a/rust/ffi/node/src/index/vector.rs b/rust/ffi/node/src/index/vector.rs
index a41fba98..9412d097 100644
--- a/rust/ffi/node/src/index/vector.rs
+++ b/rust/ffi/node/src/index/vector.rs
@@ -78,9 +78,11 @@ fn get_index_params_builder(
 
     num_partitions.map(|np| {
         let max_iters = max_iters.unwrap_or(50);
-        let mut ivf_params = IvfBuildParams::default();
-        ivf_params.num_partitions = np;
-        ivf_params.max_iters = max_iters;
+        let ivf_params = IvfBuildParams {
+            num_partitions: np,
+            max_iters,
+            ..Default::default()
+        };
         index_builder.ivf_params(ivf_params)
     });

diff --git a/rust/ffi/node/src/lib.rs b/rust/ffi/node/src/lib.rs
index b0139b34..9ef42d13 100644
--- a/rust/ffi/node/src/lib.rs
+++ b/rust/ffi/node/src/lib.rs
@@ -237,6 +237,8 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> {
     cx.export_function("tableAdd", JsTable::js_add)?;
     cx.export_function("tableCountRows", JsTable::js_count_rows)?;
     cx.export_function("tableDelete", JsTable::js_delete)?;
+    cx.export_function("tableCleanupOldVersions", JsTable::js_cleanup)?;
+    cx.export_function("tableCompactFiles", JsTable::js_compact)?;
     cx.export_function(
         "tableCreateVectorIndex",
         index::vector::table_create_vector_index,

diff --git a/rust/ffi/node/src/table.rs b/rust/ffi/node/src/table.rs
index 0363dc2a..24483927 100644
--- a/rust/ffi/node/src/table.rs
+++ b/rust/ffi/node/src/table.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use arrow_array::RecordBatchIterator;
+use lance::dataset::optimize::CompactionOptions;
 use lance::dataset::{WriteMode, WriteParams};
 use lance::io::object_store::ObjectStoreParams;
@@ -163,4 +164,116 @@ impl JsTable {
         });
         Ok(promise)
     }
+
+    pub(crate) fn js_cleanup(mut cx: FunctionContext) -> JsResult<JsPromise> {
+        let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;
+        let rt = runtime(&mut cx)?;
+        let (deferred, promise) = cx.promise();
+        let table = js_table.table.clone();
+        let channel = cx.channel();
+
+        let older_than: i64 = cx
+            .argument_opt(0)
+            .and_then(|val| val.downcast::<JsNumber, _>(&mut cx).ok())
+            .map(|val| val.value(&mut cx) as i64)
+            .unwrap_or_else(|| 2 * 7 * 24 * 60); // 2 weeks, in minutes
+        let older_than = chrono::Duration::minutes(older_than);
+        let delete_unverified: bool = cx
+            .argument_opt(1)
+            .and_then(|val| val.downcast::<JsBoolean, _>(&mut cx).ok())
+            .map(|val| val.value(&mut cx))
+            .unwrap_or_default();
+
+        rt.spawn(async move {
+            let stats = table
+                .cleanup_old_versions(older_than, Some(delete_unverified))
+                .await;
+
+            deferred.settle_with(&channel, move |mut cx| {
+                let stats = stats.or_throw(&mut cx)?;
+
+                let output_metrics = JsObject::new(&mut cx);
+                let bytes_removed = cx.number(stats.bytes_removed as f64);
+                output_metrics.set(&mut cx, "bytesRemoved", bytes_removed)?;
+
+                let old_versions = cx.number(stats.old_versions as f64);
+                output_metrics.set(&mut cx, "oldVersions", old_versions)?;
+
+                let output_table = cx.boxed(JsTable::from(table));
+
+                let output = JsObject::new(&mut cx);
+                output.set(&mut cx, "metrics", output_metrics)?;
+                output.set(&mut cx, "newTable", output_table)?;
+
+                Ok(output)
+            })
+        });
+        Ok(promise)
+    }
+
+    pub(crate) fn js_compact(mut cx: FunctionContext) -> JsResult<JsPromise> {
+        let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;
+        let rt = runtime(&mut cx)?;
+        let (deferred, promise) = cx.promise();
+        let mut table = js_table.table.clone();
+        let channel = cx.channel();
+
+        let js_options = cx.argument::<JsObject>(0)?;
+        let mut options = CompactionOptions::default();
+
+        if let Some(target_rows) =
+            js_options.get_opt::<JsNumber, _, _>(&mut cx, "targetRowsPerFragment")?
+        {
+            options.target_rows_per_fragment = target_rows.value(&mut cx) as usize;
+        }
+        if let Some(max_per_group) =
+            js_options.get_opt::<JsNumber, _, _>(&mut cx, "maxRowsPerGroup")?
+        {
+            options.max_rows_per_group = max_per_group.value(&mut cx) as usize;
+        }
+        if let Some(materialize_deletions) =
+            js_options.get_opt::<JsBoolean, _, _>(&mut cx, "materializeDeletions")?
+        {
+            options.materialize_deletions = materialize_deletions.value(&mut cx);
+        }
+        if let Some(materialize_deletions_threshold) =
+            js_options.get_opt::<JsNumber, _, _>(&mut cx, "materializeDeletionsThreshold")?
+        {
+            options.materialize_deletions_threshold =
+                materialize_deletions_threshold.value(&mut cx) as f32;
+        }
+        if let Some(num_threads) = js_options.get_opt::<JsNumber, _, _>(&mut cx, "numThreads")?
+        {
+            options.num_threads = num_threads.value(&mut cx) as usize;
+        }
+
+        rt.spawn(async move {
+            let stats = table.compact_files(options).await;
+
+            deferred.settle_with(&channel, move |mut cx| {
+                let stats = stats.or_throw(&mut cx)?;
+
+                let output_metrics = JsObject::new(&mut cx);
+                let fragments_removed = cx.number(stats.fragments_removed as f64);
+                output_metrics.set(&mut cx, "fragmentsRemoved", fragments_removed)?;
+
+                let fragments_added = cx.number(stats.fragments_added as f64);
+                output_metrics.set(&mut cx, "fragmentsAdded", fragments_added)?;
+
+                let files_removed = cx.number(stats.files_removed as f64);
+                output_metrics.set(&mut cx, "filesRemoved", files_removed)?;
+
+                let files_added = cx.number(stats.files_added as f64);
+                output_metrics.set(&mut cx, "filesAdded", files_added)?;
+
+                let output_table = cx.boxed(JsTable::from(table));
+
+                let output = JsObject::new(&mut cx);
+                output.set(&mut cx, "metrics", output_metrics)?;
+                output.set(&mut cx, "newTable", output_table)?;
+
+                Ok(output)
+            })
+        });
+        Ok(promise)
+    }
 }

diff --git a/rust/vectordb/Cargo.toml b/rust/vectordb/Cargo.toml
index 1415dc7a..b769d6f6 100644
--- a/rust/vectordb/Cargo.toml
+++ b/rust/vectordb/Cargo.toml
@@ -16,6 +16,7 @@ arrow-data = { workspace = true }
 arrow-schema = { workspace = true }
 arrow-ord = { workspace = true }
 arrow-cast = { workspace = true }
+chrono = { workspace = true }
 object_store = { workspace = true }
 snafu = { workspace = true }
 half = { workspace = true }

diff --git a/rust/vectordb/src/io/object_store.rs b/rust/vectordb/src/io/object_store.rs
index a56b9243..77234b14 100644
--- a/rust/vectordb/src/io/object_store.rs
+++ b/rust/vectordb/src/io/object_store.rs
@@ -14,7 +14,6 @@
 
 //! A mirroring object store that mirror writes to a secondary object store
 
-
 use std::{
     fmt::Formatter,
     pin::Pin,

diff --git a/rust/vectordb/src/table.rs b/rust/vectordb/src/table.rs
index 77341630..71bb9434 100644
--- a/rust/vectordb/src/table.rs
+++ b/rust/vectordb/src/table.rs
@@ -12,10 +12,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use chrono::Duration;
 use std::sync::Arc;
 
 use arrow_array::{Float32Array, RecordBatchReader};
 use arrow_schema::SchemaRef;
+use lance::dataset::cleanup::RemovalStats;
+use lance::dataset::optimize::{compact_files, CompactionMetrics, CompactionOptions};
 use lance::dataset::{Dataset, WriteParams};
 use lance::index::IndexType;
 use lance::io::object_store::WrappingObjectStore;
@@ -305,6 +308,41 @@ impl Table {
         self.dataset = Arc::new(dataset);
         Ok(())
     }
+
+    /// Remove old versions of the dataset from disk.
+    ///
+    /// # Arguments
+    /// * `older_than` - The duration of time to keep versions of the dataset.
+    /// * `delete_unverified` - Because they may be part of an in-progress
+    ///   transaction, files less than 7 days old are not deleted by default.
+    ///   If you are sure that there are no in-progress transactions, then you
+    ///   can set this to `true` to delete all files older than `older_than`.
+    ///
+    /// This calls into [lance::dataset::Dataset::cleanup_old_versions] and
+    /// returns the result.
+    pub async fn cleanup_old_versions(
+        &self,
+        older_than: Duration,
+        delete_unverified: Option<bool>,
+    ) -> Result<RemovalStats> {
+        Ok(self
+            .dataset
+            .cleanup_old_versions(older_than, delete_unverified)
+            .await?)
+    }
+
+    /// Compact files in the dataset.
+    ///
+    /// This can be run after making several small appends to optimize the table
+    /// for faster reads.
+    ///
+    /// This calls into [lance::dataset::optimize::compact_files].
+    pub async fn compact_files(&mut self, options: CompactionOptions) -> Result<CompactionMetrics> {
+        let mut dataset = self.dataset.as_ref().clone();
+        let metrics = compact_files(&mut dataset, options).await?;
+        self.dataset = Arc::new(dataset);
+        Ok(metrics)
+    }
 }

 #[cfg(test)]
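
Usage note: the sketch below shows the new node API end to end, as a minimal example rather than a definitive recipe. It assumes the package is consumed under its npm name `vectordb` and uses an illustrative database path; the `as LocalTable` cast mirrors the test above, since `cleanupOldVersions` and `compactFiles` are defined on `LocalTable` rather than on the `Table` interface.

import * as lancedb from 'vectordb'
import { type CleanupStats, type CompactionMetrics, type LocalTable } from 'vectordb'

async function maintainTable (): Promise<void> {
  // Illustrative path; any local directory works.
  const con = await lancedb.connect('/tmp/lancedb-demo')

  // createTable returns the Table interface; cast to LocalTable to reach
  // the new maintenance methods, as the test above does.
  const table = await con.createTable('demo', [
    { id: 1, vector: [1, 2, 3] },
    { id: 2, vector: [4, 5, 6] }
  ]) as LocalTable
  await table.add([{ id: 3, vector: [7, 8, 9] }])

  // Merge the small fragments left behind by the appends.
  const compaction: CompactionMetrics = await table.compactFiles({ numThreads: 2 })
  console.log(`fragments removed: ${compaction.fragmentsRemoved}, added: ${compaction.fragmentsAdded}`)

  // Reclaim disk space from versions older than one day (olderThan is in minutes).
  // Omitting the second argument keeps the 7-day safety window for unverified
  // files; pass `true` there only when no writes are in progress.
  const cleanup: CleanupStats = await table.cleanupOldVersions(24 * 60)
  console.log(`freed ${cleanup.bytesRemoved} bytes across ${cleanup.oldVersions} versions`)
}

maintainTable().catch(console.error)

Both methods resolve with plain metrics objects and swap the updated native table handle into the wrapper (`this._tbl = res.newTable`), so subsequent reads on the same object see the compacted, cleaned table.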