feat: support namespace credentials vending (#2778)

Based on https://github.com/lancedb/lance/pull/4984

1. Bump to 1.0.0-beta.2
2. Use DirectoryNamespace in lance to perform all testing in python and
rust for much better coverage
3. Refactor `ListingDatabase` so that it can accept a location and a
namespace. We have to use the listing database (the local LanceDB
connection) when working with a namespace: the namespace only resolves the
location and storage options, and we don't want to bind it all the way down
to Rust because users plug in their namespace from the Python side. Thus
`ListingDatabase` needs to be able to accept the location and namespace that
come from the namespace connection.
4. For credentials vending, we also pass the storage options provider all
the way to the Rust layer, and the Rust layer calls back into the Python
function to fetch the next set of storage options. This is exactly the same
thing we did in pylance.
This commit is contained in:
Jack Ye
2025-11-17 00:42:24 -08:00
committed by GitHub
parent c0cc58c156
commit e47f552a86
27 changed files with 1660 additions and 636 deletions

View File

@@ -14,6 +14,7 @@ __version__ = importlib.metadata.version("lancedb")
from ._lancedb import connect as lancedb_connect
from .common import URI, sanitize_uri
from .db import AsyncConnection, DBConnection, LanceDBConnection
from .io import StorageOptionsProvider
from .remote import ClientConfig
from .remote.db import RemoteDBConnection
from .schema import vector
@@ -233,6 +234,7 @@ __all__ = [
"LanceNamespaceDBConnection",
"RemoteDBConnection",
"Session",
"StorageOptionsProvider",
"Table",
"__version__",
]

View File

@@ -4,6 +4,7 @@ from typing import Dict, List, Optional, Tuple, Any, TypedDict, Union, Literal
import pyarrow as pa
from .index import BTree, IvfFlat, IvfPq, Bitmap, LabelList, HnswPq, HnswSq, FTS
from .io import StorageOptionsProvider
from .remote import ClientConfig
class Session:
@@ -44,6 +45,8 @@ class Connection(object):
data: pa.RecordBatchReader,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional[StorageOptionsProvider] = None,
location: Optional[str] = None,
) -> Table: ...
async def create_empty_table(
self,
@@ -52,13 +55,17 @@ class Connection(object):
schema: pa.Schema,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional[StorageOptionsProvider] = None,
location: Optional[str] = None,
) -> Table: ...
async def open_table(
self,
name: str,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional[StorageOptionsProvider] = None,
index_cache_size: Optional[int] = None,
location: Optional[str] = None,
) -> Table: ...
async def clone_table(
self,

View File

@@ -45,6 +45,7 @@ if TYPE_CHECKING:
from ._lancedb import Connection as LanceDbConnection
from .common import DATA, URI
from .embeddings import EmbeddingFunctionConfig
from .io import StorageOptionsProvider
from ._lancedb import Session
@@ -143,6 +144,7 @@ class DBConnection(EnforceOverrides):
*,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional["StorageOptionsProvider"] = None,
data_storage_version: Optional[str] = None,
enable_v2_manifest_paths: Optional[bool] = None,
) -> Table:
@@ -308,6 +310,7 @@ class DBConnection(EnforceOverrides):
*,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional["StorageOptionsProvider"] = None,
index_cache_size: Optional[int] = None,
) -> Table:
"""Open a Lance Table in the database.
@@ -463,6 +466,12 @@ class LanceDBConnection(DBConnection):
is_local = isinstance(uri, Path) or scheme == "file"
if is_local:
if isinstance(uri, str):
# Strip file:// or file:/ scheme if present
# file:///path becomes file:/path after URL normalization
if uri.startswith("file://"):
uri = uri[7:] # Remove "file://"
elif uri.startswith("file:/"):
uri = uri[5:] # Remove "file:"
uri = Path(uri)
uri = uri.expanduser().absolute()
Path(uri).mkdir(parents=True, exist_ok=True)
@@ -625,6 +634,7 @@ class LanceDBConnection(DBConnection):
*,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional["StorageOptionsProvider"] = None,
data_storage_version: Optional[str] = None,
enable_v2_manifest_paths: Optional[bool] = None,
) -> LanceTable:
@@ -655,6 +665,7 @@ class LanceDBConnection(DBConnection):
embedding_functions=embedding_functions,
namespace=namespace,
storage_options=storage_options,
storage_options_provider=storage_options_provider,
)
return tbl
@@ -665,6 +676,7 @@ class LanceDBConnection(DBConnection):
*,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional["StorageOptionsProvider"] = None,
index_cache_size: Optional[int] = None,
) -> LanceTable:
"""Open a table in the database.
@@ -696,6 +708,7 @@ class LanceDBConnection(DBConnection):
name,
namespace=namespace,
storage_options=storage_options,
storage_options_provider=storage_options_provider,
index_cache_size=index_cache_size,
)
@@ -977,9 +990,11 @@ class AsyncConnection(object):
on_bad_vectors: Optional[str] = None,
fill_value: Optional[float] = None,
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional["StorageOptionsProvider"] = None,
*,
namespace: List[str] = [],
embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None,
location: Optional[str] = None,
) -> AsyncTable:
"""Create an [AsyncTable][lancedb.table.AsyncTable] in the database.
@@ -1170,6 +1185,8 @@ class AsyncConnection(object):
schema,
namespace=namespace,
storage_options=storage_options,
storage_options_provider=storage_options_provider,
location=location,
)
else:
data = data_to_reader(data, schema)
@@ -1179,6 +1196,8 @@ class AsyncConnection(object):
data,
namespace=namespace,
storage_options=storage_options,
storage_options_provider=storage_options_provider,
location=location,
)
return AsyncTable(new_table)
@@ -1189,7 +1208,9 @@ class AsyncConnection(object):
*,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional["StorageOptionsProvider"] = None,
index_cache_size: Optional[int] = None,
location: Optional[str] = None,
) -> AsyncTable:
"""Open a Lance Table in the database.
@@ -1218,6 +1239,10 @@ class AsyncConnection(object):
This cache applies to the entire opened table, across all indices.
Setting this value higher will increase performance on larger datasets
at the expense of more RAM
location: str, optional
The explicit location (URI) of the table. If provided, the table will be
opened from this location instead of deriving it from the database URI
and table name.
Returns
-------
@@ -1227,7 +1252,9 @@ class AsyncConnection(object):
name,
namespace=namespace,
storage_options=storage_options,
storage_options_provider=storage_options_provider,
index_cache_size=index_cache_size,
location=location,
)
return AsyncTable(table)

View File

@@ -0,0 +1,71 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
"""I/O utilities and interfaces for LanceDB."""
from abc import ABC, abstractmethod
from typing import Dict
class StorageOptionsProvider(ABC):
    """Interface for objects that supply storage options to LanceDB tables.

    A provider makes automatic credential refresh possible for cloud storage
    backends (e.g., AWS S3, Azure Blob Storage, GCS). When the credentials
    carry an expiration time, ``fetch_storage_options()`` is called again
    periodically so that fresh credentials are available before the old ones
    expire.

    Example
    -------
    >>> class MyProvider(StorageOptionsProvider):
    ...     def fetch_storage_options(self) -> Dict[str, str]:
    ...         # Fetch fresh credentials from your credential manager
    ...         return {
    ...             "aws_access_key_id": "...",
    ...             "aws_secret_access_key": "...",
    ...             "expires_at_millis": "1234567890000"  # Optional
    ...         }
    """

    @abstractmethod
    def fetch_storage_options(self) -> Dict[str, str]:
        """Return a fresh set of storage credentials.

        LanceDB invokes this whenever the credentials need to be refreshed.
        An ``"expires_at_millis"`` entry (Unix timestamp in milliseconds) in
        the result tells LanceDB to refresh automatically before that time;
        without that entry the credentials are assumed to never expire.

        Returns
        -------
        Dict[str, str]
            Cloud storage credentials, optionally with an expiration time:
            - "expires_at_millis" (optional): Unix timestamp in milliseconds
              at which the credentials expire
            - Provider-specific credential keys (e.g., aws_access_key_id,
              aws_secret_access_key, etc.)

        Raises
        ------
        RuntimeError
            If credentials cannot be fetched or are invalid
        """

    def provider_id(self) -> str:
        """Return a human-readable unique identifier for this provider instance.

        The identifier is used for caching and equality comparison: two
        providers with the same ID share the same cached object store
        connection.

        By default it is built from the class name and ``str(self)``;
        override this method when custom identification logic is needed.

        Returns
        -------
        str
            A unique identifier for this provider instance
        """
        class_name = self.__class__.__name__
        text = str(self)
        return f"{class_name} {{ repr: {text!r} }}"

