mirror of
https://github.com/lancedb/lancedb.git
synced 2026-03-24 17:40:41 +00:00
Supersedes https://github.com/lancedb/lancedb/pull/2925. We accidentally upgraded lance to 2.0.0-beta.8; this PR first reverts that and then bumps to 1.0.3-rc.1 --------- Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
648 lines
20 KiB
Python
648 lines
20 KiB
Python
# SPDX-License-Identifier: Apache-2.0
|
|
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
|
|
|
"""
|
|
Integration tests for LanceDB Namespace with S3 and credential refresh.
|
|
|
|
This test simulates a namespace server that returns incrementing credentials
|
|
and verifies that the credential refresh mechanism works correctly for both
|
|
create_table and open_table operations.
|
|
|
|
Tests verify:
|
|
- Storage options provider is auto-created and used
|
|
- Credentials are properly cached during reads
|
|
- Credentials refresh when they expire
|
|
- Both create and open operations work with credential rotation
|
|
"""
|
|
|
|
import copy
|
|
import time
|
|
import uuid
|
|
from threading import Lock
|
|
from typing import Dict
|
|
|
|
import pyarrow as pa
|
|
import pytest
|
|
from lance_namespace import (
|
|
CreateEmptyTableRequest,
|
|
CreateEmptyTableResponse,
|
|
DeclareTableRequest,
|
|
DeclareTableResponse,
|
|
DescribeTableRequest,
|
|
DescribeTableResponse,
|
|
LanceNamespace,
|
|
)
|
|
from lancedb.namespace import LanceNamespaceDBConnection
|
|
|
|
# Connection settings for the LocalStack S3 emulator the tests run against.
# The boto3 helper reads its credentials from here, and every test passes a
# copy of this dict to the namespace as its storage options.
CONFIG = dict(
    allow_http="true",
    aws_access_key_id="ACCESSKEY",
    aws_secret_access_key="SECRETKEY",
    aws_endpoint="http://localhost:4566",
    aws_region="us-east-1",
)
|
|
|
|
|
|
def get_boto3_client(*args, **kwargs):
    """Build a boto3 client that authenticates with the credentials in CONFIG.

    All positional and keyword arguments (e.g. the service name and
    ``endpoint_url``) are forwarded unchanged to ``boto3.client``; the region
    and access keys are always taken from CONFIG.
    """
    import boto3

    credentials = {
        "region_name": CONFIG["aws_region"],
        "aws_access_key_id": CONFIG["aws_access_key_id"],
        "aws_secret_access_key": CONFIG["aws_secret_access_key"],
    }
    return boto3.client(*args, **credentials, **kwargs)
|
|
|
|
|
|
@pytest.fixture(scope="module")
def s3_bucket():
    """Provide a freshly created S3 bucket for this module and remove it afterwards.

    Any bucket of the same name left over from an earlier run is removed
    (best-effort) before the bucket is recreated. The bucket name is yielded
    to the tests, and teardown deletes the bucket and its contents again.
    """
    s3 = get_boto3_client("s3", endpoint_url=CONFIG["aws_endpoint"])
    bucket_name = "lancedb-namespace-integtest"

    # delete_bucket is best-effort and already swallows every exception,
    # including the "bucket does not exist" case, so no try/except is needed.
    delete_bucket(s3, bucket_name)
    s3.create_bucket(Bucket=bucket_name)

    yield bucket_name

    # Teardown: runs once the module's tests are done.
    delete_bucket(s3, bucket_name)
|
|
|
|
|
|
def delete_bucket(s3, bucket_name):
    """Best-effort removal of an S3 bucket together with every object in it.

    Objects are listed page by page and deleted one at a time before the
    bucket itself is deleted. Any exception along the way (for example, a
    bucket that does not exist) is deliberately ignored, so the helper is
    safe to call both before setup and during teardown.
    """
    try:
        pages = s3.get_paginator("list_objects_v2").paginate(Bucket=bucket_name)
        for page in pages:
            for entry in page.get("Contents", []):
                s3.delete_object(Bucket=bucket_name, Key=entry["Key"])
        s3.delete_bucket(Bucket=bucket_name)
    except Exception:
        # Cleanup must never fail the test run.
        pass
