diff --git a/python/pyproject.toml b/python/pyproject.toml index f9f4c55a..687d04a2 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -10,6 +10,7 @@ dependencies = [ "pyarrow>=16", "pydantic>=1.10", "tqdm>=4.27.0", + "lance-namespace==0.0.6" ] description = "lancedb" authors = [{ name = "LanceDB Devs", email = "dev@lancedb.com" }] diff --git a/python/python/lancedb/__init__.py b/python/python/lancedb/__init__.py index e4112577..7f15be8a 100644 --- a/python/python/lancedb/__init__.py +++ b/python/python/lancedb/__init__.py @@ -19,6 +19,7 @@ from .remote.db import RemoteDBConnection from .schema import vector from .table import AsyncTable from ._lancedb import Session +from .namespace import connect_namespace, LanceNamespaceDBConnection def connect( @@ -221,6 +222,7 @@ async def connect_async( __all__ = [ "connect", "connect_async", + "connect_namespace", "AsyncConnection", "AsyncTable", "URI", @@ -228,6 +230,7 @@ __all__ = [ "vector", "DBConnection", "LanceDBConnection", + "LanceNamespaceDBConnection", "RemoteDBConnection", "Session", "__version__", diff --git a/python/python/lancedb/namespace.py b/python/python/lancedb/namespace.py new file mode 100644 index 00000000..ad0bf0c2 --- /dev/null +++ b/python/python/lancedb/namespace.py @@ -0,0 +1,325 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright The LanceDB Authors + +""" +LanceDB Namespace integration module. + +This module provides integration with lance_namespace for managing tables +through a namespace abstraction. 
def _convert_pyarrow_type_to_json(arrow_type: pa.DataType) -> JsonArrowDataType:
    """Translate a PyArrow data type into a ``JsonArrowDataType``.

    Only the type *name* is carried over. ``list`` and ``fixed_size_list``
    are reported by name alone; their element type and size are not
    included in the result.

    Parameters
    ----------
    arrow_type : pa.DataType
        The PyArrow type to translate.

    Returns
    -------
    JsonArrowDataType
        A JSON type whose ``type`` is the mapped name, or ``str(arrow_type)``
        when the type is not one of the explicitly handled ones.
    """
    # Ordered (predicate, JSON name) pairs; the first predicate that accepts
    # ``arrow_type`` decides the name.
    known_types = (
        (pa.types.is_null, "null"),
        (pa.types.is_boolean, "bool"),
        (pa.types.is_int8, "int8"),
        (pa.types.is_uint8, "uint8"),
        (pa.types.is_int16, "int16"),
        (pa.types.is_uint16, "uint16"),
        (pa.types.is_int32, "int32"),
        (pa.types.is_uint32, "uint32"),
        (pa.types.is_int64, "int64"),
        (pa.types.is_uint64, "uint64"),
        (pa.types.is_float32, "float32"),
        (pa.types.is_float64, "float64"),
        (pa.types.is_string, "utf8"),
        (pa.types.is_binary, "binary"),
        (pa.types.is_list, "list"),
        (pa.types.is_fixed_size_list, "fixed_size_list"),
    )
    for matches, json_name in known_types:
        if matches(arrow_type):
            return JsonArrowDataType(type=json_name)

    # Unhandled types fall back to PyArrow's own string representation.
    return JsonArrowDataType(type=str(arrow_type))
def _decode_arrow_metadata(metadata):
    """Return Arrow key/value metadata with ``bytes`` entries decoded to ``str``.

    PyArrow reports field and schema metadata as ``bytes`` keys and values;
    the JSON schema models are text based, so both sides are decoded as
    UTF-8. ``None`` (no metadata) is passed through unchanged.
    """
    if metadata is None:
        return None
    return {
        (key.decode("utf-8") if isinstance(key, bytes) else key): (
            value.decode("utf-8") if isinstance(value, bytes) else value
        )
        for key, value in metadata.items()
    }


def _convert_pyarrow_schema_to_json(schema: pa.Schema) -> JsonArrowSchema:
    """Convert a PyArrow ``Schema`` into a ``JsonArrowSchema``.

    Each field keeps its name, nullability and metadata; its type is mapped
    with ``_convert_pyarrow_type_to_json``. Schema-level metadata is carried
    over as well.

    Parameters
    ----------
    schema : pa.Schema
        The PyArrow schema to convert.

    Returns
    -------
    JsonArrowSchema
        The JSON representation, with fields in the same order as ``schema``.

    Raises
    ------
    UnicodeDecodeError
        If a metadata key or value is not valid UTF-8.
    """
    fields = [
        JsonArrowField(
            name=field.name,
            type=_convert_pyarrow_type_to_json(field.type),
            nullable=field.nullable,
            metadata=_decode_arrow_metadata(field.metadata),
        )
        for field in schema
    ]
    return JsonArrowSchema(
        fields=fields, metadata=_decode_arrow_metadata(schema.metadata)
    )
class LanceNamespaceDBConnection(DBConnection):
    """
    A LanceDB connection that uses a namespace for table management.

    Listing, creating, describing and dropping tables are delegated to a
    ``LanceNamespace`` instance. Once the namespace has reported a table's
    location, the table itself is opened as a regular ``LanceTable``.
    """

    def __init__(
        self,
        namespace: LanceNamespace,
        *,
        read_consistency_interval: Optional[timedelta] = None,
        storage_options: Optional[Dict[str, str]] = None,
        session: Optional[Session] = None,
    ):
        """
        Initialize a namespace-based LanceDB connection.

        Parameters
        ----------
        namespace : LanceNamespace
            The namespace instance to use for table management
        read_consistency_interval : Optional[timedelta]
            The interval at which to check for updates to the table from other
            processes. If None, then consistency is not checked.
        storage_options : Optional[Dict[str, str]]
            Additional options for the storage backend. Stored as an empty
            dict when omitted.
        session : Optional[Session]
            A session to use for this connection
        """
        self._ns = namespace
        self.read_consistency_interval = read_consistency_interval
        self.storage_options = storage_options or {}
        self.session = session

    @override
    def table_names(
        self, page_token: Optional[str] = None, limit: int = 10
    ) -> Iterable[str]:
        """Return one page of table names as reported by the namespace.

        Parameters
        ----------
        page_token : Optional[str]
            Token forwarded to the namespace's list request.
        limit : int
            Page size forwarded to the namespace's list request.

        Returns
        -------
        Iterable[str]
            The names in the response, or an empty list when there are none.
        """
        request = ListTablesRequest(id=None, page_token=page_token, limit=limit)
        response = self._ns.list_tables(request)
        return response.tables if response.tables else []

    @override
    def create_table(
        self,
        name: str,
        data: Optional[DATA] = None,
        schema: Optional[Union[pa.Schema, LanceModel]] = None,
        mode: str = "create",
        exist_ok: bool = False,
        on_bad_vectors: str = "error",
        fill_value: float = 0.0,
        embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None,
        *,
        storage_options: Optional[Dict[str, str]] = None,
        data_storage_version: Optional[str] = None,
        enable_v2_manifest_paths: Optional[bool] = None,
    ) -> Table:
        """Create an empty table through the namespace and open it.

        Parameters
        ----------
        name : str
            Name of the table; checked with ``validate_table_name``.
        data : Optional[DATA]
            Must be ``None``; creating a table from data is not supported yet.
        schema : Optional[Union[pa.Schema, LanceModel]]
            Schema of the new table.
        mode : str
            ``"create"`` or ``"overwrite"`` (case-insensitive). ``"overwrite"``
            is forwarded to the namespace in the create request.
        exist_ok, data_storage_version, enable_v2_manifest_paths
            Accepted for interface compatibility; not used here.
        on_bad_vectors, fill_value
            Passed to ``sanitize_create_table``.
        embedding_functions : Optional[List[EmbeddingFunctionConfig]]
            When given, their registry metadata is attached to the schema.
        storage_options : Optional[Dict[str, str]]
            Options used when the new table is opened.

        Returns
        -------
        Table
            The newly created table, opened via ``open_table``.

        Raises
        ------
        ValueError
            If ``mode`` is not supported or ``data`` is not ``None``.
        """
        mode = mode.lower()
        if mode not in ("create", "overwrite"):
            raise ValueError("mode must be either 'create' or 'overwrite'")
        validate_table_name(name)

        # TODO: support passing data
        if data is not None:
            raise ValueError(
                "create_table currently only supports creating empty tables (data=None)"
            )

        # Prepare schema
        metadata = None
        if embedding_functions is not None:
            from lancedb.embeddings.registry import EmbeddingFunctionRegistry

            registry = EmbeddingFunctionRegistry.get_instance()
            metadata = registry.get_table_metadata(embedding_functions)

        data, schema = sanitize_create_table(
            data, schema, metadata, on_bad_vectors, fill_value
        )
        validate_schema(schema)

        # Convert PyArrow schema to JsonArrowSchema
        json_schema = _convert_pyarrow_schema_to_json(schema)

        # Forward an explicit overwrite so the namespace can replace an
        # existing table. The default create case leaves ``mode`` unset, as
        # before.
        request_kwargs = {"id": [name], "var_schema": json_schema}
        if mode == "overwrite":
            request_kwargs["mode"] = mode
        request = CreateTableRequest(**request_kwargs)

        # The request payload is an Arrow IPC stream holding zero rows.
        import pyarrow.ipc as ipc
        import io

        empty_table = pa.Table.from_arrays(
            [pa.array([], type=field.type) for field in schema], schema=schema
        )
        buffer = io.BytesIO()
        with ipc.new_stream(buffer, schema) as writer:
            writer.write_table(empty_table)
        request_data = buffer.getvalue()

        self._ns.create_table(request, request_data)
        return self.open_table(name, storage_options=storage_options)

    @override
    def open_table(
        self,
        name: str,
        *,
        storage_options: Optional[Dict[str, str]] = None,
        index_cache_size: Optional[int] = None,
    ) -> Table:
        """Open a table whose location is resolved by the namespace.

        Parameters
        ----------
        name : str
            Name of the table to describe and open.
        storage_options : Optional[Dict[str, str]]
            Caller-supplied storage options. Options returned by the
            namespace for this table override entries with the same key.
        index_cache_size : Optional[int]
            Forwarded to the table open call.

        Returns
        -------
        Table
            The opened table.
        """
        request = DescribeTableRequest(id=[name])
        response = self._ns.describe_table(request)

        # Namespace-provided options are applied last so they win on conflict.
        merged_storage_options = dict()
        if storage_options:
            merged_storage_options.update(storage_options)
        if response.storage_options:
            merged_storage_options.update(response.storage_options)

        return self._lance_table_from_uri(
            response.location,
            storage_options=merged_storage_options,
            index_cache_size=index_cache_size,
        )

    @override
    def drop_table(self, name: str):
        """Drop the table called ``name`` through the namespace."""
        request = DropTableRequest(id=[name])
        self._ns.drop_table(request)

    @override
    def rename_table(self, cur_name: str, new_name: str):
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError(
            "rename_table is not supported for namespace connections"
        )

    @override
    def drop_database(self):
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError(
            "drop_database is deprecated, use drop_all_tables instead"
        )

    @override
    def drop_all_tables(self):
        """Drop every table the namespace lists.

        ``table_names`` returns a single page, so listing is repeated until a
        page contains no table that has not already been dropped.
        """
        dropped = set()
        while True:
            # Skip names already handled so a namespace that still lists a
            # dropped table cannot keep this loop running forever.
            pending = [name for name in self.table_names() if name not in dropped]
            if not pending:
                break
            for table_name in pending:
                self.drop_table(table_name)
                dropped.add(table_name)

    def _lance_table_from_uri(
        self,
        table_uri: str,
        *,
        storage_options: Optional[Dict[str, str]] = None,
        index_cache_size: Optional[int] = None,
    ) -> LanceTable:
        """Open the Lance table stored at ``table_uri``.

        The URI is split into a parent location and a table name, a temporary
        ``LanceDBConnection`` is created for the parent, and the table is
        opened through it.

        Parameters
        ----------
        table_uri : str
            Location ending in ``.lance`` (a trailing ``/`` is tolerated).
        storage_options : Optional[Dict[str, str]]
            Per-table options; layered over the connection-level options for
            the temporary connection and passed to the table open call.
        index_cache_size : Optional[int]
            Forwarded to the table open call.

        Raises
        ------
        ValueError
            If ``table_uri`` does not end in ``.lance``.
        """
        suffix = ".lance"
        # Accept directory-style locations such as "root/name.lance/".
        normalized_uri = table_uri.rstrip("/")
        if not normalized_uri.endswith(suffix):
            raise ValueError(f"Invalid table URI: {table_uri}")
        base_path = os.path.dirname(normalized_uri)
        table_name = os.path.basename(normalized_uri)[: -len(suffix)]

        from lancedb.db import LanceDBConnection

        temp_conn = LanceDBConnection(
            base_path,
            read_consistency_interval=self.read_consistency_interval,
            storage_options={**self.storage_options, **(storage_options or {})},
            session=self.session,
        )

        # Open the table using the temporary connection
        return LanceTable.open(
            temp_conn,
            table_name,
            storage_options=storage_options,
            index_cache_size=index_cache_size,
        )
def connect_namespace(
    impl: str,
    properties: Dict[str, str],
    *,
    read_consistency_interval: Optional[timedelta] = None,
    storage_options: Optional[Dict[str, str]] = None,
    session: Optional[Session] = None,
) -> LanceNamespaceDBConnection:
    """
    Connect to a LanceDB database through a namespace.

    The namespace is created from ``impl`` and ``properties`` and wrapped in
    a ``LanceNamespaceDBConnection``.

    Parameters
    ----------
    impl : str
        The namespace implementation to use. For example:
        - "dir" for DirectoryNamespace
        - "rest" for REST-based namespace
        - Full module path for custom implementations
    properties : Dict[str, str]
        Configuration properties for the namespace implementation.
        Each namespace implementation has its own configuration properties.
        For example, use DirectoryNamespace with {"root": "/path/to/directory"}
    read_consistency_interval : Optional[timedelta]
        The interval at which to check for updates to the table from other
        processes. If None, then consistency is not checked.
    storage_options : Optional[Dict[str, str]]
        Additional options for the storage backend
    session : Optional[Session]
        A session to use for this connection

    Returns
    -------
    LanceNamespaceDBConnection
        A namespace-based connection to LanceDB
    """
    connection_options = dict(
        read_consistency_interval=read_consistency_interval,
        storage_options=storage_options,
        session=session,
    )
    return LanceNamespaceDBConnection(
        namespace_connect(impl, properties), **connection_options
    )
class TempNamespace(LanceNamespace):
    """A simple dictionary-backed namespace for testing.

    Table name -> table URI mappings are kept in a class-level registry keyed
    by the configured root, so instances created with the same root share
    their tables.
    """

    # Class-level storage to persist table registry across instances
    _global_registry: Dict[str, Dict[str, str]] = {}

    def __init__(self, **properties):
        """Initialize the test namespace.

        Args:
            **properties: Configuration properties; ``root`` selects the
                directory under which tables are created.
        """
        self.config = TempNamespaceConfig(properties)
        # One shared registry per root; create it on first use.
        self.tables = self._global_registry.setdefault(self.config.root, {})

    @staticmethod
    def _single_table_name(request) -> str:
        """Return the table name from a request whose ID has exactly one part.

        Raises:
            ValueError: If the ID is missing or does not have one element.
        """
        if not request.id or len(request.id) != 1:
            raise ValueError("Invalid table ID")
        return request.id[0]

    def _registered_uri(self, table_name: str) -> str:
        """Return the URI registered for ``table_name``.

        Raises:
            RuntimeError: If the table is not registered.
        """
        if table_name not in self.tables:
            raise RuntimeError(f"Table does not exist: {table_name}")
        return self.tables[table_name]

    def list_tables(self, request: ListTablesRequest) -> ListTablesResponse:
        """List all tables in the namespace."""
        # For simplicity, ignore namespace ID validation
        return ListTablesResponse(tables=list(self.tables.keys()))

    def describe_table(self, request: DescribeTableRequest) -> DescribeTableResponse:
        """Describe a table by returning its location."""
        table_name = self._single_table_name(request)
        return DescribeTableResponse(location=self._registered_uri(table_name))

    def create_table(
        self, request: CreateTableRequest, request_data: bytes
    ) -> CreateTableResponse:
        """Create a table in the namespace.

        ``request_data`` is read as an Arrow IPC stream and written as a
        Lance dataset at ``<root>/<name>.lance``. An existing table is
        replaced only when ``request.mode`` is ``"overwrite"``.

        Raises:
            ValueError: If the table ID is invalid.
            RuntimeError: If the table exists and overwrite was not requested.
        """
        table_name = self._single_table_name(request)

        replacing = table_name in self.tables
        if replacing and request.mode != "overwrite":
            raise RuntimeError(f"Table already exists: {table_name}")

        # Generate table URI based on root directory
        table_uri = f"{self.config.root}/{table_name}.lance"

        # Parse the Arrow IPC stream to get the schema and create the actual table
        import pyarrow.ipc as ipc
        import io
        import lance

        table = ipc.open_stream(io.BytesIO(request_data)).read_all()

        # When replacing, the dataset already exists on disk, so it must be
        # written in overwrite mode rather than the default create mode.
        lance.write_dataset(
            table, table_uri, mode="overwrite" if replacing else "create"
        )

        # Store the table mapping
        self.tables[table_name] = table_uri

        return CreateTableResponse(location=table_uri)

    def drop_table(self, request: DropTableRequest) -> DropTableResponse:
        """Drop a table from the namespace and delete its files."""
        table_name = self._single_table_name(request)
        table_uri = self._registered_uri(table_name)

        # Best-effort removal; a missing directory is not an error.
        shutil.rmtree(table_uri, ignore_errors=True)

        # Remove from registry
        del self.tables[table_name]

        return DropTableResponse()

    def register_table(self, request: RegisterTableRequest) -> RegisterTableResponse:
        """Register a table with the namespace."""
        table_name = self._single_table_name(request)

        if not request.location:
            raise ValueError("Table location is required")

        self.tables[table_name] = request.location

        return RegisterTableResponse()

    def deregister_table(
        self, request: DeregisterTableRequest
    ) -> DeregisterTableResponse:
        """Deregister a table from the namespace without touching its files."""
        table_name = self._single_table_name(request)
        # Lookup performed only for its "does not exist" check.
        self._registered_uri(table_name)

        del self.tables[table_name]
        return DeregisterTableResponse()
class TempNamespaceConfig:
    """Configuration for TempNamespace."""

    # Property key that holds the namespace root directory.
    ROOT = "root"

    def __init__(self, properties: Optional[Dict[str, str]] = None):
        """Read the configuration out of ``properties``.

        Args:
            properties: Dictionary of configuration properties. ``None`` is
                treated as an empty dictionary; a missing ``root`` entry
                defaults to ``"/tmp"``.
        """
        source = properties if properties is not None else {}
        self._root = source.get(self.ROOT, "/tmp")

    @property
    def root(self) -> str:
        """Get the namespace root directory."""
        return self._root
NATIVE_IMPLS["temp"] = f"{TempNamespace.__module__}.TempNamespace"


class TestNamespaceConnection:
    """Test namespace-based LanceDB connection."""

    @staticmethod
    def _schema(vector_dim: int = 2, with_text: bool = False) -> pa.Schema:
        """Build the id/vector(/text) schema shared by the tests."""
        columns = [
            pa.field("id", pa.int64()),
            pa.field("vector", pa.list_(pa.float32(), vector_dim)),
        ]
        if with_text:
            columns.append(pa.field("text", pa.string()))
        return pa.schema(columns)

    def _connect(self, **kwargs):
        """Connect to the "temp" namespace rooted at this test's directory."""
        return lancedb.connect_namespace("temp", {"root": self.temp_dir}, **kwargs)

    def setup_method(self):
        """Set up test fixtures."""
        self.temp_dir = tempfile.mkdtemp()
        # Start from an empty TempNamespace registry for this root
        stale = TempNamespace._global_registry.get(self.temp_dir)
        if stale is not None:
            stale.clear()

    def teardown_method(self):
        """Clean up test fixtures."""
        # Forget the TempNamespace registry for this root
        TempNamespace._global_registry.pop(self.temp_dir, None)
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_connect_namespace_test(self):
        """Test connecting to LanceDB through TempNamespace."""
        db = self._connect()

        # Should be a LanceNamespaceDBConnection
        assert isinstance(db, lancedb.LanceNamespaceDBConnection)

        # Initially no tables
        assert len(list(db.table_names())) == 0

    def test_create_table_through_namespace(self):
        """Test creating a table through namespace."""
        db = self._connect()

        # Create empty table
        table = db.create_table("test_table", schema=self._schema(with_text=True))
        assert table is not None
        assert table.name == "test_table"

        # Table should appear in namespace
        table_names = list(db.table_names())
        assert "test_table" in table_names
        assert len(table_names) == 1

        # Verify empty table
        result = table.to_pandas()
        assert len(result) == 0
        assert list(result.columns) == ["id", "vector", "text"]

    def test_open_table_through_namespace(self):
        """Test opening an existing table through namespace."""
        db = self._connect()
        db.create_table("test_table", schema=self._schema())

        # Open the table
        table = db.open_table("test_table")
        assert table is not None
        assert table.name == "test_table"

        # Verify empty table with correct schema
        result = table.to_pandas()
        assert len(result) == 0
        assert list(result.columns) == ["id", "vector"]

    def test_drop_table_through_namespace(self):
        """Test dropping a table through namespace."""
        db = self._connect()

        # Create tables
        schema = self._schema()
        db.create_table("table1", schema=schema)
        db.create_table("table2", schema=schema)

        # Verify both tables exist
        table_names = list(db.table_names())
        assert "table1" in table_names
        assert "table2" in table_names
        assert len(table_names) == 2

        # Drop one table
        db.drop_table("table1")

        # Verify only table2 remains
        table_names = list(db.table_names())
        assert "table1" not in table_names
        assert "table2" in table_names
        assert len(table_names) == 1

        # Should not be able to open dropped table
        with pytest.raises(RuntimeError):
            db.open_table("table1")

    def test_create_table_with_schema(self):
        """Test creating a table with explicit schema through namespace."""
        db = self._connect()

        # Create table with schema
        table = db.create_table(
            "test_table", schema=self._schema(vector_dim=3, with_text=True)
        )
        assert table is not None

        # Verify schema
        table_schema = table.schema
        assert len(table_schema) == 3
        assert table_schema.field("id").type == pa.int64()
        assert table_schema.field("text").type == pa.string()

    def test_rename_table_not_supported(self):
        """Test that rename_table raises NotImplementedError."""
        db = self._connect()
        db.create_table("old_name", schema=self._schema())

        # Rename should raise NotImplementedError
        with pytest.raises(NotImplementedError, match="rename_table is not supported"):
            db.rename_table("old_name", "new_name")

    def test_drop_all_tables(self):
        """Test dropping all tables through namespace."""
        db = self._connect()

        # Create multiple tables
        schema = self._schema()
        for i in range(3):
            db.create_table(f"table{i}", schema=schema)

        # Verify tables exist
        assert len(list(db.table_names())) == 3

        # Drop all tables
        db.drop_all_tables()

        # Verify all tables are gone
        assert len(list(db.table_names())) == 0

    def test_table_operations(self):
        """Test various table operations through namespace."""
        db = self._connect()
        table = db.create_table("test_table", schema=self._schema(with_text=True))

        # Verify empty table was created
        result = table.to_pandas()
        assert len(result) == 0
        assert list(result.columns) == ["id", "vector", "text"]

        # Test add data to the table
        new_data = [
            {"id": 1, "vector": [1.0, 2.0], "text": "item_1"},
            {"id": 2, "vector": [2.0, 3.0], "text": "item_2"},
        ]
        table.add(new_data)
        result = table.to_pandas()
        assert len(result) == 2

        # Test delete
        table.delete("id = 1")
        result = table.to_pandas()
        assert len(result) == 1
        assert result["id"].values[0] == 2

        # Test update
        table.update(where="id = 2", values={"text": "updated"})
        result = table.to_pandas()
        assert result["text"].values[0] == "updated"

    def test_storage_options(self):
        """Test passing storage options through namespace connection."""
        # Connect with storage options
        storage_opts = {"test_option": "test_value"}
        db = self._connect(storage_options=storage_opts)

        # Storage options should be preserved
        assert db.storage_options == storage_opts

        # Create table with additional storage options
        table_opts = {"table_option": "table_value"}
        db.create_table(
            "test_table", schema=self._schema(), storage_options=table_opts
        )