mirror of
https://github.com/lancedb/lancedb.git
synced 2026-05-31 19:00:39 +00:00
## Summary When an `LsmWriteSpec` is installed on a table (#3396), `merge_insert` upsert calls are dispatched through Lance's MemWAL `ShardWriter` (LSM-style append) instead of the standard merge path. - **`use_lsm_write`** — a `merge_insert` builder option, default `true`; set it `false` to use the standard path for a call even when a spec is set. - **`assume_pre_sharded`** — a `merge_insert` builder option, default `false`; skips the per-row shard check and routes by the first row only. - **`close_lsm_writers`** — drains and closes the table's cached MemWAL shard writers. - The `merge_insert` **`on`** columns default to, and are validated against, the table's unenforced primary key. - Shard writers are cached alongside the dataset (in `DatasetConsistencyWrapper`) and reused for the session. - `MergeResult` gains **`num_rows`** — on the LSM path the insert/update breakdown is unknown until compaction, so only the total is reported. Routing covers all three sharding strategies — bucket (murmur3, Iceberg-compatible), identity, and unsharded. Each `merge_insert` call targets a single shard; the whole input is collected and validated before a single atomic `ShardWriter::put`, so a validation failure leaves the MemWAL untouched. Bindings: Python (`merge_insert(...).use_lsm_write(...)` / `.assume_pre_sharded(...)`, `Table.close_lsm_writers`) and TypeScript (`mergeInsert(...).useLsmWrite(...)` / `.assumePreSharded(...)`, `Table.closeLsmWriters`). ## Context Reconstructed from the original #3354 branch onto current `main`: the branch predated the #3394 (unenforced primary key) / #3396 (`LsmWriteSpec`) split and has been rebuilt on that merged foundation. Depends on Lance `v7.0.0-beta.13`. The MemWAL read path (reading un-flushed shard data back into queries) and remote (LanceDB Cloud) LSM support are follow-ups. --------- Co-authored-by: Jack Ye <yezhaoqin@gmail.com>
166 lines
5.8 KiB
TypeScript
166 lines
5.8 KiB
TypeScript
// SPDX-License-Identifier: Apache-2.0
|
|
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
|
import { Data, Schema, fromDataToBuffer } from "./arrow";
|
|
import { MergeResult, NativeMergeInsertBuilder } from "./native";
|
|
|
|
/** A builder used to create and run a merge insert operation */
|
|
export class MergeInsertBuilder {
|
|
#native: NativeMergeInsertBuilder;
|
|
#schema: Schema | Promise<Schema>;
|
|
|
|
/** Construct a MergeInsertBuilder. __Internal use only.__ */
|
|
constructor(
|
|
native: NativeMergeInsertBuilder,
|
|
schema: Schema | Promise<Schema>,
|
|
) {
|
|
this.#native = native;
|
|
this.#schema = schema;
|
|
}
|
|
|
|
/**
|
|
* Rows that exist in both the source table (new data) and
|
|
* the target table (old data) will be updated, replacing
|
|
* the old row with the corresponding matching row.
|
|
*
|
|
* If there are multiple matches then the behavior is undefined.
|
|
* Currently this causes multiple copies of the row to be created
|
|
* but that behavior is subject to change.
|
|
*
|
|
* An optional condition may be specified. If it is, then only
|
|
* matched rows that satisfy the condtion will be updated. Any
|
|
* rows that do not satisfy the condition will be left as they
|
|
* are. Failing to satisfy the condition does not cause a
|
|
* "matched row" to become a "not matched" row.
|
|
*
|
|
* The condition should be an SQL string. Use the prefix
|
|
* target. to refer to rows in the target table (old data)
|
|
* and the prefix source. to refer to rows in the source
|
|
* table (new data).
|
|
*
|
|
* For example, "target.last_update < source.last_update"
|
|
*/
|
|
whenMatchedUpdateAll(options?: { where: string }): MergeInsertBuilder {
|
|
return new MergeInsertBuilder(
|
|
this.#native.whenMatchedUpdateAll(options?.where),
|
|
this.#schema,
|
|
);
|
|
}
|
|
/**
|
|
* Rows that exist only in the source table (new data) should
|
|
* be inserted into the target table.
|
|
*/
|
|
whenNotMatchedInsertAll(): MergeInsertBuilder {
|
|
return new MergeInsertBuilder(
|
|
this.#native.whenNotMatchedInsertAll(),
|
|
this.#schema,
|
|
);
|
|
}
|
|
/**
|
|
* Rows that exist only in the target table (old data) will be
|
|
* deleted. An optional condition can be provided to limit what
|
|
* data is deleted.
|
|
*
|
|
* @param options.where - An optional condition to limit what data is deleted
|
|
*/
|
|
whenNotMatchedBySourceDelete(options?: {
|
|
where: string;
|
|
}): MergeInsertBuilder {
|
|
return new MergeInsertBuilder(
|
|
this.#native.whenNotMatchedBySourceDelete(options?.where),
|
|
this.#schema,
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Controls whether to use indexes for the merge operation.
|
|
*
|
|
* When set to `true` (the default), the operation will use an index if available
|
|
* on the join key for improved performance. When set to `false`, it forces a full
|
|
* table scan even if an index exists. This can be useful for benchmarking or when
|
|
* the query optimizer chooses a suboptimal path.
|
|
*
|
|
* @param useIndex - Whether to use indices for the merge operation. Defaults to `true`.
|
|
*/
|
|
useIndex(useIndex: boolean): MergeInsertBuilder {
|
|
return new MergeInsertBuilder(
|
|
this.#native.useIndex(useIndex),
|
|
this.#schema,
|
|
);
|
|
}
|
|
/**
|
|
* Controls whether the merge uses the MemWAL LSM write path.
|
|
*
|
|
* By default (unset), a `mergeInsert` on a table with an LSM write spec is
|
|
* routed through Lance's MemWAL shard writer, and a table without one uses
|
|
* the standard path. Pass `false` to force the standard path even when a
|
|
* spec is set. Pass `true` to require a spec — `mergeInsert` rejects if none
|
|
* is installed.
|
|
*
|
|
* @param useLsmWrite - Whether to use the LSM write path.
|
|
*/
|
|
useLsmWrite(useLsmWrite: boolean): MergeInsertBuilder {
|
|
return new MergeInsertBuilder(
|
|
this.#native.useLsmWrite(useLsmWrite),
|
|
this.#schema,
|
|
);
|
|
}
|
|
/**
|
|
* Controls how an LSM merge checks that its input targets a single shard.
|
|
*
|
|
* When a table has an LSM write spec, every row in a `mergeInsert` call must
|
|
* route to the same shard. When `true` (the default), every row is inspected
|
|
* to verify this. When `false`, only the first row is inspected and the
|
|
* shard it routes to is used for the whole input — a faster path for callers
|
|
* that have already pre-sharded their input. Has no effect on tables without
|
|
* an LSM write spec.
|
|
*
|
|
* @param validateSingleShard - Whether to check every row routes to one shard. Defaults to `true`.
|
|
*/
|
|
validateSingleShard(validateSingleShard: boolean): MergeInsertBuilder {
|
|
return new MergeInsertBuilder(
|
|
this.#native.validateSingleShard(validateSingleShard),
|
|
this.#schema,
|
|
);
|
|
}
|
|
/**
|
|
* Executes the merge insert operation
|
|
*
|
|
* @returns {Promise<MergeResult>} the merge result
|
|
*/
|
|
async execute(
|
|
data: Data,
|
|
execOptions?: Partial<WriteExecutionOptions>,
|
|
): Promise<MergeResult> {
|
|
let schema: Schema;
|
|
if (this.#schema instanceof Promise) {
|
|
schema = await this.#schema;
|
|
this.#schema = schema; // In case of future calls
|
|
} else {
|
|
schema = this.#schema;
|
|
}
|
|
|
|
if (execOptions?.timeoutMs !== undefined) {
|
|
this.#native.setTimeout(execOptions.timeoutMs);
|
|
}
|
|
|
|
const buffer = await fromDataToBuffer(data, undefined, schema);
|
|
return await this.#native.execute(buffer);
|
|
}
|
|
}
|
|
|
|
export interface WriteExecutionOptions {
|
|
/**
|
|
* Maximum time to run the operation before cancelling it.
|
|
*
|
|
* By default, there is a 30-second timeout that is only enforced after the
|
|
* first attempt. This is to prevent spending too long retrying to resolve
|
|
* conflicts. For example, if a write attempt takes 20 seconds and fails,
|
|
* the second attempt will be cancelled after 10 seconds, hitting the
|
|
* 30-second timeout. However, a write that takes one hour and succeeds on the
|
|
* first attempt will not be cancelled.
|
|
*
|
|
* When this is set, the timeout is enforced on all attempts, including the first.
|
|
*/
|
|
timeoutMs?: number;
|
|
}
|