From 25dea4e859b970aec576f0743ad33d8d2f7d6a0c Mon Sep 17 00:00:00 2001 From: Bert Date: Wed, 10 Apr 2024 11:54:47 -0400 Subject: [PATCH] BREAKING CHANGE: Check if remote table exists when opening (with caching) (#1214) - make open table behaviour consistent: - remote tables will check if the table exists by calling /describe and throwing an error if the call doesn't succeed - this is similar to the behaviour for local tables where we will raise an exception when opening the table if the local dataset doesn't exist - The table names are cached in the client with a TTL - Also fixes a small bug where if the remote error response was deserialized from JSON as an object, we'd print it resulting in the unhelpful error message: `Error: Server Error, status: 404, message: Not Found: [object Object]` --- node/src/remote/client.ts | 6 +++++- node/src/remote/index.ts | 14 ++++++++++++- node/src/test/test.ts | 18 +++++++++++++++- node/src/util.ts | 33 ++++++++++++++++++++++++++++++ python/python/lancedb/remote/db.py | 18 ++++++++-------- 5 files changed, 76 insertions(+), 13 deletions(-) diff --git a/node/src/remote/client.ts b/node/src/remote/client.ts index 04f8e43a..9992abbb 100644 --- a/node/src/remote/client.ts +++ b/node/src/remote/client.ts @@ -111,7 +111,11 @@ async function decodeErrorData( if (responseType === 'arraybuffer') { return new TextDecoder().decode(errorData) } else { - return errorData + if (typeof errorData === 'object') { + return JSON.stringify(errorData) + } + + return errorData } } diff --git a/node/src/remote/index.ts b/node/src/remote/index.ts index 17b9443d..29f8a2fb 100644 --- a/node/src/remote/index.ts +++ b/node/src/remote/index.ts @@ -38,7 +38,7 @@ import { fromRecordsToStreamBuffer, fromTableToStreamBuffer } from '../arrow' -import { toSQL } from '../util' +import { toSQL, TTLCache } from '../util' import { type HttpMiddleware } from '../middleware' /** @@ -47,6 +47,7 @@ import { type HttpMiddleware } from '../middleware' export class 
RemoteConnection implements Connection { private _client: HttpLancedbClient private readonly _dbName: string + private readonly _tableCache = new TTLCache(300_000) constructor (opts: ConnectionOptions) { if (!opts.uri.startsWith('db://')) { @@ -89,6 +90,9 @@ export class RemoteConnection implements Connection { page_token: pageToken }) const body = await response.body() + for (const table of body.tables) { + this._tableCache.set(table, true) + } return body.tables } @@ -101,6 +105,12 @@ export class RemoteConnection implements Connection { name: string, embeddings?: EmbeddingFunction ): Promise> { + // check if the table exists + if (this._tableCache.get(name) === undefined) { + await this._client.post(`/v1/table/${encodeURIComponent(name)}/describe/`) + this._tableCache.set(name, true) + } + if (embeddings !== undefined) { return new RemoteTable(this._client, name, embeddings) } else { @@ -169,6 +179,7 @@ export class RemoteConnection implements Connection { ) } + this._tableCache.set(tableName, true) if (embeddings === undefined) { return new RemoteTable(this._client, tableName) } else { @@ -178,6 +189,7 @@ export class RemoteConnection implements Connection { async dropTable (name: string): Promise { await this._client.post(`/v1/table/${encodeURIComponent(name)}/drop/`) + this._tableCache.delete(name) } withMiddleware (middleware: HttpMiddleware): Connection { diff --git a/node/src/test/test.ts b/node/src/test/test.ts index af87ee41..d3c5d2aa 100644 --- a/node/src/test/test.ts +++ b/node/src/test/test.ts @@ -42,6 +42,7 @@ import { Float16, Int64 } from 'apache-arrow' +import type { RemoteRequest, RemoteResponse } from '../middleware' const expect = chai.expect const assert = chai.assert @@ -913,7 +914,22 @@ describe('Remote LanceDB client', function () { } // Search - const table = await con.openTable('vectors') + const table = await con.withMiddleware(new (class { + async onRemoteRequest(req: RemoteRequest, next: (req: RemoteRequest) => Promise) { + // 
intercept call to check if the table exists and make the call succeed + if (req.uri.endsWith('/describe/')) { + return { + status: 200, + statusText: 'OK', + headers: new Map(), + body: async () => ({}) + } + } + + return await next(req) + } + })()).openTable('vectors') + try { await table.search([0.1, 0.3]).execute() } catch (err) { diff --git a/node/src/util.ts b/node/src/util.ts index 242a4caf..a84fc29d 100644 --- a/node/src/util.ts +++ b/node/src/util.ts @@ -42,3 +42,36 @@ export function toSQL (value: Literal): string { // eslint-disable-next-line @typescript-eslint/restrict-template-expressions throw new Error(`Unsupported value type: ${typeof value} value: (${value})`) } + +export class TTLCache { + private readonly cache: Map + + /** + * @param ttl Time to live in milliseconds + */ + constructor (private readonly ttl: number) { + this.cache = new Map() + } + + get (key: string): any | undefined { + const entry = this.cache.get(key) + if (entry === undefined) { + return undefined + } + + if (entry.expires < Date.now()) { + this.cache.delete(key) + return undefined + } + + return entry.value + } + + set (key: string, value: any): void { + this.cache.set(key, { value, expires: Date.now() + this.ttl }) + } + + delete (key: string): void { + this.cache.delete(key) + } +} diff --git a/python/python/lancedb/remote/db.py b/python/python/lancedb/remote/db.py index 9dff65c5..c252fc5c 100644 --- a/python/python/lancedb/remote/db.py +++ b/python/python/lancedb/remote/db.py @@ -18,6 +18,7 @@ from concurrent.futures import ThreadPoolExecutor from typing import Iterable, List, Optional, Union from urllib.parse import urlparse +from cachetools import TTLCache import pyarrow as pa from overrides import override @@ -29,7 +30,6 @@ from ..table import Table, _sanitize_data from ..util import validate_table_name from .arrow import to_ipc_binary from .client import ARROW_STREAM_CONTENT_TYPE, RestfulLanceDBClient -from .errors import LanceDBClientError class 
RemoteDBConnection(DBConnection): @@ -60,6 +60,7 @@ class RemoteDBConnection(DBConnection): read_timeout=read_timeout, ) self._request_thread_pool = request_thread_pool + self._table_cache = TTLCache(maxsize=10000, ttl=300) def __repr__(self) -> str: return f"RemoteConnect(name={self.db_name})" @@ -89,6 +90,7 @@ class RemoteDBConnection(DBConnection): else: break for item in result: + self._table_cache[item] = True yield item @override @@ -109,16 +111,10 @@ class RemoteDBConnection(DBConnection): self._client.mount_retry_adapter_for_table(name) # check if table exists - try: + if self._table_cache.get(name) is None: self._client.post(f"/v1/table/{name}/describe/") - except LanceDBClientError as err: - if str(err).startswith("Not found"): - logging.error( - "Table %s does not exist. Please first call " - "db.create_table(%s, data).", - name, - name, - ) + self._table_cache[name] = True + return RemoteTable(self, name) @override @@ -267,6 +263,7 @@ class RemoteDBConnection(DBConnection): content_type=ARROW_STREAM_CONTENT_TYPE, ) + self._table_cache[name] = True return RemoteTable(self, name) @override @@ -282,6 +279,7 @@ class RemoteDBConnection(DBConnection): self._client.post( f"/v1/table/{name}/drop/", ) + self._table_cache.pop(name, None) async def close(self): """Close the connection to the database."""