View File

@@ -10,42 +10,40 @@ through a namespace abstraction.
from __future__ import annotations
from typing import Dict, Iterable, List, Optional, Union
import os
import sys
from typing import Dict, Iterable, List, Optional, Union
if sys.version_info >= (3, 12):
from typing import override
else:
from overrides import override
from lancedb.db import DBConnection
from datetime import timedelta
import pyarrow as pa
from lancedb.db import DBConnection, LanceDBConnection
from lancedb.io import StorageOptionsProvider
from lancedb.table import LanceTable, Table
from lancedb.util import validate_table_name
from lancedb.common import validate_schema
from lancedb.table import sanitize_create_table
from lancedb.common import DATA
from lancedb.pydantic import LanceModel
from lancedb.embeddings import EmbeddingFunctionConfig
from ._lancedb import Session
from lance_namespace import LanceNamespace, connect as namespace_connect
from lance_namespace_urllib3_client.models import (
ListTablesRequest,
DescribeTableRequest,
CreateTableRequest,
DropTableRequest,
ListNamespacesRequest,
CreateNamespaceRequest,
DropNamespaceRequest,
CreateEmptyTableRequest,
JsonArrowSchema,
JsonArrowField,
JsonArrowDataType,
)
import pyarrow as pa
from datetime import timedelta
from lancedb.pydantic import LanceModel
from lancedb.common import DATA
from lancedb.embeddings import EmbeddingFunctionConfig
from ._lancedb import Session
def _convert_pyarrow_type_to_json(arrow_type: pa.DataType) -> JsonArrowDataType:
"""Convert PyArrow DataType to JsonArrowDataType."""
@@ -104,6 +102,89 @@ def _convert_pyarrow_schema_to_json(schema: pa.Schema) -> JsonArrowSchema:
return JsonArrowSchema(fields=fields, metadata=schema.metadata)
class LanceNamespaceStorageOptionsProvider(StorageOptionsProvider):
    """Storage options provider backed by a LanceNamespace.

    Fresh storage options are obtained by calling the namespace's
    describe_table() method, which returns both the table location and
    time-limited storage options. This gives tables accessed through a
    namespace connection automatic credential refresh.

    Parameters
    ----------
    namespace : LanceNamespace
        The namespace instance with a describe_table() method
    table_id : List[str]
        The table identifier (namespace path + table name)

    Examples
    --------
    >>> from lance_namespace import connect as namespace_connect
    >>> namespace = namespace_connect("rest", {"url": "https://..."})
    >>> provider = LanceNamespaceStorageOptionsProvider(
    ...     namespace=namespace,
    ...     table_id=["my_namespace", "my_table"]
    ... )
    >>> options = provider.fetch_storage_options()
    """

    def __init__(self, namespace: LanceNamespace, table_id: List[str]):
        """Remember the namespace and the table this provider serves.

        Parameters
        ----------
        namespace : LanceNamespace
            The namespace instance with a describe_table() method
        table_id : List[str]
            The table identifier
        """
        self._namespace = namespace
        self._table_id = table_id

    def fetch_storage_options(self) -> Dict[str, str]:
        """Ask the namespace for the table's current storage options.

        Issues a describe_table() request for the table and hands back the
        storage options contained in the response.

        Returns
        -------
        Dict[str, str]
            Flat dictionary of string key-value pairs containing storage options.
            May include "expires_at_millis" key for automatic refresh.

        Raises
        ------
        RuntimeError
            If namespace does not return storage_options
        """
        describe_request = DescribeTableRequest(id=self._table_id, version=None)
        options = self._namespace.describe_table(describe_request).storage_options
        if options is not None:
            # Already a flat string-to-string map, so no conversion is needed
            return options
        raise RuntimeError(
            "Namespace did not return storage_options. "
            "Ensure the namespace supports storage options providing."
        )

    def provider_id(self) -> str:
        """Return a human-readable unique identifier for this provider instance."""
        ns = self._namespace
        # namespace_id() exists in lance-namespace >= 0.0.20; older versions
        # fall back to the namespace's string form
        namespace_id = ns.namespace_id() if hasattr(ns, "namespace_id") else str(ns)
        return (
            f"LanceNamespaceStorageOptionsProvider {{ "
            f"namespace: {namespace_id}, table_id: {self._table_id!r} }}"
        )
