feat: cleanup and compaction (#518)

#488
Will Jones
2023-10-11 12:49:12 -07:00
committed by GitHub
parent e41894b071
commit db7bdefe77
12 changed files with 394 additions and 7 deletions

View File

@@ -17,6 +17,7 @@ arrow-ord = "43.0"
arrow-schema = "43.0"
arrow-arith = "43.0"
arrow-cast = "43.0"
chrono = "0.4.23"
half = { "version" = "=2.2.1", default-features = false, features = [
"num-traits"
] }

View File

@@ -23,7 +23,7 @@ import { Query } from './query'
import { isEmbeddingFunction } from './embedding/embedding_function'
// eslint-disable-next-line @typescript-eslint/no-var-requires
-const { databaseNew, databaseTableNames, databaseOpenTable, databaseDropTable, tableCreate, tableAdd, tableCreateVectorIndex, tableCountRows, tableDelete } = require('../native.js')
+const { databaseNew, databaseTableNames, databaseOpenTable, databaseDropTable, tableCreate, tableAdd, tableCreateVectorIndex, tableCountRows, tableDelete, tableCleanupOldVersions, tableCompactFiles } = require('../native.js')
export { Query }
export type { EmbeddingFunction }
@@ -459,6 +459,111 @@ export class LocalTable<T = number[]> implements Table<T> {
async delete (filter: string): Promise<void> {
return tableDelete.call(this._tbl, filter).then((newTable: any) => { this._tbl = newTable })
}
/**
* Clean up old versions of the table, freeing disk space.
*
* @param olderThan The minimum age in minutes of the versions to delete. If not
* provided, defaults to two weeks.
* @param deleteUnverified Because they may be part of an in-progress
* transaction, uncommitted files newer than 7 days old are
* not deleted by default. This means that failed transactions
* can leave around data that takes up disk space for up to
* 7 days. You can override this safety mechanism by setting
* this option to `true`, but only if you are certain there are no
* in-progress writes while you run this operation. Failure to
* uphold this guarantee can lead to corrupted tables.
* @returns Metrics about the cleanup, including the number of bytes
* and old table versions removed.
*/
async cleanupOldVersions (olderThan?: number, deleteUnverified?: boolean): Promise<CleanupStats> {
return tableCleanupOldVersions.call(this._tbl, olderThan, deleteUnverified)
.then((res: { newTable: any, metrics: CleanupStats }) => {
this._tbl = res.newTable
return res.metrics
})
}
/**
* Run the compaction process on the table.
*
* This can be run after making several small appends to optimize the table
* for faster reads.
*
* @param options Advanced options configuring compaction. In most cases, you
* can omit this argument, as the default options are sensible
* for most tables.
* @returns Metrics about the compaction operation.
*/
async compactFiles (options?: CompactionOptions): Promise<CompactionMetrics> {
const optionsArg = options ?? {}
return tableCompactFiles.call(this._tbl, optionsArg)
.then((res: { newTable: any, metrics: CompactionMetrics }) => {
this._tbl = res.newTable
return res.metrics
})
}
}
export interface CleanupStats {
/**
* The number of bytes removed from disk.
*/
bytesRemoved: number
/**
* The number of old table versions removed.
*/
oldVersions: number
}
export interface CompactionOptions {
/**
* The number of rows per fragment to target. Fragments that have fewer rows
* will be compacted into adjacent fragments to produce larger fragments.
* Defaults to 1024 * 1024.
*/
targetRowsPerFragment?: number
/**
* The maximum number of rows per group. Defaults to 1024.
*/
maxRowsPerGroup?: number
/**
* If true, fragments that contain deleted rows may be compacted to
* remove the deleted rows. This can improve the performance of queries.
* Default is true.
*/
materializeDeletions?: boolean
/**
* A number between 0 and 1 representing the proportion of rows that must
* be marked deleted before a fragment is a candidate for compaction to
* remove the deleted rows. Default is 0.1 (10%).
*/
materializeDeletionsThreshold?: number
/**
* The number of threads to use for compaction. If not provided, defaults to
* the number of cores on the machine.
*/
numThreads?: number
}
export interface CompactionMetrics {
/**
* The number of fragments that were removed.
*/
fragmentsRemoved: number
/**
* The number of new fragments that were created.
*/
fragmentsAdded: number
/**
* The number of files that were removed. Each fragment may have more than one
* file.
*/
filesRemoved: number
/**
* The number of files added. This is typically equal to the number of
* fragments added.
*/
filesAdded: number
}
/// Config to build IVF_PQ index.
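
Taken together, the intended flow for the new TypeScript surface is: make small appends, compact them into larger fragments, then clean up the versions that compaction made obsolete. A minimal sketch of that flow, not part of this commit — the 'vectordb' import path, directory, and data shape are illustrative:

import * as lancedb from 'vectordb'
import { type LocalTable } from 'vectordb'

async function maintainTable (): Promise<void> {
  const con = await lancedb.connect('/tmp/lancedb-maintenance-demo')

  // Each small append commits a new version containing a small fragment.
  const table = await con.createTable('demo', [
    { id: 1, vector: [0.1, 0.2] }
  ]) as LocalTable
  await table.add([{ id: 2, vector: [0.3, 0.4] }])

  // Rewrite the small fragments into fewer, larger ones.
  const compaction = await table.compactFiles({ numThreads: 2 })
  console.log(`fragments removed: ${compaction.fragmentsRemoved}, added: ${compaction.fragmentsAdded}`)

  // Reclaim the disk space held by pre-compaction versions. Passing 0 and
  // true mirrors the test below; production code would normally keep the
  // defaults so in-progress transactions stay protected.
  const cleanup = await table.cleanupOldVersions(0, true)
  console.log(`freed ${cleanup.bytesRemoved} bytes from ${cleanup.oldVersions} old versions`)
}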

View File

@@ -18,7 +18,7 @@ import * as chai from 'chai'
import * as chaiAsPromised from 'chai-as-promised'
import * as lancedb from '../index'
-import { type AwsCredentials, type EmbeddingFunction, MetricType, Query, WriteMode, DefaultWriteOptions, isWriteOptions } from '../index'
+import { type AwsCredentials, type EmbeddingFunction, MetricType, Query, WriteMode, DefaultWriteOptions, isWriteOptions, type LocalTable } from '../index'
import { FixedSizeList, Field, Int32, makeVector, Schema, Utf8, Table as ArrowTable, vectorFromArray, Float32 } from 'apache-arrow'
const expect = chai.expect
@@ -446,3 +446,45 @@ describe('WriteOptions', function () {
})
})
})
describe('Compact and cleanup', function () {
it('can cleanup after compaction', async function () {
const dir = await track().mkdir('lancejs')
const con = await lancedb.connect(dir)
const data = [
{ price: 10, name: 'foo', vector: [1, 2, 3] },
{ price: 50, name: 'bar', vector: [4, 5, 6] }
]
const table = await con.createTable('t1', data) as LocalTable
const newData = [
{ price: 30, name: 'baz', vector: [7, 8, 9] }
]
await table.add(newData)
const compactionMetrics = await table.compactFiles({
numThreads: 2
})
assert.equal(compactionMetrics.fragmentsRemoved, 2)
assert.equal(compactionMetrics.fragmentsAdded, 1)
assert.equal(await table.countRows(), 3)
await table.cleanupOldVersions()
assert.equal(await table.countRows(), 3)
// should have no effect, but this validates the arguments are parsed.
await table.compactFiles({
targetRowsPerFragment: 1024 * 10,
maxRowsPerGroup: 1024,
materializeDeletions: true,
materializeDeletionsThreshold: 0.5,
numThreads: 2
})
const cleanupMetrics = await table.cleanupOldVersions(0, true)
assert.isAtLeast(cleanupMetrics.bytesRemoved, 1)
assert.isAtLeast(cleanupMetrics.oldVersions, 1)
assert.equal(await table.countRows(), 3)
})
})

View File

@@ -16,6 +16,7 @@ from __future__ import annotations
import inspect
import os
from abc import ABC, abstractmethod
from datetime import timedelta
from functools import cached_property
from typing import Any, Iterable, List, Optional, Union
@@ -24,7 +25,7 @@ import numpy as np
import pyarrow as pa
import pyarrow.compute as pc
from lance import LanceDataset
-from lance.dataset import ReaderLike
+from lance.dataset import CleanupStats, ReaderLike
from lance.vector import vec_to_table
from .common import DATA, VEC, VECTOR_COLUMN_NAME
@@ -395,6 +396,17 @@ class LanceTable(Table):
raise ValueError(f"Invalid version {version}")
self._reset_dataset(version=version)
try:
# Accessing the property updates the cached value
_ = self._dataset
except Exception as e:
if "not found" in str(e):
raise ValueError(
f"Version {version} no longer exists. Was it cleaned up?"
)
else:
raise e
def restore(self, version: int = None):
"""Restore a version of the table. This is an in-place operation.
@@ -878,6 +890,48 @@ class LanceTable(Table):
},
)
def cleanup_old_versions(
self,
older_than: Optional[timedelta] = None,
*,
delete_unverified: bool = False,
) -> CleanupStats:
"""
Clean up old versions of the table, freeing disk space.
Parameters
----------
older_than: timedelta, default None
The minimum age of the version to delete. If None, then this defaults
to two weeks.
delete_unverified: bool, default False
Because they may be part of an in-progress transaction, files newer
than 7 days old are not deleted by default. If you are sure that
there are no in-progress transactions, then you can set this to True
to delete all files older than `older_than`.
Returns
-------
CleanupStats
The stats of the cleanup operation, including how many bytes were
freed.
"""
return self.to_lance().cleanup_old_versions(
older_than, delete_unverified=delete_unverified
)
def compact_files(self, *args, **kwargs):
"""
Run the compaction process on the table.
This can be run after making several small appends to optimize the table
for faster reads.
Arguments are passed on to :meth:`lance.dataset.DatasetOptimizer.compact_files`.
For most cases, the defaults should be fine.
"""
return self.to_lance().optimize.compact_files(*args, **kwargs)
def _sanitize_schema(
data: pa.Table,

View File

@@ -12,6 +12,7 @@
# limitations under the License.
import functools
from datetime import timedelta
from pathlib import Path
from typing import List
from unittest.mock import PropertyMock, patch
@@ -442,3 +443,31 @@ def test_empty_query(db):
df = table.search().select(["id"]).where("text='bar'").limit(1).to_pandas()
val = df.id.iloc[0]
assert val == 1
def test_compact_cleanup(db):
table = LanceTable.create(
db,
"my_table",
data=[{"text": "foo", "id": 0}, {"text": "bar", "id": 1}],
)
table.add([{"text": "baz", "id": 2}])
assert len(table) == 3
assert table.version == 3
stats = table.compact_files()
assert len(table) == 3
assert table.version == 4
assert stats.fragments_removed > 0
assert stats.fragments_added == 1
stats = table.cleanup_old_versions()
assert stats.bytes_removed == 0
stats = table.cleanup_old_versions(older_than=timedelta(0), delete_unverified=True)
assert stats.bytes_removed > 0
assert table.version == 4
with pytest.raises(Exception, match="Version 3 no longer exists"):
table.checkout(3)

View File

@@ -13,6 +13,7 @@ crate-type = ["cdylib"]
arrow-array = { workspace = true }
arrow-ipc = { workspace = true }
arrow-schema = { workspace = true }
chrono = { workspace = true }
conv = "0.3.3"
once_cell = "1"
futures = "0.3"

View File

@@ -78,9 +78,11 @@ fn get_index_params_builder(
num_partitions.map(|np| {
let max_iters = max_iters.unwrap_or(50);
-let mut ivf_params = IvfBuildParams::default();
-ivf_params.num_partitions = np;
-ivf_params.max_iters = max_iters;
+let ivf_params = IvfBuildParams {
+    num_partitions: np,
+    max_iters,
+    ..Default::default()
+};
index_builder.ivf_params(ivf_params)
});

View File

@@ -237,6 +237,8 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> {
cx.export_function("tableAdd", JsTable::js_add)?;
cx.export_function("tableCountRows", JsTable::js_count_rows)?;
cx.export_function("tableDelete", JsTable::js_delete)?;
cx.export_function("tableCleanupOldVersions", JsTable::js_cleanup)?;
cx.export_function("tableCompactFiles", JsTable::js_compact)?;
cx.export_function(
"tableCreateVectorIndex",
index::vector::table_create_vector_index,

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use arrow_array::RecordBatchIterator;
use lance::dataset::optimize::CompactionOptions;
use lance::dataset::{WriteMode, WriteParams};
use lance::io::object_store::ObjectStoreParams;
@@ -163,4 +164,116 @@ impl JsTable {
});
Ok(promise)
}
pub(crate) fn js_cleanup(mut cx: FunctionContext) -> JsResult<JsPromise> {
let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;
let rt = runtime(&mut cx)?;
let (deferred, promise) = cx.promise();
let table = js_table.table.clone();
let channel = cx.channel();
let older_than: i64 = cx
.argument_opt(0)
.and_then(|val| val.downcast::<JsNumber, _>(&mut cx).ok())
.map(|val| val.value(&mut cx) as i64)
.unwrap_or_else(|| 2 * 7 * 24 * 60); // 2 weeks
let older_than = chrono::Duration::minutes(older_than);
let delete_unverified: bool = cx
.argument_opt(1)
.and_then(|val| val.downcast::<JsBoolean, _>(&mut cx).ok())
.map(|val| val.value(&mut cx))
.unwrap_or_default();
rt.spawn(async move {
let stats = table
.cleanup_old_versions(older_than, Some(delete_unverified))
.await;
deferred.settle_with(&channel, move |mut cx| {
let stats = stats.or_throw(&mut cx)?;
let output_metrics = JsObject::new(&mut cx);
let bytes_removed = cx.number(stats.bytes_removed as f64);
output_metrics.set(&mut cx, "bytesRemoved", bytes_removed)?;
let old_versions = cx.number(stats.old_versions as f64);
output_metrics.set(&mut cx, "oldVersions", old_versions)?;
let output_table = cx.boxed(JsTable::from(table));
let output = JsObject::new(&mut cx);
output.set(&mut cx, "metrics", output_metrics)?;
output.set(&mut cx, "newTable", output_table)?;
Ok(output)
})
});
Ok(promise)
}
pub(crate) fn js_compact(mut cx: FunctionContext) -> JsResult<JsPromise> {
let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;
let rt = runtime(&mut cx)?;
let (deferred, promise) = cx.promise();
let mut table = js_table.table.clone();
let channel = cx.channel();
let js_options = cx.argument::<JsObject>(0)?;
let mut options = CompactionOptions::default();
if let Some(target_rows) =
js_options.get_opt::<JsNumber, _, _>(&mut cx, "targetRowsPerFragment")?
{
options.target_rows_per_fragment = target_rows.value(&mut cx) as usize;
}
if let Some(max_per_group) =
js_options.get_opt::<JsNumber, _, _>(&mut cx, "maxRowsPerGroup")?
{
options.max_rows_per_group = max_per_group.value(&mut cx) as usize;
}
if let Some(materialize_deletions) =
js_options.get_opt::<JsBoolean, _, _>(&mut cx, "materializeDeletions")?
{
options.materialize_deletions = materialize_deletions.value(&mut cx);
}
if let Some(materialize_deletions_threshold) =
js_options.get_opt::<JsNumber, _, _>(&mut cx, "materializeDeletionsThreshold")?
{
options.materialize_deletions_threshold =
materialize_deletions_threshold.value(&mut cx) as f32;
}
if let Some(num_threads) = js_options.get_opt::<JsNumber, _, _>(&mut cx, "numThreads")? {
options.num_threads = num_threads.value(&mut cx) as usize;
}
rt.spawn(async move {
let stats = table.compact_files(options).await;
deferred.settle_with(&channel, move |mut cx| {
let stats = stats.or_throw(&mut cx)?;
let output_metrics = JsObject::new(&mut cx);
let fragments_removed = cx.number(stats.fragments_removed as f64);
output_metrics.set(&mut cx, "fragmentsRemoved", fragments_removed)?;
let fragments_added = cx.number(stats.fragments_added as f64);
output_metrics.set(&mut cx, "fragmentsAdded", fragments_added)?;
let files_removed = cx.number(stats.files_removed as f64);
output_metrics.set(&mut cx, "filesRemoved", files_removed)?;
let files_added = cx.number(stats.files_added as f64);
output_metrics.set(&mut cx, "filesAdded", files_added)?;
let output_table = cx.boxed(JsTable::from(table));
let output = JsObject::new(&mut cx);
output.set(&mut cx, "metrics", output_metrics)?;
output.set(&mut cx, "newTable", output_table)?;
Ok(output)
})
});
Ok(promise)
}
}
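
Note that both bindings settle with a `{ newTable, metrics }` object rather than mutating the boxed table: `js_compact` clones the Rust `Table`, compacts the clone, and re-boxes it, so the caller's old handle still refers to the pre-compaction dataset. A hypothetical sketch of the contract the TypeScript wrapper relies on — `NativeTableModule` and `compactAndSwap` are illustrative names standing in for `require('../native.js')` and the `LocalTable` methods above:

// Illustrative types only; the real handles are opaque Neon boxes
// created by the functions exported in main.rs above.
interface NativeTableModule {
  tableCompactFiles: (this: unknown, options: object) => Promise<{ newTable: unknown, metrics: object }>
}

async function compactAndSwap (
  native: NativeTableModule,
  tbl: unknown
): Promise<{ tbl: unknown, metrics: object }> {
  const res = await native.tableCompactFiles.call(tbl, {})
  // Hand back the re-boxed table: the old handle is stale from here on,
  // which is why LocalTable reassigns this._tbl after each call.
  return { tbl: res.newTable, metrics: res.metrics }
}

Returning a fresh box keeps the Neon-managed value effectively immutable from the JS side; the cost is that every caller must remember to swap handles, which `LocalTable` centralizes.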

View File

@@ -16,6 +16,7 @@ arrow-data = { workspace = true }
arrow-schema = { workspace = true }
arrow-ord = { workspace = true }
arrow-cast = { workspace = true }
chrono = { workspace = true }
object_store = { workspace = true }
snafu = { workspace = true }
half = { workspace = true }

View File

@@ -14,7 +14,6 @@
//! A mirroring object store that mirror writes to a secondary object store
use std::{
fmt::Formatter,
pin::Pin,

View File

@@ -12,10 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use chrono::Duration;
use std::sync::Arc;
use arrow_array::{Float32Array, RecordBatchReader};
use arrow_schema::SchemaRef;
use lance::dataset::cleanup::RemovalStats;
use lance::dataset::optimize::{compact_files, CompactionMetrics, CompactionOptions};
use lance::dataset::{Dataset, WriteParams};
use lance::index::IndexType;
use lance::io::object_store::WrappingObjectStore;
@@ -305,6 +308,41 @@ impl Table {
self.dataset = Arc::new(dataset);
Ok(())
}
/// Remove old versions of the dataset from disk.
///
/// # Arguments
/// * `older_than` - The duration of time to keep versions of the dataset.
/// * `delete_unverified` - Because they may be part of an in-progress
/// transaction, files newer than 7 days old are not deleted by default.
/// If you are sure that there are no in-progress transactions, then you
/// can set this to `true` to delete all files older than `older_than`.
///
/// This calls into [lance::dataset::Dataset::cleanup_old_versions] and
/// returns the result.
pub async fn cleanup_old_versions(
&self,
older_than: Duration,
delete_unverified: Option<bool>,
) -> Result<RemovalStats> {
Ok(self
.dataset
.cleanup_old_versions(older_than, delete_unverified)
.await?)
}
/// Compact files in the dataset.
///
/// This can be run after making several small appends to optimize the table
/// for faster reads.
///
/// This calls into [lance::dataset::optimize::compact_files].
pub async fn compact_files(&mut self, options: CompactionOptions) -> Result<CompactionMetrics> {
let mut dataset = self.dataset.as_ref().clone();
let metrics = compact_files(&mut dataset, options).await?;
self.dataset = Arc::new(dataset);
Ok(metrics)
}
}
#[cfg(test)]