feat: cleanup and compaction (#518)

#488
This commit is contained in:
Will Jones
2023-10-11 12:49:12 -07:00
committed by Weston Pace
parent 541b06664f
commit c07207c661
12 changed files with 394 additions and 7 deletions

View File

@@ -13,6 +13,7 @@ crate-type = ["cdylib"]
arrow-array = { workspace = true }
arrow-ipc = { workspace = true }
arrow-schema = { workspace = true }
chrono = { workspace = true }
conv = "0.3.3"
once_cell = "1"
futures = "0.3"

View File

@@ -78,9 +78,11 @@ fn get_index_params_builder(
num_partitions.map(|np| {
let max_iters = max_iters.unwrap_or(50);
let mut ivf_params = IvfBuildParams::default();
ivf_params.num_partitions = np;
ivf_params.max_iters = max_iters;
let ivf_params = IvfBuildParams {
num_partitions: np,
max_iters,
..Default::default()
};
index_builder.ivf_params(ivf_params)
});

View File

@@ -237,6 +237,8 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> {
cx.export_function("tableAdd", JsTable::js_add)?;
cx.export_function("tableCountRows", JsTable::js_count_rows)?;
cx.export_function("tableDelete", JsTable::js_delete)?;
cx.export_function("tableCleanupOldVersions", JsTable::js_cleanup)?;
cx.export_function("tableCompactFiles", JsTable::js_compact)?;
cx.export_function(
"tableCreateVectorIndex",
index::vector::table_create_vector_index,

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use arrow_array::RecordBatchIterator;
use lance::dataset::optimize::CompactionOptions;
use lance::dataset::{WriteMode, WriteParams};
use lance::io::object_store::ObjectStoreParams;
@@ -163,4 +164,116 @@ impl JsTable {
});
Ok(promise)
}
pub(crate) fn js_cleanup(mut cx: FunctionContext) -> JsResult<JsPromise> {
let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;
let rt = runtime(&mut cx)?;
let (deferred, promise) = cx.promise();
let table = js_table.table.clone();
let channel = cx.channel();
let older_than: i64 = cx
.argument_opt(0)
.and_then(|val| val.downcast::<JsNumber, _>(&mut cx).ok())
.map(|val| val.value(&mut cx) as i64)
.unwrap_or_else(|| 2 * 7 * 24 * 60); // 2 weeks
let older_than = chrono::Duration::minutes(older_than);
let delete_unverified: bool = cx
.argument_opt(1)
.and_then(|val| val.downcast::<JsBoolean, _>(&mut cx).ok())
.map(|val| val.value(&mut cx))
.unwrap_or_default();
rt.spawn(async move {
let stats = table
.cleanup_old_versions(older_than, Some(delete_unverified))
.await;
deferred.settle_with(&channel, move |mut cx| {
let stats = stats.or_throw(&mut cx)?;
let output_metrics = JsObject::new(&mut cx);
let bytes_removed = cx.number(stats.bytes_removed as f64);
output_metrics.set(&mut cx, "bytesRemoved", bytes_removed)?;
let old_versions = cx.number(stats.old_versions as f64);
output_metrics.set(&mut cx, "oldVersions", old_versions)?;
let output_table = cx.boxed(JsTable::from(table));
let output = JsObject::new(&mut cx);
output.set(&mut cx, "metrics", output_metrics)?;
output.set(&mut cx, "newTable", output_table)?;
Ok(output)
})
});
Ok(promise)
}
pub(crate) fn js_compact(mut cx: FunctionContext) -> JsResult<JsPromise> {
let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;
let rt = runtime(&mut cx)?;
let (deferred, promise) = cx.promise();
let mut table = js_table.table.clone();
let channel = cx.channel();
let js_options = cx.argument::<JsObject>(0)?;
let mut options = CompactionOptions::default();
if let Some(target_rows) =
js_options.get_opt::<JsNumber, _, _>(&mut cx, "targetRowsPerFragment")?
{
options.target_rows_per_fragment = target_rows.value(&mut cx) as usize;
}
if let Some(max_per_group) =
js_options.get_opt::<JsNumber, _, _>(&mut cx, "maxRowsPerGroup")?
{
options.max_rows_per_group = max_per_group.value(&mut cx) as usize;
}
if let Some(materialize_deletions) =
js_options.get_opt::<JsBoolean, _, _>(&mut cx, "materializeDeletions")?
{
options.materialize_deletions = materialize_deletions.value(&mut cx);
}
if let Some(materialize_deletions_threshold) =
js_options.get_opt::<JsNumber, _, _>(&mut cx, "materializeDeletionsThreshold")?
{
options.materialize_deletions_threshold =
materialize_deletions_threshold.value(&mut cx) as f32;
}
if let Some(num_threads) = js_options.get_opt::<JsNumber, _, _>(&mut cx, "numThreads")? {
options.num_threads = num_threads.value(&mut cx) as usize;
}
rt.spawn(async move {
let stats = table.compact_files(options).await;
deferred.settle_with(&channel, move |mut cx| {
let stats = stats.or_throw(&mut cx)?;
let output_metrics = JsObject::new(&mut cx);
let fragments_removed = cx.number(stats.fragments_removed as f64);
output_metrics.set(&mut cx, "fragmentsRemoved", fragments_removed)?;
let fragments_added = cx.number(stats.fragments_added as f64);
output_metrics.set(&mut cx, "fragmentsAdded", fragments_added)?;
let files_removed = cx.number(stats.files_removed as f64);
output_metrics.set(&mut cx, "filesRemoved", files_removed)?;
let files_added = cx.number(stats.files_added as f64);
output_metrics.set(&mut cx, "filesAdded", files_added)?;
let output_table = cx.boxed(JsTable::from(table));
let output = JsObject::new(&mut cx);
output.set(&mut cx, "metrics", output_metrics)?;
output.set(&mut cx, "newTable", output_table)?;
Ok(output)
})
});
Ok(promise)
}
}

View File

@@ -16,6 +16,7 @@ arrow-data = { workspace = true }
arrow-schema = { workspace = true }
arrow-ord = { workspace = true }
arrow-cast = { workspace = true }
chrono = { workspace = true }
object_store = { workspace = true }
snafu = { workspace = true }
half = { workspace = true }

View File

@@ -14,7 +14,6 @@
//! A mirroring object store that mirror writes to a secondary object store
use std::{
fmt::Formatter,
pin::Pin,

View File

@@ -12,10 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use chrono::Duration;
use std::sync::Arc;
use arrow_array::{Float32Array, RecordBatchReader};
use arrow_schema::SchemaRef;
use lance::dataset::cleanup::RemovalStats;
use lance::dataset::optimize::{compact_files, CompactionMetrics, CompactionOptions};
use lance::dataset::{Dataset, WriteParams};
use lance::index::IndexType;
use lance::io::object_store::WrappingObjectStore;
@@ -305,6 +308,41 @@ impl Table {
self.dataset = Arc::new(dataset);
Ok(())
}
/// Remove old versions of the dataset from disk.
///
/// # Arguments
/// * `older_than` - The duration of time to keep versions of the dataset.
/// * `delete_unverified` - Because they may be part of an in-progress
/// transaction, files newer than 7 days old are not deleted by default.
/// If you are sure that there are no in-progress transactions, then you
/// can set this to True to delete all files older than `older_than`.
///
/// This calls into [lance::dataset::Dataset::cleanup_old_versions] and
/// returns the result.
pub async fn cleanup_old_versions(
&self,
older_than: Duration,
delete_unverified: Option<bool>,
) -> Result<RemovalStats> {
Ok(self
.dataset
.cleanup_old_versions(older_than, delete_unverified)
.await?)
}
/// Compact files in the dataset.
///
/// This can be run after making several small appends to optimize the table
/// for faster reads.
///
/// This calls into [lance::dataset::optimize::compact_files].
pub async fn compact_files(&mut self, options: CompactionOptions) -> Result<CompactionMetrics> {
let mut dataset = self.dataset.as_ref().clone();
let metrics = compact_files(&mut dataset, options).await?;
self.dataset = Arc::new(dataset);
Ok(metrics)
}
}
#[cfg(test)]