class LanceNamespaceDBConnection(DBConnection):
"""
A LanceDB connection that uses a namespace for table management.
@@ -166,6 +247,7 @@ class LanceNamespaceDBConnection(DBConnection):
*,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional[StorageOptionsProvider] = None,
data_storage_version: Optional[str] = None,
enable_v2_manifest_paths: Optional[bool] = None,
) -> Table:
@@ -173,48 +255,84 @@ class LanceNamespaceDBConnection(DBConnection):
raise ValueError("mode must be either 'create' or 'overwrite'")
validate_table_name(name)
# TODO: support passing data
if data is not None:
raise ValueError(
"create_table currently only supports creating empty tables (data=None)"
# Get location from namespace
table_id = namespace + [name]
# Step 1: Get the table location and storage options from namespace
# In overwrite mode, if table exists, use describe_table to get
# existing location. Otherwise, call create_empty_table to reserve
# a new location
location = None
namespace_storage_options = None
if mode.lower() == "overwrite":
# Try to describe the table first to see if it exists
try:
describe_request = DescribeTableRequest(id=table_id)
describe_response = self._ns.describe_table(describe_request)
location = describe_response.location
namespace_storage_options = describe_response.storage_options
except Exception:
# Table doesn't exist, will create a new one below
pass
if location is None:
# Table doesn't exist or mode is "create", reserve a new location
create_empty_request = CreateEmptyTableRequest(
id=table_id,
location=None,
properties=self.storage_options if self.storage_options else None,
)
create_empty_response = self._ns.create_empty_table(create_empty_request)
if not create_empty_response.location:
raise ValueError(
"Table location is missing from create_empty_table response"
)
location = create_empty_response.location
namespace_storage_options = create_empty_response.storage_options
# Merge storage options: self.storage_options < user options < namespace options
merged_storage_options = dict(self.storage_options)
if storage_options:
merged_storage_options.update(storage_options)
if namespace_storage_options:
merged_storage_options.update(namespace_storage_options)
# Step 2: Create table using LanceTable.create with the location
# We need a temporary connection for the LanceTable.create method
temp_conn = LanceDBConnection(
location, # Use the actual location as the connection URI
read_consistency_interval=self.read_consistency_interval,
storage_options=merged_storage_options,
session=self.session,
)
# Create a storage options provider if not provided by user
# Only create if namespace returned storage_options (not None)
if storage_options_provider is None and namespace_storage_options is not None:
storage_options_provider = LanceNamespaceStorageOptionsProvider(
namespace=self._ns,
table_id=table_id,
)
# Prepare schema
metadata = None
if embedding_functions is not None:
from lancedb.embeddings.registry import EmbeddingFunctionRegistry
registry = EmbeddingFunctionRegistry.get_instance()
metadata = registry.get_table_metadata(embedding_functions)
data, schema = sanitize_create_table(
data, schema, metadata, on_bad_vectors, fill_value
tbl = LanceTable.create(
temp_conn,
name,
data,
schema,
mode=mode,
exist_ok=exist_ok,
on_bad_vectors=on_bad_vectors,
fill_value=fill_value,
embedding_functions=embedding_functions,
namespace=namespace,
storage_options=merged_storage_options,
storage_options_provider=storage_options_provider,
location=location,
)
validate_schema(schema)
# Convert PyArrow schema to JsonArrowSchema
json_schema = _convert_pyarrow_schema_to_json(schema)
# Create table request with namespace
table_id = namespace + [name]
request = CreateTableRequest(id=table_id, var_schema=json_schema)
# Create empty Arrow IPC stream bytes
import pyarrow.ipc as ipc
import io
empty_table = pa.Table.from_arrays(
[pa.array([], type=field.type) for field in schema], schema=schema
)
buffer = io.BytesIO()
with ipc.new_stream(buffer, schema) as writer:
writer.write_table(empty_table)
request_data = buffer.getvalue()
self._ns.create_table(request, request_data)
return self.open_table(
name, namespace=namespace, storage_options=storage_options
)
return tbl
@override
def open_table(
@@ -223,21 +341,34 @@ class LanceNamespaceDBConnection(DBConnection):
*,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional[StorageOptionsProvider] = None,
index_cache_size: Optional[int] = None,
) -> Table:
table_id = namespace + [name]
request = DescribeTableRequest(id=table_id)
response = self._ns.describe_table(request)
merged_storage_options = dict()
# Merge storage options: self.storage_options < user options < namespace options
merged_storage_options = dict(self.storage_options)
if storage_options:
merged_storage_options.update(storage_options)
if response.storage_options:
merged_storage_options.update(response.storage_options)
# Create a storage options provider if not provided by user
# Only create if namespace returned storage_options (not None)
if storage_options_provider is None and response.storage_options is not None:
storage_options_provider = LanceNamespaceStorageOptionsProvider(
namespace=self._ns,
table_id=table_id,
)
return self._lance_table_from_uri(
name,
response.location,
namespace=namespace,
storage_options=merged_storage_options,
storage_options_provider=storage_options_provider,
index_cache_size=index_cache_size,
)
@@ -330,33 +461,32 @@ class LanceNamespaceDBConnection(DBConnection):
def _lance_table_from_uri(
self,
name: str,
table_uri: str,
*,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional[StorageOptionsProvider] = None,
index_cache_size: Optional[int] = None,
) -> LanceTable:
# Extract the base path and table name from the URI
if table_uri.endswith(".lance"):
base_path = os.path.dirname(table_uri)
table_name = os.path.basename(table_uri)[:-6] # Remove .lance
else:
raise ValueError(f"Invalid table URI: {table_uri}")
from lancedb.db import LanceDBConnection
# Open a table directly from a URI using the location parameter
# Note: storage_options should already be merged by the caller
temp_conn = LanceDBConnection(
base_path,
table_uri, # Use the table location as the connection URI
read_consistency_interval=self.read_consistency_interval,
storage_options={**self.storage_options, **(storage_options or {})},
storage_options=storage_options if storage_options is not None else {},
session=self.session,
)
# Open the table using the temporary connection
# Open the table using the temporary connection with the location parameter
return LanceTable.open(
temp_conn,
table_name,
name,
namespace=namespace,
storage_options=storage_options,
storage_options_provider=storage_options_provider,
index_cache_size=index_cache_size,
location=table_uri,
)

View File

@@ -75,6 +75,7 @@ from .index import lang_mapping
if TYPE_CHECKING:
from .db import LanceDBConnection
from .io import StorageOptionsProvider
from ._lancedb import (
Table as LanceDBTable,
OptimizeStats,
@@ -1709,7 +1710,9 @@ class LanceTable(Table):
*,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional["StorageOptionsProvider"] = None,
index_cache_size: Optional[int] = None,
location: Optional[str] = None,
_async: AsyncTable = None,
):
self._conn = connection
@@ -1722,7 +1725,9 @@ class LanceTable(Table):
name,
namespace=namespace,
storage_options=storage_options,
storage_options_provider=storage_options_provider,
index_cache_size=index_cache_size,
location=location,
)
)
@@ -1730,6 +1735,18 @@ class LanceTable(Table):
def name(self) -> str:
return self._table.name
@property
def namespace(self) -> List[str]:
    """Namespace path this table belongs to."""
    return self._namespace
@property
def id(self) -> str:
    """Full identifier of the table: namespace parts and name joined by "$".

    A table without a namespace is identified by its name alone.
    """
    path = self._namespace
    if not path:
        return self.name
    return "$".join(path + [self.name])
@classmethod
def from_inner(cls, tbl: LanceDBTable):
from .db import LanceDBConnection
@@ -1743,8 +1760,26 @@ class LanceTable(Table):
)
@classmethod
def open(cls, db, name, *, namespace: List[str] = [], **kwargs):
tbl = cls(db, name, namespace=namespace, **kwargs)
def open(
cls,
db,
name,
*,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional["StorageOptionsProvider"] = None,
index_cache_size: Optional[int] = None,
location: Optional[str] = None,
):
tbl = cls(
db,
name,
namespace=namespace,
storage_options=storage_options,
storage_options_provider=storage_options_provider,
index_cache_size=index_cache_size,
location=location,
)
# check the dataset exists
try:
@@ -2585,8 +2620,10 @@ class LanceTable(Table):
*,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str | bool]] = None,
storage_options_provider: Optional["StorageOptionsProvider"] = None,
data_storage_version: Optional[str] = None,
enable_v2_manifest_paths: Optional[bool] = None,
location: Optional[str] = None,
):
"""
Create a new table.
@@ -2678,6 +2715,8 @@ class LanceTable(Table):
embedding_functions=embedding_functions,
namespace=namespace,
storage_options=storage_options,
storage_options_provider=storage_options_provider,
location=location,
)
)
return self

View File

@@ -781,58 +781,6 @@ def test_local_drop_namespace_not_supported(tmp_path):
db.drop_namespace(["test_namespace"])
def test_local_table_operations_with_namespace_raise_error(tmp_path):
    """
    Table operations that pass a namespace must raise NotImplementedError
    in local mode, while the same operations without a namespace still work.
    """
    not_supported = "Namespace parameter is not supported for listing database"
    ns = ["test_ns"]

    db = lancedb.connect(tmp_path)

    # Sample row and a matching explicit schema
    schema = pa.schema(
        [pa.field("vector", pa.list_(pa.float32(), 2)), pa.field("item", pa.string())]
    )
    data = [{"vector": [1.0, 2.0], "item": "test"}]

    # create_table with a namespace is rejected
    with pytest.raises(NotImplementedError, match=not_supported):
        db.create_table("test_table_with_ns", data=data, schema=schema, namespace=ns)

    # A table created without a namespace is needed for the remaining checks
    db.create_table("test_table", data=data, schema=schema)
    assert "test_table" in db.table_names()

    # open_table with a namespace is rejected
    with pytest.raises(NotImplementedError, match=not_supported):
        db.open_table("test_table", namespace=ns)

    # table_names with a namespace is rejected
    with pytest.raises(NotImplementedError, match=not_supported):
        list(db.table_names(namespace=ns))

    # drop_table with a namespace is rejected
    with pytest.raises(NotImplementedError, match=not_supported):
        db.drop_table("test_table", namespace=ns)

    # Listing without a namespace keeps working normally
    tables_root = list(db.table_names())
    assert "test_table" in tables_root
