Mirror of https://github.com/lancedb/lancedb.git, synced 2025-12-23 05:19:58 +00:00

Compare commits: read-consi...python-v0. (8 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 515ab5f417 | |
| | 8d0055fe6b | |
| | 5f9d8509b3 | |
| | f3b6a1f55b | |
| | aff25e3bf9 | |
| | 8509f73221 | |
| | 607476788e | |
| | 4d458d5829 | |
@@ -66,6 +66,32 @@ glob = "nodejs/npm/*/package.json"
replace = "\"version\": \"{new_version}\","
search = "\"version\": \"{current_version}\","

# vectordb node binary packages
[[tool.bumpversion.files]]
glob = "node/package.json"
replace = "\"@lancedb/vectordb-darwin-arm64\": \"{new_version}\""
search = "\"@lancedb/vectordb-darwin-arm64\": \"{current_version}\""

[[tool.bumpversion.files]]
glob = "node/package.json"
replace = "\"@lancedb/vectordb-darwin-x64\": \"{new_version}\""
search = "\"@lancedb/vectordb-darwin-x64\": \"{current_version}\""

[[tool.bumpversion.files]]
glob = "node/package.json"
replace = "\"@lancedb/vectordb-linux-arm64-gnu\": \"{new_version}\""
search = "\"@lancedb/vectordb-linux-arm64-gnu\": \"{current_version}\""

[[tool.bumpversion.files]]
glob = "node/package.json"
replace = "\"@lancedb/vectordb-linux-x64-gnu\": \"{new_version}\""
search = "\"@lancedb/vectordb-linux-x64-gnu\": \"{current_version}\""

[[tool.bumpversion.files]]
glob = "node/package.json"
replace = "\"@lancedb/vectordb-win32-x64-msvc\": \"{new_version}\""
search = "\"@lancedb/vectordb-win32-x64-msvc\": \"{current_version}\""

# Cargo files
# ------------
[[tool.bumpversion.files]]

@@ -77,3 +103,8 @@ search = "\nversion = \"{current_version}\""
filename = "rust/lancedb/Cargo.toml"
replace = "\nversion = \"{new_version}\""
search = "\nversion = \"{current_version}\""

[[tool.bumpversion.files]]
filename = "nodejs/Cargo.toml"
replace = "\nversion = \"{new_version}\""
search = "\nversion = \"{current_version}\""
Cargo.toml (18 lines changed)
@@ -20,13 +20,13 @@ keywords = ["lancedb", "lance", "database", "vector", "search"]
categories = ["database-implementations"]

[workspace.dependencies]
lance = { "version" = "=0.18.0", "features" = ["dynamodb"] }
lance-index = { "version" = "=0.18.0" }
lance-linalg = { "version" = "=0.18.0" }
lance-table = { "version" = "=0.18.0" }
lance-testing = { "version" = "=0.18.0" }
lance-datafusion = { "version" = "=0.18.0" }
lance-encoding = { "version" = "=0.18.0" }
lance = { "version" = "=0.18.2", "features" = ["dynamodb"] }
lance-index = { "version" = "=0.18.2" }
lance-linalg = { "version" = "=0.18.2" }
lance-table = { "version" = "=0.18.2" }
lance-testing = { "version" = "=0.18.2" }
lance-datafusion = { "version" = "=0.18.2" }
lance-encoding = { "version" = "=0.18.2" }
# Note that this one does not include pyarrow
arrow = { version = "52.2", optional = false }
arrow-array = "52.2"

@@ -38,8 +38,8 @@ arrow-arith = "52.2"
arrow-cast = "52.2"
async-trait = "0"
chrono = "0.4.35"
datafusion-common = "40.0"
datafusion-physical-plan = "40.0"
datafusion-common = "41.0"
datafusion-physical-plan = "41.0"
half = { "version" = "=2.4.1", default-features = false, features = [
    "num-traits",
] }
node/package-lock.json (generated, 1436 lines changed)
File diff suppressed because it is too large.
@@ -58,7 +58,7 @@
    "ts-node-dev": "^2.0.0",
    "typedoc": "^0.24.7",
    "typedoc-plugin-markdown": "^3.15.3",
    "typescript": "*",
    "typescript": "^5.1.0",
    "uuid": "^9.0.0"
  },
  "dependencies": {

@@ -88,10 +88,10 @@
    }
  },
  "optionalDependencies": {
    "@lancedb/vectordb-darwin-arm64": "0.4.20",
    "@lancedb/vectordb-darwin-x64": "0.4.20",
    "@lancedb/vectordb-linux-arm64-gnu": "0.4.20",
    "@lancedb/vectordb-linux-x64-gnu": "0.4.20",
    "@lancedb/vectordb-win32-x64-msvc": "0.4.20"
    "@lancedb/vectordb-darwin-arm64": "0.11.0-beta.1",
    "@lancedb/vectordb-darwin-x64": "0.11.0-beta.1",
    "@lancedb/vectordb-linux-arm64-gnu": "0.11.0-beta.1",
    "@lancedb/vectordb-linux-x64-gnu": "0.11.0-beta.1",
    "@lancedb/vectordb-win32-x64-msvc": "0.11.0-beta.1"
  }
}
@@ -14,6 +14,7 @@

import { describe } from 'mocha'
import * as chai from 'chai'
import { assert } from 'chai'
import * as chaiAsPromised from 'chai-as-promised'
import { v4 as uuidv4 } from 'uuid'

@@ -22,7 +23,6 @@ import { tmpdir } from 'os'
import * as fs from 'fs'
import * as path from 'path'

const assert = chai.assert
chai.use(chaiAsPromised)

describe('LanceDB AWS Integration test', function () {
@@ -142,9 +142,9 @@ export class Query<T = number[]> {
Object.keys(entry).forEach((key: string) => {
  if (entry[key] instanceof Vector) {
    // toJSON() returns f16 array correctly
    newObject[key] = (entry[key] as Vector).toJSON()
    newObject[key] = (entry[key] as any).toJSON()
  } else {
    newObject[key] = entry[key]
    newObject[key] = entry[key] as any
  }
})
return newObject as unknown as T

@@ -247,9 +247,9 @@ export class RemoteQuery<T = number[]> extends Query<T> {
const newObject: Record<string, unknown> = {}
Object.keys(entry).forEach((key: string) => {
  if (entry[key] instanceof Vector) {
    newObject[key] = (entry[key] as Vector).toArray()
    newObject[key] = (entry[key] as any).toArray()
  } else {
    newObject[key] = entry[key]
    newObject[key] = entry[key] as any
  }
})
return newObject as unknown as T
@@ -14,6 +14,7 @@

import { describe } from "mocha";
import { track } from "temp";
import { assert, expect } from 'chai'
import * as chai from "chai";
import * as chaiAsPromised from "chai-as-promised";

@@ -44,8 +45,6 @@ import {
} from "apache-arrow";
import type { RemoteRequest, RemoteResponse } from "../middleware";

const expect = chai.expect;
const assert = chai.assert;
chai.use(chaiAsPromised);

describe("LanceDB client", function () {

@@ -169,7 +168,7 @@ describe("LanceDB client", function () {

// Should reject a bad filter
await expect(table.filter("id % 2 = 0 AND").execute()).to.be.rejectedWith(
  /.*sql parser error: Expected an expression:, found: EOF.*/
  /.*sql parser error: .*/
);
});
@@ -1,7 +1,7 @@
[package]
name = "lancedb-nodejs"
edition.workspace = true
version = "0.0.0"
version = "0.11.0-beta.1"
license.workspace = true
description.workspace = true
repository.workspace = true

@@ -14,7 +14,7 @@ crate-type = ["cdylib"]
[dependencies]
arrow-ipc.workspace = true
futures.workspace = true
lancedb = { path = "../rust/lancedb" }
lancedb = { path = "../rust/lancedb", features = ["remote"] }
napi = { version = "2.16.8", default-features = false, features = [
    "napi9",
    "async",
nodejs/__test__/remote.test.ts (new file, 93 lines)
@@ -0,0 +1,93 @@
|
||||
// Copyright 2024 Lance Developers.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
import * as http from "http";
|
||||
import { RequestListener } from "http";
|
||||
import { Connection, ConnectionOptions, connect } from "../lancedb";
|
||||
|
||||
async function withMockDatabase(
|
||||
listener: RequestListener,
|
||||
callback: (db: Connection) => void,
|
||||
connectionOptions?: ConnectionOptions,
|
||||
) {
|
||||
const server = http.createServer(listener);
|
||||
server.listen(8000);
|
||||
|
||||
const db = await connect(
|
||||
"db://dev",
|
||||
Object.assign(
|
||||
{
|
||||
apiKey: "fake",
|
||||
hostOverride: "http://localhost:8000",
|
||||
},
|
||||
connectionOptions,
|
||||
),
|
||||
);
|
||||
|
||||
try {
|
||||
await callback(db);
|
||||
} finally {
|
||||
server.close();
|
||||
}
|
||||
}
|
||||
|
||||
describe("remote connection", () => {
|
||||
it("should accept partial connection options", async () => {
|
||||
await connect("db://test", {
|
||||
apiKey: "fake",
|
||||
clientConfig: {
|
||||
timeoutConfig: { readTimeout: 5 },
|
||||
retryConfig: { retries: 2 },
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it("should pass down apiKey and userAgent", async () => {
|
||||
await withMockDatabase(
|
||||
(req, res) => {
|
||||
expect(req.headers["x-api-key"]).toEqual("fake");
|
||||
expect(req.headers["user-agent"]).toEqual(
|
||||
`LanceDB-Node-Client/${process.env.npm_package_version}`,
|
||||
);
|
||||
|
||||
const body = JSON.stringify({ tables: [] });
|
||||
res.writeHead(200, { "Content-Type": "application/json" }).end(body);
|
||||
},
|
||||
async (db) => {
|
||||
const tableNames = await db.tableNames();
|
||||
expect(tableNames).toEqual([]);
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
it("allows customizing user agent", async () => {
|
||||
await withMockDatabase(
|
||||
(req, res) => {
|
||||
expect(req.headers["user-agent"]).toEqual("MyApp/1.0");
|
||||
|
||||
const body = JSON.stringify({ tables: [] });
|
||||
res.writeHead(200, { "Content-Type": "application/json" }).end(body);
|
||||
},
|
||||
async (db) => {
|
||||
const tableNames = await db.tableNames();
|
||||
expect(tableNames).toEqual([]);
|
||||
},
|
||||
{
|
||||
clientConfig: {
|
||||
userAgent: "MyApp/1.0",
|
||||
},
|
||||
},
|
||||
);
|
||||
});
|
||||
});
|
||||
@@ -23,8 +23,6 @@ import {
  Connection as LanceDbConnection,
} from "./native.js";

import { RemoteConnection, RemoteConnectionOptions } from "./remote";

export {
  WriteOptions,
  WriteMode,

@@ -33,6 +31,9 @@ export {
  ConnectionOptions,
  IndexStatistics,
  IndexConfig,
  ClientConfig,
  TimeoutConfig,
  RetryConfig,
} from "./native.js";

export {

@@ -87,7 +88,7 @@ export * as embedding from "./embedding";
 */
export async function connect(
  uri: string,
  opts?: Partial<ConnectionOptions | RemoteConnectionOptions>,
  opts?: Partial<ConnectionOptions>,
): Promise<Connection>;
/**
 * Connect to a LanceDB instance at the given URI.

@@ -108,13 +109,11 @@ export async function connect(
 * ```
 */
export async function connect(
  opts: Partial<RemoteConnectionOptions | ConnectionOptions> & { uri: string },
  opts: Partial<ConnectionOptions> & { uri: string },
): Promise<Connection>;
export async function connect(
  uriOrOptions:
    | string
    | (Partial<RemoteConnectionOptions | ConnectionOptions> & { uri: string }),
  opts: Partial<ConnectionOptions | RemoteConnectionOptions> = {},
  uriOrOptions: string | (Partial<ConnectionOptions> & { uri: string }),
  opts: Partial<ConnectionOptions> = {},
): Promise<Connection> {
  let uri: string | undefined;
  if (typeof uriOrOptions !== "string") {

@@ -129,9 +128,6 @@ export async function connect(
    throw new Error("uri is required");
  }

  if (uri?.startsWith("db://")) {
    return new RemoteConnection(uri, opts as RemoteConnectionOptions);
  }
  opts = (opts as ConnectionOptions) ?? {};
  (<ConnectionOptions>opts).storageOptions = cleanseStorageOptions(
    (<ConnectionOptions>opts).storageOptions,
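Taken together, the connect() changes above drop the TypeScript-side RemoteConnection path: db:// URIs are no longer special-cased here, remote options fold into the single ConnectionOptions type exported from native.js, and the Rust binding (see the connection.rs and remote.rs changes below) handles LanceDB Cloud. As a rough sketch of what a remote caller passes under the new signature, with placeholder database name and API key, and option names taken from the remote.test.ts and ConnectionOptions definitions elsewhere in this diff:

```ts
import { connect } from "@lancedb/lancedb";

async function openRemote() {
  // "db://my-project" and the key below are placeholders, not real credentials.
  const db = await connect("db://my-project", {
    apiKey: "sk-placeholder",     // or set LANCEDB_API_KEY in the environment
    region: "us-east-1",          // the default applied when no region is given
    clientConfig: {
      timeoutConfig: { readTimeout: 5 }, // seconds
      retryConfig: { retries: 2 },
    },
  });
  console.log(await db.tableNames());
}
```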
@@ -1,218 +0,0 @@
|
||||
// Copyright 2023 LanceDB Developers.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
import axios, {
|
||||
AxiosError,
|
||||
type AxiosResponse,
|
||||
type ResponseType,
|
||||
} from "axios";
|
||||
import { Table as ArrowTable } from "../arrow";
|
||||
import { tableFromIPC } from "../arrow";
|
||||
import { VectorQuery } from "../query";
|
||||
|
||||
export class RestfulLanceDBClient {
|
||||
#dbName: string;
|
||||
#region: string;
|
||||
#apiKey: string;
|
||||
#hostOverride?: string;
|
||||
#closed: boolean = false;
|
||||
#timeout: number = 12 * 1000; // 12 seconds;
|
||||
#session?: import("axios").AxiosInstance;
|
||||
|
||||
constructor(
|
||||
dbName: string,
|
||||
apiKey: string,
|
||||
region: string,
|
||||
hostOverride?: string,
|
||||
timeout?: number,
|
||||
) {
|
||||
this.#dbName = dbName;
|
||||
this.#apiKey = apiKey;
|
||||
this.#region = region;
|
||||
this.#hostOverride = hostOverride ?? this.#hostOverride;
|
||||
this.#timeout = timeout ?? this.#timeout;
|
||||
}
|
||||
|
||||
// todo: cache the session.
|
||||
get session(): import("axios").AxiosInstance {
|
||||
if (this.#session !== undefined) {
|
||||
return this.#session;
|
||||
} else {
|
||||
return axios.create({
|
||||
baseURL: this.url,
|
||||
headers: {
|
||||
// biome-ignore lint: external API
|
||||
Authorization: `Bearer ${this.#apiKey}`,
|
||||
},
|
||||
transformResponse: decodeErrorData,
|
||||
timeout: this.#timeout,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
get url(): string {
|
||||
return (
|
||||
this.#hostOverride ??
|
||||
`https://${this.#dbName}.${this.#region}.api.lancedb.com`
|
||||
);
|
||||
}
|
||||
|
||||
get headers(): { [key: string]: string } {
|
||||
const headers: { [key: string]: string } = {
|
||||
"x-api-key": this.#apiKey,
|
||||
"x-request-id": "na",
|
||||
};
|
||||
if (this.#region == "local") {
|
||||
headers["Host"] = `${this.#dbName}.${this.#region}.api.lancedb.com`;
|
||||
}
|
||||
if (this.#hostOverride) {
|
||||
headers["x-lancedb-database"] = this.#dbName;
|
||||
}
|
||||
return headers;
|
||||
}
|
||||
|
||||
isOpen(): boolean {
|
||||
return !this.#closed;
|
||||
}
|
||||
|
||||
private checkNotClosed(): void {
|
||||
if (this.#closed) {
|
||||
throw new Error("Connection is closed");
|
||||
}
|
||||
}
|
||||
|
||||
close(): void {
|
||||
this.#session = undefined;
|
||||
this.#closed = true;
|
||||
}
|
||||
|
||||
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
|
||||
async get(uri: string, params?: Record<string, any>): Promise<any> {
|
||||
this.checkNotClosed();
|
||||
uri = new URL(uri, this.url).toString();
|
||||
let response;
|
||||
try {
|
||||
response = await this.session.get(uri, {
|
||||
headers: this.headers,
|
||||
params,
|
||||
});
|
||||
} catch (e) {
|
||||
if (e instanceof AxiosError && e.response) {
|
||||
response = e.response;
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
RestfulLanceDBClient.checkStatus(response!);
|
||||
return response!.data;
|
||||
}
|
||||
|
||||
// biome-ignore lint/suspicious/noExplicitAny: api response
|
||||
async post(uri: string, body?: any): Promise<any>;
|
||||
async post(
|
||||
uri: string,
|
||||
// biome-ignore lint/suspicious/noExplicitAny: api request
|
||||
body: any,
|
||||
additional: {
|
||||
config?: { responseType: "arraybuffer" };
|
||||
headers?: Record<string, string>;
|
||||
params?: Record<string, string>;
|
||||
},
|
||||
): Promise<Buffer>;
|
||||
async post(
|
||||
uri: string,
|
||||
// biome-ignore lint/suspicious/noExplicitAny: api request
|
||||
body?: any,
|
||||
additional?: {
|
||||
config?: { responseType: ResponseType };
|
||||
headers?: Record<string, string>;
|
||||
params?: Record<string, string>;
|
||||
},
|
||||
// biome-ignore lint/suspicious/noExplicitAny: api response
|
||||
): Promise<any> {
|
||||
this.checkNotClosed();
|
||||
uri = new URL(uri, this.url).toString();
|
||||
additional = Object.assign(
|
||||
{ config: { responseType: "json" } },
|
||||
additional,
|
||||
);
|
||||
|
||||
const headers = { ...this.headers, ...additional.headers };
|
||||
|
||||
if (!headers["Content-Type"]) {
|
||||
headers["Content-Type"] = "application/json";
|
||||
}
|
||||
let response;
|
||||
try {
|
||||
response = await this.session.post(uri, body, {
|
||||
headers,
|
||||
responseType: additional!.config!.responseType,
|
||||
params: new Map(Object.entries(additional.params ?? {})),
|
||||
});
|
||||
} catch (e) {
|
||||
if (e instanceof AxiosError && e.response) {
|
||||
response = e.response;
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
RestfulLanceDBClient.checkStatus(response!);
|
||||
if (additional!.config!.responseType === "arraybuffer") {
|
||||
return response!.data;
|
||||
} else {
|
||||
return JSON.parse(response!.data);
|
||||
}
|
||||
}
|
||||
|
||||
async listTables(limit = 10, pageToken = ""): Promise<string[]> {
|
||||
const json = await this.get("/v1/table", { limit, pageToken });
|
||||
return json.tables;
|
||||
}
|
||||
|
||||
async query(tableName: string, query: VectorQuery): Promise<ArrowTable> {
|
||||
const tbl = await this.post(`/v1/table/${tableName}/query`, query, {
|
||||
config: {
|
||||
responseType: "arraybuffer",
|
||||
},
|
||||
});
|
||||
return tableFromIPC(tbl);
|
||||
}
|
||||
|
||||
static checkStatus(response: AxiosResponse): void {
|
||||
if (response.status === 404) {
|
||||
throw new Error(`Not found: ${response.data}`);
|
||||
} else if (response.status >= 400 && response.status < 500) {
|
||||
throw new Error(
|
||||
`Bad Request: ${response.status}, error: ${response.data}`,
|
||||
);
|
||||
} else if (response.status >= 500 && response.status < 600) {
|
||||
throw new Error(
|
||||
`Internal Server Error: ${response.status}, error: ${response.data}`,
|
||||
);
|
||||
} else if (response.status !== 200) {
|
||||
throw new Error(
|
||||
`Unknown Error: ${response.status}, error: ${response.data}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function decodeErrorData(data: unknown) {
|
||||
if (Buffer.isBuffer(data)) {
|
||||
const decoded = data.toString("utf-8");
|
||||
return decoded;
|
||||
}
|
||||
return data;
|
||||
}
|
||||
@@ -1,193 +0,0 @@
|
||||
import { Schema } from "apache-arrow";
|
||||
import {
|
||||
Data,
|
||||
SchemaLike,
|
||||
fromTableToStreamBuffer,
|
||||
makeEmptyTable,
|
||||
} from "../arrow";
|
||||
import {
|
||||
Connection,
|
||||
CreateTableOptions,
|
||||
OpenTableOptions,
|
||||
TableNamesOptions,
|
||||
} from "../connection";
|
||||
import { Table } from "../table";
|
||||
import { TTLCache } from "../util";
|
||||
import { RestfulLanceDBClient } from "./client";
|
||||
import { RemoteTable } from "./table";
|
||||
|
||||
export interface RemoteConnectionOptions {
|
||||
apiKey?: string;
|
||||
region?: string;
|
||||
hostOverride?: string;
|
||||
timeout?: number;
|
||||
}
|
||||
|
||||
export class RemoteConnection extends Connection {
|
||||
#dbName: string;
|
||||
#apiKey: string;
|
||||
#region: string;
|
||||
#client: RestfulLanceDBClient;
|
||||
#tableCache = new TTLCache(300_000);
|
||||
|
||||
constructor(
|
||||
url: string,
|
||||
{ apiKey, region, hostOverride, timeout }: RemoteConnectionOptions,
|
||||
) {
|
||||
super();
|
||||
apiKey = apiKey ?? process.env.LANCEDB_API_KEY;
|
||||
region = region ?? process.env.LANCEDB_REGION;
|
||||
|
||||
if (!apiKey) {
|
||||
throw new Error("apiKey is required when connecting to LanceDB Cloud");
|
||||
}
|
||||
|
||||
if (!region) {
|
||||
throw new Error("region is required when connecting to LanceDB Cloud");
|
||||
}
|
||||
|
||||
const parsed = new URL(url);
|
||||
if (parsed.protocol !== "db:") {
|
||||
throw new Error(
|
||||
`invalid protocol: ${parsed.protocol}, only accepts db://`,
|
||||
);
|
||||
}
|
||||
|
||||
this.#dbName = parsed.hostname;
|
||||
this.#apiKey = apiKey;
|
||||
this.#region = region;
|
||||
this.#client = new RestfulLanceDBClient(
|
||||
this.#dbName,
|
||||
this.#apiKey,
|
||||
this.#region,
|
||||
hostOverride,
|
||||
timeout,
|
||||
);
|
||||
}
|
||||
|
||||
isOpen(): boolean {
|
||||
return this.#client.isOpen();
|
||||
}
|
||||
close(): void {
|
||||
return this.#client.close();
|
||||
}
|
||||
|
||||
display(): string {
|
||||
return `RemoteConnection(${this.#dbName})`;
|
||||
}
|
||||
|
||||
async tableNames(options?: Partial<TableNamesOptions>): Promise<string[]> {
|
||||
const response = await this.#client.get("/v1/table/", {
|
||||
limit: options?.limit ?? 10,
|
||||
// biome-ignore lint/style/useNamingConvention: <explanation>
|
||||
page_token: options?.startAfter ?? "",
|
||||
});
|
||||
const body = await response.body();
|
||||
for (const table of body.tables) {
|
||||
this.#tableCache.set(table, true);
|
||||
}
|
||||
return body.tables;
|
||||
}
|
||||
|
||||
async openTable(
|
||||
name: string,
|
||||
_options?: Partial<OpenTableOptions> | undefined,
|
||||
): Promise<Table> {
|
||||
if (this.#tableCache.get(name) === undefined) {
|
||||
await this.#client.post(
|
||||
`/v1/table/${encodeURIComponent(name)}/describe/`,
|
||||
);
|
||||
this.#tableCache.set(name, true);
|
||||
}
|
||||
return new RemoteTable(this.#client, name, this.#dbName);
|
||||
}
|
||||
|
||||
async createTable(
|
||||
nameOrOptions:
|
||||
| string
|
||||
| ({ name: string; data: Data } & Partial<CreateTableOptions>),
|
||||
data?: Data,
|
||||
options?: Partial<CreateTableOptions> | undefined,
|
||||
): Promise<Table> {
|
||||
if (typeof nameOrOptions !== "string" && "name" in nameOrOptions) {
|
||||
const { name, data, ...options } = nameOrOptions;
|
||||
return this.createTable(name, data, options);
|
||||
}
|
||||
if (data === undefined) {
|
||||
throw new Error("data is required");
|
||||
}
|
||||
if (options?.mode) {
|
||||
console.warn(
|
||||
"option 'mode' is not supported in LanceDB Cloud",
|
||||
"LanceDB Cloud only supports the default 'create' mode.",
|
||||
"If the table already exists, an error will be thrown.",
|
||||
);
|
||||
}
|
||||
if (options?.embeddingFunction) {
|
||||
console.warn(
|
||||
"embedding_functions is not yet supported on LanceDB Cloud.",
|
||||
"Please vote https://github.com/lancedb/lancedb/issues/626 ",
|
||||
"for this feature.",
|
||||
);
|
||||
}
|
||||
|
||||
const { buf } = await Table.parseTableData(
|
||||
data,
|
||||
options,
|
||||
true /** streaming */,
|
||||
);
|
||||
|
||||
await this.#client.post(
|
||||
`/v1/table/${encodeURIComponent(nameOrOptions)}/create/`,
|
||||
buf,
|
||||
{
|
||||
config: {
|
||||
responseType: "arraybuffer",
|
||||
},
|
||||
headers: { "Content-Type": "application/vnd.apache.arrow.stream" },
|
||||
},
|
||||
);
|
||||
this.#tableCache.set(nameOrOptions, true);
|
||||
return new RemoteTable(this.#client, nameOrOptions, this.#dbName);
|
||||
}
|
||||
|
||||
async createEmptyTable(
|
||||
name: string,
|
||||
schema: SchemaLike,
|
||||
options?: Partial<CreateTableOptions> | undefined,
|
||||
): Promise<Table> {
|
||||
if (options?.mode) {
|
||||
console.warn(`mode is not supported on LanceDB Cloud`);
|
||||
}
|
||||
|
||||
if (options?.embeddingFunction) {
|
||||
console.warn(
|
||||
"embeddingFunction is not yet supported on LanceDB Cloud.",
|
||||
"Please vote https://github.com/lancedb/lancedb/issues/626 ",
|
||||
"for this feature.",
|
||||
);
|
||||
}
|
||||
const emptyTable = makeEmptyTable(schema);
|
||||
const buf = await fromTableToStreamBuffer(emptyTable);
|
||||
|
||||
await this.#client.post(
|
||||
`/v1/table/${encodeURIComponent(name)}/create/`,
|
||||
buf,
|
||||
{
|
||||
config: {
|
||||
responseType: "arraybuffer",
|
||||
},
|
||||
headers: { "Content-Type": "application/vnd.apache.arrow.stream" },
|
||||
},
|
||||
);
|
||||
|
||||
this.#tableCache.set(name, true);
|
||||
return new RemoteTable(this.#client, name, this.#dbName);
|
||||
}
|
||||
|
||||
async dropTable(name: string): Promise<void> {
|
||||
await this.#client.post(`/v1/table/${encodeURIComponent(name)}/drop/`);
|
||||
|
||||
this.#tableCache.delete(name);
|
||||
}
|
||||
}
|
||||
@@ -1,3 +0,0 @@
|
||||
export { RestfulLanceDBClient } from "./client";
|
||||
export { type RemoteConnectionOptions, RemoteConnection } from "./connection";
|
||||
export { RemoteTable } from "./table";
|
||||
@@ -1,226 +0,0 @@
|
||||
// Copyright 2023 LanceDB Developers.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
import { Table as ArrowTable } from "apache-arrow";
|
||||
|
||||
import { Data, IntoVector } from "../arrow";
|
||||
|
||||
import { IndexStatistics } from "..";
|
||||
import { CreateTableOptions } from "../connection";
|
||||
import { IndexOptions } from "../indices";
|
||||
import { MergeInsertBuilder } from "../merge";
|
||||
import { VectorQuery } from "../query";
|
||||
import { AddDataOptions, Table, UpdateOptions } from "../table";
|
||||
import { IntoSql, toSQL } from "../util";
|
||||
import { RestfulLanceDBClient } from "./client";
|
||||
|
||||
export class RemoteTable extends Table {
|
||||
#client: RestfulLanceDBClient;
|
||||
#name: string;
|
||||
|
||||
// Used in the display() method
|
||||
#dbName: string;
|
||||
|
||||
get #tablePrefix() {
|
||||
return `/v1/table/${encodeURIComponent(this.#name)}/`;
|
||||
}
|
||||
|
||||
get name(): string {
|
||||
return this.#name;
|
||||
}
|
||||
|
||||
public constructor(
|
||||
client: RestfulLanceDBClient,
|
||||
tableName: string,
|
||||
dbName: string,
|
||||
) {
|
||||
super();
|
||||
this.#client = client;
|
||||
this.#name = tableName;
|
||||
this.#dbName = dbName;
|
||||
}
|
||||
|
||||
isOpen(): boolean {
|
||||
return !this.#client.isOpen();
|
||||
}
|
||||
|
||||
close(): void {
|
||||
this.#client.close();
|
||||
}
|
||||
|
||||
display(): string {
|
||||
return `RemoteTable(${this.#dbName}; ${this.#name})`;
|
||||
}
|
||||
|
||||
async schema(): Promise<import("apache-arrow").Schema> {
|
||||
const resp = await this.#client.post(`${this.#tablePrefix}/describe/`);
|
||||
// TODO: parse this into a valid arrow schema
|
||||
return resp.schema;
|
||||
}
|
||||
async add(data: Data, options?: Partial<AddDataOptions>): Promise<void> {
|
||||
const { buf, mode } = await Table.parseTableData(
|
||||
data,
|
||||
options as CreateTableOptions,
|
||||
true,
|
||||
);
|
||||
await this.#client.post(`${this.#tablePrefix}/insert/`, buf, {
|
||||
params: {
|
||||
mode,
|
||||
},
|
||||
headers: {
|
||||
"Content-Type": "application/vnd.apache.arrow.stream",
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
async update(
|
||||
optsOrUpdates:
|
||||
| (Map<string, string> | Record<string, string>)
|
||||
| ({
|
||||
values: Map<string, IntoSql> | Record<string, IntoSql>;
|
||||
} & Partial<UpdateOptions>)
|
||||
| ({
|
||||
valuesSql: Map<string, string> | Record<string, string>;
|
||||
} & Partial<UpdateOptions>),
|
||||
options?: Partial<UpdateOptions>,
|
||||
): Promise<void> {
|
||||
const isValues =
|
||||
"values" in optsOrUpdates && typeof optsOrUpdates.values !== "string";
|
||||
const isValuesSql =
|
||||
"valuesSql" in optsOrUpdates &&
|
||||
typeof optsOrUpdates.valuesSql !== "string";
|
||||
const isMap = (obj: unknown): obj is Map<string, string> => {
|
||||
return obj instanceof Map;
|
||||
};
|
||||
|
||||
let predicate;
|
||||
let columns: [string, string][];
|
||||
switch (true) {
|
||||
case isMap(optsOrUpdates):
|
||||
columns = Array.from(optsOrUpdates.entries());
|
||||
predicate = options?.where;
|
||||
break;
|
||||
case isValues && isMap(optsOrUpdates.values):
|
||||
columns = Array.from(optsOrUpdates.values.entries()).map(([k, v]) => [
|
||||
k,
|
||||
toSQL(v),
|
||||
]);
|
||||
predicate = optsOrUpdates.where;
|
||||
break;
|
||||
case isValues && !isMap(optsOrUpdates.values):
|
||||
columns = Object.entries(optsOrUpdates.values).map(([k, v]) => [
|
||||
k,
|
||||
toSQL(v),
|
||||
]);
|
||||
predicate = optsOrUpdates.where;
|
||||
break;
|
||||
|
||||
case isValuesSql && isMap(optsOrUpdates.valuesSql):
|
||||
columns = Array.from(optsOrUpdates.valuesSql.entries());
|
||||
predicate = optsOrUpdates.where;
|
||||
break;
|
||||
case isValuesSql && !isMap(optsOrUpdates.valuesSql):
|
||||
columns = Object.entries(optsOrUpdates.valuesSql).map(([k, v]) => [
|
||||
k,
|
||||
v,
|
||||
]);
|
||||
predicate = optsOrUpdates.where;
|
||||
break;
|
||||
default:
|
||||
columns = Object.entries(optsOrUpdates as Record<string, string>);
|
||||
predicate = options?.where;
|
||||
}
|
||||
|
||||
await this.#client.post(`${this.#tablePrefix}/update/`, {
|
||||
predicate: predicate ?? null,
|
||||
updates: columns,
|
||||
});
|
||||
}
|
||||
async countRows(filter?: unknown): Promise<number> {
|
||||
const payload = { predicate: filter };
|
||||
return await this.#client.post(`${this.#tablePrefix}/count_rows/`, payload);
|
||||
}
|
||||
|
||||
async delete(predicate: unknown): Promise<void> {
|
||||
const payload = { predicate };
|
||||
await this.#client.post(`${this.#tablePrefix}/delete/`, payload);
|
||||
}
|
||||
async createIndex(
|
||||
column: string,
|
||||
options?: Partial<IndexOptions>,
|
||||
): Promise<void> {
|
||||
if (options !== undefined) {
|
||||
console.warn("options are not yet supported on the LanceDB cloud");
|
||||
}
|
||||
const indexType = "vector";
|
||||
const metric = "L2";
|
||||
const data = {
|
||||
column,
|
||||
// biome-ignore lint/style/useNamingConvention: external API
|
||||
index_type: indexType,
|
||||
// biome-ignore lint/style/useNamingConvention: external API
|
||||
metric_type: metric,
|
||||
};
|
||||
await this.#client.post(`${this.#tablePrefix}/create_index`, data);
|
||||
}
|
||||
query(): import("..").Query {
|
||||
throw new Error("query() is not yet supported on the LanceDB cloud");
|
||||
}
|
||||
|
||||
search(_query: string | IntoVector): VectorQuery {
|
||||
throw new Error("search() is not yet supported on the LanceDB cloud");
|
||||
}
|
||||
vectorSearch(_vector: unknown): import("..").VectorQuery {
|
||||
throw new Error("vectorSearch() is not yet supported on the LanceDB cloud");
|
||||
}
|
||||
addColumns(_newColumnTransforms: unknown): Promise<void> {
|
||||
throw new Error("addColumns() is not yet supported on the LanceDB cloud");
|
||||
}
|
||||
alterColumns(_columnAlterations: unknown): Promise<void> {
|
||||
throw new Error("alterColumns() is not yet supported on the LanceDB cloud");
|
||||
}
|
||||
dropColumns(_columnNames: unknown): Promise<void> {
|
||||
throw new Error("dropColumns() is not yet supported on the LanceDB cloud");
|
||||
}
|
||||
async version(): Promise<number> {
|
||||
const resp = await this.#client.post(`${this.#tablePrefix}/describe/`);
|
||||
return resp.version;
|
||||
}
|
||||
checkout(_version: unknown): Promise<void> {
|
||||
throw new Error("checkout() is not yet supported on the LanceDB cloud");
|
||||
}
|
||||
checkoutLatest(): Promise<void> {
|
||||
throw new Error(
|
||||
"checkoutLatest() is not yet supported on the LanceDB cloud",
|
||||
);
|
||||
}
|
||||
restore(): Promise<void> {
|
||||
throw new Error("restore() is not yet supported on the LanceDB cloud");
|
||||
}
|
||||
optimize(_options?: unknown): Promise<import("../native").OptimizeStats> {
|
||||
throw new Error("optimize() is not yet supported on the LanceDB cloud");
|
||||
}
|
||||
async listIndices(): Promise<import("../native").IndexConfig[]> {
|
||||
return await this.#client.post(`${this.#tablePrefix}/index/list/`);
|
||||
}
|
||||
toArrow(): Promise<ArrowTable> {
|
||||
throw new Error("toArrow() is not yet supported on the LanceDB cloud");
|
||||
}
|
||||
mergeInsert(_on: string | string[]): MergeInsertBuilder {
|
||||
throw new Error("mergeInsert() is not yet supported on the LanceDB cloud");
|
||||
}
|
||||
async indexStats(_name: string): Promise<IndexStatistics | undefined> {
|
||||
throw new Error("indexStats() is not yet supported on the LanceDB cloud");
|
||||
}
|
||||
}
|
||||
nodejs/package-lock.json (generated, 97 lines changed)
@@ -1,12 +1,12 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb",
|
||||
"version": "0.10.0-beta.1",
|
||||
"version": "0.11.0-beta.1",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "@lancedb/lancedb",
|
||||
"version": "0.10.0-beta.1",
|
||||
"version": "0.11.0-beta.1",
|
||||
"cpu": [
|
||||
"x64",
|
||||
"arm64"
|
||||
@@ -18,7 +18,6 @@
|
||||
"win32"
|
||||
],
|
||||
"dependencies": {
|
||||
"axios": "^1.7.2",
|
||||
"reflect-metadata": "^0.2.2"
|
||||
},
|
||||
"devDependencies": {
|
||||
@@ -30,6 +29,7 @@
|
||||
"@napi-rs/cli": "^2.18.3",
|
||||
"@types/axios": "^0.14.0",
|
||||
"@types/jest": "^29.1.2",
|
||||
"@types/node": "^22.7.4",
|
||||
"@types/tmp": "^0.2.6",
|
||||
"apache-arrow-13": "npm:apache-arrow@13.0.0",
|
||||
"apache-arrow-14": "npm:apache-arrow@14.0.0",
|
||||
@@ -4648,11 +4648,12 @@
|
||||
"optional": true
|
||||
},
|
||||
"node_modules/@types/node": {
|
||||
"version": "20.14.11",
|
||||
"resolved": "https://registry.npmjs.org/@types/node/-/node-20.14.11.tgz",
|
||||
"integrity": "sha512-kprQpL8MMeszbz6ojB5/tU8PLN4kesnN8Gjzw349rDlNgsSzg90lAVj3llK99Dh7JON+t9AuscPPFW6mPbTnSA==",
|
||||
"version": "22.7.4",
|
||||
"resolved": "https://registry.npmjs.org/@types/node/-/node-22.7.4.tgz",
|
||||
"integrity": "sha512-y+NPi1rFzDs1NdQHHToqeiX2TIS79SWEAw9GYhkkx8bD0ChpfqC+n2j5OXOCpzfojBEBt6DnEnnG9MY0zk1XLg==",
|
||||
"devOptional": true,
|
||||
"dependencies": {
|
||||
"undici-types": "~5.26.4"
|
||||
"undici-types": "~6.19.2"
|
||||
}
|
||||
},
|
||||
"node_modules/@types/node-fetch": {
|
||||
@@ -4665,6 +4666,12 @@
|
||||
"form-data": "^4.0.0"
|
||||
}
|
||||
},
|
||||
"node_modules/@types/node/node_modules/undici-types": {
|
||||
"version": "6.19.8",
|
||||
"resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.19.8.tgz",
|
||||
"integrity": "sha512-ve2KP6f/JnbPBFyobGHuerC9g1FYGn/F8n1LWTwNxCEzd6IfqTwUQcNXgEtmmQ6DlRrC1hrSrBnCZPokRrDHjw==",
|
||||
"devOptional": true
|
||||
},
|
||||
"node_modules/@types/pad-left": {
|
||||
"version": "2.1.1",
|
||||
"resolved": "https://registry.npmjs.org/@types/pad-left/-/pad-left-2.1.1.tgz",
|
||||
@@ -4963,6 +4970,21 @@
|
||||
"arrow2csv": "bin/arrow2csv.cjs"
|
||||
}
|
||||
},
|
||||
"node_modules/apache-arrow-15/node_modules/@types/node": {
|
||||
"version": "20.16.10",
|
||||
"resolved": "https://registry.npmjs.org/@types/node/-/node-20.16.10.tgz",
|
||||
"integrity": "sha512-vQUKgWTjEIRFCvK6CyriPH3MZYiYlNy0fKiEYHWbcoWLEgs4opurGGKlebrTLqdSMIbXImH6XExNiIyNUv3WpA==",
|
||||
"dev": true,
|
||||
"dependencies": {
|
||||
"undici-types": "~6.19.2"
|
||||
}
|
||||
},
|
||||
"node_modules/apache-arrow-15/node_modules/undici-types": {
|
||||
"version": "6.19.8",
|
||||
"resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.19.8.tgz",
|
||||
"integrity": "sha512-ve2KP6f/JnbPBFyobGHuerC9g1FYGn/F8n1LWTwNxCEzd6IfqTwUQcNXgEtmmQ6DlRrC1hrSrBnCZPokRrDHjw==",
|
||||
"dev": true
|
||||
},
|
||||
"node_modules/apache-arrow-16": {
|
||||
"name": "apache-arrow",
|
||||
"version": "16.0.0",
|
||||
@@ -4984,6 +5006,21 @@
|
||||
"arrow2csv": "bin/arrow2csv.cjs"
|
||||
}
|
||||
},
|
||||
"node_modules/apache-arrow-16/node_modules/@types/node": {
|
||||
"version": "20.16.10",
|
||||
"resolved": "https://registry.npmjs.org/@types/node/-/node-20.16.10.tgz",
|
||||
"integrity": "sha512-vQUKgWTjEIRFCvK6CyriPH3MZYiYlNy0fKiEYHWbcoWLEgs4opurGGKlebrTLqdSMIbXImH6XExNiIyNUv3WpA==",
|
||||
"dev": true,
|
||||
"dependencies": {
|
||||
"undici-types": "~6.19.2"
|
||||
}
|
||||
},
|
||||
"node_modules/apache-arrow-16/node_modules/undici-types": {
|
||||
"version": "6.19.8",
|
||||
"resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.19.8.tgz",
|
||||
"integrity": "sha512-ve2KP6f/JnbPBFyobGHuerC9g1FYGn/F8n1LWTwNxCEzd6IfqTwUQcNXgEtmmQ6DlRrC1hrSrBnCZPokRrDHjw==",
|
||||
"dev": true
|
||||
},
|
||||
"node_modules/apache-arrow-17": {
|
||||
"name": "apache-arrow",
|
||||
"version": "17.0.0",
|
||||
@@ -5011,12 +5048,42 @@
|
||||
"integrity": "sha512-BwR5KP3Es/CSht0xqBcUXS3qCAUVXwpRKsV2+arxeb65atasuXG9LykC9Ab10Cw3s2raH92ZqOeILaQbsB2ACg==",
|
||||
"dev": true
|
||||
},
|
||||
"node_modules/apache-arrow-17/node_modules/@types/node": {
|
||||
"version": "20.16.10",
|
||||
"resolved": "https://registry.npmjs.org/@types/node/-/node-20.16.10.tgz",
|
||||
"integrity": "sha512-vQUKgWTjEIRFCvK6CyriPH3MZYiYlNy0fKiEYHWbcoWLEgs4opurGGKlebrTLqdSMIbXImH6XExNiIyNUv3WpA==",
|
||||
"dev": true,
|
||||
"dependencies": {
|
||||
"undici-types": "~6.19.2"
|
||||
}
|
||||
},
|
||||
"node_modules/apache-arrow-17/node_modules/flatbuffers": {
|
||||
"version": "24.3.25",
|
||||
"resolved": "https://registry.npmjs.org/flatbuffers/-/flatbuffers-24.3.25.tgz",
|
||||
"integrity": "sha512-3HDgPbgiwWMI9zVB7VYBHaMrbOO7Gm0v+yD2FV/sCKj+9NDeVL7BOBYUuhWAQGKWOzBo8S9WdMvV0eixO233XQ==",
|
||||
"dev": true
|
||||
},
|
||||
"node_modules/apache-arrow-17/node_modules/undici-types": {
|
||||
"version": "6.19.8",
|
||||
"resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.19.8.tgz",
|
||||
"integrity": "sha512-ve2KP6f/JnbPBFyobGHuerC9g1FYGn/F8n1LWTwNxCEzd6IfqTwUQcNXgEtmmQ6DlRrC1hrSrBnCZPokRrDHjw==",
|
||||
"dev": true
|
||||
},
|
||||
"node_modules/apache-arrow/node_modules/@types/node": {
|
||||
"version": "20.16.10",
|
||||
"resolved": "https://registry.npmjs.org/@types/node/-/node-20.16.10.tgz",
|
||||
"integrity": "sha512-vQUKgWTjEIRFCvK6CyriPH3MZYiYlNy0fKiEYHWbcoWLEgs4opurGGKlebrTLqdSMIbXImH6XExNiIyNUv3WpA==",
|
||||
"peer": true,
|
||||
"dependencies": {
|
||||
"undici-types": "~6.19.2"
|
||||
}
|
||||
},
|
||||
"node_modules/apache-arrow/node_modules/undici-types": {
|
||||
"version": "6.19.8",
|
||||
"resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.19.8.tgz",
|
||||
"integrity": "sha512-ve2KP6f/JnbPBFyobGHuerC9g1FYGn/F8n1LWTwNxCEzd6IfqTwUQcNXgEtmmQ6DlRrC1hrSrBnCZPokRrDHjw==",
|
||||
"peer": true
|
||||
},
|
||||
"node_modules/argparse": {
|
||||
"version": "1.0.10",
|
||||
"resolved": "https://registry.npmjs.org/argparse/-/argparse-1.0.10.tgz",
|
||||
@@ -5046,12 +5113,14 @@
|
||||
"node_modules/asynckit": {
|
||||
"version": "0.4.0",
|
||||
"resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz",
|
||||
"integrity": "sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q=="
|
||||
"integrity": "sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==",
|
||||
"devOptional": true
|
||||
},
|
||||
"node_modules/axios": {
|
||||
"version": "1.7.2",
|
||||
"resolved": "https://registry.npmjs.org/axios/-/axios-1.7.2.tgz",
|
||||
"integrity": "sha512-2A8QhOMrbomlDuiLeK9XibIBzuHeRcqqNOHp0Cyp5EoJ1IFDh+XZH3A6BkXtv0K4gFGCI0Y4BM7B1wOEi0Rmgw==",
|
||||
"dev": true,
|
||||
"dependencies": {
|
||||
"follow-redirects": "^1.15.6",
|
||||
"form-data": "^4.0.0",
|
||||
@@ -5536,6 +5605,7 @@
|
||||
"version": "1.0.8",
|
||||
"resolved": "https://registry.npmjs.org/combined-stream/-/combined-stream-1.0.8.tgz",
|
||||
"integrity": "sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg==",
|
||||
"devOptional": true,
|
||||
"dependencies": {
|
||||
"delayed-stream": "~1.0.0"
|
||||
},
|
||||
@@ -5723,6 +5793,7 @@
|
||||
"version": "1.0.0",
|
||||
"resolved": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-1.0.0.tgz",
|
||||
"integrity": "sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==",
|
||||
"devOptional": true,
|
||||
"engines": {
|
||||
"node": ">=0.4.0"
|
||||
}
|
||||
@@ -6248,6 +6319,7 @@
|
||||
"version": "1.15.6",
|
||||
"resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.6.tgz",
|
||||
"integrity": "sha512-wWN62YITEaOpSK584EZXJafH1AGpO8RVgElfkuXbTOrPX4fIfOyEpW/CsiNd8JdYrAoOvafRTOEnvsO++qCqFA==",
|
||||
"dev": true,
|
||||
"funding": [
|
||||
{
|
||||
"type": "individual",
|
||||
@@ -6267,6 +6339,7 @@
|
||||
"version": "4.0.0",
|
||||
"resolved": "https://registry.npmjs.org/form-data/-/form-data-4.0.0.tgz",
|
||||
"integrity": "sha512-ETEklSGi5t0QMZuiXoA/Q6vcnxcLQP5vdugSpuAyi6SVGi2clPPp+xgEhuMaHC+zGgn31Kd235W35f7Hykkaww==",
|
||||
"devOptional": true,
|
||||
"dependencies": {
|
||||
"asynckit": "^0.4.0",
|
||||
"combined-stream": "^1.0.8",
|
||||
@@ -7773,6 +7846,7 @@
|
||||
"version": "1.52.0",
|
||||
"resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.52.0.tgz",
|
||||
"integrity": "sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg==",
|
||||
"devOptional": true,
|
||||
"engines": {
|
||||
"node": ">= 0.6"
|
||||
}
|
||||
@@ -7781,6 +7855,7 @@
|
||||
"version": "2.1.35",
|
||||
"resolved": "https://registry.npmjs.org/mime-types/-/mime-types-2.1.35.tgz",
|
||||
"integrity": "sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw==",
|
||||
"devOptional": true,
|
||||
"dependencies": {
|
||||
"mime-db": "1.52.0"
|
||||
},
|
||||
@@ -8393,7 +8468,8 @@
|
||||
"node_modules/proxy-from-env": {
|
||||
"version": "1.1.0",
|
||||
"resolved": "https://registry.npmjs.org/proxy-from-env/-/proxy-from-env-1.1.0.tgz",
|
||||
"integrity": "sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg=="
|
||||
"integrity": "sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg==",
|
||||
"dev": true
|
||||
},
|
||||
"node_modules/pump": {
|
||||
"version": "3.0.0",
|
||||
@@ -9561,7 +9637,8 @@
|
||||
"node_modules/undici-types": {
|
||||
"version": "5.26.5",
|
||||
"resolved": "https://registry.npmjs.org/undici-types/-/undici-types-5.26.5.tgz",
|
||||
"integrity": "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA=="
|
||||
"integrity": "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==",
|
||||
"optional": true
|
||||
},
|
||||
"node_modules/update-browserslist-db": {
|
||||
"version": "1.0.13",
|
||||
|
||||
@@ -40,6 +40,7 @@
    "@napi-rs/cli": "^2.18.3",
    "@types/axios": "^0.14.0",
    "@types/jest": "^29.1.2",
    "@types/node": "^22.7.4",
    "@types/tmp": "^0.2.6",
    "apache-arrow-13": "npm:apache-arrow@13.0.0",
    "apache-arrow-14": "npm:apache-arrow@14.0.0",

@@ -81,7 +82,6 @@
    "version": "napi version"
  },
  "dependencies": {
    "axios": "^1.7.2",
    "reflect-metadata": "^0.2.2"
  },
  "optionalDependencies": {
@@ -68,6 +68,24 @@ impl Connection {
                builder = builder.storage_option(key, value);
            }
        }

        let client_config = options.client_config.unwrap_or_default();
        builder = builder.client_config(client_config.into());

        if let Some(api_key) = options.api_key {
            builder = builder.api_key(&api_key);
        }

        if let Some(region) = options.region {
            builder = builder.region(&region);
        } else {
            builder = builder.region("us-east-1");
        }

        if let Some(host_override) = options.host_override {
            builder = builder.host_override(&host_override);
        }

        Ok(Self::inner_new(
            builder
                .execute()
@@ -22,6 +22,7 @@ mod index;
mod iterator;
pub mod merge;
mod query;
pub mod remote;
mod table;
mod util;

@@ -42,6 +43,19 @@ pub struct ConnectionOptions {
    ///
    /// The available options are described at https://lancedb.github.io/lancedb/guides/storage/
    pub storage_options: Option<HashMap<String, String>>,

    /// (For LanceDB cloud only): configuration for the remote HTTP client.
    pub client_config: Option<remote::ClientConfig>,
    /// (For LanceDB cloud only): the API key to use with LanceDB Cloud.
    ///
    /// Can also be set via the environment variable `LANCEDB_API_KEY`.
    pub api_key: Option<String>,
    /// (For LanceDB cloud only): the region to use for LanceDB cloud.
    /// Defaults to 'us-east-1'.
    pub region: Option<String>,
    /// (For LanceDB cloud only): the host to use for LanceDB cloud. Used
    /// for testing purposes.
    pub host_override: Option<String>,
}

/// Write mode for writing a table.
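The host_override field documented above is what the new Node test suite uses to point the client at a local HTTP server instead of LanceDB Cloud. A minimal TypeScript sketch of that testing setup, with the URI, key, and port as placeholder values mirroring remote.test.ts:

```ts
import { connect } from "@lancedb/lancedb";

async function connectToLocalMock() {
  // All values here are test placeholders; a real deployment omits hostOverride.
  const db = await connect("db://dev", {
    apiKey: "fake",
    hostOverride: "http://localhost:8000",
  });
  return db.tableNames(); // requests are sent to the local server
}
```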
nodejs/src/remote.rs (new file, 120 lines)
@@ -0,0 +1,120 @@
|
||||
// Copyright 2024 Lance Developers.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use napi_derive::*;
|
||||
|
||||
/// Timeout configuration for remote HTTP client.
|
||||
#[napi(object)]
|
||||
#[derive(Debug)]
|
||||
pub struct TimeoutConfig {
|
||||
/// The timeout for establishing a connection in seconds. Default is 120
|
||||
/// seconds (2 minutes). This can also be set via the environment variable
|
||||
/// `LANCE_CLIENT_CONNECT_TIMEOUT`, as an integer number of seconds.
|
||||
pub connect_timeout: Option<f64>,
|
||||
/// The timeout for reading data from the server in seconds. Default is 300
|
||||
/// seconds (5 minutes). This can also be set via the environment variable
|
||||
/// `LANCE_CLIENT_READ_TIMEOUT`, as an integer number of seconds.
|
||||
pub read_timeout: Option<f64>,
|
||||
/// The timeout for keeping idle connections in the connection pool in seconds.
|
||||
/// Default is 300 seconds (5 minutes). This can also be set via the
|
||||
/// environment variable `LANCE_CLIENT_CONNECTION_TIMEOUT`, as an integer
|
||||
/// number of seconds.
|
||||
pub pool_idle_timeout: Option<f64>,
|
||||
}
|
||||
|
||||
/// Retry configuration for the remote HTTP client.
|
||||
#[napi(object)]
|
||||
#[derive(Debug)]
|
||||
pub struct RetryConfig {
|
||||
/// The maximum number of retries for a request. Default is 3. You can also
|
||||
/// set this via the environment variable `LANCE_CLIENT_MAX_RETRIES`.
|
||||
pub retries: Option<u8>,
|
||||
/// The maximum number of retries for connection errors. Default is 3. You
|
||||
/// can also set this via the environment variable `LANCE_CLIENT_CONNECT_RETRIES`.
|
||||
pub connect_retries: Option<u8>,
|
||||
/// The maximum number of retries for read errors. Default is 3. You can also
|
||||
/// set this via the environment variable `LANCE_CLIENT_READ_RETRIES`.
|
||||
pub read_retries: Option<u8>,
|
||||
/// The backoff factor to apply between retries. Default is 0.25. Between each retry
|
||||
/// the client will wait for the amount of seconds:
|
||||
/// `{backoff factor} * (2 ** ({number of previous retries}))`. So for the default
|
||||
/// of 0.25, the first retry will wait 0.25 seconds, the second retry will wait 0.5
|
||||
/// seconds, the third retry will wait 1 second, etc.
|
||||
///
|
||||
/// You can also set this via the environment variable
|
||||
/// `LANCE_CLIENT_RETRY_BACKOFF_FACTOR`.
|
||||
pub backoff_factor: Option<f64>,
|
||||
/// The jitter to apply to the backoff factor, in seconds. Default is 0.25.
|
||||
///
|
||||
/// A random value between 0 and `backoff_jitter` will be added to the backoff
|
||||
/// factor in seconds. So for the default of 0.25 seconds, between 0 and 250
|
||||
/// milliseconds will be added to the sleep between each retry.
|
||||
///
|
||||
/// You can also set this via the environment variable
|
||||
/// `LANCE_CLIENT_RETRY_BACKOFF_JITTER`.
|
||||
pub backoff_jitter: Option<f64>,
|
||||
/// The HTTP status codes for which to retry the request. Default is
|
||||
/// [429, 500, 502, 503].
|
||||
///
|
||||
/// You can also set this via the environment variable
|
||||
/// `LANCE_CLIENT_RETRY_STATUSES`. Use a comma-separated list of integers.
|
||||
pub statuses: Option<Vec<u16>>,
|
||||
}
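To make the backoff arithmetic described above concrete: with the default factor of 0.25 and jitter of 0.25, the wait before the n-th retry is 0.25 * 2^(n-1) seconds plus up to 0.25 seconds of random jitter. A small TypeScript sketch of that schedule (the helper is illustrative only, not part of the client):

```ts
// Wait before a retry: backoffFactor * 2 ** previousRetries, plus random jitter.
function backoffSeconds(
  previousRetries: number,
  backoffFactor = 0.25,
  backoffJitter = 0.25,
): number {
  return backoffFactor * 2 ** previousRetries + Math.random() * backoffJitter;
}

// Base waits before jitter: retry 1 -> 0.25 s, retry 2 -> 0.5 s, retry 3 -> 1 s.
for (let n = 0; n < 3; n++) {
  console.log(`retry ${n + 1}: ~${0.25 * 2 ** n} s + jitter`);
}
```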
|
||||
|
||||
#[napi(object)]
|
||||
#[derive(Debug, Default)]
|
||||
pub struct ClientConfig {
|
||||
pub user_agent: Option<String>,
|
||||
pub retry_config: Option<RetryConfig>,
|
||||
pub timeout_config: Option<TimeoutConfig>,
|
||||
}
|
||||
|
||||
impl From<TimeoutConfig> for lancedb::remote::TimeoutConfig {
|
||||
fn from(config: TimeoutConfig) -> Self {
|
||||
Self {
|
||||
connect_timeout: config
|
||||
.connect_timeout
|
||||
.map(std::time::Duration::from_secs_f64),
|
||||
read_timeout: config.read_timeout.map(std::time::Duration::from_secs_f64),
|
||||
pool_idle_timeout: config
|
||||
.pool_idle_timeout
|
||||
.map(std::time::Duration::from_secs_f64),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<RetryConfig> for lancedb::remote::RetryConfig {
|
||||
fn from(config: RetryConfig) -> Self {
|
||||
Self {
|
||||
retries: config.retries,
|
||||
connect_retries: config.connect_retries,
|
||||
read_retries: config.read_retries,
|
||||
backoff_factor: config.backoff_factor.map(|v| v as f32),
|
||||
backoff_jitter: config.backoff_jitter.map(|v| v as f32),
|
||||
statuses: config.statuses,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ClientConfig> for lancedb::remote::ClientConfig {
|
||||
fn from(config: ClientConfig) -> Self {
|
||||
Self {
|
||||
user_agent: config
|
||||
.user_agent
|
||||
.unwrap_or(concat!("LanceDB-Node-Client/", env!("CARGO_PKG_VERSION")).to_string()),
|
||||
retry_config: config.retry_config.map(Into::into).unwrap_or_default(),
|
||||
timeout_config: config.timeout_config.map(Into::into).unwrap_or_default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.14.0-beta.0"
current_version = "0.14.0"
parse = """(?x)
    (?P<major>0|[1-9]\\d*)\\.
    (?P<minor>0|[1-9]\\d*)\\.
@@ -1,6 +1,6 @@
[package]
name = "lancedb-python"
version = "0.14.0-beta.0"
version = "0.14.0"
edition.workspace = true
description = "Python bindings for LanceDB"
license.workspace = true
@@ -3,7 +3,7 @@ name = "lancedb"
# version in Cargo.toml
dependencies = [
    "deprecation",
    "pylance==0.18.0",
    "pylance==0.18.2",
    "requests>=2.31.0",
    "retry>=0.9.2",
    "tqdm>=4.27.0",
@@ -20,7 +20,7 @@ from .util import safe_import_pandas

pd = safe_import_pandas()

DATA = Union[List[dict], dict, "pd.DataFrame", pa.Table, Iterable[pa.RecordBatch]]
DATA = Union[List[dict], "pd.DataFrame", pa.Table, Iterable[pa.RecordBatch]]
VEC = Union[list, np.ndarray, pa.Array, pa.ChunkedArray]
URI = Union[str, Path]
VECTOR_COLUMN_NAME = "vector"
@@ -96,7 +96,7 @@ class DBConnection(EnforceOverrides):
        User must provide at least one of `data` or `schema`.
        Acceptable types are:

        - dict or list-of-dict
        - list-of-dict

        - pandas.DataFrame

@@ -579,7 +579,7 @@ class AsyncConnection(object):
        User must provide at least one of `data` or `schema`.
        Acceptable types are:

        - dict or list-of-dict
        - list-of-dict

        - pandas.DataFrame
@@ -103,19 +103,29 @@ class RestfulLanceDBClient:

    @staticmethod
    def _check_status(resp: requests.Response):
        # Leaving request id empty for now, as we'll be replacing this impl
        # with the Rust one shortly.
        if resp.status_code == 404:
            raise LanceDBClientError(f"Not found: {resp.text}")
            raise LanceDBClientError(
                f"Not found: {resp.text}", request_id="", status_code=404
            )
        elif 400 <= resp.status_code < 500:
            raise LanceDBClientError(
                f"Bad Request: {resp.status_code}, error: {resp.text}"
                f"Bad Request: {resp.status_code}, error: {resp.text}",
                request_id="",
                status_code=resp.status_code,
            )
        elif 500 <= resp.status_code < 600:
            raise LanceDBClientError(
                f"Internal Server Error: {resp.status_code}, error: {resp.text}"
                f"Internal Server Error: {resp.status_code}, error: {resp.text}",
                request_id="",
                status_code=resp.status_code,
            )
        elif resp.status_code != 200:
            raise LanceDBClientError(
                f"Unknown Error: {resp.status_code}, error: {resp.text}"
                f"Unknown Error: {resp.status_code}, error: {resp.text}",
                request_id="",
                status_code=resp.status_code,
            )

    @_check_not_closed
@@ -12,5 +12,102 @@
# limitations under the License.


from typing import Optional


class LanceDBClientError(RuntimeError):
    """An error that occurred in the LanceDB client.

    Attributes
    ----------
    message: str
        The error message.
    request_id: str
        The id of the request that failed. This can be provided in error reports
        to help diagnose the issue.
    status_code: int
        The HTTP status code of the response. May be None if the request
        failed before the response was received.
    """

    def __init__(
        self, message: str, request_id: str, status_code: Optional[int] = None
    ):
        super().__init__(message)
        self.request_id = request_id
        self.status_code = status_code


class HttpError(LanceDBClientError):
    """An error that occurred during an HTTP request.

    Attributes
    ----------
    message: str
        The error message.
    request_id: str
        The id of the request that failed. This can be provided in error reports
        to help diagnose the issue.
    status_code: int
        The HTTP status code of the response. May be None if the request
        failed before the response was received.
    """

    pass


class RetryError(LanceDBClientError):
    """An error that occurs when the client has exceeded the maximum number of retries.

    The retry strategy can be adjusted by setting the
    [retry_config](lancedb.remote.ClientConfig.retry_config) in the client
    configuration. This is passed in the `client_config` argument of
    [connect](lancedb.connect) and [connect_async](lancedb.connect_async).

    The __cause__ attribute of this exception will be the last exception that
    caused the retry to fail. It will be an
    [HttpError][lancedb.remote.errors.HttpError] instance.

    Attributes
    ----------
    message: str
        The retry error message, which will describe which retry limit was hit.
    request_id: str
        The id of the request that failed. This can be provided in error reports
        to help diagnose the issue.
    request_failures: int
        The number of request failures.
    connect_failures: int
        The number of connect failures.
    read_failures: int
        The number of read failures.
    max_request_failures: int
        The maximum number of request failures.
    max_connect_failures: int
        The maximum number of connect failures.
    max_read_failures: int
        The maximum number of read failures.
    status_code: int
        The HTTP status code of the last response. May be None if the request
        failed before the response was received.
    """

    def __init__(
        self,
        message: str,
        request_id: str,
        request_failures: int,
        connect_failures: int,
        read_failures: int,
        max_request_failures: int,
        max_connect_failures: int,
        max_read_failures: int,
        status_code: Optional[int],
    ):
        super().__init__(message, request_id, status_code)
        self.request_failures = request_failures
        self.connect_failures = connect_failures
        self.read_failures = read_failures
        self.max_request_failures = max_request_failures
        self.max_connect_failures = max_connect_failures
        self.max_read_failures = max_read_failures

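The new exception hierarchy is exercised end to end by the tests later in this diff. As a quick illustration of how application code might consume it (a hedged sketch with placeholder connection values, not part of the change itself):

```python
import lancedb
from lancedb.remote.errors import HttpError, RetryError


async def list_tables_with_diagnostics():
    db = await lancedb.connect_async(
        "db://my-project",   # placeholder
        api_key="...",       # placeholder
        client_config={"retry_config": {"retries": 2}},
    )
    try:
        return await db.table_names()
    except RetryError as e:
        # The last failed attempt is chained as an HttpError on __cause__.
        print(f"hit retry limit: request_id={e.request_id}, cause={e.__cause__}")
    except HttpError as e:
        print(f"request {e.request_id} failed with status {e.status_code}")
```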
@@ -31,7 +31,6 @@ import pyarrow.compute as pc
import pyarrow.fs as pa_fs
from lance import LanceDataset
from lance.dependencies import _check_for_hugging_face
from lance.vector import vec_to_table

from .common import DATA, VEC, VECTOR_COLUMN_NAME
from .embeddings import EmbeddingFunctionConfig, EmbeddingFunctionRegistry
@@ -87,6 +86,9 @@ def _coerce_to_table(data, schema: Optional[pa.Schema] = None) -> pa.Table:
    if isinstance(data, LanceModel):
        raise ValueError("Cannot add a single LanceModel to a table. Use a list.")

    if isinstance(data, dict):
        raise ValueError("Cannot add a single dictionary to a table. Use a list.")

    if isinstance(data, list):
        # convert to list of dict if data is a bunch of LanceModels
        if isinstance(data[0], LanceModel):
@@ -98,8 +100,6 @@ def _coerce_to_table(data, schema: Optional[pa.Schema] = None) -> pa.Table:
            return pa.Table.from_batches(data, schema=schema)
        else:
            return pa.Table.from_pylist(data)
    elif isinstance(data, dict):
        return vec_to_table(data)
    elif _check_for_pandas(data) and isinstance(data, pd.DataFrame):
        # Do not add schema here, since schema may contains the vector column
        table = pa.Table.from_pandas(data, preserve_index=False)
@@ -554,7 +554,7 @@ class Table(ABC):
        data: DATA
            The data to insert into the table. Acceptable types are:

            - dict or list-of-dict
            - list-of-dict

            - pandas.DataFrame

@@ -1409,7 +1409,7 @@ class LanceTable(Table):

        Parameters
        ----------
        data: list-of-dict, dict, pd.DataFrame
        data: list-of-dict, pd.DataFrame
            The data to insert into the table.
        mode: str
            The mode to use when writing the data. Valid values are
@@ -2348,7 +2348,7 @@ class AsyncTable:
        data: DATA
            The data to insert into the table. Acceptable types are:

            - dict or list-of-dict
            - list-of-dict

            - pandas.DataFrame

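In practice the stricter `_coerce_to_table` means single rows must now be wrapped in a list before calling `add`. A small before/after sketch using the same row shape as the test later in this diff:

```python
# Previously accepted, now raises
# ValueError("Cannot add a single dictionary to a table. Use a list."):
# tbl.add({"vector": [3.1, 4.1], "item": "foo", "price": 10.0})

# Wrap single rows in a list instead:
tbl.add([{"vector": [3.1, 4.1], "item": "foo", "price": 10.0}])
```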
@@ -354,7 +354,7 @@ async def test_create_mode_async(tmp_path):
    )
    await db.create_table("test", data=data)

    with pytest.raises(RuntimeError):
    with pytest.raises(ValueError, match="already exists"):
        await db.create_table("test", data=data)

    new_data = pd.DataFrame(
@@ -382,7 +382,7 @@ async def test_create_exist_ok_async(tmp_path):
    )
    tbl = await db.create_table("test", data=data)

    with pytest.raises(RuntimeError):
    with pytest.raises(ValueError, match="already exists"):
        await db.create_table("test", data=data)

    # open the table but don't add more rows

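Callers that previously caught the broad `RuntimeError` can switch to the narrower exception, or simply pass `exist_ok=True` as `test_create_exist_ok_async` does. A hedged sketch of the new calling convention (table name and data are placeholders):

```python
try:
    tbl = await db.create_table("test", data=data)
except ValueError as e:
    if "already exists" in str(e):
        tbl = await db.open_table("test")
    else:
        raise
```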
@@ -1,12 +1,14 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors

import contextlib
import http.server
import threading
from unittest.mock import MagicMock
import uuid

import lancedb
from lancedb.remote.errors import HttpError, RetryError
import pyarrow as pa
from lancedb.remote.client import VectorQuery, VectorQueryResult
import pytest
@@ -98,6 +100,33 @@ def make_mock_http_handler(handler):
    return MockLanceDBHandler


@contextlib.asynccontextmanager
async def mock_lancedb_connection(handler):
    with http.server.HTTPServer(
        ("localhost", 8080), make_mock_http_handler(handler)
    ) as server:
        handle = threading.Thread(target=server.serve_forever)
        handle.start()

        db = await lancedb.connect_async(
            "db://dev",
            api_key="fake",
            host_override="http://localhost:8080",
            client_config={
                "retry_config": {"retries": 2},
                "timeout_config": {
                    "connect_timeout": 1,
                },
            },
        )

        try:
            yield db
        finally:
            server.shutdown()
            handle.join()


@pytest.mark.asyncio
async def test_async_remote_db():
    def handler(request):
@@ -114,28 +143,50 @@ async def test_async_remote_db():
        request.end_headers()
        request.wfile.write(b'{"tables": []}')

    def run_server():
        with http.server.HTTPServer(
            ("localhost", 8080), make_mock_http_handler(handler)
        ) as server:
            # we will only make one request
            server.handle_request()
    async with mock_lancedb_connection(handler) as db:
        table_names = await db.table_names()
        assert table_names == []

    handle = threading.Thread(target=run_server)
    handle.start()

    db = await lancedb.connect_async(
        "db://dev",
        api_key="fake",
        host_override="http://localhost:8080",
        client_config={
            "retry_config": {"retries": 2},
            "timeout_config": {
                "connect_timeout": 1,
            },
        },
    )
    table_names = await db.table_names()
    assert table_names == []
@pytest.mark.asyncio
async def test_http_error():
    request_id_holder = {"request_id": None}

    handle.join()
    def handler(request):
        request_id_holder["request_id"] = request.headers["x-request-id"]

        request.send_response(507)
        request.end_headers()
        request.wfile.write(b"Internal Server Error")

    async with mock_lancedb_connection(handler) as db:
        with pytest.raises(HttpError, match="Internal Server Error") as exc_info:
            await db.table_names()

        assert exc_info.value.request_id == request_id_holder["request_id"]
        assert exc_info.value.status_code == 507


@pytest.mark.asyncio
async def test_retry_error():
    request_id_holder = {"request_id": None}

    def handler(request):
        request_id_holder["request_id"] = request.headers["x-request-id"]

        request.send_response(429)
        request.end_headers()
        request.wfile.write(b"Try again later")

    async with mock_lancedb_connection(handler) as db:
        with pytest.raises(RetryError, match="Hit retry limit") as exc_info:
            await db.table_names()

        assert exc_info.value.request_id == request_id_holder["request_id"]
        assert exc_info.value.status_code == 429

        cause = exc_info.value.__cause__
        assert isinstance(cause, HttpError)
        assert "Try again later" in str(cause)
        assert cause.request_id == request_id_holder["request_id"]
        assert cause.status_code == 429

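The same fixture also makes it easy to cover the success-after-retry path. A hedged sketch (not part of this change) that assumes 429 remains in the default set of retryable statuses, as the retry test above implies:

```python
@pytest.mark.asyncio
async def test_retry_then_succeed():
    attempts = {"count": 0}

    def handler(request):
        attempts["count"] += 1
        if attempts["count"] == 1:
            # First attempt is throttled; the client should retry it.
            request.send_response(429)
            request.end_headers()
            request.wfile.write(b"Try again later")
        else:
            request.send_response(200)
            request.end_headers()
            request.wfile.write(b'{"tables": []}')

    async with mock_lancedb_connection(handler) as db:
        assert await db.table_names() == []
```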
@@ -193,6 +193,24 @@ def test_empty_table(db):
        tbl.add(data=data)


def test_add_dictionary(db):
    schema = pa.schema(
        [
            pa.field("vector", pa.list_(pa.float32(), 2)),
            pa.field("item", pa.string()),
            pa.field("price", pa.float32()),
        ]
    )
    tbl = LanceTable.create(db, "test", schema=schema)
    data = {"vector": [3.1, 4.1], "item": "foo", "price": 10.0}
    with pytest.raises(ValueError) as excep_info:
        tbl.add(data=data)
    assert (
        str(excep_info.value)
        == "Cannot add a single dictionary to a table. Use a list."
    )


def test_add(db):
    schema = pa.schema(
        [

@@ -14,7 +14,9 @@

use pyo3::{
    exceptions::{PyIOError, PyNotImplementedError, PyOSError, PyRuntimeError, PyValueError},
    PyResult,
    intern,
    types::{PyAnyMethods, PyNone},
    PyErr, PyResult, Python,
};

use lancedb::error::Error as LanceError;
@@ -38,12 +40,79 @@ impl<T> PythonErrorExt<T> for std::result::Result<T, LanceError> {
|
||||
LanceError::InvalidInput { .. }
|
||||
| LanceError::InvalidTableName { .. }
|
||||
| LanceError::TableNotFound { .. }
|
||||
| LanceError::Schema { .. } => self.value_error(),
|
||||
| LanceError::Schema { .. }
|
||||
| LanceError::TableAlreadyExists { .. } => self.value_error(),
|
||||
LanceError::CreateDir { .. } => self.os_error(),
|
||||
LanceError::ObjectStore { .. } => Err(PyIOError::new_err(err.to_string())),
|
||||
LanceError::NotSupported { .. } => {
|
||||
Err(PyNotImplementedError::new_err(err.to_string()))
|
||||
}
|
||||
LanceError::Http {
|
||||
request_id,
|
||||
source,
|
||||
status_code,
|
||||
} => Python::with_gil(|py| {
|
||||
let message = err.to_string();
|
||||
let http_err_cls = py
|
||||
.import_bound(intern!(py, "lancedb.remote.errors"))?
|
||||
.getattr(intern!(py, "HttpError"))?;
|
||||
let err = http_err_cls.call1((
|
||||
message,
|
||||
request_id,
|
||||
status_code.map(|s| s.as_u16()),
|
||||
))?;
|
||||
|
||||
if let Some(cause) = source.source() {
|
||||
// The HTTP error already includes the first cause. But
|
||||
// we can add the rest of the chain if there is any more.
|
||||
let cause_err = http_from_rust_error(
|
||||
py,
|
||||
cause,
|
||||
request_id,
|
||||
status_code.map(|s| s.as_u16()),
|
||||
)?;
|
||||
err.setattr(intern!(py, "__cause__"), cause_err)?;
|
||||
}
|
||||
|
||||
Err(PyErr::from_value_bound(err))
|
||||
}),
|
||||
LanceError::Retry {
|
||||
request_id,
|
||||
request_failures,
|
||||
max_request_failures,
|
||||
connect_failures,
|
||||
max_connect_failures,
|
||||
read_failures,
|
||||
max_read_failures,
|
||||
source,
|
||||
status_code,
|
||||
} => Python::with_gil(|py| {
|
||||
let cause_err = http_from_rust_error(
|
||||
py,
|
||||
source.as_ref(),
|
||||
request_id,
|
||||
status_code.map(|s| s.as_u16()),
|
||||
)?;
|
||||
|
||||
let message = err.to_string();
|
||||
let retry_error_cls = py
|
||||
.import_bound(intern!(py, "lancedb.remote.errors"))?
|
||||
.getattr("RetryError")?;
|
||||
let err = retry_error_cls.call1((
|
||||
message,
|
||||
request_id,
|
||||
*request_failures,
|
||||
*connect_failures,
|
||||
*read_failures,
|
||||
*max_request_failures,
|
||||
*max_connect_failures,
|
||||
*max_read_failures,
|
||||
status_code.map(|s| s.as_u16()),
|
||||
))?;
|
||||
|
||||
err.setattr(intern!(py, "__cause__"), cause_err)?;
|
||||
Err(PyErr::from_value_bound(err))
|
||||
}),
|
||||
_ => self.runtime_error(),
|
||||
},
|
||||
}
|
||||
@@ -61,3 +130,24 @@ impl<T> PythonErrorExt<T> for std::result::Result<T, LanceError> {
|
||||
self.map_err(|err| PyValueError::new_err(err.to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
fn http_from_rust_error(
|
||||
py: Python<'_>,
|
||||
err: &dyn std::error::Error,
|
||||
request_id: &str,
|
||||
status_code: Option<u16>,
|
||||
) -> PyResult<PyErr> {
|
||||
let message = err.to_string();
|
||||
let http_err_cls = py.import("lancedb.remote.errors")?.getattr("HttpError")?;
|
||||
let py_err = http_err_cls.call1((message, request_id, status_code))?;
|
||||
|
||||
// Reset the traceback since it doesn't provide additional information.
|
||||
let py_err = py_err.call_method1(intern!(py, "with_traceback"), (PyNone::get_bound(py),))?;
|
||||
|
||||
if let Some(cause) = err.source() {
|
||||
let cause_err = http_from_rust_error(py, cause, request_id, status_code)?;
|
||||
py_err.setattr(intern!(py, "__cause__"), cause_err)?;
|
||||
}
|
||||
|
||||
Ok(PyErr::from_value(py_err))
|
||||
}
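Because both branches chain the underlying errors through `__cause__`, the full Rust-side chain is visible from Python. A small illustrative helper (not part of this diff) for walking it:

```python
def format_error_chain(err: BaseException) -> str:
    parts = []
    current = err
    while current is not None:
        parts.append(f"{type(current).__name__}: {current}")
        current = current.__cause__
    return "\n  caused by: ".join(parts)
```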
|
||||
|
||||
@@ -46,8 +46,37 @@ pub enum Error {
|
||||
ObjectStore { source: object_store::Error },
|
||||
#[snafu(display("lance error: {source}"))]
|
||||
Lance { source: lance::Error },
|
||||
#[snafu(display("Http error: {message}"))]
|
||||
Http { message: String },
|
||||
#[cfg(feature = "remote")]
|
||||
#[snafu(display("Http error: (request_id={request_id}) {source}"))]
|
||||
Http {
|
||||
#[snafu(source(from(reqwest::Error, Box::new)))]
|
||||
source: Box<dyn std::error::Error + Send + Sync>,
|
||||
request_id: String,
|
||||
/// Status code associated with the error, if available.
|
||||
/// This is not always available, for example when the error is due to a
|
||||
/// connection failure. It may also be missing if the request was
|
||||
/// successful but there was an error decoding the response.
|
||||
status_code: Option<reqwest::StatusCode>,
|
||||
},
|
||||
#[cfg(feature = "remote")]
|
||||
#[snafu(display(
|
||||
"Hit retry limit for request_id={request_id} (\
|
||||
request_failures={request_failures}/{max_request_failures}, \
|
||||
connect_failures={connect_failures}/{max_connect_failures}, \
|
||||
read_failures={read_failures}/{max_read_failures})"
|
||||
))]
|
||||
Retry {
|
||||
request_id: String,
|
||||
request_failures: u8,
|
||||
max_request_failures: u8,
|
||||
connect_failures: u8,
|
||||
max_connect_failures: u8,
|
||||
read_failures: u8,
|
||||
max_read_failures: u8,
|
||||
#[snafu(source(from(reqwest::Error, Box::new)))]
|
||||
source: Box<dyn std::error::Error + Send + Sync>,
|
||||
status_code: Option<reqwest::StatusCode>,
|
||||
},
|
||||
#[snafu(display("Arrow error: {source}"))]
|
||||
Arrow { source: ArrowError },
|
||||
#[snafu(display("LanceDBError: not supported: {message}"))]
|
||||
@@ -98,24 +127,6 @@ impl<T> From<PoisonError<T>> for Error {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "remote")]
|
||||
impl From<reqwest::Error> for Error {
|
||||
fn from(e: reqwest::Error) -> Self {
|
||||
Self::Http {
|
||||
message: e.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "remote")]
|
||||
impl From<url::ParseError> for Error {
|
||||
fn from(e: url::ParseError) -> Self {
|
||||
Self::Http {
|
||||
message: e.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "polars")]
|
||||
impl From<polars::prelude::PolarsError> for Error {
|
||||
fn from(source: polars::prelude::PolarsError) -> Self {
|
||||
|
||||
@@ -216,10 +216,12 @@ impl RestfulLanceDbClient<Sender> {
|
||||
host_override: Option<String>,
|
||||
client_config: ClientConfig,
|
||||
) -> Result<Self> {
|
||||
let parsed_url = url::Url::parse(db_url)?;
|
||||
let parsed_url = url::Url::parse(db_url).map_err(|err| Error::InvalidInput {
|
||||
message: format!("db_url is not a valid URL. '{db_url}'. Error: {err}"),
|
||||
})?;
|
||||
debug_assert_eq!(parsed_url.scheme(), "db");
|
||||
if !parsed_url.has_host() {
|
||||
return Err(Error::Http {
|
||||
return Err(Error::InvalidInput {
|
||||
message: format!("Invalid database URL (missing host) '{}'", db_url),
|
||||
});
|
||||
}
|
||||
@@ -255,7 +257,11 @@ impl RestfulLanceDbClient<Sender> {
|
||||
host_override.is_some(),
|
||||
)?)
|
||||
.user_agent(client_config.user_agent)
|
||||
.build()?;
|
||||
.build()
|
||||
.map_err(|err| Error::Other {
|
||||
message: "Failed to build HTTP client".into(),
|
||||
source: Some(Box::new(err)),
|
||||
})?;
|
||||
let host = match host_override {
|
||||
Some(host_override) => host_override,
|
||||
None => format!("https://{}.{}.api.lancedb.com", db_name, region),
|
||||
@@ -284,7 +290,7 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(
|
||||
"x-api-key",
|
||||
HeaderValue::from_str(api_key).map_err(|_| Error::Http {
|
||||
HeaderValue::from_str(api_key).map_err(|_| Error::InvalidInput {
|
||||
message: "non-ascii api key provided".to_string(),
|
||||
})?,
|
||||
);
|
||||
@@ -292,7 +298,7 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
|
||||
let host = format!("{}.local.api.lancedb.com", db_name);
|
||||
headers.insert(
|
||||
"Host",
|
||||
HeaderValue::from_str(&host).map_err(|_| Error::Http {
|
||||
HeaderValue::from_str(&host).map_err(|_| Error::InvalidInput {
|
||||
message: format!("non-ascii database name '{}' provided", db_name),
|
||||
})?,
|
||||
);
|
||||
@@ -300,7 +306,7 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
|
||||
if has_host_override {
|
||||
headers.insert(
|
||||
"x-lancedb-database",
|
||||
HeaderValue::from_str(db_name).map_err(|_| Error::Http {
|
||||
HeaderValue::from_str(db_name).map_err(|_| Error::InvalidInput {
|
||||
message: format!("non-ascii database name '{}' provided", db_name),
|
||||
})?,
|
||||
);
|
||||
@@ -319,22 +325,30 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
|
||||
self.client.post(full_uri)
|
||||
}
|
||||
|
||||
pub async fn send(&self, req: RequestBuilder, with_retry: bool) -> Result<Response> {
|
||||
pub async fn send(&self, req: RequestBuilder, with_retry: bool) -> Result<(String, Response)> {
|
||||
let (client, request) = req.build_split();
|
||||
let mut request = request.unwrap();
|
||||
|
||||
// Set a request id.
|
||||
// TODO: allow the user to supply this, through middleware?
|
||||
if request.headers().get(REQUEST_ID_HEADER).is_none() {
|
||||
let request_id = uuid::Uuid::new_v4();
|
||||
let request_id = HeaderValue::from_str(&request_id.to_string()).unwrap();
|
||||
request.headers_mut().insert(REQUEST_ID_HEADER, request_id);
|
||||
}
|
||||
let request_id = if let Some(request_id) = request.headers().get(REQUEST_ID_HEADER) {
|
||||
request_id.to_str().unwrap().to_string()
|
||||
} else {
|
||||
let request_id = uuid::Uuid::new_v4().to_string();
|
||||
let header = HeaderValue::from_str(&request_id).unwrap();
|
||||
request.headers_mut().insert(REQUEST_ID_HEADER, header);
|
||||
request_id
|
||||
};
|
||||
|
||||
if with_retry {
|
||||
self.send_with_retry_impl(client, request).await
|
||||
self.send_with_retry_impl(client, request, request_id).await
|
||||
} else {
|
||||
Ok(self.sender.send(&client, request).await?)
|
||||
let response = self
|
||||
.sender
|
||||
.send(&client, request)
|
||||
.await
|
||||
.err_to_http(request_id.clone())?;
|
||||
Ok((request_id, response))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -342,98 +356,178 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
|
||||
&self,
|
||||
client: reqwest::Client,
|
||||
req: Request,
|
||||
) -> Result<Response> {
|
||||
let mut request_failures = 0;
|
||||
let mut connect_failures = 0;
|
||||
let mut read_failures = 0;
|
||||
request_id: String,
|
||||
) -> Result<(String, Response)> {
|
||||
let mut retry_counter = RetryCounter::new(&self.retry_config, request_id);
|
||||
|
||||
loop {
|
||||
// This only works if the request body is not a stream. If it is
|
||||
// a stream, we can't use the retry path. We would need to implement
|
||||
// an outer retry.
|
||||
let request = req.try_clone().ok_or_else(|| Error::Http {
|
||||
let request = req.try_clone().ok_or_else(|| Error::Runtime {
|
||||
message: "Attempted to retry a request that cannot be cloned".to_string(),
|
||||
})?;
|
||||
let response = self.sender.send(&client, request).await;
|
||||
let status_code = response.as_ref().map(|r| r.status());
|
||||
match status_code {
|
||||
Ok(status) if status.is_success() => return Ok(response?),
|
||||
Ok(status) if self.retry_config.statuses.contains(&status) => {
|
||||
request_failures += 1;
|
||||
if request_failures >= self.retry_config.retries {
|
||||
// TODO: better error
|
||||
return Err(Error::Runtime {
|
||||
message: format!(
|
||||
"Request failed after {} retries with status code {}",
|
||||
request_failures, status
|
||||
),
|
||||
});
|
||||
}
|
||||
let response = self
|
||||
.sender
|
||||
.send(&client, request)
|
||||
.await
|
||||
.map(|r| (r.status(), r));
|
||||
match response {
|
||||
Ok((status, response)) if status.is_success() => {
|
||||
return Ok((retry_counter.request_id, response))
|
||||
}
|
||||
Ok((status, response)) if self.retry_config.statuses.contains(&status) => {
|
||||
let source = self
|
||||
.check_response(&retry_counter.request_id, response)
|
||||
.await
|
||||
.unwrap_err();
|
||||
retry_counter.increment_request_failures(source)?;
|
||||
}
|
||||
Err(err) if err.is_connect() => {
|
||||
connect_failures += 1;
|
||||
if connect_failures >= self.retry_config.connect_retries {
|
||||
return Err(Error::Runtime {
|
||||
message: format!(
|
||||
"Request failed after {} connect retries with error: {}",
|
||||
connect_failures, err
|
||||
),
|
||||
});
|
||||
}
|
||||
retry_counter.increment_connect_failures(err)?;
|
||||
}
|
||||
Err(err) if err.is_timeout() || err.is_body() || err.is_decode() => {
|
||||
read_failures += 1;
|
||||
if read_failures >= self.retry_config.read_retries {
|
||||
return Err(Error::Runtime {
|
||||
message: format!(
|
||||
"Request failed after {} read retries with error: {}",
|
||||
read_failures, err
|
||||
),
|
||||
});
|
||||
}
|
||||
retry_counter.increment_read_failures(err)?;
|
||||
}
|
||||
Ok(_) | Err(_) => return Ok(response?),
|
||||
Err(err) => {
|
||||
let status_code = err.status();
|
||||
return Err(Error::Http {
|
||||
source: Box::new(err),
|
||||
request_id: retry_counter.request_id,
|
||||
status_code,
|
||||
});
|
||||
}
|
||||
Ok((_, response)) => return Ok((retry_counter.request_id, response)),
|
||||
}
|
||||
|
||||
let backoff = self.retry_config.backoff_factor * (2.0f32.powi(request_failures as i32));
|
||||
let jitter = rand::random::<f32>() * self.retry_config.backoff_jitter;
|
||||
let sleep_time = Duration::from_secs_f32(backoff + jitter);
|
||||
debug!(
|
||||
"Retrying request {:?} ({}/{} connect, {}/{} read, {}/{} read) in {:?}",
|
||||
req.headers()
|
||||
.get("x-request-id")
|
||||
.and_then(|v| v.to_str().ok()),
|
||||
connect_failures,
|
||||
self.retry_config.connect_retries,
|
||||
request_failures,
|
||||
self.retry_config.retries,
|
||||
read_failures,
|
||||
self.retry_config.read_retries,
|
||||
sleep_time
|
||||
);
|
||||
let sleep_time = retry_counter.next_sleep_time();
|
||||
tokio::time::sleep(sleep_time).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn rsp_to_str(response: Response) -> String {
|
||||
pub async fn check_response(&self, request_id: &str, response: Response) -> Result<Response> {
|
||||
// Try to get the response text, but if that fails, just return the status code
|
||||
let status = response.status();
|
||||
response.text().await.unwrap_or_else(|_| status.to_string())
|
||||
if status.is_success() {
|
||||
Ok(response)
|
||||
} else {
|
||||
let response_text = response.text().await.ok();
|
||||
let message = if let Some(response_text) = response_text {
|
||||
format!("{}: {}", status, response_text)
|
||||
} else {
|
||||
status.to_string()
|
||||
};
|
||||
Err(Error::Http {
|
||||
source: message.into(),
|
||||
request_id: request_id.into(),
|
||||
status_code: Some(status),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct RetryCounter<'a> {
|
||||
request_failures: u8,
|
||||
connect_failures: u8,
|
||||
read_failures: u8,
|
||||
config: &'a ResolvedRetryConfig,
|
||||
request_id: String,
|
||||
}
|
||||
|
||||
impl<'a> RetryCounter<'a> {
|
||||
fn new(config: &'a ResolvedRetryConfig, request_id: String) -> Self {
|
||||
Self {
|
||||
request_failures: 0,
|
||||
connect_failures: 0,
|
||||
read_failures: 0,
|
||||
config,
|
||||
request_id,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn check_response(&self, response: Response) -> Result<Response> {
|
||||
let status_int: u16 = u16::from(response.status());
|
||||
if (400..500).contains(&status_int) {
|
||||
Err(Error::InvalidInput {
|
||||
message: Self::rsp_to_str(response).await,
|
||||
})
|
||||
} else if status_int != 200 {
|
||||
Err(Error::Runtime {
|
||||
message: Self::rsp_to_str(response).await,
|
||||
fn check_out_of_retries(
|
||||
&self,
|
||||
source: Box<dyn std::error::Error + Send + Sync>,
|
||||
status_code: Option<reqwest::StatusCode>,
|
||||
) -> Result<()> {
|
||||
if self.request_failures >= self.config.retries
|
||||
|| self.connect_failures >= self.config.connect_retries
|
||||
|| self.read_failures >= self.config.read_retries
|
||||
{
|
||||
Err(Error::Retry {
|
||||
request_id: self.request_id.clone(),
|
||||
request_failures: self.request_failures,
|
||||
max_request_failures: self.config.retries,
|
||||
connect_failures: self.connect_failures,
|
||||
max_connect_failures: self.config.connect_retries,
|
||||
read_failures: self.read_failures,
|
||||
max_read_failures: self.config.read_retries,
|
||||
source,
|
||||
status_code,
|
||||
})
|
||||
} else {
|
||||
Ok(response)
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn increment_request_failures(&mut self, source: crate::Error) -> Result<()> {
|
||||
self.request_failures += 1;
|
||||
let status_code = if let crate::Error::Http { status_code, .. } = &source {
|
||||
*status_code
|
||||
} else {
|
||||
None
|
||||
};
|
||||
self.check_out_of_retries(Box::new(source), status_code)
|
||||
}
|
||||
|
||||
fn increment_connect_failures(&mut self, source: reqwest::Error) -> Result<()> {
|
||||
self.connect_failures += 1;
|
||||
let status_code = source.status();
|
||||
self.check_out_of_retries(Box::new(source), status_code)
|
||||
}
|
||||
|
||||
fn increment_read_failures(&mut self, source: reqwest::Error) -> Result<()> {
|
||||
self.read_failures += 1;
|
||||
let status_code = source.status();
|
||||
self.check_out_of_retries(Box::new(source), status_code)
|
||||
}
|
||||
|
||||
fn next_sleep_time(&self) -> Duration {
|
||||
let backoff = self.config.backoff_factor * (2.0f32.powi(self.request_failures as i32));
|
||||
let jitter = rand::random::<f32>() * self.config.backoff_jitter;
|
||||
let sleep_time = Duration::from_secs_f32(backoff + jitter);
|
||||
debug!(
|
||||
"Retrying request {:?} ({}/{} connect, {}/{} read, {}/{} read) in {:?}",
|
||||
self.request_id,
|
||||
self.connect_failures,
|
||||
self.config.connect_retries,
|
||||
self.request_failures,
|
||||
self.config.retries,
|
||||
self.read_failures,
|
||||
self.config.read_retries,
|
||||
sleep_time
|
||||
);
|
||||
sleep_time
|
||||
}
|
||||
}
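The sleep schedule is standard exponential backoff with uniform jitter: `backoff_factor * 2^request_failures + U(0, 1) * backoff_jitter` seconds, driven only by the request-failure count. A quick sketch of the resulting delays, assuming illustrative values (`backoff_factor=0.25`, `backoff_jitter=0.25`; the real defaults are not shown in this diff):

```python
import random


def next_sleep_time(request_failures: int,
                    backoff_factor: float = 0.25,
                    backoff_jitter: float = 0.25) -> float:
    # Mirrors RetryCounter::next_sleep_time above.
    return backoff_factor * (2.0 ** request_failures) + random.random() * backoff_jitter


# With the assumed values, successive retries wait roughly
# 0.5s, 1.0s, 2.0s, 4.0s plus up to 0.25s of jitter.
for failures in range(1, 5):
    print(round(next_sleep_time(failures), 2))
```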
|
||||
|
||||
pub trait RequestResultExt {
|
||||
type Output;
|
||||
fn err_to_http(self, request_id: String) -> Result<Self::Output>;
|
||||
}
|
||||
|
||||
impl<T> RequestResultExt for reqwest::Result<T> {
|
||||
type Output = T;
|
||||
fn err_to_http(self, request_id: String) -> Result<T> {
|
||||
self.map_err(|err| {
|
||||
let status_code = err.status();
|
||||
Error::Http {
|
||||
source: Box::new(err),
|
||||
request_id,
|
||||
status_code,
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -29,7 +29,7 @@ use crate::embeddings::EmbeddingRegistry;
|
||||
use crate::error::Result;
|
||||
use crate::Table;
|
||||
|
||||
use super::client::{ClientConfig, HttpSend, RestfulLanceDbClient, Sender};
|
||||
use super::client::{ClientConfig, HttpSend, RequestResultExt, RestfulLanceDbClient, Sender};
|
||||
use super::table::RemoteTable;
|
||||
use super::util::batches_to_ipc_bytes;
|
||||
use super::ARROW_STREAM_CONTENT_TYPE;
|
||||
@@ -105,9 +105,13 @@ impl<S: HttpSend> ConnectionInternal for RemoteDatabase<S> {
|
||||
if let Some(start_after) = options.start_after {
|
||||
req = req.query(&[("page_token", start_after)]);
|
||||
}
|
||||
let rsp = self.client.send(req, true).await?;
|
||||
let rsp = self.client.check_response(rsp).await?;
|
||||
let tables = rsp.json::<ListTablesResponse>().await?.tables;
|
||||
let (request_id, rsp) = self.client.send(req, true).await?;
|
||||
let rsp = self.client.check_response(&request_id, rsp).await?;
|
||||
let tables = rsp
|
||||
.json::<ListTablesResponse>()
|
||||
.await
|
||||
.err_to_http(request_id)?
|
||||
.tables;
|
||||
for table in &tables {
|
||||
self.table_cache.insert(table.clone(), ()).await;
|
||||
}
|
||||
@@ -130,13 +134,11 @@ impl<S: HttpSend> ConnectionInternal for RemoteDatabase<S> {
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/create/", options.name))
|
||||
.body(data_buffer)
|
||||
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE)
|
||||
// This is currently expected by LanceDb cloud but will be removed soon.
|
||||
.header("x-request-id", "na");
|
||||
let rsp = self.client.send(req, false).await?;
|
||||
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE);
|
||||
let (request_id, rsp) = self.client.send(req, false).await?;
|
||||
|
||||
if rsp.status() == StatusCode::BAD_REQUEST {
|
||||
let body = rsp.text().await?;
|
||||
let body = rsp.text().await.err_to_http(request_id.clone())?;
|
||||
if body.contains("already exists") {
|
||||
return Err(crate::Error::TableAlreadyExists { name: options.name });
|
||||
} else {
|
||||
@@ -144,7 +146,7 @@ impl<S: HttpSend> ConnectionInternal for RemoteDatabase<S> {
|
||||
}
|
||||
}
|
||||
|
||||
self.client.check_response(rsp).await?;
|
||||
self.client.check_response(&request_id, rsp).await?;
|
||||
|
||||
self.table_cache.insert(options.name.clone(), ()).await;
|
||||
|
||||
@@ -160,11 +162,11 @@ impl<S: HttpSend> ConnectionInternal for RemoteDatabase<S> {
|
||||
let req = self
|
||||
.client
|
||||
.get(&format!("/v1/table/{}/describe/", options.name));
|
||||
let resp = self.client.send(req, true).await?;
|
||||
let (request_id, resp) = self.client.send(req, true).await?;
|
||||
if resp.status() == StatusCode::NOT_FOUND {
|
||||
return Err(crate::Error::TableNotFound { name: options.name });
|
||||
}
|
||||
self.client.check_response(resp).await?;
|
||||
self.client.check_response(&request_id, resp).await?;
|
||||
}
|
||||
|
||||
Ok(Table::new(Arc::new(RemoteTable::new(
|
||||
@@ -178,8 +180,8 @@ impl<S: HttpSend> ConnectionInternal for RemoteDatabase<S> {
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/rename/", current_name));
|
||||
let req = req.json(&serde_json::json!({ "new_table_name": new_name }));
|
||||
let resp = self.client.send(req, false).await?;
|
||||
self.client.check_response(resp).await?;
|
||||
let (request_id, resp) = self.client.send(req, false).await?;
|
||||
self.client.check_response(&request_id, resp).await?;
|
||||
self.table_cache.remove(current_name).await;
|
||||
self.table_cache.insert(new_name.into(), ()).await;
|
||||
Ok(())
|
||||
@@ -187,8 +189,8 @@ impl<S: HttpSend> ConnectionInternal for RemoteDatabase<S> {
|
||||
|
||||
async fn drop_table(&self, name: &str) -> Result<()> {
|
||||
let req = self.client.post(&format!("/v1/table/{}/drop/", name));
|
||||
let resp = self.client.send(req, true).await?;
|
||||
self.client.check_response(resp).await?;
|
||||
let (request_id, resp) = self.client.send(req, true).await?;
|
||||
self.client.check_response(&request_id, resp).await?;
|
||||
self.table_cache.remove(name).await;
|
||||
Ok(())
|
||||
}
|
||||
@@ -206,16 +208,57 @@ impl<S: HttpSend> ConnectionInternal for RemoteDatabase<S> {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, OnceLock};
|
||||
|
||||
use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator};
|
||||
use arrow_schema::{DataType, Field, Schema};
|
||||
|
||||
use crate::{
|
||||
remote::{ARROW_STREAM_CONTENT_TYPE, JSON_CONTENT_TYPE},
|
||||
Connection,
|
||||
Connection, Error,
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_retries() {
|
||||
// We'll record the request_id here, to check it matches the one in the error.
|
||||
let seen_request_id = Arc::new(OnceLock::new());
|
||||
let seen_request_id_ref = seen_request_id.clone();
|
||||
let conn = Connection::new_with_handler(move |request| {
|
||||
// Request id should be the same on each retry.
|
||||
let request_id = request.headers()["x-request-id"]
|
||||
.to_str()
|
||||
.unwrap()
|
||||
.to_string();
|
||||
let seen_id = seen_request_id_ref.get_or_init(|| request_id.clone());
|
||||
assert_eq!(&request_id, seen_id);
|
||||
|
||||
http::Response::builder()
|
||||
.status(500)
|
||||
.body("internal server error")
|
||||
.unwrap()
|
||||
});
|
||||
let result = conn.table_names().execute().await;
|
||||
if let Err(Error::Retry {
|
||||
request_id,
|
||||
request_failures,
|
||||
max_request_failures,
|
||||
source,
|
||||
..
|
||||
}) = result
|
||||
{
|
||||
let expected_id = seen_request_id.get().unwrap();
|
||||
assert_eq!(&request_id, expected_id);
|
||||
assert_eq!(request_failures, max_request_failures);
|
||||
assert!(
|
||||
source.to_string().contains("internal server error"),
|
||||
"source: {:?}",
|
||||
source
|
||||
);
|
||||
} else {
|
||||
panic!("unexpected result: {:?}", result);
|
||||
};
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_table_names() {
|
||||
let conn = Connection::new_with_handler(|request| {
|
||||
|
||||
@@ -34,6 +34,7 @@ use crate::{
|
||||
},
|
||||
};
|
||||
|
||||
use super::client::RequestResultExt;
|
||||
use super::client::{HttpSend, RestfulLanceDbClient, Sender};
|
||||
use super::{ARROW_STREAM_CONTENT_TYPE, JSON_CONTENT_TYPE};
|
||||
|
||||
@@ -53,15 +54,25 @@ impl<S: HttpSend> RemoteTable<S> {
|
||||
let request = self
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/describe/", self.name));
|
||||
let response = self.client.send(request, true).await?;
|
||||
let (request_id, response) = self.client.send(request, true).await?;
|
||||
|
||||
let response = self.check_table_response(response).await?;
|
||||
let response = self.check_table_response(&request_id, response).await?;
|
||||
|
||||
let body = response.text().await?;
|
||||
|
||||
serde_json::from_str(&body).map_err(|e| Error::Http {
|
||||
message: format!("Failed to parse table description: {}", e),
|
||||
})
|
||||
match response.text().await {
|
||||
Ok(body) => serde_json::from_str(&body).map_err(|e| Error::Http {
|
||||
source: format!("Failed to parse table description: {}", e).into(),
|
||||
request_id,
|
||||
status_code: None,
|
||||
}),
|
||||
Err(err) => {
|
||||
let status_code = err.status();
|
||||
Err(Error::Http {
|
||||
source: Box::new(err),
|
||||
request_id,
|
||||
status_code,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn reader_as_body(data: Box<dyn RecordBatchReader + Send>) -> Result<reqwest::Body> {
|
||||
@@ -87,18 +98,23 @@ impl<S: HttpSend> RemoteTable<S> {
|
||||
Ok(reqwest::Body::wrap_stream(body_stream))
|
||||
}
|
||||
|
||||
async fn check_table_response(&self, response: reqwest::Response) -> Result<reqwest::Response> {
|
||||
async fn check_table_response(
|
||||
&self,
|
||||
request_id: &str,
|
||||
response: reqwest::Response,
|
||||
) -> Result<reqwest::Response> {
|
||||
if response.status() == StatusCode::NOT_FOUND {
|
||||
return Err(Error::TableNotFound {
|
||||
name: self.name.clone(),
|
||||
});
|
||||
}
|
||||
|
||||
self.client.check_response(response).await
|
||||
self.client.check_response(request_id, response).await
|
||||
}
|
||||
|
||||
async fn read_arrow_stream(
|
||||
&self,
|
||||
request_id: &str,
|
||||
body: reqwest::Response,
|
||||
) -> Result<SendableRecordBatchStream> {
|
||||
// Assert that the content type is correct
|
||||
@@ -106,24 +122,31 @@ impl<S: HttpSend> RemoteTable<S> {
|
||||
.headers()
|
||||
.get(CONTENT_TYPE)
|
||||
.ok_or_else(|| Error::Http {
|
||||
message: "Missing content type".into(),
|
||||
source: "Missing content type".into(),
|
||||
request_id: request_id.to_string(),
|
||||
status_code: None,
|
||||
})?
|
||||
.to_str()
|
||||
.map_err(|e| Error::Http {
|
||||
message: format!("Failed to parse content type: {}", e),
|
||||
source: format!("Failed to parse content type: {}", e).into(),
|
||||
request_id: request_id.to_string(),
|
||||
status_code: None,
|
||||
})?;
|
||||
if content_type != ARROW_STREAM_CONTENT_TYPE {
|
||||
return Err(Error::Http {
|
||||
message: format!(
|
||||
source: format!(
|
||||
"Expected content type {}, got {}",
|
||||
ARROW_STREAM_CONTENT_TYPE, content_type
|
||||
),
|
||||
)
|
||||
.into(),
|
||||
request_id: request_id.to_string(),
|
||||
status_code: None,
|
||||
});
|
||||
}
|
||||
|
||||
// There isn't a way to actually stream this data yet. I have an upstream issue:
|
||||
// https://github.com/apache/arrow-rs/issues/6420
|
||||
let body = body.bytes().await?;
|
||||
let body = body.bytes().await.err_to_http(request_id.into())?;
|
||||
let reader = StreamReader::try_new(body.reader(), None)?;
|
||||
let schema = reader.schema();
|
||||
let stream = futures::stream::iter(reader).map_err(DataFusionError::from);
|
||||
@@ -259,14 +282,16 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
|
||||
request = request.json(&serde_json::json!({}));
|
||||
}
|
||||
|
||||
let response = self.client.send(request, true).await?;
|
||||
let (request_id, response) = self.client.send(request, true).await?;
|
||||
|
||||
let response = self.check_table_response(response).await?;
|
||||
let response = self.check_table_response(&request_id, response).await?;
|
||||
|
||||
let body = response.text().await?;
|
||||
let body = response.text().await.err_to_http(request_id.clone())?;
|
||||
|
||||
serde_json::from_str(&body).map_err(|e| Error::Http {
|
||||
message: format!("Failed to parse row count: {}", e),
|
||||
source: format!("Failed to parse row count: {}", e).into(),
|
||||
request_id,
|
||||
status_code: None,
|
||||
})
|
||||
}
|
||||
async fn add(
|
||||
@@ -288,9 +313,9 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
|
||||
}
|
||||
}
|
||||
|
||||
let response = self.client.send(request, false).await?;
|
||||
let (request_id, response) = self.client.send(request, false).await?;
|
||||
|
||||
self.check_table_response(response).await?;
|
||||
self.check_table_response(&request_id, response).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -339,9 +364,9 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
|
||||
|
||||
let request = request.json(&body);
|
||||
|
||||
let response = self.client.send(request, true).await?;
|
||||
let (request_id, response) = self.client.send(request, true).await?;
|
||||
|
||||
let stream = self.read_arrow_stream(response).await?;
|
||||
let stream = self.read_arrow_stream(&request_id, response).await?;
|
||||
|
||||
Ok(Arc::new(OneShotExec::new(stream)))
|
||||
}
|
||||
@@ -361,9 +386,9 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
|
||||
|
||||
let request = request.json(&body);
|
||||
|
||||
let response = self.client.send(request, true).await?;
|
||||
let (request_id, response) = self.client.send(request, true).await?;
|
||||
|
||||
let stream = self.read_arrow_stream(response).await?;
|
||||
let stream = self.read_arrow_stream(&request_id, response).await?;
|
||||
|
||||
Ok(DatasetRecordBatchStream::new(stream))
|
||||
}
|
||||
@@ -383,17 +408,20 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
|
||||
"only_if": update.filter,
|
||||
}));
|
||||
|
||||
let response = self.client.send(request, false).await?;
|
||||
let (request_id, response) = self.client.send(request, false).await?;
|
||||
|
||||
let response = self.check_table_response(response).await?;
|
||||
let response = self.check_table_response(&request_id, response).await?;
|
||||
|
||||
let body = response.text().await?;
|
||||
let body = response.text().await.err_to_http(request_id.clone())?;
|
||||
|
||||
serde_json::from_str(&body).map_err(|e| Error::Http {
|
||||
message: format!(
|
||||
source: format!(
|
||||
"Failed to parse updated rows result from response {}: {}",
|
||||
body, e
|
||||
),
|
||||
)
|
||||
.into(),
|
||||
request_id,
|
||||
status_code: None,
|
||||
})
|
||||
}
|
||||
async fn delete(&self, predicate: &str) -> Result<()> {
|
||||
@@ -402,8 +430,8 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/delete/", self.name))
|
||||
.json(&body);
|
||||
let response = self.client.send(request, false).await?;
|
||||
self.check_table_response(response).await?;
|
||||
let (request_id, response) = self.client.send(request, false).await?;
|
||||
self.check_table_response(&request_id, response).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -474,9 +502,9 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
|
||||
|
||||
let request = request.json(&body);
|
||||
|
||||
let response = self.client.send(request, false).await?;
|
||||
let (request_id, response) = self.client.send(request, false).await?;
|
||||
|
||||
self.check_table_response(response).await?;
|
||||
self.check_table_response(&request_id, response).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -495,9 +523,9 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
|
||||
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE)
|
||||
.body(body);
|
||||
|
||||
let response = self.client.send(request, false).await?;
|
||||
let (request_id, response) = self.client.send(request, false).await?;
|
||||
|
||||
self.check_table_response(response).await?;
|
||||
self.check_table_response(&request_id, response).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -525,28 +553,79 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
|
||||
message: "drop_columns is not yet supported.".into(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
|
||||
Err(Error::NotSupported {
|
||||
message: "list_indices is not yet supported.".into(),
|
||||
})
|
||||
// Make request to list the indices
|
||||
let request = self
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/index/list/", self.name));
|
||||
let (request_id, response) = self.client.send(request, true).await?;
|
||||
let response = self.check_table_response(&request_id, response).await?;
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct ListIndicesResponse {
|
||||
indexes: Vec<IndexConfigResponse>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct IndexConfigResponse {
|
||||
index_name: String,
|
||||
columns: Vec<String>,
|
||||
}
|
||||
|
||||
let body = response.text().await.err_to_http(request_id.clone())?;
|
||||
let body: ListIndicesResponse = serde_json::from_str(&body).map_err(|err| Error::Http {
|
||||
source: format!(
|
||||
"Failed to parse list_indices response: {}, body: {}",
|
||||
err, body
|
||||
)
|
||||
.into(),
|
||||
request_id,
|
||||
status_code: None,
|
||||
})?;
|
||||
|
||||
// Make request to get stats for each index, so we get the index type.
|
||||
// This is a bit inefficient, but it's the only way to get the index type.
|
||||
let mut futures = Vec::with_capacity(body.indexes.len());
|
||||
for index in body.indexes {
|
||||
let future = async move {
|
||||
match self.index_stats(&index.index_name).await {
|
||||
Ok(Some(stats)) => Ok(Some(IndexConfig {
|
||||
name: index.index_name,
|
||||
index_type: stats.index_type,
|
||||
columns: index.columns,
|
||||
})),
|
||||
Ok(None) => Ok(None), // The index must have been deleted since we listed it.
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
};
|
||||
futures.push(future);
|
||||
}
|
||||
let results = futures::future::try_join_all(futures).await?;
|
||||
let index_configs = results.into_iter().flatten().collect();
|
||||
|
||||
Ok(index_configs)
|
||||
}
|
||||
|
||||
async fn index_stats(&self, index_name: &str) -> Result<Option<IndexStatistics>> {
|
||||
let request = self.client.post(&format!(
|
||||
"/v1/table/{}/index/{}/stats/",
|
||||
self.name, index_name
|
||||
));
|
||||
let response = self.client.send(request, true).await?;
|
||||
let (request_id, response) = self.client.send(request, true).await?;
|
||||
|
||||
if response.status() == StatusCode::NOT_FOUND {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let response = self.check_table_response(response).await?;
|
||||
let response = self.check_table_response(&request_id, response).await?;
|
||||
|
||||
let body = response.text().await?;
|
||||
let body = response.text().await.err_to_http(request_id.clone())?;
|
||||
|
||||
let stats = serde_json::from_str(&body).map_err(|e| Error::Http {
|
||||
message: format!("Failed to parse index statistics: {}", e),
|
||||
source: format!("Failed to parse index statistics: {}", e).into(),
|
||||
request_id,
|
||||
status_code: None,
|
||||
})?;
|
||||
|
||||
Ok(Some(stats))
|
||||
@@ -1181,6 +1260,69 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_list_indices() {
|
||||
let table = Table::new_with_handler("my_table", |request| {
|
||||
assert_eq!(request.method(), "POST");
|
||||
|
||||
let response_body = match request.url().path() {
|
||||
"/v1/table/my_table/index/list/" => {
|
||||
serde_json::json!({
|
||||
"indexes": [
|
||||
{
|
||||
"index_name": "vector_idx",
|
||||
"index_uuid": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
|
||||
"columns": ["vector"],
|
||||
"index_status": "done",
|
||||
},
|
||||
{
|
||||
"index_name": "my_idx",
|
||||
"index_uuid": "34255f64-5717-4562-b3fc-2c963f66afa6",
|
||||
"columns": ["my_column"],
|
||||
"index_status": "done",
|
||||
},
|
||||
]
|
||||
})
|
||||
}
|
||||
"/v1/table/my_table/index/vector_idx/stats/" => {
|
||||
serde_json::json!({
|
||||
"num_indexed_rows": 100000,
|
||||
"num_unindexed_rows": 0,
|
||||
"index_type": "IVF_PQ",
|
||||
"distance_type": "l2"
|
||||
})
|
||||
}
|
||||
"/v1/table/my_table/index/my_idx/stats/" => {
|
||||
serde_json::json!({
|
||||
"num_indexed_rows": 100000,
|
||||
"num_unindexed_rows": 0,
|
||||
"index_type": "LABEL_LIST"
|
||||
})
|
||||
}
|
||||
path => panic!("Unexpected path: {}", path),
|
||||
};
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body(serde_json::to_string(&response_body).unwrap())
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
let indices = table.list_indices().await.unwrap();
|
||||
let expected = vec![
|
||||
IndexConfig {
|
||||
name: "vector_idx".into(),
|
||||
index_type: IndexType::IvfPq,
|
||||
columns: vec!["vector".into()],
|
||||
},
|
||||
IndexConfig {
|
||||
name: "my_idx".into(),
|
||||
index_type: IndexType::LabelList,
|
||||
columns: vec!["my_column".into()],
|
||||
},
|
||||
];
|
||||
assert_eq!(indices, expected);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_index_stats() {
|
||||
let table = Table::new_with_handler("my_table", |request| {
|
||||
|
||||