From e2e45dd5a64c7a8055498d6251ff6692b639c537 Mon Sep 17 00:00:00 2001
From: albertlockett
Date: Wed, 31 Jan 2024 16:41:56 -0500
Subject: [PATCH] merge insert

---
NOTE(review): this copy of the patch went through a lossy text extraction.
Line breaks and indentation below are reconstructed; text that was enclosed in
angle brackets (author e-mail, generic type arguments such as those after
`Array`, `Promise`, `Record`, `Box`) is missing and was NOT restored.
Regenerate with `git format-patch -1 e2e45dd5a64c` before applying.

 node/src/index.ts                 | 67 +++++++++++++++++++++++++++++++
 node/src/remote/client.ts         |  2 +-
 node/src/remote/index.ts          | 34 ++++++++++++++++
 rust/vectordb/src/lib.rs          |  1 +
 rust/vectordb/src/merge_insert.rs | 63 +++++++++++++++++++++++++++++
 5 files changed, 166 insertions(+), 1 deletion(-)
 create mode 100644 rust/vectordb/src/merge_insert.rs

diff --git a/node/src/index.ts b/node/src/index.ts
index ed34013a1..810d25e0f 100644
--- a/node/src/index.ts
+++ b/node/src/index.ts
@@ -451,6 +451,11 @@ export interface Table {
   indexStats: (indexUuid: string) => Promise
 
   filter(value: string): Query
+
+  /**
+   * TODO comment
+   */
+  mergeInsert: () => MergeInsertBuilder
 
   schema: Promise
 }
@@ -900,6 +905,15 @@ export class LocalTable implements Table {
       return false
     }
   }