def test_clone_table_latest_version(tmp_path):
"""Test cloning a table with the latest version (default behavior)"""
import os

View File

@@ -5,352 +5,39 @@
import tempfile
import shutil
from typing import Dict, Optional
import pytest
import pyarrow as pa
import lancedb
from lance_namespace.namespace import NATIVE_IMPLS, LanceNamespace
from lance_namespace_urllib3_client.models import (
ListTablesRequest,
ListTablesResponse,
DescribeTableRequest,
DescribeTableResponse,
RegisterTableRequest,
RegisterTableResponse,
DeregisterTableRequest,
DeregisterTableResponse,
CreateTableRequest,
CreateTableResponse,
DropTableRequest,
DropTableResponse,
ListNamespacesRequest,
ListNamespacesResponse,
CreateNamespaceRequest,
CreateNamespaceResponse,
DropNamespaceRequest,
DropNamespaceResponse,
)
class TempNamespace(LanceNamespace):
"""A simple dictionary-backed namespace for testing."""
# Class-level storage to persist table registry across instances
_global_registry: Dict[str, Dict[str, str]] = {}
# Class-level storage for namespaces (supporting 1-level namespace)
_global_namespaces: Dict[str, set] = {}
def __init__(self, **properties):
"""Initialize the test namespace.
Args:
root: The root directory for tables (optional)
**properties: Additional configuration properties
"""
self.config = TempNamespaceConfig(properties)
# Use the root as a key to maintain separate registries per root
root = self.config.root
if root not in self._global_registry:
self._global_registry[root] = {}
if root not in self._global_namespaces:
self._global_namespaces[root] = set()
self.tables = self._global_registry[root] # Reference to shared registry
self.namespaces = self._global_namespaces[
root
] # Reference to shared namespaces
def namespace_id(self) -> str:
"""Return a human-readable unique identifier for this namespace instance.
Returns:
A unique identifier string based on the root directory
"""
return f"TempNamespace {{ root: '{self.config.root}' }}"
def list_tables(self, request: ListTablesRequest) -> ListTablesResponse:
"""List all tables in the namespace."""
if not request.id:
# List all tables in root namespace
tables = [name for name in self.tables.keys() if "." not in name]
else:
# List tables in specific namespace (1-level only)
if len(request.id) == 1:
namespace_name = request.id[0]
prefix = f"{namespace_name}."
tables = [
name[len(prefix) :]
for name in self.tables.keys()
if name.startswith(prefix)
]
else:
# Multi-level namespaces not supported
raise ValueError("Only 1-level namespaces are supported")
return ListTablesResponse(tables=tables)
def describe_table(self, request: DescribeTableRequest) -> DescribeTableResponse:
"""Describe a table by returning its location."""
if not request.id:
raise ValueError("Invalid table ID")
if len(request.id) == 1:
# Root namespace table
table_name = request.id[0]
elif len(request.id) == 2:
# Namespaced table (1-level namespace)
namespace_name, table_name = request.id
table_name = f"{namespace_name}.{table_name}"
else:
raise ValueError("Only 1-level namespaces are supported")
if table_name not in self.tables:
raise RuntimeError(f"Table does not exist: {table_name}")
table_uri = self.tables[table_name]
return DescribeTableResponse(location=table_uri)
def create_table(
self, request: CreateTableRequest, request_data: bytes
) -> CreateTableResponse:
"""Create a table in the namespace."""
if not request.id:
raise ValueError("Invalid table ID")
if len(request.id) == 1:
# Root namespace table
table_name = request.id[0]
table_uri = f"{self.config.root}/{table_name}.lance"
elif len(request.id) == 2:
# Namespaced table (1-level namespace)
namespace_name, base_table_name = request.id
# Add namespace to our namespace set
self.namespaces.add(namespace_name)
table_name = f"{namespace_name}.{base_table_name}"
table_uri = f"{self.config.root}/{namespace_name}/{base_table_name}.lance"
else:
raise ValueError("Only 1-level namespaces are supported")
# Check if table already exists
if table_name in self.tables:
if request.mode == "overwrite":
# Drop existing table for overwrite mode
del self.tables[table_name]
else:
raise RuntimeError(f"Table already exists: {table_name}")
# Parse the Arrow IPC stream to get the schema and create the actual table
import pyarrow.ipc as ipc
import io
import lance
import os
# Create directory if needed for namespaced tables
os.makedirs(os.path.dirname(table_uri), exist_ok=True)
# Read the IPC stream
reader = ipc.open_stream(io.BytesIO(request_data))
table = reader.read_all()
# Create the actual Lance table
lance.write_dataset(table, table_uri)
# Store the table mapping
self.tables[table_name] = table_uri
return CreateTableResponse(location=table_uri)
def drop_table(self, request: DropTableRequest) -> DropTableResponse:
"""Drop a table from the namespace."""
if not request.id:
raise ValueError("Invalid table ID")
if len(request.id) == 1:
# Root namespace table
table_name = request.id[0]
elif len(request.id) == 2:
# Namespaced table (1-level namespace)
namespace_name, base_table_name = request.id
table_name = f"{namespace_name}.{base_table_name}"
else:
raise ValueError("Only 1-level namespaces are supported")
if table_name not in self.tables:
raise RuntimeError(f"Table does not exist: {table_name}")
# Get the table URI
table_uri = self.tables[table_name]
# Delete the actual table files
import shutil
import os
if os.path.exists(table_uri):
shutil.rmtree(table_uri, ignore_errors=True)
# Remove from registry
del self.tables[table_name]
return DropTableResponse()
def register_table(self, request: RegisterTableRequest) -> RegisterTableResponse:
"""Register a table with the namespace."""
if not request.id or len(request.id) != 1:
raise ValueError("Invalid table ID")
if not request.location:
raise ValueError("Table location is required")
table_name = request.id[0]
self.tables[table_name] = request.location
return RegisterTableResponse()
def deregister_table(
self, request: DeregisterTableRequest
) -> DeregisterTableResponse:
"""Deregister a table from the namespace."""
if not request.id or len(request.id) != 1:
raise ValueError("Invalid table ID")
table_name = request.id[0]
if table_name not in self.tables:
raise RuntimeError(f"Table does not exist: {table_name}")
del self.tables[table_name]
return DeregisterTableResponse()
def list_namespaces(self, request: ListNamespacesRequest) -> ListNamespacesResponse:
"""List child namespaces."""
if not request.id:
# List root-level namespaces
namespaces = list(self.namespaces)
elif len(request.id) == 1:
# For 1-level namespace, there are no child namespaces
namespaces = []
else:
raise ValueError("Only 1-level namespaces are supported")
return ListNamespacesResponse(namespaces=namespaces)
def create_namespace(
self, request: CreateNamespaceRequest
) -> CreateNamespaceResponse:
"""Create a namespace."""
if not request.id:
raise ValueError("Invalid namespace ID")
if len(request.id) == 1:
# Create 1-level namespace
namespace_name = request.id[0]
self.namespaces.add(namespace_name)
# Create directory for the namespace
import os
namespace_dir = f"{self.config.root}/{namespace_name}"
os.makedirs(namespace_dir, exist_ok=True)
else:
raise ValueError("Only 1-level namespaces are supported")
return CreateNamespaceResponse()
def drop_namespace(self, request: DropNamespaceRequest) -> DropNamespaceResponse:
    """Remove an empty first-level namespace and delete its directory.

    Args:
        request: Request whose ``id`` must contain exactly one element (the
            namespace name).

    Returns:
        An empty ``DropNamespaceResponse``.

    Raises:
        ValueError: If the ID is empty or has more than one element.
        RuntimeError: If the namespace is unknown, or if any registered table
            name starts with ``"<namespace>."``.
    """
    import os
    import shutil

    ns_id = request.id
    if not ns_id:
        raise ValueError("Invalid namespace ID")
    if len(ns_id) != 1:
        raise ValueError("Only 1-level namespaces are supported")

    namespace_name = ns_id[0]
    if namespace_name not in self.namespaces:
        raise RuntimeError(f"Namespace does not exist: {namespace_name}")

    # A table belongs to the namespace when its registry key carries the
    # "<namespace>." prefix; one match is enough to refuse the drop.
    owned_prefix = f"{namespace_name}."
    if any(key.startswith(owned_prefix) for key in self.tables.keys()):
        raise RuntimeError(
            f"Cannot drop namespace '{namespace_name}': contains tables"
        )

    self.namespaces.remove(namespace_name)
    namespace_dir = f"{self.config.root}/{namespace_name}"
    if os.path.exists(namespace_dir):
        # Errors during directory removal are ignored.
        shutil.rmtree(namespace_dir, ignore_errors=True)
    return DropNamespaceResponse()
