feat(nodejs): merge insert (#1351)

closes https://github.com/lancedb/lancedb/issues/1349
This commit is contained in:
Cory Grinstead
2024-06-11 15:05:15 -05:00
committed by GitHub
parent 8e348ab4bd
commit bc19a75f65
8 changed files with 273 additions and 2 deletions

View File

@@ -132,6 +132,140 @@ describe.each([arrow, arrowOld])("Given a table", (arrow: any) => {
});
});
describe("merge insert", () => {
let tmpDir: tmp.DirResult;
let table: Table;
beforeEach(async () => {
tmpDir = tmp.dirSync({ unsafeCleanup: true });
const conn = await connect(tmpDir.name);
table = await conn.createTable("some_table", [
{ a: 1, b: "a" },
{ a: 2, b: "b" },
{ a: 3, b: "c" },
]);
});
afterEach(() => tmpDir.removeCallback());
test("upsert", async () => {
const newData = [
{ a: 2, b: "x" },
{ a: 3, b: "y" },
{ a: 4, b: "z" },
];
await table
.mergeInsert("a")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute(newData);
const expected = [
{ a: 1, b: "a" },
{ a: 2, b: "x" },
{ a: 3, b: "y" },
{ a: 4, b: "z" },
];
expect(
JSON.parse(JSON.stringify((await table.toArrow()).toArray())),
).toEqual(expected);
});
test("conditional update", async () => {
const newData = [
{ a: 2, b: "x" },
{ a: 3, b: "y" },
{ a: 4, b: "z" },
];
await table
.mergeInsert("a")
.whenMatchedUpdateAll({ where: "target.b = 'b'" })
.execute(newData);
const expected = [
{ a: 1, b: "a" },
{ a: 2, b: "x" },
{ a: 3, b: "c" },
];
// round trip to arrow and back to json to avoid comparing arrow objects to js object
// biome-ignore lint/suspicious/noExplicitAny: test
let res: any[] = JSON.parse(
JSON.stringify((await table.toArrow()).toArray()),
);
res = res.sort((a, b) => a.a - b.a);
expect(res).toEqual(expected);
});
test("insert if not exists", async () => {
const newData = [
{ a: 2, b: "x" },
{ a: 3, b: "y" },
{ a: 4, b: "z" },
];
await table.mergeInsert("a").whenNotMatchedInsertAll().execute(newData);
const expected = [
{ a: 1, b: "a" },
{ a: 2, b: "b" },
{ a: 3, b: "c" },
{ a: 4, b: "z" },
];
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
let res: any[] = JSON.parse(
JSON.stringify((await table.toArrow()).toArray()),
);
res = res.sort((a, b) => a.a - b.a);
expect(res).toEqual(expected);
});
test("replace range", async () => {
const newData = [
{ a: 2, b: "x" },
{ a: 4, b: "z" },
];
await table
.mergeInsert("a")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete({ where: "a > 2" })
.execute(newData);
const expected = [
{ a: 1, b: "a" },
{ a: 2, b: "x" },
{ a: 4, b: "z" },
];
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
let res: any[] = JSON.parse(
JSON.stringify((await table.toArrow()).toArray()),
);
res = res.sort((a, b) => a.a - b.a);
expect(res).toEqual(expected);
});
test("replace range no condition", async () => {
const newData = [
{ a: 2, b: "x" },
{ a: 4, b: "z" },
];
await table
.mergeInsert("a")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute(newData);
const expected = [
{ a: 2, b: "x" },
{ a: 4, b: "z" },
];
// biome-ignore lint/suspicious/noExplicitAny: test
let res: any[] = JSON.parse(
JSON.stringify((await table.toArrow()).toArray()),
);
res = res.sort((a, b) => a.a - b.a);
expect(res).toEqual(expected);
});
});
describe("When creating an index", () => {
let tmpDir: tmp.DirResult;
const schema = new Schema([

70
nodejs/lancedb/merge.ts Normal file
View File

@@ -0,0 +1,70 @@
import { Data, fromDataToBuffer } from "./arrow";
import { NativeMergeInsertBuilder } from "./native";
/** A builder used to create and run a merge insert operation */
export class MergeInsertBuilder {
#native: NativeMergeInsertBuilder;
/** Construct a MergeInsertBuilder. __Internal use only.__ */
constructor(native: NativeMergeInsertBuilder) {
this.#native = native;
}
/**
* Rows that exist in both the source table (new data) and
* the target table (old data) will be updated, replacing
* the old row with the corresponding matching row.
*
* If there are multiple matches then the behavior is undefined.
* Currently this causes multiple copies of the row to be created
* but that behavior is subject to change.
*
* An optional condition may be specified. If it is, then only
* matched rows that satisfy the condtion will be updated. Any
* rows that do not satisfy the condition will be left as they
* are. Failing to satisfy the condition does not cause a
* "matched row" to become a "not matched" row.
*
* The condition should be an SQL string. Use the prefix
* target. to refer to rows in the target table (old data)
* and the prefix source. to refer to rows in the source
* table (new data).
*
* For example, "target.last_update < source.last_update"
*/
whenMatchedUpdateAll(options?: { where: string }): MergeInsertBuilder {
return new MergeInsertBuilder(
this.#native.whenMatchedUpdateAll(options?.where),
);
}
/**
* Rows that exist only in the source table (new data) should
* be inserted into the target table.
*/
whenNotMatchedInsertAll(): MergeInsertBuilder {
return new MergeInsertBuilder(this.#native.whenNotMatchedInsertAll());
}
/**
* Rows that exist only in the target table (old data) will be
* deleted. An optional condition can be provided to limit what
* data is deleted.
*
* @param options.where - An optional condition to limit what data is deleted
*/
whenNotMatchedBySourceDelete(options?: {
where: string;
}): MergeInsertBuilder {
return new MergeInsertBuilder(
this.#native.whenNotMatchedBySourceDelete(options?.where),
);
}
/**
* Executes the merge insert operation
*
* Nothing is returned but the `Table` is updated
*/
async execute(data: Data): Promise<void> {
const buffer = await fromDataToBuffer(data);
await this.#native.execute(buffer);
}
}

View File

@@ -23,6 +23,7 @@ import {
import { EmbeddingFunctionConfig, getRegistry } from "./embedding/registry";
import { IndexOptions } from "./indices";
import { MergeInsertBuilder } from "./merge";
import {
AddColumnsSql,
ColumnAlteration,
@@ -478,4 +479,8 @@ export class Table {
async toArrow(): Promise<ArrowTable> {
return await this.query().toArrow();
}
mergeInsert(on: string | string[]): MergeInsertBuilder {
on = Array.isArray(on) ? on : [on];
return new MergeInsertBuilder(this.inner.mergeInsert(on));
}
}

View File

@@ -1,12 +1,12 @@
{
"name": "@lancedb/lancedb",
"version": "0.5.0",
"version": "0.5.1",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@lancedb/lancedb",
"version": "0.5.0",
"version": "0.5.1",
"cpu": [
"x64",
"arm64"

View File

@@ -20,6 +20,7 @@ mod connection;
mod error;
mod index;
mod iterator;
pub mod merge;
mod query;
mod table;
mod util;

53
nodejs/src/merge.rs Normal file
View File

@@ -0,0 +1,53 @@
use lancedb::{arrow::IntoArrow, ipc::ipc_file_to_batches, table::merge::MergeInsertBuilder};
use napi::bindgen_prelude::*;
use napi_derive::napi;
#[napi]
#[derive(Clone)]
/// A builder used to create and run a merge insert operation
pub struct NativeMergeInsertBuilder {
pub(crate) inner: MergeInsertBuilder,
}
#[napi]
impl NativeMergeInsertBuilder {
#[napi]
pub fn when_matched_update_all(&self, condition: Option<String>) -> Self {
let mut this = self.clone();
this.inner.when_matched_update_all(condition);
this
}
#[napi]
pub fn when_not_matched_insert_all(&self) -> Self {
let mut this = self.clone();
this.inner.when_not_matched_insert_all();
this
}
#[napi]
pub fn when_not_matched_by_source_delete(&self, filter: Option<String>) -> Self {
let mut this = self.clone();
this.inner.when_not_matched_by_source_delete(filter);
this
}
#[napi]
pub async fn execute(&self, buf: Buffer) -> napi::Result<()> {
let data = ipc_file_to_batches(buf.to_vec())
.and_then(IntoArrow::into_arrow)
.map_err(|e| napi::Error::from_reason(format!("Failed to read IPC file: {}", e)))?;
let this = self.clone();
this.inner
.execute(data)
.await
.map_err(|e| napi::Error::from_reason(format!("Failed to execute merge insert: {}", e)))
}
}
impl From<MergeInsertBuilder> for NativeMergeInsertBuilder {
fn from(inner: MergeInsertBuilder) -> Self {
Self { inner }
}
}

View File

@@ -23,6 +23,7 @@ use napi_derive::napi;
use crate::error::NapiErrorExt;
use crate::index::Index;
use crate::merge::NativeMergeInsertBuilder;
use crate::query::{Query, VectorQuery};
#[napi]
@@ -328,6 +329,12 @@ impl Table {
.map(IndexConfig::from)
.collect::<Vec<_>>())
}
#[napi]
pub fn merge_insert(&self, on: Vec<String>) -> napi::Result<NativeMergeInsertBuilder> {
let on: Vec<_> = on.iter().map(String::as_str).collect();
Ok(self.inner_ref()?.merge_insert(on.as_slice()).into())
}
}
#[napi(object)]