|
|
|
|
|
|
class TrackingNamespace(LanceNamespace):
    """
    Test double that delegates to a DirectoryNamespace and counts API calls.

    Every ``declare_table``, ``create_empty_table`` and ``describe_table``
    response has its AWS credentials replaced by a numbered set
    (``AKID_<n>`` / ``SECRET_<n>`` / ``TOKEN_<n>``) plus an
    ``expires_at_millis`` deadline. This imitates a server that rotates
    credentials on every call. The call counters let tests check whether
    LanceDB reused cached credentials or went back to the namespace.
    """

    def __init__(
        self,
        bucket_name: str,
        storage_options: Dict[str, str],
        credential_expires_in_seconds: int = 60,
    ):
        from lance.namespace import DirectoryNamespace

        self.bucket_name = bucket_name
        self.base_storage_options = storage_options
        self.credential_expires_in_seconds = credential_expires_in_seconds
        # Counters are read and written only while holding ``lock``.
        # declare_table and create_empty_table share ``create_call_count``.
        self.describe_call_count = 0
        self.create_call_count = 0
        self.lock = Lock()

        # The inner namespace receives every storage option as "storage.<key>".
        inner_props = {
            f"storage.{option}": setting
            for option, setting in storage_options.items()
        }

        # Filesystem locations are used directly as the root prefix; any other
        # value is taken to be an S3 bucket name.
        is_local = bucket_name.startswith(("/", "file://"))
        base_uri = bucket_name if is_local else f"s3://{bucket_name}"
        inner_props["root"] = f"{base_uri}/namespace_root"

        self.inner = DirectoryNamespace(**inner_props)

    def get_describe_call_count(self) -> int:
        """Return how many describe_table calls have been made (read under the lock)."""
        with self.lock:
            return self.describe_call_count

    def get_create_call_count(self) -> int:
        """Return how many declare/create_empty_table calls have been made (read under the lock)."""
        with self.lock:
            return self.create_call_count

    def namespace_id(self) -> str:
        """Identify this wrapper together with the namespace it delegates to."""
        inner_id = self.inner.namespace_id()
        return f"TrackingNamespace {{ inner: {inner_id} }}"

    def _modify_storage_options(
        self, storage_options: Dict[str, str], count: int
    ) -> Dict[str, str]:
        """
        Return a copy of ``storage_options`` carrying the ``count``-th credentials.

        The AWS access key id, secret key and session token are overwritten
        with values derived from ``count``. ``expires_at_millis`` is set to
        "now + credential_expires_in_seconds", encoded as a string of epoch
        milliseconds. The input mapping is deep-copied and never mutated; a
        falsy input (``None`` or empty) starts from a new empty dict.
        """
        rotated = copy.deepcopy(storage_options) if storage_options else {}
        rotated.update(
            aws_access_key_id=f"AKID_{count}",
            aws_secret_access_key=f"SECRET_{count}",
            aws_session_token=f"TOKEN_{count}",
        )
        expiry_seconds = time.time() + self.credential_expires_in_seconds
        rotated["expires_at_millis"] = str(int(expiry_seconds * 1000))
        return rotated

    def _rotate_credentials(self, response, count: int):
        """Replace ``response.storage_options`` with the ``count``-th credentials and return it."""
        response.storage_options = self._modify_storage_options(
            response.storage_options, count
        )
        return response

    def declare_table(self, request: DeclareTableRequest) -> DeclareTableResponse:
        """Count the call as a create, delegate it, and return rotated credentials."""
        with self.lock:
            self.create_call_count += 1
            call_number = self.create_call_count
        return self._rotate_credentials(self.inner.declare_table(request), call_number)

    def create_empty_table(
        self, request: CreateEmptyTableRequest
    ) -> CreateEmptyTableResponse:
        """Count the call as a create, delegate it, and return rotated credentials."""
        with self.lock:
            self.create_call_count += 1
            call_number = self.create_call_count
        return self._rotate_credentials(
            self.inner.create_empty_table(request), call_number
        )

    def describe_table(self, request: DescribeTableRequest) -> DescribeTableResponse:
        """Count the describe call, delegate it, and return rotated credentials."""
        with self.lock:
            self.describe_call_count += 1
            call_number = self.describe_call_count
        return self._rotate_credentials(self.inner.describe_table(request), call_number)

    # Pass through other methods to inner namespace
    def list_tables(self, request):
        return self.inner.list_tables(request)

    def drop_table(self, request):
        return self.inner.drop_table(request)

    def list_namespaces(self, request):
        return self.inner.list_namespaces(request)

    def create_namespace(self, request):
        return self.inner.create_namespace(request)

    def drop_namespace(self, request):
        return self.inner.drop_namespace(request)