class TempNamespaceConfig:
    """Settings holder for the temp-directory test namespace (TempNamespace).

    The only property read is ``root``, the directory under which the
    namespace stores its data.
    """

    # Property key under which the root directory is supplied.
    ROOT = "root"

    def __init__(self, properties: Optional[Dict[str, str]] = None):
        """Read the configuration out of a properties mapping.

        Args:
            properties: Configuration properties; ``None`` is treated as an
                empty mapping. A missing ``root`` entry falls back to
                ``"/tmp"``.
        """
        source = {} if properties is None else properties
        self._root = source.get(self.ROOT, "/tmp")

    @property
    def root(self) -> str:
        """Root directory of the namespace."""
        return self._root
# Expose the implementation under the short name "temp"; the stored value is
# the dotted import path of the TempNamespace class.
# NOTE(review): presumably this is what lets connect_namespace("temp", ...)
# resolve the class by name - confirm against the NATIVE_IMPLS consumer.
NATIVE_IMPLS["temp"] = f"{TempNamespace.__module__}.TempNamespace"
class TestNamespaceConnection:
"""Test namespace-based LanceDB connection."""
"""Test namespace-based LanceDB connection using DirectoryNamespace."""
def setup_method(self):
    """Create a fresh temporary root and reset TempNamespace state for it."""
    self.temp_dir = tempfile.mkdtemp()
    # Empty any registry entries already stored for this directory so the test
    # starts with no tables and no namespaces.
    for store in (TempNamespace._global_registry, TempNamespace._global_namespaces):
        if self.temp_dir in store:
            store[self.temp_dir].clear()
def teardown_method(self):
    """Drop this test's TempNamespace state and delete its temporary root."""
    # Remove the per-directory entries entirely rather than just emptying them.
    for store in (TempNamespace._global_registry, TempNamespace._global_namespaces):
        if self.temp_dir in store:
            del store[self.temp_dir]
    # Removal errors are ignored.
    shutil.rmtree(self.temp_dir, ignore_errors=True)
def test_connect_namespace_test(self):
"""Test connecting to LanceDB through TestNamespace."""
# Connect using TestNamespace
db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
"""Test connecting to LanceDB through DirectoryNamespace."""
# Connect using DirectoryNamespace
db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
# Should be a LanceNamespaceDBConnection
assert isinstance(db, lancedb.LanceNamespaceDBConnection)
# Initially no tables
# Initially no tables in root
assert len(list(db.table_names())) == 0
def test_create_table_through_namespace(self):
"""Test creating a table through namespace."""
db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
# Create a child namespace first
db.create_namespace(["test_ns"])
# Define schema for empty table
schema = pa.schema(
@@ -361,13 +48,15 @@ class TestNamespaceConnection:
]
)
# Create empty table
table = db.create_table("test_table", schema=schema)
# Create empty table in child namespace
table = db.create_table("test_table", schema=schema, namespace=["test_ns"])
assert table is not None
assert table.name == "test_table"
assert table.namespace == ["test_ns"]
assert table.id == "test_ns$test_table"
# Table should appear in namespace
table_names = list(db.table_names())
# Table should appear in child namespace
table_names = list(db.table_names(namespace=["test_ns"]))
assert "test_table" in table_names
assert len(table_names) == 1
@@ -378,21 +67,26 @@ class TestNamespaceConnection:
def test_open_table_through_namespace(self):
"""Test opening an existing table through namespace."""
db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
# Create a table with schema
# Create a child namespace first
db.create_namespace(["test_ns"])
# Create a table with schema in child namespace
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 2)),
]
)
db.create_table("test_table", schema=schema)
db.create_table("test_table", schema=schema, namespace=["test_ns"])
# Open the table
table = db.open_table("test_table")
table = db.open_table("test_table", namespace=["test_ns"])
assert table is not None
assert table.name == "test_table"
assert table.namespace == ["test_ns"]
assert table.id == "test_ns$test_table"
# Verify empty table with correct schema
result = table.to_pandas()
@@ -401,44 +95,50 @@ class TestNamespaceConnection:
def test_drop_table_through_namespace(self):
"""Test dropping a table through namespace."""
db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
# Create tables
# Create a child namespace first
db.create_namespace(["test_ns"])
# Create tables in child namespace
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 2)),
]
)
db.create_table("table1", schema=schema)
db.create_table("table2", schema=schema)
db.create_table("table1", schema=schema, namespace=["test_ns"])
db.create_table("table2", schema=schema, namespace=["test_ns"])
# Verify both tables exist
table_names = list(db.table_names())
# Verify both tables exist in child namespace
table_names = list(db.table_names(namespace=["test_ns"]))
assert "table1" in table_names
assert "table2" in table_names
assert len(table_names) == 2
# Drop one table
db.drop_table("table1")
db.drop_table("table1", namespace=["test_ns"])
# Verify only table2 remains
table_names = list(db.table_names())
table_names = list(db.table_names(namespace=["test_ns"]))
assert "table1" not in table_names
assert "table2" in table_names
assert len(table_names) == 1
# Test that drop_table works without explicit namespace parameter
db.drop_table("table2")
assert len(list(db.table_names())) == 0
# Drop the second table
db.drop_table("table2", namespace=["test_ns"])
assert len(list(db.table_names(namespace=["test_ns"]))) == 0
# Should not be able to open dropped table
with pytest.raises(RuntimeError):
db.open_table("table1")
db.open_table("table1", namespace=["test_ns"])
def test_create_table_with_schema(self):
"""Test creating a table with explicit schema through namespace."""
db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
# Create a child namespace first
db.create_namespace(["test_ns"])
# Define schema
schema = pa.schema(
@@ -449,9 +149,10 @@ class TestNamespaceConnection:
]
)
# Create table with schema
table = db.create_table("test_table", schema=schema)
# Create table with schema in child namespace
table = db.create_table("test_table", schema=schema, namespace=["test_ns"])
assert table is not None
assert table.namespace == ["test_ns"]
# Verify schema
table_schema = table.schema
@@ -461,16 +162,19 @@ class TestNamespaceConnection:
def test_rename_table_not_supported(self):
"""Test that rename_table raises NotImplementedError."""
db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
# Create a table
# Create a child namespace first
db.create_namespace(["test_ns"])
# Create a table in child namespace
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 2)),
]
)
db.create_table("old_name", schema=schema)
db.create_table("old_name", schema=schema, namespace=["test_ns"])
# Rename should raise NotImplementedError
with pytest.raises(NotImplementedError, match="rename_table is not supported"):
@@ -478,9 +182,12 @@ class TestNamespaceConnection:
def test_drop_all_tables(self):
"""Test dropping all tables through namespace."""
db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
# Create multiple tables
# Create a child namespace first
db.create_namespace(["test_ns"])
# Create multiple tables in child namespace
schema = pa.schema(
[
pa.field("id", pa.int64()),
@@ -488,27 +195,30 @@ class TestNamespaceConnection:
]
)
for i in range(3):
db.create_table(f"table{i}", schema=schema)
db.create_table(f"table{i}", schema=schema, namespace=["test_ns"])
# Verify tables exist
assert len(list(db.table_names())) == 3
# Verify tables exist in child namespace
assert len(list(db.table_names(namespace=["test_ns"]))) == 3
# Drop all tables
db.drop_all_tables()
# Drop all tables in child namespace
db.drop_all_tables(namespace=["test_ns"])
# Verify all tables are gone
assert len(list(db.table_names())) == 0
# Verify all tables are gone from child namespace
assert len(list(db.table_names(namespace=["test_ns"]))) == 0
# Test that table_names works with keyword-only namespace parameter
db.create_table("test_table", schema=schema)
result = list(db.table_names(namespace=[]))
db.create_table("test_table", schema=schema, namespace=["test_ns"])
result = list(db.table_names(namespace=["test_ns"]))
assert "test_table" in result
def test_table_operations(self):
"""Test various table operations through namespace."""
db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
# Create a table with schema
# Create a child namespace first
db.create_namespace(["test_ns"])
# Create a table with schema in child namespace
schema = pa.schema(
[
pa.field("id", pa.int64()),
@@ -516,7 +226,7 @@ class TestNamespaceConnection:
pa.field("text", pa.string()),
]
)
table = db.create_table("test_table", schema=schema)
table = db.create_table("test_table", schema=schema, namespace=["test_ns"])
# Verify empty table was created
result = table.to_pandas()
@@ -548,7 +258,7 @@ class TestNamespaceConnection:
# Connect with storage options
storage_opts = {"test_option": "test_value"}
db = lancedb.connect_namespace(
"temp", {"root": self.temp_dir}, storage_options=storage_opts
"dir", {"root": self.temp_dir}, storage_options=storage_opts
)
# Storage options should be preserved
@@ -566,7 +276,7 @@ class TestNamespaceConnection:
def test_namespace_operations(self):
"""Test namespace management operations."""
db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
# Initially no namespaces
assert len(list(db.list_namespaces())) == 0
@@ -617,7 +327,7 @@ class TestNamespaceConnection:
def test_namespace_with_tables_cannot_be_dropped(self):
"""Test that namespaces containing tables cannot be dropped."""
db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
# Create namespace and table
db.create_namespace(["test_namespace"])
@@ -630,7 +340,7 @@ class TestNamespaceConnection:
db.create_table("test_table", schema=schema, namespace=["test_namespace"])
# Try to drop namespace with tables - should fail
with pytest.raises(RuntimeError, match="contains tables"):
with pytest.raises(RuntimeError, match="is not empty"):
db.drop_namespace(["test_namespace"])
# Drop table first
@@ -640,7 +350,7 @@ class TestNamespaceConnection:
db.drop_namespace(["test_namespace"])
def test_same_table_name_different_namespaces(self):
db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
# Create two namespaces
db.create_namespace(["namespace_a"])