+
+  mergeInsert: () => MergeInsertBuilder = () => {
+    return new MergeInsertBuilder(async (args: {
+      params: MergeInsertParams
+      data: Array> | ArrowTable
+    }) => {
+      throw new Error('Not implemented')
+    })
+  }
 }
 
 export interface CleanupStats {
@@ -1076,3 +1090,56 @@ export enum MetricType {
    */
   Dot = 'dot',
 }
+
+export interface MergeInsertParams {
+  whenMatchedUpdateAll: boolean
+  whenNotMatchedInsertAll: boolean
+  whenNotMatchedBySourceDelete: boolean
+  whenNotMatchedBySourceCondition: boolean
+}
+
+type MergeInsertCallback = (args: {
+  params: MergeInsertParams
+  data: Array> | ArrowTable
+}) => Promise
+
+export class MergeInsertBuilder {
+  readonly #callback: MergeInsertCallback
+  readonly #params: MergeInsertParams
+
+  constructor (callback: MergeInsertCallback) {
+    this.#callback = callback
+    this.#params = {
+      whenMatchedUpdateAll: false,
+      whenNotMatchedInsertAll: false,
+      whenNotMatchedBySourceDelete: false,
+      whenNotMatchedBySourceCondition: false
+    }
+  }
+
+  whenMatchedUpdateAll (): MergeInsertBuilder {
+    this.#params.whenMatchedUpdateAll = true
+    return this
+  }
+
+  whenNotMatchedInsertAll (): MergeInsertBuilder {
+    this.#params.whenNotMatchedInsertAll = true
+    return this
+  }
+
+  whenNotMatchedBySourceDelete (): MergeInsertBuilder {
+    this.#params.whenNotMatchedBySourceDelete = true
+    return this
+  }
+
+  whenNotMatchedBySourceCondition (): MergeInsertBuilder {
+    this.#params.whenNotMatchedBySourceCondition = true
+    return this
+  }
+
+  async execute ({ data }: {
+    data: Array> | ArrowTable
+  }): Promise {
+    await this.#callback({ params: this.#params, data })
+  }
+}
diff --git a/node/src/remote/client.ts b/node/src/remote/client.ts
index 3d4d59a28..6c55801ae 100644
--- a/node/src/remote/client.ts
+++ b/node/src/remote/client.ts
@@ -120,7 +120,7 @@ export class HttpLancedbClient {
   public async post (
     path: string,
     data?: any,
-    params?: Record,
+    params?: Record,
     content?: string | undefined
   ): Promise {
     const response = await axios.post(
diff --git a/node/src/remote/index.ts b/node/src/remote/index.ts
index b08d9e6c5..e820b300d 100644
--- a/node/src/remote/index.ts
+++ b/node/src/remote/index.ts
@@ -24,6 +24,8 @@ import {
   type IndexStats,
   type UpdateArgs,
   type UpdateSqlArgs,
+  type MergeInsertParams,
+  MergeInsertBuilder,
   makeArrowTable
 } from '../index'
 import { Query } from '../query'
@@ -424,4 +426,36 @@ export class RemoteTable implements Table {
       numUnindexedRows: results.data.num_unindexed_rows
     }
   }
+
+  mergeInsert: () => MergeInsertBuilder = () => {
+    return new MergeInsertBuilder(async ({ data, params }: {
+      params: MergeInsertParams
+      data: Array> | ArrowTable
+    }) => {
+      // TODO -- uncomment this this
+      // let tbl: ArrowTable
+      // if (data instanceof ArrowTable) {
+      //   tbl = data
+      // } else {
+      //   tbl = makeArrowTable(data, await this.schema)
+      // }
+      const tbl = data as ArrowTable
+
+      const buffer = await fromTableToStreamBuffer(tbl, this._embeddings)
+
+      console.log({ buffer })
+
+      await this._client.post(
+        `/v1/table/${this._name}/merge_insert/`,
+        buffer,
+        {
+          when_matched_update_all: params.whenMatchedUpdateAll,
+          when_not_matched_insert_all: params.whenNotMatchedInsertAll,
+          when_not_matched_by_source_delete: params.whenNotMatchedBySourceDelete,
+          when_not_matched_by_source_condition: params.whenNotMatchedBySourceCondition
+        },
+        'application/vnd.apache.arrow.stream'
+      )
+    })
+  }
 }
diff --git a/rust/vectordb/src/lib.rs b/rust/vectordb/src/lib.rs
index fc4ac1495..361cca7a5 100644
--- a/rust/vectordb/src/lib.rs
+++ b/rust/vectordb/src/lib.rs
@@ -175,6 +175,7 @@ pub mod error;
 pub mod index;
 pub mod io;
 pub mod ipc;
+pub mod merge_insert;
 pub mod query;
 pub mod table;
 pub mod utils;
diff --git a/rust/vectordb/src/merge_insert.rs b/rust/vectordb/src/merge_insert.rs
new file mode 100644
index 000000000..ab3cb4fe0
--- /dev/null
+++ b/rust/vectordb/src/merge_insert.rs
@@ -0,0 +1,63 @@
+// Copyright 2024 Lance Developers.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use arrow_array::RecordBatchReader;
+use lance::dataset;
+
+use crate::TableRef;
+use crate::error::{Error, Result};
+
+pub struct MergeInsertBuilder {
+    table: TableRef,
+    when_matched_update_all: bool,
+    when_not_matched_insert_all: bool,
+    when_not_matched_by_source_delete: bool,
+    when_not_matched_by_source_condition: bool,
+}
+
+impl MergeInsertBuilder {
+    pub(crate) fn new(table: TableRef) -> Self {
+        Self {
+            table,
+            when_matched_update_all: false,
+            when_not_matched_insert_all: false,
+            when_not_matched_by_source_delete: false,
+            when_not_matched_by_source_condition: false,
+        }
+    }
+
+    pub fn when_matched_update_all(mut self) -> Self {
+        self.when_matched_update_all = true;
+        self
+    }
+
+    pub fn when_not_matched_insert_all(mut self) -> Self {
+        self.when_not_matched_insert_all = true;
+        self
+    }
+
+    pub fn when_not_matched_by_source_delete(mut self) -> Self {
+        self.when_not_matched_by_source_delete = true;
+        self
+    }
+
+    pub fn when_not_matched_by_source_condition(mut self) -> Self {
+        self.when_not_matched_by_source_condition = true;
+        self
+    }
+
+    pub async fn execute(batches: Box) -> Result<()> {
+        Ok(())
+    }
+}
\ No newline at end of file