|
|
|
|
|
|
@pytest.mark.s3_test
def test_namespace_create_table_with_provider(s3_bucket: str):
    """
    Create a table through the namespace and check the namespace traffic.

    Verifies:
    - the create API (declare_table / create_empty_table) is hit exactly once
    - describe_table is not called while creating
    - the storage options provider is auto-created and the write succeeds
    - the written rows can be read back
    """
    tracker = TrackingNamespace(
        bucket_name=s3_bucket,
        storage_options=copy.deepcopy(CONFIG),
        credential_expires_in_seconds=3600,  # one hour: never expires mid-test
    )
    db = LanceNamespaceDBConnection(tracker)

    # Each test works inside its own uniquely named child namespace.
    ns_name = f"test_ns_{uuid.uuid4().hex[:8]}"
    db.create_namespace([ns_name])
    ns_path = [ns_name]
    table_name = f"test_table_{uuid.uuid4().hex}"

    # Nothing has been requested from the namespace yet.
    assert tracker.get_create_call_count() == 0
    assert tracker.get_describe_call_count() == 0

    rows = pa.Table.from_pydict(
        {
            "id": [1, 2, 3],
            "vector": [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]],
            "text": ["hello", "world", "test"],
        }
    )
    table = db.create_table(table_name, rows, namespace=ns_path)

    # Creating goes through the create API once and never needs describe_table.
    assert tracker.get_create_call_count() == 1
    assert tracker.get_describe_call_count() == 0

    # The new table is usable and holds exactly what was written.
    assert table.name == table_name
    frame = table.to_pandas()
    assert len(frame) == 3
    assert frame["id"].tolist() == [1, 2, 3]
|
assert list(result["id"]) == [1, 2, 3]
|
|
|
|
|
|
@pytest.mark.s3_test
def test_namespace_open_table_with_provider(s3_bucket: str):
    """
    Open an existing table through the namespace and read it repeatedly.

    Verifies:
    - opening costs exactly one describe_table call and no extra create calls
    - the storage options provider is auto-created
    - the table can be read successfully
    - repeated reads reuse cached credentials (no further describe_table calls)
    """
    tracker = TrackingNamespace(
        bucket_name=s3_bucket,
        storage_options=copy.deepcopy(CONFIG),
        credential_expires_in_seconds=3600,
    )
    db = LanceNamespaceDBConnection(tracker)

    # Each test works inside its own uniquely named child namespace.
    ns_name = f"test_ns_{uuid.uuid4().hex[:8]}"
    db.create_namespace([ns_name])
    ns_path = [ns_name]
    table_name = f"test_table_{uuid.uuid4().hex}"

    # Seed the table that gets reopened below.
    seed = pa.Table.from_pydict(
        {
            "id": [1, 2, 3, 4, 5],
            "vector": [
                [1.0, 2.0],
                [3.0, 4.0],
                [5.0, 6.0],
                [7.0, 8.0],
                [9.0, 10.0],
            ],
            "value": [10, 20, 30, 40, 50],
        }
    )
    db.create_table(table_name, seed, namespace=ns_path)

    creates_before_open = tracker.get_create_call_count()
    assert creates_before_open == 1

    reopened = db.open_table(table_name, namespace=ns_path)

    # Opening performs a single describe_table and no new create call.
    assert tracker.get_describe_call_count() == 1
    assert tracker.get_create_call_count() == creates_before_open

    describes_after_open = tracker.get_describe_call_count()

    # Several reads in a row must all be served by the cached credentials.
    for _attempt in range(3):
        assert len(reopened.to_pandas()) == 5
        assert reopened.count_rows() == 5

    assert tracker.get_describe_call_count() == describes_after_open
