From e7022b990e395da10a54c958aa03613243d10a50 Mon Sep 17 00:00:00 2001 From: Cory Grinstead Date: Mon, 17 Jun 2024 15:23:27 -0500 Subject: [PATCH] feat(nodejs): feature parity [1/N] - remote table (#1378) closes https://github.com/lancedb/lancedb/issues/1362 --- .pre-commit-config.yaml | 2 +- nodejs/biome.json | 2 +- nodejs/lancedb/connection.ts | 169 +++++++------- nodejs/lancedb/index.ts | 59 ++++- nodejs/lancedb/remote/client.ts | 221 ++++++++++++++++++ nodejs/lancedb/remote/connection.ts | 187 +++++++++++++++ nodejs/lancedb/remote/index.ts | 3 + nodejs/lancedb/remote/table.ts | 164 +++++++++++++ nodejs/lancedb/table.ts | 350 +++++++++++++++++----------- nodejs/lancedb/util.ts | 35 +++ nodejs/package-lock.json | 75 +++++- nodejs/package.json | 2 + 12 files changed, 1043 insertions(+), 226 deletions(-) create mode 100644 nodejs/lancedb/remote/client.ts create mode 100644 nodejs/lancedb/remote/connection.ts create mode 100644 nodejs/lancedb/remote/index.ts create mode 100644 nodejs/lancedb/remote/table.ts create mode 100644 nodejs/lancedb/util.ts diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 48720aad..0849b4c7 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -14,7 +14,7 @@ repos: hooks: - id: local-biome-check name: biome check - entry: npx @biomejs/biome check --config-path nodejs/biome.json nodejs/ + entry: npx @biomejs/biome@1.7.3 check --config-path nodejs/biome.json nodejs/ language: system types: [text] files: "nodejs/.*" diff --git a/nodejs/biome.json b/nodejs/biome.json index 26bb2a4d..61ed1209 100644 --- a/nodejs/biome.json +++ b/nodejs/biome.json @@ -77,7 +77,7 @@ "noDuplicateObjectKeys": "error", "noDuplicateParameters": "error", "noEmptyBlockStatements": "error", - "noExplicitAny": "error", + "noExplicitAny": "warn", "noExtraNonNullAssertion": "error", "noFallthroughSwitchClause": "error", "noFunctionAssign": "error", diff --git a/nodejs/lancedb/connection.ts b/nodejs/lancedb/connection.ts index 
e8f795db..f38b8ce3 100644 --- a/nodejs/lancedb/connection.ts +++ b/nodejs/lancedb/connection.ts @@ -13,37 +13,10 @@ // limitations under the License. import { Table as ArrowTable, Schema } from "./arrow"; -import { - fromTableToBuffer, - isArrowTable, - makeArrowTable, - makeEmptyTable, -} from "./arrow"; +import { fromTableToBuffer, makeEmptyTable } from "./arrow"; import { EmbeddingFunctionConfig, getRegistry } from "./embedding/registry"; -import { ConnectionOptions, Connection as LanceDbConnection } from "./native"; -import { Table } from "./table"; - -/** - * Connect to a LanceDB instance at the given URI. - * - * Accepted formats: - * - * - `/path/to/database` - local database - * - `s3://bucket/path/to/database` or `gs://bucket/path/to/database` - database on cloud storage - * - `db://host:port` - remote database (LanceDB cloud) - * @param {string} uri - The uri of the database. If the database uri starts - * with `db://` then it connects to a remote database. - * @see {@link ConnectionOptions} for more details on the URI format. - */ -export async function connect( - uri: string, - opts?: Partial, -): Promise { - opts = opts ?? {}; - opts.storageOptions = cleanseStorageOptions(opts.storageOptions); - const nativeConn = await LanceDbConnection.new(uri, opts); - return new Connection(nativeConn); -} +import { Connection as LanceDbConnection } from "./native"; +import { LocalTable, Table } from "./table"; export interface CreateTableOptions { /** @@ -117,7 +90,6 @@ export interface TableNamesOptions { /** An optional limit to the number of results to return. */ limit?: number; } - /** * A LanceDB Connection that allows you to open tables and create new ones. * @@ -136,17 +108,15 @@ export interface TableNamesOptions { * Any created tables are independent and will continue to work even if * the underlying connection has been closed. 
*/ -export class Connection { - readonly inner: LanceDbConnection; - - constructor(inner: LanceDbConnection) { - this.inner = inner; +export abstract class Connection { + [Symbol.for("nodejs.util.inspect.custom")](): string { + return this.display(); } - /** Return true if the connection has not been closed */ - isOpen(): boolean { - return this.inner.isOpen(); - } + /** + * Return true if the connection has not been closed + */ + abstract isOpen(): boolean; /** * Close the connection, releasing any underlying resources. @@ -155,14 +125,12 @@ export class Connection { * * Any attempt to use the connection after it is closed will result in an error. */ - close(): void { - this.inner.close(); - } + abstract close(): void; - /** Return a brief description of the connection */ - display(): string { - return this.inner.display(); - } + /** + * Return a brief description of the connection + */ + abstract display(): string; /** * List all the table names in this database. @@ -170,15 +138,73 @@ export class Connection { * Tables will be returned in lexicographical order. * @param {Partial} options - options to control the * paging / start point + * */ - async tableNames(options?: Partial): Promise { - return this.inner.tableNames(options?.startAfter, options?.limit); - } + abstract tableNames(options?: Partial): Promise; /** * Open a table in the database. * @param {string} name - The name of the table */ + abstract openTable( + name: string, + options?: Partial, + ): Promise; + + /** + * Creates a new Table and initialize it with new data. + * @param {string} name - The name of the table. + * @param {Record[] | ArrowTable} data - Non-empty Array of Records + * to be inserted into the table + */ + abstract createTable( + name: string, + data: Record[] | ArrowTable, + options?: Partial, + ): Promise
; + + /** + * Creates a new empty Table + * @param {string} name - The name of the table. + * @param {Schema} schema - The schema of the table + */ + abstract createEmptyTable( + name: string, + schema: Schema, + options?: Partial, + ): Promise
; + + /** + * Drop an existing table. + * @param {string} name The name of the table to drop. + */ + abstract dropTable(name: string): Promise; +} + +export class LocalConnection extends Connection { + readonly inner: LanceDbConnection; + + constructor(inner: LanceDbConnection) { + super(); + this.inner = inner; + } + + isOpen(): boolean { + return this.inner.isOpen(); + } + + close(): void { + this.inner.close(); + } + + display(): string { + return this.inner.display(); + } + + async tableNames(options?: Partial): Promise { + return this.inner.tableNames(options?.startAfter, options?.limit); + } + async openTable( name: string, options?: Partial, @@ -189,39 +215,15 @@ export class Connection { options?.indexCacheSize, ); - return new Table(innerTable); + return new LocalTable(innerTable); } - /** - * Creates a new Table and initialize it with new data. - * @param {string} name - The name of the table. - * @param {Record[] | ArrowTable} data - Non-empty Array of Records - * to be inserted into the table - */ async createTable( name: string, data: Record[] | ArrowTable, options?: Partial, ): Promise
{ - let mode: string = options?.mode ?? "create"; - const existOk = options?.existOk ?? false; - - if (mode === "create" && existOk) { - mode = "exist_ok"; - } - - let table: ArrowTable; - if (isArrowTable(data)) { - table = data; - } else { - table = makeArrowTable(data, options); - } - - const buf = await fromTableToBuffer( - table, - options?.embeddingFunction, - options?.schema, - ); + const { buf, mode } = await Table.parseTableData(data, options); const innerTable = await this.inner.createTable( name, buf, @@ -230,14 +232,9 @@ export class Connection { options?.useLegacyFormat, ); - return new Table(innerTable); + return new LocalTable(innerTable); } - /** - * Creates a new empty Table - * @param {string} name - The name of the table. - * @param {Schema} schema - The schema of the table - */ async createEmptyTable( name: string, schema: Schema, @@ -265,13 +262,9 @@ export class Connection { cleanseStorageOptions(options?.storageOptions), options?.useLegacyFormat, ); - return new Table(innerTable); + return new LocalTable(innerTable); } - /** - * Drop an existing table. - * @param {string} name The name of the table to drop. - */ async dropTable(name: string): Promise { return this.inner.dropTable(name); } @@ -280,7 +273,7 @@ export class Connection { /** * Takes storage options and makes all the keys snake case. */ -function cleanseStorageOptions( +export function cleanseStorageOptions( options?: Record, ): Record | undefined { if (options === undefined) { diff --git a/nodejs/lancedb/index.ts b/nodejs/lancedb/index.ts index 2cb7cf3f..3be59718 100644 --- a/nodejs/lancedb/index.ts +++ b/nodejs/lancedb/index.ts @@ -12,6 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+import { + Connection, + LocalConnection, + cleanseStorageOptions, +} from "./connection"; + +import { + ConnectionOptions, + Connection as LanceDbConnection, +} from "./native.js"; + +import { RemoteConnection, RemoteConnectionOptions } from "./remote"; + export { WriteOptions, WriteMode, @@ -19,18 +32,20 @@ export { ColumnAlteration, ConnectionOptions, } from "./native.js"; + export { makeArrowTable, MakeArrowTableOptions, Data, VectorColumnOptions, } from "./arrow"; + export { - connect, Connection, CreateTableOptions, TableNamesOptions, } from "./connection"; + export { ExecutableQuery, Query, @@ -38,6 +53,46 @@ export { VectorQuery, RecordBatchIterator, } from "./query"; + export { Index, IndexOptions, IvfPqOptions } from "./indices"; -export { Table, AddDataOptions, IndexConfig, UpdateOptions } from "./table"; + +export { + Table, + AddDataOptions, + IndexConfig, + UpdateOptions, +} from "./table"; + export * as embedding from "./embedding"; + +/** + * Connect to a LanceDB instance at the given URI. + * + * Accepted formats: + * + * - `/path/to/database` - local database + * - `s3://bucket/path/to/database` or `gs://bucket/path/to/database` - database on cloud storage + * - `db://host:port` - remote database (LanceDB cloud) + * @param {string} uri - The uri of the database. If the database uri starts + * with `db://` then it connects to a remote database. + * @see {@link ConnectionOptions} for more details on the URI format. + */ +export async function connect( + uri: string, + opts?: Partial, +): Promise { + if (!uri) { + throw new Error("uri is required"); + } + opts = opts ?? {}; + + if (uri?.startsWith("db://")) { + return new RemoteConnection(uri, opts as RemoteConnectionOptions); + } + opts = (opts as ConnectionOptions) ?? 
{}; + (opts).storageOptions = cleanseStorageOptions( + (opts).storageOptions, + ); + const nativeConn = await LanceDbConnection.new(uri, opts); + return new LocalConnection(nativeConn); +} diff --git a/nodejs/lancedb/remote/client.ts b/nodejs/lancedb/remote/client.ts new file mode 100644 index 00000000..d7cf9d3e --- /dev/null +++ b/nodejs/lancedb/remote/client.ts @@ -0,0 +1,221 @@ +// Copyright 2023 LanceDB Developers. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import axios, { + AxiosError, + type AxiosResponse, + type ResponseType, +} from "axios"; +import { Table as ArrowTable } from "../arrow"; +import { tableFromIPC } from "../arrow"; +import { VectorQuery } from "../query"; + +export class RestfulLanceDBClient { + #dbName: string; + #region: string; + #apiKey: string; + #hostOverride?: string; + #closed: boolean = false; + #connectionTimeout: number = 12 * 1000; // 12 seconds; + #readTimeout: number = 30 * 1000; // 30 seconds; + #session?: import("axios").AxiosInstance; + + constructor( + dbName: string, + apiKey: string, + region: string, + hostOverride?: string, + connectionTimeout?: number, + readTimeout?: number, + ) { + this.#dbName = dbName; + this.#apiKey = apiKey; + this.#region = region; + this.#hostOverride = hostOverride ?? this.#hostOverride; + this.#connectionTimeout = connectionTimeout ?? this.#connectionTimeout; + this.#readTimeout = readTimeout ?? this.#readTimeout; + } + + // todo: cache the session. 
+ get session(): import("axios").AxiosInstance { + if (this.#session !== undefined) { + return this.#session; + } else { + return axios.create({ + baseURL: this.url, + headers: { + // biome-ignore lint/style/useNamingConvention: external api + Authorization: `Bearer ${this.#apiKey}`, + }, + transformResponse: decodeErrorData, + timeout: this.#connectionTimeout, + }); + } + } + + get url(): string { + return ( + this.#hostOverride ?? + `https://${this.#dbName}.${this.#region}.api.lancedb.com` + ); + } + + get headers(): { [key: string]: string } { + const headers: { [key: string]: string } = { + "x-api-key": this.#apiKey, + "x-request-id": "na", + }; + if (this.#region === "local") { + headers["Host"] = `${this.#dbName}.${this.#region}.api.lancedb.com`; + } + if (this.#hostOverride) { + headers["x-lancedb-database"] = this.#dbName; + } + return headers; + } + + isOpen(): boolean { + return !this.#closed; + } + + private checkNotClosed(): void { + if (this.#closed) { + throw new Error("Connection is closed"); + } + } + + close(): void { + this.#session = undefined; + this.#closed = true; + } + + /** + * Send a GET request resolved against the database URL and return the + * response payload. + * + * Throws if the client has been closed, if the request failed before any + * reply was received, or if the reply's status is rejected by checkStatus. + */ + // biome-ignore lint/suspicious/noExplicitAny: + async get(uri: string, params?: Record): Promise { + this.checkNotClosed(); + uri = new URL(uri, this.url).toString(); + let response; + try { + response = await this.session.get(uri, { + headers: this.headers, + params, + }); + } catch (e) { + // An AxiosError raised before a reply arrived (timeout, DNS failure, + // refused connection) carries no `response`. Rethrow it as-is rather + // than letting checkStatus dereference undefined and mask the cause. + if (e instanceof AxiosError && e.response !== undefined) { + response = e.response; + } else { + throw e; + } + } + + RestfulLanceDBClient.checkStatus(response!); + return response!.data; + } + + // biome-ignore lint/suspicious/noExplicitAny: api response + async post(uri: string, body?: any): Promise; + async post( + uri: string, + // biome-ignore lint/suspicious/noExplicitAny: api request + body: any, + additional: { + config?: { responseType: "arraybuffer" }; + headers?: Record; + params?: Record; + }, + ): Promise; + async post( + uri: string, + // biome-ignore lint/suspicious/noExplicitAny: api request + body?:
any, + additional?: { + config?: { responseType: ResponseType }; + headers?: Record; + params?: Record; + }, + // biome-ignore lint/suspicious/noExplicitAny: api response + ): Promise { + this.checkNotClosed(); + uri = new URL(uri, this.url).toString(); + // Fill in the default JSON response type when the caller gave no config. + additional = Object.assign( + { config: { responseType: "json" } }, + additional, + ); + + const headers = { ...this.headers, ...additional.headers }; + + if (!headers["Content-Type"]) { + headers["Content-Type"] = "application/json"; + } + let response; + try { + response = await this.session.post(uri, body, { + headers, + responseType: additional!.config!.responseType, + // Pass the plain object through: axios builds the query string from + // an object's enumerable keys (or a URLSearchParams); a Map has none, + // so wrapping the params in one drops them from the request. + params: additional.params, + }); + } catch (e) { + // An AxiosError raised before a reply arrived (timeout, DNS failure, + // refused connection) carries no `response`. Rethrow it as-is rather + // than letting checkStatus dereference undefined and mask the cause. + if (e instanceof AxiosError && e.response !== undefined) { + response = e.response; + } else { + throw e; + } + } + RestfulLanceDBClient.checkStatus(response!); + if (additional!.config!.responseType === "arraybuffer") { + return response!.data; + } else { + return JSON.parse(response!.data); + } + } + + async listTables(limit = 10, pageToken = ""): Promise { + const json = await this.get("/v1/table", { limit, pageToken }); + return json.tables; + } + + async query(tableName: string, query: VectorQuery): Promise { + // Encode the table name like every other table route does, so names + // containing reserved URL characters cannot alter the request path. + const tbl = await this.post( + `/v1/table/${encodeURIComponent(tableName)}/query`, + query, + { + config: { + responseType: "arraybuffer", + }, + }, + ); + return tableFromIPC(tbl); + } + + static checkStatus(response: AxiosResponse): void { + if (response.status === 404) { + throw new Error(`Not found: ${response.data}`); + } else if (response.status >= 400 && response.status < 500) { + throw new Error( + `Bad Request: ${response.status}, error: ${response.data}`, + ); + } else if (response.status >= 500 && response.status < 600) { + throw new Error( + `Internal Server Error: ${response.status}, error: ${response.data}`, + ); + } else if (response.status !== 200) { + throw new Error( + `Unknown Error: ${response.status}, error: ${response.data}`, + ); + } + } +} + +function decodeErrorData(data: unknown) { +
if (Buffer.isBuffer(data)) { + const decoded = data.toString("utf-8"); + return decoded; + } + return data; +} diff --git a/nodejs/lancedb/remote/connection.ts b/nodejs/lancedb/remote/connection.ts new file mode 100644 index 00000000..ef6e5808 --- /dev/null +++ b/nodejs/lancedb/remote/connection.ts @@ -0,0 +1,187 @@ +import { Schema } from "apache-arrow"; +import { Data, fromTableToStreamBuffer, makeEmptyTable } from "../arrow"; +import { + Connection, + CreateTableOptions, + OpenTableOptions, + TableNamesOptions, +} from "../connection"; +import { Table } from "../table"; +import { TTLCache } from "../util"; +import { RestfulLanceDBClient } from "./client"; +import { RemoteTable } from "./table"; + +export interface RemoteConnectionOptions { + apiKey?: string; + region?: string; + hostOverride?: string; + connectionTimeout?: number; + readTimeout?: number; +} + +export class RemoteConnection extends Connection { + #dbName: string; + #apiKey: string; + #region: string; + #client: RestfulLanceDBClient; + #tableCache = new TTLCache(300_000); + + constructor( + url: string, + { + apiKey, + region, + hostOverride, + connectionTimeout, + readTimeout, + }: RemoteConnectionOptions, + ) { + super(); + apiKey = apiKey ?? process.env.LANCEDB_API_KEY; + region = region ?? 
process.env.LANCEDB_REGION; + + if (!apiKey) { + throw new Error("apiKey is required when connecting to LanceDB Cloud"); + } + + if (!region) { + throw new Error("region is required when connecting to LanceDB Cloud"); + } + + const parsed = new URL(url); + if (parsed.protocol !== "db:") { + throw new Error( + `invalid protocol: ${parsed.protocol}, only accepts db://`, + ); + } + + this.#dbName = parsed.hostname; + this.#apiKey = apiKey; + this.#region = region; + this.#client = new RestfulLanceDBClient( + this.#dbName, + this.#apiKey, + this.#region, + hostOverride, + connectionTimeout, + readTimeout, + ); + } + + isOpen(): boolean { + return this.#client.isOpen(); + } + close(): void { + return this.#client.close(); + } + + display(): string { + return `RemoteConnection(${this.#dbName})`; + } + + /** + * List the table names in this database and remember each returned name in + * the table cache. + * + * `limit` defaults to 10 and `startAfter` is sent as `page_token`. + */ + async tableNames(options?: Partial): Promise { + const response = await this.#client.get("/v1/table/", { + limit: options?.limit ?? 10, + // biome-ignore lint/style/useNamingConvention: + page_token: options?.startAfter ?? "", + }); + // The client's get() resolves to the response payload itself, so there is + // no body() method to call on it. The payload may still be undecoded JSON + // text, so parse it when it arrives as a string. + const body = + typeof response === "string" ? JSON.parse(response) : response; + for (const table of body.tables) { + this.#tableCache.set(table, true); + } + return body.tables; + } + + async openTable( + name: string, + _options?: Partial | undefined, + ): Promise
{ + if (this.#tableCache.get(name) === undefined) { + await this.#client.post( + `/v1/table/${encodeURIComponent(name)}/describe/`, + ); + this.#tableCache.set(name, true); + } + return new RemoteTable(this.#client, name, this.#dbName); + } + + async createTable( + tableName: string, + data: Data, + options?: Partial | undefined, + ): Promise
{ + if (options?.mode) { + console.warn( + "option 'mode' is not supported in LanceDB Cloud", + "LanceDB Cloud only supports the default 'create' mode.", + "If the table already exists, an error will be thrown.", + ); + } + if (options?.embeddingFunction) { + console.warn( + "embedding_functions is not yet supported on LanceDB Cloud.", + "Please vote https://github.com/lancedb/lancedb/issues/626 ", + "for this feature.", + ); + } + + const { buf } = await Table.parseTableData( + data, + options, + true /** streaming */, + ); + + await this.#client.post( + `/v1/table/${encodeURIComponent(tableName)}/create/`, + buf, + { + config: { + responseType: "arraybuffer", + }, + headers: { "Content-Type": "application/vnd.apache.arrow.stream" }, + }, + ); + this.#tableCache.set(tableName, true); + return new RemoteTable(this.#client, tableName, this.#dbName); + } + + async createEmptyTable( + name: string, + schema: Schema, + options?: Partial | undefined, + ): Promise
{ + if (options?.mode) { + console.warn(`mode is not supported on LanceDB Cloud`); + } + + if (options?.embeddingFunction) { + console.warn( + "embeddingFunction is not yet supported on LanceDB Cloud.", + "Please vote https://github.com/lancedb/lancedb/issues/626 ", + "for this feature.", + ); + } + const emptyTable = makeEmptyTable(schema); + const buf = await fromTableToStreamBuffer(emptyTable); + + await this.#client.post( + `/v1/table/${encodeURIComponent(name)}/create/`, + buf, + { + config: { + responseType: "arraybuffer", + }, + headers: { "Content-Type": "application/vnd.apache.arrow.stream" }, + }, + ); + + this.#tableCache.set(name, true); + return new RemoteTable(this.#client, name, this.#dbName); + } + + async dropTable(name: string): Promise { + await this.#client.post(`/v1/table/${encodeURIComponent(name)}/drop/`); + + this.#tableCache.delete(name); + } +} diff --git a/nodejs/lancedb/remote/index.ts b/nodejs/lancedb/remote/index.ts new file mode 100644 index 00000000..d1faaae9 --- /dev/null +++ b/nodejs/lancedb/remote/index.ts @@ -0,0 +1,3 @@ +export { RestfulLanceDBClient } from "./client"; +export { type RemoteConnectionOptions, RemoteConnection } from "./connection"; +export { RemoteTable } from "./table"; diff --git a/nodejs/lancedb/remote/table.ts b/nodejs/lancedb/remote/table.ts new file mode 100644 index 00000000..a9760adf --- /dev/null +++ b/nodejs/lancedb/remote/table.ts @@ -0,0 +1,164 @@ +// Copyright 2023 LanceDB Developers. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +import { Table as ArrowTable } from "apache-arrow"; + +import { Data, IntoVector } from "../arrow"; + +import { CreateTableOptions } from "../connection"; +import { IndexOptions } from "../indices"; +import { MergeInsertBuilder } from "../merge"; +import { VectorQuery } from "../query"; +import { AddDataOptions, Table, UpdateOptions } from "../table"; +import { RestfulLanceDBClient } from "./client"; + +/** + * A table that lives in a remote LanceDB database and is driven through the + * REST client rather than a native table handle. + */ +export class RemoteTable extends Table { + #client: RestfulLanceDBClient; + #name: string; + + // Used in the display() method + #dbName: string; + + // Route prefix for this table. It has no trailing slash because every + // caller appends "/<action>/"; a trailing slash here would put "//" in + // each request path. + get #tablePrefix() { + return `/v1/table/${encodeURIComponent(this.#name)}`; + } + + public constructor( + client: RestfulLanceDBClient, + tableName: string, + dbName: string, + ) { + super(); + this.#client = client; + this.#name = tableName; + this.#dbName = dbName; + } + + /** Return true while the underlying client has not been closed. */ + isOpen(): boolean { + return this.#client.isOpen(); + } + + close(): void { + this.#client.close(); + } + + display(): string { + return `RemoteTable(${this.#dbName}; ${this.#name})`; + } + + async schema(): Promise { + const resp = await this.#client.post(`${this.#tablePrefix}/describe/`); + // TODO: parse this into a valid arrow schema + return resp.schema; + } + /** + * Insert records into the remote table as an Arrow stream. + * + * The write mode is taken from `options.mode` and defaults to "append". + */ + async add(data: Data, options?: Partial): Promise { + const { buf } = await Table.parseTableData( + data, + options as CreateTableOptions, + true, + ); + // parseTableData falls back to "create", which is a table-creation mode; + // an insert without an explicit mode should append. + const mode = options?.mode ?? "append"; + await this.#client.post(`${this.#tablePrefix}/insert/`, buf, { + params: { + mode, + }, + headers: { + "Content-Type": "application/vnd.apache.arrow.stream", + }, + }); + } + + async update( + updates: Map | Record, + options?: Partial, + ): Promise { + await this.#client.post(`${this.#tablePrefix}/update/`, { + predicate: options?.where ??
null, + // Object.entries() only sees own enumerable properties, so it returns [] + // for a Map; read a Map's pairs through its own iterator instead. + updates: + updates instanceof Map + ? Array.from(updates.entries()) + : Object.entries(updates), + }); + } + async countRows(filter?: unknown): Promise { + const payload = { predicate: filter }; + return await this.#client.post(`${this.#tablePrefix}/count_rows/`, payload); + } + + async delete(predicate: unknown): Promise { + const payload = { predicate }; + await this.#client.post(`${this.#tablePrefix}/delete/`, payload); + } + async createIndex( + column: string, + options?: Partial, + ): Promise { + if (options !== undefined) { + console.warn("options are not yet supported on the LanceDB cloud"); + } + const indexType = "vector"; + const metric = "L2"; + const data = { + column, + // biome-ignore lint/style/useNamingConvention: external API + index_type: indexType, + // biome-ignore lint/style/useNamingConvention: external API + metric_type: metric, + }; + await this.#client.post(`${this.#tablePrefix}/create_index`, data); + } + query(): import("..").Query { + throw new Error("query() is not yet supported on the LanceDB cloud"); + } + search(query: IntoVector): VectorQuery; + search(query: string): Promise; + search(_query: string | IntoVector): VectorQuery | Promise { + throw new Error("search() is not yet supported on the LanceDB cloud"); + } + vectorSearch(_vector: unknown): import("..").VectorQuery { + throw new Error("vectorSearch() is not yet supported on the LanceDB cloud"); + } + addColumns(_newColumnTransforms: unknown): Promise { + throw new Error("addColumns() is not yet supported on the LanceDB cloud"); + } + alterColumns(_columnAlterations: unknown): Promise { + throw new Error("alterColumns() is not yet supported on the LanceDB cloud"); + } + dropColumns(_columnNames: unknown): Promise { + throw new Error("dropColumns() is not yet supported on the LanceDB cloud"); + } + async version(): Promise { + const resp = await this.#client.post(`${this.#tablePrefix}/describe/`); + return resp.version; + } + checkout(_version: unknown): Promise { + throw new Error("checkout() 
is not yet supported on the LanceDB cloud"); + } + checkoutLatest(): Promise { + throw new Error( + "checkoutLatest() is not yet supported on the LanceDB cloud", + ); + } + restore(): Promise { + throw new Error("restore() is not yet supported on the LanceDB cloud"); + } + optimize(_options?: unknown): Promise { + throw new Error("optimize() is not yet supported on the LanceDB cloud"); + } + async listIndices(): Promise { + return await this.#client.post(`${this.#tablePrefix}/index/list/`); + } + toArrow(): Promise { + throw new Error("toArrow() is not yet supported on the LanceDB cloud"); + } + mergeInsert(_on: string | string[]): MergeInsertBuilder { + throw new Error("mergeInsert() is not yet supported on the LanceDB cloud"); + } +} diff --git a/nodejs/lancedb/table.ts b/nodejs/lancedb/table.ts index 22e460d9..ceb805b3 100644 --- a/nodejs/lancedb/table.ts +++ b/nodejs/lancedb/table.ts @@ -18,8 +18,13 @@ import { IntoVector, Schema, fromDataToBuffer, + fromTableToBuffer, + fromTableToStreamBuffer, + isArrowTable, + makeArrowTable, tableFromIPC, } from "./arrow"; +import { CreateTableOptions } from "./connection"; import { EmbeddingFunctionConfig, getRegistry } from "./embedding/registry"; import { IndexOptions } from "./indices"; @@ -89,19 +94,13 @@ export interface OptimizeOptions { * Closing a table is optional. It not closed, it will be closed when it is garbage * collected. */ -export class Table { - private readonly inner: _NativeTable; - - /** Construct a Table. Internal use only. */ - constructor(inner: _NativeTable) { - this.inner = inner; +export abstract class Table { + [Symbol.for("nodejs.util.inspect.custom")](): string { + return this.display(); } /** Return true if the table has not been closed */ - isOpen(): boolean { - return this.inner.isOpen(); - } - + abstract isOpen(): boolean; /** * Close the table, releasing any underlying resources. 
* @@ -109,48 +108,16 @@ export class Table { * * Any attempt to use the table after it is closed will result in an error. */ - close(): void { - this.inner.close(); - } - + abstract close(): void; /** Return a brief description of the table */ - display(): string { - return this.inner.display(); - } - - async #getEmbeddingFunctions(): Promise< - Map - > { - const schema = await this.schema(); - const registry = getRegistry(); - return registry.parseFunctions(schema.metadata); - } - + abstract display(): string; /** Get the schema of the table. */ - async schema(): Promise { - const schemaBuf = await this.inner.schema(); - const tbl = tableFromIPC(schemaBuf); - return tbl.schema; - } - + abstract schema(): Promise; /** * Insert records into this Table. * @param {Data} data Records to be inserted into the Table */ - async add(data: Data, options?: Partial): Promise { - const mode = options?.mode ?? "append"; - const schema = await this.schema(); - const registry = getRegistry(); - const functions = registry.parseFunctions(schema.metadata); - - const buffer = await fromDataToBuffer( - data, - functions.values().next().value, - schema, - ); - await this.inner.add(buffer, mode); - } - + abstract add(data: Data, options?: Partial): Promise; /** * Update existing records in the Table * @@ -176,30 +143,14 @@ export class Table { * @param {Partial} options - additional options to control * the update behavior */ - async update( + abstract update( updates: Map | Record, options?: Partial, - ) { - const onlyIf = options?.where; - let columns: [string, string][]; - if (updates instanceof Map) { - columns = Array.from(updates.entries()); - } else { - columns = Object.entries(updates); - } - await this.inner.update(onlyIf, columns); - } - + ): Promise; /** Count the total number of rows in the dataset. 
*/ - async countRows(filter?: string): Promise { - return await this.inner.countRows(filter); - } - + abstract countRows(filter?: string): Promise; /** Delete the rows that satisfy the predicate. */ - async delete(predicate: string): Promise { - await this.inner.delete(predicate); - } - + abstract delete(predicate: string): Promise; /** * Create an index to speed up queries. * @@ -226,13 +177,10 @@ export class Table { * // Or create a Scalar index * await table.createIndex("my_float_col"); */ - async createIndex(column: string, options?: Partial) { - // Bit of a hack to get around the fact that TS has no package-scope. - // biome-ignore lint/suspicious/noExplicitAny: skip - const nativeIndex = (options?.config as any)?.inner; - await this.inner.createIndex(nativeIndex, column, options?.replace); - } - + abstract createIndex( + column: string, + options?: Partial, + ): Promise; /** * Create a {@link Query} Builder. * @@ -283,44 +231,20 @@ export class Table { * } * @returns {Query} A builder that can be used to parameterize the query */ - query(): Query { - return new Query(this.inner); - } - + abstract query(): Query; /** * Create a search query to find the nearest neighbors * of the given query vector * @param {string} query - the query. 
This will be converted to a vector using the table's provided embedding function * @rejects {Error} If no embedding functions are defined in the table */ - search(query: string): Promise; + abstract search(query: string): Promise; /** * Create a search query to find the nearest neighbors * of the given query vector * @param {IntoVector} query - the query vector */ - search(query: IntoVector): VectorQuery; - search(query: string | IntoVector): Promise | VectorQuery { - if (typeof query !== "string") { - return this.vectorSearch(query); - } else { - return this.#getEmbeddingFunctions().then(async (functions) => { - // TODO: Support multiple embedding functions - const embeddingFunc: EmbeddingFunctionConfig | undefined = functions - .values() - .next().value; - if (!embeddingFunc) { - return Promise.reject( - new Error("No embedding functions are defined in the table"), - ); - } - const embeddings = - await embeddingFunc.function.computeQueryEmbeddings(query); - return this.query().nearestTo(embeddings); - }); - } - } - + abstract search(query: IntoVector): VectorQuery; /** * Search the table with a given query vector. * @@ -328,11 +252,7 @@ export class Table { * is the same thing as calling `nearestTo` on the builder returned * by `query`. @see {@link Query#nearestTo} for more details. */ - vectorSearch(vector: IntoVector): VectorQuery { - return this.query().nearestTo(vector); - } - - // TODO: Support BatchUDF + abstract vectorSearch(vector: IntoVector): VectorQuery; /** * Add new columns with defined values. * @param {AddColumnsSql[]} newColumnTransforms pairs of column names and @@ -340,19 +260,14 @@ export class Table { * expressions will be evaluated for each row in the table, and can * reference existing columns in the table. 
*/ - async addColumns(newColumnTransforms: AddColumnsSql[]): Promise { - await this.inner.addColumns(newColumnTransforms); - } + abstract addColumns(newColumnTransforms: AddColumnsSql[]): Promise; /** * Alter the name or nullability of columns. * @param {ColumnAlteration[]} columnAlterations One or more alterations to * apply to columns. */ - async alterColumns(columnAlterations: ColumnAlteration[]): Promise { - await this.inner.alterColumns(columnAlterations); - } - + abstract alterColumns(columnAlterations: ColumnAlteration[]): Promise; /** * Drop one or more columns from the dataset * @@ -364,15 +279,10 @@ export class Table { * be nested column references (e.g. "a.b.c") or top-level column names * (e.g. "a"). */ - async dropColumns(columnNames: string[]): Promise { - await this.inner.dropColumns(columnNames); - } - + abstract dropColumns(columnNames: string[]): Promise; /** Retrieve the version of the table */ - async version(): Promise { - return await this.inner.version(); - } + abstract version(): Promise; /** * Checks out a specific version of the table _This is an in-place operation._ * @@ -398,19 +308,14 @@ export class Table { * console.log(await table.version()); // 2 * ``` */ - async checkout(version: number): Promise { - await this.inner.checkout(version); - } - + abstract checkout(version: number): Promise; /** * Checkout the latest version of the table. _This is an in-place operation._ * * The table will be set back into standard mode, and will track the latest * version of the table. */ - async checkoutLatest(): Promise { - await this.inner.checkoutLatest(); - } + abstract checkoutLatest(): Promise; /** * Restore the table to the currently checked out version @@ -424,10 +329,7 @@ export class Table { * Once the operation concludes the table will no longer be in a checked * out state and the read_consistency_interval, if any, will apply. 
*/ - async restore(): Promise { - await this.inner.restore(); - } - + abstract restore(): Promise; /** * Optimize the on-disk data and indices for better performance. * @@ -458,6 +360,191 @@ export class Table { * you have added or modified 100,000 or more records or run more than 20 data * modification operations. */ + abstract optimize(options?: Partial): Promise; + /** List all indices that have been created with {@link Table.createIndex} */ + abstract listIndices(): Promise; + /** Return the table as an arrow table */ + abstract toArrow(): Promise; + + abstract mergeInsert(on: string | string[]): MergeInsertBuilder; + + static async parseTableData( + // biome-ignore lint/suspicious/noExplicitAny: + data: Record[] | ArrowTable, + options?: Partial, + streaming = false, + ) { + let mode: string = options?.mode ?? "create"; + const existOk = options?.existOk ?? false; + + if (mode === "create" && existOk) { + mode = "exist_ok"; + } + + let table: ArrowTable; + if (isArrowTable(data)) { + table = data; + } else { + table = makeArrowTable(data, options); + } + if (streaming) { + const buf = await fromTableToStreamBuffer( + table, + options?.embeddingFunction, + options?.schema, + ); + return { buf, mode }; + } else { + const buf = await fromTableToBuffer( + table, + options?.embeddingFunction, + options?.schema, + ); + return { buf, mode }; + } + } +} + +export class LocalTable extends Table { + private readonly inner: _NativeTable; + + constructor(inner: _NativeTable) { + super(); + this.inner = inner; + } + + isOpen(): boolean { + return this.inner.isOpen(); + } + + close(): void { + this.inner.close(); + } + + display(): string { + return this.inner.display(); + } + + private async getEmbeddingFunctions(): Promise< + Map + > { + const schema = await this.schema(); + const registry = getRegistry(); + return registry.parseFunctions(schema.metadata); + } + + /** Get the schema of the table. 
*/ + async schema(): Promise { + const schemaBuf = await this.inner.schema(); + const tbl = tableFromIPC(schemaBuf); + return tbl.schema; + } + + async add(data: Data, options?: Partial): Promise { + const mode = options?.mode ?? "append"; + const schema = await this.schema(); + const registry = getRegistry(); + const functions = registry.parseFunctions(schema.metadata); + + const buffer = await fromDataToBuffer( + data, + functions.values().next().value, + schema, + ); + await this.inner.add(buffer, mode); + } + + async update( + updates: Map | Record, + options?: Partial, + ) { + const onlyIf = options?.where; + let columns: [string, string][]; + if (updates instanceof Map) { + columns = Array.from(updates.entries()); + } else { + columns = Object.entries(updates); + } + await this.inner.update(onlyIf, columns); + } + + async countRows(filter?: string): Promise { + return await this.inner.countRows(filter); + } + + async delete(predicate: string): Promise { + await this.inner.delete(predicate); + } + + async createIndex(column: string, options?: Partial) { + // Bit of a hack to get around the fact that TS has no package-scope. 
+ // biome-ignore lint/suspicious/noExplicitAny: skip + const nativeIndex = (options?.config as any)?.inner; + await this.inner.createIndex(nativeIndex, column, options?.replace); + } + + query(): Query { + return new Query(this.inner); + } + + search(query: string): Promise; + + search(query: IntoVector): VectorQuery; + search(query: string | IntoVector): Promise | VectorQuery { + if (typeof query !== "string") { + return this.vectorSearch(query); + } else { + return this.getEmbeddingFunctions().then(async (functions) => { + // TODO: Support multiple embedding functions + const embeddingFunc: EmbeddingFunctionConfig | undefined = functions + .values() + .next().value; + if (!embeddingFunc) { + return Promise.reject( + new Error("No embedding functions are defined in the table"), + ); + } + const embeddings = + await embeddingFunc.function.computeQueryEmbeddings(query); + return this.query().nearestTo(embeddings); + }); + } + } + + vectorSearch(vector: IntoVector): VectorQuery { + return this.query().nearestTo(vector); + } + + // TODO: Support BatchUDF + + async addColumns(newColumnTransforms: AddColumnsSql[]): Promise { + await this.inner.addColumns(newColumnTransforms); + } + + async alterColumns(columnAlterations: ColumnAlteration[]): Promise { + await this.inner.alterColumns(columnAlterations); + } + + async dropColumns(columnNames: string[]): Promise { + await this.inner.dropColumns(columnNames); + } + + async version(): Promise { + return await this.inner.version(); + } + + async checkout(version: number): Promise { + await this.inner.checkout(version); + } + + async checkoutLatest(): Promise { + await this.inner.checkoutLatest(); + } + + async restore(): Promise { + await this.inner.restore(); + } + async optimize(options?: Partial): Promise { let cleanupOlderThanMs; if ( @@ -470,15 +557,14 @@ export class Table { return await this.inner.optimize(cleanupOlderThanMs); } - /** List all indices that have been created with {@link Table.createIndex} */ async 
listIndices(): Promise { return await this.inner.listIndices(); } - /** Return the table as an arrow table */ async toArrow(): Promise { return await this.query().toArrow(); } + mergeInsert(on: string | string[]): MergeInsertBuilder { on = Array.isArray(on) ? on : [on]; return new MergeInsertBuilder(this.inner.mergeInsert(on)); diff --git a/nodejs/lancedb/util.ts b/nodejs/lancedb/util.ts new file mode 100644 index 00000000..6e9e696a --- /dev/null +++ b/nodejs/lancedb/util.ts @@ -0,0 +1,35 @@ +export class TTLCache { + // biome-ignore lint/suspicious/noExplicitAny: + private readonly cache: Map; + + /** + * @param ttl Time to live in milliseconds + */ + constructor(private readonly ttl: number) { + this.cache = new Map(); + } + + // biome-ignore lint/suspicious/noExplicitAny: + get(key: string): any | undefined { + const entry = this.cache.get(key); + if (entry === undefined) { + return undefined; + } + + if (entry.expires < Date.now()) { + this.cache.delete(key); + return undefined; + } + + return entry.value; + } + + // biome-ignore lint/suspicious/noExplicitAny: + set(key: string, value: any): void { + this.cache.set(key, { value, expires: Date.now() + this.ttl }); + } + + delete(key: string): void { + this.cache.delete(key); + } +} diff --git a/nodejs/package-lock.json b/nodejs/package-lock.json index cc55fc28..d621d4cc 100644 --- a/nodejs/package-lock.json +++ b/nodejs/package-lock.json @@ -1,12 +1,12 @@ { "name": "@lancedb/lancedb", - "version": "0.5.1", + "version": "0.5.2", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@lancedb/lancedb", - "version": "0.5.1", + "version": "0.5.2", "cpu": [ "x64", "arm64" @@ -18,7 +18,10 @@ "win32" ], "dependencies": { + "@types/axios": "^0.14.0", "apache-arrow": "^15.0.0", + "axios": "^1.7.2", + "memoize": "^10.0.0", "openai": "^4.29.2", "reflect-metadata": "^0.2.2" }, @@ -3123,6 +3126,15 @@ "tslib": "^2.4.0" } }, + "node_modules/@types/axios": { + "version": "0.14.0", + "resolved": 
"https://registry.npmjs.org/@types/axios/-/axios-0.14.0.tgz", + "integrity": "sha512-KqQnQbdYE54D7oa/UmYVMZKq7CO4l8DEENzOKc4aBRwxCXSlJXGz83flFx5L7AWrOQnmuN3kVsRdt+GZPPjiVQ==", + "deprecated": "This is a stub types definition for axios (https://github.com/mzabriskie/axios). axios provides its own type definitions, so you don't need @types/axios installed!", + "dependencies": { + "axios": "*" + } + }, "node_modules/@types/babel__core": { "version": "7.20.5", "resolved": "https://registry.npmjs.org/@types/babel__core/-/babel__core-7.20.5.tgz", @@ -3497,6 +3509,16 @@ "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", "integrity": "sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==" }, + "node_modules/axios": { + "version": "1.7.2", + "resolved": "https://registry.npmjs.org/axios/-/axios-1.7.2.tgz", + "integrity": "sha512-2A8QhOMrbomlDuiLeK9XibIBzuHeRcqqNOHp0Cyp5EoJ1IFDh+XZH3A6BkXtv0K4gFGCI0Y4BM7B1wOEi0Rmgw==", + "dependencies": { + "follow-redirects": "^1.15.6", + "form-data": "^4.0.0", + "proxy-from-env": "^1.1.0" + } + }, "node_modules/babel-jest": { "version": "29.7.0", "resolved": "https://registry.npmjs.org/babel-jest/-/babel-jest-29.7.0.tgz", @@ -4478,6 +4500,25 @@ "integrity": "sha512-36yxDn5H7OFZQla0/jFJmbIKTdZAQHngCedGxiMmpNfEZM0sdEeT+WczLQrjK6D7o2aiyLYDnkw0R3JK0Qv1RQ==", "dev": true }, + "node_modules/follow-redirects": { + "version": "1.15.6", + "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.6.tgz", + "integrity": "sha512-wWN62YITEaOpSK584EZXJafH1AGpO8RVgElfkuXbTOrPX4fIfOyEpW/CsiNd8JdYrAoOvafRTOEnvsO++qCqFA==", + "funding": [ + { + "type": "individual", + "url": "https://github.com/sponsors/RubenVerborgh" + } + ], + "engines": { + "node": ">=4.0" + }, + "peerDependenciesMeta": { + "debug": { + "optional": true + } + } + }, "node_modules/form-data": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/form-data/-/form-data-4.0.0.tgz", @@ -5901,6 
+5942,20 @@ "is-buffer": "~1.1.6" } }, + "node_modules/memoize": { + "version": "10.0.0", + "resolved": "https://registry.npmjs.org/memoize/-/memoize-10.0.0.tgz", + "integrity": "sha512-H6cBLgsi6vMWOcCpvVCdFFnl3kerEXbrYh9q+lY6VXvQSmM6CkmV08VOwT+WE2tzIEqRPFfAq3fm4v/UIW6mSA==", + "dependencies": { + "mimic-function": "^5.0.0" + }, + "engines": { + "node": ">=18" + }, + "funding": { + "url": "https://github.com/sindresorhus/memoize?sponsor=1" + } + }, "node_modules/merge-stream": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/merge-stream/-/merge-stream-2.0.0.tgz", @@ -5948,6 +6003,17 @@ "node": ">= 0.6" } }, + "node_modules/mimic-function": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/mimic-function/-/mimic-function-5.0.1.tgz", + "integrity": "sha512-VP79XUPxV2CigYP3jWwAUFSku2aKqBH7uTAapFWCBqutsbmDo96KY5o8uh6U+/YSIn5OxJnXp73beVkpqMIGhA==", + "engines": { + "node": ">=18" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/minimatch": { "version": "3.1.2", "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-3.1.2.tgz", @@ -6359,6 +6425,11 @@ "node": ">= 6" } }, + "node_modules/proxy-from-env": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/proxy-from-env/-/proxy-from-env-1.1.0.tgz", + "integrity": "sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg==" + }, "node_modules/punycode": { "version": "2.3.1", "resolved": "https://registry.npmjs.org/punycode/-/punycode-2.3.1.tgz", diff --git a/nodejs/package.json b/nodejs/package.json index 623d0a00..4e2255a9 100644 --- a/nodejs/package.json +++ b/nodejs/package.json @@ -65,7 +65,9 @@ "version": "napi version" }, "dependencies": { + "@types/axios": "^0.14.0", "apache-arrow": "^15.0.0", + "axios": "^1.7.2", "openai": "^4.29.2", "reflect-metadata": "^0.2.2" }