mirror of
https://github.com/lancedb/lancedb.git
synced 2025-12-22 21:09:58 +00:00
BREAKING CHANGE: Check if remote table exists when opening (with caching) (#1214)
- make open table behaviour consistent: - remote tables will check if the table exists by calling /describe and throwing an error if the call doesn't succeed - this is similar to the behaviour for local tables where we will raise an exception when opening the table if the local dataset doesn't exist - The table names are cached in the client with a TTL - Also fixes a small bug where if the remote error response was deserialized from JSON as an object, we'd print it resulting in the unhelpful error message: `Error: Server Error, status: 404, message: Not Found: [object Object]`
This commit is contained in:
@@ -111,7 +111,11 @@ async function decodeErrorData(
|
||||
if (responseType === 'arraybuffer') {
|
||||
return new TextDecoder().decode(errorData)
|
||||
} else {
|
||||
return errorData
|
||||
if (typeof errorData === 'object') {
|
||||
return JSON.stringify(errorData)
|
||||
}
|
||||
|
||||
return errorData
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -38,7 +38,7 @@ import {
|
||||
fromRecordsToStreamBuffer,
|
||||
fromTableToStreamBuffer
|
||||
} from '../arrow'
|
||||
import { toSQL } from '../util'
|
||||
import { toSQL, TTLCache } from '../util'
|
||||
import { type HttpMiddleware } from '../middleware'
|
||||
|
||||
/**
|
||||
@@ -47,6 +47,7 @@ import { type HttpMiddleware } from '../middleware'
|
||||
export class RemoteConnection implements Connection {
|
||||
private _client: HttpLancedbClient
|
||||
private readonly _dbName: string
|
||||
private readonly _tableCache = new TTLCache(300_000)
|
||||
|
||||
constructor (opts: ConnectionOptions) {
|
||||
if (!opts.uri.startsWith('db://')) {
|
||||
@@ -89,6 +90,9 @@ export class RemoteConnection implements Connection {
|
||||
page_token: pageToken
|
||||
})
|
||||
const body = await response.body()
|
||||
for (const table of body.tables) {
|
||||
this._tableCache.set(table, true)
|
||||
}
|
||||
return body.tables
|
||||
}
|
||||
|
||||
@@ -101,6 +105,12 @@ export class RemoteConnection implements Connection {
|
||||
name: string,
|
||||
embeddings?: EmbeddingFunction<T>
|
||||
): Promise<Table<T>> {
|
||||
// check if the table exists
|
||||
if (this._tableCache.get(name) === undefined) {
|
||||
await this._client.post(`/v1/table/${encodeURIComponent(name)}/describe/`)
|
||||
this._tableCache.set(name, true)
|
||||
}
|
||||
|
||||
if (embeddings !== undefined) {
|
||||
return new RemoteTable(this._client, name, embeddings)
|
||||
} else {
|
||||
@@ -169,6 +179,7 @@ export class RemoteConnection implements Connection {
|
||||
)
|
||||
}
|
||||
|
||||
this._tableCache.set(tableName, true)
|
||||
if (embeddings === undefined) {
|
||||
return new RemoteTable(this._client, tableName)
|
||||
} else {
|
||||
@@ -178,6 +189,7 @@ export class RemoteConnection implements Connection {
|
||||
|
||||
async dropTable (name: string): Promise<void> {
|
||||
await this._client.post(`/v1/table/${encodeURIComponent(name)}/drop/`)
|
||||
this._tableCache.delete(name)
|
||||
}
|
||||
|
||||
withMiddleware (middleware: HttpMiddleware): Connection {
|
||||
|
||||
@@ -42,6 +42,7 @@ import {
|
||||
Float16,
|
||||
Int64
|
||||
} from 'apache-arrow'
|
||||
import type { RemoteRequest, RemoteResponse } from '../middleware'
|
||||
|
||||
const expect = chai.expect
|
||||
const assert = chai.assert
|
||||
@@ -913,7 +914,22 @@ describe('Remote LanceDB client', function () {
|
||||
}
|
||||
|
||||
// Search
|
||||
const table = await con.openTable('vectors')
|
||||
const table = await con.withMiddleware(new (class {
|
||||
async onRemoteRequest(req: RemoteRequest, next: (req: RemoteRequest) => Promise<RemoteResponse>) {
|
||||
// intercept call to check if the table exists and make the call succeed
|
||||
if (req.uri.endsWith('/describe/')) {
|
||||
return {
|
||||
status: 200,
|
||||
statusText: 'OK',
|
||||
headers: new Map(),
|
||||
body: async () => ({})
|
||||
}
|
||||
}
|
||||
|
||||
return await next(req)
|
||||
}
|
||||
})()).openTable('vectors')
|
||||
|
||||
try {
|
||||
await table.search([0.1, 0.3]).execute()
|
||||
} catch (err) {
|
||||
|
||||
@@ -42,3 +42,36 @@ export function toSQL (value: Literal): string {
|
||||
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
|
||||
throw new Error(`Unsupported value type: ${typeof value} value: (${value})`)
|
||||
}
|
||||
|
||||
export class TTLCache {
|
||||
private readonly cache: Map<string, { value: any, expires: number }>
|
||||
|
||||
/**
|
||||
* @param ttl Time to live in milliseconds
|
||||
*/
|
||||
constructor (private readonly ttl: number) {
|
||||
this.cache = new Map()
|
||||
}
|
||||
|
||||
get (key: string): any | undefined {
|
||||
const entry = this.cache.get(key)
|
||||
if (entry === undefined) {
|
||||
return undefined
|
||||
}
|
||||
|
||||
if (entry.expires < Date.now()) {
|
||||
this.cache.delete(key)
|
||||
return undefined
|
||||
}
|
||||
|
||||
return entry.value
|
||||
}
|
||||
|
||||
set (key: string, value: any): void {
|
||||
this.cache.set(key, { value, expires: Date.now() + this.ttl })
|
||||
}
|
||||
|
||||
delete (key: string): void {
|
||||
this.cache.delete(key)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ from concurrent.futures import ThreadPoolExecutor
|
||||
from typing import Iterable, List, Optional, Union
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from cachetools import TTLCache
|
||||
import pyarrow as pa
|
||||
from overrides import override
|
||||
|
||||
@@ -29,7 +30,6 @@ from ..table import Table, _sanitize_data
|
||||
from ..util import validate_table_name
|
||||
from .arrow import to_ipc_binary
|
||||
from .client import ARROW_STREAM_CONTENT_TYPE, RestfulLanceDBClient
|
||||
from .errors import LanceDBClientError
|
||||
|
||||
|
||||
class RemoteDBConnection(DBConnection):
|
||||
@@ -60,6 +60,7 @@ class RemoteDBConnection(DBConnection):
|
||||
read_timeout=read_timeout,
|
||||
)
|
||||
self._request_thread_pool = request_thread_pool
|
||||
self._table_cache = TTLCache(maxsize=10000, ttl=300)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"RemoteConnect(name={self.db_name})"
|
||||
@@ -89,6 +90,7 @@ class RemoteDBConnection(DBConnection):
|
||||
else:
|
||||
break
|
||||
for item in result:
|
||||
self._table_cache[item] = True
|
||||
yield item
|
||||
|
||||
@override
|
||||
@@ -109,16 +111,10 @@ class RemoteDBConnection(DBConnection):
|
||||
self._client.mount_retry_adapter_for_table(name)
|
||||
|
||||
# check if table exists
|
||||
try:
|
||||
if self._table_cache.get(name) is None:
|
||||
self._client.post(f"/v1/table/{name}/describe/")
|
||||
except LanceDBClientError as err:
|
||||
if str(err).startswith("Not found"):
|
||||
logging.error(
|
||||
"Table %s does not exist. Please first call "
|
||||
"db.create_table(%s, data).",
|
||||
name,
|
||||
name,
|
||||
)
|
||||
self._table_cache[name] = True
|
||||
|
||||
return RemoteTable(self, name)
|
||||
|
||||
@override
|
||||
@@ -267,6 +263,7 @@ class RemoteDBConnection(DBConnection):
|
||||
content_type=ARROW_STREAM_CONTENT_TYPE,
|
||||
)
|
||||
|
||||
self._table_cache[name] = True
|
||||
return RemoteTable(self, name)
|
||||
|
||||
@override
|
||||
@@ -282,6 +279,7 @@ class RemoteDBConnection(DBConnection):
|
||||
self._client.post(
|
||||
f"/v1/table/{name}/drop/",
|
||||
)
|
||||
self._table_cache.pop(name)
|
||||
|
||||
async def close(self):
|
||||
"""Close the connection to the database."""
|
||||
|
||||
Reference in New Issue
Block a user