diff --git a/nodejs/lancedb/connection.ts b/nodejs/lancedb/connection.ts index 373fb670..8b3aefdd 100644 --- a/nodejs/lancedb/connection.ts +++ b/nodejs/lancedb/connection.ts @@ -159,17 +159,33 @@ export abstract class Connection { * * Tables will be returned in lexicographical order. * @param {Partial} options - options to control the - * paging / start point + * paging / start point (backwards compatibility) * */ abstract tableNames(options?: Partial): Promise; + /** + * List all the table names in this database. + * + * Tables will be returned in lexicographical order. + * @param {string[]} namespace - The namespace to list tables from (defaults to root namespace) + * @param {Partial} options - options to control the + * paging / start point + * + */ + abstract tableNames( + namespace?: string[], + options?: Partial, + ): Promise; /** * Open a table in the database. * @param {string} name - The name of the table + * @param {string[]} namespace - The namespace of the table (defaults to root namespace) + * @param {Partial} options - Additional options */ abstract openTable( name: string, + namespace?: string[], options?: Partial, ): Promise; @@ -178,6 +194,7 @@ export abstract class Connection { * @param {object} options - The options object. * @param {string} options.name - The name of the table. * @param {Data} options.data - Non-empty Array of Records to be inserted into the table + * @param {string[]} namespace - The namespace to create the table in (defaults to root namespace) * */ abstract createTable( @@ -185,40 +202,72 @@ export abstract class Connection { name: string; data: Data; } & Partial, + namespace?: string[], ): Promise
; /** * Creates a new Table and initialize it with new data. * @param {string} name - The name of the table. * @param {Record[] | TableLike} data - Non-empty Array of Records * to be inserted into the table + * @param {Partial} options - Additional options (backwards compatibility) */ abstract createTable( name: string, data: Record[] | TableLike, options?: Partial, ): Promise
; + /** + * Creates a new Table and initialize it with new data. + * @param {string} name - The name of the table. + * @param {Record[] | TableLike} data - Non-empty Array of Records + * to be inserted into the table + * @param {string[]} namespace - The namespace to create the table in (defaults to root namespace) + * @param {Partial} options - Additional options + */ + abstract createTable( + name: string, + data: Record[] | TableLike, + namespace?: string[], + options?: Partial, + ): Promise
; /** * Creates a new empty Table * @param {string} name - The name of the table. * @param {Schema} schema - The schema of the table + * @param {Partial} options - Additional options (backwards compatibility) */ abstract createEmptyTable( name: string, schema: import("./arrow").SchemaLike, options?: Partial, ): Promise
; + /** + * Creates a new empty Table + * @param {string} name - The name of the table. + * @param {Schema} schema - The schema of the table + * @param {string[]} namespace - The namespace to create the table in (defaults to root namespace) + * @param {Partial} options - Additional options + */ + abstract createEmptyTable( + name: string, + schema: import("./arrow").SchemaLike, + namespace?: string[], + options?: Partial, + ): Promise
; /** * Drop an existing table. * @param {string} name The name of the table to drop. + * @param {string[]} namespace The namespace of the table (defaults to root namespace). */ - abstract dropTable(name: string): Promise; + abstract dropTable(name: string, namespace?: string[]): Promise; /** * Drop all tables in the database. + * @param {string[]} namespace The namespace to drop tables from (defaults to root namespace). */ - abstract dropAllTables(): Promise; + abstract dropAllTables(namespace?: string[]): Promise; } /** @hideconstructor */ @@ -243,16 +292,39 @@ export class LocalConnection extends Connection { return this.inner.display(); } - async tableNames(options?: Partial): Promise { - return this.inner.tableNames(options?.startAfter, options?.limit); + async tableNames( + namespaceOrOptions?: string[] | Partial, + options?: Partial, + ): Promise { + // Detect if first argument is namespace array or options object + let namespace: string[] | undefined; + let tableNamesOptions: Partial | undefined; + + if (Array.isArray(namespaceOrOptions)) { + // First argument is namespace array + namespace = namespaceOrOptions; + tableNamesOptions = options; + } else { + // First argument is options object (backwards compatibility) + namespace = undefined; + tableNamesOptions = namespaceOrOptions; + } + + return this.inner.tableNames( + namespace ?? [], + tableNamesOptions?.startAfter, + tableNamesOptions?.limit, + ); } async openTable( name: string, + namespace?: string[], options?: Partial, ): Promise
{ const innerTable = await this.inner.openTable( name, + namespace ?? [], cleanseStorageOptions(options?.storageOptions), options?.indexCacheSize, ); @@ -286,14 +358,44 @@ export class LocalConnection extends Connection { nameOrOptions: | string | ({ name: string; data: Data } & Partial), - data?: Record[] | TableLike, + dataOrNamespace?: Record[] | TableLike | string[], + namespaceOrOptions?: string[] | Partial, options?: Partial, ): Promise
{ if (typeof nameOrOptions !== "string" && "name" in nameOrOptions) { - const { name, data, ...options } = nameOrOptions; - - return this.createTable(name, data, options); + // First overload: createTable(options, namespace?) + const { name, data, ...createOptions } = nameOrOptions; + const namespace = dataOrNamespace as string[] | undefined; + return this._createTableImpl(name, data, namespace, createOptions); } + + // Second overload: createTable(name, data, namespace?, options?) + const name = nameOrOptions; + const data = dataOrNamespace as Record[] | TableLike; + + // Detect if third argument is namespace array or options object + let namespace: string[] | undefined; + let createOptions: Partial | undefined; + + if (Array.isArray(namespaceOrOptions)) { + // Third argument is namespace array + namespace = namespaceOrOptions; + createOptions = options; + } else { + // Third argument is options object (backwards compatibility) + namespace = undefined; + createOptions = namespaceOrOptions; + } + + return this._createTableImpl(name, data, namespace, createOptions); + } + + private async _createTableImpl( + name: string, + data: Data, + namespace?: string[], + options?: Partial, + ): Promise
{ if (data === undefined) { throw new Error("data is required"); } @@ -302,9 +404,10 @@ export class LocalConnection extends Connection { const storageOptions = this.getStorageOptions(options); const innerTable = await this.inner.createTable( - nameOrOptions, + name, buf, mode, + namespace ?? [], storageOptions, ); @@ -314,39 +417,55 @@ export class LocalConnection extends Connection { async createEmptyTable( name: string, schema: import("./arrow").SchemaLike, + namespaceOrOptions?: string[] | Partial, options?: Partial, ): Promise
{ - let mode: string = options?.mode ?? "create"; - const existOk = options?.existOk ?? false; + // Detect if third argument is namespace array or options object + let namespace: string[] | undefined; + let createOptions: Partial | undefined; + + if (Array.isArray(namespaceOrOptions)) { + // Third argument is namespace array + namespace = namespaceOrOptions; + createOptions = options; + } else { + // Third argument is options object (backwards compatibility) + namespace = undefined; + createOptions = namespaceOrOptions; + } + + let mode: string = createOptions?.mode ?? "create"; + const existOk = createOptions?.existOk ?? false; if (mode === "create" && existOk) { mode = "exist_ok"; } let metadata: Map | undefined = undefined; - if (options?.embeddingFunction !== undefined) { - const embeddingFunction = options.embeddingFunction; + if (createOptions?.embeddingFunction !== undefined) { + const embeddingFunction = createOptions.embeddingFunction; const registry = getRegistry(); metadata = registry.getTableMetadata([embeddingFunction]); } - const storageOptions = this.getStorageOptions(options); + const storageOptions = this.getStorageOptions(createOptions); const table = makeEmptyTable(schema, metadata); const buf = await fromTableToBuffer(table); const innerTable = await this.inner.createEmptyTable( name, buf, mode, + namespace ?? [], storageOptions, ); return new LocalTable(innerTable); } - async dropTable(name: string): Promise { - return this.inner.dropTable(name); + async dropTable(name: string, namespace?: string[]): Promise { + return this.inner.dropTable(name, namespace ?? []); } - async dropAllTables(): Promise { - return this.inner.dropAllTables(); + async dropAllTables(namespace?: string[]): Promise { + return this.inner.dropAllTables(namespace ?? []); } } diff --git a/nodejs/src/connection.rs b/nodejs/src/connection.rs index d6313e4a..a907c18f 100644 --- a/nodejs/src/connection.rs +++ b/nodejs/src/connection.rs @@ -100,10 +100,12 @@ impl Connection { #[napi(catch_unwind)] pub async fn table_names( &self, + namespace: Vec, start_after: Option, limit: Option, ) -> napi::Result> { let mut op = self.get_inner()?.table_names(); + op = op.namespace(namespace); if let Some(start_after) = start_after { op = op.start_after(start_after); } @@ -125,6 +127,7 @@ impl Connection { name: String, buf: Buffer, mode: String, + namespace: Vec, storage_options: Option>, ) -> napi::Result
{ let batches = ipc_file_to_batches(buf.to_vec()) @@ -132,6 +135,8 @@ impl Connection { let mode = Self::parse_create_mode_str(&mode)?; let mut builder = self.get_inner()?.create_table(&name, batches).mode(mode); + builder = builder.namespace(namespace); + if let Some(storage_options) = storage_options { for (key, value) in storage_options { builder = builder.storage_option(key, value); @@ -147,6 +152,7 @@ impl Connection { name: String, schema_buf: Buffer, mode: String, + namespace: Vec, storage_options: Option>, ) -> napi::Result
{ let schema = ipc_file_to_schema(schema_buf.to_vec()).map_err(|e| { @@ -157,6 +163,9 @@ impl Connection { .get_inner()? .create_empty_table(&name, schema) .mode(mode); + + builder = builder.namespace(namespace); + if let Some(storage_options) = storage_options { for (key, value) in storage_options { builder = builder.storage_option(key, value); @@ -170,10 +179,14 @@ impl Connection { pub async fn open_table( &self, name: String, + namespace: Vec, storage_options: Option>, index_cache_size: Option, ) -> napi::Result
{ let mut builder = self.get_inner()?.open_table(&name); + + builder = builder.namespace(namespace); + if let Some(storage_options) = storage_options { for (key, value) in storage_options { builder = builder.storage_option(key, value); @@ -188,12 +201,18 @@ impl Connection { /// Drop table with the name. Or raise an error if the table does not exist. #[napi(catch_unwind)] - pub async fn drop_table(&self, name: String) -> napi::Result<()> { - self.get_inner()?.drop_table(&name).await.default_error() + pub async fn drop_table(&self, name: String, namespace: Vec) -> napi::Result<()> { + self.get_inner()? + .drop_table(&name, &namespace) + .await + .default_error() } #[napi(catch_unwind)] - pub async fn drop_all_tables(&self) -> napi::Result<()> { - self.get_inner()?.drop_all_tables().await.default_error() + pub async fn drop_all_tables(&self, namespace: Vec) -> napi::Result<()> { + self.get_inner()? + .drop_all_tables(&namespace) + .await + .default_error() } } diff --git a/nodejs/src/remote.rs b/nodejs/src/remote.rs index 0b4845e5..818734e7 100644 --- a/nodejs/src/remote.rs +++ b/nodejs/src/remote.rs @@ -76,6 +76,7 @@ pub struct ClientConfig { pub retry_config: Option, pub timeout_config: Option, pub extra_headers: Option>, + pub id_delimiter: Option, } impl From for lancedb::remote::TimeoutConfig { @@ -115,6 +116,7 @@ impl From for lancedb::remote::ClientConfig { retry_config: config.retry_config.map(Into::into).unwrap_or_default(), timeout_config: config.timeout_config.map(Into::into).unwrap_or_default(), extra_headers: config.extra_headers.unwrap_or_default(), + id_delimiter: config.id_delimiter, } } } diff --git a/python/python/lancedb/_lancedb.pyi b/python/python/lancedb/_lancedb.pyi index 1ab9fe35..b6b86af1 100644 --- a/python/python/lancedb/_lancedb.pyi +++ b/python/python/lancedb/_lancedb.pyi @@ -21,14 +21,28 @@ class Session: class Connection(object): uri: str + async def is_open(self): ... + async def close(self): ... + async def list_namespaces( + self, + namespace: List[str], + page_token: Optional[str], + limit: Optional[int], + ) -> List[str]: ... + async def create_namespace(self, namespace: List[str]) -> None: ... + async def drop_namespace(self, namespace: List[str]) -> None: ... async def table_names( - self, start_after: Optional[str], limit: Optional[int] + self, + namespace: List[str], + start_after: Optional[str], + limit: Optional[int], ) -> list[str]: ... async def create_table( self, name: str, mode: str, data: pa.RecordBatchReader, + namespace: List[str] = [], storage_options: Optional[Dict[str, str]] = None, ) -> Table: ... async def create_empty_table( @@ -36,10 +50,25 @@ class Connection(object): name: str, mode: str, schema: pa.Schema, + namespace: List[str] = [], storage_options: Optional[Dict[str, str]] = None, ) -> Table: ... - async def rename_table(self, old_name: str, new_name: str) -> None: ... - async def drop_table(self, name: str) -> None: ... + async def open_table( + self, + name: str, + namespace: List[str] = [], + storage_options: Optional[Dict[str, str]] = None, + index_cache_size: Optional[int] = None, + ) -> Table: ... + async def rename_table( + self, + cur_name: str, + new_name: str, + cur_namespace: List[str] = [], + new_namespace: List[str] = [], + ) -> None: ... + async def drop_table(self, name: str, namespace: List[str] = []) -> None: ... + async def drop_all_tables(self, namespace: List[str] = []) -> None: ... class Table: def name(self) -> str: ... diff --git a/python/python/lancedb/db.py b/python/python/lancedb/db.py index fedde25e..07d95b18 100644 --- a/python/python/lancedb/db.py +++ b/python/python/lancedb/db.py @@ -43,14 +43,69 @@ if TYPE_CHECKING: class DBConnection(EnforceOverrides): """An active LanceDB connection interface.""" + def list_namespaces( + self, + namespace: List[str] = [], + page_token: Optional[str] = None, + limit: int = 10, + ) -> Iterable[str]: + """List immediate child namespace names in the given namespace. + + Parameters + ---------- + namespace: List[str], default [] + The parent namespace to list namespaces in. + Empty list represents root namespace. + page_token: str, optional + The token to use for pagination. If not present, start from the beginning. + limit: int, default 10 + The size of the page to return. + + Returns + ------- + Iterable of str + List of immediate child namespace names + """ + return [] + + def create_namespace(self, namespace: List[str]) -> None: + """Create a new namespace. + + Parameters + ---------- + namespace: List[str] + The namespace identifier to create. + """ + raise NotImplementedError( + "Namespace operations are not supported for this connection type" + ) + + def drop_namespace(self, namespace: List[str]) -> None: + """Drop a namespace. + + Parameters + ---------- + namespace: List[str] + The namespace identifier to drop. + """ + raise NotImplementedError( + "Namespace operations are not supported for this connection type" + ) + @abstractmethod def table_names( - self, page_token: Optional[str] = None, limit: int = 10 + self, + namespace: List[str] = [], + page_token: Optional[str] = None, + limit: int = 10, ) -> Iterable[str]: """List all tables in this database, in sorted order Parameters ---------- + namespace: List[str], default [] + The namespace to list tables in. + Empty list represents root namespace. page_token: str, optional The token to use for pagination. If not present, start from the beginning. Typically, this token is last table name from the previous page. @@ -77,6 +132,7 @@ class DBConnection(EnforceOverrides): fill_value: float = 0.0, embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None, *, + namespace: List[str] = [], storage_options: Optional[Dict[str, str]] = None, data_storage_version: Optional[str] = None, enable_v2_manifest_paths: Optional[bool] = None, @@ -87,6 +143,9 @@ class DBConnection(EnforceOverrides): ---------- name: str The name of the table. + namespace: List[str], default [] + The namespace to create the table in. + Empty list represents root namespace. data: The data to initialize the table, *optional* User must provide at least one of `data` or `schema`. Acceptable types are: @@ -238,6 +297,7 @@ class DBConnection(EnforceOverrides): self, name: str, *, + namespace: List[str] = [], storage_options: Optional[Dict[str, str]] = None, index_cache_size: Optional[int] = None, ) -> Table: @@ -247,6 +307,9 @@ class DBConnection(EnforceOverrides): ---------- name: str The name of the table. + namespace: List[str], optional + The namespace to open the table from. + None or empty list represents root namespace. index_cache_size: int, default 256 **Deprecated**: Use session-level cache configuration instead. Create a Session with custom cache sizes and pass it to lancedb.connect(). @@ -272,17 +335,26 @@ class DBConnection(EnforceOverrides): """ raise NotImplementedError - def drop_table(self, name: str): + def drop_table(self, name: str, namespace: List[str] = []): """Drop a table from the database. Parameters ---------- name: str The name of the table. + namespace: List[str], default [] + The namespace to drop the table from. + Empty list represents root namespace. """ raise NotImplementedError - def rename_table(self, cur_name: str, new_name: str): + def rename_table( + self, + cur_name: str, + new_name: str, + cur_namespace: List[str] = [], + new_namespace: List[str] = [], + ): """Rename a table in the database. Parameters @@ -291,6 +363,12 @@ class DBConnection(EnforceOverrides): The current name of the table. new_name: str The new name of the table. + cur_namespace: List[str], optional + The namespace of the current table. + None or empty list represents root namespace. + new_namespace: List[str], optional + The namespace to move the table to. + If not specified, defaults to the same as cur_namespace. """ raise NotImplementedError @@ -301,9 +379,15 @@ class DBConnection(EnforceOverrides): """ raise NotImplementedError - def drop_all_tables(self): + def drop_all_tables(self, namespace: List[str] = []): """ Drop all tables from the database + + Parameters + ---------- + namespace: List[str], optional + The namespace to drop all tables from. + None or empty list represents root namespace. """ raise NotImplementedError @@ -404,18 +488,86 @@ class LanceDBConnection(DBConnection): conn = AsyncConnection(await lancedb_connect(self.uri)) return await conn.table_names(start_after=start_after, limit=limit) + @override + def list_namespaces( + self, + namespace: List[str] = [], + page_token: Optional[str] = None, + limit: int = 10, + ) -> Iterable[str]: + """List immediate child namespace names in the given namespace. + + Parameters + ---------- + namespace: List[str], optional + The parent namespace to list namespaces in. + None or empty list represents root namespace. + page_token: str, optional + The token to use for pagination. If not present, start from the beginning. + limit: int, default 10 + The size of the page to return. + + Returns + ------- + Iterable of str + List of immediate child namespace names + """ + return LOOP.run( + self._conn.list_namespaces( + namespace=namespace, page_token=page_token, limit=limit + ) + ) + + @override + def create_namespace(self, namespace: List[str]) -> None: + """Create a new namespace. + + Parameters + ---------- + namespace: List[str] + The namespace identifier to create. + """ + LOOP.run(self._conn.create_namespace(namespace=namespace)) + + @override + def drop_namespace(self, namespace: List[str]) -> None: + """Drop a namespace. + + Parameters + ---------- + namespace: List[str] + The namespace identifier to drop. + """ + return LOOP.run(self._conn.drop_namespace(namespace=namespace)) + @override def table_names( - self, page_token: Optional[str] = None, limit: int = 10 + self, + namespace: List[str] = [], + page_token: Optional[str] = None, + limit: int = 10, ) -> Iterable[str]: """Get the names of all tables in the database. The names are sorted. + Parameters + ---------- + namespace: List[str], optional + The namespace to list tables in. + page_token: str, optional + The token to use for pagination. + limit: int, default 10 + The maximum number of tables to return. + Returns ------- Iterator of str. A list of table names. """ - return LOOP.run(self._conn.table_names(start_after=page_token, limit=limit)) + return LOOP.run( + self._conn.table_names( + namespace=namespace, start_after=page_token, limit=limit + ) + ) def __len__(self) -> int: return len(self.table_names()) @@ -435,12 +587,18 @@ class LanceDBConnection(DBConnection): fill_value: float = 0.0, embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None, *, + namespace: List[str] = [], storage_options: Optional[Dict[str, str]] = None, data_storage_version: Optional[str] = None, enable_v2_manifest_paths: Optional[bool] = None, ) -> LanceTable: """Create a table in the database. + Parameters + ---------- + namespace: List[str], optional + The namespace to create the table in. + See --- DBConnection.create_table @@ -459,6 +617,7 @@ class LanceDBConnection(DBConnection): on_bad_vectors=on_bad_vectors, fill_value=fill_value, embedding_functions=embedding_functions, + namespace=namespace, storage_options=storage_options, ) return tbl @@ -468,6 +627,7 @@ class LanceDBConnection(DBConnection): self, name: str, *, + namespace: List[str] = [], storage_options: Optional[Dict[str, str]] = None, index_cache_size: Optional[int] = None, ) -> LanceTable: @@ -477,6 +637,8 @@ class LanceDBConnection(DBConnection): ---------- name: str The name of the table. + namespace: List[str], optional + The namespace to open the table from. Returns ------- @@ -496,26 +658,68 @@ class LanceDBConnection(DBConnection): return LanceTable.open( self, name, + namespace=namespace, storage_options=storage_options, index_cache_size=index_cache_size, ) @override - def drop_table(self, name: str, ignore_missing: bool = False): + def drop_table( + self, + name: str, + namespace: List[str] = [], + ignore_missing: bool = False, + ): """Drop a table from the database. Parameters ---------- name: str The name of the table. + namespace: List[str], optional + The namespace to drop the table from. ignore_missing: bool, default False If True, ignore if the table does not exist. """ - LOOP.run(self._conn.drop_table(name, ignore_missing=ignore_missing)) + LOOP.run( + self._conn.drop_table( + name, namespace=namespace, ignore_missing=ignore_missing + ) + ) @override - def drop_all_tables(self): - LOOP.run(self._conn.drop_all_tables()) + def drop_all_tables(self, namespace: List[str] = []): + LOOP.run(self._conn.drop_all_tables(namespace=namespace)) + + @override + def rename_table( + self, + cur_name: str, + new_name: str, + cur_namespace: List[str] = [], + new_namespace: List[str] = [], + ): + """Rename a table in the database. + + Parameters + ---------- + cur_name: str + The current name of the table. + new_name: str + The new name of the table. + cur_namespace: List[str], optional + The namespace of the current table. + new_namespace: List[str], optional + The namespace to move the table to. + """ + LOOP.run( + self._conn.rename_table( + cur_name, + new_name, + cur_namespace=cur_namespace, + new_namespace=new_namespace, + ) + ) @deprecation.deprecated( deprecated_in="0.15.1", @@ -588,13 +792,67 @@ class AsyncConnection(object): def uri(self) -> str: return self._inner.uri + async def list_namespaces( + self, + namespace: List[str] = [], + page_token: Optional[str] = None, + limit: int = 10, + ) -> Iterable[str]: + """List immediate child namespace names in the given namespace. + + Parameters + ---------- + namespace: List[str], optional + The parent namespace to list namespaces in. + None or empty list represents root namespace. + page_token: str, optional + The token to use for pagination. If not present, start from the beginning. + limit: int, default 10 + The size of the page to return. + + Returns + ------- + Iterable of str + List of immediate child namespace names (not full paths) + """ + return await self._inner.list_namespaces( + namespace=namespace, page_token=page_token, limit=limit + ) + + async def create_namespace(self, namespace: List[str]) -> None: + """Create a new namespace. + + Parameters + ---------- + namespace: List[str] + The namespace identifier to create. + """ + await self._inner.create_namespace(namespace) + + async def drop_namespace(self, namespace: List[str]) -> None: + """Drop a namespace. + + Parameters + ---------- + namespace: List[str] + The namespace identifier to drop. + """ + await self._inner.drop_namespace(namespace) + async def table_names( - self, *, start_after: Optional[str] = None, limit: Optional[int] = None + self, + *, + namespace: List[str] = [], + start_after: Optional[str] = None, + limit: Optional[int] = None, ) -> Iterable[str]: """List all tables in this database, in sorted order Parameters ---------- + namespace: List[str], optional + The namespace to list tables in. + None or empty list represents root namespace. start_after: str, optional If present, only return names that come lexicographically after the supplied value. @@ -608,7 +866,9 @@ class AsyncConnection(object): ------- Iterable of str """ - return await self._inner.table_names(start_after=start_after, limit=limit) + return await self._inner.table_names( + namespace=namespace, start_after=start_after, limit=limit + ) async def create_table( self, @@ -621,6 +881,7 @@ class AsyncConnection(object): fill_value: Optional[float] = None, storage_options: Optional[Dict[str, str]] = None, *, + namespace: List[str] = [], embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None, ) -> AsyncTable: """Create an [AsyncTable][lancedb.table.AsyncTable] in the database. @@ -629,6 +890,9 @@ class AsyncConnection(object): ---------- name: str The name of the table. + namespace: List[str], default [] + The namespace to create the table in. + Empty list represents root namespace. data: The data to initialize the table, *optional* User must provide at least one of `data` or `schema`. Acceptable types are: @@ -807,6 +1071,7 @@ class AsyncConnection(object): name, mode, schema, + namespace=namespace, storage_options=storage_options, ) else: @@ -815,6 +1080,7 @@ class AsyncConnection(object): name, mode, data, + namespace=namespace, storage_options=storage_options, ) @@ -823,6 +1089,8 @@ class AsyncConnection(object): async def open_table( self, name: str, + *, + namespace: List[str] = [], storage_options: Optional[Dict[str, str]] = None, index_cache_size: Optional[int] = None, ) -> AsyncTable: @@ -832,6 +1100,9 @@ class AsyncConnection(object): ---------- name: str The name of the table. + namespace: List[str], optional + The namespace to open the table from. + None or empty list represents root namespace. storage_options: dict, optional Additional options for the storage backend. Options already set on the connection will be inherited by the table, but can be overridden here. @@ -855,42 +1126,77 @@ class AsyncConnection(object): ------- A LanceTable object representing the table. """ - table = await self._inner.open_table(name, storage_options, index_cache_size) + table = await self._inner.open_table( + name, + namespace=namespace, + storage_options=storage_options, + index_cache_size=index_cache_size, + ) return AsyncTable(table) - async def rename_table(self, old_name: str, new_name: str): + async def rename_table( + self, + cur_name: str, + new_name: str, + cur_namespace: List[str] = [], + new_namespace: List[str] = [], + ): """Rename a table in the database. Parameters ---------- - old_name: str + cur_name: str The current name of the table. new_name: str The new name of the table. + cur_namespace: List[str], optional + The namespace of the current table. + None or empty list represents root namespace. + new_namespace: List[str], optional + The namespace to move the table to. + If not specified, defaults to the same as cur_namespace. """ - await self._inner.rename_table(old_name, new_name) + await self._inner.rename_table( + cur_name, new_name, cur_namespace=cur_namespace, new_namespace=new_namespace + ) - async def drop_table(self, name: str, *, ignore_missing: bool = False): + async def drop_table( + self, + name: str, + *, + namespace: List[str] = [], + ignore_missing: bool = False, + ): """Drop a table from the database. Parameters ---------- name: str The name of the table. + namespace: List[str], default [] + The namespace to drop the table from. + Empty list represents root namespace. ignore_missing: bool, default False If True, ignore if the table does not exist. """ try: - await self._inner.drop_table(name) + await self._inner.drop_table(name, namespace=namespace) except ValueError as e: if not ignore_missing: raise e if f"Table '{name}' was not found" not in str(e): raise e - async def drop_all_tables(self): - """Drop all tables from the database.""" - await self._inner.drop_all_tables() + async def drop_all_tables(self, namespace: List[str] = []): + """Drop all tables from the database. + + Parameters + ---------- + namespace: List[str], optional + The namespace to drop all tables from. + None or empty list represents root namespace. + """ + await self._inner.drop_all_tables(namespace=namespace) @deprecation.deprecated( deprecated_in="0.15.1", diff --git a/python/python/lancedb/namespace.py b/python/python/lancedb/namespace.py index ad0bf0c2..b18fdb23 100644 --- a/python/python/lancedb/namespace.py +++ b/python/python/lancedb/namespace.py @@ -26,6 +26,9 @@ from lance_namespace_urllib3_client.models import ( DescribeTableRequest, CreateTableRequest, DropTableRequest, + ListNamespacesRequest, + CreateNamespaceRequest, + DropNamespaceRequest, JsonArrowSchema, JsonArrowField, JsonArrowDataType, @@ -134,10 +137,12 @@ class LanceNamespaceDBConnection(DBConnection): @override def table_names( - self, page_token: Optional[str] = None, limit: int = 10 + self, + namespace: List[str] = [], + page_token: Optional[str] = None, + limit: int = 10, ) -> Iterable[str]: - # Use namespace to list tables - request = ListTablesRequest(id=None, page_token=page_token, limit=limit) + request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit) response = self._ns.list_tables(request) return response.tables if response.tables else [] @@ -153,6 +158,7 @@ class LanceNamespaceDBConnection(DBConnection): fill_value: float = 0.0, embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None, *, + namespace: List[str] = [], storage_options: Optional[Dict[str, str]] = None, data_storage_version: Optional[str] = None, enable_v2_manifest_paths: Optional[bool] = None, @@ -183,8 +189,9 @@ class LanceNamespaceDBConnection(DBConnection): # Convert PyArrow schema to JsonArrowSchema json_schema = _convert_pyarrow_schema_to_json(schema) - # Create table request - request = CreateTableRequest(id=[name], var_schema=json_schema) + # Create table request with namespace + table_id = (namespace or []) + [name] + request = CreateTableRequest(id=table_id, var_schema=json_schema) # Create empty Arrow IPC stream bytes import pyarrow.ipc as ipc @@ -199,17 +206,21 @@ class LanceNamespaceDBConnection(DBConnection): request_data = buffer.getvalue() self._ns.create_table(request, request_data) - return self.open_table(name, storage_options=storage_options) + return self.open_table( + name, namespace=namespace, storage_options=storage_options + ) @override def open_table( self, name: str, *, + namespace: List[str] = [], storage_options: Optional[Dict[str, str]] = None, index_cache_size: Optional[int] = None, ) -> Table: - request = DescribeTableRequest(id=[name]) + table_id = (namespace or []) + [name] + request = DescribeTableRequest(id=table_id) response = self._ns.describe_table(request) merged_storage_options = dict() @@ -225,13 +236,20 @@ class LanceNamespaceDBConnection(DBConnection): ) @override - def drop_table(self, name: str): + def drop_table(self, name: str, namespace: Optional[List[str]] = None): # Use namespace drop_table directly - request = DropTableRequest(id=[name]) + table_id = (namespace or []) + [name] + request = DropTableRequest(id=table_id) self._ns.drop_table(request) @override - def rename_table(self, cur_name: str, new_name: str): + def rename_table( + self, + cur_name: str, + new_name: str, + cur_namespace: Optional[List[str]] = None, + new_namespace: Optional[List[str]] = None, + ): raise NotImplementedError( "rename_table is not supported for namespace connections" ) @@ -243,9 +261,66 @@ class LanceNamespaceDBConnection(DBConnection): ) @override - def drop_all_tables(self): - for table_name in self.table_names(): - self.drop_table(table_name) + def drop_all_tables(self, namespace: Optional[List[str]] = None): + for table_name in self.table_names(namespace=namespace): + self.drop_table(table_name, namespace=namespace) + + @override + def list_namespaces( + self, + namespace: List[str] = [], + page_token: Optional[str] = None, + limit: int = 10, + ) -> Iterable[str]: + """ + List child namespaces under the given namespace. + + Parameters + ---------- + namespace : Optional[List[str]] + The parent namespace to list children from. + If None, lists root-level namespaces. + page_token : Optional[str] + Pagination token for listing results. + limit : int + Maximum number of namespaces to return. + + Returns + ------- + Iterable[str] + Names of child namespaces. + """ + request = ListNamespacesRequest( + id=namespace, page_token=page_token, limit=limit + ) + response = self._ns.list_namespaces(request) + return response.namespaces if response.namespaces else [] + + @override + def create_namespace(self, namespace: List[str]) -> None: + """ + Create a new namespace. + + Parameters + ---------- + namespace : List[str] + The namespace path to create. + """ + request = CreateNamespaceRequest(id=namespace) + self._ns.create_namespace(request) + + @override + def drop_namespace(self, namespace: List[str]) -> None: + """ + Drop a namespace. + + Parameters + ---------- + namespace : List[str] + The namespace path to drop. + """ + request = DropNamespaceRequest(id=namespace) + self._ns.drop_namespace(request) def _lance_table_from_uri( self, diff --git a/python/python/lancedb/remote/__init__.py b/python/python/lancedb/remote/__init__.py index 896efbd7..754febf0 100644 --- a/python/python/lancedb/remote/__init__.py +++ b/python/python/lancedb/remote/__init__.py @@ -118,6 +118,7 @@ class ClientConfig: retry_config: RetryConfig = field(default_factory=RetryConfig) timeout_config: Optional[TimeoutConfig] = field(default_factory=TimeoutConfig) extra_headers: Optional[dict] = None + id_delimiter: Optional[str] = None def __post_init__(self): if isinstance(self.retry_config, dict): diff --git a/python/python/lancedb/remote/db.py b/python/python/lancedb/remote/db.py index 44d614e5..6ca110b2 100644 --- a/python/python/lancedb/remote/db.py +++ b/python/python/lancedb/remote/db.py @@ -96,14 +96,72 @@ class RemoteDBConnection(DBConnection): def __repr__(self) -> str: return f"RemoteConnect(name={self.db_name})" + @override + def list_namespaces( + self, + namespace: List[str] = [], + page_token: Optional[str] = None, + limit: int = 10, + ) -> Iterable[str]: + """List immediate child namespace names in the given namespace. + + Parameters + ---------- + namespace: List[str], optional + The parent namespace to list namespaces in. + None or empty list represents root namespace. + page_token: str, optional + The token to use for pagination. If not present, start from the beginning. + limit: int, default 10 + The size of the page to return. + + Returns + ------- + Iterable of str + List of immediate child namespace names + """ + return LOOP.run( + self._conn.list_namespaces( + namespace=namespace, page_token=page_token, limit=limit + ) + ) + + @override + def create_namespace(self, namespace: List[str]) -> None: + """Create a new namespace. + + Parameters + ---------- + namespace: List[str] + The namespace identifier to create. + """ + LOOP.run(self._conn.create_namespace(namespace=namespace)) + + @override + def drop_namespace(self, namespace: List[str]) -> None: + """Drop a namespace. + + Parameters + ---------- + namespace: List[str] + The namespace identifier to drop. + """ + return LOOP.run(self._conn.drop_namespace(namespace=namespace)) + @override def table_names( - self, page_token: Optional[str] = None, limit: int = 10 + self, + namespace: List[str] = [], + page_token: Optional[str] = None, + limit: int = 10, ) -> Iterable[str]: """List the names of all tables in the database. Parameters ---------- + namespace: List[str], default [] + The namespace to list tables in. + Empty list represents root namespace. page_token: str The last token to start the new page. limit: int, default 10 @@ -113,13 +171,18 @@ class RemoteDBConnection(DBConnection): ------- An iterator of table names. """ - return LOOP.run(self._conn.table_names(start_after=page_token, limit=limit)) + return LOOP.run( + self._conn.table_names( + namespace=namespace, start_after=page_token, limit=limit + ) + ) @override def open_table( self, name: str, *, + namespace: List[str] = [], storage_options: Optional[Dict[str, str]] = None, index_cache_size: Optional[int] = None, ) -> Table: @@ -129,6 +192,9 @@ class RemoteDBConnection(DBConnection): ---------- name: str The name of the table. + namespace: List[str], optional + The namespace to open the table from. + None or empty list represents root namespace. Returns ------- @@ -142,7 +208,7 @@ class RemoteDBConnection(DBConnection): " (there is no local cache to configure)" ) - table = LOOP.run(self._conn.open_table(name)) + table = LOOP.run(self._conn.open_table(name, namespace=namespace)) return RemoteTable(table, self.db_name) @override @@ -155,6 +221,8 @@ class RemoteDBConnection(DBConnection): fill_value: float = 0.0, mode: Optional[str] = None, embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None, + *, + namespace: List[str] = [], ) -> Table: """Create a [Table][lancedb.table.Table] in the database. @@ -162,6 +230,9 @@ class RemoteDBConnection(DBConnection): ---------- name: str The name of the table. + namespace: List[str], optional + The namespace to create the table in. + None or empty list represents root namespace. data: The data to initialize the table, *optional* User must provide at least one of `data` or `schema`. Acceptable types are: @@ -262,6 +333,7 @@ class RemoteDBConnection(DBConnection): self._conn.create_table( name, data, + namespace=namespace, mode=mode, schema=schema, on_bad_vectors=on_bad_vectors, @@ -271,18 +343,27 @@ class RemoteDBConnection(DBConnection): return RemoteTable(table, self.db_name) @override - def drop_table(self, name: str): + def drop_table(self, name: str, namespace: Optional[List[str]] = None): """Drop a table from the database. Parameters ---------- name: str The name of the table. + namespace: List[str], optional + The namespace to drop the table from. + None or empty list represents root namespace. """ - LOOP.run(self._conn.drop_table(name)) + LOOP.run(self._conn.drop_table(name, namespace=namespace)) @override - def rename_table(self, cur_name: str, new_name: str): + def rename_table( + self, + cur_name: str, + new_name: str, + cur_namespace: Optional[List[str]] = None, + new_namespace: Optional[List[str]] = None, + ): """Rename a table in the database. Parameters @@ -292,7 +373,14 @@ class RemoteDBConnection(DBConnection): new_name: str The new name of the table. """ - LOOP.run(self._conn.rename_table(cur_name, new_name)) + LOOP.run( + self._conn.rename_table( + cur_name, + new_name, + cur_namespace=cur_namespace, + new_namespace=new_namespace, + ) + ) async def close(self): """Close the connection to the database.""" diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py index bd7f2039..0f36d00f 100644 --- a/python/python/lancedb/table.py +++ b/python/python/lancedb/table.py @@ -1700,13 +1700,16 @@ class LanceTable(Table): connection: "LanceDBConnection", name: str, *, + namespace: List[str] = [], storage_options: Optional[Dict[str, str]] = None, index_cache_size: Optional[int] = None, ): self._conn = connection + self._namespace = namespace self._table = LOOP.run( connection._conn.open_table( name, + namespace=namespace, storage_options=storage_options, index_cache_size=index_cache_size, ) @@ -1717,8 +1720,8 @@ class LanceTable(Table): return self._table.name @classmethod - def open(cls, db, name, **kwargs): - tbl = cls(db, name, **kwargs) + def open(cls, db, name, *, namespace: List[str] = [], **kwargs): + tbl = cls(db, name, namespace=namespace, **kwargs) # check the dataset exists try: @@ -2539,6 +2542,7 @@ class LanceTable(Table): fill_value: float = 0.0, embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None, *, + namespace: List[str] = [], storage_options: Optional[Dict[str, str | bool]] = None, data_storage_version: Optional[str] = None, enable_v2_manifest_paths: Optional[bool] = None, @@ -2598,6 +2602,7 @@ class LanceTable(Table): """ self = cls.__new__(cls) self._conn = db + self._namespace = namespace if data_storage_version is not None: warnings.warn( @@ -2630,6 +2635,7 @@ class LanceTable(Table): on_bad_vectors=on_bad_vectors, fill_value=fill_value, embedding_functions=embedding_functions, + namespace=namespace, storage_options=storage_options, ) ) diff --git a/python/python/tests/test_db.py b/python/python/tests/test_db.py index 88bb33e8..7c2a8c36 100644 --- a/python/python/tests/test_db.py +++ b/python/python/tests/test_db.py @@ -728,3 +728,93 @@ def test_bypass_vector_index_sync(tmp_db: lancedb.DBConnection): table.search(sample_key).bypass_vector_index().explain_plan(verbose=True) ) assert "KNN" in plan_without_index + + +def test_local_namespace_operations(tmp_path): + """Test that local mode namespace operations behave as expected.""" + # Create a local database connection + db = lancedb.connect(tmp_path) + + # Test list_namespaces returns empty list + namespaces = list(db.list_namespaces()) + assert namespaces == [] + + # Test list_namespaces with parameters still returns empty list + namespaces_with_params = list( + db.list_namespaces(namespace=["test"], page_token="token", limit=5) + ) + assert namespaces_with_params == [] + + +def test_local_create_namespace_not_supported(tmp_path): + """Test that create_namespace is not supported in local mode.""" + db = lancedb.connect(tmp_path) + + with pytest.raises( + NotImplementedError, + match="Namespace operations are not supported for listing database", + ): + db.create_namespace(["test_namespace"]) + + +def test_local_drop_namespace_not_supported(tmp_path): + """Test that drop_namespace is not supported in local mode.""" + db = lancedb.connect(tmp_path) + + with pytest.raises( + NotImplementedError, + match="Namespace operations are not supported for listing database", + ): + db.drop_namespace(["test_namespace"]) + + +def test_local_table_operations_with_namespace_raise_error(tmp_path): + """ + Test that table operations with namespace parameter + raise ValueError in local mode. + """ + db = lancedb.connect(tmp_path) + + # Create some test data + data = [{"vector": [1.0, 2.0], "item": "test"}] + schema = pa.schema( + [pa.field("vector", pa.list_(pa.float32(), 2)), pa.field("item", pa.string())] + ) + + # Test create_table with namespace - should raise ValueError + with pytest.raises( + NotImplementedError, + match="Namespace parameter is not supported for listing database", + ): + db.create_table( + "test_table_with_ns", data=data, schema=schema, namespace=["test_ns"] + ) + + # Create table normally for other tests + db.create_table("test_table", data=data, schema=schema) + assert "test_table" in db.table_names() + + # Test open_table with namespace - should raise ValueError + with pytest.raises( + NotImplementedError, + match="Namespace parameter is not supported for listing database", + ): + db.open_table("test_table", namespace=["test_ns"]) + + # Test table_names with namespace - should raise ValueError + with pytest.raises( + NotImplementedError, + match="Namespace parameter is not supported for listing database", + ): + list(db.table_names(namespace=["test_ns"])) + + # Test drop_table with namespace - should raise ValueError + with pytest.raises( + NotImplementedError, + match="Namespace parameter is not supported for listing database", + ): + db.drop_table("test_table", namespace=["test_ns"]) + + # Test table_names without namespace - should work normally + tables_root = list(db.table_names()) + assert "test_table" in tables_root diff --git a/python/python/tests/test_namespace.py b/python/python/tests/test_namespace.py index 36968763..e7f6b017 100644 --- a/python/python/tests/test_namespace.py +++ b/python/python/tests/test_namespace.py @@ -23,6 +23,12 @@ from lance_namespace_urllib3_client.models import ( CreateTableResponse, DropTableRequest, DropTableResponse, + ListNamespacesRequest, + ListNamespacesResponse, + CreateNamespaceRequest, + CreateNamespaceResponse, + DropNamespaceRequest, + DropNamespaceResponse, ) @@ -31,6 +37,8 @@ class TempNamespace(LanceNamespace): # Class-level storage to persist table registry across instances _global_registry: Dict[str, Dict[str, str]] = {} + # Class-level storage for namespaces (supporting 1-level namespace) + _global_namespaces: Dict[str, set] = {} def __init__(self, **properties): """Initialize the test namespace. @@ -44,20 +52,48 @@ class TempNamespace(LanceNamespace): root = self.config.root if root not in self._global_registry: self._global_registry[root] = {} + if root not in self._global_namespaces: + self._global_namespaces[root] = set() self.tables = self._global_registry[root] # Reference to shared registry + self.namespaces = self._global_namespaces[ + root + ] # Reference to shared namespaces def list_tables(self, request: ListTablesRequest) -> ListTablesResponse: """List all tables in the namespace.""" - # For simplicity, ignore namespace ID validation - tables = list(self.tables.keys()) + if not request.id: + # List all tables in root namespace + tables = [name for name in self.tables.keys() if "." not in name] + else: + # List tables in specific namespace (1-level only) + if len(request.id) == 1: + namespace_name = request.id[0] + prefix = f"{namespace_name}." + tables = [ + name[len(prefix) :] + for name in self.tables.keys() + if name.startswith(prefix) + ] + else: + # Multi-level namespaces not supported + raise ValueError("Only 1-level namespaces are supported") return ListTablesResponse(tables=tables) def describe_table(self, request: DescribeTableRequest) -> DescribeTableResponse: """Describe a table by returning its location.""" - if not request.id or len(request.id) != 1: + if not request.id: raise ValueError("Invalid table ID") - table_name = request.id[0] + if len(request.id) == 1: + # Root namespace table + table_name = request.id[0] + elif len(request.id) == 2: + # Namespaced table (1-level namespace) + namespace_name, table_name = request.id + table_name = f"{namespace_name}.{table_name}" + else: + raise ValueError("Only 1-level namespaces are supported") + if table_name not in self.tables: raise RuntimeError(f"Table does not exist: {table_name}") @@ -68,10 +104,22 @@ class TempNamespace(LanceNamespace): self, request: CreateTableRequest, request_data: bytes ) -> CreateTableResponse: """Create a table in the namespace.""" - if not request.id or len(request.id) != 1: + if not request.id: raise ValueError("Invalid table ID") - table_name = request.id[0] + if len(request.id) == 1: + # Root namespace table + table_name = request.id[0] + table_uri = f"{self.config.root}/{table_name}.lance" + elif len(request.id) == 2: + # Namespaced table (1-level namespace) + namespace_name, base_table_name = request.id + # Add namespace to our namespace set + self.namespaces.add(namespace_name) + table_name = f"{namespace_name}.{base_table_name}" + table_uri = f"{self.config.root}/{namespace_name}/{base_table_name}.lance" + else: + raise ValueError("Only 1-level namespaces are supported") # Check if table already exists if table_name in self.tables: @@ -81,13 +129,14 @@ class TempNamespace(LanceNamespace): else: raise RuntimeError(f"Table already exists: {table_name}") - # Generate table URI based on root directory - table_uri = f"{self.config.root}/{table_name}.lance" - # Parse the Arrow IPC stream to get the schema and create the actual table import pyarrow.ipc as ipc import io import lance + import os + + # Create directory if needed for namespaced tables + os.makedirs(os.path.dirname(table_uri), exist_ok=True) # Read the IPC stream reader = ipc.open_stream(io.BytesIO(request_data)) @@ -103,10 +152,19 @@ class TempNamespace(LanceNamespace): def drop_table(self, request: DropTableRequest) -> DropTableResponse: """Drop a table from the namespace.""" - if not request.id or len(request.id) != 1: + if not request.id: raise ValueError("Invalid table ID") - table_name = request.id[0] + if len(request.id) == 1: + # Root namespace table + table_name = request.id[0] + elif len(request.id) == 2: + # Namespaced table (1-level namespace) + namespace_name, base_table_name = request.id + table_name = f"{namespace_name}.{base_table_name}" + else: + raise ValueError("Only 1-level namespaces are supported") + if table_name not in self.tables: raise RuntimeError(f"Table does not exist: {table_name}") @@ -152,6 +210,78 @@ class TempNamespace(LanceNamespace): del self.tables[table_name] return DeregisterTableResponse() + def list_namespaces(self, request: ListNamespacesRequest) -> ListNamespacesResponse: + """List child namespaces.""" + if not request.id: + # List root-level namespaces + namespaces = list(self.namespaces) + elif len(request.id) == 1: + # For 1-level namespace, there are no child namespaces + namespaces = [] + else: + raise ValueError("Only 1-level namespaces are supported") + + return ListNamespacesResponse(namespaces=namespaces) + + def create_namespace( + self, request: CreateNamespaceRequest + ) -> CreateNamespaceResponse: + """Create a namespace.""" + if not request.id: + raise ValueError("Invalid namespace ID") + + if len(request.id) == 1: + # Create 1-level namespace + namespace_name = request.id[0] + self.namespaces.add(namespace_name) + + # Create directory for the namespace + import os + + namespace_dir = f"{self.config.root}/{namespace_name}" + os.makedirs(namespace_dir, exist_ok=True) + else: + raise ValueError("Only 1-level namespaces are supported") + + return CreateNamespaceResponse() + + def drop_namespace(self, request: DropNamespaceRequest) -> DropNamespaceResponse: + """Drop a namespace.""" + if not request.id: + raise ValueError("Invalid namespace ID") + + if len(request.id) == 1: + # Drop 1-level namespace + namespace_name = request.id[0] + + if namespace_name not in self.namespaces: + raise RuntimeError(f"Namespace does not exist: {namespace_name}") + + # Check if namespace has any tables + prefix = f"{namespace_name}." + tables_in_namespace = [ + name for name in self.tables.keys() if name.startswith(prefix) + ] + if tables_in_namespace: + raise RuntimeError( + f"Cannot drop namespace '{namespace_name}': contains tables" + ) + + # Remove namespace + self.namespaces.remove(namespace_name) + + # Remove directory + import shutil + import os + + namespace_dir = f"{self.config.root}/{namespace_name}" + if os.path.exists(namespace_dir): + shutil.rmtree(namespace_dir, ignore_errors=True) + else: + raise ValueError("Only 1-level namespaces are supported") + + return DropNamespaceResponse() + class TempNamespaceConfig: """Configuration for TestNamespace.""" @@ -187,12 +317,16 @@ class TestNamespaceConnection: # Clear the TestNamespace registry for this test if self.temp_dir in TempNamespace._global_registry: TempNamespace._global_registry[self.temp_dir].clear() + if self.temp_dir in TempNamespace._global_namespaces: + TempNamespace._global_namespaces[self.temp_dir].clear() def teardown_method(self): """Clean up test fixtures.""" # Clear the TestNamespace registry if self.temp_dir in TempNamespace._global_registry: del TempNamespace._global_registry[self.temp_dir] + if self.temp_dir in TempNamespace._global_namespaces: + del TempNamespace._global_namespaces[self.temp_dir] shutil.rmtree(self.temp_dir, ignore_errors=True) def test_connect_namespace_test(self): @@ -412,3 +546,153 @@ class TestNamespaceConnection: ] ) db.create_table("test_table", schema=schema, storage_options=table_opts) + + def test_namespace_operations(self): + """Test namespace management operations.""" + db = lancedb.connect_namespace("temp", {"root": self.temp_dir}) + + # Initially no namespaces + assert len(list(db.list_namespaces())) == 0 + + # Create a namespace + db.create_namespace(["test_namespace"]) + + # Verify namespace exists + namespaces = list(db.list_namespaces()) + assert "test_namespace" in namespaces + assert len(namespaces) == 1 + + # Create table in namespace + schema = pa.schema( + [ + pa.field("id", pa.int64()), + pa.field("vector", pa.list_(pa.float32(), 2)), + ] + ) + table = db.create_table( + "test_table", schema=schema, namespace=["test_namespace"] + ) + assert table is not None + + # Verify table exists in namespace + tables_in_namespace = list(db.table_names(namespace=["test_namespace"])) + assert "test_table" in tables_in_namespace + assert len(tables_in_namespace) == 1 + + # Open table from namespace + table = db.open_table("test_table", namespace=["test_namespace"]) + assert table is not None + assert table.name == "test_table" + + # Drop table from namespace + db.drop_table("test_table", namespace=["test_namespace"]) + + # Verify table no longer exists in namespace + tables_in_namespace = list(db.table_names(namespace=["test_namespace"])) + assert len(tables_in_namespace) == 0 + + # Drop namespace + db.drop_namespace(["test_namespace"]) + + # Verify namespace no longer exists + namespaces = list(db.list_namespaces()) + assert len(namespaces) == 0 + + def test_namespace_with_tables_cannot_be_dropped(self): + """Test that namespaces containing tables cannot be dropped.""" + db = lancedb.connect_namespace("temp", {"root": self.temp_dir}) + + # Create namespace and table + db.create_namespace(["test_namespace"]) + schema = pa.schema( + [ + pa.field("id", pa.int64()), + pa.field("vector", pa.list_(pa.float32(), 2)), + ] + ) + db.create_table("test_table", schema=schema, namespace=["test_namespace"]) + + # Try to drop namespace with tables - should fail + with pytest.raises(RuntimeError, match="contains tables"): + db.drop_namespace(["test_namespace"]) + + # Drop table first + db.drop_table("test_table", namespace=["test_namespace"]) + + # Now dropping namespace should work + db.drop_namespace(["test_namespace"]) + + def test_same_table_name_different_namespaces(self): + db = lancedb.connect_namespace("temp", {"root": self.temp_dir}) + + # Create two namespaces + db.create_namespace(["namespace_a"]) + db.create_namespace(["namespace_b"]) + + # Define schema + schema = pa.schema( + [ + pa.field("id", pa.int64()), + pa.field("vector", pa.list_(pa.float32(), 2)), + pa.field("text", pa.string()), + ] + ) + + # Create table with same name in both namespaces + table_a = db.create_table( + "same_name_table", schema=schema, namespace=["namespace_a"] + ) + table_b = db.create_table( + "same_name_table", schema=schema, namespace=["namespace_b"] + ) + + # Add different data to each table + data_a = [ + {"id": 1, "vector": [1.0, 2.0], "text": "data_from_namespace_a"}, + {"id": 2, "vector": [3.0, 4.0], "text": "also_from_namespace_a"}, + ] + table_a.add(data_a) + + data_b = [ + {"id": 10, "vector": [10.0, 20.0], "text": "data_from_namespace_b"}, + {"id": 20, "vector": [30.0, 40.0], "text": "also_from_namespace_b"}, + {"id": 30, "vector": [50.0, 60.0], "text": "more_from_namespace_b"}, + ] + table_b.add(data_b) + + # Verify data in namespace_a table + opened_table_a = db.open_table("same_name_table", namespace=["namespace_a"]) + result_a = opened_table_a.to_pandas().sort_values("id").reset_index(drop=True) + assert len(result_a) == 2 + assert result_a["id"].tolist() == [1, 2] + assert result_a["text"].tolist() == [ + "data_from_namespace_a", + "also_from_namespace_a", + ] + assert [v.tolist() for v in result_a["vector"]] == [[1.0, 2.0], [3.0, 4.0]] + + # Verify data in namespace_b table + opened_table_b = db.open_table("same_name_table", namespace=["namespace_b"]) + result_b = opened_table_b.to_pandas().sort_values("id").reset_index(drop=True) + assert len(result_b) == 3 + assert result_b["id"].tolist() == [10, 20, 30] + assert result_b["text"].tolist() == [ + "data_from_namespace_b", + "also_from_namespace_b", + "more_from_namespace_b", + ] + assert [v.tolist() for v in result_b["vector"]] == [ + [10.0, 20.0], + [30.0, 40.0], + [50.0, 60.0], + ] + + # Verify root namespace doesn't have this table + root_tables = list(db.table_names()) + assert "same_name_table" not in root_tables + + # Clean up + db.drop_table("same_name_table", namespace=["namespace_a"]) + db.drop_table("same_name_table", namespace=["namespace_b"]) + db.drop_namespace(["namespace_a"]) + db.drop_namespace(["namespace_b"]) diff --git a/python/src/connection.rs b/python/src/connection.rs index d5fe00cd..8e0507f9 100644 --- a/python/src/connection.rs +++ b/python/src/connection.rs @@ -63,14 +63,16 @@ impl Connection { self.get_inner().map(|inner| inner.uri().to_string()) } - #[pyo3(signature = (start_after=None, limit=None))] + #[pyo3(signature = (namespace=vec![], start_after=None, limit=None))] pub fn table_names( self_: PyRef<'_, Self>, + namespace: Vec, start_after: Option, limit: Option, ) -> PyResult> { let inner = self_.get_inner()?.clone(); let mut op = inner.table_names(); + op = op.namespace(namespace); if let Some(start_after) = start_after { op = op.start_after(start_after); } @@ -80,12 +82,13 @@ impl Connection { future_into_py(self_.py(), async move { op.execute().await.infer_error() }) } - #[pyo3(signature = (name, mode, data, storage_options=None))] + #[pyo3(signature = (name, mode, data, namespace=vec![], storage_options=None))] pub fn create_table<'a>( self_: PyRef<'a, Self>, name: String, mode: &str, data: Bound<'_, PyAny>, + namespace: Vec, storage_options: Option>, ) -> PyResult> { let inner = self_.get_inner()?.clone(); @@ -93,8 +96,10 @@ impl Connection { let mode = Self::parse_create_mode_str(mode)?; let batches = ArrowArrayStreamReader::from_pyarrow_bound(&data)?; + let mut builder = inner.create_table(name, batches).mode(mode); + builder = builder.namespace(namespace); if let Some(storage_options) = storage_options { builder = builder.storage_options(storage_options); } @@ -105,12 +110,13 @@ impl Connection { }) } - #[pyo3(signature = (name, mode, schema, storage_options=None))] + #[pyo3(signature = (name, mode, schema, namespace=vec![], storage_options=None))] pub fn create_empty_table<'a>( self_: PyRef<'a, Self>, name: String, mode: &str, schema: Bound<'_, PyAny>, + namespace: Vec, storage_options: Option>, ) -> PyResult> { let inner = self_.get_inner()?.clone(); @@ -121,6 +127,7 @@ impl Connection { let mut builder = inner.create_empty_table(name, Arc::new(schema)).mode(mode); + builder = builder.namespace(namespace); if let Some(storage_options) = storage_options { builder = builder.storage_options(storage_options); } @@ -131,49 +138,115 @@ impl Connection { }) } - #[pyo3(signature = (name, storage_options = None, index_cache_size = None))] + #[pyo3(signature = (name, namespace=vec![], storage_options = None, index_cache_size = None))] pub fn open_table( self_: PyRef<'_, Self>, name: String, + namespace: Vec, storage_options: Option>, index_cache_size: Option, ) -> PyResult> { let inner = self_.get_inner()?.clone(); + let mut builder = inner.open_table(name); + builder = builder.namespace(namespace); if let Some(storage_options) = storage_options { builder = builder.storage_options(storage_options); } if let Some(index_cache_size) = index_cache_size { builder = builder.index_cache_size(index_cache_size); } + future_into_py(self_.py(), async move { let table = builder.execute().await.infer_error()?; Ok(Table::new(table)) }) } + #[pyo3(signature = (cur_name, new_name, cur_namespace=vec![], new_namespace=vec![]))] pub fn rename_table( self_: PyRef<'_, Self>, - old_name: String, + cur_name: String, new_name: String, + cur_namespace: Vec, + new_namespace: Vec, ) -> PyResult> { let inner = self_.get_inner()?.clone(); future_into_py(self_.py(), async move { - inner.rename_table(old_name, new_name).await.infer_error() + inner + .rename_table(cur_name, new_name, &cur_namespace, &new_namespace) + .await + .infer_error() }) } - pub fn drop_table(self_: PyRef<'_, Self>, name: String) -> PyResult> { + #[pyo3(signature = (name, namespace=vec![]))] + pub fn drop_table( + self_: PyRef<'_, Self>, + name: String, + namespace: Vec, + ) -> PyResult> { let inner = self_.get_inner()?.clone(); future_into_py(self_.py(), async move { - inner.drop_table(name).await.infer_error() + inner.drop_table(name, &namespace).await.infer_error() }) } - pub fn drop_all_tables(self_: PyRef<'_, Self>) -> PyResult> { + #[pyo3(signature = (namespace=vec![],))] + pub fn drop_all_tables( + self_: PyRef<'_, Self>, + namespace: Vec, + ) -> PyResult> { let inner = self_.get_inner()?.clone(); future_into_py(self_.py(), async move { - inner.drop_all_tables().await.infer_error() + inner.drop_all_tables(&namespace).await.infer_error() + }) + } + + // Namespace management methods + + #[pyo3(signature = (namespace=vec![], page_token=None, limit=None))] + pub fn list_namespaces( + self_: PyRef<'_, Self>, + namespace: Vec, + page_token: Option, + limit: Option, + ) -> PyResult> { + let inner = self_.get_inner()?.clone(); + future_into_py(self_.py(), async move { + use lancedb::database::ListNamespacesRequest; + let request = ListNamespacesRequest { + namespace, + page_token, + limit, + }; + inner.list_namespaces(request).await.infer_error() + }) + } + + #[pyo3(signature = (namespace,))] + pub fn create_namespace( + self_: PyRef<'_, Self>, + namespace: Vec, + ) -> PyResult> { + let inner = self_.get_inner()?.clone(); + future_into_py(self_.py(), async move { + use lancedb::database::CreateNamespaceRequest; + let request = CreateNamespaceRequest { namespace }; + inner.create_namespace(request).await.infer_error() + }) + } + + #[pyo3(signature = (namespace,))] + pub fn drop_namespace( + self_: PyRef<'_, Self>, + namespace: Vec, + ) -> PyResult> { + let inner = self_.get_inner()?.clone(); + future_into_py(self_.py(), async move { + use lancedb::database::DropNamespaceRequest; + let request = DropNamespaceRequest { namespace }; + inner.drop_namespace(request).await.infer_error() }) } } @@ -227,6 +300,7 @@ pub struct PyClientConfig { retry_config: Option, timeout_config: Option, extra_headers: Option>, + id_delimiter: Option, } #[derive(FromPyObject)] @@ -281,6 +355,7 @@ impl From for lancedb::remote::ClientConfig { retry_config: value.retry_config.map(Into::into).unwrap_or_default(), timeout_config: value.timeout_config.map(Into::into).unwrap_or_default(), extra_headers: value.extra_headers.unwrap_or_default(), + id_delimiter: value.id_delimiter, } } } diff --git a/rust/lancedb/examples/simple.rs b/rust/lancedb/examples/simple.rs index f5400da5..57b7be70 100644 --- a/rust/lancedb/examples/simple.rs +++ b/rust/lancedb/examples/simple.rs @@ -43,7 +43,7 @@ async fn main() -> Result<()> { // --8<-- [end:delete] // --8<-- [start:drop_table] - db.drop_table("my_table").await.unwrap(); + db.drop_table("my_table", &[]).await.unwrap(); // --8<-- [end:drop_table] Ok(()) } diff --git a/rust/lancedb/src/catalog/listing.rs b/rust/lancedb/src/catalog/listing.rs index c78336f0..b9604d62 100644 --- a/rust/lancedb/src/catalog/listing.rs +++ b/rust/lancedb/src/catalog/listing.rs @@ -379,6 +379,7 @@ mod tests { data: CreateTableData::Empty(TableDefinition::new_from_schema(dummy_schema)), mode: Default::default(), write_options: Default::default(), + namespace: vec![], }) .await .unwrap(); @@ -414,6 +415,7 @@ mod tests { data: CreateTableData::Empty(TableDefinition::new_from_schema(dummy_schema)), mode: Default::default(), write_options: Default::default(), + namespace: vec![], }) .await .unwrap(); diff --git a/rust/lancedb/src/connection.rs b/rust/lancedb/src/connection.rs index b08aa25e..79ac11f6 100644 --- a/rust/lancedb/src/connection.rs +++ b/rust/lancedb/src/connection.rs @@ -19,8 +19,9 @@ use crate::database::listing::{ ListingDatabase, OPT_NEW_TABLE_STORAGE_VERSION, OPT_NEW_TABLE_V2_MANIFEST_PATHS, }; use crate::database::{ - CreateTableData, CreateTableMode, CreateTableRequest, Database, DatabaseOptions, - OpenTableRequest, TableNamesRequest, + CreateNamespaceRequest, CreateTableData, CreateTableMode, CreateTableRequest, Database, + DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest, OpenTableRequest, + TableNamesRequest, }; use crate::embeddings::{ EmbeddingDefinition, EmbeddingFunction, EmbeddingRegistry, MemoryRegistry, WithEmbeddings, @@ -67,6 +68,12 @@ impl TableNamesBuilder { self } + /// Set the namespace to list tables from + pub fn namespace(mut self, namespace: Vec) -> Self { + self.request.namespace = namespace; + self + } + /// Execute the table names operation pub async fn execute(self) -> Result> { self.parent.clone().table_names(self.request).await @@ -348,6 +355,12 @@ impl CreateTableBuilder { ); self } + + /// Set the namespace for the table + pub fn namespace(mut self, namespace: Vec) -> Self { + self.request.namespace = namespace; + self + } } #[derive(Clone, Debug)] @@ -367,6 +380,7 @@ impl OpenTableBuilder { parent, request: OpenTableRequest { name, + namespace: vec![], index_cache_size: None, lance_read_params: None, }, @@ -442,6 +456,12 @@ impl OpenTableBuilder { self } + /// Set the namespace for the table + pub fn namespace(mut self, namespace: Vec) -> Self { + self.request.namespace = namespace; + self + } + /// Open the table pub async fn execute(self) -> Result
{ Ok(Table::new_with_embedding_registry( @@ -564,9 +584,16 @@ impl Connection { &self, old_name: impl AsRef, new_name: impl AsRef, + cur_namespace: &[String], + new_namespace: &[String], ) -> Result<()> { self.internal - .rename_table(old_name.as_ref(), new_name.as_ref()) + .rename_table( + old_name.as_ref(), + new_name.as_ref(), + cur_namespace, + new_namespace, + ) .await } @@ -574,8 +601,9 @@ impl Connection { /// /// # Arguments /// * `name` - The name of the table to drop - pub async fn drop_table(&self, name: impl AsRef) -> Result<()> { - self.internal.drop_table(name.as_ref()).await + /// * `namespace` - The namespace to drop the table from + pub async fn drop_table(&self, name: impl AsRef, namespace: &[String]) -> Result<()> { + self.internal.drop_table(name.as_ref(), namespace).await } /// Drop the database @@ -583,12 +611,30 @@ impl Connection { /// This is the same as dropping all of the tables #[deprecated(since = "0.15.1", note = "Use `drop_all_tables` instead")] pub async fn drop_db(&self) -> Result<()> { - self.internal.drop_all_tables().await + self.internal.drop_all_tables(&[]).await } /// Drops all tables in the database - pub async fn drop_all_tables(&self) -> Result<()> { - self.internal.drop_all_tables().await + /// + /// # Arguments + /// * `namespace` - The namespace to drop all tables from. Empty slice represents root namespace. + pub async fn drop_all_tables(&self, namespace: &[String]) -> Result<()> { + self.internal.drop_all_tables(namespace).await + } + + /// List immediate child namespace names in the given namespace + pub async fn list_namespaces(&self, request: ListNamespacesRequest) -> Result> { + self.internal.list_namespaces(request).await + } + + /// Create a new namespace + pub async fn create_namespace(&self, request: CreateNamespaceRequest) -> Result<()> { + self.internal.create_namespace(request).await + } + + /// Drop a namespace + pub async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<()> { + self.internal.drop_namespace(request).await } /// Get the in-memory embedding registry. @@ -1220,12 +1266,12 @@ mod tests { // drop non-exist table assert!(matches!( - db.drop_table("invalid_table").await, + db.drop_table("invalid_table", &[]).await, Err(crate::Error::TableNotFound { .. }), )); create_dir_all(tmp_dir.path().join("table1.lance")).unwrap(); - db.drop_table("table1").await.unwrap(); + db.drop_table("table1", &[]).await.unwrap(); let tables = db.table_names().execute().await.unwrap(); assert_eq!(tables.len(), 0); diff --git a/rust/lancedb/src/database.rs b/rust/lancedb/src/database.rs index 63406059..fe83d508 100644 --- a/rust/lancedb/src/database.rs +++ b/rust/lancedb/src/database.rs @@ -34,9 +34,36 @@ pub trait DatabaseOptions { fn serialize_into_map(&self, map: &mut HashMap); } +/// A request to list namespaces in the database +#[derive(Clone, Debug, Default)] +pub struct ListNamespacesRequest { + /// The parent namespace to list namespaces in. Empty list represents root namespace. + pub namespace: Vec, + /// If present, only return names that come lexicographically after the supplied value. + pub page_token: Option, + /// The maximum number of namespace names to return + pub limit: Option, +} + +/// A request to create a namespace +#[derive(Clone, Debug)] +pub struct CreateNamespaceRequest { + /// The namespace identifier to create + pub namespace: Vec, +} + +/// A request to drop a namespace +#[derive(Clone, Debug)] +pub struct DropNamespaceRequest { + /// The namespace identifier to drop + pub namespace: Vec, +} + /// A request to list names of tables in the database #[derive(Clone, Debug, Default)] pub struct TableNamesRequest { + /// The namespace to list tables in. Empty list represents root namespace. + pub namespace: Vec, /// If present, only return names that come lexicographically after the supplied /// value. /// @@ -51,6 +78,8 @@ pub struct TableNamesRequest { #[derive(Clone, Debug)] pub struct OpenTableRequest { pub name: String, + /// The namespace to open the table from. Empty list represents root namespace. + pub namespace: Vec, pub index_cache_size: Option, pub lance_read_params: Option, } @@ -125,6 +154,8 @@ impl StreamingWriteSource for CreateTableData { pub struct CreateTableRequest { /// The name of the new table pub name: String, + /// The namespace to create the table in. Empty list represents root namespace. + pub namespace: Vec, /// Initial data to write to the table, can be None to create an empty table pub data: CreateTableData, /// The mode to use when creating the table @@ -137,6 +168,7 @@ impl CreateTableRequest { pub fn new(name: String, data: CreateTableData) -> Self { Self { name, + namespace: vec![], data, mode: CreateTableMode::default(), write_options: WriteOptions::default(), @@ -151,6 +183,12 @@ impl CreateTableRequest { pub trait Database: Send + Sync + std::any::Any + std::fmt::Debug + std::fmt::Display + 'static { + /// List immediate child namespace names in the given namespace + async fn list_namespaces(&self, request: ListNamespacesRequest) -> Result>; + /// Create a new namespace + async fn create_namespace(&self, request: CreateNamespaceRequest) -> Result<()>; + /// Drop a namespace + async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<()>; /// List the names of tables in the database async fn table_names(&self, request: TableNamesRequest) -> Result>; /// Create a table in the database @@ -158,10 +196,16 @@ pub trait Database: /// Open a table in the database async fn open_table(&self, request: OpenTableRequest) -> Result>; /// Rename a table in the database - async fn rename_table(&self, old_name: &str, new_name: &str) -> Result<()>; + async fn rename_table( + &self, + cur_name: &str, + new_name: &str, + cur_namespace: &[String], + new_namespace: &[String], + ) -> Result<()>; /// Drop a table in the database - async fn drop_table(&self, name: &str) -> Result<()>; + async fn drop_table(&self, name: &str, namespace: &[String]) -> Result<()>; /// Drop all tables in the database - async fn drop_all_tables(&self) -> Result<()>; + async fn drop_all_tables(&self, namespace: &[String]) -> Result<()>; fn as_any(&self) -> &dyn std::any::Any; } diff --git a/rust/lancedb/src/database/listing.rs b/rust/lancedb/src/database/listing.rs index 13b84f5f..824f73e9 100644 --- a/rust/lancedb/src/database/listing.rs +++ b/rust/lancedb/src/database/listing.rs @@ -22,7 +22,8 @@ use crate::table::NativeTable; use crate::utils::validate_table_name; use super::{ - BaseTable, CreateTableMode, CreateTableRequest, Database, DatabaseOptions, OpenTableRequest, + BaseTable, CreateNamespaceRequest, CreateTableMode, CreateTableRequest, Database, + DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest, OpenTableRequest, TableNamesRequest, }; @@ -551,6 +552,7 @@ impl ListingDatabase { async fn handle_table_exists( &self, table_name: &str, + namespace: Vec, mode: CreateTableMode, data_schema: &arrow_schema::Schema, ) -> Result> { @@ -561,6 +563,7 @@ impl ListingDatabase { CreateTableMode::ExistOk(callback) => { let req = OpenTableRequest { name: table_name.to_string(), + namespace: namespace.clone(), index_cache_size: None, lance_read_params: None, }; @@ -584,7 +587,28 @@ impl ListingDatabase { #[async_trait::async_trait] impl Database for ListingDatabase { + async fn list_namespaces(&self, _request: ListNamespacesRequest) -> Result> { + Ok(Vec::new()) + } + + async fn create_namespace(&self, _request: CreateNamespaceRequest) -> Result<()> { + Err(Error::NotSupported { + message: "Namespace operations are not supported for listing database".into(), + }) + } + + async fn drop_namespace(&self, _request: DropNamespaceRequest) -> Result<()> { + Err(Error::NotSupported { + message: "Namespace operations are not supported for listing database".into(), + }) + } + async fn table_names(&self, request: TableNamesRequest) -> Result> { + if !request.namespace.is_empty() { + return Err(Error::NotSupported { + message: "Namespace parameter is not supported for listing database. Only root namespace is supported.".into(), + }); + } let mut f = self .object_store .read_dir(self.base_path.clone()) @@ -615,6 +639,11 @@ impl Database for ListingDatabase { } async fn create_table(&self, request: CreateTableRequest) -> Result> { + if !request.namespace.is_empty() { + return Err(Error::NotSupported { + message: "Namespace parameter is not supported for listing database. Only root namespace is supported.".into(), + }); + } let table_uri = self.table_uri(&request.name)?; let (storage_version_override, v2_manifest_override) = @@ -637,14 +666,24 @@ impl Database for ListingDatabase { { Ok(table) => Ok(Arc::new(table)), Err(Error::TableAlreadyExists { .. }) => { - self.handle_table_exists(&request.name, request.mode, &data_schema) - .await + self.handle_table_exists( + &request.name, + request.namespace.clone(), + request.mode, + &data_schema, + ) + .await } Err(err) => Err(err), } } async fn open_table(&self, mut request: OpenTableRequest) -> Result> { + if !request.namespace.is_empty() { + return Err(Error::NotSupported { + message: "Namespace parameter is not supported for listing database. Only root namespace is supported.".into(), + }); + } let table_uri = self.table_uri(&request.name)?; // Only modify the storage options if we actually have something to @@ -694,17 +733,44 @@ impl Database for ListingDatabase { Ok(native_table) } - async fn rename_table(&self, _old_name: &str, _new_name: &str) -> Result<()> { + async fn rename_table( + &self, + _cur_name: &str, + _new_name: &str, + cur_namespace: &[String], + new_namespace: &[String], + ) -> Result<()> { + if !cur_namespace.is_empty() { + return Err(Error::NotSupported { + message: "Namespace parameter is not supported for listing database.".into(), + }); + } + if !new_namespace.is_empty() { + return Err(Error::NotSupported { + message: "Namespace parameter is not supported for listing database.".into(), + }); + } Err(Error::NotSupported { - message: "rename_table is not supported in LanceDB OSS".to_string(), + message: "rename_table is not supported in LanceDB OSS".into(), }) } - async fn drop_table(&self, name: &str) -> Result<()> { + async fn drop_table(&self, name: &str, namespace: &[String]) -> Result<()> { + if !namespace.is_empty() { + return Err(Error::NotSupported { + message: "Namespace parameter is not supported for listing database.".into(), + }); + } self.drop_tables(vec![name.to_string()]).await } - async fn drop_all_tables(&self) -> Result<()> { + async fn drop_all_tables(&self, namespace: &[String]) -> Result<()> { + // Check if namespace parameter is provided + if !namespace.is_empty() { + return Err(Error::NotSupported { + message: "Namespace parameter is not supported for listing database.".into(), + }); + } let tables = self.table_names(TableNamesRequest::default()).await?; self.drop_tables(tables).await } diff --git a/rust/lancedb/src/remote/client.rs b/rust/lancedb/src/remote/client.rs index ecf58df4..a5fcfd1f 100644 --- a/rust/lancedb/src/remote/client.rs +++ b/rust/lancedb/src/remote/client.rs @@ -25,6 +25,9 @@ pub struct ClientConfig { pub user_agent: String, // TODO: how to configure request ids? pub extra_headers: HashMap, + /// The delimiter to use when constructing object identifiers. + /// If not default, passes as query parameter. + pub id_delimiter: Option, } impl Default for ClientConfig { @@ -34,6 +37,7 @@ impl Default for ClientConfig { retry_config: RetryConfig::default(), user_agent: concat!("LanceDB-Rust-Client/", env!("CARGO_PKG_VERSION")).into(), extra_headers: HashMap::new(), + id_delimiter: None, } } } @@ -145,6 +149,7 @@ pub struct RestfulLanceDbClient { host: String, pub(crate) retry_config: ResolvedRetryConfig, pub(crate) sender: S, + pub(crate) id_delimiter: String, } pub trait HttpSend: Clone + Send + Sync + std::fmt::Debug + 'static { @@ -268,6 +273,7 @@ impl RestfulLanceDbClient { host, retry_config, sender: Sender, + id_delimiter: client_config.id_delimiter.unwrap_or("$".to_string()), }) } } @@ -356,12 +362,22 @@ impl RestfulLanceDbClient { pub fn get(&self, uri: &str) -> RequestBuilder { let full_uri = format!("{}{}", self.host, uri); - self.client.get(full_uri) + let builder = self.client.get(full_uri); + self.add_id_delimiter_query_param(builder) } pub fn post(&self, uri: &str) -> RequestBuilder { let full_uri = format!("{}{}", self.host, uri); - self.client.post(full_uri) + let builder = self.client.post(full_uri); + self.add_id_delimiter_query_param(builder) + } + + fn add_id_delimiter_query_param(&self, req: RequestBuilder) -> RequestBuilder { + if self.id_delimiter != "$" { + req.query(&[("delimiter", self.id_delimiter.clone())]) + } else { + req + } } pub async fn send(&self, req: RequestBuilder) -> Result<(String, Response)> { @@ -594,6 +610,7 @@ pub mod test_utils { sender: MockSender { f: Arc::new(wrapper), }, + id_delimiter: "$".to_string(), } } } diff --git a/rust/lancedb/src/remote/db.rs b/rust/lancedb/src/remote/db.rs index e87ad668..25f3be65 100644 --- a/rust/lancedb/src/remote/db.rs +++ b/rust/lancedb/src/remote/db.rs @@ -14,8 +14,9 @@ use serde::Deserialize; use tokio::task::spawn_blocking; use crate::database::{ - CreateTableData, CreateTableMode, CreateTableRequest, Database, DatabaseOptions, - OpenTableRequest, TableNamesRequest, + CreateNamespaceRequest, CreateTableData, CreateTableMode, CreateTableRequest, Database, + DatabaseOptions, DropNamespaceRequest, ListNamespacesRequest, OpenTableRequest, + TableNamesRequest, }; use crate::error::Result; use crate::table::BaseTable; @@ -245,10 +246,61 @@ impl From<&CreateTableMode> for &'static str { } } +fn build_table_identifier(name: &str, namespace: &[String], delimiter: &str) -> String { + if !namespace.is_empty() { + let mut parts = namespace.to_vec(); + parts.push(name.to_string()); + parts.join(delimiter) + } else { + name.to_string() + } +} + +fn build_namespace_identifier(namespace: &[String], delimiter: &str) -> String { + if namespace.is_empty() { + // According to the namespace spec, use delimiter to represent root namespace + delimiter.to_string() + } else { + namespace.join(delimiter) + } +} + +/// Build a secure cache key using length prefixes. +/// This format is completely unambiguous regardless of delimiter or content. +/// Format: [u32_len][namespace1][u32_len][namespace2]...[u32_len][table_name] +/// Returns a hex-encoded string for use as a cache key. +fn build_cache_key(name: &str, namespace: &[String]) -> String { + let mut key = Vec::new(); + + // Add each namespace component with length prefix + for ns in namespace { + let bytes = ns.as_bytes(); + key.extend_from_slice(&(bytes.len() as u32).to_le_bytes()); + key.extend_from_slice(bytes); + } + + // Add table name with length prefix + let name_bytes = name.as_bytes(); + key.extend_from_slice(&(name_bytes.len() as u32).to_le_bytes()); + key.extend_from_slice(name_bytes); + + // Convert to hex string for use as a cache key + key.iter().map(|b| format!("{:02x}", b)).collect() +} + #[async_trait] impl Database for RemoteDatabase { async fn table_names(&self, request: TableNamesRequest) -> Result> { - let mut req = self.client.get("/v1/table/"); + let mut req = if !request.namespace.is_empty() { + let namespace_id = + build_namespace_identifier(&request.namespace, &self.client.id_delimiter); + self.client + .get(&format!("/v1/namespace/{}/table/list", namespace_id)) + } else { + // TODO: use new API for all listing operations once stable + self.client.get("/v1/table/") + }; + if let Some(limit) = request.limit { req = req.query(&[("limit", limit)]); } @@ -264,12 +316,17 @@ impl Database for RemoteDatabase { .err_to_http(request_id)? .tables; for table in &tables { + let table_identifier = + build_table_identifier(table, &request.namespace, &self.client.id_delimiter); + let cache_key = build_cache_key(table, &request.namespace); let remote_table = Arc::new(RemoteTable::new( self.client.clone(), table.clone(), + request.namespace.clone(), + table_identifier.clone(), version.clone(), )); - self.table_cache.insert(table.clone(), remote_table).await; + self.table_cache.insert(cache_key, remote_table).await; } Ok(tables) } @@ -295,9 +352,11 @@ impl Database for RemoteDatabase { .await .unwrap()?; + let identifier = + build_table_identifier(&request.name, &request.namespace, &self.client.id_delimiter); let req = self .client - .post(&format!("/v1/table/{}/create/", request.name)) + .post(&format!("/v1/table/{}/create/", identifier)) .query(&[("mode", Into::<&str>::into(&request.mode))]) .body(data_buffer) .header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE); @@ -314,6 +373,7 @@ impl Database for RemoteDatabase { CreateTableMode::ExistOk(callback) => { let req = OpenTableRequest { name: request.name.clone(), + namespace: request.namespace.clone(), index_cache_size: None, lance_read_params: None, }; @@ -342,70 +402,160 @@ impl Database for RemoteDatabase { } let rsp = self.client.check_response(&request_id, rsp).await?; let version = parse_server_version(&request_id, &rsp)?; + let table_identifier = + build_table_identifier(&request.name, &request.namespace, &self.client.id_delimiter); + let cache_key = build_cache_key(&request.name, &request.namespace); let table = Arc::new(RemoteTable::new( self.client.clone(), request.name.clone(), + request.namespace.clone(), + table_identifier, version, )); - self.table_cache - .insert(request.name.clone(), table.clone()) - .await; + self.table_cache.insert(cache_key, table.clone()).await; Ok(table) } async fn open_table(&self, request: OpenTableRequest) -> Result> { + let identifier = + build_table_identifier(&request.name, &request.namespace, &self.client.id_delimiter); + let cache_key = build_cache_key(&request.name, &request.namespace); + // We describe the table to confirm it exists before moving on. - if let Some(table) = self.table_cache.get(&request.name).await { + if let Some(table) = self.table_cache.get(&cache_key).await { Ok(table.clone()) } else { let req = self .client - .post(&format!("/v1/table/{}/describe/", request.name)); + .post(&format!("/v1/table/{}/describe/", identifier)); let (request_id, rsp) = self.client.send_with_retry(req, None, true).await?; if rsp.status() == StatusCode::NOT_FOUND { - return Err(crate::Error::TableNotFound { name: request.name }); + return Err(crate::Error::TableNotFound { + name: identifier.clone(), + }); } let rsp = self.client.check_response(&request_id, rsp).await?; let version = parse_server_version(&request_id, &rsp)?; + let table_identifier = build_table_identifier( + &request.name, + &request.namespace, + &self.client.id_delimiter, + ); let table = Arc::new(RemoteTable::new( self.client.clone(), request.name.clone(), + request.namespace.clone(), + table_identifier, version, )); - self.table_cache.insert(request.name, table.clone()).await; + let cache_key = build_cache_key(&request.name, &request.namespace); + self.table_cache.insert(cache_key, table.clone()).await; Ok(table) } } - async fn rename_table(&self, current_name: &str, new_name: &str) -> Result<()> { + async fn rename_table( + &self, + current_name: &str, + new_name: &str, + cur_namespace: &[String], + new_namespace: &[String], + ) -> Result<()> { + let current_identifier = + build_table_identifier(current_name, cur_namespace, &self.client.id_delimiter); + let current_cache_key = build_cache_key(current_name, cur_namespace); + let new_cache_key = build_cache_key(new_name, new_namespace); + + let mut body = serde_json::json!({ "new_table_name": new_name }); + if !new_namespace.is_empty() { + body["new_namespace"] = serde_json::Value::Array( + new_namespace + .iter() + .map(|s| serde_json::Value::String(s.clone())) + .collect(), + ); + } let req = self .client - .post(&format!("/v1/table/{}/rename/", current_name)); - let req = req.json(&serde_json::json!({ "new_table_name": new_name })); + .post(&format!("/v1/table/{}/rename/", current_identifier)) + .json(&body); let (request_id, resp) = self.client.send(req).await?; self.client.check_response(&request_id, resp).await?; - let table = self.table_cache.remove(current_name).await; + let table = self.table_cache.remove(¤t_cache_key).await; if let Some(table) = table { - self.table_cache.insert(new_name.into(), table).await; + self.table_cache.insert(new_cache_key, table).await; } Ok(()) } - async fn drop_table(&self, name: &str) -> Result<()> { - let req = self.client.post(&format!("/v1/table/{}/drop/", name)); + async fn drop_table(&self, name: &str, namespace: &[String]) -> Result<()> { + let identifier = build_table_identifier(name, namespace, &self.client.id_delimiter); + let cache_key = build_cache_key(name, namespace); + let req = self.client.post(&format!("/v1/table/{}/drop/", identifier)); let (request_id, resp) = self.client.send(req).await?; self.client.check_response(&request_id, resp).await?; - self.table_cache.remove(name).await; + self.table_cache.remove(&cache_key).await; Ok(()) } - async fn drop_all_tables(&self) -> Result<()> { + async fn drop_all_tables(&self, namespace: &[String]) -> Result<()> { + // TODO: Implement namespace-aware drop_all_tables + let _namespace = namespace; // Suppress unused warning for now Err(crate::Error::NotSupported { - message: "Dropping databases is not supported in the remote API".to_string(), + message: "Dropping all tables is not currently supported in the remote API".to_string(), }) } + async fn list_namespaces(&self, request: ListNamespacesRequest) -> Result> { + let namespace_id = + build_namespace_identifier(request.namespace.as_slice(), &self.client.id_delimiter); + let mut req = self + .client + .get(&format!("/v1/namespace/{}/list", namespace_id)); + if let Some(limit) = request.limit { + req = req.query(&[("limit", limit)]); + } + if let Some(page_token) = request.page_token { + req = req.query(&[("page_token", page_token)]); + } + + let (request_id, resp) = self.client.send(req).await?; + let resp = self.client.check_response(&request_id, resp).await?; + + #[derive(Deserialize)] + struct ListNamespacesResponse { + namespaces: Vec, + } + + let parsed: ListNamespacesResponse = resp.json().await.map_err(|e| Error::Runtime { + message: format!("Failed to parse namespace response: {}", e), + })?; + Ok(parsed.namespaces) + } + + async fn create_namespace(&self, request: CreateNamespaceRequest) -> Result<()> { + let namespace_id = + build_namespace_identifier(request.namespace.as_slice(), &self.client.id_delimiter); + let req = self + .client + .post(&format!("/v1/namespace/{}/create", namespace_id)); + let (request_id, resp) = self.client.send(req).await?; + self.client.check_response(&request_id, resp).await?; + Ok(()) + } + + async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result<()> { + let namespace_id = + build_namespace_identifier(request.namespace.as_slice(), &self.client.id_delimiter); + let req = self + .client + .post(&format!("/v1/namespace/{}/drop", namespace_id)); + let (request_id, resp) = self.client.send(req).await?; + self.client.check_response(&request_id, resp).await?; + Ok(()) + } + fn as_any(&self) -> &dyn std::any::Any { self } @@ -436,6 +586,7 @@ impl From for RemoteOptions { #[cfg(test)] mod tests { + use super::build_cache_key; use std::sync::{Arc, OnceLock}; use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator}; @@ -448,6 +599,38 @@ mod tests { Connection, Error, }; + #[test] + fn test_cache_key_security() { + // Test that cache keys are unique regardless of delimiter manipulation + + // Case 1: Different delimiters should not affect cache key + let key1 = build_cache_key("table1", &["ns1".to_string(), "ns2".to_string()]); + let key2 = build_cache_key("table1", &["ns1$ns2".to_string()]); + assert_ne!( + key1, key2, + "Cache keys should differ for different namespace structures" + ); + + // Case 2: Table name containing delimiter should not cause collision + let key3 = build_cache_key("ns2$table1", &["ns1".to_string()]); + assert_ne!( + key1, key3, + "Cache key should be different when table name contains delimiter" + ); + + // Case 3: Empty namespace vs namespace with empty string + let key4 = build_cache_key("table1", &[]); + let key5 = build_cache_key("table1", &["".to_string()]); + assert_ne!( + key4, key5, + "Empty namespace should differ from namespace with empty string" + ); + + // Case 4: Verify same inputs produce same key (consistency) + let key6 = build_cache_key("table1", &["ns1".to_string(), "ns2".to_string()]); + assert_eq!(key1, key6, "Same inputs should produce same cache key"); + } + #[tokio::test] async fn test_retries() { // We'll record the request_id here, to check it matches the one in the error. @@ -711,7 +894,7 @@ mod tests { http::Response::builder().status(200).body("").unwrap() }); - conn.drop_table("table1").await.unwrap(); + conn.drop_table("table1", &[]).await.unwrap(); // NOTE: the API will return 200 even if the table does not exist. So we shouldn't expect 404. } @@ -731,7 +914,9 @@ mod tests { http::Response::builder().status(200).body("").unwrap() }); - conn.rename_table("table1", "table2").await.unwrap(); + conn.rename_table("table1", "table2", &[], &[]) + .await + .unwrap(); } #[tokio::test] @@ -745,4 +930,186 @@ mod tests { .await .unwrap(); } + + #[tokio::test] + async fn test_table_names_with_root_namespace() { + // When namespace is empty (root namespace), should use /v1/table/ for backwards compatibility + let conn = Connection::new_with_handler(|request| { + assert_eq!(request.method(), &reqwest::Method::GET); + assert_eq!(request.url().path(), "/v1/table/"); + assert_eq!(request.url().query(), None); + + http::Response::builder() + .status(200) + .body(r#"{"tables": ["table1", "table2"]}"#) + .unwrap() + }); + let names = conn + .table_names() + .namespace(vec![]) + .execute() + .await + .unwrap(); + assert_eq!(names, vec!["table1", "table2"]); + } + + #[tokio::test] + async fn test_table_names_with_namespace() { + // When namespace is non-empty, should use /v1/namespace/{id}/table/list + let conn = Connection::new_with_handler(|request| { + assert_eq!(request.method(), &reqwest::Method::GET); + assert_eq!(request.url().path(), "/v1/namespace/test/table/list"); + assert_eq!(request.url().query(), None); + + http::Response::builder() + .status(200) + .body(r#"{"tables": ["table1", "table2"]}"#) + .unwrap() + }); + let names = conn + .table_names() + .namespace(vec!["test".to_string()]) + .execute() + .await + .unwrap(); + assert_eq!(names, vec!["table1", "table2"]); + } + + #[tokio::test] + async fn test_table_names_with_nested_namespace() { + // When namespace is vec!["ns1", "ns2"], should use /v1/namespace/ns1$ns2/table/list + let conn = Connection::new_with_handler(|request| { + assert_eq!(request.method(), &reqwest::Method::GET); + assert_eq!(request.url().path(), "/v1/namespace/ns1$ns2/table/list"); + assert_eq!(request.url().query(), None); + + http::Response::builder() + .status(200) + .body(r#"{"tables": ["ns1$ns2$table1", "ns1$ns2$table2"]}"#) + .unwrap() + }); + let names = conn + .table_names() + .namespace(vec!["ns1".to_string(), "ns2".to_string()]) + .execute() + .await + .unwrap(); + assert_eq!(names, vec!["ns1$ns2$table1", "ns1$ns2$table2"]); + } + + #[tokio::test] + async fn test_open_table_with_namespace() { + let conn = Connection::new_with_handler(|request| { + assert_eq!(request.method(), &reqwest::Method::POST); + assert_eq!(request.url().path(), "/v1/table/ns1$ns2$table1/describe/"); + assert_eq!(request.url().query(), None); + + http::Response::builder() + .status(200) + .body(r#"{"table": "table1"}"#) + .unwrap() + }); + let table = conn + .open_table("table1") + .namespace(vec!["ns1".to_string(), "ns2".to_string()]) + .execute() + .await + .unwrap(); + assert_eq!(table.name(), "table1"); + } + + #[tokio::test] + async fn test_create_table_with_namespace() { + let conn = Connection::new_with_handler(|request| { + assert_eq!(request.method(), &reqwest::Method::POST); + assert_eq!(request.url().path(), "/v1/table/ns1$table1/create/"); + assert_eq!( + request + .headers() + .get(reqwest::header::CONTENT_TYPE) + .unwrap(), + ARROW_STREAM_CONTENT_TYPE.as_bytes() + ); + + http::Response::builder().status(200).body("").unwrap() + }); + let data = RecordBatch::try_new( + Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])), + vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], + ) + .unwrap(); + let reader = RecordBatchIterator::new([Ok(data.clone())], data.schema()); + let table = conn + .create_table("table1", reader) + .namespace(vec!["ns1".to_string()]) + .execute() + .await + .unwrap(); + assert_eq!(table.name(), "table1"); + } + + #[tokio::test] + async fn test_drop_table_with_namespace() { + let conn = Connection::new_with_handler(|request| { + assert_eq!(request.method(), &reqwest::Method::POST); + assert_eq!(request.url().path(), "/v1/table/ns1$ns2$table1/drop/"); + assert_eq!(request.url().query(), None); + assert!(request.body().is_none()); + + http::Response::builder().status(200).body("").unwrap() + }); + conn.drop_table("table1", &["ns1".to_string(), "ns2".to_string()]) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_rename_table_with_namespace() { + let conn = Connection::new_with_handler(|request| { + assert_eq!(request.method(), &reqwest::Method::POST); + assert_eq!(request.url().path(), "/v1/table/ns1$table1/rename/"); + assert_eq!( + request.headers().get("Content-Type").unwrap(), + JSON_CONTENT_TYPE + ); + + let body = request.body().unwrap().as_bytes().unwrap(); + let body: serde_json::Value = serde_json::from_slice(body).unwrap(); + assert_eq!(body["new_table_name"], "table2"); + assert_eq!(body["new_namespace"], serde_json::json!(["ns2"])); + + http::Response::builder().status(200).body("").unwrap() + }); + conn.rename_table( + "table1", + "table2", + &["ns1".to_string()], + &["ns2".to_string()], + ) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_create_empty_table_with_namespace() { + let conn = Connection::new_with_handler(|request| { + assert_eq!(request.method(), &reqwest::Method::POST); + assert_eq!(request.url().path(), "/v1/table/prod$data$metrics/create/"); + assert_eq!( + request + .headers() + .get(reqwest::header::CONTENT_TYPE) + .unwrap(), + ARROW_STREAM_CONTENT_TYPE.as_bytes() + ); + + http::Response::builder().status(200).body("").unwrap() + }); + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + conn.create_empty_table("metrics", schema) + .namespace(vec!["prod".to_string(), "data".to_string()]) + .execute() + .await + .unwrap(); + } } diff --git a/rust/lancedb/src/remote/table.rs b/rust/lancedb/src/remote/table.rs index 14a022a9..8338978a 100644 --- a/rust/lancedb/src/remote/table.rs +++ b/rust/lancedb/src/remote/table.rs @@ -70,7 +70,7 @@ impl Tags for RemoteTags<'_, S> { let request = self .inner .client - .post(&format!("/v1/table/{}/tags/list/", self.inner.name)); + .post(&format!("/v1/table/{}/tags/list/", self.inner.identifier)); let (request_id, response) = self.inner.send(request, true).await?; let response = self .inner @@ -104,7 +104,10 @@ impl Tags for RemoteTags<'_, S> { let request = self .inner .client - .post(&format!("/v1/table/{}/tags/version/", self.inner.name)) + .post(&format!( + "/v1/table/{}/tags/version/", + self.inner.identifier + )) .json(&serde_json::json!({ "tag": tag })); let (request_id, response) = self.inner.send(request, true).await?; @@ -146,7 +149,7 @@ impl Tags for RemoteTags<'_, S> { let request = self .inner .client - .post(&format!("/v1/table/{}/tags/create/", self.inner.name)) + .post(&format!("/v1/table/{}/tags/create/", self.inner.identifier)) .json(&serde_json::json!({ "tag": tag, "version": version @@ -163,7 +166,7 @@ impl Tags for RemoteTags<'_, S> { let request = self .inner .client - .post(&format!("/v1/table/{}/tags/delete/", self.inner.name)) + .post(&format!("/v1/table/{}/tags/delete/", self.inner.identifier)) .json(&serde_json::json!({ "tag": tag })); let (request_id, response) = self.inner.send(request, true).await?; @@ -177,7 +180,7 @@ impl Tags for RemoteTags<'_, S> { let request = self .inner .client - .post(&format!("/v1/table/{}/tags/update/", self.inner.name)) + .post(&format!("/v1/table/{}/tags/update/", self.inner.identifier)) .json(&serde_json::json!({ "tag": tag, "version": version @@ -196,6 +199,8 @@ pub struct RemoteTable { #[allow(dead_code)] client: RestfulLanceDbClient, name: String, + namespace: Vec, + identifier: String, server_version: ServerVersion, version: RwLock>, @@ -205,11 +210,15 @@ impl RemoteTable { pub fn new( client: RestfulLanceDbClient, name: String, + namespace: Vec, + identifier: String, server_version: ServerVersion, ) -> Self { Self { client, name, + namespace, + identifier, server_version, version: RwLock::new(None), } @@ -223,7 +232,7 @@ impl RemoteTable { async fn describe_version(&self, version: Option) -> Result { let mut request = self .client - .post(&format!("/v1/table/{}/describe/", self.name)); + .post(&format!("/v1/table/{}/describe/", self.identifier)); let body = serde_json::json!({ "version": version }); request = request.json(&body); @@ -334,7 +343,7 @@ impl RemoteTable { ) -> Result { if response.status() == StatusCode::NOT_FOUND { return Err(Error::TableNotFound { - name: self.name.clone(), + name: self.identifier.clone(), }); } @@ -548,7 +557,9 @@ impl RemoteTable { query: &AnyQuery, options: &QueryExecutionOptions, ) -> Result>>> { - let mut request = self.client.post(&format!("/v1/table/{}/query/", self.name)); + let mut request = self + .client + .post(&format!("/v1/table/{}/query/", self.identifier)); if let Some(timeout) = options.timeout { // Also send to server, so it can abort the query if it takes too long. @@ -615,7 +626,7 @@ struct TableDescription { impl std::fmt::Display for RemoteTable { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "RemoteTable({})", self.name) + write!(f, "RemoteTable({})", self.identifier) } } @@ -634,7 +645,9 @@ mod test_utils { let client = client_with_handler(handler); Self { client, - name, + name: name.clone(), + namespace: vec![], + identifier: name, server_version: version.map(ServerVersion).unwrap_or_default(), version: RwLock::new(None), } @@ -650,6 +663,14 @@ impl BaseTable for RemoteTable { fn name(&self) -> &str { &self.name } + + fn namespace(&self) -> &[String] { + &self.namespace + } + + fn id(&self) -> &str { + &self.identifier + } async fn version(&self) -> Result { self.describe().await.map(|desc| desc.version) } @@ -678,7 +699,7 @@ impl BaseTable for RemoteTable { async fn restore(&self) -> Result<()> { let mut request = self .client - .post(&format!("/v1/table/{}/restore/", self.name)); + .post(&format!("/v1/table/{}/restore/", self.identifier)); let version = self.current_version().await; let body = serde_json::json!({ "version": version }); request = request.json(&body); @@ -692,7 +713,7 @@ impl BaseTable for RemoteTable { async fn list_versions(&self) -> Result> { let request = self .client - .post(&format!("/v1/table/{}/version/list/", self.name)); + .post(&format!("/v1/table/{}/version/list/", self.identifier)); let (request_id, response) = self.send(request, true).await?; let response = self.check_table_response(&request_id, response).await?; @@ -723,7 +744,7 @@ impl BaseTable for RemoteTable { async fn count_rows(&self, filter: Option) -> Result { let mut request = self .client - .post(&format!("/v1/table/{}/count_rows/", self.name)); + .post(&format!("/v1/table/{}/count_rows/", self.identifier)); let version = self.current_version().await; @@ -759,7 +780,7 @@ impl BaseTable for RemoteTable { self.check_mutable().await?; let mut request = self .client - .post(&format!("/v1/table/{}/insert/", self.name)) + .post(&format!("/v1/table/{}/insert/", self.identifier)) .header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE); match add.mode { @@ -831,7 +852,7 @@ impl BaseTable for RemoteTable { async fn explain_plan(&self, query: &AnyQuery, verbose: bool) -> Result { let base_request = self .client - .post(&format!("/v1/table/{}/explain_plan/", self.name)); + .post(&format!("/v1/table/{}/explain_plan/", self.identifier)); let query_bodies = self.prepare_query_bodies(query).await?; let requests: Vec = query_bodies @@ -880,7 +901,7 @@ impl BaseTable for RemoteTable { ) -> Result { let request = self .client - .post(&format!("/v1/table/{}/analyze_plan/", self.name)); + .post(&format!("/v1/table/{}/analyze_plan/", self.identifier)); let query_bodies = self.prepare_query_bodies(query).await?; let requests: Vec = query_bodies @@ -919,7 +940,7 @@ impl BaseTable for RemoteTable { self.check_mutable().await?; let request = self .client - .post(&format!("/v1/table/{}/update/", self.name)); + .post(&format!("/v1/table/{}/update/", self.identifier)); let mut updates = Vec::new(); for (column, expression) in update.columns { @@ -958,7 +979,7 @@ impl BaseTable for RemoteTable { let body = serde_json::json!({ "predicate": predicate }); let request = self .client - .post(&format!("/v1/table/{}/delete/", self.name)) + .post(&format!("/v1/table/{}/delete/", self.identifier)) .json(&body); let (request_id, response) = self.send(request, true).await?; let response = self.check_table_response(&request_id, response).await?; @@ -980,7 +1001,7 @@ impl BaseTable for RemoteTable { self.check_mutable().await?; let request = self .client - .post(&format!("/v1/table/{}/create_index/", self.name)); + .post(&format!("/v1/table/{}/create_index/", self.identifier)); let column = match index.columns.len() { 0 => { @@ -1121,7 +1142,7 @@ impl BaseTable for RemoteTable { let query = MergeInsertRequest::try_from(params)?; let mut request = self .client - .post(&format!("/v1/table/{}/merge_insert/", self.name)) + .post(&format!("/v1/table/{}/merge_insert/", self.identifier)) .query(&query) .header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE); @@ -1193,7 +1214,7 @@ impl BaseTable for RemoteTable { let body = serde_json::json!({ "new_columns": body }); let request = self .client - .post(&format!("/v1/table/{}/add_columns/", self.name)) + .post(&format!("/v1/table/{}/add_columns/", self.identifier)) .json(&body); let (request_id, response) = self.send(request, true).await?; let response = self.check_table_response(&request_id, response).await?; @@ -1246,7 +1267,7 @@ impl BaseTable for RemoteTable { let body = serde_json::json!({ "alterations": body }); let request = self .client - .post(&format!("/v1/table/{}/alter_columns/", self.name)) + .post(&format!("/v1/table/{}/alter_columns/", self.identifier)) .json(&body); let (request_id, response) = self.send(request, true).await?; let response = self.check_table_response(&request_id, response).await?; @@ -1271,7 +1292,7 @@ impl BaseTable for RemoteTable { let body = serde_json::json!({ "columns": columns }); let request = self .client - .post(&format!("/v1/table/{}/drop_columns/", self.name)) + .post(&format!("/v1/table/{}/drop_columns/", self.identifier)) .json(&body); let (request_id, response) = self.send(request, true).await?; let response = self.check_table_response(&request_id, response).await?; @@ -1295,7 +1316,7 @@ impl BaseTable for RemoteTable { // Make request to list the indices let mut request = self .client - .post(&format!("/v1/table/{}/index/list/", self.name)); + .post(&format!("/v1/table/{}/index/list/", self.identifier)); let version = self.current_version().await; let body = serde_json::json!({ "version": version }); request = request.json(&body); @@ -1351,7 +1372,7 @@ impl BaseTable for RemoteTable { async fn index_stats(&self, index_name: &str) -> Result> { let mut request = self.client.post(&format!( "/v1/table/{}/index/{}/stats/", - self.name, index_name + self.identifier, index_name )); let version = self.current_version().await; let body = serde_json::json!({ "version": version }); @@ -1379,7 +1400,7 @@ impl BaseTable for RemoteTable { async fn drop_index(&self, index_name: &str) -> Result<()> { let request = self.client.post(&format!( "/v1/table/{}/index/{}/drop/", - self.name, index_name + self.identifier, index_name )); let (request_id, response) = self.send(request, true).await?; if response.status() == StatusCode::NOT_FOUND { @@ -1407,7 +1428,9 @@ impl BaseTable for RemoteTable { } async fn stats(&self) -> Result { - let request = self.client.post(&format!("/v1/table/{}/stats/", self.name)); + let request = self + .client + .post(&format!("/v1/table/{}/stats/", self.identifier)); let (request_id, response) = self.send(request, true).await?; let response = self.check_table_response(&request_id, response).await?; let body = response.text().await.err_to_http(request_id.clone())?; @@ -3082,4 +3105,174 @@ mod tests { }); table } + + #[tokio::test] + async fn test_table_with_namespace_identifier() { + // Test that a table created with namespace uses the correct identifier in API calls + let table = Table::new_with_handler("ns1$ns2$table1", |request| { + assert_eq!(request.method(), "POST"); + // All API calls should use the full identifier in the path + assert_eq!(request.url().path(), "/v1/table/ns1$ns2$table1/describe/"); + + http::Response::builder() + .status(200) + .body(r#"{"version": 1, "schema": { "fields": [] }}"#) + .unwrap() + }); + + // The name() method should return just the base name, not the full identifier + assert_eq!(table.name(), "ns1$ns2$table1"); + + // API operations should work correctly + let version = table.version().await.unwrap(); + assert_eq!(version, 1); + } + + #[tokio::test] + async fn test_query_with_namespace() { + let table = Table::new_with_handler("analytics$events", |request| { + match request.url().path() { + "/v1/table/analytics$events/query/" => { + assert_eq!(request.method(), "POST"); + + // Return empty arrow stream + let data = RecordBatch::try_new( + Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])), + vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], + ) + .unwrap(); + let body = write_ipc_file(&data); + + http::Response::builder() + .status(200) + .header("Content-Type", ARROW_FILE_CONTENT_TYPE) + .body(body) + .unwrap() + } + _ => { + panic!("Unexpected path: {}", request.url().path()); + } + } + }); + + let results = table.query().execute().await.unwrap(); + let batches = results.try_collect::>().await.unwrap(); + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].num_rows(), 3); + } + + #[tokio::test] + async fn test_add_data_with_namespace() { + let data = RecordBatch::try_new( + Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])), + vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], + ) + .unwrap(); + + let (sender, receiver) = std::sync::mpsc::channel(); + let table = Table::new_with_handler("prod$metrics", move |mut request| { + if request.url().path() == "/v1/table/prod$metrics/insert/" { + assert_eq!(request.method(), "POST"); + assert_eq!( + request.headers().get("Content-Type").unwrap(), + ARROW_STREAM_CONTENT_TYPE + ); + let mut body_out = reqwest::Body::from(Vec::new()); + std::mem::swap(request.body_mut().as_mut().unwrap(), &mut body_out); + sender.send(body_out).unwrap(); + http::Response::builder() + .status(200) + .body(r#"{"version": 2}"#) + .unwrap() + } else { + panic!("Unexpected request path: {}", request.url().path()); + } + }); + + let result = table + .add(RecordBatchIterator::new([Ok(data.clone())], data.schema())) + .execute() + .await + .unwrap(); + + assert_eq!(result.version, 2); + + let body = receiver.recv().unwrap(); + let body = collect_body(body).await; + let expected_body = write_ipc_stream(&data); + assert_eq!(&body, &expected_body); + } + + #[tokio::test] + async fn test_create_index_with_namespace() { + let table = Table::new_with_handler("dev$users", |request| { + match request.url().path() { + "/v1/table/dev$users/create_index/" => { + assert_eq!(request.method(), "POST"); + assert_eq!( + request.headers().get("Content-Type").unwrap(), + JSON_CONTENT_TYPE + ); + + // Verify the request body contains the column name + if let Some(body) = request.body().unwrap().as_bytes() { + let body = std::str::from_utf8(body).unwrap(); + let value: serde_json::Value = serde_json::from_str(body).unwrap(); + assert_eq!(value["column"], "embedding"); + assert_eq!(value["index_type"], "IVF_PQ"); + } + + http::Response::builder().status(200).body("").unwrap() + } + "/v1/table/dev$users/describe/" => { + // Needed for schema check in Auto index type + http::Response::builder() + .status(200) + .body(r#"{"version": 1, "schema": {"fields": [{"name": "embedding", "type": {"type": "list", "item": {"type": "float32"}}, "nullable": false}]}}"#) + .unwrap() + } + _ => { + panic!("Unexpected path: {}", request.url().path()); + } + } + }); + + table + .create_index(&["embedding"], Index::IvfPq(IvfPqIndexBuilder::default())) + .execute() + .await + .unwrap(); + } + + #[tokio::test] + async fn test_drop_columns_with_namespace() { + let table = Table::new_with_handler("test$schema_ops", |request| { + assert_eq!(request.method(), "POST"); + assert_eq!( + request.url().path(), + "/v1/table/test$schema_ops/drop_columns/" + ); + assert_eq!( + request.headers().get("Content-Type").unwrap(), + JSON_CONTENT_TYPE + ); + + if let Some(body) = request.body().unwrap().as_bytes() { + let body = std::str::from_utf8(body).unwrap(); + let value: serde_json::Value = serde_json::from_str(body).unwrap(); + let columns = value["columns"].as_array().unwrap(); + assert_eq!(columns.len(), 2); + assert_eq!(columns[0], "old_col1"); + assert_eq!(columns[1], "old_col2"); + } + + http::Response::builder() + .status(200) + .body(r#"{"version": 5}"#) + .unwrap() + }); + + let result = table.drop_columns(&["old_col1", "old_col2"]).await.unwrap(); + assert_eq!(result.version, 5); + } } diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index 114dac44..1bbeed52 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -509,6 +509,10 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync { fn as_any(&self) -> &dyn std::any::Any; /// Get the name of the table. fn name(&self) -> &str; + /// Get the namespace of the table. + fn namespace(&self) -> &[String]; + /// Get the id of the table + fn id(&self) -> &str; /// Get the arrow [Schema] of the table. async fn schema(&self) -> Result; /// Count the number of rows in this table. @@ -2007,6 +2011,16 @@ impl BaseTable for NativeTable { self.name.as_str() } + fn namespace(&self) -> &[String] { + // Native tables don't support namespaces yet, return empty slice for root namespace + &[] + } + + fn id(&self) -> &str { + // For native tables, id is same as name since no namespace support + self.name.as_str() + } + async fn version(&self) -> Result { Ok(self.dataset.get().await?.version().version) } diff --git a/rust/lancedb/tests/object_store_test.rs b/rust/lancedb/tests/object_store_test.rs index b2deb4b5..c874fefb 100644 --- a/rust/lancedb/tests/object_store_test.rs +++ b/rust/lancedb/tests/object_store_test.rs @@ -130,7 +130,7 @@ async fn test_minio_lifecycle() -> Result<()> { let data = RecordBatchIterator::new(vec![Ok(data.clone())], data.schema()); table.add(data).execute().await?; - db.drop_table("test_table").await?; + db.drop_table("test_table", &[]).await?; Ok(()) }