Compare commits

..

7 Commits

Author SHA1 Message Date
rmeng
93b7ae61be feat: upgrade to 0.12.4 2024-06-20 15:57:57 -04:00
Cory Grinstead
f41eb899dc chore(rust): lock toolchain & fix clippy (#1389)
- fix some clippy errors from ci running a different toolchain. 
- add some safety notes about some unsafe blocks. 

- locks the toolchain so that it is consistent across dev and CI.
2024-06-20 12:13:03 -05:00
Cory Grinstead
e7022b990e feat(nodejs): feature parity [1/N] - remote table (#1378)
closes https://github.com/lancedb/lancedb/issues/1362
2024-06-17 15:23:27 -05:00
Weston Pace
ea86dad4b7 feat: upgrade lance to 0.12.2-beta.2 (#1381) 2024-06-14 05:43:26 -07:00
harsha-mangena
a45656b8b6 docs: remove code-block:: python from docs (#1366)
- refer #1264
- fixed minor documentation issue
2024-06-11 13:13:02 -07:00
Cory Grinstead
bc19a75f65 feat(nodejs): merge insert (#1351)
closes https://github.com/lancedb/lancedb/issues/1349
2024-06-11 15:05:15 -05:00
Ryan Green
8e348ab4bd fix: use JS naming convention in new index stats fields (#1377)
Changes new index stats fields in node client from snake case to camel
case.
2024-06-10 16:41:31 -02:30
28 changed files with 1343 additions and 251 deletions

View File

@@ -46,6 +46,7 @@ runs:
with: with:
command: build command: build
working-directory: python working-directory: python
docker-options: "-e PIP_EXTRA_INDEX_URL=https://pypi.fury.io/lancedb/"
target: aarch64-unknown-linux-gnu target: aarch64-unknown-linux-gnu
manylinux: ${{ inputs.manylinux }} manylinux: ${{ inputs.manylinux }}
args: ${{ inputs.args }} args: ${{ inputs.args }}

View File

@@ -21,5 +21,6 @@ runs:
with: with:
command: build command: build
args: ${{ inputs.args }} args: ${{ inputs.args }}
docker-options: "-e PIP_EXTRA_INDEX_URL=https://pypi.fury.io/lancedb/"
working-directory: python working-directory: python
interpreter: 3.${{ inputs.python-minor-version }} interpreter: 3.${{ inputs.python-minor-version }}

View File

@@ -26,6 +26,7 @@ runs:
with: with:
command: build command: build
args: ${{ inputs.args }} args: ${{ inputs.args }}
docker-options: "-e PIP_EXTRA_INDEX_URL=https://pypi.fury.io/lancedb/"
working-directory: python working-directory: python
- uses: actions/upload-artifact@v3 - uses: actions/upload-artifact@v3
with: with:

View File

@@ -65,7 +65,7 @@ jobs:
workspaces: python workspaces: python
- name: Install - name: Install
run: | run: |
pip install -e .[tests,dev,embeddings] pip install --extra-index-url https://pypi.fury.io/lancedb/ -e .[tests,dev,embeddings]
pip install tantivy pip install tantivy
pip install mlx pip install mlx
- name: Doctest - name: Doctest
@@ -189,7 +189,7 @@ jobs:
- name: Install lancedb - name: Install lancedb
run: | run: |
pip install "pydantic<2" pip install "pydantic<2"
pip install -e .[tests] pip install --extra-index-url https://pypi.fury.io/lancedb/ -e .[tests]
pip install tantivy pip install tantivy
- name: Run tests - name: Run tests
run: pytest -m "not slow and not s3_test" -x -v --durations=30 python/tests run: pytest -m "not slow and not s3_test" -x -v --durations=30 python/tests

View File

@@ -15,7 +15,7 @@ runs:
- name: Install lancedb - name: Install lancedb
shell: bash shell: bash
run: | run: |
pip3 install $(ls target/wheels/lancedb-*.whl)[tests,dev] pip3 install --extra-index-url https://pypi.fury.io/lancedb/ $(ls target/wheels/lancedb-*.whl)[tests,dev]
- name: Setup localstack for integration tests - name: Setup localstack for integration tests
if: ${{ inputs.integration == 'true' }} if: ${{ inputs.integration == 'true' }}
shell: bash shell: bash

View File

@@ -14,7 +14,7 @@ repos:
hooks: hooks:
- id: local-biome-check - id: local-biome-check
name: biome check name: biome check
entry: npx @biomejs/biome check --config-path nodejs/biome.json nodejs/ entry: npx @biomejs/biome@1.7.3 check --config-path nodejs/biome.json nodejs/
language: system language: system
types: [text] types: [text]
files: "nodejs/.*" files: "nodejs/.*"

View File

@@ -20,11 +20,13 @@ keywords = ["lancedb", "lance", "database", "vector", "search"]
categories = ["database-implementations"] categories = ["database-implementations"]
[workspace.dependencies] [workspace.dependencies]
lance = { "version" = "=0.12.1", "features" = ["dynamodb"] } lance = { "version" = "=0.12.4", "features" = [
lance-index = { "version" = "=0.12.1" } "dynamodb",
lance-linalg = { "version" = "=0.12.1" } ]}
lance-testing = { "version" = "=0.12.1" } lance-index = { "version" = "=0.12.4" }
lance-datafusion = { "version" = "=0.12.1" } lance-linalg = { "version" = "=0.12.4" }
lance-testing = { "version" = "=0.12.4" }
lance-datafusion = { "version" = "=0.12.4" }
# Note that this one does not include pyarrow # Note that this one does not include pyarrow
arrow = { version = "51.0", optional = false } arrow = { version = "51.0", optional = false }
arrow-array = "51.0" arrow-array = "51.0"

View File

@@ -132,6 +132,140 @@ describe.each([arrow, arrowOld])("Given a table", (arrow: any) => {
}); });
}); });
// Tests for Table.mergeInsert(): upsert, conditional update,
// insert-if-absent, and "replace range" (delete-by-source) flows.
describe("merge insert", () => {
  let tmpDir: tmp.DirResult;
  let table: Table;
  beforeEach(async () => {
    // Fresh table with three rows per test; merges are keyed on column "a".
    tmpDir = tmp.dirSync({ unsafeCleanup: true });
    const conn = await connect(tmpDir.name);
    table = await conn.createTable("some_table", [
      { a: 1, b: "a" },
      { a: 2, b: "b" },
      { a: 3, b: "c" },
    ]);
  });
  afterEach(() => tmpDir.removeCallback());
  test("upsert", async () => {
    const newData = [
      { a: 2, b: "x" },
      { a: 3, b: "y" },
      { a: 4, b: "z" },
    ];
    // Matched rows (a=2,3) are replaced; the unmatched row (a=4) is inserted.
    await table
      .mergeInsert("a")
      .whenMatchedUpdateAll()
      .whenNotMatchedInsertAll()
      .execute(newData);
    const expected = [
      { a: 1, b: "a" },
      { a: 2, b: "x" },
      { a: 3, b: "y" },
      { a: 4, b: "z" },
    ];
    expect(
      JSON.parse(JSON.stringify((await table.toArrow()).toArray())),
    ).toEqual(expected);
  });
  test("conditional update", async () => {
    const newData = [
      { a: 2, b: "x" },
      { a: 3, b: "y" },
      { a: 4, b: "z" },
    ];
    // Only matched rows whose current b is 'b' are updated; no inserts happen.
    await table
      .mergeInsert("a")
      .whenMatchedUpdateAll({ where: "target.b = 'b'" })
      .execute(newData);
    const expected = [
      { a: 1, b: "a" },
      { a: 2, b: "x" },
      { a: 3, b: "c" },
    ];
    // round trip to arrow and back to json to avoid comparing arrow objects to js object
    // biome-ignore lint/suspicious/noExplicitAny: test
    let res: any[] = JSON.parse(
      JSON.stringify((await table.toArrow()).toArray()),
    );
    res = res.sort((a, b) => a.a - b.a);
    expect(res).toEqual(expected);
  });
  test("insert if not exists", async () => {
    const newData = [
      { a: 2, b: "x" },
      { a: 3, b: "y" },
      { a: 4, b: "z" },
    ];
    // Existing rows are left untouched; only a=4 is new and gets inserted.
    await table.mergeInsert("a").whenNotMatchedInsertAll().execute(newData);
    const expected = [
      { a: 1, b: "a" },
      { a: 2, b: "b" },
      { a: 3, b: "c" },
      { a: 4, b: "z" },
    ];
    // biome-ignore lint/suspicious/noExplicitAny: test
    let res: any[] = JSON.parse(
      JSON.stringify((await table.toArrow()).toArray()),
    );
    res = res.sort((a, b) => a.a - b.a);
    expect(res).toEqual(expected);
  });
  test("replace range", async () => {
    const newData = [
      { a: 2, b: "x" },
      { a: 4, b: "z" },
    ];
    // Upsert plus deletion of target-only rows matching "a > 2" (drops a=3).
    await table
      .mergeInsert("a")
      .whenMatchedUpdateAll()
      .whenNotMatchedInsertAll()
      .whenNotMatchedBySourceDelete({ where: "a > 2" })
      .execute(newData);
    const expected = [
      { a: 1, b: "a" },
      { a: 2, b: "x" },
      { a: 4, b: "z" },
    ];
    // biome-ignore lint/suspicious/noExplicitAny: test
    let res: any[] = JSON.parse(
      JSON.stringify((await table.toArrow()).toArray()),
    );
    res = res.sort((a, b) => a.a - b.a);
    expect(res).toEqual(expected);
  });
  test("replace range no condition", async () => {
    const newData = [
      { a: 2, b: "x" },
      { a: 4, b: "z" },
    ];
    // Without a condition every target-only row is deleted (a=1 and a=3).
    await table
      .mergeInsert("a")
      .whenMatchedUpdateAll()
      .whenNotMatchedInsertAll()
      .whenNotMatchedBySourceDelete()
      .execute(newData);
    const expected = [
      { a: 2, b: "x" },
      { a: 4, b: "z" },
    ];
    // biome-ignore lint/suspicious/noExplicitAny: test
    let res: any[] = JSON.parse(
      JSON.stringify((await table.toArrow()).toArray()),
    );
    res = res.sort((a, b) => a.a - b.a);
    expect(res).toEqual(expected);
  });
});
describe("When creating an index", () => { describe("When creating an index", () => {
let tmpDir: tmp.DirResult; let tmpDir: tmp.DirResult;
const schema = new Schema([ const schema = new Schema([

View File

@@ -77,7 +77,7 @@
"noDuplicateObjectKeys": "error", "noDuplicateObjectKeys": "error",
"noDuplicateParameters": "error", "noDuplicateParameters": "error",
"noEmptyBlockStatements": "error", "noEmptyBlockStatements": "error",
"noExplicitAny": "error", "noExplicitAny": "warn",
"noExtraNonNullAssertion": "error", "noExtraNonNullAssertion": "error",
"noFallthroughSwitchClause": "error", "noFallthroughSwitchClause": "error",
"noFunctionAssign": "error", "noFunctionAssign": "error",

View File

@@ -13,37 +13,10 @@
// limitations under the License. // limitations under the License.
import { Table as ArrowTable, Schema } from "./arrow"; import { Table as ArrowTable, Schema } from "./arrow";
import { import { fromTableToBuffer, makeEmptyTable } from "./arrow";
fromTableToBuffer,
isArrowTable,
makeArrowTable,
makeEmptyTable,
} from "./arrow";
import { EmbeddingFunctionConfig, getRegistry } from "./embedding/registry"; import { EmbeddingFunctionConfig, getRegistry } from "./embedding/registry";
import { ConnectionOptions, Connection as LanceDbConnection } from "./native"; import { Connection as LanceDbConnection } from "./native";
import { Table } from "./table"; import { LocalTable, Table } from "./table";
/**
* Connect to a LanceDB instance at the given URI.
*
* Accepted formats:
*
* - `/path/to/database` - local database
* - `s3://bucket/path/to/database` or `gs://bucket/path/to/database` - database on cloud storage
* - `db://host:port` - remote database (LanceDB cloud)
* @param {string} uri - The uri of the database. If the database uri starts
* with `db://` then it connects to a remote database.
* @see {@link ConnectionOptions} for more details on the URI format.
*/
export async function connect(
uri: string,
opts?: Partial<ConnectionOptions>,
): Promise<Connection> {
opts = opts ?? {};
opts.storageOptions = cleanseStorageOptions(opts.storageOptions);
const nativeConn = await LanceDbConnection.new(uri, opts);
return new Connection(nativeConn);
}
export interface CreateTableOptions { export interface CreateTableOptions {
/** /**
@@ -117,7 +90,6 @@ export interface TableNamesOptions {
/** An optional limit to the number of results to return. */ /** An optional limit to the number of results to return. */
limit?: number; limit?: number;
} }
/** /**
* A LanceDB Connection that allows you to open tables and create new ones. * A LanceDB Connection that allows you to open tables and create new ones.
* *
@@ -136,17 +108,15 @@ export interface TableNamesOptions {
* Any created tables are independent and will continue to work even if * Any created tables are independent and will continue to work even if
* the underlying connection has been closed. * the underlying connection has been closed.
*/ */
export class Connection { export abstract class Connection {
readonly inner: LanceDbConnection; [Symbol.for("nodejs.util.inspect.custom")](): string {
return this.display();
constructor(inner: LanceDbConnection) {
this.inner = inner;
} }
/** Return true if the connection has not been closed */ /**
isOpen(): boolean { * Return true if the connection has not been closed
return this.inner.isOpen(); */
} abstract isOpen(): boolean;
/** /**
* Close the connection, releasing any underlying resources. * Close the connection, releasing any underlying resources.
@@ -155,14 +125,12 @@ export class Connection {
* *
* Any attempt to use the connection after it is closed will result in an error. * Any attempt to use the connection after it is closed will result in an error.
*/ */
close(): void { abstract close(): void;
this.inner.close();
}
/** Return a brief description of the connection */ /**
display(): string { * Return a brief description of the connection
return this.inner.display(); */
} abstract display(): string;
/** /**
* List all the table names in this database. * List all the table names in this database.
@@ -170,15 +138,73 @@ export class Connection {
* Tables will be returned in lexicographical order. * Tables will be returned in lexicographical order.
* @param {Partial<TableNamesOptions>} options - options to control the * @param {Partial<TableNamesOptions>} options - options to control the
* paging / start point * paging / start point
*
*/ */
async tableNames(options?: Partial<TableNamesOptions>): Promise<string[]> { abstract tableNames(options?: Partial<TableNamesOptions>): Promise<string[]>;
return this.inner.tableNames(options?.startAfter, options?.limit);
}
/** /**
* Open a table in the database. * Open a table in the database.
* @param {string} name - The name of the table * @param {string} name - The name of the table
*/ */
abstract openTable(
name: string,
options?: Partial<OpenTableOptions>,
): Promise<Table>;
/**
* Creates a new Table and initialize it with new data.
* @param {string} name - The name of the table.
* @param {Record<string, unknown>[] | ArrowTable} data - Non-empty Array of Records
* to be inserted into the table
*/
abstract createTable(
name: string,
data: Record<string, unknown>[] | ArrowTable,
options?: Partial<CreateTableOptions>,
): Promise<Table>;
/**
* Creates a new empty Table
* @param {string} name - The name of the table.
* @param {Schema} schema - The schema of the table
*/
abstract createEmptyTable(
name: string,
schema: Schema,
options?: Partial<CreateTableOptions>,
): Promise<Table>;
/**
* Drop an existing table.
* @param {string} name The name of the table to drop.
*/
abstract dropTable(name: string): Promise<void>;
}
export class LocalConnection extends Connection {
readonly inner: LanceDbConnection;
constructor(inner: LanceDbConnection) {
super();
this.inner = inner;
}
isOpen(): boolean {
return this.inner.isOpen();
}
close(): void {
this.inner.close();
}
display(): string {
return this.inner.display();
}
async tableNames(options?: Partial<TableNamesOptions>): Promise<string[]> {
return this.inner.tableNames(options?.startAfter, options?.limit);
}
async openTable( async openTable(
name: string, name: string,
options?: Partial<OpenTableOptions>, options?: Partial<OpenTableOptions>,
@@ -189,39 +215,15 @@ export class Connection {
options?.indexCacheSize, options?.indexCacheSize,
); );
return new Table(innerTable); return new LocalTable(innerTable);
} }
/**
* Creates a new Table and initialize it with new data.
* @param {string} name - The name of the table.
* @param {Record<string, unknown>[] | ArrowTable} data - Non-empty Array of Records
* to be inserted into the table
*/
async createTable( async createTable(
name: string, name: string,
data: Record<string, unknown>[] | ArrowTable, data: Record<string, unknown>[] | ArrowTable,
options?: Partial<CreateTableOptions>, options?: Partial<CreateTableOptions>,
): Promise<Table> { ): Promise<Table> {
let mode: string = options?.mode ?? "create"; const { buf, mode } = await Table.parseTableData(data, options);
const existOk = options?.existOk ?? false;
if (mode === "create" && existOk) {
mode = "exist_ok";
}
let table: ArrowTable;
if (isArrowTable(data)) {
table = data;
} else {
table = makeArrowTable(data, options);
}
const buf = await fromTableToBuffer(
table,
options?.embeddingFunction,
options?.schema,
);
const innerTable = await this.inner.createTable( const innerTable = await this.inner.createTable(
name, name,
buf, buf,
@@ -230,14 +232,9 @@ export class Connection {
options?.useLegacyFormat, options?.useLegacyFormat,
); );
return new Table(innerTable); return new LocalTable(innerTable);
} }
/**
* Creates a new empty Table
* @param {string} name - The name of the table.
* @param {Schema} schema - The schema of the table
*/
async createEmptyTable( async createEmptyTable(
name: string, name: string,
schema: Schema, schema: Schema,
@@ -265,13 +262,9 @@ export class Connection {
cleanseStorageOptions(options?.storageOptions), cleanseStorageOptions(options?.storageOptions),
options?.useLegacyFormat, options?.useLegacyFormat,
); );
return new Table(innerTable); return new LocalTable(innerTable);
} }
/**
* Drop an existing table.
* @param {string} name The name of the table to drop.
*/
async dropTable(name: string): Promise<void> { async dropTable(name: string): Promise<void> {
return this.inner.dropTable(name); return this.inner.dropTable(name);
} }
@@ -280,7 +273,7 @@ export class Connection {
/** /**
* Takes storage options and makes all the keys snake case. * Takes storage options and makes all the keys snake case.
*/ */
function cleanseStorageOptions( export function cleanseStorageOptions(
options?: Record<string, string>, options?: Record<string, string>,
): Record<string, string> | undefined { ): Record<string, string> | undefined {
if (options === undefined) { if (options === undefined) {

View File

@@ -12,6 +12,19 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
import {
Connection,
LocalConnection,
cleanseStorageOptions,
} from "./connection";
import {
ConnectionOptions,
Connection as LanceDbConnection,
} from "./native.js";
import { RemoteConnection, RemoteConnectionOptions } from "./remote";
export { export {
WriteOptions, WriteOptions,
WriteMode, WriteMode,
@@ -19,18 +32,20 @@ export {
ColumnAlteration, ColumnAlteration,
ConnectionOptions, ConnectionOptions,
} from "./native.js"; } from "./native.js";
export { export {
makeArrowTable, makeArrowTable,
MakeArrowTableOptions, MakeArrowTableOptions,
Data, Data,
VectorColumnOptions, VectorColumnOptions,
} from "./arrow"; } from "./arrow";
export { export {
connect,
Connection, Connection,
CreateTableOptions, CreateTableOptions,
TableNamesOptions, TableNamesOptions,
} from "./connection"; } from "./connection";
export { export {
ExecutableQuery, ExecutableQuery,
Query, Query,
@@ -38,6 +53,46 @@ export {
VectorQuery, VectorQuery,
RecordBatchIterator, RecordBatchIterator,
} from "./query"; } from "./query";
export { Index, IndexOptions, IvfPqOptions } from "./indices"; export { Index, IndexOptions, IvfPqOptions } from "./indices";
export { Table, AddDataOptions, IndexConfig, UpdateOptions } from "./table";
export {
Table,
AddDataOptions,
IndexConfig,
UpdateOptions,
} from "./table";
export * as embedding from "./embedding"; export * as embedding from "./embedding";
/**
 * Connect to a LanceDB instance at the given URI.
 *
 * Accepted formats:
 *
 * - `/path/to/database` - local database
 * - `s3://bucket/path/to/database` or `gs://bucket/path/to/database` - database on cloud storage
 * - `db://host:port` - remote database (LanceDB cloud)
 * @param {string} uri - The uri of the database. If the database uri starts
 * with `db://` then it connects to a remote database.
 * @see {@link ConnectionOptions} for more details on the URI format.
 */
export async function connect(
  uri: string,
  opts?: Partial<ConnectionOptions | RemoteConnectionOptions>,
): Promise<Connection> {
  if (!uri) {
    throw new Error("uri is required");
  }
  // Default the options exactly once (the previous code re-defaulted an
  // already-defaulted value and optional-chained a uri known to be truthy).
  const options = opts ?? {};
  // `db://` URIs are served by LanceDB Cloud over REST.
  if (uri.startsWith("db://")) {
    return new RemoteConnection(uri, options as RemoteConnectionOptions);
  }
  // Everything else goes through the native (local / object-store) backend.
  const localOpts = options as ConnectionOptions;
  localOpts.storageOptions = cleanseStorageOptions(localOpts.storageOptions);
  const nativeConn = await LanceDbConnection.new(uri, localOpts);
  return new LocalConnection(nativeConn);
}

70
nodejs/lancedb/merge.ts Normal file
View File

@@ -0,0 +1,70 @@
import { Data, fromDataToBuffer } from "./arrow";
import { NativeMergeInsertBuilder } from "./native";
/** A builder used to create and run a merge insert operation */
/** A builder used to create and run a merge insert operation */
export class MergeInsertBuilder {
  /** Handle to the native merge-insert builder this class wraps. */
  #inner: NativeMergeInsertBuilder;

  /** Construct a MergeInsertBuilder. __Internal use only.__ */
  constructor(native: NativeMergeInsertBuilder) {
    this.#inner = native;
  }

  /**
   * Update target rows that have a matching source row.
   *
   * Rows present in both the source table (new data) and the target
   * table (old data) are updated: the old row is replaced by its
   * matching new row.  With multiple matches the behavior is
   * undefined; currently each match produces a copy of the row, but
   * that is subject to change.
   *
   * An optional SQL condition may be given.  When present, only the
   * matched rows that satisfy the condition are updated; rows failing
   * it are left as they are (failing the condition does not turn a
   * "matched row" into a "not matched" row).  Use the prefix
   * `target.` for columns of the target table (old data) and
   * `source.` for the source table (new data), for example
   * "target.last_update < source.last_update".
   */
  whenMatchedUpdateAll(options?: { where: string }): MergeInsertBuilder {
    const next = this.#inner.whenMatchedUpdateAll(options?.where);
    return new MergeInsertBuilder(next);
  }

  /**
   * Rows that exist only in the source table (new data) should
   * be inserted into the target table.
   */
  whenNotMatchedInsertAll(): MergeInsertBuilder {
    const next = this.#inner.whenNotMatchedInsertAll();
    return new MergeInsertBuilder(next);
  }

  /**
   * Rows that exist only in the target table (old data) will be
   * deleted.  An optional condition can be provided to limit what
   * data is deleted.
   *
   * @param options.where - An optional condition to limit what data is deleted
   */
  whenNotMatchedBySourceDelete(options?: {
    where: string;
  }): MergeInsertBuilder {
    const next = this.#inner.whenNotMatchedBySourceDelete(options?.where);
    return new MergeInsertBuilder(next);
  }

  /**
   * Executes the merge insert operation
   *
   * Nothing is returned but the `Table` is updated
   */
  async execute(data: Data): Promise<void> {
    await this.#inner.execute(await fromDataToBuffer(data));
  }
}

View File

@@ -0,0 +1,221 @@
// Copyright 2023 LanceDB Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import axios, {
AxiosError,
type AxiosResponse,
type ResponseType,
} from "axios";
import { Table as ArrowTable } from "../arrow";
import { tableFromIPC } from "../arrow";
import { VectorQuery } from "../query";
/**
 * Thin HTTP client for the LanceDB Cloud REST API.
 *
 * Wraps axios with API-key auth, host resolution
 * (`https://<db>.<region>.api.lancedb.com` unless overridden) and a
 * uniform HTTP-status -> Error translation via `checkStatus`.
 */
export class RestfulLanceDBClient {
  #dbName: string;
  #region: string;
  #apiKey: string;
  #hostOverride?: string;
  // Flipped by close(); every request checks it via checkNotClosed().
  #closed: boolean = false;
  #connectionTimeout: number = 12 * 1000; // 12 seconds;
  // NOTE(review): #readTimeout is stored but not used anywhere in this
  // class as visible here — confirm whether it should be wired into axios.
  #readTimeout: number = 30 * 1000; // 30 seconds;
  // NOTE(review): never assigned (except to undefined in close()), so the
  // `session` getter below always builds a fresh axios instance — see todo.
  #session?: import("axios").AxiosInstance;

  /**
   * @param dbName - database name, used to derive the default host
   * @param apiKey - bearer token sent with every request
   * @param region - cloud region, used to derive the default host
   * @param hostOverride - optional explicit base URL replacing the default
   * @param connectionTimeout - request timeout in ms (default 12s)
   * @param readTimeout - read timeout in ms (default 30s)
   */
  constructor(
    dbName: string,
    apiKey: string,
    region: string,
    hostOverride?: string,
    connectionTimeout?: number,
    readTimeout?: number,
  ) {
    this.#dbName = dbName;
    this.#apiKey = apiKey;
    this.#region = region;
    this.#hostOverride = hostOverride ?? this.#hostOverride;
    this.#connectionTimeout = connectionTimeout ?? this.#connectionTimeout;
    this.#readTimeout = readTimeout ?? this.#readTimeout;
  }

  // todo: cache the session.
  /**
   * Axios instance configured with base URL, bearer auth and the
   * buffer-to-string response transform (`decodeErrorData`).
   */
  get session(): import("axios").AxiosInstance {
    if (this.#session !== undefined) {
      return this.#session;
    } else {
      return axios.create({
        baseURL: this.url,
        headers: {
          // biome-ignore lint/style/useNamingConvention: external api
          Authorization: `Bearer ${this.#apiKey}`,
        },
        // Replaces axios' default JSON parsing; bodies arrive as strings.
        transformResponse: decodeErrorData,
        timeout: this.#connectionTimeout,
      });
    }
  }

  /** Base URL: the host override if set, else the derived cloud host. */
  get url(): string {
    return (
      this.#hostOverride ??
      `https://${this.#dbName}.${this.#region}.api.lancedb.com`
    );
  }

  /** Per-request headers (API key plus routing hints). */
  get headers(): { [key: string]: string } {
    const headers: { [key: string]: string } = {
      "x-api-key": this.#apiKey,
      "x-request-id": "na",
    };
    // Presumably for local development routing — the Host header mimics
    // the cloud hostname when region is "local"; confirm with deploy docs.
    if (this.#region == "local") {
      headers["Host"] = `${this.#dbName}.${this.#region}.api.lancedb.com`;
    }
    if (this.#hostOverride) {
      headers["x-lancedb-database"] = this.#dbName;
    }
    return headers;
  }

  /** Return true until close() has been called. */
  isOpen(): boolean {
    return !this.#closed;
  }

  // Guard invoked at the top of every request method.
  private checkNotClosed(): void {
    if (this.#closed) {
      throw new Error("Connection is closed");
    }
  }

  /** Drop the session and mark the client closed (idempotent). */
  close(): void {
    this.#session = undefined;
    this.#closed = true;
  }

  /**
   * GET `uri` (resolved against the base URL) and return the response body.
   * Non-2xx responses are surfaced as Errors via checkStatus rather than
   * as axios exceptions.
   */
  // biome-ignore lint/suspicious/noExplicitAny: api response
  async get(uri: string, params?: Record<string, any>): Promise<any> {
    this.checkNotClosed();
    uri = new URL(uri, this.url).toString();
    let response;
    try {
      response = await this.session.get(uri, {
        headers: this.headers,
        params,
      });
    } catch (e) {
      // Recover the response from axios errors so checkStatus can
      // translate the status code; rethrow anything else (network etc.).
      if (e instanceof AxiosError) {
        response = e.response;
      } else {
        throw e;
      }
    }
    RestfulLanceDBClient.checkStatus(response!);
    return response!.data;
  }

  // Overload: plain JSON post, parsed body returned.
  // biome-ignore lint/suspicious/noExplicitAny: api response
  async post(uri: string, body?: any): Promise<any>;
  // Overload: binary post, raw Buffer returned.
  async post(
    uri: string,
    // biome-ignore lint/suspicious/noExplicitAny: api request
    body: any,
    additional: {
      config?: { responseType: "arraybuffer" };
      headers?: Record<string, string>;
      params?: Record<string, string>;
    },
  ): Promise<Buffer>;
  /**
   * POST `uri` with `body`.  Defaults to a JSON response; pass
   * `additional.config.responseType = "arraybuffer"` for binary payloads
   * (e.g. Arrow IPC).  Content-Type defaults to application/json unless
   * overridden via `additional.headers`.
   */
  async post(
    uri: string,
    // biome-ignore lint/suspicious/noExplicitAny: api request
    body?: any,
    additional?: {
      config?: { responseType: ResponseType };
      headers?: Record<string, string>;
      params?: Record<string, string>;
    },
    // biome-ignore lint/suspicious/noExplicitAny: api response
  ): Promise<any> {
    this.checkNotClosed();
    uri = new URL(uri, this.url).toString();
    additional = Object.assign(
      { config: { responseType: "json" } },
      additional,
    );
    const headers = { ...this.headers, ...additional.headers };
    if (!headers["Content-Type"]) {
      headers["Content-Type"] = "application/json";
    }
    let response;
    try {
      response = await this.session.post(uri, body, {
        headers,
        responseType: additional!.config!.responseType,
        // NOTE(review): axios serializes plain objects / URLSearchParams
        // as query params; a Map here likely produces none — confirm that
        // callers relying on `params` actually get them on the wire.
        params: new Map(Object.entries(additional.params ?? {})),
      });
    } catch (e) {
      if (e instanceof AxiosError) {
        response = e.response;
      } else {
        throw e;
      }
    }
    RestfulLanceDBClient.checkStatus(response!);
    if (additional!.config!.responseType === "arraybuffer") {
      return response!.data;
    } else {
      // transformResponse (decodeErrorData) leaves the body as a string,
      // so parse it here instead of relying on axios' default JSON parse.
      return JSON.parse(response!.data);
    }
  }

  /** List table names, paged via `limit` / `pageToken`. */
  async listTables(limit = 10, pageToken = ""): Promise<string[]> {
    const json = await this.get("/v1/table", { limit, pageToken });
    return json.tables;
  }

  /** Run a vector query against `tableName`; result arrives as Arrow IPC. */
  async query(tableName: string, query: VectorQuery): Promise<ArrowTable> {
    const tbl = await this.post(`/v1/table/${tableName}/query`, query, {
      config: {
        responseType: "arraybuffer",
      },
    });
    return tableFromIPC(tbl);
  }

  /** Translate non-200 responses into thrown Errors. */
  static checkStatus(response: AxiosResponse): void {
    if (response.status === 404) {
      throw new Error(`Not found: ${response.data}`);
    } else if (response.status >= 400 && response.status < 500) {
      throw new Error(
        `Bad Request: ${response.status}, error: ${response.data}`,
      );
    } else if (response.status >= 500 && response.status < 600) {
      throw new Error(
        `Internal Server Error: ${response.status}, error: ${response.data}`,
      );
    } else if (response.status !== 200) {
      throw new Error(
        `Unknown Error: ${response.status}, error: ${response.data}`,
      );
    }
  }
}
/**
 * Axios response transformer: decode raw Buffer payloads as UTF-8 text,
 * passing every other value through untouched.
 */
function decodeErrorData(data: unknown) {
  return Buffer.isBuffer(data) ? data.toString("utf-8") : data;
}

View File

@@ -0,0 +1,187 @@
import { Schema } from "apache-arrow";
import { Data, fromTableToStreamBuffer, makeEmptyTable } from "../arrow";
import {
Connection,
CreateTableOptions,
OpenTableOptions,
TableNamesOptions,
} from "../connection";
import { Table } from "../table";
import { TTLCache } from "../util";
import { RestfulLanceDBClient } from "./client";
import { RemoteTable } from "./table";
/** Options for connecting to a LanceDB Cloud database (`db://` URIs). */
export interface RemoteConnectionOptions {
  // API key; falls back to the LANCEDB_API_KEY environment variable.
  apiKey?: string;
  // Cloud region; falls back to the LANCEDB_REGION environment variable.
  region?: string;
  // Replaces the default `https://<db>.<region>.api.lancedb.com` host.
  hostOverride?: string;
  // Connection timeout in milliseconds (client default: 12000).
  connectionTimeout?: number;
  // Read timeout in milliseconds (client default: 30000).
  readTimeout?: number;
}
/**
 * A connection to a remote LanceDB Cloud database (`db://` URIs).
 *
 * All operations go over the REST API via {@link RestfulLanceDBClient}.
 * Table existence is memoized in a TTL cache so `openTable` can skip the
 * describe round-trip for recently seen tables.
 */
export class RemoteConnection extends Connection {
  #dbName: string;
  #apiKey: string;
  #region: string;
  #client: RestfulLanceDBClient;
  // Names of tables known to exist; entries expire after 5 minutes.
  #tableCache = new TTLCache(300_000);

  /**
   * @param url - a `db://<dbName>` URI
   * @param options - apiKey / region fall back to the LANCEDB_API_KEY and
   *   LANCEDB_REGION environment variables
   */
  constructor(
    url: string,
    {
      apiKey,
      region,
      hostOverride,
      connectionTimeout,
      readTimeout,
    }: RemoteConnectionOptions,
  ) {
    super();
    apiKey = apiKey ?? process.env.LANCEDB_API_KEY;
    region = region ?? process.env.LANCEDB_REGION;
    if (!apiKey) {
      throw new Error("apiKey is required when connecting to LanceDB Cloud");
    }
    if (!region) {
      throw new Error("region is required when connecting to LanceDB Cloud");
    }
    const parsed = new URL(url);
    if (parsed.protocol !== "db:") {
      throw new Error(
        `invalid protocol: ${parsed.protocol}, only accepts db://`,
      );
    }
    this.#dbName = parsed.hostname;
    this.#apiKey = apiKey;
    this.#region = region;
    this.#client = new RestfulLanceDBClient(
      this.#dbName,
      this.#apiKey,
      this.#region,
      hostOverride,
      connectionTimeout,
      readTimeout,
    );
  }

  /** Return true while the underlying HTTP client is still open. */
  isOpen(): boolean {
    return this.#client.isOpen();
  }

  /** Close the underlying HTTP client; the connection becomes unusable. */
  close(): void {
    return this.#client.close();
  }

  /** Brief human-readable description of the connection. */
  display(): string {
    return `RemoteConnection(${this.#dbName})`;
  }

  /** List table names (paged via `startAfter` / `limit`). */
  async tableNames(options?: Partial<TableNamesOptions>): Promise<string[]> {
    // Fix: client.get() already returns the parsed response body (see
    // RestfulLanceDBClient.get), so the previous `response.body()` call
    // was invoked on a plain object and threw at runtime.
    const body = await this.#client.get("/v1/table/", {
      limit: options?.limit ?? 10,
      // biome-ignore lint/style/useNamingConvention: external API field
      page_token: options?.startAfter ?? "",
    });
    for (const table of body.tables) {
      this.#tableCache.set(table, true);
    }
    return body.tables;
  }

  /**
   * Open a table, verifying it exists via the describe endpoint unless it
   * is already in the TTL cache.  `options` are ignored for remote tables.
   */
  async openTable(
    name: string,
    _options?: Partial<OpenTableOptions> | undefined,
  ): Promise<Table> {
    if (this.#tableCache.get(name) === undefined) {
      await this.#client.post(
        `/v1/table/${encodeURIComponent(name)}/describe/`,
      );
      this.#tableCache.set(name, true);
    }
    return new RemoteTable(this.#client, name, this.#dbName);
  }

  /**
   * Create a table from `data`, uploading it as an Arrow IPC stream.
   * `mode` and `embeddingFunction` options are not supported on Cloud and
   * only produce warnings.
   */
  async createTable(
    tableName: string,
    data: Data,
    options?: Partial<CreateTableOptions> | undefined,
  ): Promise<Table> {
    if (options?.mode) {
      console.warn(
        "option 'mode' is not supported in LanceDB Cloud",
        "LanceDB Cloud only supports the default 'create' mode.",
        "If the table already exists, an error will be thrown.",
      );
    }
    if (options?.embeddingFunction) {
      console.warn(
        "embedding_functions is not yet supported on LanceDB Cloud.",
        "Please vote https://github.com/lancedb/lancedb/issues/626 ",
        "for this feature.",
      );
    }
    const { buf } = await Table.parseTableData(
      data,
      options,
      true /** streaming */,
    );
    await this.#client.post(
      `/v1/table/${encodeURIComponent(tableName)}/create/`,
      buf,
      {
        config: {
          responseType: "arraybuffer",
        },
        headers: { "Content-Type": "application/vnd.apache.arrow.stream" },
      },
    );
    this.#tableCache.set(tableName, true);
    return new RemoteTable(this.#client, tableName, this.#dbName);
  }

  /** Create an empty table from `schema` (same unsupported-option warnings). */
  async createEmptyTable(
    name: string,
    schema: Schema,
    options?: Partial<CreateTableOptions> | undefined,
  ): Promise<Table> {
    if (options?.mode) {
      console.warn(`mode is not supported on LanceDB Cloud`);
    }
    if (options?.embeddingFunction) {
      console.warn(
        "embeddingFunction is not yet supported on LanceDB Cloud.",
        "Please vote https://github.com/lancedb/lancedb/issues/626 ",
        "for this feature.",
      );
    }
    const emptyTable = makeEmptyTable(schema);
    const buf = await fromTableToStreamBuffer(emptyTable);
    await this.#client.post(
      `/v1/table/${encodeURIComponent(name)}/create/`,
      buf,
      {
        config: {
          responseType: "arraybuffer",
        },
        headers: { "Content-Type": "application/vnd.apache.arrow.stream" },
      },
    );
    this.#tableCache.set(name, true);
    return new RemoteTable(this.#client, name, this.#dbName);
  }

  /** Drop `name` on the server and evict it from the cache. */
  async dropTable(name: string): Promise<void> {
    await this.#client.post(`/v1/table/${encodeURIComponent(name)}/drop/`);
    this.#tableCache.delete(name);
  }
}

View File

@@ -0,0 +1,3 @@
export { RestfulLanceDBClient } from "./client";
export { type RemoteConnectionOptions, RemoteConnection } from "./connection";
export { RemoteTable } from "./table";

View File

@@ -0,0 +1,164 @@
// Copyright 2023 LanceDB Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import { Table as ArrowTable } from "apache-arrow";
import { Data, IntoVector } from "../arrow";
import { CreateTableOptions } from "../connection";
import { IndexOptions } from "../indices";
import { MergeInsertBuilder } from "../merge";
import { VectorQuery } from "../query";
import { AddDataOptions, Table, UpdateOptions } from "../table";
import { RestfulLanceDBClient } from "./client";
/**
 * A {@link Table} implementation backed by the LanceDB Cloud REST API.
 *
 * Most mutating operations are forwarded to the service; several local-only
 * features (query building, column maintenance, versioned checkout, merge
 * insert) are not yet available remotely and throw.
 */
export class RemoteTable extends Table {
  #client: RestfulLanceDBClient;
  #name: string;
  // Used in the display() method
  #dbName: string;

  /**
   * URL prefix shared by every table-scoped endpoint.
   *
   * No trailing slash: every caller appends a path that starts with "/",
   * so a trailing slash here would yield double-slash URLs such as
   * `/v1/table/foo//describe/`.
   */
  get #tablePrefix() {
    return `/v1/table/${encodeURIComponent(this.#name)}`;
  }

  public constructor(
    client: RestfulLanceDBClient,
    tableName: string,
    dbName: string,
  ) {
    super();
    this.#client = client;
    this.#name = tableName;
    this.#dbName = dbName;
  }

  /** True while the underlying HTTP client is still usable. */
  isOpen(): boolean {
    // Mirror the client's state directly; negating it would report the
    // table as closed exactly when the client is open.
    return this.#client.isOpen();
  }

  /** Close the table by closing its HTTP client. */
  close(): void {
    this.#client.close();
  }

  /** Brief description used by console.log / util.inspect. */
  display(): string {
    return `RemoteTable(${this.#dbName}; ${this.#name})`;
  }

  /** Fetch the table's schema from the service. */
  async schema(): Promise<import("apache-arrow").Schema> {
    const resp = await this.#client.post(`${this.#tablePrefix}/describe/`);
    // TODO: parse this into a valid arrow schema
    return resp.schema;
  }

  /**
   * Insert records into the table.
   * @param data Records to insert.
   * @param options `mode` selects "append" (default) or "overwrite".
   */
  async add(data: Data, options?: Partial<AddDataOptions>): Promise<void> {
    const { buf, mode } = await Table.parseTableData(
      data,
      options as CreateTableOptions,
      true,
    );
    await this.#client.post(`${this.#tablePrefix}/insert/`, buf, {
      params: {
        mode,
      },
      headers: {
        "Content-Type": "application/vnd.apache.arrow.stream",
      },
    });
  }

  /**
   * Update rows matching `options.where` (all rows when omitted),
   * assigning each named column the value of its SQL expression.
   */
  async update(
    updates: Map<string, string> | Record<string, string>,
    options?: Partial<UpdateOptions>,
  ): Promise<void> {
    // Normalize both accepted shapes into [column, expression] pairs.
    // Object.entries() on a Map yields nothing (Maps have no enumerable
    // own properties), which would silently drop every update, so Maps
    // must be handled explicitly.
    const pairs =
      updates instanceof Map
        ? Array.from(updates.entries())
        : Object.entries(updates);
    await this.#client.post(`${this.#tablePrefix}/update/`, {
      predicate: options?.where ?? null,
      updates: pairs,
    });
  }

  /** Count rows, optionally restricted to those matching `filter`. */
  async countRows(filter?: unknown): Promise<number> {
    const payload = { predicate: filter };
    return await this.#client.post(`${this.#tablePrefix}/count_rows/`, payload);
  }

  /** Delete all rows satisfying `predicate`. */
  async delete(predicate: unknown): Promise<void> {
    const payload = { predicate };
    await this.#client.post(`${this.#tablePrefix}/delete/`, payload);
  }

  /**
   * Create a vector index on `column`.
   *
   * Index configuration is not yet supported on LanceDB Cloud; a vector
   * index with L2 distance is always requested.
   */
  async createIndex(
    column: string,
    options?: Partial<IndexOptions>,
  ): Promise<void> {
    if (options !== undefined) {
      console.warn("options are not yet supported on the LanceDB cloud");
    }
    const indexType = "vector";
    const metric = "L2";
    const data = {
      column,
      // biome-ignore lint/style/useNamingConvention: external API
      index_type: indexType,
      // biome-ignore lint/style/useNamingConvention: external API
      metric_type: metric,
    };
    await this.#client.post(`${this.#tablePrefix}/create_index`, data);
  }

  query(): import("..").Query {
    throw new Error("query() is not yet supported on the LanceDB cloud");
  }

  search(query: IntoVector): VectorQuery;
  search(query: string): Promise<VectorQuery>;
  search(_query: string | IntoVector): VectorQuery | Promise<VectorQuery> {
    throw new Error("search() is not yet supported on the LanceDB cloud");
  }

  vectorSearch(_vector: unknown): import("..").VectorQuery {
    throw new Error("vectorSearch() is not yet supported on the LanceDB cloud");
  }

  addColumns(_newColumnTransforms: unknown): Promise<void> {
    throw new Error("addColumns() is not yet supported on the LanceDB cloud");
  }

  alterColumns(_columnAlterations: unknown): Promise<void> {
    throw new Error("alterColumns() is not yet supported on the LanceDB cloud");
  }

  dropColumns(_columnNames: unknown): Promise<void> {
    throw new Error("dropColumns() is not yet supported on the LanceDB cloud");
  }

  /** Current version number of the table (from the describe endpoint). */
  async version(): Promise<number> {
    const resp = await this.#client.post(`${this.#tablePrefix}/describe/`);
    return resp.version;
  }

  checkout(_version: unknown): Promise<void> {
    throw new Error("checkout() is not yet supported on the LanceDB cloud");
  }

  checkoutLatest(): Promise<void> {
    throw new Error(
      "checkoutLatest() is not yet supported on the LanceDB cloud",
    );
  }

  restore(): Promise<void> {
    throw new Error("restore() is not yet supported on the LanceDB cloud");
  }

  optimize(_options?: unknown): Promise<import("../native").OptimizeStats> {
    throw new Error("optimize() is not yet supported on the LanceDB cloud");
  }

  /** List the indices that exist on this table. */
  async listIndices(): Promise<import("../native").IndexConfig[]> {
    return await this.#client.post(`${this.#tablePrefix}/index/list/`);
  }

  toArrow(): Promise<ArrowTable> {
    throw new Error("toArrow() is not yet supported on the LanceDB cloud");
  }

  mergeInsert(_on: string | string[]): MergeInsertBuilder {
    throw new Error("mergeInsert() is not yet supported on the LanceDB cloud");
  }
}

View File

@@ -18,11 +18,17 @@ import {
IntoVector, IntoVector,
Schema, Schema,
fromDataToBuffer, fromDataToBuffer,
fromTableToBuffer,
fromTableToStreamBuffer,
isArrowTable,
makeArrowTable,
tableFromIPC, tableFromIPC,
} from "./arrow"; } from "./arrow";
import { CreateTableOptions } from "./connection";
import { EmbeddingFunctionConfig, getRegistry } from "./embedding/registry"; import { EmbeddingFunctionConfig, getRegistry } from "./embedding/registry";
import { IndexOptions } from "./indices"; import { IndexOptions } from "./indices";
import { MergeInsertBuilder } from "./merge";
import { import {
AddColumnsSql, AddColumnsSql,
ColumnAlteration, ColumnAlteration,
@@ -88,19 +94,13 @@ export interface OptimizeOptions {
* Closing a table is optional. It not closed, it will be closed when it is garbage * Closing a table is optional. It not closed, it will be closed when it is garbage
* collected. * collected.
*/ */
export class Table { export abstract class Table {
private readonly inner: _NativeTable; [Symbol.for("nodejs.util.inspect.custom")](): string {
return this.display();
/** Construct a Table. Internal use only. */
constructor(inner: _NativeTable) {
this.inner = inner;
} }
/** Return true if the table has not been closed */ /** Return true if the table has not been closed */
isOpen(): boolean { abstract isOpen(): boolean;
return this.inner.isOpen();
}
/** /**
* Close the table, releasing any underlying resources. * Close the table, releasing any underlying resources.
* *
@@ -108,48 +108,16 @@ export class Table {
* *
* Any attempt to use the table after it is closed will result in an error. * Any attempt to use the table after it is closed will result in an error.
*/ */
close(): void { abstract close(): void;
this.inner.close();
}
/** Return a brief description of the table */ /** Return a brief description of the table */
display(): string { abstract display(): string;
return this.inner.display();
}
async #getEmbeddingFunctions(): Promise<
Map<string, EmbeddingFunctionConfig>
> {
const schema = await this.schema();
const registry = getRegistry();
return registry.parseFunctions(schema.metadata);
}
/** Get the schema of the table. */ /** Get the schema of the table. */
async schema(): Promise<Schema> { abstract schema(): Promise<Schema>;
const schemaBuf = await this.inner.schema();
const tbl = tableFromIPC(schemaBuf);
return tbl.schema;
}
/** /**
* Insert records into this Table. * Insert records into this Table.
* @param {Data} data Records to be inserted into the Table * @param {Data} data Records to be inserted into the Table
*/ */
async add(data: Data, options?: Partial<AddDataOptions>): Promise<void> { abstract add(data: Data, options?: Partial<AddDataOptions>): Promise<void>;
const mode = options?.mode ?? "append";
const schema = await this.schema();
const registry = getRegistry();
const functions = registry.parseFunctions(schema.metadata);
const buffer = await fromDataToBuffer(
data,
functions.values().next().value,
schema,
);
await this.inner.add(buffer, mode);
}
/** /**
* Update existing records in the Table * Update existing records in the Table
* *
@@ -175,30 +143,14 @@ export class Table {
* @param {Partial<UpdateOptions>} options - additional options to control * @param {Partial<UpdateOptions>} options - additional options to control
* the update behavior * the update behavior
*/ */
async update( abstract update(
updates: Map<string, string> | Record<string, string>, updates: Map<string, string> | Record<string, string>,
options?: Partial<UpdateOptions>, options?: Partial<UpdateOptions>,
) { ): Promise<void>;
const onlyIf = options?.where;
let columns: [string, string][];
if (updates instanceof Map) {
columns = Array.from(updates.entries());
} else {
columns = Object.entries(updates);
}
await this.inner.update(onlyIf, columns);
}
/** Count the total number of rows in the dataset. */ /** Count the total number of rows in the dataset. */
async countRows(filter?: string): Promise<number> { abstract countRows(filter?: string): Promise<number>;
return await this.inner.countRows(filter);
}
/** Delete the rows that satisfy the predicate. */ /** Delete the rows that satisfy the predicate. */
async delete(predicate: string): Promise<void> { abstract delete(predicate: string): Promise<void>;
await this.inner.delete(predicate);
}
/** /**
* Create an index to speed up queries. * Create an index to speed up queries.
* *
@@ -225,13 +177,10 @@ export class Table {
* // Or create a Scalar index * // Or create a Scalar index
* await table.createIndex("my_float_col"); * await table.createIndex("my_float_col");
*/ */
async createIndex(column: string, options?: Partial<IndexOptions>) { abstract createIndex(
// Bit of a hack to get around the fact that TS has no package-scope. column: string,
// biome-ignore lint/suspicious/noExplicitAny: skip options?: Partial<IndexOptions>,
const nativeIndex = (options?.config as any)?.inner; ): Promise<void>;
await this.inner.createIndex(nativeIndex, column, options?.replace);
}
/** /**
* Create a {@link Query} Builder. * Create a {@link Query} Builder.
* *
@@ -282,44 +231,20 @@ export class Table {
* } * }
* @returns {Query} A builder that can be used to parameterize the query * @returns {Query} A builder that can be used to parameterize the query
*/ */
query(): Query { abstract query(): Query;
return new Query(this.inner);
}
/** /**
* Create a search query to find the nearest neighbors * Create a search query to find the nearest neighbors
* of the given query vector * of the given query vector
* @param {string} query - the query. This will be converted to a vector using the table's provided embedding function * @param {string} query - the query. This will be converted to a vector using the table's provided embedding function
* @rejects {Error} If no embedding functions are defined in the table * @rejects {Error} If no embedding functions are defined in the table
*/ */
search(query: string): Promise<VectorQuery>; abstract search(query: string): Promise<VectorQuery>;
/** /**
* Create a search query to find the nearest neighbors * Create a search query to find the nearest neighbors
* of the given query vector * of the given query vector
* @param {IntoVector} query - the query vector * @param {IntoVector} query - the query vector
*/ */
search(query: IntoVector): VectorQuery; abstract search(query: IntoVector): VectorQuery;
search(query: string | IntoVector): Promise<VectorQuery> | VectorQuery {
if (typeof query !== "string") {
return this.vectorSearch(query);
} else {
return this.#getEmbeddingFunctions().then(async (functions) => {
// TODO: Support multiple embedding functions
const embeddingFunc: EmbeddingFunctionConfig | undefined = functions
.values()
.next().value;
if (!embeddingFunc) {
return Promise.reject(
new Error("No embedding functions are defined in the table"),
);
}
const embeddings =
await embeddingFunc.function.computeQueryEmbeddings(query);
return this.query().nearestTo(embeddings);
});
}
}
/** /**
* Search the table with a given query vector. * Search the table with a given query vector.
* *
@@ -327,11 +252,7 @@ export class Table {
* is the same thing as calling `nearestTo` on the builder returned * is the same thing as calling `nearestTo` on the builder returned
* by `query`. @see {@link Query#nearestTo} for more details. * by `query`. @see {@link Query#nearestTo} for more details.
*/ */
vectorSearch(vector: IntoVector): VectorQuery { abstract vectorSearch(vector: IntoVector): VectorQuery;
return this.query().nearestTo(vector);
}
// TODO: Support BatchUDF
/** /**
* Add new columns with defined values. * Add new columns with defined values.
* @param {AddColumnsSql[]} newColumnTransforms pairs of column names and * @param {AddColumnsSql[]} newColumnTransforms pairs of column names and
@@ -339,19 +260,14 @@ export class Table {
* expressions will be evaluated for each row in the table, and can * expressions will be evaluated for each row in the table, and can
* reference existing columns in the table. * reference existing columns in the table.
*/ */
async addColumns(newColumnTransforms: AddColumnsSql[]): Promise<void> { abstract addColumns(newColumnTransforms: AddColumnsSql[]): Promise<void>;
await this.inner.addColumns(newColumnTransforms);
}
/** /**
* Alter the name or nullability of columns. * Alter the name or nullability of columns.
* @param {ColumnAlteration[]} columnAlterations One or more alterations to * @param {ColumnAlteration[]} columnAlterations One or more alterations to
* apply to columns. * apply to columns.
*/ */
async alterColumns(columnAlterations: ColumnAlteration[]): Promise<void> { abstract alterColumns(columnAlterations: ColumnAlteration[]): Promise<void>;
await this.inner.alterColumns(columnAlterations);
}
/** /**
* Drop one or more columns from the dataset * Drop one or more columns from the dataset
* *
@@ -363,15 +279,10 @@ export class Table {
* be nested column references (e.g. "a.b.c") or top-level column names * be nested column references (e.g. "a.b.c") or top-level column names
* (e.g. "a"). * (e.g. "a").
*/ */
async dropColumns(columnNames: string[]): Promise<void> { abstract dropColumns(columnNames: string[]): Promise<void>;
await this.inner.dropColumns(columnNames);
}
/** Retrieve the version of the table */ /** Retrieve the version of the table */
async version(): Promise<number> {
return await this.inner.version();
}
abstract version(): Promise<number>;
/** /**
* Checks out a specific version of the table _This is an in-place operation._ * Checks out a specific version of the table _This is an in-place operation._
* *
@@ -397,19 +308,14 @@ export class Table {
* console.log(await table.version()); // 2 * console.log(await table.version()); // 2
* ``` * ```
*/ */
async checkout(version: number): Promise<void> { abstract checkout(version: number): Promise<void>;
await this.inner.checkout(version);
}
/** /**
* Checkout the latest version of the table. _This is an in-place operation._ * Checkout the latest version of the table. _This is an in-place operation._
* *
* The table will be set back into standard mode, and will track the latest * The table will be set back into standard mode, and will track the latest
* version of the table. * version of the table.
*/ */
async checkoutLatest(): Promise<void> { abstract checkoutLatest(): Promise<void>;
await this.inner.checkoutLatest();
}
/** /**
* Restore the table to the currently checked out version * Restore the table to the currently checked out version
@@ -423,10 +329,7 @@ export class Table {
* Once the operation concludes the table will no longer be in a checked * Once the operation concludes the table will no longer be in a checked
* out state and the read_consistency_interval, if any, will apply. * out state and the read_consistency_interval, if any, will apply.
*/ */
async restore(): Promise<void> { abstract restore(): Promise<void>;
await this.inner.restore();
}
/** /**
* Optimize the on-disk data and indices for better performance. * Optimize the on-disk data and indices for better performance.
* *
@@ -457,6 +360,191 @@ export class Table {
* you have added or modified 100,000 or more records or run more than 20 data * you have added or modified 100,000 or more records or run more than 20 data
* modification operations. * modification operations.
*/ */
abstract optimize(options?: Partial<OptimizeOptions>): Promise<OptimizeStats>;
/** List all indices that have been created with {@link Table.createIndex} */
abstract listIndices(): Promise<IndexConfig[]>;
/** Return the table as an arrow table */
abstract toArrow(): Promise<ArrowTable>;
/** Build a merge-insert (upsert) operation keyed on the given column(s). */
abstract mergeInsert(on: string | string[]): MergeInsertBuilder;
/**
 * Normalize user-supplied table data into an Arrow IPC buffer plus the
 * effective creation mode.
 *
 * @param data Either plain record objects or an already-built Arrow table.
 * @param options Creation options; `mode` + `existOk` collapse into the
 *   single "exist_ok" mode understood by the backend.
 * @param streaming When true, serialize as an Arrow IPC *stream* (used by
 *   the remote client) rather than an IPC *file*.
 * @returns `{ buf, mode }` ready to hand to the transport layer.
 */
static async parseTableData(
  // biome-ignore lint/suspicious/noExplicitAny: <explanation>
  data: Record<string, unknown>[] | ArrowTable<any>,
  options?: Partial<CreateTableOptions>,
  streaming = false,
) {
  const requestedMode: string = options?.mode ?? "create";
  const mode =
    requestedMode === "create" && (options?.existOk ?? false)
      ? "exist_ok"
      : requestedMode;
  // Plain records must first be converted into an Arrow table.
  const table = isArrowTable(data) ? data : makeArrowTable(data, options);
  const serialize = streaming ? fromTableToStreamBuffer : fromTableToBuffer;
  const buf = await serialize(
    table,
    options?.embeddingFunction,
    options?.schema,
  );
  return { buf, mode };
}
}
export class LocalTable extends Table {
// Handle to the native (Rust) table implementation backing this class.
private readonly inner: _NativeTable;
/** Wrap a native table handle. Internal use only. */
constructor(inner: _NativeTable) {
super();
this.inner = inner;
}
/** Return true while the underlying native table has not been closed. */
isOpen(): boolean {
return this.inner.isOpen();
}
/** Close the table, releasing its native resources. */
close(): void {
this.inner.close();
}
/** Return a brief human-readable description of the table. */
display(): string {
return this.inner.display();
}
// Read the embedding-function configurations stored in the table's
// schema metadata (one entry per embedded column).
private async getEmbeddingFunctions(): Promise<
Map<string, EmbeddingFunctionConfig>
> {
const schema = await this.schema();
const registry = getRegistry();
return registry.parseFunctions(schema.metadata);
}
/** Get the schema of the table. */
async schema(): Promise<Schema> {
// The native layer returns the schema as an Arrow IPC buffer.
const schemaBuf = await this.inner.schema();
const tbl = tableFromIPC(schemaBuf);
return tbl.schema;
}
/**
 * Insert records into the table.
 * @param data Records to insert.
 * @param options `mode` selects "append" (default) or another write mode.
 */
async add(data: Data, options?: Partial<AddDataOptions>): Promise<void> {
  const writeMode = options?.mode ?? "append";
  const schema = await this.schema();
  // Embedding functions are declared in the schema metadata; hand the
  // first one (if any) to the serializer so vector columns can be
  // computed from the source data.
  const embeddingConfig = getRegistry()
    .parseFunctions(schema.metadata)
    .values()
    .next().value;
  const buffer = await fromDataToBuffer(data, embeddingConfig, schema);
  await this.inner.add(buffer, writeMode);
}
/**
 * Update rows matching `options.where` (all rows when omitted),
 * assigning each named column the value of its SQL expression.
 */
async update(
  updates: Map<string, string> | Record<string, string>,
  options?: Partial<UpdateOptions>,
) {
  const predicate = options?.where;
  // Accept either a Map or a plain object of column -> SQL expression.
  const columns: [string, string][] =
    updates instanceof Map
      ? Array.from(updates.entries())
      : Object.entries(updates);
  await this.inner.update(predicate, columns);
}
/** Count the total number of rows, optionally limited by a SQL predicate. */
async countRows(filter?: string): Promise<number> {
return await this.inner.countRows(filter);
}
/** Delete the rows that satisfy the predicate. */
async delete(predicate: string): Promise<void> {
await this.inner.delete(predicate);
}
/**
 * Create an index on `column` to speed up queries.
 * @param column The column to index.
 * @param options Optional index configuration; `replace` rebuilds an
 * existing index of the same name.
 */
async createIndex(column: string, options?: Partial<IndexOptions>) {
// Bit of a hack to get around the fact that TS has no package-scope.
// biome-ignore lint/suspicious/noExplicitAny: skip
const nativeIndex = (options?.config as any)?.inner;
await this.inner.createIndex(nativeIndex, column, options?.replace);
}
/** Create a {@link Query} builder over this table. */
query(): Query {
return new Query(this.inner);
}
search(query: string): Promise<VectorQuery>;
search(query: IntoVector): VectorQuery;
/**
 * Create a nearest-neighbor search.
 *
 * Vector inputs become a {@link VectorQuery} synchronously; string inputs
 * are first embedded with the table's embedding function, so a Promise is
 * returned instead.
 */
search(query: string | IntoVector): Promise<VectorQuery> | VectorQuery {
  if (typeof query === "string") {
    const embedThenQuery = async (): Promise<VectorQuery> => {
      // TODO: Support multiple embedding functions
      const embeddingFunc: EmbeddingFunctionConfig | undefined = (
        await this.getEmbeddingFunctions()
      )
        .values()
        .next().value;
      if (!embeddingFunc) {
        throw new Error("No embedding functions are defined in the table");
      }
      const embeddings =
        await embeddingFunc.function.computeQueryEmbeddings(query);
      return this.query().nearestTo(embeddings);
    };
    return embedThenQuery();
  }
  return this.vectorSearch(query);
}
/**
 * Search the table with the given query vector; shorthand for
 * `query().nearestTo(vector)`.
 */
vectorSearch(vector: IntoVector): VectorQuery {
return this.query().nearestTo(vector);
}
// TODO: Support BatchUDF
/**
 * Add new columns whose values are computed from the given SQL
 * expressions (evaluated once per row; may reference existing columns).
 */
async addColumns(newColumnTransforms: AddColumnsSql[]): Promise<void> {
await this.inner.addColumns(newColumnTransforms);
}
/** Alter the name or nullability of existing columns. */
async alterColumns(columnAlterations: ColumnAlteration[]): Promise<void> {
await this.inner.alterColumns(columnAlterations);
}
/** Drop one or more columns; names may be nested (e.g. "a.b.c"). */
async dropColumns(columnNames: string[]): Promise<void> {
await this.inner.dropColumns(columnNames);
}
/** Retrieve the current version of the table. */
async version(): Promise<number> {
return await this.inner.version();
}
/**
 * Check out a specific version of the table. _This is an in-place
 * operation._
 */
async checkout(version: number): Promise<void> {
await this.inner.checkout(version);
}
/** Return to tracking the latest version of the table (in place). */
async checkoutLatest(): Promise<void> {
await this.inner.checkoutLatest();
}
/** Restore the currently checked-out version as the latest version. */
async restore(): Promise<void> {
await this.inner.restore();
}
async optimize(options?: Partial<OptimizeOptions>): Promise<OptimizeStats> { async optimize(options?: Partial<OptimizeOptions>): Promise<OptimizeStats> {
let cleanupOlderThanMs; let cleanupOlderThanMs;
if ( if (
@@ -469,13 +557,16 @@ export class Table {
return await this.inner.optimize(cleanupOlderThanMs); return await this.inner.optimize(cleanupOlderThanMs);
} }
/** List all indices that have been created with {@link Table.createIndex} */
async listIndices(): Promise<IndexConfig[]> { async listIndices(): Promise<IndexConfig[]> {
return await this.inner.listIndices(); return await this.inner.listIndices();
} }
/** Return the table as an arrow table */
async toArrow(): Promise<ArrowTable> { async toArrow(): Promise<ArrowTable> {
return await this.query().toArrow(); return await this.query().toArrow();
} }
mergeInsert(on: string | string[]): MergeInsertBuilder {
on = Array.isArray(on) ? on : [on];
return new MergeInsertBuilder(this.inner.mergeInsert(on));
}
} }

35
nodejs/lancedb/util.ts Normal file
View File

@@ -0,0 +1,35 @@
/**
 * A minimal in-memory key/value cache whose entries expire after a fixed
 * time-to-live (TTL).
 *
 * Expired entries are evicted lazily: they are removed the next time they
 * are looked up, not on a timer.
 */
export class TTLCache {
  // biome-ignore lint/suspicious/noExplicitAny: <explanation>
  private readonly cache: Map<string, { value: any; expires: number }>;

  /**
   * @param ttl Time to live in milliseconds
   */
  constructor(private readonly ttl: number) {
    this.cache = new Map();
  }

  /**
   * Look up `key`; returns `undefined` when the key is absent or its
   * entry has outlived the TTL (stale entries are deleted on the spot).
   */
  // biome-ignore lint/suspicious/noExplicitAny: <explanation>
  get(key: string): any | undefined {
    const hit = this.cache.get(key);
    if (hit === undefined) {
      return undefined;
    }
    if (Date.now() <= hit.expires) {
      return hit.value;
    }
    // Stale entry -- evict it so the map does not grow without bound.
    this.cache.delete(key);
    return undefined;
  }

  /** Store `value` under `key`, stamping it with a fresh expiry time. */
  // biome-ignore lint/suspicious/noExplicitAny: <explanation>
  set(key: string, value: any): void {
    this.cache.set(key, { value, expires: Date.now() + this.ttl });
  }

  /** Remove `key` from the cache, if present. */
  delete(key: string): void {
    this.cache.delete(key);
  }
}

View File

@@ -1,12 +1,12 @@
{ {
"name": "@lancedb/lancedb", "name": "@lancedb/lancedb",
"version": "0.5.0", "version": "0.5.2",
"lockfileVersion": 3, "lockfileVersion": 3,
"requires": true, "requires": true,
"packages": { "packages": {
"": { "": {
"name": "@lancedb/lancedb", "name": "@lancedb/lancedb",
"version": "0.5.0", "version": "0.5.2",
"cpu": [ "cpu": [
"x64", "x64",
"arm64" "arm64"
@@ -18,7 +18,10 @@
"win32" "win32"
], ],
"dependencies": { "dependencies": {
"@types/axios": "^0.14.0",
"apache-arrow": "^15.0.0", "apache-arrow": "^15.0.0",
"axios": "^1.7.2",
"memoize": "^10.0.0",
"openai": "^4.29.2", "openai": "^4.29.2",
"reflect-metadata": "^0.2.2" "reflect-metadata": "^0.2.2"
}, },
@@ -3123,6 +3126,15 @@
"tslib": "^2.4.0" "tslib": "^2.4.0"
} }
}, },
"node_modules/@types/axios": {
"version": "0.14.0",
"resolved": "https://registry.npmjs.org/@types/axios/-/axios-0.14.0.tgz",
"integrity": "sha512-KqQnQbdYE54D7oa/UmYVMZKq7CO4l8DEENzOKc4aBRwxCXSlJXGz83flFx5L7AWrOQnmuN3kVsRdt+GZPPjiVQ==",
"deprecated": "This is a stub types definition for axios (https://github.com/mzabriskie/axios). axios provides its own type definitions, so you don't need @types/axios installed!",
"dependencies": {
"axios": "*"
}
},
"node_modules/@types/babel__core": { "node_modules/@types/babel__core": {
"version": "7.20.5", "version": "7.20.5",
"resolved": "https://registry.npmjs.org/@types/babel__core/-/babel__core-7.20.5.tgz", "resolved": "https://registry.npmjs.org/@types/babel__core/-/babel__core-7.20.5.tgz",
@@ -3497,6 +3509,16 @@
"resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz",
"integrity": "sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==" "integrity": "sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q=="
}, },
"node_modules/axios": {
"version": "1.7.2",
"resolved": "https://registry.npmjs.org/axios/-/axios-1.7.2.tgz",
"integrity": "sha512-2A8QhOMrbomlDuiLeK9XibIBzuHeRcqqNOHp0Cyp5EoJ1IFDh+XZH3A6BkXtv0K4gFGCI0Y4BM7B1wOEi0Rmgw==",
"dependencies": {
"follow-redirects": "^1.15.6",
"form-data": "^4.0.0",
"proxy-from-env": "^1.1.0"
}
},
"node_modules/babel-jest": { "node_modules/babel-jest": {
"version": "29.7.0", "version": "29.7.0",
"resolved": "https://registry.npmjs.org/babel-jest/-/babel-jest-29.7.0.tgz", "resolved": "https://registry.npmjs.org/babel-jest/-/babel-jest-29.7.0.tgz",
@@ -4478,6 +4500,25 @@
"integrity": "sha512-36yxDn5H7OFZQla0/jFJmbIKTdZAQHngCedGxiMmpNfEZM0sdEeT+WczLQrjK6D7o2aiyLYDnkw0R3JK0Qv1RQ==", "integrity": "sha512-36yxDn5H7OFZQla0/jFJmbIKTdZAQHngCedGxiMmpNfEZM0sdEeT+WczLQrjK6D7o2aiyLYDnkw0R3JK0Qv1RQ==",
"dev": true "dev": true
}, },
"node_modules/follow-redirects": {
"version": "1.15.6",
"resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.6.tgz",
"integrity": "sha512-wWN62YITEaOpSK584EZXJafH1AGpO8RVgElfkuXbTOrPX4fIfOyEpW/CsiNd8JdYrAoOvafRTOEnvsO++qCqFA==",
"funding": [
{
"type": "individual",
"url": "https://github.com/sponsors/RubenVerborgh"
}
],
"engines": {
"node": ">=4.0"
},
"peerDependenciesMeta": {
"debug": {
"optional": true
}
}
},
"node_modules/form-data": { "node_modules/form-data": {
"version": "4.0.0", "version": "4.0.0",
"resolved": "https://registry.npmjs.org/form-data/-/form-data-4.0.0.tgz", "resolved": "https://registry.npmjs.org/form-data/-/form-data-4.0.0.tgz",
@@ -5901,6 +5942,20 @@
"is-buffer": "~1.1.6" "is-buffer": "~1.1.6"
} }
}, },
"node_modules/memoize": {
"version": "10.0.0",
"resolved": "https://registry.npmjs.org/memoize/-/memoize-10.0.0.tgz",
"integrity": "sha512-H6cBLgsi6vMWOcCpvVCdFFnl3kerEXbrYh9q+lY6VXvQSmM6CkmV08VOwT+WE2tzIEqRPFfAq3fm4v/UIW6mSA==",
"dependencies": {
"mimic-function": "^5.0.0"
},
"engines": {
"node": ">=18"
},
"funding": {
"url": "https://github.com/sindresorhus/memoize?sponsor=1"
}
},
"node_modules/merge-stream": { "node_modules/merge-stream": {
"version": "2.0.0", "version": "2.0.0",
"resolved": "https://registry.npmjs.org/merge-stream/-/merge-stream-2.0.0.tgz", "resolved": "https://registry.npmjs.org/merge-stream/-/merge-stream-2.0.0.tgz",
@@ -5948,6 +6003,17 @@
"node": ">= 0.6" "node": ">= 0.6"
} }
}, },
"node_modules/mimic-function": {
"version": "5.0.1",
"resolved": "https://registry.npmjs.org/mimic-function/-/mimic-function-5.0.1.tgz",
"integrity": "sha512-VP79XUPxV2CigYP3jWwAUFSku2aKqBH7uTAapFWCBqutsbmDo96KY5o8uh6U+/YSIn5OxJnXp73beVkpqMIGhA==",
"engines": {
"node": ">=18"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/minimatch": { "node_modules/minimatch": {
"version": "3.1.2", "version": "3.1.2",
"resolved": "https://registry.npmjs.org/minimatch/-/minimatch-3.1.2.tgz", "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-3.1.2.tgz",
@@ -6359,6 +6425,11 @@
"node": ">= 6" "node": ">= 6"
} }
}, },
"node_modules/proxy-from-env": {
"version": "1.1.0",
"resolved": "https://registry.npmjs.org/proxy-from-env/-/proxy-from-env-1.1.0.tgz",
"integrity": "sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg=="
},
"node_modules/punycode": { "node_modules/punycode": {
"version": "2.3.1", "version": "2.3.1",
"resolved": "https://registry.npmjs.org/punycode/-/punycode-2.3.1.tgz", "resolved": "https://registry.npmjs.org/punycode/-/punycode-2.3.1.tgz",

View File

@@ -65,7 +65,9 @@
"version": "napi version" "version": "napi version"
}, },
"dependencies": { "dependencies": {
"@types/axios": "^0.14.0",
"apache-arrow": "^15.0.0", "apache-arrow": "^15.0.0",
"axios": "^1.7.2",
"openai": "^4.29.2", "openai": "^4.29.2",
"reflect-metadata": "^0.2.2" "reflect-metadata": "^0.2.2"
} }

View File

@@ -20,6 +20,7 @@ mod connection;
mod error; mod error;
mod index; mod index;
mod iterator; mod iterator;
pub mod merge;
mod query; mod query;
mod table; mod table;
mod util; mod util;

53
nodejs/src/merge.rs Normal file
View File

@@ -0,0 +1,53 @@
use lancedb::{arrow::IntoArrow, ipc::ipc_file_to_batches, table::merge::MergeInsertBuilder};
use napi::bindgen_prelude::*;
use napi_derive::napi;
#[napi]
#[derive(Clone)]
/// A builder used to create and run a merge insert operation
pub struct NativeMergeInsertBuilder {
// The wrapped lancedb builder. `Clone` is derived so the fluent napi
// methods can return updated copies (napi only hands out `&self`).
pub(crate) inner: MergeInsertBuilder,
}
#[napi]
impl NativeMergeInsertBuilder {
/// When a source row matches a target row, update all of the target
/// row's columns; `condition` optionally restricts which matches are
/// updated.
///
/// napi exposes `&self`, so each fluent call clones the builder,
/// mutates the clone, and returns it rather than mutating in place.
#[napi]
pub fn when_matched_update_all(&self, condition: Option<String>) -> Self {
let mut this = self.clone();
this.inner.when_matched_update_all(condition);
this
}
/// Insert source rows that have no matching target row.
#[napi]
pub fn when_not_matched_insert_all(&self) -> Self {
let mut this = self.clone();
this.inner.when_not_matched_insert_all();
this
}
/// Delete target rows that have no matching source row, optionally
/// limited to rows satisfying `filter`.
#[napi]
pub fn when_not_matched_by_source_delete(&self, filter: Option<String>) -> Self {
let mut this = self.clone();
this.inner.when_not_matched_by_source_delete(filter);
this
}
/// Decode the Arrow IPC file in `buf` and run the merge insert with it.
///
/// NOTE(review): the "Failed to read IPC file" message also covers
/// `into_arrow` conversion failures -- consider splitting the mapping.
#[napi]
pub async fn execute(&self, buf: Buffer) -> napi::Result<()> {
let data = ipc_file_to_batches(buf.to_vec())
.and_then(IntoArrow::into_arrow)
.map_err(|e| napi::Error::from_reason(format!("Failed to read IPC file: {}", e)))?;
// `execute` consumes the builder, so run it on a clone.
let this = self.clone();
this.inner
.execute(data)
.await
.map_err(|e| napi::Error::from_reason(format!("Failed to execute merge insert: {}", e)))
}
}
// Lets table code convert a lancedb builder with `.into()` at the FFI
// boundary.
impl From<MergeInsertBuilder> for NativeMergeInsertBuilder {
fn from(inner: MergeInsertBuilder) -> Self {
Self { inner }
}
}

View File

@@ -23,6 +23,7 @@ use napi_derive::napi;
use crate::error::NapiErrorExt; use crate::error::NapiErrorExt;
use crate::index::Index; use crate::index::Index;
use crate::merge::NativeMergeInsertBuilder;
use crate::query::{Query, VectorQuery}; use crate::query::{Query, VectorQuery};
#[napi] #[napi]
@@ -328,6 +329,12 @@ impl Table {
.map(IndexConfig::from) .map(IndexConfig::from)
.collect::<Vec<_>>()) .collect::<Vec<_>>())
} }
/// Create a merge-insert (upsert) builder keyed on the `on` join columns.
#[napi]
pub fn merge_insert(&self, on: Vec<String>) -> napi::Result<NativeMergeInsertBuilder> {
// The core API borrows the key columns as `&str` slices.
let on: Vec<_> = on.iter().map(String::as_str).collect();
Ok(self.inner_ref()?.merge_insert(on.as_slice()).into())
}
} }
#[napi(object)] #[napi(object)]