View File

@@ -0,0 +1,632 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
"""
Integration tests for LanceDB Namespace with S3 and credential refresh.
This test simulates a namespace server that returns incrementing credentials
and verifies that the credential refresh mechanism works correctly for both
create_table and open_table operations.
Tests verify:
- Storage options provider is auto-created and used
- Credentials are properly cached during reads
- Credentials refresh when they expire
- Both create and open operations work with credential rotation
"""
import copy
import time
import uuid
from threading import Lock
from typing import Dict
import pyarrow as pa
import pytest
from lance_namespace import (
CreateEmptyTableRequest,
CreateEmptyTableResponse,
DescribeTableRequest,
DescribeTableResponse,
LanceNamespace,
)
from lancedb.namespace import LanceNamespaceDBConnection
# Connection settings for the LocalStack S3 endpoint used by these tests.
# All values are strings (including the "true" flag) because the same mapping
# is passed around as storage options.
CONFIG = {
    "allow_http": "true",
    "aws_access_key_id": "ACCESSKEY",
    "aws_secret_access_key": "SECRETKEY",
    "aws_endpoint": "http://localhost:4566",
    "aws_region": "us-east-1",
}
def get_boto3_client(*args, **kwargs):
    """Build a boto3 client that uses the region and credentials from CONFIG.

    Positional arguments and extra keyword arguments are forwarded to
    ``boto3.client`` unchanged. The endpoint is not set here; callers pass
    ``endpoint_url`` themselves.

    Returns:
        Whatever ``boto3.client`` returns for the given arguments.
    """
    # Local import: boto3 is only loaded when a client is actually requested.
    import boto3

    fixed = {
        "region_name": CONFIG["aws_region"],
        "aws_access_key_id": CONFIG["aws_access_key_id"],
        "aws_secret_access_key": CONFIG["aws_secret_access_key"],
    }
    return boto3.client(*args, **fixed, **kwargs)
@pytest.fixture(scope="module")
def s3_bucket():
    """Provide a freshly created S3 bucket for the module and remove it afterwards.

    Yields:
        The bucket name.
    """
    bucket_name = "lancedb-namespace-integtest"
    client = get_boto3_client("s3", endpoint_url=CONFIG["aws_endpoint"])

    # Start clean in case an earlier run left the bucket behind.
    try:
        delete_bucket(client, bucket_name)
    except client.exceptions.NoSuchBucket:
        pass
    client.create_bucket(Bucket=bucket_name)

    yield bucket_name

    # Teardown: remove the bucket and everything the tests wrote into it.
    delete_bucket(client, bucket_name)
def delete_bucket(s3, bucket_name):
    """Best-effort removal of an S3 bucket together with every object in it.

    Args:
        s3: S3 client offering ``get_paginator``, ``delete_object`` and
            ``delete_bucket`` (boto3 style).
        bucket_name: Name of the bucket to empty and delete.

    Returns:
        None. Failures never propagate as ``Exception``; they are logged as a
        warning instead of being dropped silently, so a broken cleanup (for
        example an unreachable endpoint) is visible in the test output while
        fixture setup and teardown still carry on.
    """
    import logging

    try:
        # A bucket must be empty before it can be deleted, so remove the
        # objects page by page first.
        paginator = s3.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=bucket_name):
            # A page may have no "Contents" entry at all.
            for obj in page.get("Contents", []):
                s3.delete_object(Bucket=bucket_name, Key=obj["Key"])
        s3.delete_bucket(Bucket=bucket_name)
    except Exception as exc:
        logging.getLogger(__name__).warning(
            "Best-effort cleanup of bucket %r failed: %s", bucket_name, exc
        )
