Compare commits

...

8 Commits

Author SHA1 Message Date
Lance Release
515ab5f417 Bump version: 0.14.0-beta.1 → 0.14.0 2024-10-09 18:53:35 +00:00
Lance Release
8d0055fe6b Bump version: 0.14.0-beta.0 → 0.14.0-beta.1 2024-10-09 18:53:34 +00:00
Will Jones
5f9d8509b3 feat: upgrade Lance to v0.18.2 (#1737)
Includes changes from v0.18.1 and v0.18.2:

* [v0.18.1 change
log](https://github.com/lancedb/lance/releases/tag/v0.18.1)
* [v0.18.2 change
log](https://github.com/lancedb/lance/releases/tag/v0.18.2)

Closes #1656
Closes #1615
Closes #1661
2024-10-09 11:46:46 -06:00
Will Jones
f3b6a1f55b feat(node): bind remote SDK to rust implementation (#1730)
Closes [#2509](https://github.com/lancedb/sophon/issues/2509)

This is the Node.js analogue of #1700
2024-10-09 11:46:27 -06:00
Will Jones
aff25e3bf9 fix(node): add native packages to bump version (#1738)
We weren't bumping the version, so when users downloaded our package
from npm, they were getting the old binaries.
2024-10-08 23:03:53 -06:00
Will Jones
8509f73221 feat: better errors for remote SDK (#1722)
* Adds nicer errors to remote SDK, that expose useful properties like
`request_id` and `status_code`.
* Makes sure the Python tracebacks print nicely by mapping the `source`
field from a Rust error to the `__cause__` field.
2024-10-08 22:21:13 -06:00
Will Jones
607476788e feat(rust): list_indices in remote SDK (#1726)
Implements `list_indices`.

---------

Co-authored-by: Weston Pace <weston.pace@gmail.com>
2024-10-08 21:45:21 -06:00
Gagan Bhullar
4d458d5829 feat(python): drop support for dictionary in Table.add (#1725)
PR closes #1706
2024-10-08 20:41:08 -06:00
36 changed files with 1957 additions and 1523 deletions

View File

@@ -66,6 +66,32 @@ glob = "nodejs/npm/*/package.json"
replace = "\"version\": \"{new_version}\","
search = "\"version\": \"{current_version}\","
# vectordb node binary packages
[[tool.bumpversion.files]]
glob = "node/package.json"
replace = "\"@lancedb/vectordb-darwin-arm64\": \"{new_version}\""
search = "\"@lancedb/vectordb-darwin-arm64\": \"{current_version}\""
[[tool.bumpversion.files]]
glob = "node/package.json"
replace = "\"@lancedb/vectordb-darwin-x64\": \"{new_version}\""
search = "\"@lancedb/vectordb-darwin-x64\": \"{current_version}\""
[[tool.bumpversion.files]]
glob = "node/package.json"
replace = "\"@lancedb/vectordb-linux-arm64-gnu\": \"{new_version}\""
search = "\"@lancedb/vectordb-linux-arm64-gnu\": \"{current_version}\""
[[tool.bumpversion.files]]
glob = "node/package.json"
replace = "\"@lancedb/vectordb-linux-x64-gnu\": \"{new_version}\""
search = "\"@lancedb/vectordb-linux-x64-gnu\": \"{current_version}\""
[[tool.bumpversion.files]]
glob = "node/package.json"
replace = "\"@lancedb/vectordb-win32-x64-msvc\": \"{new_version}\""
search = "\"@lancedb/vectordb-win32-x64-msvc\": \"{current_version}\""
# Cargo files
# ------------
[[tool.bumpversion.files]]
@@ -77,3 +103,8 @@ search = "\nversion = \"{current_version}\""
filename = "rust/lancedb/Cargo.toml"
replace = "\nversion = \"{new_version}\""
search = "\nversion = \"{current_version}\""
[[tool.bumpversion.files]]
filename = "nodejs/Cargo.toml"
replace = "\nversion = \"{new_version}\""
search = "\nversion = \"{current_version}\""

View File

@@ -20,13 +20,13 @@ keywords = ["lancedb", "lance", "database", "vector", "search"]
categories = ["database-implementations"]
[workspace.dependencies]
lance = { "version" = "=0.18.0", "features" = ["dynamodb"] }
lance-index = { "version" = "=0.18.0" }
lance-linalg = { "version" = "=0.18.0" }
lance-table = { "version" = "=0.18.0" }
lance-testing = { "version" = "=0.18.0" }
lance-datafusion = { "version" = "=0.18.0" }
lance-encoding = { "version" = "=0.18.0" }
lance = { "version" = "=0.18.2", "features" = ["dynamodb"] }
lance-index = { "version" = "=0.18.2" }
lance-linalg = { "version" = "=0.18.2" }
lance-table = { "version" = "=0.18.2" }
lance-testing = { "version" = "=0.18.2" }
lance-datafusion = { "version" = "=0.18.2" }
lance-encoding = { "version" = "=0.18.2" }
# Note that this one does not include pyarrow
arrow = { version = "52.2", optional = false }
arrow-array = "52.2"
@@ -38,8 +38,8 @@ arrow-arith = "52.2"
arrow-cast = "52.2"
async-trait = "0"
chrono = "0.4.35"
datafusion-common = "40.0"
datafusion-physical-plan = "40.0"
datafusion-common = "41.0"
datafusion-physical-plan = "41.0"
half = { "version" = "=2.4.1", default-features = false, features = [
"num-traits",
] }

1436
node/package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -58,7 +58,7 @@
"ts-node-dev": "^2.0.0",
"typedoc": "^0.24.7",
"typedoc-plugin-markdown": "^3.15.3",
"typescript": "*",
"typescript": "^5.1.0",
"uuid": "^9.0.0"
},
"dependencies": {
@@ -88,10 +88,10 @@
}
},
"optionalDependencies": {
"@lancedb/vectordb-darwin-arm64": "0.4.20",
"@lancedb/vectordb-darwin-x64": "0.4.20",
"@lancedb/vectordb-linux-arm64-gnu": "0.4.20",
"@lancedb/vectordb-linux-x64-gnu": "0.4.20",
"@lancedb/vectordb-win32-x64-msvc": "0.4.20"
"@lancedb/vectordb-darwin-arm64": "0.11.0-beta.1",
"@lancedb/vectordb-darwin-x64": "0.11.0-beta.1",
"@lancedb/vectordb-linux-arm64-gnu": "0.11.0-beta.1",
"@lancedb/vectordb-linux-x64-gnu": "0.11.0-beta.1",
"@lancedb/vectordb-win32-x64-msvc": "0.11.0-beta.1"
}
}

View File

@@ -14,6 +14,7 @@
import { describe } from 'mocha'
import * as chai from 'chai'
import { assert } from 'chai'
import * as chaiAsPromised from 'chai-as-promised'
import { v4 as uuidv4 } from 'uuid'
@@ -22,7 +23,6 @@ import { tmpdir } from 'os'
import * as fs from 'fs'
import * as path from 'path'
const assert = chai.assert
chai.use(chaiAsPromised)
describe('LanceDB AWS Integration test', function () {

View File

@@ -142,9 +142,9 @@ export class Query<T = number[]> {
Object.keys(entry).forEach((key: string) => {
if (entry[key] instanceof Vector) {
// toJSON() returns f16 array correctly
newObject[key] = (entry[key] as Vector).toJSON()
newObject[key] = (entry[key] as any).toJSON()
} else {
newObject[key] = entry[key]
newObject[key] = entry[key] as any
}
})
return newObject as unknown as T

View File

@@ -247,9 +247,9 @@ export class RemoteQuery<T = number[]> extends Query<T> {
const newObject: Record<string, unknown> = {}
Object.keys(entry).forEach((key: string) => {
if (entry[key] instanceof Vector) {
newObject[key] = (entry[key] as Vector).toArray()
newObject[key] = (entry[key] as any).toArray()
} else {
newObject[key] = entry[key]
newObject[key] = entry[key] as any
}
})
return newObject as unknown as T

View File

@@ -14,6 +14,7 @@
import { describe } from "mocha";
import { track } from "temp";
import { assert, expect } from 'chai'
import * as chai from "chai";
import * as chaiAsPromised from "chai-as-promised";
@@ -44,8 +45,6 @@ import {
} from "apache-arrow";
import type { RemoteRequest, RemoteResponse } from "../middleware";
const expect = chai.expect;
const assert = chai.assert;
chai.use(chaiAsPromised);
describe("LanceDB client", function () {
@@ -169,7 +168,7 @@ describe("LanceDB client", function () {
// Should reject a bad filter
await expect(table.filter("id % 2 = 0 AND").execute()).to.be.rejectedWith(
/.*sql parser error: Expected an expression:, found: EOF.*/
/.*sql parser error: .*/
);
});

View File

@@ -1,7 +1,7 @@
[package]
name = "lancedb-nodejs"
edition.workspace = true
version = "0.0.0"
version = "0.11.0-beta.1"
license.workspace = true
description.workspace = true
repository.workspace = true
@@ -14,7 +14,7 @@ crate-type = ["cdylib"]
[dependencies]
arrow-ipc.workspace = true
futures.workspace = true
lancedb = { path = "../rust/lancedb" }
lancedb = { path = "../rust/lancedb", features = ["remote"] }
napi = { version = "2.16.8", default-features = false, features = [
"napi9",
"async",

View File

@@ -0,0 +1,93 @@
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import * as http from "http";
import { RequestListener } from "http";
import { Connection, ConnectionOptions, connect } from "../lancedb";
/**
 * Runs `callback` against a LanceDB connection that talks to a local mock
 * HTTP server backed by `listener`, and always tears the server down when
 * the callback finishes (or throws).
 *
 * `connectionOptions` may extend or override the default fake credentials.
 */
async function withMockDatabase(
  listener: RequestListener,
  callback: (db: Connection) => void,
  connectionOptions?: ConnectionOptions,
) {
  const mockServer = http.createServer(listener);
  // Port must agree with the hostOverride handed to connect() below.
  mockServer.listen(8000);

  const defaults = {
    apiKey: "fake",
    hostOverride: "http://localhost:8000",
  };
  const db = await connect("db://dev", { ...defaults, ...connectionOptions });

  try {
    await callback(db);
  } finally {
    mockServer.close();
  }
}
// Tests for the remote (LanceDB Cloud) connection path. Tests that need a
// server use withMockDatabase to stand in for the REST endpoint locally.
describe("remote connection", () => {
  it("should accept partial connection options", async () => {
    // Only checks that connect() tolerates a partially-filled clientConfig;
    // no request is issued, so no mock server is required here.
    await connect("db://test", {
      apiKey: "fake",
      clientConfig: {
        timeoutConfig: { readTimeout: 5 },
        retryConfig: { retries: 2 },
      },
    });
  });
  it("should pass down apiKey and userAgent", async () => {
    await withMockDatabase(
      (req, res) => {
        // The client must forward the API key header and identify itself
        // with the default user agent (npm exposes the package version via
        // the npm_package_version env var).
        expect(req.headers["x-api-key"]).toEqual("fake");
        expect(req.headers["user-agent"]).toEqual(
          `LanceDB-Node-Client/${process.env.npm_package_version}`,
        );
        const body = JSON.stringify({ tables: [] });
        res.writeHead(200, { "Content-Type": "application/json" }).end(body);
      },
      async (db) => {
        const tableNames = await db.tableNames();
        expect(tableNames).toEqual([]);
      },
    );
  });
  it("allows customizing user agent", async () => {
    await withMockDatabase(
      (req, res) => {
        // A caller-supplied userAgent must replace the default one.
        expect(req.headers["user-agent"]).toEqual("MyApp/1.0");
        const body = JSON.stringify({ tables: [] });
        res.writeHead(200, { "Content-Type": "application/json" }).end(body);
      },
      async (db) => {
        const tableNames = await db.tableNames();
        expect(tableNames).toEqual([]);
      },
      {
        clientConfig: {
          userAgent: "MyApp/1.0",
        },
      },
    );
  });
});

View File

@@ -23,8 +23,6 @@ import {
Connection as LanceDbConnection,
} from "./native.js";
import { RemoteConnection, RemoteConnectionOptions } from "./remote";
export {
WriteOptions,
WriteMode,
@@ -33,6 +31,9 @@ export {
ConnectionOptions,
IndexStatistics,
IndexConfig,
ClientConfig,
TimeoutConfig,
RetryConfig,
} from "./native.js";
export {
@@ -87,7 +88,7 @@ export * as embedding from "./embedding";
*/
export async function connect(
uri: string,
opts?: Partial<ConnectionOptions | RemoteConnectionOptions>,
opts?: Partial<ConnectionOptions>,
): Promise<Connection>;
/**
* Connect to a LanceDB instance at the given URI.
@@ -108,13 +109,11 @@ export async function connect(
* ```
*/
export async function connect(
opts: Partial<RemoteConnectionOptions | ConnectionOptions> & { uri: string },
opts: Partial<ConnectionOptions> & { uri: string },
): Promise<Connection>;
export async function connect(
uriOrOptions:
| string
| (Partial<RemoteConnectionOptions | ConnectionOptions> & { uri: string }),
opts: Partial<ConnectionOptions | RemoteConnectionOptions> = {},
uriOrOptions: string | (Partial<ConnectionOptions> & { uri: string }),
opts: Partial<ConnectionOptions> = {},
): Promise<Connection> {
let uri: string | undefined;
if (typeof uriOrOptions !== "string") {
@@ -129,9 +128,6 @@ export async function connect(
throw new Error("uri is required");
}
if (uri?.startsWith("db://")) {
return new RemoteConnection(uri, opts as RemoteConnectionOptions);
}
opts = (opts as ConnectionOptions) ?? {};
(<ConnectionOptions>opts).storageOptions = cleanseStorageOptions(
(<ConnectionOptions>opts).storageOptions,

View File

@@ -1,218 +0,0 @@
// Copyright 2023 LanceDB Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import axios, {
AxiosError,
type AxiosResponse,
type ResponseType,
} from "axios";
import { Table as ArrowTable } from "../arrow";
import { tableFromIPC } from "../arrow";
import { VectorQuery } from "../query";
export class RestfulLanceDBClient {
#dbName: string;
#region: string;
#apiKey: string;
#hostOverride?: string;
#closed: boolean = false;
#timeout: number = 12 * 1000; // 12 seconds;
#session?: import("axios").AxiosInstance;
constructor(
dbName: string,
apiKey: string,
region: string,
hostOverride?: string,
timeout?: number,
) {
this.#dbName = dbName;
this.#apiKey = apiKey;
this.#region = region;
this.#hostOverride = hostOverride ?? this.#hostOverride;
this.#timeout = timeout ?? this.#timeout;
}
// todo: cache the session.
get session(): import("axios").AxiosInstance {
if (this.#session !== undefined) {
return this.#session;
} else {
return axios.create({
baseURL: this.url,
headers: {
// biome-ignore lint: external API
Authorization: `Bearer ${this.#apiKey}`,
},
transformResponse: decodeErrorData,
timeout: this.#timeout,
});
}
}
get url(): string {
return (
this.#hostOverride ??
`https://${this.#dbName}.${this.#region}.api.lancedb.com`
);
}
get headers(): { [key: string]: string } {
const headers: { [key: string]: string } = {
"x-api-key": this.#apiKey,
"x-request-id": "na",
};
if (this.#region == "local") {
headers["Host"] = `${this.#dbName}.${this.#region}.api.lancedb.com`;
}
if (this.#hostOverride) {
headers["x-lancedb-database"] = this.#dbName;
}
return headers;
}
isOpen(): boolean {
return !this.#closed;
}
private checkNotClosed(): void {
if (this.#closed) {
throw new Error("Connection is closed");
}
}
close(): void {
this.#session = undefined;
this.#closed = true;
}
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
async get(uri: string, params?: Record<string, any>): Promise<any> {
this.checkNotClosed();
uri = new URL(uri, this.url).toString();
let response;
try {
response = await this.session.get(uri, {
headers: this.headers,
params,
});
} catch (e) {
if (e instanceof AxiosError && e.response) {
response = e.response;
} else {
throw e;
}
}
RestfulLanceDBClient.checkStatus(response!);
return response!.data;
}
// biome-ignore lint/suspicious/noExplicitAny: api response
async post(uri: string, body?: any): Promise<any>;
async post(
uri: string,
// biome-ignore lint/suspicious/noExplicitAny: api request
body: any,
additional: {
config?: { responseType: "arraybuffer" };
headers?: Record<string, string>;
params?: Record<string, string>;
},
): Promise<Buffer>;
async post(
uri: string,
// biome-ignore lint/suspicious/noExplicitAny: api request
body?: any,
additional?: {
config?: { responseType: ResponseType };
headers?: Record<string, string>;
params?: Record<string, string>;
},
// biome-ignore lint/suspicious/noExplicitAny: api response
): Promise<any> {
this.checkNotClosed();
uri = new URL(uri, this.url).toString();
additional = Object.assign(
{ config: { responseType: "json" } },
additional,
);
const headers = { ...this.headers, ...additional.headers };
if (!headers["Content-Type"]) {
headers["Content-Type"] = "application/json";
}
let response;
try {
response = await this.session.post(uri, body, {
headers,
responseType: additional!.config!.responseType,
params: new Map(Object.entries(additional.params ?? {})),
});
} catch (e) {
if (e instanceof AxiosError && e.response) {
response = e.response;
} else {
throw e;
}
}
RestfulLanceDBClient.checkStatus(response!);
if (additional!.config!.responseType === "arraybuffer") {
return response!.data;
} else {
return JSON.parse(response!.data);
}
}
async listTables(limit = 10, pageToken = ""): Promise<string[]> {
const json = await this.get("/v1/table", { limit, pageToken });
return json.tables;
}
async query(tableName: string, query: VectorQuery): Promise<ArrowTable> {
const tbl = await this.post(`/v1/table/${tableName}/query`, query, {
config: {
responseType: "arraybuffer",
},
});
return tableFromIPC(tbl);
}
static checkStatus(response: AxiosResponse): void {
if (response.status === 404) {
throw new Error(`Not found: ${response.data}`);
} else if (response.status >= 400 && response.status < 500) {
throw new Error(
`Bad Request: ${response.status}, error: ${response.data}`,
);
} else if (response.status >= 500 && response.status < 600) {
throw new Error(
`Internal Server Error: ${response.status}, error: ${response.data}`,
);
} else if (response.status !== 200) {
throw new Error(
`Unknown Error: ${response.status}, error: ${response.data}`,
);
}
}
}
// Axios response transformer: error payloads can arrive as raw bytes, so
// Buffer bodies are turned into UTF-8 text to keep error messages readable;
// every other payload is passed through untouched.
function decodeErrorData(data: unknown) {
  if (!Buffer.isBuffer(data)) {
    return data;
  }
  return data.toString("utf-8");
}

View File

@@ -1,193 +0,0 @@
import { Schema } from "apache-arrow";
import {
Data,
SchemaLike,
fromTableToStreamBuffer,
makeEmptyTable,
} from "../arrow";
import {
Connection,
CreateTableOptions,
OpenTableOptions,
TableNamesOptions,
} from "../connection";
import { Table } from "../table";
import { TTLCache } from "../util";
import { RestfulLanceDBClient } from "./client";
import { RemoteTable } from "./table";
export interface RemoteConnectionOptions {
apiKey?: string;
region?: string;
hostOverride?: string;
timeout?: number;
}
export class RemoteConnection extends Connection {
#dbName: string;
#apiKey: string;
#region: string;
#client: RestfulLanceDBClient;
#tableCache = new TTLCache(300_000);
constructor(
url: string,
{ apiKey, region, hostOverride, timeout }: RemoteConnectionOptions,
) {
super();
apiKey = apiKey ?? process.env.LANCEDB_API_KEY;
region = region ?? process.env.LANCEDB_REGION;
if (!apiKey) {
throw new Error("apiKey is required when connecting to LanceDB Cloud");
}
if (!region) {
throw new Error("region is required when connecting to LanceDB Cloud");
}
const parsed = new URL(url);
if (parsed.protocol !== "db:") {
throw new Error(
`invalid protocol: ${parsed.protocol}, only accepts db://`,
);
}
this.#dbName = parsed.hostname;
this.#apiKey = apiKey;
this.#region = region;
this.#client = new RestfulLanceDBClient(
this.#dbName,
this.#apiKey,
this.#region,
hostOverride,
timeout,
);
}
isOpen(): boolean {
return this.#client.isOpen();
}
close(): void {
return this.#client.close();
}
display(): string {
return `RemoteConnection(${this.#dbName})`;
}
async tableNames(options?: Partial<TableNamesOptions>): Promise<string[]> {
const response = await this.#client.get("/v1/table/", {
limit: options?.limit ?? 10,
// biome-ignore lint/style/useNamingConvention: <explanation>
page_token: options?.startAfter ?? "",
});
const body = await response.body();
for (const table of body.tables) {
this.#tableCache.set(table, true);
}
return body.tables;
}
async openTable(
name: string,
_options?: Partial<OpenTableOptions> | undefined,
): Promise<Table> {
if (this.#tableCache.get(name) === undefined) {
await this.#client.post(
`/v1/table/${encodeURIComponent(name)}/describe/`,
);
this.#tableCache.set(name, true);
}
return new RemoteTable(this.#client, name, this.#dbName);
}
async createTable(
nameOrOptions:
| string
| ({ name: string; data: Data } & Partial<CreateTableOptions>),
data?: Data,
options?: Partial<CreateTableOptions> | undefined,
): Promise<Table> {
if (typeof nameOrOptions !== "string" && "name" in nameOrOptions) {
const { name, data, ...options } = nameOrOptions;
return this.createTable(name, data, options);
}
if (data === undefined) {
throw new Error("data is required");
}
if (options?.mode) {
console.warn(
"option 'mode' is not supported in LanceDB Cloud",
"LanceDB Cloud only supports the default 'create' mode.",
"If the table already exists, an error will be thrown.",
);
}
if (options?.embeddingFunction) {
console.warn(
"embedding_functions is not yet supported on LanceDB Cloud.",
"Please vote https://github.com/lancedb/lancedb/issues/626 ",
"for this feature.",
);
}
const { buf } = await Table.parseTableData(
data,
options,
true /** streaming */,
);
await this.#client.post(
`/v1/table/${encodeURIComponent(nameOrOptions)}/create/`,
buf,
{
config: {
responseType: "arraybuffer",
},
headers: { "Content-Type": "application/vnd.apache.arrow.stream" },
},
);
this.#tableCache.set(nameOrOptions, true);
return new RemoteTable(this.#client, nameOrOptions, this.#dbName);
}
async createEmptyTable(
name: string,
schema: SchemaLike,
options?: Partial<CreateTableOptions> | undefined,
): Promise<Table> {
if (options?.mode) {
console.warn(`mode is not supported on LanceDB Cloud`);
}
if (options?.embeddingFunction) {
console.warn(
"embeddingFunction is not yet supported on LanceDB Cloud.",
"Please vote https://github.com/lancedb/lancedb/issues/626 ",
"for this feature.",
);
}
const emptyTable = makeEmptyTable(schema);
const buf = await fromTableToStreamBuffer(emptyTable);
await this.#client.post(
`/v1/table/${encodeURIComponent(name)}/create/`,
buf,
{
config: {
responseType: "arraybuffer",
},
headers: { "Content-Type": "application/vnd.apache.arrow.stream" },
},
);
this.#tableCache.set(name, true);
return new RemoteTable(this.#client, name, this.#dbName);
}
async dropTable(name: string): Promise<void> {
await this.#client.post(`/v1/table/${encodeURIComponent(name)}/drop/`);
this.#tableCache.delete(name);
}
}

View File

@@ -1,3 +0,0 @@
export { RestfulLanceDBClient } from "./client";
export { type RemoteConnectionOptions, RemoteConnection } from "./connection";
export { RemoteTable } from "./table";

View File

@@ -1,226 +0,0 @@
// Copyright 2023 LanceDB Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import { Table as ArrowTable } from "apache-arrow";
import { Data, IntoVector } from "../arrow";
import { IndexStatistics } from "..";
import { CreateTableOptions } from "../connection";
import { IndexOptions } from "../indices";
import { MergeInsertBuilder } from "../merge";
import { VectorQuery } from "../query";
import { AddDataOptions, Table, UpdateOptions } from "../table";
import { IntoSql, toSQL } from "../util";
import { RestfulLanceDBClient } from "./client";
export class RemoteTable extends Table {
#client: RestfulLanceDBClient;
#name: string;
// Used in the display() method
#dbName: string;
get #tablePrefix() {
return `/v1/table/${encodeURIComponent(this.#name)}/`;
}
get name(): string {
return this.#name;
}
public constructor(
client: RestfulLanceDBClient,
tableName: string,
dbName: string,
) {
super();
this.#client = client;
this.#name = tableName;
this.#dbName = dbName;
}
isOpen(): boolean {
return !this.#client.isOpen();
}
close(): void {
this.#client.close();
}
display(): string {
return `RemoteTable(${this.#dbName}; ${this.#name})`;
}
async schema(): Promise<import("apache-arrow").Schema> {
const resp = await this.#client.post(`${this.#tablePrefix}/describe/`);
// TODO: parse this into a valid arrow schema
return resp.schema;
}
async add(data: Data, options?: Partial<AddDataOptions>): Promise<void> {
const { buf, mode } = await Table.parseTableData(
data,
options as CreateTableOptions,
true,
);
await this.#client.post(`${this.#tablePrefix}/insert/`, buf, {
params: {
mode,
},
headers: {
"Content-Type": "application/vnd.apache.arrow.stream",
},
});
}
async update(
optsOrUpdates:
| (Map<string, string> | Record<string, string>)
| ({
values: Map<string, IntoSql> | Record<string, IntoSql>;
} & Partial<UpdateOptions>)
| ({
valuesSql: Map<string, string> | Record<string, string>;
} & Partial<UpdateOptions>),
options?: Partial<UpdateOptions>,
): Promise<void> {
const isValues =
"values" in optsOrUpdates && typeof optsOrUpdates.values !== "string";
const isValuesSql =
"valuesSql" in optsOrUpdates &&
typeof optsOrUpdates.valuesSql !== "string";
const isMap = (obj: unknown): obj is Map<string, string> => {
return obj instanceof Map;
};
let predicate;
let columns: [string, string][];
switch (true) {
case isMap(optsOrUpdates):
columns = Array.from(optsOrUpdates.entries());
predicate = options?.where;
break;
case isValues && isMap(optsOrUpdates.values):
columns = Array.from(optsOrUpdates.values.entries()).map(([k, v]) => [
k,
toSQL(v),
]);
predicate = optsOrUpdates.where;
break;
case isValues && !isMap(optsOrUpdates.values):
columns = Object.entries(optsOrUpdates.values).map(([k, v]) => [
k,
toSQL(v),
]);
predicate = optsOrUpdates.where;
break;
case isValuesSql && isMap(optsOrUpdates.valuesSql):
columns = Array.from(optsOrUpdates.valuesSql.entries());
predicate = optsOrUpdates.where;
break;
case isValuesSql && !isMap(optsOrUpdates.valuesSql):
columns = Object.entries(optsOrUpdates.valuesSql).map(([k, v]) => [
k,
v,
]);
predicate = optsOrUpdates.where;
break;
default:
columns = Object.entries(optsOrUpdates as Record<string, string>);
predicate = options?.where;
}
await this.#client.post(`${this.#tablePrefix}/update/`, {
predicate: predicate ?? null,
updates: columns,
});
}
async countRows(filter?: unknown): Promise<number> {
const payload = { predicate: filter };
return await this.#client.post(`${this.#tablePrefix}/count_rows/`, payload);
}
async delete(predicate: unknown): Promise<void> {
const payload = { predicate };
await this.#client.post(`${this.#tablePrefix}/delete/`, payload);
}
async createIndex(
column: string,
options?: Partial<IndexOptions>,
): Promise<void> {
if (options !== undefined) {
console.warn("options are not yet supported on the LanceDB cloud");
}
const indexType = "vector";
const metric = "L2";
const data = {
column,
// biome-ignore lint/style/useNamingConvention: external API
index_type: indexType,
// biome-ignore lint/style/useNamingConvention: external API
metric_type: metric,
};
await this.#client.post(`${this.#tablePrefix}/create_index`, data);
}
query(): import("..").Query {
throw new Error("query() is not yet supported on the LanceDB cloud");
}
search(_query: string | IntoVector): VectorQuery {
throw new Error("search() is not yet supported on the LanceDB cloud");
}
vectorSearch(_vector: unknown): import("..").VectorQuery {
throw new Error("vectorSearch() is not yet supported on the LanceDB cloud");
}
addColumns(_newColumnTransforms: unknown): Promise<void> {
throw new Error("addColumns() is not yet supported on the LanceDB cloud");
}
alterColumns(_columnAlterations: unknown): Promise<void> {
throw new Error("alterColumns() is not yet supported on the LanceDB cloud");
}
dropColumns(_columnNames: unknown): Promise<void> {
throw new Error("dropColumns() is not yet supported on the LanceDB cloud");
}
async version(): Promise<number> {
const resp = await this.#client.post(`${this.#tablePrefix}/describe/`);
return resp.version;
}
checkout(_version: unknown): Promise<void> {
throw new Error("checkout() is not yet supported on the LanceDB cloud");
}
checkoutLatest(): Promise<void> {
throw new Error(
"checkoutLatest() is not yet supported on the LanceDB cloud",
);
}
restore(): Promise<void> {
throw new Error("restore() is not yet supported on the LanceDB cloud");
}
optimize(_options?: unknown): Promise<import("../native").OptimizeStats> {
throw new Error("optimize() is not yet supported on the LanceDB cloud");
}
async listIndices(): Promise<import("../native").IndexConfig[]> {
return await this.#client.post(`${this.#tablePrefix}/index/list/`);
}
toArrow(): Promise<ArrowTable> {
throw new Error("toArrow() is not yet supported on the LanceDB cloud");
}
mergeInsert(_on: string | string[]): MergeInsertBuilder {
throw new Error("mergeInsert() is not yet supported on the LanceDB cloud");
}
async indexStats(_name: string): Promise<IndexStatistics | undefined> {
throw new Error("indexStats() is not yet supported on the LanceDB cloud");
}
}

View File

@@ -1,12 +1,12 @@
{
"name": "@lancedb/lancedb",
"version": "0.10.0-beta.1",
"version": "0.11.0-beta.1",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@lancedb/lancedb",
"version": "0.10.0-beta.1",
"version": "0.11.0-beta.1",
"cpu": [
"x64",
"arm64"
@@ -18,7 +18,6 @@
"win32"
],
"dependencies": {
"axios": "^1.7.2",
"reflect-metadata": "^0.2.2"
},
"devDependencies": {
@@ -30,6 +29,7 @@
"@napi-rs/cli": "^2.18.3",
"@types/axios": "^0.14.0",
"@types/jest": "^29.1.2",
"@types/node": "^22.7.4",
"@types/tmp": "^0.2.6",
"apache-arrow-13": "npm:apache-arrow@13.0.0",
"apache-arrow-14": "npm:apache-arrow@14.0.0",
@@ -4648,11 +4648,12 @@
"optional": true
},
"node_modules/@types/node": {
"version": "20.14.11",
"resolved": "https://registry.npmjs.org/@types/node/-/node-20.14.11.tgz",
"integrity": "sha512-kprQpL8MMeszbz6ojB5/tU8PLN4kesnN8Gjzw349rDlNgsSzg90lAVj3llK99Dh7JON+t9AuscPPFW6mPbTnSA==",
"version": "22.7.4",
"resolved": "https://registry.npmjs.org/@types/node/-/node-22.7.4.tgz",
"integrity": "sha512-y+NPi1rFzDs1NdQHHToqeiX2TIS79SWEAw9GYhkkx8bD0ChpfqC+n2j5OXOCpzfojBEBt6DnEnnG9MY0zk1XLg==",
"devOptional": true,
"dependencies": {
"undici-types": "~5.26.4"
"undici-types": "~6.19.2"
}
},
"node_modules/@types/node-fetch": {
@@ -4665,6 +4666,12 @@
"form-data": "^4.0.0"
}
},
"node_modules/@types/node/node_modules/undici-types": {
"version": "6.19.8",
"resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.19.8.tgz",
"integrity": "sha512-ve2KP6f/JnbPBFyobGHuerC9g1FYGn/F8n1LWTwNxCEzd6IfqTwUQcNXgEtmmQ6DlRrC1hrSrBnCZPokRrDHjw==",
"devOptional": true
},
"node_modules/@types/pad-left": {
"version": "2.1.1",
"resolved": "https://registry.npmjs.org/@types/pad-left/-/pad-left-2.1.1.tgz",
@@ -4963,6 +4970,21 @@
"arrow2csv": "bin/arrow2csv.cjs"
}
},
"node_modules/apache-arrow-15/node_modules/@types/node": {
"version": "20.16.10",
"resolved": "https://registry.npmjs.org/@types/node/-/node-20.16.10.tgz",
"integrity": "sha512-vQUKgWTjEIRFCvK6CyriPH3MZYiYlNy0fKiEYHWbcoWLEgs4opurGGKlebrTLqdSMIbXImH6XExNiIyNUv3WpA==",
"dev": true,
"dependencies": {
"undici-types": "~6.19.2"
}
},
"node_modules/apache-arrow-15/node_modules/undici-types": {
"version": "6.19.8",
"resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.19.8.tgz",
"integrity": "sha512-ve2KP6f/JnbPBFyobGHuerC9g1FYGn/F8n1LWTwNxCEzd6IfqTwUQcNXgEtmmQ6DlRrC1hrSrBnCZPokRrDHjw==",
"dev": true
},
"node_modules/apache-arrow-16": {
"name": "apache-arrow",
"version": "16.0.0",
@@ -4984,6 +5006,21 @@
"arrow2csv": "bin/arrow2csv.cjs"
}
},
"node_modules/apache-arrow-16/node_modules/@types/node": {
"version": "20.16.10",
"resolved": "https://registry.npmjs.org/@types/node/-/node-20.16.10.tgz",
"integrity": "sha512-vQUKgWTjEIRFCvK6CyriPH3MZYiYlNy0fKiEYHWbcoWLEgs4opurGGKlebrTLqdSMIbXImH6XExNiIyNUv3WpA==",
"dev": true,
"dependencies": {
"undici-types": "~6.19.2"
}
},
"node_modules/apache-arrow-16/node_modules/undici-types": {
"version": "6.19.8",
"resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.19.8.tgz",
"integrity": "sha512-ve2KP6f/JnbPBFyobGHuerC9g1FYGn/F8n1LWTwNxCEzd6IfqTwUQcNXgEtmmQ6DlRrC1hrSrBnCZPokRrDHjw==",
"dev": true
},
"node_modules/apache-arrow-17": {
"name": "apache-arrow",
"version": "17.0.0",
@@ -5011,12 +5048,42 @@
"integrity": "sha512-BwR5KP3Es/CSht0xqBcUXS3qCAUVXwpRKsV2+arxeb65atasuXG9LykC9Ab10Cw3s2raH92ZqOeILaQbsB2ACg==",
"dev": true
},
"node_modules/apache-arrow-17/node_modules/@types/node": {
"version": "20.16.10",
"resolved": "https://registry.npmjs.org/@types/node/-/node-20.16.10.tgz",
"integrity": "sha512-vQUKgWTjEIRFCvK6CyriPH3MZYiYlNy0fKiEYHWbcoWLEgs4opurGGKlebrTLqdSMIbXImH6XExNiIyNUv3WpA==",
"dev": true,
"dependencies": {
"undici-types": "~6.19.2"
}
},
"node_modules/apache-arrow-17/node_modules/flatbuffers": {
"version": "24.3.25",
"resolved": "https://registry.npmjs.org/flatbuffers/-/flatbuffers-24.3.25.tgz",
"integrity": "sha512-3HDgPbgiwWMI9zVB7VYBHaMrbOO7Gm0v+yD2FV/sCKj+9NDeVL7BOBYUuhWAQGKWOzBo8S9WdMvV0eixO233XQ==",
"dev": true
},
"node_modules/apache-arrow-17/node_modules/undici-types": {
"version": "6.19.8",
"resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.19.8.tgz",
"integrity": "sha512-ve2KP6f/JnbPBFyobGHuerC9g1FYGn/F8n1LWTwNxCEzd6IfqTwUQcNXgEtmmQ6DlRrC1hrSrBnCZPokRrDHjw==",
"dev": true
},
"node_modules/apache-arrow/node_modules/@types/node": {
"version": "20.16.10",
"resolved": "https://registry.npmjs.org/@types/node/-/node-20.16.10.tgz",
"integrity": "sha512-vQUKgWTjEIRFCvK6CyriPH3MZYiYlNy0fKiEYHWbcoWLEgs4opurGGKlebrTLqdSMIbXImH6XExNiIyNUv3WpA==",
"peer": true,
"dependencies": {
"undici-types": "~6.19.2"
}
},
"node_modules/apache-arrow/node_modules/undici-types": {
"version": "6.19.8",
"resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.19.8.tgz",
"integrity": "sha512-ve2KP6f/JnbPBFyobGHuerC9g1FYGn/F8n1LWTwNxCEzd6IfqTwUQcNXgEtmmQ6DlRrC1hrSrBnCZPokRrDHjw==",
"peer": true
},
"node_modules/argparse": {
"version": "1.0.10",
"resolved": "https://registry.npmjs.org/argparse/-/argparse-1.0.10.tgz",
@@ -5046,12 +5113,14 @@
"node_modules/asynckit": {
"version": "0.4.0",
"resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz",
"integrity": "sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q=="
"integrity": "sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==",
"devOptional": true
},
"node_modules/axios": {
"version": "1.7.2",
"resolved": "https://registry.npmjs.org/axios/-/axios-1.7.2.tgz",
"integrity": "sha512-2A8QhOMrbomlDuiLeK9XibIBzuHeRcqqNOHp0Cyp5EoJ1IFDh+XZH3A6BkXtv0K4gFGCI0Y4BM7B1wOEi0Rmgw==",
"dev": true,
"dependencies": {
"follow-redirects": "^1.15.6",
"form-data": "^4.0.0",
@@ -5536,6 +5605,7 @@
"version": "1.0.8",
"resolved": "https://registry.npmjs.org/combined-stream/-/combined-stream-1.0.8.tgz",
"integrity": "sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg==",
"devOptional": true,
"dependencies": {
"delayed-stream": "~1.0.0"
},
@@ -5723,6 +5793,7 @@
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-1.0.0.tgz",
"integrity": "sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==",
"devOptional": true,
"engines": {
"node": ">=0.4.0"
}
@@ -6248,6 +6319,7 @@
"version": "1.15.6",
"resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.6.tgz",
"integrity": "sha512-wWN62YITEaOpSK584EZXJafH1AGpO8RVgElfkuXbTOrPX4fIfOyEpW/CsiNd8JdYrAoOvafRTOEnvsO++qCqFA==",
"dev": true,
"funding": [
{
"type": "individual",
@@ -6267,6 +6339,7 @@
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/form-data/-/form-data-4.0.0.tgz",
"integrity": "sha512-ETEklSGi5t0QMZuiXoA/Q6vcnxcLQP5vdugSpuAyi6SVGi2clPPp+xgEhuMaHC+zGgn31Kd235W35f7Hykkaww==",
"devOptional": true,
"dependencies": {
"asynckit": "^0.4.0",
"combined-stream": "^1.0.8",
@@ -7773,6 +7846,7 @@
"version": "1.52.0",
"resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.52.0.tgz",
"integrity": "sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg==",
"devOptional": true,
"engines": {
"node": ">= 0.6"
}
@@ -7781,6 +7855,7 @@
"version": "2.1.35",
"resolved": "https://registry.npmjs.org/mime-types/-/mime-types-2.1.35.tgz",
"integrity": "sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw==",
"devOptional": true,
"dependencies": {
"mime-db": "1.52.0"
},
@@ -8393,7 +8468,8 @@
"node_modules/proxy-from-env": {
"version": "1.1.0",
"resolved": "https://registry.npmjs.org/proxy-from-env/-/proxy-from-env-1.1.0.tgz",
"integrity": "sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg=="
"integrity": "sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg==",
"dev": true
},
"node_modules/pump": {
"version": "3.0.0",
@@ -9561,7 +9637,8 @@
"node_modules/undici-types": {
"version": "5.26.5",
"resolved": "https://registry.npmjs.org/undici-types/-/undici-types-5.26.5.tgz",
"integrity": "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA=="
"integrity": "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==",
"optional": true
},
"node_modules/update-browserslist-db": {
"version": "1.0.13",

View File

@@ -40,6 +40,7 @@
"@napi-rs/cli": "^2.18.3",
"@types/axios": "^0.14.0",
"@types/jest": "^29.1.2",
"@types/node": "^22.7.4",
"@types/tmp": "^0.2.6",
"apache-arrow-13": "npm:apache-arrow@13.0.0",
"apache-arrow-14": "npm:apache-arrow@14.0.0",
@@ -81,7 +82,6 @@
"version": "napi version"
},
"dependencies": {
"axios": "^1.7.2",
"reflect-metadata": "^0.2.2"
},
"optionalDependencies": {

View File

@@ -68,6 +68,24 @@ impl Connection {
builder = builder.storage_option(key, value);
}
}
let client_config = options.client_config.unwrap_or_default();
builder = builder.client_config(client_config.into());
if let Some(api_key) = options.api_key {
builder = builder.api_key(&api_key);
}
if let Some(region) = options.region {
builder = builder.region(&region);
} else {
builder = builder.region("us-east-1");
}
if let Some(host_override) = options.host_override {
builder = builder.host_override(&host_override);
}
Ok(Self::inner_new(
builder
.execute()

View File

@@ -22,6 +22,7 @@ mod index;
mod iterator;
pub mod merge;
mod query;
pub mod remote;
mod table;
mod util;
@@ -42,6 +43,19 @@ pub struct ConnectionOptions {
///
/// The available options are described at https://lancedb.github.io/lancedb/guides/storage/
pub storage_options: Option<HashMap<String, String>>,
/// (For LanceDB cloud only): configuration for the remote HTTP client.
pub client_config: Option<remote::ClientConfig>,
/// (For LanceDB cloud only): the API key to use with LanceDB Cloud.
///
/// Can also be set via the environment variable `LANCEDB_API_KEY`.
pub api_key: Option<String>,
/// (For LanceDB cloud only): the region to use for LanceDB cloud.
/// Defaults to 'us-east-1'.
pub region: Option<String>,
/// (For LanceDB cloud only): the host to use for LanceDB cloud. Used
/// for testing purposes.
pub host_override: Option<String>,
}
/// Write mode for writing a table.

120
nodejs/src/remote.rs Normal file
View File

@@ -0,0 +1,120 @@
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use napi_derive::*;
/// Timeout configuration for remote HTTP client.
///
/// All timeouts are expressed as fractional seconds (`f64`) on the JS side
/// and converted to `std::time::Duration` when building the Rust client.
#[napi(object)]
#[derive(Debug)]
pub struct TimeoutConfig {
    /// The timeout for establishing a connection in seconds. Default is 120
    /// seconds (2 minutes). This can also be set via the environment variable
    /// `LANCE_CLIENT_CONNECT_TIMEOUT`, as an integer number of seconds.
    pub connect_timeout: Option<f64>,
    /// The timeout for reading data from the server in seconds. Default is 300
    /// seconds (5 minutes). This can also be set via the environment variable
    /// `LANCE_CLIENT_READ_TIMEOUT`, as an integer number of seconds.
    pub read_timeout: Option<f64>,
    /// The timeout for keeping idle connections in the connection pool in seconds.
    /// Default is 300 seconds (5 minutes). This can also be set via the
    /// environment variable `LANCE_CLIENT_CONNECTION_TIMEOUT`, as an integer
    /// number of seconds.
    pub pool_idle_timeout: Option<f64>,
}
/// Retry configuration for the remote HTTP client.
///
/// Each field is optional; unset fields fall back to the defaults documented
/// below (or to the corresponding `LANCE_CLIENT_*` environment variable).
#[napi(object)]
#[derive(Debug)]
pub struct RetryConfig {
    /// The maximum number of retries for a request. Default is 3. You can also
    /// set this via the environment variable `LANCE_CLIENT_MAX_RETRIES`.
    pub retries: Option<u8>,
    /// The maximum number of retries for connection errors. Default is 3. You
    /// can also set this via the environment variable `LANCE_CLIENT_CONNECT_RETRIES`.
    pub connect_retries: Option<u8>,
    /// The maximum number of retries for read errors. Default is 3. You can also
    /// set this via the environment variable `LANCE_CLIENT_READ_RETRIES`.
    pub read_retries: Option<u8>,
    /// The backoff factor to apply between retries. Default is 0.25. Between each retry
    /// the client will wait for the amount of seconds:
    /// `{backoff factor} * (2 ** ({number of previous retries}))`. So for the default
    /// of 0.25, the first retry will wait 0.25 seconds, the second retry will wait 0.5
    /// seconds, the third retry will wait 1 second, etc.
    ///
    /// You can also set this via the environment variable
    /// `LANCE_CLIENT_RETRY_BACKOFF_FACTOR`.
    pub backoff_factor: Option<f64>,
    /// The jitter to apply to the backoff factor, in seconds. Default is 0.25.
    ///
    /// A random value between 0 and `backoff_jitter` will be added to the backoff
    /// factor in seconds. So for the default of 0.25 seconds, between 0 and 250
    /// milliseconds will be added to the sleep between each retry.
    ///
    /// You can also set this via the environment variable
    /// `LANCE_CLIENT_RETRY_BACKOFF_JITTER`.
    pub backoff_jitter: Option<f64>,
    /// The HTTP status codes for which to retry the request. Default is
    /// [429, 500, 502, 503].
    ///
    /// You can also set this via the environment variable
    /// `LANCE_CLIENT_RETRY_STATUSES`. Use a comma-separated list of integers.
    pub statuses: Option<Vec<u16>>,
}
/// Configuration for the remote HTTP client used by LanceDB Cloud
/// connections.
#[napi(object)]
#[derive(Debug, Default)]
pub struct ClientConfig {
    /// The User-Agent header sent with every request. When unset, a default
    /// of the form `LanceDB-Node-Client/<version>` is used.
    pub user_agent: Option<String>,
    /// Retry behavior (counts, backoff, retryable statuses). Unset fields
    /// use the defaults documented on `RetryConfig`.
    pub retry_config: Option<RetryConfig>,
    /// Connection/read/pool timeouts. Unset fields use the defaults
    /// documented on `TimeoutConfig`.
    pub timeout_config: Option<TimeoutConfig>,
}
impl From<TimeoutConfig> for lancedb::remote::TimeoutConfig {
fn from(config: TimeoutConfig) -> Self {
Self {
connect_timeout: config
.connect_timeout
.map(std::time::Duration::from_secs_f64),
read_timeout: config.read_timeout.map(std::time::Duration::from_secs_f64),
pool_idle_timeout: config
.pool_idle_timeout
.map(std::time::Duration::from_secs_f64),
}
}
}
impl From<RetryConfig> for lancedb::remote::RetryConfig {
fn from(config: RetryConfig) -> Self {
Self {
retries: config.retries,
connect_retries: config.connect_retries,
read_retries: config.read_retries,
backoff_factor: config.backoff_factor.map(|v| v as f32),
backoff_jitter: config.backoff_jitter.map(|v| v as f32),
statuses: config.statuses,
}
}
}
impl From<ClientConfig> for lancedb::remote::ClientConfig {
    /// Convert the JS-facing client configuration into the Rust client's
    /// configuration, filling in defaults for any missing pieces.
    fn from(config: ClientConfig) -> Self {
        Self {
            // `unwrap_or_else` defers building the default user-agent string,
            // avoiding an allocation when the caller supplied one
            // (clippy: or_fun_call).
            user_agent: config.user_agent.unwrap_or_else(|| {
                concat!("LanceDB-Node-Client/", env!("CARGO_PKG_VERSION")).to_string()
            }),
            retry_config: config.retry_config.map(Into::into).unwrap_or_default(),
            timeout_config: config.timeout_config.map(Into::into).unwrap_or_default(),
        }
    }
}

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.14.0-beta.0"
current_version = "0.14.0"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-python"
version = "0.14.0-beta.0"
version = "0.14.0"
edition.workspace = true
description = "Python bindings for LanceDB"
license.workspace = true

View File

@@ -3,7 +3,7 @@ name = "lancedb"
# version in Cargo.toml
dependencies = [
"deprecation",
"pylance==0.18.0",
"pylance==0.18.2",
"requests>=2.31.0",
"retry>=0.9.2",
"tqdm>=4.27.0",

View File

@@ -20,7 +20,7 @@ from .util import safe_import_pandas
pd = safe_import_pandas()
DATA = Union[List[dict], dict, "pd.DataFrame", pa.Table, Iterable[pa.RecordBatch]]
DATA = Union[List[dict], "pd.DataFrame", pa.Table, Iterable[pa.RecordBatch]]
VEC = Union[list, np.ndarray, pa.Array, pa.ChunkedArray]
URI = Union[str, Path]
VECTOR_COLUMN_NAME = "vector"

View File

@@ -96,7 +96,7 @@ class DBConnection(EnforceOverrides):
User must provide at least one of `data` or `schema`.
Acceptable types are:
- dict or list-of-dict
- list-of-dict
- pandas.DataFrame
@@ -579,7 +579,7 @@ class AsyncConnection(object):
User must provide at least one of `data` or `schema`.
Acceptable types are:
- dict or list-of-dict
- list-of-dict
- pandas.DataFrame

View File

@@ -103,19 +103,29 @@ class RestfulLanceDBClient:
@staticmethod
def _check_status(resp: requests.Response):
# Leaving request id empty for now, as we'll be replacing this impl
# with the Rust one shortly.
if resp.status_code == 404:
raise LanceDBClientError(f"Not found: {resp.text}")
raise LanceDBClientError(
f"Not found: {resp.text}", request_id="", status_code=404
)
elif 400 <= resp.status_code < 500:
raise LanceDBClientError(
f"Bad Request: {resp.status_code}, error: {resp.text}"
f"Bad Request: {resp.status_code}, error: {resp.text}",
request_id="",
status_code=resp.status_code,
)
elif 500 <= resp.status_code < 600:
raise LanceDBClientError(
f"Internal Server Error: {resp.status_code}, error: {resp.text}"
f"Internal Server Error: {resp.status_code}, error: {resp.text}",
request_id="",
status_code=resp.status_code,
)
elif resp.status_code != 200:
raise LanceDBClientError(
f"Unknown Error: {resp.status_code}, error: {resp.text}"
f"Unknown Error: {resp.status_code}, error: {resp.text}",
request_id="",
status_code=resp.status_code,
)
@_check_not_closed

View File

@@ -12,5 +12,102 @@
# limitations under the License.
from typing import Optional
class LanceDBClientError(RuntimeError):
    """Base error raised by the LanceDB remote client.

    Attributes
    ----------
    message: str
        The error message.
    request_id: str
        Identifier of the request that failed. Include it in error reports
        to help diagnose the issue.
    status_code: int
        HTTP status code of the response, if one was received. ``None``
        when the request failed before a response arrived.
    """

    def __init__(
        self, message: str, request_id: str, status_code: Optional[int] = None
    ):
        # RuntimeError stores the message, so str(self) returns it.
        super().__init__(message)
        self.request_id, self.status_code = request_id, status_code
class HttpError(LanceDBClientError):
    """An error that occurred during an HTTP request.

    Inherits the constructor and attributes of ``LanceDBClientError``;
    the subclass exists so callers can distinguish a single failed HTTP
    request from other client errors.

    Attributes
    ----------
    message: str
        The error message.
    request_id: str
        The id of the request that failed. This can be provided in error reports
        to help diagnose the issue.
    status_code: int
        The HTTP status code of the response. May be None if the request
        failed before the response was received.
    """

    pass
class RetryError(LanceDBClientError):
    """An error that occurs when the client has exceeded the maximum number of retries.

    The retry strategy can be adjusted by setting the
    [retry_config](lancedb.remote.ClientConfig.retry_config) in the client
    configuration. This is passed in the `client_config` argument of
    [connect](lancedb.connect) and [connect_async](lancedb.connect_async).

    The __cause__ attribute of this exception will be the last exception that
    caused the retry to fail. It will be an
    [HttpError][lancedb.remote.errors.HttpError] instance.

    Attributes
    ----------
    message: str
        The retry error message, which will describe which retry limit was hit.
    request_id: str
        The id of the request that failed. This can be provided in error reports
        to help diagnose the issue.
    request_failures: int
        The number of request failures.
    connect_failures: int
        The number of connect failures.
    read_failures: int
        The number of read failures.
    max_request_failures: int
        The maximum number of request failures.
    max_connect_failures: int
        The maximum number of connect failures.
    max_read_failures: int
        The maximum number of read failures.
    status_code: int
        The HTTP status code of the last response. May be None if the request
        failed before the response was received.
    """

    def __init__(
        self,
        message: str,
        request_id: str,
        request_failures: int,
        connect_failures: int,
        read_failures: int,
        max_request_failures: int,
        max_connect_failures: int,
        max_read_failures: int,
        status_code: Optional[int],
    ):
        # message, request_id and status_code are handled by the base class.
        super().__init__(message, request_id, status_code)
        # Per-category failure counts alongside the configured limits, so
        # callers can tell which limit was exhausted.
        self.request_failures = request_failures
        self.connect_failures = connect_failures
        self.read_failures = read_failures
        self.max_request_failures = max_request_failures
        self.max_connect_failures = max_connect_failures
        self.max_read_failures = max_read_failures

View File

@@ -31,7 +31,6 @@ import pyarrow.compute as pc
import pyarrow.fs as pa_fs
from lance import LanceDataset
from lance.dependencies import _check_for_hugging_face
from lance.vector import vec_to_table
from .common import DATA, VEC, VECTOR_COLUMN_NAME
from .embeddings import EmbeddingFunctionConfig, EmbeddingFunctionRegistry
@@ -87,6 +86,9 @@ def _coerce_to_table(data, schema: Optional[pa.Schema] = None) -> pa.Table:
if isinstance(data, LanceModel):
raise ValueError("Cannot add a single LanceModel to a table. Use a list.")
if isinstance(data, dict):
raise ValueError("Cannot add a single dictionary to a table. Use a list.")
if isinstance(data, list):
# convert to list of dict if data is a bunch of LanceModels
if isinstance(data[0], LanceModel):
@@ -98,8 +100,6 @@ def _coerce_to_table(data, schema: Optional[pa.Schema] = None) -> pa.Table:
return pa.Table.from_batches(data, schema=schema)
else:
return pa.Table.from_pylist(data)
elif isinstance(data, dict):
return vec_to_table(data)
elif _check_for_pandas(data) and isinstance(data, pd.DataFrame):
# Do not add schema here, since schema may contains the vector column
table = pa.Table.from_pandas(data, preserve_index=False)
@@ -554,7 +554,7 @@ class Table(ABC):
data: DATA
The data to insert into the table. Acceptable types are:
- dict or list-of-dict
- list-of-dict
- pandas.DataFrame
@@ -1409,7 +1409,7 @@ class LanceTable(Table):
Parameters
----------
data: list-of-dict, dict, pd.DataFrame
data: list-of-dict, pd.DataFrame
The data to insert into the table.
mode: str
The mode to use when writing the data. Valid values are
@@ -2348,7 +2348,7 @@ class AsyncTable:
data: DATA
The data to insert into the table. Acceptable types are:
- dict or list-of-dict
- list-of-dict
- pandas.DataFrame

View File

@@ -354,7 +354,7 @@ async def test_create_mode_async(tmp_path):
)
await db.create_table("test", data=data)
with pytest.raises(RuntimeError):
with pytest.raises(ValueError, match="already exists"):
await db.create_table("test", data=data)
new_data = pd.DataFrame(
@@ -382,7 +382,7 @@ async def test_create_exist_ok_async(tmp_path):
)
tbl = await db.create_table("test", data=data)
with pytest.raises(RuntimeError):
with pytest.raises(ValueError, match="already exists"):
await db.create_table("test", data=data)
# open the table but don't add more rows

View File

@@ -1,12 +1,14 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
import contextlib
import http.server
import threading
from unittest.mock import MagicMock
import uuid
import lancedb
from lancedb.remote.errors import HttpError, RetryError
import pyarrow as pa
from lancedb.remote.client import VectorQuery, VectorQueryResult
import pytest
@@ -98,6 +100,33 @@ def make_mock_http_handler(handler):
return MockLanceDBHandler
@contextlib.asynccontextmanager
async def mock_lancedb_connection(handler):
    """Serve `handler` on localhost:8080 and yield an async LanceDB connection.

    The server runs on a background thread for the duration of the context.
    The `try/finally` now also covers `connect_async`: previously a failure
    while connecting would skip shutdown, leaking the serve_forever thread
    and hanging the test run.
    """
    with http.server.HTTPServer(
        ("localhost", 8080), make_mock_http_handler(handler)
    ) as server:
        server_thread = threading.Thread(target=server.serve_forever)
        server_thread.start()

        try:
            db = await lancedb.connect_async(
                "db://dev",
                api_key="fake",
                host_override="http://localhost:8080",
                client_config={
                    "retry_config": {"retries": 2},
                    "timeout_config": {
                        "connect_timeout": 1,
                    },
                },
            )
            yield db
        finally:
            server.shutdown()
            server_thread.join()
@pytest.mark.asyncio
async def test_async_remote_db():
def handler(request):
@@ -114,28 +143,50 @@ async def test_async_remote_db():
request.end_headers()
request.wfile.write(b'{"tables": []}')
def run_server():
with http.server.HTTPServer(
("localhost", 8080), make_mock_http_handler(handler)
) as server:
# we will only make one request
server.handle_request()
async with mock_lancedb_connection(handler) as db:
table_names = await db.table_names()
assert table_names == []
handle = threading.Thread(target=run_server)
handle.start()
db = await lancedb.connect_async(
"db://dev",
api_key="fake",
host_override="http://localhost:8080",
client_config={
"retry_config": {"retries": 2},
"timeout_config": {
"connect_timeout": 1,
},
},
)
table_names = await db.table_names()
assert table_names == []
@pytest.mark.asyncio
async def test_http_error():
request_id_holder = {"request_id": None}
handle.join()
def handler(request):
request_id_holder["request_id"] = request.headers["x-request-id"]
request.send_response(507)
request.end_headers()
request.wfile.write(b"Internal Server Error")
async with mock_lancedb_connection(handler) as db:
with pytest.raises(HttpError, match="Internal Server Error") as exc_info:
await db.table_names()
assert exc_info.value.request_id == request_id_holder["request_id"]
assert exc_info.value.status_code == 507
@pytest.mark.asyncio
async def test_retry_error():
request_id_holder = {"request_id": None}
def handler(request):
request_id_holder["request_id"] = request.headers["x-request-id"]
request.send_response(429)
request.end_headers()
request.wfile.write(b"Try again later")
async with mock_lancedb_connection(handler) as db:
with pytest.raises(RetryError, match="Hit retry limit") as exc_info:
await db.table_names()
assert exc_info.value.request_id == request_id_holder["request_id"]
assert exc_info.value.status_code == 429
cause = exc_info.value.__cause__
assert isinstance(cause, HttpError)
assert "Try again later" in str(cause)
assert cause.request_id == request_id_holder["request_id"]
assert cause.status_code == 429

View File

@@ -193,6 +193,24 @@ def test_empty_table(db):
tbl.add(data=data)
def test_add_dictionary(db):
    """Adding a bare dict (instead of a list of dicts) must raise ValueError."""
    fields = [
        pa.field("vector", pa.list_(pa.float32(), 2)),
        pa.field("item", pa.string()),
        pa.field("price", pa.float32()),
    ]
    table = LanceTable.create(db, "test", schema=pa.schema(fields))
    row = {"vector": [3.1, 4.1], "item": "foo", "price": 10.0}
    with pytest.raises(ValueError) as exc_info:
        table.add(data=row)
    expected = "Cannot add a single dictionary to a table. Use a list."
    assert str(exc_info.value) == expected
def test_add(db):
schema = pa.schema(
[

View File

@@ -14,7 +14,9 @@
use pyo3::{
exceptions::{PyIOError, PyNotImplementedError, PyOSError, PyRuntimeError, PyValueError},
PyResult,
intern,
types::{PyAnyMethods, PyNone},
PyErr, PyResult, Python,
};
use lancedb::error::Error as LanceError;
@@ -38,12 +40,79 @@ impl<T> PythonErrorExt<T> for std::result::Result<T, LanceError> {
LanceError::InvalidInput { .. }
| LanceError::InvalidTableName { .. }
| LanceError::TableNotFound { .. }
| LanceError::Schema { .. } => self.value_error(),
| LanceError::Schema { .. }
| LanceError::TableAlreadyExists { .. } => self.value_error(),
LanceError::CreateDir { .. } => self.os_error(),
LanceError::ObjectStore { .. } => Err(PyIOError::new_err(err.to_string())),
LanceError::NotSupported { .. } => {
Err(PyNotImplementedError::new_err(err.to_string()))
}
LanceError::Http {
request_id,
source,
status_code,
} => Python::with_gil(|py| {
let message = err.to_string();
let http_err_cls = py
.import_bound(intern!(py, "lancedb.remote.errors"))?
.getattr(intern!(py, "HttpError"))?;
let err = http_err_cls.call1((
message,
request_id,
status_code.map(|s| s.as_u16()),
))?;
if let Some(cause) = source.source() {
// The HTTP error already includes the first cause. But
// we can add the rest of the chain if there is any more.
let cause_err = http_from_rust_error(
py,
cause,
request_id,
status_code.map(|s| s.as_u16()),
)?;
err.setattr(intern!(py, "__cause__"), cause_err)?;
}
Err(PyErr::from_value_bound(err))
}),
LanceError::Retry {
request_id,
request_failures,
max_request_failures,
connect_failures,
max_connect_failures,
read_failures,
max_read_failures,
source,
status_code,
} => Python::with_gil(|py| {
let cause_err = http_from_rust_error(
py,
source.as_ref(),
request_id,
status_code.map(|s| s.as_u16()),
)?;
let message = err.to_string();
let retry_error_cls = py
.import_bound(intern!(py, "lancedb.remote.errors"))?
.getattr("RetryError")?;
let err = retry_error_cls.call1((
message,
request_id,
*request_failures,
*connect_failures,
*read_failures,
*max_request_failures,
*max_connect_failures,
*max_read_failures,
status_code.map(|s| s.as_u16()),
))?;
err.setattr(intern!(py, "__cause__"), cause_err)?;
Err(PyErr::from_value_bound(err))
}),
_ => self.runtime_error(),
},
}
@@ -61,3 +130,24 @@ impl<T> PythonErrorExt<T> for std::result::Result<T, LanceError> {
self.map_err(|err| PyValueError::new_err(err.to_string()))
}
}
/// Build a Python `lancedb.remote.errors.HttpError` from a Rust error,
/// recursively mapping the Rust `source()` chain onto Python `__cause__`.
///
/// `request_id` and `status_code` are attached to every error in the chain
/// so each level carries the full request context.
fn http_from_rust_error(
    py: Python<'_>,
    err: &dyn std::error::Error,
    request_id: &str,
    status_code: Option<u16>,
) -> PyResult<PyErr> {
    let message = err.to_string();
    // Use the Bound API (`import_bound` / `from_value_bound`) with interned
    // attribute names, consistent with the other error-mapping code in this
    // module; the previous `py.import` / `PyErr::from_value` calls used the
    // deprecated gil-ref API.
    let http_err_cls = py
        .import_bound(intern!(py, "lancedb.remote.errors"))?
        .getattr(intern!(py, "HttpError"))?;
    let py_err = http_err_cls.call1((message, request_id, status_code))?;
    // Reset the traceback since it doesn't provide additional information.
    let py_err = py_err.call_method1(intern!(py, "with_traceback"), (PyNone::get_bound(py),))?;
    if let Some(cause) = err.source() {
        let cause_err = http_from_rust_error(py, cause, request_id, status_code)?;
        py_err.setattr(intern!(py, "__cause__"), cause_err)?;
    }
    Ok(PyErr::from_value_bound(py_err))
}

View File

@@ -46,8 +46,37 @@ pub enum Error {
ObjectStore { source: object_store::Error },
#[snafu(display("lance error: {source}"))]
Lance { source: lance::Error },
#[snafu(display("Http error: {message}"))]
Http { message: String },
#[cfg(feature = "remote")]
#[snafu(display("Http error: (request_id={request_id}) {source}"))]
Http {
#[snafu(source(from(reqwest::Error, Box::new)))]
source: Box<dyn std::error::Error + Send + Sync>,
request_id: String,
/// Status code associated with the error, if available.
/// This is not always available, for example when the error is due to a
/// connection failure. It may also be missing if the request was
/// successful but there was an error decoding the response.
status_code: Option<reqwest::StatusCode>,
},
#[cfg(feature = "remote")]
#[snafu(display(
"Hit retry limit for request_id={request_id} (\
request_failures={request_failures}/{max_request_failures}, \
connect_failures={connect_failures}/{max_connect_failures}, \
read_failures={read_failures}/{max_read_failures})"
))]
Retry {
request_id: String,
request_failures: u8,
max_request_failures: u8,
connect_failures: u8,
max_connect_failures: u8,
read_failures: u8,
max_read_failures: u8,
#[snafu(source(from(reqwest::Error, Box::new)))]
source: Box<dyn std::error::Error + Send + Sync>,
status_code: Option<reqwest::StatusCode>,
},
#[snafu(display("Arrow error: {source}"))]
Arrow { source: ArrowError },
#[snafu(display("LanceDBError: not supported: {message}"))]
@@ -98,24 +127,6 @@ impl<T> From<PoisonError<T>> for Error {
}
}
#[cfg(feature = "remote")]
impl From<reqwest::Error> for Error {
fn from(e: reqwest::Error) -> Self {
Self::Http {
message: e.to_string(),
}
}
}
#[cfg(feature = "remote")]
impl From<url::ParseError> for Error {
fn from(e: url::ParseError) -> Self {
Self::Http {
message: e.to_string(),
}
}
}
#[cfg(feature = "polars")]
impl From<polars::prelude::PolarsError> for Error {
fn from(source: polars::prelude::PolarsError) -> Self {

View File

@@ -216,10 +216,12 @@ impl RestfulLanceDbClient<Sender> {
host_override: Option<String>,
client_config: ClientConfig,
) -> Result<Self> {
let parsed_url = url::Url::parse(db_url)?;
let parsed_url = url::Url::parse(db_url).map_err(|err| Error::InvalidInput {
message: format!("db_url is not a valid URL. '{db_url}'. Error: {err}"),
})?;
debug_assert_eq!(parsed_url.scheme(), "db");
if !parsed_url.has_host() {
return Err(Error::Http {
return Err(Error::InvalidInput {
message: format!("Invalid database URL (missing host) '{}'", db_url),
});
}
@@ -255,7 +257,11 @@ impl RestfulLanceDbClient<Sender> {
host_override.is_some(),
)?)
.user_agent(client_config.user_agent)
.build()?;
.build()
.map_err(|err| Error::Other {
message: "Failed to build HTTP client".into(),
source: Some(Box::new(err)),
})?;
let host = match host_override {
Some(host_override) => host_override,
None => format!("https://{}.{}.api.lancedb.com", db_name, region),
@@ -284,7 +290,7 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
let mut headers = HeaderMap::new();
headers.insert(
"x-api-key",
HeaderValue::from_str(api_key).map_err(|_| Error::Http {
HeaderValue::from_str(api_key).map_err(|_| Error::InvalidInput {
message: "non-ascii api key provided".to_string(),
})?,
);
@@ -292,7 +298,7 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
let host = format!("{}.local.api.lancedb.com", db_name);
headers.insert(
"Host",
HeaderValue::from_str(&host).map_err(|_| Error::Http {
HeaderValue::from_str(&host).map_err(|_| Error::InvalidInput {
message: format!("non-ascii database name '{}' provided", db_name),
})?,
);
@@ -300,7 +306,7 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
if has_host_override {
headers.insert(
"x-lancedb-database",
HeaderValue::from_str(db_name).map_err(|_| Error::Http {
HeaderValue::from_str(db_name).map_err(|_| Error::InvalidInput {
message: format!("non-ascii database name '{}' provided", db_name),
})?,
);
@@ -319,22 +325,30 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
self.client.post(full_uri)
}
pub async fn send(&self, req: RequestBuilder, with_retry: bool) -> Result<Response> {
pub async fn send(&self, req: RequestBuilder, with_retry: bool) -> Result<(String, Response)> {
let (client, request) = req.build_split();
let mut request = request.unwrap();
// Set a request id.
// TODO: allow the user to supply this, through middleware?
if request.headers().get(REQUEST_ID_HEADER).is_none() {
let request_id = uuid::Uuid::new_v4();
let request_id = HeaderValue::from_str(&request_id.to_string()).unwrap();
request.headers_mut().insert(REQUEST_ID_HEADER, request_id);
}
let request_id = if let Some(request_id) = request.headers().get(REQUEST_ID_HEADER) {
request_id.to_str().unwrap().to_string()
} else {
let request_id = uuid::Uuid::new_v4().to_string();
let header = HeaderValue::from_str(&request_id).unwrap();
request.headers_mut().insert(REQUEST_ID_HEADER, header);
request_id
};
if with_retry {
self.send_with_retry_impl(client, request).await
self.send_with_retry_impl(client, request, request_id).await
} else {
Ok(self.sender.send(&client, request).await?)
let response = self
.sender
.send(&client, request)
.await
.err_to_http(request_id.clone())?;
Ok((request_id, response))
}
}
@@ -342,98 +356,178 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
&self,
client: reqwest::Client,
req: Request,
) -> Result<Response> {
let mut request_failures = 0;
let mut connect_failures = 0;
let mut read_failures = 0;
request_id: String,
) -> Result<(String, Response)> {
let mut retry_counter = RetryCounter::new(&self.retry_config, request_id);
loop {
// This only works if the request body is not a stream. If it is
// a stream, we can't use the retry path. We would need to implement
// an outer retry.
let request = req.try_clone().ok_or_else(|| Error::Http {
let request = req.try_clone().ok_or_else(|| Error::Runtime {
message: "Attempted to retry a request that cannot be cloned".to_string(),
})?;
let response = self.sender.send(&client, request).await;
let status_code = response.as_ref().map(|r| r.status());
match status_code {
Ok(status) if status.is_success() => return Ok(response?),
Ok(status) if self.retry_config.statuses.contains(&status) => {
request_failures += 1;
if request_failures >= self.retry_config.retries {
// TODO: better error
return Err(Error::Runtime {
message: format!(
"Request failed after {} retries with status code {}",
request_failures, status
),
});
}
let response = self
.sender
.send(&client, request)
.await
.map(|r| (r.status(), r));
match response {
Ok((status, response)) if status.is_success() => {
return Ok((retry_counter.request_id, response))
}
Ok((status, response)) if self.retry_config.statuses.contains(&status) => {
let source = self
.check_response(&retry_counter.request_id, response)
.await
.unwrap_err();
retry_counter.increment_request_failures(source)?;
}
Err(err) if err.is_connect() => {
connect_failures += 1;
if connect_failures >= self.retry_config.connect_retries {
return Err(Error::Runtime {
message: format!(
"Request failed after {} connect retries with error: {}",
connect_failures, err
),
});
}
retry_counter.increment_connect_failures(err)?;
}
Err(err) if err.is_timeout() || err.is_body() || err.is_decode() => {
read_failures += 1;
if read_failures >= self.retry_config.read_retries {
return Err(Error::Runtime {
message: format!(
"Request failed after {} read retries with error: {}",
read_failures, err
),
});
}
retry_counter.increment_read_failures(err)?;
}
Ok(_) | Err(_) => return Ok(response?),
Err(err) => {
let status_code = err.status();
return Err(Error::Http {
source: Box::new(err),
request_id: retry_counter.request_id,
status_code,
});
}
Ok((_, response)) => return Ok((retry_counter.request_id, response)),
}
let backoff = self.retry_config.backoff_factor * (2.0f32.powi(request_failures as i32));
let jitter = rand::random::<f32>() * self.retry_config.backoff_jitter;
let sleep_time = Duration::from_secs_f32(backoff + jitter);
debug!(
"Retrying request {:?} ({}/{} connect, {}/{} read, {}/{} read) in {:?}",
req.headers()
.get("x-request-id")
.and_then(|v| v.to_str().ok()),
connect_failures,
self.retry_config.connect_retries,
request_failures,
self.retry_config.retries,
read_failures,
self.retry_config.read_retries,
sleep_time
);
let sleep_time = retry_counter.next_sleep_time();
tokio::time::sleep(sleep_time).await;
}
}
async fn rsp_to_str(response: Response) -> String {
pub async fn check_response(&self, request_id: &str, response: Response) -> Result<Response> {
// Try to get the response text, but if that fails, just return the status code
let status = response.status();
response.text().await.unwrap_or_else(|_| status.to_string())
if status.is_success() {
Ok(response)
} else {
let response_text = response.text().await.ok();
let message = if let Some(response_text) = response_text {
format!("{}: {}", status, response_text)
} else {
status.to_string()
};
Err(Error::Http {
source: message.into(),
request_id: request_id.into(),
status_code: Some(status),
})
}
}
}
/// Tracks retry attempts, per failure category, for a single logical request.
///
/// One counter instance lives for the duration of one `send_with_retry_impl`
/// loop; the `request_id` stays constant across all retries of that request.
struct RetryCounter<'a> {
// Count of responses with a retryable HTTP status (see `config.statuses`).
request_failures: u8,
// Count of connection-level failures (reqwest `is_connect()` errors).
connect_failures: u8,
// Count of timeout/body/decode failures treated as read errors.
read_failures: u8,
// Retry limits and backoff parameters, borrowed from the client config.
config: &'a ResolvedRetryConfig,
// The x-request-id header value; reused verbatim on every retry.
request_id: String,
}
impl<'a> RetryCounter<'a> {
/// Create a counter with all failure counts at zero for the given
/// retry configuration and request id.
fn new(config: &'a ResolvedRetryConfig, request_id: String) -> Self {
Self {
request_failures: 0,
connect_failures: 0,
read_failures: 0,
config,
request_id,
}
}
pub async fn check_response(&self, response: Response) -> Result<Response> {
let status_int: u16 = u16::from(response.status());
if (400..500).contains(&status_int) {
Err(Error::InvalidInput {
message: Self::rsp_to_str(response).await,
})
} else if status_int != 200 {
Err(Error::Runtime {
message: Self::rsp_to_str(response).await,
fn check_out_of_retries(
&self,
source: Box<dyn std::error::Error + Send + Sync>,
status_code: Option<reqwest::StatusCode>,
) -> Result<()> {
if self.request_failures >= self.config.retries
|| self.connect_failures >= self.config.connect_retries
|| self.read_failures >= self.config.read_retries
{
Err(Error::Retry {
request_id: self.request_id.clone(),
request_failures: self.request_failures,
max_request_failures: self.config.retries,
connect_failures: self.connect_failures,
max_connect_failures: self.config.connect_retries,
read_failures: self.read_failures,
max_read_failures: self.config.read_retries,
source,
status_code,
})
} else {
Ok(response)
Ok(())
}
}
/// Record one retryable-status failure; returns `Err(Error::Retry)` once
/// any retry limit has been reached, otherwise `Ok(())` to keep looping.
fn increment_request_failures(&mut self, source: crate::Error) -> Result<()> {
self.request_failures += 1;
// Surface the HTTP status code in the final Retry error when the
// source error carries one (it is an `Error::Http` from check_response).
let status_code = if let crate::Error::Http { status_code, .. } = &source {
*status_code
} else {
None
};
self.check_out_of_retries(Box::new(source), status_code)
}
/// Record one connection failure; returns `Err(Error::Retry)` once any
/// retry limit has been reached, otherwise `Ok(())` to keep looping.
fn increment_connect_failures(&mut self, source: reqwest::Error) -> Result<()> {
self.connect_failures += 1;
let status_code = source.status();
self.check_out_of_retries(Box::new(source), status_code)
}
/// Record one read (timeout/body/decode) failure; returns `Err(Error::Retry)`
/// once any retry limit has been reached, otherwise `Ok(())` to keep looping.
fn increment_read_failures(&mut self, source: reqwest::Error) -> Result<()> {
self.read_failures += 1;
let status_code = source.status();
self.check_out_of_retries(Box::new(source), status_code)
}
/// Compute the next backoff delay and log the retry at debug level.
///
/// The delay grows exponentially with the number of request (status)
/// failures: `backoff_factor * 2^request_failures`, plus a uniform random
/// jitter in `[0, backoff_jitter)` seconds.
fn next_sleep_time(&self) -> Duration {
let backoff = self.config.backoff_factor * (2.0f32.powi(self.request_failures as i32));
let jitter = rand::random::<f32>() * self.config.backoff_jitter;
let sleep_time = Duration::from_secs_f32(backoff + jitter);
// Bug fix: the middle counter pair is request (status) failures, but the
// original format string labeled it "read", producing misleading logs.
debug!(
"Retrying request {:?} ({}/{} connect, {}/{} request, {}/{} read) in {:?}",
self.request_id,
self.connect_failures,
self.config.connect_retries,
self.request_failures,
self.config.retries,
self.read_failures,
self.config.read_retries,
sleep_time
);
sleep_time
}
}
/// Extension trait for converting a `reqwest` result's error into
/// [`Error::Http`], attaching the request id for diagnostics.
pub trait RequestResultExt {
// The success type carried through unchanged on `Ok`.
type Output;
fn err_to_http(self, request_id: String) -> Result<Self::Output>;
}
impl<T> RequestResultExt for reqwest::Result<T> {
type Output = T;
fn err_to_http(self, request_id: String) -> Result<T> {
self.map_err(|err| {
let status_code = err.status();
Error::Http {
source: Box::new(err),
request_id,
status_code,
}
})
}
}
#[cfg(test)]

View File

@@ -29,7 +29,7 @@ use crate::embeddings::EmbeddingRegistry;
use crate::error::Result;
use crate::Table;
use super::client::{ClientConfig, HttpSend, RestfulLanceDbClient, Sender};
use super::client::{ClientConfig, HttpSend, RequestResultExt, RestfulLanceDbClient, Sender};
use super::table::RemoteTable;
use super::util::batches_to_ipc_bytes;
use super::ARROW_STREAM_CONTENT_TYPE;
@@ -105,9 +105,13 @@ impl<S: HttpSend> ConnectionInternal for RemoteDatabase<S> {
if let Some(start_after) = options.start_after {
req = req.query(&[("page_token", start_after)]);
}
let rsp = self.client.send(req, true).await?;
let rsp = self.client.check_response(rsp).await?;
let tables = rsp.json::<ListTablesResponse>().await?.tables;
let (request_id, rsp) = self.client.send(req, true).await?;
let rsp = self.client.check_response(&request_id, rsp).await?;
let tables = rsp
.json::<ListTablesResponse>()
.await
.err_to_http(request_id)?
.tables;
for table in &tables {
self.table_cache.insert(table.clone(), ()).await;
}
@@ -130,13 +134,11 @@ impl<S: HttpSend> ConnectionInternal for RemoteDatabase<S> {
.client
.post(&format!("/v1/table/{}/create/", options.name))
.body(data_buffer)
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE)
// This is currently expected by LanceDb cloud but will be removed soon.
.header("x-request-id", "na");
let rsp = self.client.send(req, false).await?;
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE);
let (request_id, rsp) = self.client.send(req, false).await?;
if rsp.status() == StatusCode::BAD_REQUEST {
let body = rsp.text().await?;
let body = rsp.text().await.err_to_http(request_id.clone())?;
if body.contains("already exists") {
return Err(crate::Error::TableAlreadyExists { name: options.name });
} else {
@@ -144,7 +146,7 @@ impl<S: HttpSend> ConnectionInternal for RemoteDatabase<S> {
}
}
self.client.check_response(rsp).await?;
self.client.check_response(&request_id, rsp).await?;
self.table_cache.insert(options.name.clone(), ()).await;
@@ -160,11 +162,11 @@ impl<S: HttpSend> ConnectionInternal for RemoteDatabase<S> {
let req = self
.client
.get(&format!("/v1/table/{}/describe/", options.name));
let resp = self.client.send(req, true).await?;
let (request_id, resp) = self.client.send(req, true).await?;
if resp.status() == StatusCode::NOT_FOUND {
return Err(crate::Error::TableNotFound { name: options.name });
}
self.client.check_response(resp).await?;
self.client.check_response(&request_id, resp).await?;
}
Ok(Table::new(Arc::new(RemoteTable::new(
@@ -178,8 +180,8 @@ impl<S: HttpSend> ConnectionInternal for RemoteDatabase<S> {
.client
.post(&format!("/v1/table/{}/rename/", current_name));
let req = req.json(&serde_json::json!({ "new_table_name": new_name }));
let resp = self.client.send(req, false).await?;
self.client.check_response(resp).await?;
let (request_id, resp) = self.client.send(req, false).await?;
self.client.check_response(&request_id, resp).await?;
self.table_cache.remove(current_name).await;
self.table_cache.insert(new_name.into(), ()).await;
Ok(())
@@ -187,8 +189,8 @@ impl<S: HttpSend> ConnectionInternal for RemoteDatabase<S> {
async fn drop_table(&self, name: &str) -> Result<()> {
let req = self.client.post(&format!("/v1/table/{}/drop/", name));
let resp = self.client.send(req, true).await?;
self.client.check_response(resp).await?;
let (request_id, resp) = self.client.send(req, true).await?;
self.client.check_response(&request_id, resp).await?;
self.table_cache.remove(name).await;
Ok(())
}
@@ -206,16 +208,57 @@ impl<S: HttpSend> ConnectionInternal for RemoteDatabase<S> {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::{Arc, OnceLock};
use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator};
use arrow_schema::{DataType, Field, Schema};
use crate::{
remote::{ARROW_STREAM_CONTENT_TYPE, JSON_CONTENT_TYPE},
Connection,
Connection, Error,
};
#[tokio::test]
// Verifies that a persistently failing request (HTTP 500 on every attempt)
// exhausts the retry budget and surfaces an `Error::Retry` that carries the
// same x-request-id used on the wire, the failure counts, and the body text.
async fn test_retries() {
// We'll record the request_id here, to check it matches the one in the error.
// OnceLock is set on the first attempt; later attempts assert equality.
let seen_request_id = Arc::new(OnceLock::new());
let seen_request_id_ref = seen_request_id.clone();
let conn = Connection::new_with_handler(move |request| {
// Request id should be the same on each retry.
let request_id = request.headers()["x-request-id"]
.to_str()
.unwrap()
.to_string();
let seen_id = seen_request_id_ref.get_or_init(|| request_id.clone());
assert_eq!(&request_id, seen_id);
// Always fail with a retryable status so the client keeps retrying.
http::Response::builder()
.status(500)
.body("internal server error")
.unwrap()
});
let result = conn.table_names().execute().await;
if let Err(Error::Retry {
request_id,
request_failures,
max_request_failures,
source,
..
}) = result
{
// The error's request id must match the one observed by the handler.
let expected_id = seen_request_id.get().unwrap();
assert_eq!(&request_id, expected_id);
// The retry budget must have been fully consumed.
assert_eq!(request_failures, max_request_failures);
// The underlying source error should include the response body text.
assert!(
source.to_string().contains("internal server error"),
"source: {:?}",
source
);
} else {
panic!("unexpected result: {:?}", result);
};
}
#[tokio::test]
async fn test_table_names() {
let conn = Connection::new_with_handler(|request| {

View File

@@ -34,6 +34,7 @@ use crate::{
},
};
use super::client::RequestResultExt;
use super::client::{HttpSend, RestfulLanceDbClient, Sender};
use super::{ARROW_STREAM_CONTENT_TYPE, JSON_CONTENT_TYPE};
@@ -53,15 +54,25 @@ impl<S: HttpSend> RemoteTable<S> {
let request = self
.client
.post(&format!("/v1/table/{}/describe/", self.name));
let response = self.client.send(request, true).await?;
let (request_id, response) = self.client.send(request, true).await?;
let response = self.check_table_response(response).await?;
let response = self.check_table_response(&request_id, response).await?;
let body = response.text().await?;
serde_json::from_str(&body).map_err(|e| Error::Http {
message: format!("Failed to parse table description: {}", e),
})
match response.text().await {
Ok(body) => serde_json::from_str(&body).map_err(|e| Error::Http {
source: format!("Failed to parse table description: {}", e).into(),
request_id,
status_code: None,
}),
Err(err) => {
let status_code = err.status();
Err(Error::Http {
source: Box::new(err),
request_id,
status_code,
})
}
}
}
fn reader_as_body(data: Box<dyn RecordBatchReader + Send>) -> Result<reqwest::Body> {
@@ -87,18 +98,23 @@ impl<S: HttpSend> RemoteTable<S> {
Ok(reqwest::Body::wrap_stream(body_stream))
}
async fn check_table_response(&self, response: reqwest::Response) -> Result<reqwest::Response> {
async fn check_table_response(
&self,
request_id: &str,
response: reqwest::Response,
) -> Result<reqwest::Response> {
if response.status() == StatusCode::NOT_FOUND {
return Err(Error::TableNotFound {
name: self.name.clone(),
});
}
self.client.check_response(response).await
self.client.check_response(request_id, response).await
}
async fn read_arrow_stream(
&self,
request_id: &str,
body: reqwest::Response,
) -> Result<SendableRecordBatchStream> {
// Assert that the content type is correct
@@ -106,24 +122,31 @@ impl<S: HttpSend> RemoteTable<S> {
.headers()
.get(CONTENT_TYPE)
.ok_or_else(|| Error::Http {
message: "Missing content type".into(),
source: "Missing content type".into(),
request_id: request_id.to_string(),
status_code: None,
})?
.to_str()
.map_err(|e| Error::Http {
message: format!("Failed to parse content type: {}", e),
source: format!("Failed to parse content type: {}", e).into(),
request_id: request_id.to_string(),
status_code: None,
})?;
if content_type != ARROW_STREAM_CONTENT_TYPE {
return Err(Error::Http {
message: format!(
source: format!(
"Expected content type {}, got {}",
ARROW_STREAM_CONTENT_TYPE, content_type
),
)
.into(),
request_id: request_id.to_string(),
status_code: None,
});
}
// There isn't a way to actually stream this data yet. I have an upstream issue:
// https://github.com/apache/arrow-rs/issues/6420
let body = body.bytes().await?;
let body = body.bytes().await.err_to_http(request_id.into())?;
let reader = StreamReader::try_new(body.reader(), None)?;
let schema = reader.schema();
let stream = futures::stream::iter(reader).map_err(DataFusionError::from);
@@ -259,14 +282,16 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
request = request.json(&serde_json::json!({}));
}
let response = self.client.send(request, true).await?;
let (request_id, response) = self.client.send(request, true).await?;
let response = self.check_table_response(response).await?;
let response = self.check_table_response(&request_id, response).await?;
let body = response.text().await?;
let body = response.text().await.err_to_http(request_id.clone())?;
serde_json::from_str(&body).map_err(|e| Error::Http {
message: format!("Failed to parse row count: {}", e),
source: format!("Failed to parse row count: {}", e).into(),
request_id,
status_code: None,
})
}
async fn add(
@@ -288,9 +313,9 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
}
}
let response = self.client.send(request, false).await?;
let (request_id, response) = self.client.send(request, false).await?;
self.check_table_response(response).await?;
self.check_table_response(&request_id, response).await?;
Ok(())
}
@@ -339,9 +364,9 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
let request = request.json(&body);
let response = self.client.send(request, true).await?;
let (request_id, response) = self.client.send(request, true).await?;
let stream = self.read_arrow_stream(response).await?;
let stream = self.read_arrow_stream(&request_id, response).await?;
Ok(Arc::new(OneShotExec::new(stream)))
}
@@ -361,9 +386,9 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
let request = request.json(&body);
let response = self.client.send(request, true).await?;
let (request_id, response) = self.client.send(request, true).await?;
let stream = self.read_arrow_stream(response).await?;
let stream = self.read_arrow_stream(&request_id, response).await?;
Ok(DatasetRecordBatchStream::new(stream))
}
@@ -383,17 +408,20 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
"only_if": update.filter,
}));
let response = self.client.send(request, false).await?;
let (request_id, response) = self.client.send(request, false).await?;
let response = self.check_table_response(response).await?;
let response = self.check_table_response(&request_id, response).await?;
let body = response.text().await?;
let body = response.text().await.err_to_http(request_id.clone())?;
serde_json::from_str(&body).map_err(|e| Error::Http {
message: format!(
source: format!(
"Failed to parse updated rows result from response {}: {}",
body, e
),
)
.into(),
request_id,
status_code: None,
})
}
async fn delete(&self, predicate: &str) -> Result<()> {
@@ -402,8 +430,8 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
.client
.post(&format!("/v1/table/{}/delete/", self.name))
.json(&body);
let response = self.client.send(request, false).await?;
self.check_table_response(response).await?;
let (request_id, response) = self.client.send(request, false).await?;
self.check_table_response(&request_id, response).await?;
Ok(())
}
@@ -474,9 +502,9 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
let request = request.json(&body);
let response = self.client.send(request, false).await?;
let (request_id, response) = self.client.send(request, false).await?;
self.check_table_response(response).await?;
self.check_table_response(&request_id, response).await?;
Ok(())
}
@@ -495,9 +523,9 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE)
.body(body);
let response = self.client.send(request, false).await?;
let (request_id, response) = self.client.send(request, false).await?;
self.check_table_response(response).await?;
self.check_table_response(&request_id, response).await?;
Ok(())
}
@@ -525,28 +553,79 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
message: "drop_columns is not yet supported.".into(),
})
}
async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
Err(Error::NotSupported {
message: "list_indices is not yet supported.".into(),
})
// Make request to list the indices
let request = self
.client
.post(&format!("/v1/table/{}/index/list/", self.name));
let (request_id, response) = self.client.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
#[derive(Deserialize)]
struct ListIndicesResponse {
indexes: Vec<IndexConfigResponse>,
}
#[derive(Deserialize)]
struct IndexConfigResponse {
index_name: String,
columns: Vec<String>,
}
let body = response.text().await.err_to_http(request_id.clone())?;
let body: ListIndicesResponse = serde_json::from_str(&body).map_err(|err| Error::Http {
source: format!(
"Failed to parse list_indices response: {}, body: {}",
err, body
)
.into(),
request_id,
status_code: None,
})?;
// Make request to get stats for each index, so we get the index type.
// This is a bit inefficient, but it's the only way to get the index type.
let mut futures = Vec::with_capacity(body.indexes.len());
for index in body.indexes {
let future = async move {
match self.index_stats(&index.index_name).await {
Ok(Some(stats)) => Ok(Some(IndexConfig {
name: index.index_name,
index_type: stats.index_type,
columns: index.columns,
})),
Ok(None) => Ok(None), // The index must have been deleted since we listed it.
Err(e) => Err(e),
}
};
futures.push(future);
}
let results = futures::future::try_join_all(futures).await?;
let index_configs = results.into_iter().flatten().collect();
Ok(index_configs)
}
async fn index_stats(&self, index_name: &str) -> Result<Option<IndexStatistics>> {
let request = self.client.post(&format!(
"/v1/table/{}/index/{}/stats/",
self.name, index_name
));
let response = self.client.send(request, true).await?;
let (request_id, response) = self.client.send(request, true).await?;
if response.status() == StatusCode::NOT_FOUND {
return Ok(None);
}
let response = self.check_table_response(response).await?;
let response = self.check_table_response(&request_id, response).await?;
let body = response.text().await?;
let body = response.text().await.err_to_http(request_id.clone())?;
let stats = serde_json::from_str(&body).map_err(|e| Error::Http {
message: format!("Failed to parse index statistics: {}", e),
source: format!("Failed to parse index statistics: {}", e).into(),
request_id,
status_code: None,
})?;
Ok(Some(stats))
@@ -1181,6 +1260,69 @@ mod tests {
}
}
#[tokio::test]
// Verifies list_indices: the client first lists indices, then fetches stats
// for each index (that extra round-trip is how the index type is obtained),
// and merges both responses into `IndexConfig` entries.
async fn test_list_indices() {
let table = Table::new_with_handler("my_table", |request| {
assert_eq!(request.method(), "POST");
// Dispatch on path: one list call plus one stats call per index.
let response_body = match request.url().path() {
"/v1/table/my_table/index/list/" => {
serde_json::json!({
"indexes": [
{
"index_name": "vector_idx",
"index_uuid": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
"columns": ["vector"],
"index_status": "done",
},
{
"index_name": "my_idx",
"index_uuid": "34255f64-5717-4562-b3fc-2c963f66afa6",
"columns": ["my_column"],
"index_status": "done",
},
]
})
}
"/v1/table/my_table/index/vector_idx/stats/" => {
serde_json::json!({
"num_indexed_rows": 100000,
"num_unindexed_rows": 0,
"index_type": "IVF_PQ",
"distance_type": "l2"
})
}
"/v1/table/my_table/index/my_idx/stats/" => {
serde_json::json!({
"num_indexed_rows": 100000,
"num_unindexed_rows": 0,
"index_type": "LABEL_LIST"
})
}
path => panic!("Unexpected path: {}", path),
};
http::Response::builder()
.status(200)
.body(serde_json::to_string(&response_body).unwrap())
.unwrap()
});
let indices = table.list_indices().await.unwrap();
// Expect name/columns from the list call and the type from the stats call.
let expected = vec![
IndexConfig {
name: "vector_idx".into(),
index_type: IndexType::IvfPq,
columns: vec!["vector".into()],
},
IndexConfig {
name: "my_idx".into(),
index_type: IndexType::LabelList,
columns: vec!["my_column".into()],
},
];
assert_eq!(indices, expected);
}
#[tokio::test]
async fn test_index_stats() {
let table = Table::new_with_handler("my_table", |request| {