From 3c487e5fc7e09cdd06c0c1a0a46b220f121c6d78 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 5 Dec 2024 14:44:50 -0800 Subject: [PATCH] perf: re-use table instance during write (#1909) Previously, whenever `Table.add()` was called, we would write and re-open the underlying dataset. This was bad for performance, as it reset the table cache and initiated a lot of IO. It also could be the source of bugs, since we didn't necessarily pass all the necessary connection options down when re-opening the table. Closes #1655 --- python/python/lancedb/table.py | 10 +----- rust/lancedb/src/table.rs | 62 ++++++++-------------------------- 2 files changed, 16 insertions(+), 56 deletions(-) diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py index 45388ab8..07c4f17e 100644 --- a/python/python/lancedb/table.py +++ b/python/python/lancedb/table.py @@ -1624,15 +1624,7 @@ class LanceTable(Table): on_bad_vectors=on_bad_vectors, fill_value=fill_value, ) - # Access the dataset_mut property to ensure that the dataset is mutable. - self._ref.dataset_mut - self._ref.dataset = lance.write_dataset( - data, - self._dataset_uri, - schema=self.schema, - mode=mode, - storage_options=self._ref.storage_options, - ) + self._ref.dataset_mut.insert(data, mode=mode, schema=self.schema) def merge( self, diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index 11dfbb89..6ba2f241 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -14,7 +14,6 @@ //! LanceDB Table APIs -use std::collections::HashMap; use std::path::Path; use std::sync::Arc; @@ -37,7 +36,8 @@ pub use lance::dataset::ColumnAlteration; pub use lance::dataset::NewColumnTransform; pub use lance::dataset::ReadParams; use lance::dataset::{ - Dataset, UpdateBuilder as LanceUpdateBuilder, Version, WhenMatched, WriteMode, WriteParams, + Dataset, InsertBuilder, UpdateBuilder as LanceUpdateBuilder, Version, WhenMatched, WriteMode, + WriteParams, }; use lance::dataset::{MergeInsertBuilder as LanceMergeInsertBuilder, WhenNotMatchedBySource}; use lance::io::WrappingObjectStore; @@ -1046,12 +1046,6 @@ pub struct NativeTable { name: String, uri: String, pub(crate) dataset: dataset::DatasetConsistencyWrapper, - - // the object store wrapper to use on write path - store_wrapper: Option>, - - storage_options: HashMap, - // This comes from the connection options. We store here so we can pass down // to the dataset when we recreate it (for example, in checkout_latest). read_consistency_interval: Option, @@ -1117,13 +1111,6 @@ impl NativeTable { None => params, }; - let storage_options = params - .store_options - .clone() - .unwrap_or_default() - .storage_options - .unwrap_or_default(); - let dataset = DatasetBuilder::from_uri(uri) .with_read_params(params) .load() @@ -1141,8 +1128,6 @@ impl NativeTable { name: name.to_string(), uri: uri.to_string(), dataset, - store_wrapper: write_store_wrapper, - storage_options, read_consistency_interval, }) } @@ -1191,12 +1176,6 @@ impl NativeTable { Some(wrapper) => params.patch_with_store_wrapper(wrapper)?, None => params, }; - let storage_options = params - .store_params - .clone() - .unwrap_or_default() - .storage_options - .unwrap_or_default(); let dataset = Dataset::write(batches, uri, Some(params)) .await @@ -1210,8 +1189,6 @@ impl NativeTable { name: name.to_string(), uri: uri.to_string(), dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval), - store_wrapper: write_store_wrapper, - storage_options, read_consistency_interval, }) } @@ -1758,10 +1735,13 @@ impl TableInternal for NativeTable { add: AddDataBuilder, data: Box, ) -> Result<()> { - let data = - MaybeEmbedded::try_new(data, self.table_definition().await?, add.embedding_registry)?; + let data = Box::new(MaybeEmbedded::try_new( + data, + self.table_definition().await?, + add.embedding_registry, + )?) as Box; - let mut lance_params = add.write_options.lance_write_params.unwrap_or(WriteParams { + let lance_params = add.write_options.lance_write_params.unwrap_or(WriteParams { mode: match add.mode { AddDataMode::Append => WriteMode::Append, AddDataMode::Overwrite => WriteMode::Overwrite, @@ -1769,27 +1749,15 @@ impl TableInternal for NativeTable { ..Default::default() }); - // Bring storage options from table - let storage_options = lance_params - .store_params - .get_or_insert(Default::default()) - .storage_options - .get_or_insert(Default::default()); - for (key, value) in self.storage_options.iter() { - if !storage_options.contains_key(key) { - storage_options.insert(key.clone(), value.clone()); - } - } - - // patch the params if we have a write store wrapper - let lance_params = match self.store_wrapper.clone() { - Some(wrapper) => lance_params.patch_with_store_wrapper(wrapper)?, - None => lance_params, + let dataset = { + // Limited scope for the mutable borrow of self.dataset avoids deadlock. + let ds = self.dataset.get_mut().await?; + InsertBuilder::new(Arc::new(ds.clone())) + .with_params(&lance_params) + .execute_stream(data) + .await? }; - self.dataset.ensure_mutable().await?; - let dataset = Dataset::write(data, &self.uri, Some(lance_params)).await?; - self.dataset.set_latest(dataset).await; Ok(()) }