mirror of
https://github.com/lancedb/lancedb.git
synced 2026-05-14 10:30:40 +00:00
refactor: modularize table.rs and extract delete logic (#2952)
References #2949. Moved `DeleteResult` and the `delete()` implementation to `src/table/delete.rs`. No functional changes. Added a test for delete that passes. Refactoring of `update` will follow next.
This commit is contained in:
@@ -79,10 +79,11 @@ use self::merge::MergeInsertBuilder;
|
||||
|
||||
pub mod datafusion;
|
||||
pub(crate) mod dataset;
|
||||
pub mod delete;
|
||||
pub mod merge;
|
||||
|
||||
use crate::index::waiter::wait_for_index;
|
||||
pub use chrono::Duration;
|
||||
pub use delete::DeleteResult;
|
||||
use futures::future::{join_all, Either};
|
||||
pub use lance::dataset::optimize::CompactionOptions;
|
||||
pub use lance::dataset::refs::{TagContents, Tags as LanceTags};
|
||||
@@ -446,15 +447,6 @@ pub struct AddResult {
|
||||
pub version: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
pub struct DeleteResult {
|
||||
// The commit version associated with the operation.
|
||||
// A version of `0` indicates compatibility with legacy servers that do not return
|
||||
/// a commit version.
|
||||
#[serde(default)]
|
||||
pub version: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
pub struct MergeResult {
|
||||
// The commit version associated with the operation.
|
||||
@@ -3078,11 +3070,8 @@ impl BaseTable for NativeTable {
|
||||
|
||||
/// Delete rows from the table
|
||||
async fn delete(&self, predicate: &str) -> Result<DeleteResult> {
|
||||
let mut dataset = self.dataset.get_mut().await?;
|
||||
dataset.delete(predicate).await?;
|
||||
Ok(DeleteResult {
|
||||
version: dataset.version().version,
|
||||
})
|
||||
// Delegate to the submodule implementation
|
||||
delete::execute_delete(self, predicate).await
|
||||
}
|
||||
|
||||
async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
|
||||
|
||||
161
rust/lancedb/src/table/delete.rs
Normal file
161
rust/lancedb/src/table/delete.rs
Normal file
@@ -0,0 +1,161 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::NativeTable;
|
||||
use crate::Result;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
pub struct DeleteResult {
|
||||
// The commit version associated with the operation.
|
||||
// A version of `0` indicates compatibility with legacy servers that do not return
|
||||
/// a commit version.
|
||||
#[serde(default)]
|
||||
pub version: u64,
|
||||
}
|
||||
|
||||
/// Internal implementation of the delete logic
|
||||
///
|
||||
/// This logic was moved from NativeTable::delete to keep table.rs clean.
|
||||
pub(crate) async fn execute_delete(table: &NativeTable, predicate: &str) -> Result<DeleteResult> {
|
||||
// We access the dataset from the table. Since this is in the same module hierarchy (super),
|
||||
// and 'dataset' is pub(crate), we can access it.
|
||||
let mut dataset = table.dataset.get_mut().await?;
|
||||
|
||||
// Perform the actual delete on the Lance dataset
|
||||
dataset.delete(predicate).await?;
|
||||
|
||||
// Return the result with the new version
|
||||
Ok(DeleteResult {
|
||||
version: dataset.version().version,
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{record_batch, Int32Array, RecordBatch, RecordBatchIterator};
    use arrow_schema::{DataType, Field, Schema};
    use futures::TryStreamExt;

    use crate::connect;
    use crate::query::ExecutableQuery;

    /// Basic happy path: delete a subset of rows with a predicate and
    /// verify both the row count and the surviving values.
    #[tokio::test]
    async fn test_delete_simple() {
        let conn = connect("memory://").execute().await.unwrap();

        // 1. Create a table with values 0 to 9
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from_iter_values(0..10))],
        )
        .unwrap();

        let table = conn
            .create_table(
                "test_delete",
                RecordBatchIterator::new(vec![Ok(batch)], schema),
            )
            .execute()
            .await
            .unwrap();

        // 2. Verify initial state
        assert_eq!(table.count_rows(None).await.unwrap(), 10);

        // 3. Execute Delete (removes values > 5)
        table.delete("i > 5").await.unwrap();

        // 4. Verify results
        assert_eq!(table.count_rows(None).await.unwrap(), 6); // 0, 1, 2, 3, 4, 5 remain

        // 5. Verify data consistency across ALL returned batches — the scan
        //    may split results into multiple record batches, so checking only
        //    batches[0] could leave surviving rows unverified (and panics if
        //    the result set is empty).
        let batches = table
            .query()
            .execute()
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        let mut checked = 0usize;
        for batch in &batches {
            let array = batch
                .column(0)
                .as_any()
                .downcast_ref::<Int32Array>()
                .unwrap();

            // Ensure no value > 5 exists
            for val in array.iter() {
                assert!(val.unwrap() <= 5);
                checked += 1;
            }
        }
        // Every surviving row was inspected.
        assert_eq!(checked, 6);
    }

    /// Deleting every row must leave the table empty while preserving
    /// its schema.
    #[tokio::test]
    async fn rows_removed_schema_same() {
        let conn = connect("memory://").execute().await.unwrap();
        let batch = record_batch!(
            ("id", Int32, [1, 2, 3, 4, 5]),
            ("name", Utf8, ["a", "b", "c", "d", "e"])
        )
        .unwrap();
        let original_schema = batch.schema();

        let table = conn
            .create_table(
                "test_delete_all",
                RecordBatchIterator::new(vec![Ok(batch)], original_schema.clone()),
            )
            .execute()
            .await
            .unwrap();

        // "true" matches every row.
        table.delete("true").await.unwrap();

        assert_eq!(table.count_rows(None).await.unwrap(), 0);

        // The schema must survive a full delete unchanged.
        let current_schema = table.schema().await.unwrap();
        assert_eq!(current_schema, original_schema);
    }

    /// A delete whose predicate matches nothing must leave the data
    /// untouched but still produce a new table version (the operation
    /// commits regardless of how many rows it removes).
    #[tokio::test]
    async fn test_delete_false_increments_version() {
        let conn = connect("memory://").execute().await.unwrap();

        // Create a table with 5 rows
        let batch = record_batch!(("id", Int32, [1, 2, 3, 4, 5])).unwrap();

        let schema = batch.schema();

        let table = conn
            .create_table(
                "test_delete_noop",
                RecordBatchIterator::new(vec![Ok(batch)], schema),
            )
            .execute()
            .await
            .unwrap();

        // Capture the initial state; only the row count is asserted — the
        // exact initial version number is an implementation detail.
        let initial_rows = table.count_rows(None).await.unwrap();
        let initial_version = table.version().await.unwrap();

        assert_eq!(initial_rows, 5);

        // "false" matches no rows.
        table.delete("false").await.unwrap();

        // Rows should still be 5
        let current_rows = table.count_rows(None).await.unwrap();
        assert_eq!(
            current_rows, initial_rows,
            "Data should not change when predicate is false"
        );

        // version check
        let current_version = table.version().await.unwrap();
        assert!(
            current_version > initial_version,
            "Table version must increment after delete operation"
        );
    }
}
|
||||
Reference in New Issue
Block a user