mirror of
https://github.com/lancedb/lancedb.git
synced 2026-03-30 04:20:39 +00:00
feat!: support multi-level namespace (#2603)
This PR adds support of multi-level namespace in a LanceDB database,
according to the Lance Namespace spec.
This allows users to create namespace inside a database connection,
perform create, drop, list, list_tables in a namespace. (other
operations like update, describe will be in a follow-up PR)
The 3 types of database connections behave like the following:
1 Local database connections will continue to have just a flat list of
tables for backwards compatibility.
2. Remote database connections will make REST API calls according to the
APIs in the Lance Namespace spec.
3. Lance Namespace connections will invoke the corresponding operations
against the specific namespace implementation which could have different
behaviors regarding these APIs.
All the table APIs now take identifier instead of name, for example
`/v1/table/{name}/create` is now `/v1/table/{id}/create`. If a table is
directly in the root namespace, the API call is identical. If the table
is in a namespace, then the full table ID should be used, with `$` as
the default delimiter (`.` is a special character and creates issues
with URL parsing so `$` is used), for example
`/v1/table/ns1$table1/create`. If a different parameter needs to be
passed in, user can configure the `id_delimiter` in client config and
that becomes a query parameter, for example
`/v1/table/ns1__table1/create?delimiter=__`
The Python and Typescript APIs are kept backwards compatible, but the
following Rust APIs are not:
1. `Connection::drop_table(&self, name: impl AsRef<str>) -> Result<()>`
is now `Connection::drop_table(&self, name: impl AsRef<str>, namespace:
&[String]) -> Result<()>`
2. `Connection::drop_all_tables(&self) -> Result<()>` is now
`Connection::drop_all_tables(&self, name: impl AsRef<str>) ->
Result<()>`
This commit is contained in:
@@ -23,6 +23,12 @@ from lance_namespace_urllib3_client.models import (
|
||||
CreateTableResponse,
|
||||
DropTableRequest,
|
||||
DropTableResponse,
|
||||
ListNamespacesRequest,
|
||||
ListNamespacesResponse,
|
||||
CreateNamespaceRequest,
|
||||
CreateNamespaceResponse,
|
||||
DropNamespaceRequest,
|
||||
DropNamespaceResponse,
|
||||
)
|
||||
|
||||
|
||||
@@ -31,6 +37,8 @@ class TempNamespace(LanceNamespace):
|
||||
|
||||
# Class-level storage to persist table registry across instances
|
||||
_global_registry: Dict[str, Dict[str, str]] = {}
|
||||
# Class-level storage for namespaces (supporting 1-level namespace)
|
||||
_global_namespaces: Dict[str, set] = {}
|
||||
|
||||
def __init__(self, **properties):
|
||||
"""Initialize the test namespace.
|
||||
@@ -44,20 +52,48 @@ class TempNamespace(LanceNamespace):
|
||||
root = self.config.root
|
||||
if root not in self._global_registry:
|
||||
self._global_registry[root] = {}
|
||||
if root not in self._global_namespaces:
|
||||
self._global_namespaces[root] = set()
|
||||
self.tables = self._global_registry[root] # Reference to shared registry
|
||||
self.namespaces = self._global_namespaces[
|
||||
root
|
||||
] # Reference to shared namespaces
|
||||
|
||||
def list_tables(self, request: ListTablesRequest) -> ListTablesResponse:
|
||||
"""List all tables in the namespace."""
|
||||
# For simplicity, ignore namespace ID validation
|
||||
tables = list(self.tables.keys())
|
||||
if not request.id:
|
||||
# List all tables in root namespace
|
||||
tables = [name for name in self.tables.keys() if "." not in name]
|
||||
else:
|
||||
# List tables in specific namespace (1-level only)
|
||||
if len(request.id) == 1:
|
||||
namespace_name = request.id[0]
|
||||
prefix = f"{namespace_name}."
|
||||
tables = [
|
||||
name[len(prefix) :]
|
||||
for name in self.tables.keys()
|
||||
if name.startswith(prefix)
|
||||
]
|
||||
else:
|
||||
# Multi-level namespaces not supported
|
||||
raise ValueError("Only 1-level namespaces are supported")
|
||||
return ListTablesResponse(tables=tables)
|
||||
|
||||
def describe_table(self, request: DescribeTableRequest) -> DescribeTableResponse:
|
||||
"""Describe a table by returning its location."""
|
||||
if not request.id or len(request.id) != 1:
|
||||
if not request.id:
|
||||
raise ValueError("Invalid table ID")
|
||||
|
||||
table_name = request.id[0]
|
||||
if len(request.id) == 1:
|
||||
# Root namespace table
|
||||
table_name = request.id[0]
|
||||
elif len(request.id) == 2:
|
||||
# Namespaced table (1-level namespace)
|
||||
namespace_name, table_name = request.id
|
||||
table_name = f"{namespace_name}.{table_name}"
|
||||
else:
|
||||
raise ValueError("Only 1-level namespaces are supported")
|
||||
|
||||
if table_name not in self.tables:
|
||||
raise RuntimeError(f"Table does not exist: {table_name}")
|
||||
|
||||
@@ -68,10 +104,22 @@ class TempNamespace(LanceNamespace):
|
||||
self, request: CreateTableRequest, request_data: bytes
|
||||
) -> CreateTableResponse:
|
||||
"""Create a table in the namespace."""
|
||||
if not request.id or len(request.id) != 1:
|
||||
if not request.id:
|
||||
raise ValueError("Invalid table ID")
|
||||
|
||||
table_name = request.id[0]
|
||||
if len(request.id) == 1:
|
||||
# Root namespace table
|
||||
table_name = request.id[0]
|
||||
table_uri = f"{self.config.root}/{table_name}.lance"
|
||||
elif len(request.id) == 2:
|
||||
# Namespaced table (1-level namespace)
|
||||
namespace_name, base_table_name = request.id
|
||||
# Add namespace to our namespace set
|
||||
self.namespaces.add(namespace_name)
|
||||
table_name = f"{namespace_name}.{base_table_name}"
|
||||
table_uri = f"{self.config.root}/{namespace_name}/{base_table_name}.lance"
|
||||
else:
|
||||
raise ValueError("Only 1-level namespaces are supported")
|
||||
|
||||
# Check if table already exists
|
||||
if table_name in self.tables:
|
||||
@@ -81,13 +129,14 @@ class TempNamespace(LanceNamespace):
|
||||
else:
|
||||
raise RuntimeError(f"Table already exists: {table_name}")
|
||||
|
||||
# Generate table URI based on root directory
|
||||
table_uri = f"{self.config.root}/{table_name}.lance"
|
||||
|
||||
# Parse the Arrow IPC stream to get the schema and create the actual table
|
||||
import pyarrow.ipc as ipc
|
||||
import io
|
||||
import lance
|
||||
import os
|
||||
|
||||
# Create directory if needed for namespaced tables
|
||||
os.makedirs(os.path.dirname(table_uri), exist_ok=True)
|
||||
|
||||
# Read the IPC stream
|
||||
reader = ipc.open_stream(io.BytesIO(request_data))
|
||||
@@ -103,10 +152,19 @@ class TempNamespace(LanceNamespace):
|
||||
|
||||
def drop_table(self, request: DropTableRequest) -> DropTableResponse:
|
||||
"""Drop a table from the namespace."""
|
||||
if not request.id or len(request.id) != 1:
|
||||
if not request.id:
|
||||
raise ValueError("Invalid table ID")
|
||||
|
||||
table_name = request.id[0]
|
||||
if len(request.id) == 1:
|
||||
# Root namespace table
|
||||
table_name = request.id[0]
|
||||
elif len(request.id) == 2:
|
||||
# Namespaced table (1-level namespace)
|
||||
namespace_name, base_table_name = request.id
|
||||
table_name = f"{namespace_name}.{base_table_name}"
|
||||
else:
|
||||
raise ValueError("Only 1-level namespaces are supported")
|
||||
|
||||
if table_name not in self.tables:
|
||||
raise RuntimeError(f"Table does not exist: {table_name}")
|
||||
|
||||
@@ -152,6 +210,78 @@ class TempNamespace(LanceNamespace):
|
||||
del self.tables[table_name]
|
||||
return DeregisterTableResponse()
|
||||
|
||||
def list_namespaces(self, request: ListNamespacesRequest) -> ListNamespacesResponse:
|
||||
"""List child namespaces."""
|
||||
if not request.id:
|
||||
# List root-level namespaces
|
||||
namespaces = list(self.namespaces)
|
||||
elif len(request.id) == 1:
|
||||
# For 1-level namespace, there are no child namespaces
|
||||
namespaces = []
|
||||
else:
|
||||
raise ValueError("Only 1-level namespaces are supported")
|
||||
|
||||
return ListNamespacesResponse(namespaces=namespaces)
|
||||
|
||||
def create_namespace(
|
||||
self, request: CreateNamespaceRequest
|
||||
) -> CreateNamespaceResponse:
|
||||
"""Create a namespace."""
|
||||
if not request.id:
|
||||
raise ValueError("Invalid namespace ID")
|
||||
|
||||
if len(request.id) == 1:
|
||||
# Create 1-level namespace
|
||||
namespace_name = request.id[0]
|
||||
self.namespaces.add(namespace_name)
|
||||
|
||||
# Create directory for the namespace
|
||||
import os
|
||||
|
||||
namespace_dir = f"{self.config.root}/{namespace_name}"
|
||||
os.makedirs(namespace_dir, exist_ok=True)
|
||||
else:
|
||||
raise ValueError("Only 1-level namespaces are supported")
|
||||
|
||||
return CreateNamespaceResponse()
|
||||
|
||||
def drop_namespace(self, request: DropNamespaceRequest) -> DropNamespaceResponse:
|
||||
"""Drop a namespace."""
|
||||
if not request.id:
|
||||
raise ValueError("Invalid namespace ID")
|
||||
|
||||
if len(request.id) == 1:
|
||||
# Drop 1-level namespace
|
||||
namespace_name = request.id[0]
|
||||
|
||||
if namespace_name not in self.namespaces:
|
||||
raise RuntimeError(f"Namespace does not exist: {namespace_name}")
|
||||
|
||||
# Check if namespace has any tables
|
||||
prefix = f"{namespace_name}."
|
||||
tables_in_namespace = [
|
||||
name for name in self.tables.keys() if name.startswith(prefix)
|
||||
]
|
||||
if tables_in_namespace:
|
||||
raise RuntimeError(
|
||||
f"Cannot drop namespace '{namespace_name}': contains tables"
|
||||
)
|
||||
|
||||
# Remove namespace
|
||||
self.namespaces.remove(namespace_name)
|
||||
|
||||
# Remove directory
|
||||
import shutil
|
||||
import os
|
||||
|
||||
namespace_dir = f"{self.config.root}/{namespace_name}"
|
||||
if os.path.exists(namespace_dir):
|
||||
shutil.rmtree(namespace_dir, ignore_errors=True)
|
||||
else:
|
||||
raise ValueError("Only 1-level namespaces are supported")
|
||||
|
||||
return DropNamespaceResponse()
|
||||
|
||||
|
||||
class TempNamespaceConfig:
|
||||
"""Configuration for TestNamespace."""
|
||||
@@ -187,12 +317,16 @@ class TestNamespaceConnection:
|
||||
# Clear the TestNamespace registry for this test
|
||||
if self.temp_dir in TempNamespace._global_registry:
|
||||
TempNamespace._global_registry[self.temp_dir].clear()
|
||||
if self.temp_dir in TempNamespace._global_namespaces:
|
||||
TempNamespace._global_namespaces[self.temp_dir].clear()
|
||||
|
||||
def teardown_method(self):
|
||||
"""Clean up test fixtures."""
|
||||
# Clear the TestNamespace registry
|
||||
if self.temp_dir in TempNamespace._global_registry:
|
||||
del TempNamespace._global_registry[self.temp_dir]
|
||||
if self.temp_dir in TempNamespace._global_namespaces:
|
||||
del TempNamespace._global_namespaces[self.temp_dir]
|
||||
shutil.rmtree(self.temp_dir, ignore_errors=True)
|
||||
|
||||
def test_connect_namespace_test(self):
|
||||
@@ -412,3 +546,153 @@ class TestNamespaceConnection:
|
||||
]
|
||||
)
|
||||
db.create_table("test_table", schema=schema, storage_options=table_opts)
|
||||
|
||||
def test_namespace_operations(self):
|
||||
"""Test namespace management operations."""
|
||||
db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
|
||||
|
||||
# Initially no namespaces
|
||||
assert len(list(db.list_namespaces())) == 0
|
||||
|
||||
# Create a namespace
|
||||
db.create_namespace(["test_namespace"])
|
||||
|
||||
# Verify namespace exists
|
||||
namespaces = list(db.list_namespaces())
|
||||
assert "test_namespace" in namespaces
|
||||
assert len(namespaces) == 1
|
||||
|
||||
# Create table in namespace
|
||||
schema = pa.schema(
|
||||
[
|
||||
pa.field("id", pa.int64()),
|
||||
pa.field("vector", pa.list_(pa.float32(), 2)),
|
||||
]
|
||||
)
|
||||
table = db.create_table(
|
||||
"test_table", schema=schema, namespace=["test_namespace"]
|
||||
)
|
||||
assert table is not None
|
||||
|
||||
# Verify table exists in namespace
|
||||
tables_in_namespace = list(db.table_names(namespace=["test_namespace"]))
|
||||
assert "test_table" in tables_in_namespace
|
||||
assert len(tables_in_namespace) == 1
|
||||
|
||||
# Open table from namespace
|
||||
table = db.open_table("test_table", namespace=["test_namespace"])
|
||||
assert table is not None
|
||||
assert table.name == "test_table"
|
||||
|
||||
# Drop table from namespace
|
||||
db.drop_table("test_table", namespace=["test_namespace"])
|
||||
|
||||
# Verify table no longer exists in namespace
|
||||
tables_in_namespace = list(db.table_names(namespace=["test_namespace"]))
|
||||
assert len(tables_in_namespace) == 0
|
||||
|
||||
# Drop namespace
|
||||
db.drop_namespace(["test_namespace"])
|
||||
|
||||
# Verify namespace no longer exists
|
||||
namespaces = list(db.list_namespaces())
|
||||
assert len(namespaces) == 0
|
||||
|
||||
def test_namespace_with_tables_cannot_be_dropped(self):
|
||||
"""Test that namespaces containing tables cannot be dropped."""
|
||||
db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
|
||||
|
||||
# Create namespace and table
|
||||
db.create_namespace(["test_namespace"])
|
||||
schema = pa.schema(
|
||||
[
|
||||
pa.field("id", pa.int64()),
|
||||
pa.field("vector", pa.list_(pa.float32(), 2)),
|
||||
]
|
||||
)
|
||||
db.create_table("test_table", schema=schema, namespace=["test_namespace"])
|
||||
|
||||
# Try to drop namespace with tables - should fail
|
||||
with pytest.raises(RuntimeError, match="contains tables"):
|
||||
db.drop_namespace(["test_namespace"])
|
||||
|
||||
# Drop table first
|
||||
db.drop_table("test_table", namespace=["test_namespace"])
|
||||
|
||||
# Now dropping namespace should work
|
||||
db.drop_namespace(["test_namespace"])
|
||||
|
||||
def test_same_table_name_different_namespaces(self):
|
||||
db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
|
||||
|
||||
# Create two namespaces
|
||||
db.create_namespace(["namespace_a"])
|
||||
db.create_namespace(["namespace_b"])
|
||||
|
||||
# Define schema
|
||||
schema = pa.schema(
|
||||
[
|
||||
pa.field("id", pa.int64()),
|
||||
pa.field("vector", pa.list_(pa.float32(), 2)),
|
||||
pa.field("text", pa.string()),
|
||||
]
|
||||
)
|
||||
|
||||
# Create table with same name in both namespaces
|
||||
table_a = db.create_table(
|
||||
"same_name_table", schema=schema, namespace=["namespace_a"]
|
||||
)
|
||||
table_b = db.create_table(
|
||||
"same_name_table", schema=schema, namespace=["namespace_b"]
|
||||
)
|
||||
|
||||
# Add different data to each table
|
||||
data_a = [
|
||||
{"id": 1, "vector": [1.0, 2.0], "text": "data_from_namespace_a"},
|
||||
{"id": 2, "vector": [3.0, 4.0], "text": "also_from_namespace_a"},
|
||||
]
|
||||
table_a.add(data_a)
|
||||
|
||||
data_b = [
|
||||
{"id": 10, "vector": [10.0, 20.0], "text": "data_from_namespace_b"},
|
||||
{"id": 20, "vector": [30.0, 40.0], "text": "also_from_namespace_b"},
|
||||
{"id": 30, "vector": [50.0, 60.0], "text": "more_from_namespace_b"},
|
||||
]
|
||||
table_b.add(data_b)
|
||||
|
||||
# Verify data in namespace_a table
|
||||
opened_table_a = db.open_table("same_name_table", namespace=["namespace_a"])
|
||||
result_a = opened_table_a.to_pandas().sort_values("id").reset_index(drop=True)
|
||||
assert len(result_a) == 2
|
||||
assert result_a["id"].tolist() == [1, 2]
|
||||
assert result_a["text"].tolist() == [
|
||||
"data_from_namespace_a",
|
||||
"also_from_namespace_a",
|
||||
]
|
||||
assert [v.tolist() for v in result_a["vector"]] == [[1.0, 2.0], [3.0, 4.0]]
|
||||
|
||||
# Verify data in namespace_b table
|
||||
opened_table_b = db.open_table("same_name_table", namespace=["namespace_b"])
|
||||
result_b = opened_table_b.to_pandas().sort_values("id").reset_index(drop=True)
|
||||
assert len(result_b) == 3
|
||||
assert result_b["id"].tolist() == [10, 20, 30]
|
||||
assert result_b["text"].tolist() == [
|
||||
"data_from_namespace_b",
|
||||
"also_from_namespace_b",
|
||||
"more_from_namespace_b",
|
||||
]
|
||||
assert [v.tolist() for v in result_b["vector"]] == [
|
||||
[10.0, 20.0],
|
||||
[30.0, 40.0],
|
||||
[50.0, 60.0],
|
||||
]
|
||||
|
||||
# Verify root namespace doesn't have this table
|
||||
root_tables = list(db.table_names())
|
||||
assert "same_name_table" not in root_tables
|
||||
|
||||
# Clean up
|
||||
db.drop_table("same_name_table", namespace=["namespace_a"])
|
||||
db.drop_table("same_name_table", namespace=["namespace_b"])
|
||||
db.drop_namespace(["namespace_a"])
|
||||
db.drop_namespace(["namespace_b"])
|
||||
|
||||
Reference in New Issue
Block a user