|
|
|
|
|
|
@pytest.mark.s3_test
def test_namespace_credential_refresh_on_read(s3_bucket: str):
    """
    Reads keep working after the vended credentials expire.

    Verifies:
    - a read right after opening uses the credentials obtained at open time
    - once they expire, the next read refreshes them via one describe_table call
    - reads succeed with the refreshed credentials
    """
    tracker = TrackingNamespace(
        bucket_name=s3_bucket,
        storage_options=copy.deepcopy(CONFIG),
        credential_expires_in_seconds=3,  # short lifetime so the test can outlive it
    )
    db = LanceNamespaceDBConnection(tracker)

    # Each test works inside its own uniquely named child namespace.
    ns_name = f"test_ns_{uuid.uuid4().hex[:8]}"
    db.create_namespace([ns_name])
    ns_path = [ns_name]
    table_name = f"test_table_{uuid.uuid4().hex}"

    seed = pa.Table.from_pydict(
        {
            "id": [1, 2, 3],
            "vector": [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]],
        }
    )
    db.create_table(table_name, seed, namespace=ns_path)

    # Opening calls describe_table, which hands out the first credentials.
    reopened = db.open_table(table_name, namespace=ns_path)

    # An immediate read runs on the credentials obtained at open time.
    assert len(reopened.to_pandas()) == 3
    describes_before_expiry = tracker.get_describe_call_count()

    # Outlive the 3 second credential lifetime, with some slack.
    time.sleep(5)

    # This read sees expired credentials and must refresh them.
    assert len(reopened.to_pandas()) == 3

    refresh_delta = tracker.get_describe_call_count() - describes_before_expiry

    # A single refresh means exactly one additional describe_table call.
    assert refresh_delta == 1, (
        f"Credential refresh should call describe_table exactly once "
        f"(got {refresh_delta})"
    )
|
|
|
|
|
|
@pytest.mark.s3_test
def test_namespace_credential_refresh_on_write(s3_bucket: str):
    """
    Appends keep working after the vended credentials expire.

    Verifies:
    - appends made before expiry succeed with the initial credentials
    - after expiry, new credentials are fetched before the next append
    - every appended row ends up in the table
    """
    tracker = TrackingNamespace(
        bucket_name=s3_bucket,
        storage_options=copy.deepcopy(CONFIG),
        credential_expires_in_seconds=3,  # short lifetime so the test can outlive it
    )
    db = LanceNamespaceDBConnection(tracker)

    # Each test works inside its own uniquely named child namespace.
    ns_name = f"test_ns_{uuid.uuid4().hex[:8]}"
    db.create_namespace([ns_name])
    ns_path = [ns_name]
    table_name = f"test_table_{uuid.uuid4().hex}"

    table = db.create_table(
        table_name,
        pa.Table.from_pydict({"id": [1, 2], "vector": [[1.0, 2.0], [3.0, 4.0]]}),
        namespace=ns_path,
    )

    # First append happens while the initial credentials are still valid.
    table.add(
        pa.Table.from_pydict({"id": [3, 4], "vector": [[5.0, 6.0], [7.0, 8.0]]})
    )

    # Outlive the 3 second credential lifetime.
    time.sleep(5)

    # Second append needs freshly refreshed credentials.
    table.add(
        pa.Table.from_pydict({"id": [5, 6], "vector": [[9.0, 10.0], [11.0, 12.0]]})
    )

    # 2 initial rows + 2 + 2 appended rows.
    assert table.count_rows() == 6
