feat(python): integrate with lance namespace (#2599)

This PR integrates `lancedb` with `lance-namespace` so that users can
use LanceDB client to access Lance tables in any catalog services. In
general, we expect most of the logic to be delegated to the existing
`LanceDBConnection` and `LanceTable`, but the namespace implemenation
will control how table is created, dropped, and describe where the table
is stored with any related storage options like access credentials.

The implementation currently only supports a 1 level namespace that
directly contains tables. We will introduce nested namespace support in
a separated PR.

Users are expected to use it in the following way:

```python
>>> import lancedb
>>> import pyarrow as pa
>>> # Connect using GlueNamespace
>>> db = lancedb.connect_namespace("glue", {"catalog_id": "123456789012"})
>>> # Create a table with schema
>>> schema = pa.schema([
...     pa.field("id", pa.int64()),
...     pa.field("vector", pa.list_(pa.float32(), 2))
... ])
>>> table = db.create_table("my_table", schema=schema)
>>> # List tables
>>> db.table_names()
['my_table']
```
This commit is contained in:
Jack Ye
2025-08-20 15:46:16 -07:00
committed by GitHub
parent d4a41b5663
commit 04285a4a4e
4 changed files with 743 additions and 0 deletions

View File

@@ -10,6 +10,7 @@ dependencies = [
"pyarrow>=16",
"pydantic>=1.10",
"tqdm>=4.27.0",
"lance-namespace==0.0.6"
]
description = "lancedb"
authors = [{ name = "LanceDB Devs", email = "dev@lancedb.com" }]

View File

@@ -19,6 +19,7 @@ from .remote.db import RemoteDBConnection
from .schema import vector
from .table import AsyncTable
from ._lancedb import Session
from .namespace import connect_namespace, LanceNamespaceDBConnection
def connect(
@@ -221,6 +222,7 @@ async def connect_async(
__all__ = [
"connect",
"connect_async",
"connect_namespace",
"AsyncConnection",
"AsyncTable",
"URI",
@@ -228,6 +230,7 @@ __all__ = [
"vector",
"DBConnection",
"LanceDBConnection",
"LanceNamespaceDBConnection",
"RemoteDBConnection",
"Session",
"__version__",

View File

@@ -0,0 +1,325 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
"""
LanceDB Namespace integration module.
This module provides integration with lance_namespace for managing tables
through a namespace abstraction.
"""
from __future__ import annotations
from typing import Dict, Iterable, List, Optional, Union
import os
from lancedb.db import DBConnection
from lancedb.table import LanceTable, Table
from lancedb.util import validate_table_name
from lancedb.common import validate_schema
from lancedb.table import sanitize_create_table
from overrides import override
from lance_namespace import LanceNamespace, connect as namespace_connect
from lance_namespace_urllib3_client.models import (
ListTablesRequest,
DescribeTableRequest,
CreateTableRequest,
DropTableRequest,
JsonArrowSchema,
JsonArrowField,
JsonArrowDataType,
)
import pyarrow as pa
from datetime import timedelta
from lancedb.pydantic import LanceModel
from lancedb.common import DATA
from lancedb.embeddings import EmbeddingFunctionConfig
from ._lancedb import Session
def _convert_pyarrow_type_to_json(arrow_type: pa.DataType) -> JsonArrowDataType:
"""Convert PyArrow DataType to JsonArrowDataType."""
if pa.types.is_null(arrow_type):
type_name = "null"
elif pa.types.is_boolean(arrow_type):
type_name = "bool"
elif pa.types.is_int8(arrow_type):
type_name = "int8"
elif pa.types.is_uint8(arrow_type):
type_name = "uint8"
elif pa.types.is_int16(arrow_type):
type_name = "int16"
elif pa.types.is_uint16(arrow_type):
type_name = "uint16"
elif pa.types.is_int32(arrow_type):
type_name = "int32"
elif pa.types.is_uint32(arrow_type):
type_name = "uint32"
elif pa.types.is_int64(arrow_type):
type_name = "int64"
elif pa.types.is_uint64(arrow_type):
type_name = "uint64"
elif pa.types.is_float32(arrow_type):
type_name = "float32"
elif pa.types.is_float64(arrow_type):
type_name = "float64"
elif pa.types.is_string(arrow_type):
type_name = "utf8"
elif pa.types.is_binary(arrow_type):
type_name = "binary"
elif pa.types.is_list(arrow_type):
# For list types, we need more complex handling
type_name = "list"
elif pa.types.is_fixed_size_list(arrow_type):
type_name = "fixed_size_list"
else:
# Default to string representation for unsupported types
type_name = str(arrow_type)
return JsonArrowDataType(type=type_name)
def _convert_pyarrow_schema_to_json(schema: pa.Schema) -> JsonArrowSchema:
"""Convert PyArrow Schema to JsonArrowSchema."""
fields = []
for field in schema:
json_field = JsonArrowField(
name=field.name,
type=_convert_pyarrow_type_to_json(field.type),
nullable=field.nullable,
metadata=field.metadata,
)
fields.append(json_field)
return JsonArrowSchema(fields=fields, metadata=schema.metadata)
class LanceNamespaceDBConnection(DBConnection):
"""
A LanceDB connection that uses a namespace for table management.
This connection delegates table URI resolution to a lance_namespace instance,
while using the standard LanceTable for actual table operations.
"""
def __init__(
self,
namespace: LanceNamespace,
*,
read_consistency_interval: Optional[timedelta] = None,
storage_options: Optional[Dict[str, str]] = None,
session: Optional[Session] = None,
):
"""
Initialize a namespace-based LanceDB connection.
Parameters
----------
namespace : LanceNamespace
The namespace instance to use for table management
read_consistency_interval : Optional[timedelta]
The interval at which to check for updates to the table from other
processes. If None, then consistency is not checked.
storage_options : Optional[Dict[str, str]]
Additional options for the storage backend
session : Optional[Session]
A session to use for this connection
"""
self._ns = namespace
self.read_consistency_interval = read_consistency_interval
self.storage_options = storage_options or {}
self.session = session
@override
def table_names(
self, page_token: Optional[str] = None, limit: int = 10
) -> Iterable[str]:
# Use namespace to list tables
request = ListTablesRequest(id=None, page_token=page_token, limit=limit)
response = self._ns.list_tables(request)
return response.tables if response.tables else []
@override
def create_table(
self,
name: str,
data: Optional[DATA] = None,
schema: Optional[Union[pa.Schema, LanceModel]] = None,
mode: str = "create",
exist_ok: bool = False,
on_bad_vectors: str = "error",
fill_value: float = 0.0,
embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None,
*,
storage_options: Optional[Dict[str, str]] = None,
data_storage_version: Optional[str] = None,
enable_v2_manifest_paths: Optional[bool] = None,
) -> Table:
if mode.lower() not in ["create", "overwrite"]:
raise ValueError("mode must be either 'create' or 'overwrite'")
validate_table_name(name)
# TODO: support passing data
if data is not None:
raise ValueError(
"create_table currently only supports creating empty tables (data=None)"
)
# Prepare schema
metadata = None
if embedding_functions is not None:
from lancedb.embeddings.registry import EmbeddingFunctionRegistry
registry = EmbeddingFunctionRegistry.get_instance()
metadata = registry.get_table_metadata(embedding_functions)
data, schema = sanitize_create_table(
data, schema, metadata, on_bad_vectors, fill_value
)
validate_schema(schema)
# Convert PyArrow schema to JsonArrowSchema
json_schema = _convert_pyarrow_schema_to_json(schema)
# Create table request
request = CreateTableRequest(id=[name], var_schema=json_schema)
# Create empty Arrow IPC stream bytes
import pyarrow.ipc as ipc
import io
empty_table = pa.Table.from_arrays(
[pa.array([], type=field.type) for field in schema], schema=schema
)
buffer = io.BytesIO()
with ipc.new_stream(buffer, schema) as writer:
writer.write_table(empty_table)
request_data = buffer.getvalue()
self._ns.create_table(request, request_data)
return self.open_table(name, storage_options=storage_options)
@override
def open_table(
self,
name: str,
*,
storage_options: Optional[Dict[str, str]] = None,
index_cache_size: Optional[int] = None,
) -> Table:
request = DescribeTableRequest(id=[name])
response = self._ns.describe_table(request)
merged_storage_options = dict()
if storage_options:
merged_storage_options.update(storage_options)
if response.storage_options:
merged_storage_options.update(response.storage_options)
return self._lance_table_from_uri(
response.location,
storage_options=merged_storage_options,
index_cache_size=index_cache_size,
)
@override
def drop_table(self, name: str):
# Use namespace drop_table directly
request = DropTableRequest(id=[name])
self._ns.drop_table(request)
@override
def rename_table(self, cur_name: str, new_name: str):
raise NotImplementedError(
"rename_table is not supported for namespace connections"
)
@override
def drop_database(self):
raise NotImplementedError(
"drop_database is deprecated, use drop_all_tables instead"
)
@override
def drop_all_tables(self):
for table_name in self.table_names():
self.drop_table(table_name)
def _lance_table_from_uri(
self,
table_uri: str,
*,
storage_options: Optional[Dict[str, str]] = None,
index_cache_size: Optional[int] = None,
) -> LanceTable:
# Extract the base path and table name from the URI
if table_uri.endswith(".lance"):
base_path = os.path.dirname(table_uri)
table_name = os.path.basename(table_uri)[:-6] # Remove .lance
else:
raise ValueError(f"Invalid table URI: {table_uri}")
from lancedb.db import LanceDBConnection
temp_conn = LanceDBConnection(
base_path,
read_consistency_interval=self.read_consistency_interval,
storage_options={**self.storage_options, **(storage_options or {})},
session=self.session,
)
# Open the table using the temporary connection
return LanceTable.open(
temp_conn,
table_name,
storage_options=storage_options,
index_cache_size=index_cache_size,
)
def connect_namespace(
impl: str,
properties: Dict[str, str],
*,
read_consistency_interval: Optional[timedelta] = None,
storage_options: Optional[Dict[str, str]] = None,
session: Optional[Session] = None,
) -> LanceNamespaceDBConnection:
"""
Connect to a LanceDB database through a namespace.
Parameters
----------
impl : str
The namespace implementation to use. For examples:
- "dir" for DirectoryNamespace
- "rest" for REST-based namespace
- Full module path for custom implementations
properties : Dict[str, str]
Configuration properties for the namespace implementation.
Different namespace implemenation has different config properties.
For example, use DirectoryNamespace with {"root": "/path/to/directory"}
read_consistency_interval : Optional[timedelta]
The interval at which to check for updates to the table from other
processes. If None, then consistency is not checked.
storage_options : Optional[Dict[str, str]]
Additional options for the storage backend
session : Optional[Session]
A session to use for this connection
Returns
-------
LanceNamespaceDBConnection
A namespace-based connection to LanceDB
"""
namespace = namespace_connect(impl, properties)
# Return the namespace-based connection
return LanceNamespaceDBConnection(
namespace,
read_consistency_interval=read_consistency_interval,
storage_options=storage_options,
session=session,
)

View File

@@ -0,0 +1,414 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
"""Tests for LanceDB namespace integration."""
import tempfile
import shutil
from typing import Dict, Optional
import pytest
import pyarrow as pa
import lancedb
from lance_namespace.namespace import NATIVE_IMPLS, LanceNamespace
from lance_namespace_urllib3_client.models import (
ListTablesRequest,
ListTablesResponse,
DescribeTableRequest,
DescribeTableResponse,
RegisterTableRequest,
RegisterTableResponse,
DeregisterTableRequest,
DeregisterTableResponse,
CreateTableRequest,
CreateTableResponse,
DropTableRequest,
DropTableResponse,
)
class TempNamespace(LanceNamespace):
"""A simple dictionary-backed namespace for testing."""
# Class-level storage to persist table registry across instances
_global_registry: Dict[str, Dict[str, str]] = {}
def __init__(self, **properties):
"""Initialize the test namespace.
Args:
root: The root directory for tables (optional)
**properties: Additional configuration properties
"""
self.config = TempNamespaceConfig(properties)
# Use the root as a key to maintain separate registries per root
root = self.config.root
if root not in self._global_registry:
self._global_registry[root] = {}
self.tables = self._global_registry[root] # Reference to shared registry
def list_tables(self, request: ListTablesRequest) -> ListTablesResponse:
"""List all tables in the namespace."""
# For simplicity, ignore namespace ID validation
tables = list(self.tables.keys())
return ListTablesResponse(tables=tables)
def describe_table(self, request: DescribeTableRequest) -> DescribeTableResponse:
"""Describe a table by returning its location."""
if not request.id or len(request.id) != 1:
raise ValueError("Invalid table ID")
table_name = request.id[0]
if table_name not in self.tables:
raise RuntimeError(f"Table does not exist: {table_name}")
table_uri = self.tables[table_name]
return DescribeTableResponse(location=table_uri)
def create_table(
self, request: CreateTableRequest, request_data: bytes
) -> CreateTableResponse:
"""Create a table in the namespace."""
if not request.id or len(request.id) != 1:
raise ValueError("Invalid table ID")
table_name = request.id[0]
# Check if table already exists
if table_name in self.tables:
if request.mode == "overwrite":
# Drop existing table for overwrite mode
del self.tables[table_name]
else:
raise RuntimeError(f"Table already exists: {table_name}")
# Generate table URI based on root directory
table_uri = f"{self.config.root}/{table_name}.lance"
# Parse the Arrow IPC stream to get the schema and create the actual table
import pyarrow.ipc as ipc
import io
import lance
# Read the IPC stream
reader = ipc.open_stream(io.BytesIO(request_data))
table = reader.read_all()
# Create the actual Lance table
lance.write_dataset(table, table_uri)
# Store the table mapping
self.tables[table_name] = table_uri
return CreateTableResponse(location=table_uri)
def drop_table(self, request: DropTableRequest) -> DropTableResponse:
"""Drop a table from the namespace."""
if not request.id or len(request.id) != 1:
raise ValueError("Invalid table ID")
table_name = request.id[0]
if table_name not in self.tables:
raise RuntimeError(f"Table does not exist: {table_name}")
# Get the table URI
table_uri = self.tables[table_name]
# Delete the actual table files
import shutil
import os
if os.path.exists(table_uri):
shutil.rmtree(table_uri, ignore_errors=True)
# Remove from registry
del self.tables[table_name]
return DropTableResponse()
def register_table(self, request: RegisterTableRequest) -> RegisterTableResponse:
"""Register a table with the namespace."""
if not request.id or len(request.id) != 1:
raise ValueError("Invalid table ID")
if not request.location:
raise ValueError("Table location is required")
table_name = request.id[0]
self.tables[table_name] = request.location
return RegisterTableResponse()
def deregister_table(
self, request: DeregisterTableRequest
) -> DeregisterTableResponse:
"""Deregister a table from the namespace."""
if not request.id or len(request.id) != 1:
raise ValueError("Invalid table ID")
table_name = request.id[0]
if table_name not in self.tables:
raise RuntimeError(f"Table does not exist: {table_name}")
del self.tables[table_name]
return DeregisterTableResponse()
class TempNamespaceConfig:
"""Configuration for TestNamespace."""
ROOT = "root"
def __init__(self, properties: Optional[Dict[str, str]] = None):
"""Initialize configuration from properties.
Args:
properties: Dictionary of configuration properties
"""
if properties is None:
properties = {}
self._root = properties.get(self.ROOT, "/tmp")
@property
def root(self) -> str:
"""Get the namespace root directory."""
return self._root
NATIVE_IMPLS["temp"] = f"{TempNamespace.__module__}.TempNamespace"
class TestNamespaceConnection:
"""Test namespace-based LanceDB connection."""
def setup_method(self):
"""Set up test fixtures."""
self.temp_dir = tempfile.mkdtemp()
# Clear the TestNamespace registry for this test
if self.temp_dir in TempNamespace._global_registry:
TempNamespace._global_registry[self.temp_dir].clear()
def teardown_method(self):
"""Clean up test fixtures."""
# Clear the TestNamespace registry
if self.temp_dir in TempNamespace._global_registry:
del TempNamespace._global_registry[self.temp_dir]
shutil.rmtree(self.temp_dir, ignore_errors=True)
def test_connect_namespace_test(self):
"""Test connecting to LanceDB through TestNamespace."""
# Connect using TestNamespace
db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
# Should be a LanceNamespaceDBConnection
assert isinstance(db, lancedb.LanceNamespaceDBConnection)
# Initially no tables
assert len(list(db.table_names())) == 0
def test_create_table_through_namespace(self):
"""Test creating a table through namespace."""
db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
# Define schema for empty table
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 2)),
pa.field("text", pa.string()),
]
)
# Create empty table
table = db.create_table("test_table", schema=schema)
assert table is not None
assert table.name == "test_table"
# Table should appear in namespace
table_names = list(db.table_names())
assert "test_table" in table_names
assert len(table_names) == 1
# Verify empty table
result = table.to_pandas()
assert len(result) == 0
assert list(result.columns) == ["id", "vector", "text"]
def test_open_table_through_namespace(self):
"""Test opening an existing table through namespace."""
db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
# Create a table with schema
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 2)),
]
)
db.create_table("test_table", schema=schema)
# Open the table
table = db.open_table("test_table")
assert table is not None
assert table.name == "test_table"
# Verify empty table with correct schema
result = table.to_pandas()
assert len(result) == 0
assert list(result.columns) == ["id", "vector"]
def test_drop_table_through_namespace(self):
"""Test dropping a table through namespace."""
db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
# Create tables
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 2)),
]
)
db.create_table("table1", schema=schema)
db.create_table("table2", schema=schema)
# Verify both tables exist
table_names = list(db.table_names())
assert "table1" in table_names
assert "table2" in table_names
assert len(table_names) == 2
# Drop one table
db.drop_table("table1")
# Verify only table2 remains
table_names = list(db.table_names())
assert "table1" not in table_names
assert "table2" in table_names
assert len(table_names) == 1
# Should not be able to open dropped table
with pytest.raises(RuntimeError):
db.open_table("table1")
def test_create_table_with_schema(self):
"""Test creating a table with explicit schema through namespace."""
db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
# Define schema
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 3)),
pa.field("text", pa.string()),
]
)
# Create table with schema
table = db.create_table("test_table", schema=schema)
assert table is not None
# Verify schema
table_schema = table.schema
assert len(table_schema) == 3
assert table_schema.field("id").type == pa.int64()
assert table_schema.field("text").type == pa.string()
def test_rename_table_not_supported(self):
"""Test that rename_table raises NotImplementedError."""
db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
# Create a table
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 2)),
]
)
db.create_table("old_name", schema=schema)
# Rename should raise NotImplementedError
with pytest.raises(NotImplementedError, match="rename_table is not supported"):
db.rename_table("old_name", "new_name")
def test_drop_all_tables(self):
"""Test dropping all tables through namespace."""
db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
# Create multiple tables
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 2)),
]
)
for i in range(3):
db.create_table(f"table{i}", schema=schema)
# Verify tables exist
assert len(list(db.table_names())) == 3
# Drop all tables
db.drop_all_tables()
# Verify all tables are gone
assert len(list(db.table_names())) == 0
def test_table_operations(self):
"""Test various table operations through namespace."""
db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
# Create a table with schema
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 2)),
pa.field("text", pa.string()),
]
)
table = db.create_table("test_table", schema=schema)
# Verify empty table was created
result = table.to_pandas()
assert len(result) == 0
assert list(result.columns) == ["id", "vector", "text"]
# Test add data to the table
new_data = [
{"id": 1, "vector": [1.0, 2.0], "text": "item_1"},
{"id": 2, "vector": [2.0, 3.0], "text": "item_2"},
]
table.add(new_data)
result = table.to_pandas()
assert len(result) == 2
# Test delete
table.delete("id = 1")
result = table.to_pandas()
assert len(result) == 1
assert result["id"].values[0] == 2
# Test update
table.update(where="id = 2", values={"text": "updated"})
result = table.to_pandas()
assert result["text"].values[0] == "updated"
def test_storage_options(self):
"""Test passing storage options through namespace connection."""
# Connect with storage options
storage_opts = {"test_option": "test_value"}
db = lancedb.connect_namespace(
"temp", {"root": self.temp_dir}, storage_options=storage_opts
)
# Storage options should be preserved
assert db.storage_options == storage_opts
# Create table with additional storage options
table_opts = {"table_option": "table_value"}
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 2)),
]
)
db.create_table("test_table", schema=schema, storage_options=table_opts)