View File

@@ -3,7 +3,7 @@ name = "lancedb"
# version in Cargo.toml # version in Cargo.toml
dependencies = [ dependencies = [
"deprecation", "deprecation",
"pylance==0.12.1", "pylance==0.12.2-beta.2",
"ratelimiter~=1.0", "ratelimiter~=1.0",
"requests>=2.31.0", "requests>=2.31.0",
"retry>=0.9.2", "retry>=0.9.2",
@@ -57,15 +57,10 @@ tests = [
"duckdb", "duckdb",
"pytz", "pytz",
"polars>=0.19", "polars>=0.19",
"tantivy" "tantivy",
] ]
dev = ["ruff", "pre-commit"] dev = ["ruff", "pre-commit"]
docs = [ docs = ["mkdocs", "mkdocs-jupyter", "mkdocs-material", "mkdocstrings[python]"]
"mkdocs",
"mkdocs-jupyter",
"mkdocs-material",
"mkdocstrings[python]",
]
clip = ["torch", "pillow", "open-clip"] clip = ["torch", "pillow", "open-clip"]
embeddings = [ embeddings = [
"openai>=1.6.1", "openai>=1.6.1",
@@ -100,5 +95,5 @@ addopts = "--strict-markers --ignore-glob=lancedb/embeddings/*.py"
markers = [ markers = [
"slow: marks tests as slow (deselect with '-m \"not slow\"')", "slow: marks tests as slow (deselect with '-m \"not slow\"')",
"asyncio", "asyncio",
"s3_test" "s3_test",
] ]

View File

@@ -337,7 +337,6 @@ class Table(ABC):
For example, the following scan will be faster if the column ``my_col`` has For example, the following scan will be faster if the column ``my_col`` has
a scalar index: a scalar index:
.. code-block:: python
import lancedb import lancedb
@@ -348,8 +347,6 @@ class Table(ABC):
Scalar indices can also speed up scans containing a vector search and a Scalar indices can also speed up scans containing a vector search and a
prefilter: prefilter:
.. code-block::python
import lancedb import lancedb
db = lancedb.connect("/data/lance") db = lancedb.connect("/data/lance")
@@ -385,7 +382,6 @@ class Table(ABC):
Examples Examples
-------- --------
.. code-block:: python
import lance import lance

2
rust-toolchain.toml Normal file
View File

@@ -0,0 +1,2 @@
[toolchain]
channel = "1.79.0"

View File

@@ -84,7 +84,8 @@ pub fn convert_polars_arrow_array_to_arrow_rs_array(
arrow_datatype: arrow_schema::DataType, arrow_datatype: arrow_schema::DataType,
) -> std::result::Result<arrow_array::ArrayRef, arrow_schema::ArrowError> { ) -> std::result::Result<arrow_array::ArrayRef, arrow_schema::ArrowError> {
let polars_c_array = polars_arrow::ffi::export_array_to_c(polars_array); let polars_c_array = polars_arrow::ffi::export_array_to_c(polars_array);
let arrow_c_array = unsafe { mem::transmute(polars_c_array) }; // Safety: `polars_arrow::ffi::ArrowArray` has the same memory layout as `arrow::ffi::FFI_ArrowArray`.
let arrow_c_array: arrow_data::ffi::FFI_ArrowArray = unsafe { mem::transmute(polars_c_array) };
Ok(arrow_array::make_array(unsafe { Ok(arrow_array::make_array(unsafe {
arrow::ffi::from_ffi_and_data_type(arrow_c_array, arrow_datatype) arrow::ffi::from_ffi_and_data_type(arrow_c_array, arrow_datatype)
}?)) }?))
@@ -96,7 +97,8 @@ fn convert_arrow_rs_array_to_polars_arrow_array(
polars_arrow_dtype: polars::datatypes::ArrowDataType, polars_arrow_dtype: polars::datatypes::ArrowDataType,
) -> Result<Box<dyn polars_arrow::array::Array>> { ) -> Result<Box<dyn polars_arrow::array::Array>> {
let arrow_c_array = arrow::ffi::FFI_ArrowArray::new(&arrow_rs_array.to_data()); let arrow_c_array = arrow::ffi::FFI_ArrowArray::new(&arrow_rs_array.to_data());
let polars_c_array = unsafe { mem::transmute(arrow_c_array) }; // Safety: `polars_arrow::ffi::ArrowArray` has the same memory layout as `arrow::ffi::FFI_ArrowArray`.
let polars_c_array: polars_arrow::ffi::ArrowArray = unsafe { mem::transmute(arrow_c_array) };
Ok(unsafe { polars_arrow::ffi::import_array_from_c(polars_c_array, polars_arrow_dtype) }?) Ok(unsafe { polars_arrow::ffi::import_array_from_c(polars_c_array, polars_arrow_dtype) }?)
} }
@@ -104,7 +106,9 @@ fn convert_polars_arrow_field_to_arrow_rs_field(
polars_arrow_field: polars_arrow::datatypes::Field, polars_arrow_field: polars_arrow::datatypes::Field,
) -> Result<arrow_schema::Field> { ) -> Result<arrow_schema::Field> {
let polars_c_schema = polars_arrow::ffi::export_field_to_c(&polars_arrow_field); let polars_c_schema = polars_arrow::ffi::export_field_to_c(&polars_arrow_field);
let arrow_c_schema: arrow::ffi::FFI_ArrowSchema = unsafe { mem::transmute(polars_c_schema) }; // Safety: `polars_arrow::ffi::ArrowSchema` has the same memory layout as `arrow::ffi::FFI_ArrowSchema`.
let arrow_c_schema: arrow::ffi::FFI_ArrowSchema =
unsafe { mem::transmute::<_, _>(polars_c_schema) };
let arrow_rs_dtype = arrow_schema::DataType::try_from(&arrow_c_schema)?; let arrow_rs_dtype = arrow_schema::DataType::try_from(&arrow_c_schema)?;
Ok(arrow_schema::Field::new( Ok(arrow_schema::Field::new(
polars_arrow_field.name, polars_arrow_field.name,
@@ -118,6 +122,8 @@ fn convert_arrow_rs_field_to_polars_arrow_field(
) -> Result<polars_arrow::datatypes::Field> { ) -> Result<polars_arrow::datatypes::Field> {
let arrow_rs_dtype = arrow_rs_field.data_type(); let arrow_rs_dtype = arrow_rs_field.data_type();
let arrow_c_schema = arrow::ffi::FFI_ArrowSchema::try_from(arrow_rs_dtype)?; let arrow_c_schema = arrow::ffi::FFI_ArrowSchema::try_from(arrow_rs_dtype)?;
let polars_c_schema: polars_arrow::ffi::ArrowSchema = unsafe { mem::transmute(arrow_c_schema) }; // Safety: `polars_arrow::ffi::ArrowSchema` has the same memory layout as `arrow::ffi::FFI_ArrowSchema`.
let polars_c_schema: polars_arrow::ffi::ArrowSchema =
unsafe { mem::transmute::<_, _>(arrow_c_schema) };
Ok(unsafe { polars_arrow::ffi::import_field_from_c(&polars_c_schema) }?) Ok(unsafe { polars_arrow::ffi::import_field_from_c(&polars_c_schema) }?)
} }

View File

@@ -23,6 +23,7 @@ use super::TableInternal;
/// A builder used to create and run a merge insert operation /// A builder used to create and run a merge insert operation
/// ///
/// See [`super::Table::merge_insert`] for more context /// See [`super::Table::merge_insert`] for more context
#[derive(Debug, Clone)]
pub struct MergeInsertBuilder { pub struct MergeInsertBuilder {
table: Arc<dyn TableInternal>, table: Arc<dyn TableInternal>,
pub(super) on: Vec<String>, pub(super) on: Vec<String>,