class TrackingNamespace(LanceNamespace):
    """
    Test double that delegates to a DirectoryNamespace and counts selected calls.

    ``create_empty_table`` and ``describe_table`` each increment their own
    counter and overwrite the AWS credential entries of the returned storage
    options with values derived from that counter, plus an
    ``expires_at_millis`` timestamp. This imitates a server that rotates
    credentials, and lets tests check how often each call was made. The other
    methods defined here are forwarded to the inner namespace unchanged.
    """

    def __init__(
        self,
        bucket_name: str,
        storage_options: Dict[str, str],
        credential_expires_in_seconds: int = 60,
    ):
        """Wrap a DirectoryNamespace rooted at ``<bucket>/namespace_root``.

        Args:
            bucket_name: S3 bucket name, or a local location starting with
                ``/`` or ``file://``.
            storage_options: Options passed to the inner namespace, each key
                prefixed with ``storage.``.
            credential_expires_in_seconds: Lifetime stamped onto every set of
                generated credentials.
        """
        from lance.namespace import DirectoryNamespace

        self.bucket_name = bucket_name
        self.base_storage_options = storage_options
        self.credential_expires_in_seconds = credential_expires_in_seconds
        self.describe_call_count = 0
        self.create_call_count = 0
        self.lock = Lock()

        # Local locations are used verbatim; anything else is taken to be a
        # bucket name and gets the s3:// scheme.
        if bucket_name.startswith(("/", "file://")):
            root = f"{bucket_name}/namespace_root"
        else:
            root = f"s3://{bucket_name}/namespace_root"

        dir_props = {f"storage.{k}": v for k, v in storage_options.items()}
        dir_props["root"] = root
        self.inner = DirectoryNamespace(**dir_props)

    def get_describe_call_count(self) -> int:
        """Return the number of describe_table calls so far (read under the lock)."""
        with self.lock:
            return self.describe_call_count

    def get_create_call_count(self) -> int:
        """Return the number of create_empty_table calls so far (read under the lock)."""
        with self.lock:
            return self.create_call_count

    def namespace_id(self) -> str:
        """Return an identifier that embeds the inner namespace's identifier."""
        return f"TrackingNamespace {{ inner: {self.inner.namespace_id()} }}"

    def _modify_storage_options(
        self, storage_options: Dict[str, str], count: int
    ) -> Dict[str, str]:
        """
        Return a copy of the options carrying credentials numbered by ``count``.

        The input is never mutated; a falsy input yields a fresh dict. The
        expiry is "now" plus ``credential_expires_in_seconds``, expressed in
        milliseconds since the epoch and stored as a string.
        """
        if storage_options:
            modified = copy.deepcopy(storage_options)
        else:
            modified = {}

        lifetime = self.credential_expires_in_seconds
        expires_at_millis = int((time.time() + lifetime) * 1000)

        modified["aws_access_key_id"] = f"AKID_{count}"
        modified["aws_secret_access_key"] = f"SECRET_{count}"
        modified["aws_session_token"] = f"TOKEN_{count}"
        modified["expires_at_millis"] = str(expires_at_millis)
        return modified

    def _with_rotated_credentials(self, response, count: int):
        """Swap the response's storage options for the ``count``-th credentials."""
        response.storage_options = self._modify_storage_options(
            response.storage_options, count
        )
        return response

    def create_empty_table(
        self, request: CreateEmptyTableRequest
    ) -> CreateEmptyTableResponse:
        """Count the call, delegate, and return the response with new credentials."""
        # Only the counter update happens under the lock; the delegated call
        # runs outside it.
        with self.lock:
            self.create_call_count += 1
            count = self.create_call_count
        return self._with_rotated_credentials(
            self.inner.create_empty_table(request), count
        )

    def describe_table(self, request: DescribeTableRequest) -> DescribeTableResponse:
        """Count the call, delegate, and return the response with new credentials."""
        with self.lock:
            self.describe_call_count += 1
            count = self.describe_call_count
        return self._with_rotated_credentials(
            self.inner.describe_table(request), count
        )

    # Everything below is a plain pass-through to the inner namespace.
    def list_tables(self, request):
        return self.inner.list_tables(request)

    def drop_table(self, request):
        return self.inner.drop_table(request)

    def list_namespaces(self, request):
        return self.inner.list_namespaces(request)

    def create_namespace(self, request):
        return self.inner.create_namespace(request)

    def drop_namespace(self, request):
        return self.inner.drop_namespace(request)
@pytest.mark.s3_test
def test_namespace_create_table_with_provider(s3_bucket: str):
    """
    Create a table with data through the namespace connection.

    Checks:
    - no namespace calls are counted before the table is created
    - creating the table calls create_empty_table exactly once
    - describe_table is not called while creating
    - the table has the requested name and returns the 3 written rows
    """
    tracker = TrackingNamespace(
        bucket_name=s3_bucket,
        storage_options=copy.deepcopy(CONFIG),
        credential_expires_in_seconds=3600,  # 1 hour
    )
    db = LanceNamespaceDBConnection(tracker)

    # A randomly named namespace keeps this test's tables apart from the
    # others sharing the module-scoped bucket.
    namespace_name = f"test_ns_{uuid.uuid4().hex[:8]}"
    db.create_namespace([namespace_name])
    table_name = f"test_table_{uuid.uuid4().hex}"
    ns_path = [namespace_name]

    # Nothing has been counted yet.
    assert tracker.get_create_call_count() == 0
    assert tracker.get_describe_call_count() == 0

    rows = pa.table(
        {
            "id": [1, 2, 3],
            "vector": [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]],
            "text": ["hello", "world", "test"],
        }
    )
    table = db.create_table(table_name, rows, namespace=ns_path)

    # One create_empty_table call and no describe_table call in create mode.
    assert tracker.get_create_call_count() == 1
    assert tracker.get_describe_call_count() == 0

    # The table exists and holds what was written.
    assert table.name == table_name
    frame = table.to_pandas()
    assert len(frame) == 3
    assert list(frame["id"]) == [1, 2, 3]
@pytest.mark.s3_test
def test_namespace_open_table_with_provider(s3_bucket: str):
    """
    Open an existing table through the namespace connection and read from it.

    Checks:
    - opening the table calls describe_table exactly once
    - opening does not call create_empty_table again
    - repeated reads succeed and cause no further describe_table calls while
      the credentials are still valid (1 hour lifetime)
    """
    tracker = TrackingNamespace(
        bucket_name=s3_bucket,
        storage_options=copy.deepcopy(CONFIG),
        credential_expires_in_seconds=3600,
    )
    db = LanceNamespaceDBConnection(tracker)

    # Unique namespace for this test.
    namespace_name = f"test_ns_{uuid.uuid4().hex[:8]}"
    db.create_namespace([namespace_name])
    table_name = f"test_table_{uuid.uuid4().hex}"
    ns_path = [namespace_name]

    # The table has to exist before it can be opened.
    rows = pa.table(
        {
            "id": [1, 2, 3, 4, 5],
            "vector": [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0], [7.0, 8.0], [9.0, 10.0]],
            "value": [10, 20, 30, 40, 50],
        }
    )
    db.create_table(table_name, rows, namespace=ns_path)

    creates_before_open = tracker.get_create_call_count()
    assert creates_before_open == 1

    opened = db.open_table(table_name, namespace=ns_path)

    # Opening costs one describe_table and no create_empty_table.
    assert tracker.get_describe_call_count() == 1
    assert tracker.get_create_call_count() == creates_before_open

    # Read several times; the describe count must stay where it was.
    describes_after_open = tracker.get_describe_call_count()
    for _ in range(3):
        assert len(opened.to_pandas()) == 5
        assert opened.count_rows() == 5

    assert tracker.get_describe_call_count() == describes_after_open
