perf: re-use table instance during write (#1909)

Previously, whenever `Table.add()` was called, we would write and
re-open the underlying dataset. This was bad for performance, as it
reset the table cache and initiated a lot of IO. It also could be the
source of bugs, since we didn't necessarily pass all the necessary
connection options down when re-opening the table.

Closes #1655
This commit is contained in:
Will Jones
2024-12-05 14:44:50 -08:00
committed by GitHub
parent d6219d687c
commit 3c487e5fc7
2 changed files with 16 additions and 56 deletions

View File

@@ -1624,15 +1624,7 @@ class LanceTable(Table):
on_bad_vectors=on_bad_vectors,
fill_value=fill_value,
)
# Access the dataset_mut property to ensure that the dataset is mutable.
self._ref.dataset_mut
self._ref.dataset = lance.write_dataset(
data,
self._dataset_uri,
schema=self.schema,
mode=mode,
storage_options=self._ref.storage_options,
)
self._ref.dataset_mut.insert(data, mode=mode, schema=self.schema)
def merge(
self,

View File

@@ -14,7 +14,6 @@
//! LanceDB Table APIs
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
@@ -37,7 +36,8 @@ pub use lance::dataset::ColumnAlteration;
pub use lance::dataset::NewColumnTransform;
pub use lance::dataset::ReadParams;
use lance::dataset::{
Dataset, UpdateBuilder as LanceUpdateBuilder, Version, WhenMatched, WriteMode, WriteParams,
Dataset, InsertBuilder, UpdateBuilder as LanceUpdateBuilder, Version, WhenMatched, WriteMode,
WriteParams,
};
use lance::dataset::{MergeInsertBuilder as LanceMergeInsertBuilder, WhenNotMatchedBySource};
use lance::io::WrappingObjectStore;
@@ -1046,12 +1046,6 @@ pub struct NativeTable {
name: String,
uri: String,
pub(crate) dataset: dataset::DatasetConsistencyWrapper,
// the object store wrapper to use on write path
store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
storage_options: HashMap<String, String>,
// This comes from the connection options. We store here so we can pass down
// to the dataset when we recreate it (for example, in checkout_latest).
read_consistency_interval: Option<std::time::Duration>,
@@ -1117,13 +1111,6 @@ impl NativeTable {
None => params,
};
let storage_options = params
.store_options
.clone()
.unwrap_or_default()
.storage_options
.unwrap_or_default();
let dataset = DatasetBuilder::from_uri(uri)
.with_read_params(params)
.load()
@@ -1141,8 +1128,6 @@ impl NativeTable {
name: name.to_string(),
uri: uri.to_string(),
dataset,
store_wrapper: write_store_wrapper,
storage_options,
read_consistency_interval,
})
}
@@ -1191,12 +1176,6 @@ impl NativeTable {
Some(wrapper) => params.patch_with_store_wrapper(wrapper)?,
None => params,
};
let storage_options = params
.store_params
.clone()
.unwrap_or_default()
.storage_options
.unwrap_or_default();
let dataset = Dataset::write(batches, uri, Some(params))
.await
@@ -1210,8 +1189,6 @@ impl NativeTable {
name: name.to_string(),
uri: uri.to_string(),
dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
store_wrapper: write_store_wrapper,
storage_options,
read_consistency_interval,
})
}
@@ -1758,10 +1735,13 @@ impl TableInternal for NativeTable {
add: AddDataBuilder<NoData>,
data: Box<dyn RecordBatchReader + Send>,
) -> Result<()> {
let data =
MaybeEmbedded::try_new(data, self.table_definition().await?, add.embedding_registry)?;
let data = Box::new(MaybeEmbedded::try_new(
data,
self.table_definition().await?,
add.embedding_registry,
)?) as Box<dyn RecordBatchReader + Send>;
let mut lance_params = add.write_options.lance_write_params.unwrap_or(WriteParams {
let lance_params = add.write_options.lance_write_params.unwrap_or(WriteParams {
mode: match add.mode {
AddDataMode::Append => WriteMode::Append,
AddDataMode::Overwrite => WriteMode::Overwrite,
@@ -1769,27 +1749,15 @@ impl TableInternal for NativeTable {
..Default::default()
});
// Bring storage options from table
let storage_options = lance_params
.store_params
.get_or_insert(Default::default())
.storage_options
.get_or_insert(Default::default());
for (key, value) in self.storage_options.iter() {
if !storage_options.contains_key(key) {
storage_options.insert(key.clone(), value.clone());
}
}
// patch the params if we have a write store wrapper
let lance_params = match self.store_wrapper.clone() {
Some(wrapper) => lance_params.patch_with_store_wrapper(wrapper)?,
None => lance_params,
let dataset = {
// Limited scope for the mutable borrow of self.dataset avoids deadlock.
let ds = self.dataset.get_mut().await?;
InsertBuilder::new(Arc::new(ds.clone()))
.with_params(&lance_params)
.execute_stream(data)
.await?
};
self.dataset.ensure_mutable().await?;
let dataset = Dataset::write(data, &self.uri, Some(lance_params)).await?;
self.dataset.set_latest(dataset).await;
Ok(())
}