|
|
|
|
|
|
@pytest.mark.s3_test
def test_namespace_overwrite_mode(s3_bucket: str):
    """
    Re-create an existing table with ``mode="overwrite"``.

    Verifies:
    - the initial create hits the create API once and never calls describe_table
    - the overwrite looks up the existing table with exactly one describe_table
      call and does not hit the create API again
    - the overwritten table contains only the new rows
    """
    tracker = TrackingNamespace(
        bucket_name=s3_bucket,
        storage_options=copy.deepcopy(CONFIG),
        credential_expires_in_seconds=3600,
    )
    db = LanceNamespaceDBConnection(tracker)

    # Each test works inside its own uniquely named child namespace.
    ns_name = f"test_ns_{uuid.uuid4().hex[:8]}"
    db.create_namespace([ns_name])
    ns_path = [ns_name]
    table_name = f"test_table_{uuid.uuid4().hex}"

    original_rows = pa.Table.from_pydict(
        {"id": [1, 2], "vector": [[1.0, 2.0], [3.0, 4.0]]}
    )
    table = db.create_table(table_name, original_rows, namespace=ns_path)

    # Plain create: one create-API call, no describe_table.
    assert tracker.get_create_call_count() == 1
    assert tracker.get_describe_call_count() == 0
    assert table.count_rows() == 2

    replacement_rows = pa.Table.from_pydict(
        {
            "id": [10, 20, 30],
            "vector": [[10.0, 20.0], [30.0, 40.0], [50.0, 60.0]],
        }
    )
    overwritten = db.create_table(
        table_name, replacement_rows, namespace=ns_path, mode="overwrite"
    )

    # The overwrite reuses the location found via describe_table, so the
    # create API stays at one call and describe_table is called exactly once.
    assert tracker.get_create_call_count() == 1
    assert tracker.get_describe_call_count() == 1

    # Only the replacement rows remain.
    assert overwritten.count_rows() == 3
    assert overwritten.to_pandas()["id"].tolist() == [10, 20, 30]
|
|
|
|
|
|
@pytest.mark.s3_test
def test_namespace_multiple_tables(s3_bucket: str):
    """
    Create and reopen two tables that live side by side in one namespace.

    Verifies:
    - each create and each open is a separate namespace call
    - each table gets its own storage options provider
    - the tables can be read independently and hold their own rows
    """
    tracker = TrackingNamespace(
        bucket_name=s3_bucket,
        storage_options=copy.deepcopy(CONFIG),
        credential_expires_in_seconds=3600,
    )
    db = LanceNamespaceDBConnection(tracker)

    # Each test works inside its own uniquely named child namespace.
    ns_name = f"test_ns_{uuid.uuid4().hex[:8]}"
    db.create_namespace([ns_name])
    ns_path = [ns_name]

    table_specs = [
        ("table1", {"id": [1, 2], "value": [10, 20]}),
        ("table2", {"id": [3, 4], "value": [30, 40]}),
    ]
    table_names = []
    for prefix, columns in table_specs:
        name = f"{prefix}_{uuid.uuid4().hex}"
        db.create_table(name, pa.Table.from_pydict(columns), namespace=ns_path)
        table_names.append(name)

    # One create-API call per table.
    assert tracker.get_create_call_count() == 2

    handles = [db.open_table(name, namespace=ns_path) for name in table_names]

    # One describe_table call per open.
    assert tracker.get_describe_call_count() == 2

    for handle in handles:
        assert handle.count_rows() == 2

    frames = [handle.to_pandas() for handle in handles]
    assert frames[0]["id"].tolist() == [1, 2]
    assert frames[1]["id"].tolist() == [3, 4]
|
|
|
|
|
|
@pytest.mark.s3_test
def test_namespace_with_schema_only(s3_bucket: str):
    """
    Create an empty table from a schema alone, then fill it.

    Verifies:
    - schema-only creation works with the storage options provider
    - creation hits the create API once and never calls describe_table
    - rows can be appended afterwards
    """
    tracker = TrackingNamespace(
        bucket_name=s3_bucket,
        storage_options=copy.deepcopy(CONFIG),
        credential_expires_in_seconds=3600,
    )
    db = LanceNamespaceDBConnection(tracker)

    # Each test works inside its own uniquely named child namespace.
    ns_name = f"test_ns_{uuid.uuid4().hex[:8]}"
    db.create_namespace([ns_name])
    ns_path = [ns_name]
    table_name = f"test_table_{uuid.uuid4().hex}"

    schema = pa.schema(
        [
            ("id", pa.int64()),
            ("vector", pa.list_(pa.float32(), 2)),
            ("text", pa.utf8()),
        ]
    )
    table = db.create_table(table_name, schema=schema, namespace=ns_path)

    # Schema-only create: one create-API call, no describe_table.
    assert tracker.get_create_call_count() == 1
    assert tracker.get_describe_call_count() == 0

    # The table starts out empty.
    assert table.count_rows() == 0

    table.add(
        pa.Table.from_pydict(
            {
                "id": [1, 2],
                "vector": [[1.0, 2.0], [3.0, 4.0]],
                "text": ["hello", "world"],
            }
        )
    )

    # Both appended rows are now visible.
    assert table.count_rows() == 2
|