diff --git a/python/python/lancedb/db.py b/python/python/lancedb/db.py index 2a9194cfd..36e044809 100644 --- a/python/python/lancedb/db.py +++ b/python/python/lancedb/db.py @@ -596,6 +596,7 @@ class LanceDBConnection(DBConnection): ): if _inner is not None: self._conn = _inner + self._cached_namespace_client = None return if not isinstance(uri, Path): @@ -643,6 +644,7 @@ class LanceDBConnection(DBConnection): # beyond _conn. self.storage_options = storage_options self._conn = AsyncConnection(LOOP.run(do_connect())) + self._cached_namespace_client: Optional[LanceNamespace] = None @property def read_consistency_interval(self) -> Optional[timedelta]: @@ -714,11 +716,13 @@ class LanceDBConnection(DBConnection): ListNamespacesResponse Response containing namespace names and optional page_token for pagination. """ + from lance_namespace import ListNamespacesRequest + if namespace_path is None: namespace_path = [] - return LOOP.run( - self._conn.list_namespaces( - namespace_path=namespace_path, page_token=page_token, limit=limit + return self.namespace_client().list_namespaces( + ListNamespacesRequest( + id=namespace_path, page_token=page_token, limit=limit ) ) @@ -746,11 +750,15 @@ class LanceDBConnection(DBConnection): CreateNamespaceResponse Response containing the properties of the created namespace. """ - return LOOP.run( - self._conn.create_namespace( - namespace_path=namespace_path, mode=mode, properties=properties - ) + from lance_namespace import CreateNamespaceRequest + from lancedb.namespace_utils import _normalize_create_namespace_mode + + req = CreateNamespaceRequest( + id=namespace_path, + mode=_normalize_create_namespace_mode(mode), + properties=properties, ) + return self.namespace_client().create_namespace(req) @override def drop_namespace( @@ -776,12 +784,19 @@ class LanceDBConnection(DBConnection): DropNamespaceResponse Response containing properties and transaction_id if applicable. """ - return LOOP.run( - self._conn.drop_namespace( - namespace_path=namespace_path, mode=mode, behavior=behavior - ) + from lance_namespace import DropNamespaceRequest + from lancedb.namespace_utils import ( + _normalize_drop_namespace_behavior, + _normalize_drop_namespace_mode, ) + req = DropNamespaceRequest( + id=namespace_path, + mode=_normalize_drop_namespace_mode(mode), + behavior=_normalize_drop_namespace_behavior(behavior), + ) + return self.namespace_client().drop_namespace(req) + @override def describe_namespace( self, namespace_path: List[str] @@ -798,7 +813,11 @@ class LanceDBConnection(DBConnection): DescribeNamespaceResponse Response containing the namespace properties. """ - return LOOP.run(self._conn.describe_namespace(namespace_path=namespace_path)) + from lance_namespace import DescribeNamespaceRequest + + return self.namespace_client().describe_namespace( + DescribeNamespaceRequest(id=namespace_path) + ) @override def list_tables( @@ -827,6 +846,14 @@ class LanceDBConnection(DBConnection): """ if namespace_path is None: namespace_path = [] + if namespace_path: + from lance_namespace import ListTablesRequest + + return self.namespace_client().list_tables( + ListTablesRequest( + id=namespace_path, page_token=page_token, limit=limit + ) + ) return LOOP.run( self._conn.list_tables( namespace_path=namespace_path, page_token=page_token, limit=limit @@ -915,6 +942,22 @@ class LanceDBConnection(DBConnection): raise ValueError("mode must be either 'create' or 'overwrite'") validate_table_name(name) + if namespace_path: + return self._create_table_via_namespace( + name, + data=data, + schema=schema, + mode=mode, + exist_ok=exist_ok, + on_bad_vectors=on_bad_vectors, + fill_value=fill_value, + embedding_functions=embedding_functions, + namespace_path=namespace_path, + storage_options=storage_options, + data_storage_version=data_storage_version, + enable_v2_manifest_paths=enable_v2_manifest_paths, + ) + tbl = LanceTable.create( self, name, @@ -930,6 +973,48 @@ class LanceDBConnection(DBConnection): ) return tbl + def _create_table_via_namespace( + self, + name: str, + *, + data: Optional[DATA] = None, + schema: Optional[Union[pa.Schema, LanceModel]] = None, + mode: str = "create", + exist_ok: bool = False, + on_bad_vectors: str = "error", + fill_value: float = 0.0, + embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None, + namespace_path: List[str], + storage_options: Optional[Dict[str, str]] = None, + data_storage_version: Optional[str] = None, + enable_v2_manifest_paths: Optional[bool] = None, + ) -> LanceTable: + """Create a table through the directory namespace client.""" + from lancedb.namespace import LanceNamespaceDBConnection + + ns_client = self.namespace_client() + ns_conn = LanceNamespaceDBConnection( + ns_client, + read_consistency_interval=self.read_consistency_interval, + storage_options=self.storage_options, + namespace_client_impl=None, + namespace_client_properties=None, + ) + return ns_conn.create_table( + name, + data=data, + schema=schema, + mode=mode, + exist_ok=exist_ok, + on_bad_vectors=on_bad_vectors, + fill_value=fill_value, + embedding_functions=embedding_functions, + namespace_path=namespace_path, + storage_options=storage_options, + data_storage_version=data_storage_version, + enable_v2_manifest_paths=enable_v2_manifest_paths, + ) + @override def open_table( self, @@ -946,7 +1031,8 @@ class LanceDBConnection(DBConnection): name: str The name of the table. namespace_path: List[str], optional - The namespace to open the table from. + The namespace to open the table from. When non-empty, the + table is resolved through the directory namespace client. Returns ------- @@ -965,6 +1051,14 @@ class LanceDBConnection(DBConnection): stacklevel=2, ) + if namespace_path: + return self._open_table_via_namespace( + name, + namespace_path=namespace_path, + storage_options=storage_options, + index_cache_size=index_cache_size, + ) + return LanceTable.open( self, name, @@ -973,6 +1067,45 @@ class LanceDBConnection(DBConnection): index_cache_size=index_cache_size, ) + def _open_table_via_namespace( + self, + name: str, + *, + namespace_path: List[str], + storage_options: Optional[Dict[str, str]] = None, + index_cache_size: Optional[int] = None, + ) -> LanceTable: + """Open a table through the directory namespace client.""" + from lance_namespace import DescribeTableRequest + + ns_client = self.namespace_client() + table_id = namespace_path + [name] + response = ns_client.describe_table(DescribeTableRequest(id=table_id)) + + merged = dict(self.storage_options or {}) + if storage_options: + merged.update(storage_options) + if response.storage_options: + merged.update(response.storage_options) + + managed_versioning = response.managed_versioning is True + + temp_conn = LanceDBConnection( + response.location, + read_consistency_interval=self.read_consistency_interval, + storage_options=merged or None, + ) + return LanceTable.open( + temp_conn, + name, + namespace_path=namespace_path, + storage_options=merged or None, + index_cache_size=index_cache_size, + location=response.location, + namespace_client=ns_client, + managed_versioning=managed_versioning, + ) + def clone_table( self, target_table_name: str, @@ -1049,6 +1182,13 @@ class LanceDBConnection(DBConnection): """ if namespace_path is None: namespace_path = [] + if namespace_path: + from lance_namespace import DropTableRequest + + self.namespace_client().drop_table( + DropTableRequest(id=namespace_path + [name]) + ) + return LOOP.run( self._conn.drop_table( name, namespace_path=namespace_path, ignore_missing=ignore_missing @@ -1100,14 +1240,19 @@ class LanceDBConnection(DBConnection): """Get the equivalent namespace client for this connection. Returns a DirectoryNamespace pointing to the same root with the - same storage options. + same storage options. The result is cached for the lifetime of + this connection. Returns ------- LanceNamespace The namespace client for this connection. """ - return LOOP.run(self._conn.namespace_client()) + if self._cached_namespace_client is None: + self._cached_namespace_client = LOOP.run( + self._conn.namespace_client() + ) + return self._cached_namespace_client @deprecation.deprecated( deprecated_in="0.15.1",