feat: implement bindings to return merge stats (#2367)

Based on this comment:
https://github.com/lancedb/lancedb/issues/2228#issuecomment-2730463075
and https://github.com/lancedb/lance/pull/2357

Here is my attempt at implementing bindings for returning merge stats
from a `merge_insert.execute` call for lancedb.

Note: I have almost no idea what I am doing in Rust but tried to follow
existing code patterns and pay attention to compiler hints.
- The change in nodejs binding appeared to be necessary to get
compilation to work, presumably this could actual work properly by
returning some kind of NAPI JS object of the stats data?
- I am unsure of what to do with the remote/table.rs changes -
necessarily for compilation to work; I assume this is related to LanceDB
cloud, but unsure the best way to handle that at this point.

Proof of function:

```python
import pandas as pd
import lancedb


db = lancedb.connect("/tmp/test.db")

test_data = pd.DataFrame(
    {
        "title": ["Hello", "Test Document", "Example", "Data Sample", "Last One"],
        "id": [1, 2, 3, 4, 5],
        "content": [
            "World",
            "This is a test",
            "Another example",
            "More test data",
            "Final entry",
        ],
    }
)

table = db.create_table("documents", data=test_data, exist_ok=True, mode="overwrite")

update_data = pd.DataFrame(
    {
        "title": [
            "Hello, World",
            "Test Document, it's good",
            "Example",
            "Data Sample",
            "Last One",
            "New One",
        ],
        "id": [1, 2, 3, 4, 5, 6],
        "content": [
            "World",
            "This is a test",
            "Another example",
            "More test data",
            "Final entry",
            "New content",
        ],
    }
)

stats = (
    table.merge_insert(on="id")
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute(update_data)
)

print(stats)
```

returns

```
{'num_inserted_rows': 1, 'num_updated_rows': 5, 'num_deleted_rows': 0}
```

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

## Summary by CodeRabbit

- **New Features**
- Merge-insert operations now return detailed statistics, including
counts of inserted, updated, and deleted rows.
- **Bug Fixes**
- Tests updated to validate returned merge-insert statistics for
accuracy.
- **Documentation**
- Method documentation improved to reflect new return values and clarify
merge operation results.
- Added documentation for the new `MergeStats` interface detailing
operation statistics.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: Will Jones <willjones127@gmail.com>
This commit is contained in:
Alex Pilon
2025-05-01 13:00:20 -04:00
committed by GitHub
parent 5deb26bc8b
commit f315f9665a
13 changed files with 140 additions and 34 deletions

View File

@@ -33,20 +33,20 @@ Construct a MergeInsertBuilder. __Internal use only.__
### execute()
```ts
execute(data): Promise<void>
execute(data): Promise<MergeStats>
```
Executes the merge insert operation
Nothing is returned but the `Table` is updated
#### Parameters
* **data**: [`Data`](../type-aliases/Data.md)
#### Returns
`Promise`&lt;`void`&gt;
`Promise`&lt;[`MergeStats`](../interfaces/MergeStats.md)&gt;
Statistics about the merge operation: counts of inserted, updated, and deleted rows
***

View File

@@ -54,6 +54,7 @@
- [IndexStatistics](interfaces/IndexStatistics.md)
- [IvfFlatOptions](interfaces/IvfFlatOptions.md)
- [IvfPqOptions](interfaces/IvfPqOptions.md)
- [MergeStats](interfaces/MergeStats.md)
- [OpenTableOptions](interfaces/OpenTableOptions.md)
- [OptimizeOptions](interfaces/OptimizeOptions.md)
- [OptimizeStats](interfaces/OptimizeStats.md)

View File

@@ -0,0 +1,31 @@
[**@lancedb/lancedb**](../README.md) • **Docs**
***
[@lancedb/lancedb](../globals.md) / MergeStats
# Interface: MergeStats
## Properties
### numDeletedRows
```ts
numDeletedRows: bigint;
```
***
### numInsertedRows
```ts
numInsertedRows: bigint;
```
***
### numUpdatedRows
```ts
numUpdatedRows: bigint;
```

View File

@@ -338,11 +338,16 @@ describe("merge insert", () => {
{ a: 3, b: "y" },
{ a: 4, b: "z" },
];
await table
const stats = await table
.mergeInsert("a")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute(newData);
expect(stats.numInsertedRows).toBe(1n);
expect(stats.numUpdatedRows).toBe(2n);
expect(stats.numDeletedRows).toBe(0n);
const expected = [
{ a: 1, b: "a" },
{ a: 2, b: "x" },

View File

@@ -28,6 +28,7 @@ export {
FragmentSummaryStats,
Tags,
TagContents,
MergeStats,
} from "./native.js";
export {

View File

@@ -1,7 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
import { Data, Schema, fromDataToBuffer } from "./arrow";
import { NativeMergeInsertBuilder } from "./native";
import { MergeStats, NativeMergeInsertBuilder } from "./native";
/** A builder used to create and run a merge insert operation */
export class MergeInsertBuilder {
@@ -73,9 +73,9 @@ export class MergeInsertBuilder {
/**
* Executes the merge insert operation
*
* Nothing is returned but the `Table` is updated
* @returns Statistics about the merge operation: counts of inserted, updated, and deleted rows
*/
async execute(data: Data): Promise<void> {
async execute(data: Data): Promise<MergeStats> {
let schema: Schema;
if (this.#schema instanceof Promise) {
schema = await this.#schema;
@@ -84,6 +84,6 @@ export class MergeInsertBuilder {
schema = this.#schema;
}
const buffer = await fromDataToBuffer(data, undefined, schema);
await this.#native.execute(buffer);
return await this.#native.execute(buffer);
}
}

View File

@@ -37,7 +37,7 @@ impl NativeMergeInsertBuilder {
}
#[napi(catch_unwind)]
pub async fn execute(&self, buf: Buffer) -> napi::Result<()> {
pub async fn execute(&self, buf: Buffer) -> napi::Result<MergeStats> {
let data = ipc_file_to_batches(buf.to_vec())
.and_then(IntoArrow::into_arrow)
.map_err(|e| {
@@ -46,12 +46,14 @@ impl NativeMergeInsertBuilder {
let this = self.clone();
this.inner.execute(data).await.map_err(|e| {
let stats = this.inner.execute(data).await.map_err(|e| {
napi::Error::from_reason(format!(
"Failed to execute merge insert: {}",
convert_error(&e)
))
})
})?;
Ok(stats.into())
}
}
@@ -60,3 +62,20 @@ impl From<MergeInsertBuilder> for NativeMergeInsertBuilder {
Self { inner }
}
}
#[napi(object)]
pub struct MergeStats {
pub num_inserted_rows: BigInt,
pub num_updated_rows: BigInt,
pub num_deleted_rows: BigInt,
}
impl From<lancedb::table::MergeStats> for MergeStats {
fn from(stats: lancedb::table::MergeStats) -> Self {
Self {
num_inserted_rows: stats.num_inserted_rows.into(),
num_updated_rows: stats.num_updated_rows.into(),
num_deleted_rows: stats.num_deleted_rows.into(),
}
}
}

View File

@@ -962,10 +962,12 @@ class Table(ABC):
>>> table = db.create_table("my_table", data)
>>> new_data = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
>>> # Perform a "upsert" operation
>>> table.merge_insert("a") \\
>>> stats = table.merge_insert("a") \\
... .when_matched_update_all() \\
... .when_not_matched_insert_all() \\
... .execute(new_data)
>>> stats
{'num_inserted_rows': 1, 'num_updated_rows': 2, 'num_deleted_rows': 0}
>>> # The order of new rows is non-deterministic since we use
>>> # a hash-join as part of this operation and so we sort here
>>> table.to_arrow().sort_by("a").to_pandas()
@@ -2489,7 +2491,9 @@ class LanceTable(Table):
on_bad_vectors: OnBadVectorsType,
fill_value: float,
):
LOOP.run(self._table._do_merge(merge, new_data, on_bad_vectors, fill_value))
return LOOP.run(
self._table._do_merge(merge, new_data, on_bad_vectors, fill_value)
)
@deprecation.deprecated(
deprecated_in="0.21.0",
@@ -3277,10 +3281,12 @@ class AsyncTable:
>>> table = db.create_table("my_table", data)
>>> new_data = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
>>> # Perform a "upsert" operation
>>> table.merge_insert("a") \\
>>> stats = table.merge_insert("a") \\
... .when_matched_update_all() \\
... .when_not_matched_insert_all() \\
... .execute(new_data)
>>> stats
{'num_inserted_rows': 1, 'num_updated_rows': 2, 'num_deleted_rows': 0}
>>> # The order of new rows is non-deterministic since we use
>>> # a hash-join as part of this operation and so we sort here
>>> table.to_arrow().sort_by("a").to_pandas()
@@ -3636,7 +3642,7 @@ class AsyncTable:
)
if isinstance(data, pa.Table):
data = pa.RecordBatchReader.from_batches(data.schema, data.to_batches())
await self._inner.execute_merge_insert(
return await self._inner.execute_merge_insert(
data,
dict(
on=merge._on,

View File

@@ -18,15 +18,19 @@ def test_upsert(mem_db):
{"id": 1, "name": "Bobby"},
{"id": 2, "name": "Charlie"},
]
(
stats = (
table.merge_insert("id")
.when_matched_update_all()
.when_not_matched_insert_all()
.execute(new_users)
)
table.count_rows() # 3
stats # {'num_inserted_rows': 1, 'num_updated_rows': 1, 'num_deleted_rows': 0}
# --8<-- [end:upsert_basic]
assert table.count_rows() == 3
assert stats["num_inserted_rows"] == 1
assert stats["num_updated_rows"] == 1
assert stats["num_deleted_rows"] == 0
@pytest.mark.asyncio
@@ -44,15 +48,19 @@ async def test_upsert_async(mem_db_async):
{"id": 1, "name": "Bobby"},
{"id": 2, "name": "Charlie"},
]
await (
stats = await (
table.merge_insert("id")
.when_matched_update_all()
.when_not_matched_insert_all()
.execute(new_users)
)
await table.count_rows() # 3
stats # {'num_inserted_rows': 1, 'num_updated_rows': 1, 'num_deleted_rows': 0}
# --8<-- [end:upsert_basic_async]
assert await table.count_rows() == 3
assert stats["num_inserted_rows"] == 1
assert stats["num_updated_rows"] == 1
assert stats["num_deleted_rows"] == 0
def test_insert_if_not_exists(mem_db):
@@ -69,10 +77,16 @@ def test_insert_if_not_exists(mem_db):
{"domain": "google.com", "name": "Google"},
{"domain": "facebook.com", "name": "Facebook"},
]
(table.merge_insert("domain").when_not_matched_insert_all().execute(new_domains))
stats = (
table.merge_insert("domain").when_not_matched_insert_all().execute(new_domains)
)
table.count_rows() # 3
stats # {'num_inserted_rows': 1, 'num_updated_rows': 0, 'num_deleted_rows': 0}
# --8<-- [end:insert_if_not_exists]
assert table.count_rows() == 3
assert stats["num_inserted_rows"] == 1
assert stats["num_updated_rows"] == 0
assert stats["num_deleted_rows"] == 0
@pytest.mark.asyncio
@@ -90,12 +104,16 @@ async def test_insert_if_not_exists_async(mem_db_async):
{"domain": "google.com", "name": "Google"},
{"domain": "facebook.com", "name": "Facebook"},
]
await (
stats = await (
table.merge_insert("domain").when_not_matched_insert_all().execute(new_domains)
)
await table.count_rows() # 3
stats # {'num_inserted_rows': 1, 'num_updated_rows': 0, 'num_deleted_rows': 0}
# --8<-- [end:insert_if_not_exists_async]
assert await table.count_rows() == 3
assert stats["num_inserted_rows"] == 1
assert stats["num_updated_rows"] == 0
assert stats["num_deleted_rows"] == 0
def test_replace_range(mem_db):
@@ -113,7 +131,7 @@ def test_replace_range(mem_db):
new_chunks = [
{"doc_id": 1, "chunk_id": 0, "text": "Baz"},
]
(
stats = (
table.merge_insert(["doc_id", "chunk_id"])
.when_matched_update_all()
.when_not_matched_insert_all()
@@ -121,8 +139,12 @@ def test_replace_range(mem_db):
.execute(new_chunks)
)
table.count_rows("doc_id = 1") # 1
stats # {'num_inserted_rows': 0, 'num_updated_rows': 1, 'num_deleted_rows': 1}
# --8<-- [end:replace_range]
assert table.count_rows("doc_id = 1") == 1
assert stats["num_inserted_rows"] == 0
assert stats["num_updated_rows"] == 1
assert stats["num_deleted_rows"] == 1
@pytest.mark.asyncio
@@ -141,7 +163,7 @@ async def test_replace_range_async(mem_db_async):
new_chunks = [
{"doc_id": 1, "chunk_id": 0, "text": "Baz"},
]
await (
stats = await (
table.merge_insert(["doc_id", "chunk_id"])
.when_matched_update_all()
.when_not_matched_insert_all()
@@ -149,5 +171,9 @@ async def test_replace_range_async(mem_db_async):
.execute(new_chunks)
)
await table.count_rows("doc_id = 1") # 1
stats # {'num_inserted_rows': 0, 'num_updated_rows': 1, 'num_deleted_rows': 1}
# --8<-- [end:replace_range_async]
assert await table.count_rows("doc_id = 1") == 1
assert stats["num_inserted_rows"] == 0
assert stats["num_updated_rows"] == 1
assert stats["num_deleted_rows"] == 1

View File

@@ -489,8 +489,14 @@ impl Table {
}
future_into_py(self_.py(), async move {
builder.execute(Box::new(batches)).await.infer_error()?;
Ok(())
let stats = builder.execute(Box::new(batches)).await.infer_error()?;
Python::with_gil(|py| {
let dict = PyDict::new(py);
dict.set_item("num_inserted_rows", stats.num_inserted_rows)?;
dict.set_item("num_updated_rows", stats.num_updated_rows)?;
dict.set_item("num_deleted_rows", stats.num_deleted_rows)?;
Ok(dict.unbind())
})
})
}

View File

@@ -47,6 +47,7 @@ use crate::{
TableDefinition, UpdateBuilder,
},
};
use lance::dataset::MergeStats;
const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms");
@@ -1022,7 +1023,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
&self,
params: MergeInsertBuilder,
new_data: Box<dyn RecordBatchReader + Send>,
) -> Result<()> {
) -> Result<MergeStats> {
self.check_mutable().await?;
let query = MergeInsertRequest::try_from(params)?;
@@ -1034,9 +1035,11 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let (request_id, response) = self.send_streaming(request, new_data, true).await?;
// TODO: server can response with these stats in response body.
// We should test that we can handle both empty response from old server
// and response with stats from new server.
self.check_table_response(&request_id, response).await?;
Ok(())
Ok(MergeStats::default())
}
async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
@@ -1348,7 +1351,12 @@ mod tests {
Box::pin(table.count_rows(None).map_ok(|_| ())),
Box::pin(table.update().column("a", "a + 1").execute().map_ok(|_| ())),
Box::pin(table.add(example_data()).execute().map_ok(|_| ())),
Box::pin(table.merge_insert(&["test"]).execute(example_data())),
Box::pin(
table
.merge_insert(&["test"])
.execute(example_data())
.map_ok(|_| ()),
),
Box::pin(table.delete("false")),
Box::pin(table.add_columns(
NewColumnTransform::SqlExpressions(vec![("x".into(), "y".into())]),

View File

@@ -20,6 +20,7 @@ use lance::dataset::cleanup::RemovalStats;
use lance::dataset::optimize::{compact_files, CompactionMetrics, IndexRemapperOptions};
use lance::dataset::scanner::Scanner;
pub use lance::dataset::ColumnAlteration;
pub use lance::dataset::MergeStats;
pub use lance::dataset::NewColumnTransform;
pub use lance::dataset::ReadParams;
pub use lance::dataset::Version;
@@ -487,7 +488,7 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
&self,
params: MergeInsertBuilder,
new_data: Box<dyn RecordBatchReader + Send>,
) -> Result<()>;
) -> Result<MergeStats>;
/// Gets the table tag manager.
async fn tags(&self) -> Result<Box<dyn Tags + '_>>;
/// Optimize the dataset.
@@ -2367,7 +2368,7 @@ impl BaseTable for NativeTable {
&self,
params: MergeInsertBuilder,
new_data: Box<dyn RecordBatchReader + Send>,
) -> Result<()> {
) -> Result<MergeStats> {
let dataset = Arc::new(self.dataset.get().await?.clone());
let mut builder = LanceMergeInsertBuilder::try_new(dataset.clone(), params.on)?;
match (
@@ -2394,9 +2395,9 @@ impl BaseTable for NativeTable {
builder.when_not_matched_by_source(WhenNotMatchedBySource::Keep);
}
let job = builder.try_build()?;
let (new_dataset, _stats) = job.execute_reader(new_data).await?;
let (new_dataset, stats) = job.execute_reader(new_data).await?;
self.dataset.set_latest(new_dataset.as_ref().clone()).await;
Ok(())
Ok(stats)
}
/// Delete rows from the table

View File

@@ -4,6 +4,7 @@
use std::sync::Arc;
use arrow_array::RecordBatchReader;
use lance::dataset::MergeStats;
use crate::Result;
@@ -86,8 +87,9 @@ impl MergeInsertBuilder {
/// Executes the merge insert operation
///
/// Nothing is returned but the [`super::Table`] is updated
pub async fn execute(self, new_data: Box<dyn RecordBatchReader + Send>) -> Result<()> {
/// Returns statistics about the merge operation including the number of rows
/// inserted, updated, and deleted.
pub async fn execute(self, new_data: Box<dyn RecordBatchReader + Send>) -> Result<MergeStats> {
self.table.clone().merge_insert(self, new_data).await
}
}