@pytest.mark.s3_test
def test_namespace_credential_refresh_on_read(s3_bucket: str):
    """
    Read from a table before and after its credentials expire.

    Checks:
    - a read straight after opening succeeds
    - once the 3-second credentials have expired, the next read still succeeds
    - that read triggers exactly one additional describe_table call
    """
    tracker = TrackingNamespace(
        bucket_name=s3_bucket,
        storage_options=copy.deepcopy(CONFIG),
        credential_expires_in_seconds=3,  # Short expiration for testing
    )
    db = LanceNamespaceDBConnection(tracker)

    # Unique namespace for this test.
    namespace_name = f"test_ns_{uuid.uuid4().hex[:8]}"
    db.create_namespace([namespace_name])
    table_name = f"test_table_{uuid.uuid4().hex}"
    ns_path = [namespace_name]

    rows = pa.table(
        {
            "id": [1, 2, 3],
            "vector": [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]],
        }
    )
    db.create_table(table_name, rows, namespace=ns_path)

    # Opening performs the first describe_table.
    opened = db.open_table(table_name, namespace=ns_path)

    # Immediate read, while the credentials from the open are still fresh.
    assert len(opened.to_pandas()) == 3
    describes_before_expiry = tracker.get_describe_call_count()

    # Outlast the 3-second lifetime with some margin.
    time.sleep(5)

    # This read has to fetch new credentials first.
    assert len(opened.to_pandas()) == 3
    describes_after_expiry = tracker.get_describe_call_count()

    refresh_delta = describes_after_expiry - describes_before_expiry
    # The refresh must cost exactly one describe_table call.
    assert refresh_delta == 1, (
        f"Credential refresh should call describe_table exactly once "
        f"(got {refresh_delta})"
    )
@pytest.mark.s3_test
def test_namespace_credential_refresh_on_write(s3_bucket: str):
    """
    Append to a table before and after its credentials expire.

    Scenario:
    - create a table, then append while the credentials are still valid
    - wait past the 3-second lifetime and append again

    The only assertion is the final row count (6); namespace call counts are
    not inspected here.
    """
    tracker = TrackingNamespace(
        bucket_name=s3_bucket,
        storage_options=copy.deepcopy(CONFIG),
        credential_expires_in_seconds=3,  # Short expiration
    )
    db = LanceNamespaceDBConnection(tracker)

    # Unique namespace for this test.
    namespace_name = f"test_ns_{uuid.uuid4().hex[:8]}"
    db.create_namespace([namespace_name])
    table_name = f"test_table_{uuid.uuid4().hex}"
    ns_path = [namespace_name]

    first_batch = pa.table(
        {
            "id": [1, 2],
            "vector": [[1.0, 2.0], [3.0, 4.0]],
        }
    )
    table = db.create_table(table_name, first_batch, namespace=ns_path)

    # Second write, issued before the credentials run out.
    second_batch = pa.table(
        {
            "id": [3, 4],
            "vector": [[5.0, 6.0], [7.0, 8.0]],
        }
    )
    table.add(second_batch)

    # Let the credentials expire.
    time.sleep(5)

    # Third write, issued after expiry.
    third_batch = pa.table(
        {
            "id": [5, 6],
            "vector": [[9.0, 10.0], [11.0, 12.0]],
        }
    )
    table.add(third_batch)

    # All three batches of two rows must have landed.
    assert table.count_rows() == 6
@pytest.mark.s3_test
def test_namespace_overwrite_mode(s3_bucket: str):
    """
    Create a table and then replace it with ``mode="overwrite"``.

    Checks:
    - the first create calls create_empty_table once and never describe_table
    - the overwrite adds no create_empty_table call and exactly one
      describe_table call
    - after the overwrite the table holds only the new rows
    """
    tracker = TrackingNamespace(
        bucket_name=s3_bucket,
        storage_options=copy.deepcopy(CONFIG),
        credential_expires_in_seconds=3600,
    )
    db = LanceNamespaceDBConnection(tracker)

    # Unique namespace for this test.
    namespace_name = f"test_ns_{uuid.uuid4().hex[:8]}"
    db.create_namespace([namespace_name])
    table_name = f"test_table_{uuid.uuid4().hex}"
    ns_path = [namespace_name]

    original_rows = pa.table(
        {
            "id": [1, 2],
            "vector": [[1.0, 2.0], [3.0, 4.0]],
        }
    )
    table = db.create_table(table_name, original_rows, namespace=ns_path)

    # Plain create: one create_empty_table, zero describe_table.
    assert tracker.get_create_call_count() == 1
    assert tracker.get_describe_call_count() == 0
    assert table.count_rows() == 2

    replacement_rows = pa.table(
        {
            "id": [10, 20, 30],
            "vector": [[10.0, 20.0], [30.0, 40.0], [50.0, 60.0]],
        }
    )
    overwritten = db.create_table(
        table_name, replacement_rows, namespace=ns_path, mode="overwrite"
    )

    # The overwrite reuses the existing table location, looked up through
    # describe_table, so the create counter does not move.
    assert tracker.get_create_call_count() == 1
    assert tracker.get_describe_call_count() == 1

    # Only the replacement rows remain.
    assert overwritten.count_rows() == 3
    frame = overwritten.to_pandas()
    assert list(frame["id"]) == [10, 20, 30]
@pytest.mark.s3_test
def test_namespace_multiple_tables(s3_bucket: str):
    """
    Create and open two tables inside the same namespace.

    Checks:
    - each create_table costs one create_empty_table call (2 in total)
    - each open_table costs one describe_table call (2 in total)
    - both opened tables return their own rows
    """
    tracker = TrackingNamespace(
        bucket_name=s3_bucket,
        storage_options=copy.deepcopy(CONFIG),
        credential_expires_in_seconds=3600,
    )
    db = LanceNamespaceDBConnection(tracker)

    # Unique namespace for this test.
    namespace_name = f"test_ns_{uuid.uuid4().hex[:8]}"
    db.create_namespace([namespace_name])
    ns_path = [namespace_name]

    first_name = f"table1_{uuid.uuid4().hex}"
    first_rows = pa.table({"id": [1, 2], "value": [10, 20]})
    db.create_table(first_name, first_rows, namespace=ns_path)

    second_name = f"table2_{uuid.uuid4().hex}"
    second_rows = pa.table({"id": [3, 4], "value": [30, 40]})
    db.create_table(second_name, second_rows, namespace=ns_path)

    # One create call per table.
    assert tracker.get_create_call_count() == 2

    first_opened = db.open_table(first_name, namespace=ns_path)
    second_opened = db.open_table(second_name, namespace=ns_path)

    # One describe call per open.
    assert tracker.get_describe_call_count() == 2

    # Each handle reads back its own table.
    assert first_opened.count_rows() == 2
    assert second_opened.count_rows() == 2

    first_frame = first_opened.to_pandas()
    second_frame = second_opened.to_pandas()
    assert list(first_frame["id"]) == [1, 2]
    assert list(second_frame["id"]) == [3, 4]
@pytest.mark.s3_test
def test_namespace_with_schema_only(s3_bucket: str):
    """
    Create an empty table from a schema alone, then append rows to it.

    Checks:
    - creation calls create_empty_table once and never describe_table
    - the new table starts with zero rows
    - rows added afterwards are counted
    """
    tracker = TrackingNamespace(
        bucket_name=s3_bucket,
        storage_options=copy.deepcopy(CONFIG),
        credential_expires_in_seconds=3600,
    )
    db = LanceNamespaceDBConnection(tracker)

    # Unique namespace for this test.
    namespace_name = f"test_ns_{uuid.uuid4().hex[:8]}"
    db.create_namespace([namespace_name])
    table_name = f"test_table_{uuid.uuid4().hex}"
    ns_path = [namespace_name]

    # No data is supplied, only the column layout.
    schema = pa.schema(
        [
            pa.field("id", pa.int64()),
            pa.field("vector", pa.list_(pa.float32(), 2)),
            pa.field("text", pa.utf8()),
        ]
    )
    table = db.create_table(table_name, schema=schema, namespace=ns_path)

    # One create_empty_table, no describe_table in create mode.
    assert tracker.get_create_call_count() == 1
    assert tracker.get_describe_call_count() == 0

    # Nothing has been written yet.
    assert table.count_rows() == 0

    rows = pa.table(
        {
            "id": [1, 2],
            "vector": [[1.0, 2.0], [3.0, 4.0]],
            "text": ["hello", "world"],
        }
    )
    table.add(rows)

    # The appended rows are now visible.
    assert table.count_rows() == 2