diff --git a/.github/workflows/build_linux_wheel/action.yml b/.github/workflows/build_linux_wheel/action.yml index 58fd9fa3..b9603cd0 100644 --- a/.github/workflows/build_linux_wheel/action.yml +++ b/.github/workflows/build_linux_wheel/action.yml @@ -31,6 +31,7 @@ runs: with: command: build working-directory: python + docker-options: "-e PIP_EXTRA_INDEX_URL=https://pypi.fury.io/lancedb/" target: x86_64-unknown-linux-gnu manylinux: ${{ inputs.manylinux }} args: ${{ inputs.args }} diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index f4ab6461..0dc3d57c 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -24,6 +24,7 @@ env: # according to: https://matklad.github.io/2021/09/04/fast-rust-builds.html # CI builds are faster with incremental disabled. CARGO_INCREMENTAL: "0" + PIP_EXTRA_INDEX_URL: "https://pypi.fury.io/lancedb/" jobs: # Single deploy job since we're just deploying diff --git a/.github/workflows/pypi-publish.yml b/.github/workflows/pypi-publish.yml index bfb162fc..6390bc8d 100644 --- a/.github/workflows/pypi-publish.yml +++ b/.github/workflows/pypi-publish.yml @@ -10,6 +10,9 @@ on: - .github/workflows/pypi-publish.yml - Cargo.toml # Change in dependency frequently breaks builds +env: + PIP_EXTRA_INDEX_URL: "https://pypi.fury.io/lancedb/" + jobs: linux: name: Python ${{ matrix.config.platform }} manylinux${{ matrix.config.manylinux }} diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index a55cb070..4341f990 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -18,6 +18,7 @@ env: # Color output for pytest is off by default. PYTEST_ADDOPTS: "--color=yes" FORCE_COLOR: "1" + PIP_EXTRA_INDEX_URL: "https://pypi.fury.io/lancedb/" jobs: lint: diff --git a/Cargo.lock b/Cargo.lock index b382d69f..bc4492f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3032,8 +3032,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" [[package]] name = "fsst" -version = "0.40.0-beta.2" -source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc" +version = "1.0.0-beta.2" +source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f" dependencies = [ "arrow-array", "rand 0.9.2", @@ -4217,8 +4217,8 @@ dependencies = [ [[package]] name = "lance" -version = "0.40.0-beta.2" -source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc" +version = "1.0.0-beta.2" +source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f" dependencies = [ "arrow", "arrow-arith", @@ -4282,8 +4282,8 @@ dependencies = [ [[package]] name = "lance-arrow" -version = "0.40.0-beta.2" -source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc" +version = "1.0.0-beta.2" +source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f" dependencies = [ "arrow-array", "arrow-buffer", @@ -4301,8 +4301,8 @@ dependencies = [ [[package]] name = "lance-bitpacking" -version = "0.40.0-beta.2" -source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc" +version = "1.0.0-beta.2" +source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f" dependencies = [ "arrayref", "paste", @@ -4311,8 +4311,8 @@ dependencies = [ [[package]] name = 
"lance-core" -version = "0.40.0-beta.2" -source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc" +version = "1.0.0-beta.2" +source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f" dependencies = [ "arrow-array", "arrow-buffer", @@ -4348,8 +4348,8 @@ dependencies = [ [[package]] name = "lance-datafusion" -version = "0.40.0-beta.2" -source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc" +version = "1.0.0-beta.2" +source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f" dependencies = [ "arrow", "arrow-array", @@ -4378,8 +4378,8 @@ dependencies = [ [[package]] name = "lance-datagen" -version = "0.40.0-beta.2" -source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc" +version = "1.0.0-beta.2" +source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f" dependencies = [ "arrow", "arrow-array", @@ -4396,8 +4396,8 @@ dependencies = [ [[package]] name = "lance-encoding" -version = "0.40.0-beta.2" -source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc" +version = "1.0.0-beta.2" +source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f" dependencies = [ "arrow-arith", "arrow-array", @@ -4434,8 +4434,8 @@ dependencies = [ [[package]] name = "lance-file" -version = "0.40.0-beta.2" -source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc" +version = "1.0.0-beta.2" +source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f" dependencies = [ "arrow-arith", "arrow-array", @@ -4467,8 +4467,8 @@ dependencies = [ [[package]] name = "lance-index" -version = "0.40.0-beta.2" -source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc" +version = "1.0.0-beta.2" +source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f" dependencies = [ "arrow", "arrow-arith", @@ -4529,8 +4529,8 @@ dependencies = [ [[package]] name = "lance-io" -version = "0.40.0-beta.2" -source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc" +version = "1.0.0-beta.2" +source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f" dependencies = [ "arrow", "arrow-arith", @@ -4570,8 +4570,8 @@ dependencies = [ [[package]] name = "lance-linalg" -version = "0.40.0-beta.2" -source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc" +version = "1.0.0-beta.2" +source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f" dependencies = [ "arrow-array", "arrow-buffer", @@ -4587,8 +4587,8 @@ dependencies = [ [[package]] name = "lance-namespace" -version = "0.40.0-beta.2" -source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc" +version = "1.0.0-beta.2" +source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f" dependencies = [ "arrow", "async-trait", @@ -4600,22 +4600,27 @@ dependencies = [ [[package]] name 
= "lance-namespace-impls" -version = "0.40.0-beta.2" -source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc" +version = "1.0.0-beta.2" +source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f" dependencies = [ "arrow", "arrow-ipc", "arrow-schema", "async-trait", "bytes", + "futures", "lance", "lance-core", + "lance-index", "lance-io", "lance-namespace", + "log", "object_store", + "rand 0.9.2", "reqwest", "serde_json", "snafu", + "tokio", "url", ] @@ -4634,8 +4639,8 @@ dependencies = [ [[package]] name = "lance-table" -version = "0.40.0-beta.2" -source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc" +version = "1.0.0-beta.2" +source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f" dependencies = [ "arrow", "arrow-array", @@ -4674,8 +4679,8 @@ dependencies = [ [[package]] name = "lance-testing" -version = "0.40.0-beta.2" -source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc" +version = "1.0.0-beta.2" +source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f" dependencies = [ "arrow-array", "arrow-schema", @@ -4807,11 +4812,14 @@ dependencies = [ "async-trait", "env_logger", "futures", + "lance-core", + "lance-io", "lancedb", "pin-project", "pyo3", "pyo3-async-runtimes", "pyo3-build-config", + "snafu", "tokio", ] @@ -8408,6 +8416,7 @@ dependencies = [ "io-uring", "libc", "mio", + "parking_lot", "pin-project-lite", "signal-hook-registry", "slab", diff --git a/Cargo.toml b/Cargo.toml index 55c3aba5..5a10d4c6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,20 +15,20 @@ categories = ["database-implementations"] rust-version = "1.78.0" [workspace.dependencies] -lance = { "version" = "=0.40.0-beta.2", default-features = false, "tag" = "v0.40.0-beta.2", "git" = "https://github.com/lancedb/lance.git" } -lance-core = { "version" = "=0.40.0-beta.2", "tag" = "v0.40.0-beta.2", "git" = "https://github.com/lancedb/lance.git" } -lance-datagen = { "version" = "=0.40.0-beta.2", "tag" = "v0.40.0-beta.2", "git" = "https://github.com/lancedb/lance.git" } -lance-file = { "version" = "=0.40.0-beta.2", "tag" = "v0.40.0-beta.2", "git" = "https://github.com/lancedb/lance.git" } -lance-io = { "version" = "=0.40.0-beta.2", default-features = false, "tag" = "v0.40.0-beta.2", "git" = "https://github.com/lancedb/lance.git" } -lance-index = { "version" = "=0.40.0-beta.2", "tag" = "v0.40.0-beta.2", "git" = "https://github.com/lancedb/lance.git" } -lance-linalg = { "version" = "=0.40.0-beta.2", "tag" = "v0.40.0-beta.2", "git" = "https://github.com/lancedb/lance.git" } -lance-namespace = { "version" = "=0.40.0-beta.2", "tag" = "v0.40.0-beta.2", "git" = "https://github.com/lancedb/lance.git" } -lance-namespace-impls = { "version" = "=0.40.0-beta.2", "features" = ["dir-aws", "dir-gcp", "dir-azure", "dir-oss", "rest"], "tag" = "v0.40.0-beta.2", "git" = "https://github.com/lancedb/lance.git" } -lance-table = { "version" = "=0.40.0-beta.2", "tag" = "v0.40.0-beta.2", "git" = "https://github.com/lancedb/lance.git" } -lance-testing = { "version" = "=0.40.0-beta.2", "tag" = "v0.40.0-beta.2", "git" = "https://github.com/lancedb/lance.git" } -lance-datafusion = { "version" = "=0.40.0-beta.2", "tag" = "v0.40.0-beta.2", "git" = "https://github.com/lancedb/lance.git" } -lance-encoding = { "version" = 
"=0.40.0-beta.2", "tag" = "v0.40.0-beta.2", "git" = "https://github.com/lancedb/lance.git" } -lance-arrow = { "version" = "=0.40.0-beta.2", "tag" = "v0.40.0-beta.2", "git" = "https://github.com/lancedb/lance.git" } +lance = { "version" = "=1.0.0-beta.2", default-features = false, "tag" = "v1.0.0-beta.2", "git" = "https://github.com/lancedb/lance.git" } +lance-core = { "version" = "=1.0.0-beta.2", "tag" = "v1.0.0-beta.2", "git" = "https://github.com/lancedb/lance.git" } +lance-datagen = { "version" = "=1.0.0-beta.2", "tag" = "v1.0.0-beta.2", "git" = "https://github.com/lancedb/lance.git" } +lance-file = { "version" = "=1.0.0-beta.2", "tag" = "v1.0.0-beta.2", "git" = "https://github.com/lancedb/lance.git" } +lance-io = { "version" = "=1.0.0-beta.2", default-features = false, "tag" = "v1.0.0-beta.2", "git" = "https://github.com/lancedb/lance.git" } +lance-index = { "version" = "=1.0.0-beta.2", "tag" = "v1.0.0-beta.2", "git" = "https://github.com/lancedb/lance.git" } +lance-linalg = { "version" = "=1.0.0-beta.2", "tag" = "v1.0.0-beta.2", "git" = "https://github.com/lancedb/lance.git" } +lance-namespace = { "version" = "=1.0.0-beta.2", "tag" = "v1.0.0-beta.2", "git" = "https://github.com/lancedb/lance.git" } +lance-namespace-impls = { "version" = "=1.0.0-beta.2", "features" = ["dir-aws", "dir-gcp", "dir-azure", "dir-oss", "rest"], "tag" = "v1.0.0-beta.2", "git" = "https://github.com/lancedb/lance.git" } +lance-table = { "version" = "=1.0.0-beta.2", "tag" = "v1.0.0-beta.2", "git" = "https://github.com/lancedb/lance.git" } +lance-testing = { "version" = "=1.0.0-beta.2", "tag" = "v1.0.0-beta.2", "git" = "https://github.com/lancedb/lance.git" } +lance-datafusion = { "version" = "=1.0.0-beta.2", "tag" = "v1.0.0-beta.2", "git" = "https://github.com/lancedb/lance.git" } +lance-encoding = { "version" = "=1.0.0-beta.2", "tag" = "v1.0.0-beta.2", "git" = "https://github.com/lancedb/lance.git" } +lance-arrow = { "version" = "=1.0.0-beta.2", "tag" = "v1.0.0-beta.2", "git" = "https://github.com/lancedb/lance.git" } ahash = "0.8" # Note that this one does not include pyarrow arrow = { version = "56.2", optional = false } diff --git a/python/Cargo.toml b/python/Cargo.toml index b267ab2f..84d8bcc9 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -17,6 +17,8 @@ crate-type = ["cdylib"] arrow = { version = "56.2", features = ["pyarrow"] } async-trait = "0.1" lancedb = { path = "../rust/lancedb", default-features = false } +lance-core.workspace = true +lance-io.workspace = true env_logger.workspace = true pyo3 = { version = "0.25", features = ["extension-module", "abi3-py39"] } pyo3-async-runtimes = { version = "0.25", features = [ @@ -25,6 +27,7 @@ pyo3-async-runtimes = { version = "0.25", features = [ ] } pin-project = "1.1.5" futures.workspace = true +snafu.workspace = true tokio = { version = "1.40", features = ["sync"] } [build-dependencies] diff --git a/python/pyproject.toml b/python/pyproject.toml index 118b23dc..2d311956 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -10,7 +10,7 @@ dependencies = [ "pyarrow>=16", "pydantic>=1.10", "tqdm>=4.27.0", - "lance-namespace>=0.0.16" + "lance-namespace>=0.0.21" ] description = "lancedb" authors = [{ name = "LanceDB Devs", email = "dev@lancedb.com" }] @@ -59,7 +59,7 @@ tests = [ "polars>=0.19, <=1.3.0", "tantivy", "pyarrow-stubs", - "pylance>=0.25", + "pylance>=1.0.0b2", "requests", "datafusion", ] diff --git a/python/python/lancedb/__init__.py b/python/python/lancedb/__init__.py index de68e085..98a28724 100644 --- 
a/python/python/lancedb/__init__.py +++ b/python/python/lancedb/__init__.py @@ -14,6 +14,7 @@ __version__ = importlib.metadata.version("lancedb") from ._lancedb import connect as lancedb_connect from .common import URI, sanitize_uri from .db import AsyncConnection, DBConnection, LanceDBConnection +from .io import StorageOptionsProvider from .remote import ClientConfig from .remote.db import RemoteDBConnection from .schema import vector @@ -233,6 +234,7 @@ __all__ = [ "LanceNamespaceDBConnection", "RemoteDBConnection", "Session", + "StorageOptionsProvider", "Table", "__version__", ] diff --git a/python/python/lancedb/_lancedb.pyi b/python/python/lancedb/_lancedb.pyi index 54843a76..d5b39c9b 100644 --- a/python/python/lancedb/_lancedb.pyi +++ b/python/python/lancedb/_lancedb.pyi @@ -4,6 +4,7 @@ from typing import Dict, List, Optional, Tuple, Any, TypedDict, Union, Literal import pyarrow as pa from .index import BTree, IvfFlat, IvfPq, Bitmap, LabelList, HnswPq, HnswSq, FTS +from .io import StorageOptionsProvider from .remote import ClientConfig class Session: @@ -44,6 +45,8 @@ class Connection(object): data: pa.RecordBatchReader, namespace: List[str] = [], storage_options: Optional[Dict[str, str]] = None, + storage_options_provider: Optional[StorageOptionsProvider] = None, + location: Optional[str] = None, ) -> Table: ... async def create_empty_table( self, @@ -52,13 +55,17 @@ class Connection(object): schema: pa.Schema, namespace: List[str] = [], storage_options: Optional[Dict[str, str]] = None, + storage_options_provider: Optional[StorageOptionsProvider] = None, + location: Optional[str] = None, ) -> Table: ... async def open_table( self, name: str, namespace: List[str] = [], storage_options: Optional[Dict[str, str]] = None, + storage_options_provider: Optional[StorageOptionsProvider] = None, index_cache_size: Optional[int] = None, + location: Optional[str] = None, ) -> Table: ... async def clone_table( self, diff --git a/python/python/lancedb/db.py b/python/python/lancedb/db.py index dac5b1cd..a4430b22 100644 --- a/python/python/lancedb/db.py +++ b/python/python/lancedb/db.py @@ -45,6 +45,7 @@ if TYPE_CHECKING: from ._lancedb import Connection as LanceDbConnection from .common import DATA, URI from .embeddings import EmbeddingFunctionConfig + from .io import StorageOptionsProvider from ._lancedb import Session @@ -143,6 +144,7 @@ class DBConnection(EnforceOverrides): *, namespace: List[str] = [], storage_options: Optional[Dict[str, str]] = None, + storage_options_provider: Optional["StorageOptionsProvider"] = None, data_storage_version: Optional[str] = None, enable_v2_manifest_paths: Optional[bool] = None, ) -> Table: @@ -308,6 +310,7 @@ class DBConnection(EnforceOverrides): *, namespace: List[str] = [], storage_options: Optional[Dict[str, str]] = None, + storage_options_provider: Optional["StorageOptionsProvider"] = None, index_cache_size: Optional[int] = None, ) -> Table: """Open a Lance Table in the database. 
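The stubs and DBConnection signatures above add a storage_options_provider keyword (and, on the async side, a location argument) next to the plain storage_options dict. A minimal usage sketch, assuming a hypothetical credential source; fetch_temporary_s3_credentials and the bucket URI are placeholders, not part of this change:

import time
from typing import Dict

import lancedb
from lancedb.io import StorageOptionsProvider


def fetch_temporary_s3_credentials() -> Dict[str, str]:
    # Placeholder for whatever credential manager is actually in use.
    return {
        "aws_access_key_id": "...",
        "aws_secret_access_key": "...",
        "aws_session_token": "...",
    }


class RotatingS3Provider(StorageOptionsProvider):
    """Returns short-lived credentials so LanceDB refreshes them before expiry."""

    def fetch_storage_options(self) -> Dict[str, str]:
        options = fetch_temporary_s3_credentials()
        # Ask LanceDB to refresh again in roughly 15 minutes.
        options["expires_at_millis"] = str(int(time.time() * 1000) + 15 * 60 * 1000)
        return options


db = lancedb.connect("s3://example-bucket/db")  # hypothetical bucket URI
table = db.open_table("my_table", storage_options_provider=RotatingS3Provider())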
@@ -463,6 +466,12 @@ class LanceDBConnection(DBConnection): is_local = isinstance(uri, Path) or scheme == "file" if is_local: if isinstance(uri, str): + # Strip file:// or file:/ scheme if present + # file:///path becomes file:/path after URL normalization + if uri.startswith("file://"): + uri = uri[7:] # Remove "file://" + elif uri.startswith("file:/"): + uri = uri[5:] # Remove "file:" uri = Path(uri) uri = uri.expanduser().absolute() Path(uri).mkdir(parents=True, exist_ok=True) @@ -625,6 +634,7 @@ class LanceDBConnection(DBConnection): *, namespace: List[str] = [], storage_options: Optional[Dict[str, str]] = None, + storage_options_provider: Optional["StorageOptionsProvider"] = None, data_storage_version: Optional[str] = None, enable_v2_manifest_paths: Optional[bool] = None, ) -> LanceTable: @@ -655,6 +665,7 @@ class LanceDBConnection(DBConnection): embedding_functions=embedding_functions, namespace=namespace, storage_options=storage_options, + storage_options_provider=storage_options_provider, ) return tbl @@ -665,6 +676,7 @@ class LanceDBConnection(DBConnection): *, namespace: List[str] = [], storage_options: Optional[Dict[str, str]] = None, + storage_options_provider: Optional["StorageOptionsProvider"] = None, index_cache_size: Optional[int] = None, ) -> LanceTable: """Open a table in the database. @@ -696,6 +708,7 @@ class LanceDBConnection(DBConnection): name, namespace=namespace, storage_options=storage_options, + storage_options_provider=storage_options_provider, index_cache_size=index_cache_size, ) @@ -977,9 +990,11 @@ class AsyncConnection(object): on_bad_vectors: Optional[str] = None, fill_value: Optional[float] = None, storage_options: Optional[Dict[str, str]] = None, + storage_options_provider: Optional["StorageOptionsProvider"] = None, *, namespace: List[str] = [], embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None, + location: Optional[str] = None, ) -> AsyncTable: """Create an [AsyncTable][lancedb.table.AsyncTable] in the database. @@ -1170,6 +1185,8 @@ class AsyncConnection(object): schema, namespace=namespace, storage_options=storage_options, + storage_options_provider=storage_options_provider, + location=location, ) else: data = data_to_reader(data, schema) @@ -1179,6 +1196,8 @@ class AsyncConnection(object): data, namespace=namespace, storage_options=storage_options, + storage_options_provider=storage_options_provider, + location=location, ) return AsyncTable(new_table) @@ -1189,7 +1208,9 @@ class AsyncConnection(object): *, namespace: List[str] = [], storage_options: Optional[Dict[str, str]] = None, + storage_options_provider: Optional["StorageOptionsProvider"] = None, index_cache_size: Optional[int] = None, + location: Optional[str] = None, ) -> AsyncTable: """Open a Lance Table in the database. @@ -1218,6 +1239,10 @@ class AsyncConnection(object): This cache applies to the entire opened table, across all indices. Setting this value higher will increase performance on larger datasets at the expense of more RAM + location: str, optional + The explicit location (URI) of the table. If provided, the table will be + opened from this location instead of deriving it from the database URI + and table name. 
Returns ------- @@ -1227,7 +1252,9 @@ class AsyncConnection(object): name, namespace=namespace, storage_options=storage_options, + storage_options_provider=storage_options_provider, index_cache_size=index_cache_size, + location=location, ) return AsyncTable(table) diff --git a/python/python/lancedb/io.py b/python/python/lancedb/io.py new file mode 100644 index 00000000..09b4c08d --- /dev/null +++ b/python/python/lancedb/io.py @@ -0,0 +1,71 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright The LanceDB Authors + +"""I/O utilities and interfaces for LanceDB.""" + +from abc import ABC, abstractmethod +from typing import Dict + + +class StorageOptionsProvider(ABC): + """Abstract base class for providing storage options to LanceDB tables. + + Storage options providers enable automatic credential refresh for cloud + storage backends (e.g., AWS S3, Azure Blob Storage, GCS). When credentials + have an expiration time, the provider's fetch_storage_options() method will + be called periodically to get fresh credentials before they expire. + + Example + ------- + >>> class MyProvider(StorageOptionsProvider): + ... def fetch_storage_options(self) -> Dict[str, str]: + ... # Fetch fresh credentials from your credential manager + ... return { + ... "aws_access_key_id": "...", + ... "aws_secret_access_key": "...", + ... "expires_at_millis": "1234567890000" # Optional + ... } + """ + + @abstractmethod + def fetch_storage_options(self) -> Dict[str, str]: + """Fetch fresh storage credentials. + + This method is called by LanceDB when credentials need to be refreshed. + If the returned dictionary contains an "expires_at_millis" key with a + Unix timestamp in milliseconds, LanceDB will automatically refresh the + credentials before that time. If the key is not present, credentials + are assumed to not expire. + + Returns + ------- + Dict[str, str] + Dictionary containing cloud storage credentials and optionally an + expiration time: + - "expires_at_millis" (optional): Unix timestamp in milliseconds when + credentials expire + - Provider-specific credential keys (e.g., aws_access_key_id, + aws_secret_access_key, etc.) + + Raises + ------ + RuntimeError + If credentials cannot be fetched or are invalid + """ + pass + + def provider_id(self) -> str: + """Return a human-readable unique identifier for this provider instance. + + This identifier is used for caching and equality comparison. Two providers + with the same ID will share the same cached object store connection. + + The default implementation uses the class name and string representation. + Override this method if you need custom identification logic. + + Returns + ------- + str + A unique identifier for this provider instance + """ + return f"{self.__class__.__name__} {{ repr: {str(self)!r} }}" diff --git a/python/python/lancedb/namespace.py b/python/python/lancedb/namespace.py index 41968fff..2085c90a 100644 --- a/python/python/lancedb/namespace.py +++ b/python/python/lancedb/namespace.py @@ -10,42 +10,40 @@ through a namespace abstraction. 
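The new io.py module above also notes that provider_id() drives caching: providers reporting the same id share one cached object store connection. A hedged sketch of relying on the default implementation (class name plus str(self)) by giving the provider a stable string form; CREDENTIAL_ENDPOINT is an assumed constant, not part of this change:

from typing import Dict

from lancedb.io import StorageOptionsProvider

CREDENTIAL_ENDPOINT = "https://credentials.internal/lancedb"  # assumed endpoint


class EndpointProvider(StorageOptionsProvider):
    """Identified by the endpoint it talks to rather than by object identity."""

    def __init__(self, endpoint: str = CREDENTIAL_ENDPOINT):
        self._endpoint = endpoint

    def __str__(self) -> str:
        # Feeds the default provider_id(), so two instances pointing at the
        # same endpoint resolve to the same id and share a cached connection.
        return self._endpoint

    def fetch_storage_options(self) -> Dict[str, str]:
        # Placeholder body; a real implementation would call self._endpoint
        # and translate the response into flat string key/value options.
        return {"aws_access_key_id": "...", "aws_secret_access_key": "..."}


assert EndpointProvider().provider_id() == EndpointProvider().provider_id()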
from __future__ import annotations -from typing import Dict, Iterable, List, Optional, Union -import os import sys +from typing import Dict, Iterable, List, Optional, Union if sys.version_info >= (3, 12): from typing import override else: from overrides import override -from lancedb.db import DBConnection +from datetime import timedelta +import pyarrow as pa + +from lancedb.db import DBConnection, LanceDBConnection +from lancedb.io import StorageOptionsProvider from lancedb.table import LanceTable, Table from lancedb.util import validate_table_name -from lancedb.common import validate_schema -from lancedb.table import sanitize_create_table +from lancedb.common import DATA +from lancedb.pydantic import LanceModel +from lancedb.embeddings import EmbeddingFunctionConfig +from ._lancedb import Session from lance_namespace import LanceNamespace, connect as namespace_connect from lance_namespace_urllib3_client.models import ( ListTablesRequest, DescribeTableRequest, - CreateTableRequest, DropTableRequest, ListNamespacesRequest, CreateNamespaceRequest, DropNamespaceRequest, + CreateEmptyTableRequest, JsonArrowSchema, JsonArrowField, JsonArrowDataType, ) -import pyarrow as pa -from datetime import timedelta -from lancedb.pydantic import LanceModel -from lancedb.common import DATA -from lancedb.embeddings import EmbeddingFunctionConfig -from ._lancedb import Session - def _convert_pyarrow_type_to_json(arrow_type: pa.DataType) -> JsonArrowDataType: """Convert PyArrow DataType to JsonArrowDataType.""" @@ -104,6 +102,89 @@ def _convert_pyarrow_schema_to_json(schema: pa.Schema) -> JsonArrowSchema: return JsonArrowSchema(fields=fields, metadata=schema.metadata) +class LanceNamespaceStorageOptionsProvider(StorageOptionsProvider): + """Storage options provider that fetches storage options from a LanceNamespace. + + This provider automatically fetches fresh storage options by calling the + namespace's describe_table() method, which returns both the table location + and time-limited storage options. This enables automatic credential refresh + for tables accessed through namespace connections. + + Parameters + ---------- + namespace : LanceNamespace + The namespace instance with a describe_table() method + table_id : List[str] + The table identifier (namespace path + table name) + + Examples + -------- + >>> from lance_namespace import connect as namespace_connect + >>> namespace = namespace_connect("rest", {"url": "https://..."}) + >>> provider = LanceNamespaceStorageOptionsProvider( + ... namespace=namespace, + ... table_id=["my_namespace", "my_table"] + ... ) + >>> options = provider.fetch_storage_options() + """ + + def __init__(self, namespace: LanceNamespace, table_id: List[str]): + """Initialize with namespace and table ID. + + Parameters + ---------- + namespace : LanceNamespace + The namespace instance with a describe_table() method + table_id : List[str] + The table identifier + """ + self._namespace = namespace + self._table_id = table_id + + def fetch_storage_options(self) -> Dict[str, str]: + """Fetch storage options from the namespace. + + This calls namespace.describe_table() to get the latest storage options + and their expiration time. + + Returns + ------- + Dict[str, str] + Flat dictionary of string key-value pairs containing storage options. + May include "expires_at_millis" key for automatic refresh. 
+ + Raises + ------ + RuntimeError + If namespace does not return storage_options + """ + request = DescribeTableRequest(id=self._table_id, version=None) + response = self._namespace.describe_table(request) + storage_options = response.storage_options + if storage_options is None: + raise RuntimeError( + "Namespace did not return storage_options. " + "Ensure the namespace supports storage options providing." + ) + + # Return the storage_options directly - it's already a flat Map + return storage_options + + def provider_id(self) -> str: + """Return a human-readable unique identifier for this provider instance.""" + # Try to call namespace_id() if available (lance-namespace >= 0.0.20) + if hasattr(self._namespace, "namespace_id"): + namespace_id = self._namespace.namespace_id() + else: + # Fallback for older namespace versions + namespace_id = str(self._namespace) + + return ( + f"LanceNamespaceStorageOptionsProvider {{ " + f"namespace: {namespace_id}, table_id: {self._table_id!r} }}" + ) + + class LanceNamespaceDBConnection(DBConnection): """ A LanceDB connection that uses a namespace for table management. @@ -166,6 +247,7 @@ class LanceNamespaceDBConnection(DBConnection): *, namespace: List[str] = [], storage_options: Optional[Dict[str, str]] = None, + storage_options_provider: Optional[StorageOptionsProvider] = None, data_storage_version: Optional[str] = None, enable_v2_manifest_paths: Optional[bool] = None, ) -> Table: @@ -173,48 +255,84 @@ class LanceNamespaceDBConnection(DBConnection): raise ValueError("mode must be either 'create' or 'overwrite'") validate_table_name(name) - # TODO: support passing data - if data is not None: - raise ValueError( - "create_table currently only supports creating empty tables (data=None)" + # Get location from namespace + table_id = namespace + [name] + + # Step 1: Get the table location and storage options from namespace + # In overwrite mode, if table exists, use describe_table to get + # existing location. 
Otherwise, call create_empty_table to reserve + # a new location + location = None + namespace_storage_options = None + if mode.lower() == "overwrite": + # Try to describe the table first to see if it exists + try: + describe_request = DescribeTableRequest(id=table_id) + describe_response = self._ns.describe_table(describe_request) + location = describe_response.location + namespace_storage_options = describe_response.storage_options + except Exception: + # Table doesn't exist, will create a new one below + pass + + if location is None: + # Table doesn't exist or mode is "create", reserve a new location + create_empty_request = CreateEmptyTableRequest( + id=table_id, + location=None, + properties=self.storage_options if self.storage_options else None, + ) + create_empty_response = self._ns.create_empty_table(create_empty_request) + + if not create_empty_response.location: + raise ValueError( + "Table location is missing from create_empty_table response" + ) + + location = create_empty_response.location + namespace_storage_options = create_empty_response.storage_options + + # Merge storage options: self.storage_options < user options < namespace options + merged_storage_options = dict(self.storage_options) + if storage_options: + merged_storage_options.update(storage_options) + if namespace_storage_options: + merged_storage_options.update(namespace_storage_options) + + # Step 2: Create table using LanceTable.create with the location + # We need a temporary connection for the LanceTable.create method + temp_conn = LanceDBConnection( + location, # Use the actual location as the connection URI + read_consistency_interval=self.read_consistency_interval, + storage_options=merged_storage_options, + session=self.session, + ) + + # Create a storage options provider if not provided by user + # Only create if namespace returned storage_options (not None) + if storage_options_provider is None and namespace_storage_options is not None: + storage_options_provider = LanceNamespaceStorageOptionsProvider( + namespace=self._ns, + table_id=table_id, ) - # Prepare schema - metadata = None - if embedding_functions is not None: - from lancedb.embeddings.registry import EmbeddingFunctionRegistry - - registry = EmbeddingFunctionRegistry.get_instance() - metadata = registry.get_table_metadata(embedding_functions) - - data, schema = sanitize_create_table( - data, schema, metadata, on_bad_vectors, fill_value + tbl = LanceTable.create( + temp_conn, + name, + data, + schema, + mode=mode, + exist_ok=exist_ok, + on_bad_vectors=on_bad_vectors, + fill_value=fill_value, + embedding_functions=embedding_functions, + namespace=namespace, + storage_options=merged_storage_options, + storage_options_provider=storage_options_provider, + location=location, ) - validate_schema(schema) - # Convert PyArrow schema to JsonArrowSchema - json_schema = _convert_pyarrow_schema_to_json(schema) - - # Create table request with namespace - table_id = namespace + [name] - request = CreateTableRequest(id=table_id, var_schema=json_schema) - - # Create empty Arrow IPC stream bytes - import pyarrow.ipc as ipc - import io - - empty_table = pa.Table.from_arrays( - [pa.array([], type=field.type) for field in schema], schema=schema - ) - buffer = io.BytesIO() - with ipc.new_stream(buffer, schema) as writer: - writer.write_table(empty_table) - request_data = buffer.getvalue() - - self._ns.create_table(request, request_data) - return self.open_table( - name, namespace=namespace, storage_options=storage_options - ) + return tbl @override def open_table( 
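With the rewrite above, LanceNamespaceDBConnection.create_table asks the namespace for a table location (create_empty_table, or describe_table in overwrite mode), merges storage options with the precedence connection < caller < namespace, and auto-creates a LanceNamespaceStorageOptionsProvider when the namespace returns credentials. A usage sketch mirroring the directory-namespace tests further down; the temporary root path is illustrative:

import tempfile

import pyarrow as pa

import lancedb

root = tempfile.mkdtemp()  # illustrative local root for the "dir" namespace
db = lancedb.connect_namespace("dir", {"root": root})

# Tables live inside a child namespace, so create one first.
db.create_namespace(["prod"])

schema = pa.schema(
    [
        pa.field("id", pa.int64()),
        pa.field("vector", pa.list_(pa.float32(), 2)),
    ]
)

# create_table reserves a location through the namespace, merges storage
# options (connection < caller < namespace), and opens the new table.
table = db.create_table("items", schema=schema, namespace=["prod"])
assert table.namespace == ["prod"]
assert table.id == "prod$items"

# open_table goes through describe_table and reuses any returned credentials.
reopened = db.open_table("items", namespace=["prod"])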
@@ -223,21 +341,34 @@ class LanceNamespaceDBConnection(DBConnection): *, namespace: List[str] = [], storage_options: Optional[Dict[str, str]] = None, + storage_options_provider: Optional[StorageOptionsProvider] = None, index_cache_size: Optional[int] = None, ) -> Table: table_id = namespace + [name] request = DescribeTableRequest(id=table_id) response = self._ns.describe_table(request) - merged_storage_options = dict() + # Merge storage options: self.storage_options < user options < namespace options + merged_storage_options = dict(self.storage_options) if storage_options: merged_storage_options.update(storage_options) if response.storage_options: merged_storage_options.update(response.storage_options) + # Create a storage options provider if not provided by user + # Only create if namespace returned storage_options (not None) + if storage_options_provider is None and response.storage_options is not None: + storage_options_provider = LanceNamespaceStorageOptionsProvider( + namespace=self._ns, + table_id=table_id, + ) + return self._lance_table_from_uri( + name, response.location, + namespace=namespace, storage_options=merged_storage_options, + storage_options_provider=storage_options_provider, index_cache_size=index_cache_size, ) @@ -330,33 +461,32 @@ class LanceNamespaceDBConnection(DBConnection): def _lance_table_from_uri( self, + name: str, table_uri: str, *, + namespace: List[str] = [], storage_options: Optional[Dict[str, str]] = None, + storage_options_provider: Optional[StorageOptionsProvider] = None, index_cache_size: Optional[int] = None, ) -> LanceTable: - # Extract the base path and table name from the URI - if table_uri.endswith(".lance"): - base_path = os.path.dirname(table_uri) - table_name = os.path.basename(table_uri)[:-6] # Remove .lance - else: - raise ValueError(f"Invalid table URI: {table_uri}") - - from lancedb.db import LanceDBConnection - + # Open a table directly from a URI using the location parameter + # Note: storage_options should already be merged by the caller temp_conn = LanceDBConnection( - base_path, + table_uri, # Use the table location as the connection URI read_consistency_interval=self.read_consistency_interval, - storage_options={**self.storage_options, **(storage_options or {})}, + storage_options=storage_options if storage_options is not None else {}, session=self.session, ) - # Open the table using the temporary connection + # Open the table using the temporary connection with the location parameter return LanceTable.open( temp_conn, - table_name, + name, + namespace=namespace, storage_options=storage_options, + storage_options_provider=storage_options_provider, index_cache_size=index_cache_size, + location=table_uri, ) diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py index 4b595099..5bdade6d 100644 --- a/python/python/lancedb/table.py +++ b/python/python/lancedb/table.py @@ -75,6 +75,7 @@ from .index import lang_mapping if TYPE_CHECKING: from .db import LanceDBConnection + from .io import StorageOptionsProvider from ._lancedb import ( Table as LanceDBTable, OptimizeStats, @@ -1709,7 +1710,9 @@ class LanceTable(Table): *, namespace: List[str] = [], storage_options: Optional[Dict[str, str]] = None, + storage_options_provider: Optional["StorageOptionsProvider"] = None, index_cache_size: Optional[int] = None, + location: Optional[str] = None, _async: AsyncTable = None, ): self._conn = connection @@ -1722,7 +1725,9 @@ class LanceTable(Table): name, namespace=namespace, storage_options=storage_options, + 
storage_options_provider=storage_options_provider, index_cache_size=index_cache_size, + location=location, ) ) @@ -1730,6 +1735,18 @@ class LanceTable(Table): def name(self) -> str: return self._table.name + @property + def namespace(self) -> List[str]: + """Return the namespace path of the table.""" + return self._namespace + + @property + def id(self) -> str: + """Return the full identifier of the table (namespace$name).""" + if self._namespace: + return "$".join(self._namespace + [self.name]) + return self.name + @classmethod def from_inner(cls, tbl: LanceDBTable): from .db import LanceDBConnection @@ -1743,8 +1760,26 @@ class LanceTable(Table): ) @classmethod - def open(cls, db, name, *, namespace: List[str] = [], **kwargs): - tbl = cls(db, name, namespace=namespace, **kwargs) + def open( + cls, + db, + name, + *, + namespace: List[str] = [], + storage_options: Optional[Dict[str, str]] = None, + storage_options_provider: Optional["StorageOptionsProvider"] = None, + index_cache_size: Optional[int] = None, + location: Optional[str] = None, + ): + tbl = cls( + db, + name, + namespace=namespace, + storage_options=storage_options, + storage_options_provider=storage_options_provider, + index_cache_size=index_cache_size, + location=location, + ) # check the dataset exists try: @@ -2585,8 +2620,10 @@ class LanceTable(Table): *, namespace: List[str] = [], storage_options: Optional[Dict[str, str | bool]] = None, + storage_options_provider: Optional["StorageOptionsProvider"] = None, data_storage_version: Optional[str] = None, enable_v2_manifest_paths: Optional[bool] = None, + location: Optional[str] = None, ): """ Create a new table. @@ -2678,6 +2715,8 @@ class LanceTable(Table): embedding_functions=embedding_functions, namespace=namespace, storage_options=storage_options, + storage_options_provider=storage_options_provider, + location=location, ) ) return self diff --git a/python/python/tests/test_db.py b/python/python/tests/test_db.py index 1f0e9d21..f3d002cf 100644 --- a/python/python/tests/test_db.py +++ b/python/python/tests/test_db.py @@ -781,58 +781,6 @@ def test_local_drop_namespace_not_supported(tmp_path): db.drop_namespace(["test_namespace"]) -def test_local_table_operations_with_namespace_raise_error(tmp_path): - """ - Test that table operations with namespace parameter - raise ValueError in local mode. 
- """ - db = lancedb.connect(tmp_path) - - # Create some test data - data = [{"vector": [1.0, 2.0], "item": "test"}] - schema = pa.schema( - [pa.field("vector", pa.list_(pa.float32(), 2)), pa.field("item", pa.string())] - ) - - # Test create_table with namespace - should raise ValueError - with pytest.raises( - NotImplementedError, - match="Namespace parameter is not supported for listing database", - ): - db.create_table( - "test_table_with_ns", data=data, schema=schema, namespace=["test_ns"] - ) - - # Create table normally for other tests - db.create_table("test_table", data=data, schema=schema) - assert "test_table" in db.table_names() - - # Test open_table with namespace - should raise ValueError - with pytest.raises( - NotImplementedError, - match="Namespace parameter is not supported for listing database", - ): - db.open_table("test_table", namespace=["test_ns"]) - - # Test table_names with namespace - should raise ValueError - with pytest.raises( - NotImplementedError, - match="Namespace parameter is not supported for listing database", - ): - list(db.table_names(namespace=["test_ns"])) - - # Test drop_table with namespace - should raise ValueError - with pytest.raises( - NotImplementedError, - match="Namespace parameter is not supported for listing database", - ): - db.drop_table("test_table", namespace=["test_ns"]) - - # Test table_names without namespace - should work normally - tables_root = list(db.table_names()) - assert "test_table" in tables_root - - def test_clone_table_latest_version(tmp_path): """Test cloning a table with the latest version (default behavior)""" import os diff --git a/python/python/tests/test_namespace.py b/python/python/tests/test_namespace.py index f7bb83c1..0fee1ab0 100644 --- a/python/python/tests/test_namespace.py +++ b/python/python/tests/test_namespace.py @@ -5,352 +5,39 @@ import tempfile import shutil -from typing import Dict, Optional import pytest import pyarrow as pa import lancedb -from lance_namespace.namespace import NATIVE_IMPLS, LanceNamespace -from lance_namespace_urllib3_client.models import ( - ListTablesRequest, - ListTablesResponse, - DescribeTableRequest, - DescribeTableResponse, - RegisterTableRequest, - RegisterTableResponse, - DeregisterTableRequest, - DeregisterTableResponse, - CreateTableRequest, - CreateTableResponse, - DropTableRequest, - DropTableResponse, - ListNamespacesRequest, - ListNamespacesResponse, - CreateNamespaceRequest, - CreateNamespaceResponse, - DropNamespaceRequest, - DropNamespaceResponse, -) - - -class TempNamespace(LanceNamespace): - """A simple dictionary-backed namespace for testing.""" - - # Class-level storage to persist table registry across instances - _global_registry: Dict[str, Dict[str, str]] = {} - # Class-level storage for namespaces (supporting 1-level namespace) - _global_namespaces: Dict[str, set] = {} - - def __init__(self, **properties): - """Initialize the test namespace. 
- - Args: - root: The root directory for tables (optional) - **properties: Additional configuration properties - """ - self.config = TempNamespaceConfig(properties) - # Use the root as a key to maintain separate registries per root - root = self.config.root - if root not in self._global_registry: - self._global_registry[root] = {} - if root not in self._global_namespaces: - self._global_namespaces[root] = set() - self.tables = self._global_registry[root] # Reference to shared registry - self.namespaces = self._global_namespaces[ - root - ] # Reference to shared namespaces - - def namespace_id(self) -> str: - """Return a human-readable unique identifier for this namespace instance. - - Returns: - A unique identifier string based on the root directory - """ - return f"TempNamespace {{ root: '{self.config.root}' }}" - - def list_tables(self, request: ListTablesRequest) -> ListTablesResponse: - """List all tables in the namespace.""" - if not request.id: - # List all tables in root namespace - tables = [name for name in self.tables.keys() if "." not in name] - else: - # List tables in specific namespace (1-level only) - if len(request.id) == 1: - namespace_name = request.id[0] - prefix = f"{namespace_name}." - tables = [ - name[len(prefix) :] - for name in self.tables.keys() - if name.startswith(prefix) - ] - else: - # Multi-level namespaces not supported - raise ValueError("Only 1-level namespaces are supported") - return ListTablesResponse(tables=tables) - - def describe_table(self, request: DescribeTableRequest) -> DescribeTableResponse: - """Describe a table by returning its location.""" - if not request.id: - raise ValueError("Invalid table ID") - - if len(request.id) == 1: - # Root namespace table - table_name = request.id[0] - elif len(request.id) == 2: - # Namespaced table (1-level namespace) - namespace_name, table_name = request.id - table_name = f"{namespace_name}.{table_name}" - else: - raise ValueError("Only 1-level namespaces are supported") - - if table_name not in self.tables: - raise RuntimeError(f"Table does not exist: {table_name}") - - table_uri = self.tables[table_name] - return DescribeTableResponse(location=table_uri) - - def create_table( - self, request: CreateTableRequest, request_data: bytes - ) -> CreateTableResponse: - """Create a table in the namespace.""" - if not request.id: - raise ValueError("Invalid table ID") - - if len(request.id) == 1: - # Root namespace table - table_name = request.id[0] - table_uri = f"{self.config.root}/{table_name}.lance" - elif len(request.id) == 2: - # Namespaced table (1-level namespace) - namespace_name, base_table_name = request.id - # Add namespace to our namespace set - self.namespaces.add(namespace_name) - table_name = f"{namespace_name}.{base_table_name}" - table_uri = f"{self.config.root}/{namespace_name}/{base_table_name}.lance" - else: - raise ValueError("Only 1-level namespaces are supported") - - # Check if table already exists - if table_name in self.tables: - if request.mode == "overwrite": - # Drop existing table for overwrite mode - del self.tables[table_name] - else: - raise RuntimeError(f"Table already exists: {table_name}") - - # Parse the Arrow IPC stream to get the schema and create the actual table - import pyarrow.ipc as ipc - import io - import lance - import os - - # Create directory if needed for namespaced tables - os.makedirs(os.path.dirname(table_uri), exist_ok=True) - - # Read the IPC stream - reader = ipc.open_stream(io.BytesIO(request_data)) - table = reader.read_all() - - # Create the actual Lance 
table - lance.write_dataset(table, table_uri) - - # Store the table mapping - self.tables[table_name] = table_uri - - return CreateTableResponse(location=table_uri) - - def drop_table(self, request: DropTableRequest) -> DropTableResponse: - """Drop a table from the namespace.""" - if not request.id: - raise ValueError("Invalid table ID") - - if len(request.id) == 1: - # Root namespace table - table_name = request.id[0] - elif len(request.id) == 2: - # Namespaced table (1-level namespace) - namespace_name, base_table_name = request.id - table_name = f"{namespace_name}.{base_table_name}" - else: - raise ValueError("Only 1-level namespaces are supported") - - if table_name not in self.tables: - raise RuntimeError(f"Table does not exist: {table_name}") - - # Get the table URI - table_uri = self.tables[table_name] - - # Delete the actual table files - import shutil - import os - - if os.path.exists(table_uri): - shutil.rmtree(table_uri, ignore_errors=True) - - # Remove from registry - del self.tables[table_name] - - return DropTableResponse() - - def register_table(self, request: RegisterTableRequest) -> RegisterTableResponse: - """Register a table with the namespace.""" - if not request.id or len(request.id) != 1: - raise ValueError("Invalid table ID") - - if not request.location: - raise ValueError("Table location is required") - - table_name = request.id[0] - self.tables[table_name] = request.location - - return RegisterTableResponse() - - def deregister_table( - self, request: DeregisterTableRequest - ) -> DeregisterTableResponse: - """Deregister a table from the namespace.""" - if not request.id or len(request.id) != 1: - raise ValueError("Invalid table ID") - - table_name = request.id[0] - if table_name not in self.tables: - raise RuntimeError(f"Table does not exist: {table_name}") - - del self.tables[table_name] - return DeregisterTableResponse() - - def list_namespaces(self, request: ListNamespacesRequest) -> ListNamespacesResponse: - """List child namespaces.""" - if not request.id: - # List root-level namespaces - namespaces = list(self.namespaces) - elif len(request.id) == 1: - # For 1-level namespace, there are no child namespaces - namespaces = [] - else: - raise ValueError("Only 1-level namespaces are supported") - - return ListNamespacesResponse(namespaces=namespaces) - - def create_namespace( - self, request: CreateNamespaceRequest - ) -> CreateNamespaceResponse: - """Create a namespace.""" - if not request.id: - raise ValueError("Invalid namespace ID") - - if len(request.id) == 1: - # Create 1-level namespace - namespace_name = request.id[0] - self.namespaces.add(namespace_name) - - # Create directory for the namespace - import os - - namespace_dir = f"{self.config.root}/{namespace_name}" - os.makedirs(namespace_dir, exist_ok=True) - else: - raise ValueError("Only 1-level namespaces are supported") - - return CreateNamespaceResponse() - - def drop_namespace(self, request: DropNamespaceRequest) -> DropNamespaceResponse: - """Drop a namespace.""" - if not request.id: - raise ValueError("Invalid namespace ID") - - if len(request.id) == 1: - # Drop 1-level namespace - namespace_name = request.id[0] - - if namespace_name not in self.namespaces: - raise RuntimeError(f"Namespace does not exist: {namespace_name}") - - # Check if namespace has any tables - prefix = f"{namespace_name}." 
- tables_in_namespace = [ - name for name in self.tables.keys() if name.startswith(prefix) - ] - if tables_in_namespace: - raise RuntimeError( - f"Cannot drop namespace '{namespace_name}': contains tables" - ) - - # Remove namespace - self.namespaces.remove(namespace_name) - - # Remove directory - import shutil - import os - - namespace_dir = f"{self.config.root}/{namespace_name}" - if os.path.exists(namespace_dir): - shutil.rmtree(namespace_dir, ignore_errors=True) - else: - raise ValueError("Only 1-level namespaces are supported") - - return DropNamespaceResponse() - - -class TempNamespaceConfig: - """Configuration for TestNamespace.""" - - ROOT = "root" - - def __init__(self, properties: Optional[Dict[str, str]] = None): - """Initialize configuration from properties. - - Args: - properties: Dictionary of configuration properties - """ - if properties is None: - properties = {} - - self._root = properties.get(self.ROOT, "/tmp") - - @property - def root(self) -> str: - """Get the namespace root directory.""" - return self._root - - -NATIVE_IMPLS["temp"] = f"{TempNamespace.__module__}.TempNamespace" class TestNamespaceConnection: - """Test namespace-based LanceDB connection.""" + """Test namespace-based LanceDB connection using DirectoryNamespace.""" def setup_method(self): """Set up test fixtures.""" self.temp_dir = tempfile.mkdtemp() - # Clear the TestNamespace registry for this test - if self.temp_dir in TempNamespace._global_registry: - TempNamespace._global_registry[self.temp_dir].clear() - if self.temp_dir in TempNamespace._global_namespaces: - TempNamespace._global_namespaces[self.temp_dir].clear() def teardown_method(self): """Clean up test fixtures.""" - # Clear the TestNamespace registry - if self.temp_dir in TempNamespace._global_registry: - del TempNamespace._global_registry[self.temp_dir] - if self.temp_dir in TempNamespace._global_namespaces: - del TempNamespace._global_namespaces[self.temp_dir] shutil.rmtree(self.temp_dir, ignore_errors=True) def test_connect_namespace_test(self): - """Test connecting to LanceDB through TestNamespace.""" - # Connect using TestNamespace - db = lancedb.connect_namespace("temp", {"root": self.temp_dir}) + """Test connecting to LanceDB through DirectoryNamespace.""" + # Connect using DirectoryNamespace + db = lancedb.connect_namespace("dir", {"root": self.temp_dir}) # Should be a LanceNamespaceDBConnection assert isinstance(db, lancedb.LanceNamespaceDBConnection) - # Initially no tables + # Initially no tables in root assert len(list(db.table_names())) == 0 def test_create_table_through_namespace(self): """Test creating a table through namespace.""" - db = lancedb.connect_namespace("temp", {"root": self.temp_dir}) + db = lancedb.connect_namespace("dir", {"root": self.temp_dir}) + + # Create a child namespace first + db.create_namespace(["test_ns"]) # Define schema for empty table schema = pa.schema( @@ -361,13 +48,15 @@ class TestNamespaceConnection: ] ) - # Create empty table - table = db.create_table("test_table", schema=schema) + # Create empty table in child namespace + table = db.create_table("test_table", schema=schema, namespace=["test_ns"]) assert table is not None assert table.name == "test_table" + assert table.namespace == ["test_ns"] + assert table.id == "test_ns$test_table" - # Table should appear in namespace - table_names = list(db.table_names()) + # Table should appear in child namespace + table_names = list(db.table_names(namespace=["test_ns"])) assert "test_table" in table_names assert len(table_names) == 1 @@ -378,21 +67,26 @@ 
class TestNamespaceConnection: def test_open_table_through_namespace(self): """Test opening an existing table through namespace.""" - db = lancedb.connect_namespace("temp", {"root": self.temp_dir}) + db = lancedb.connect_namespace("dir", {"root": self.temp_dir}) - # Create a table with schema + # Create a child namespace first + db.create_namespace(["test_ns"]) + + # Create a table with schema in child namespace schema = pa.schema( [ pa.field("id", pa.int64()), pa.field("vector", pa.list_(pa.float32(), 2)), ] ) - db.create_table("test_table", schema=schema) + db.create_table("test_table", schema=schema, namespace=["test_ns"]) # Open the table - table = db.open_table("test_table") + table = db.open_table("test_table", namespace=["test_ns"]) assert table is not None assert table.name == "test_table" + assert table.namespace == ["test_ns"] + assert table.id == "test_ns$test_table" # Verify empty table with correct schema result = table.to_pandas() @@ -401,44 +95,50 @@ class TestNamespaceConnection: def test_drop_table_through_namespace(self): """Test dropping a table through namespace.""" - db = lancedb.connect_namespace("temp", {"root": self.temp_dir}) + db = lancedb.connect_namespace("dir", {"root": self.temp_dir}) - # Create tables + # Create a child namespace first + db.create_namespace(["test_ns"]) + + # Create tables in child namespace schema = pa.schema( [ pa.field("id", pa.int64()), pa.field("vector", pa.list_(pa.float32(), 2)), ] ) - db.create_table("table1", schema=schema) - db.create_table("table2", schema=schema) + db.create_table("table1", schema=schema, namespace=["test_ns"]) + db.create_table("table2", schema=schema, namespace=["test_ns"]) - # Verify both tables exist - table_names = list(db.table_names()) + # Verify both tables exist in child namespace + table_names = list(db.table_names(namespace=["test_ns"])) assert "table1" in table_names assert "table2" in table_names assert len(table_names) == 2 # Drop one table - db.drop_table("table1") + db.drop_table("table1", namespace=["test_ns"]) # Verify only table2 remains - table_names = list(db.table_names()) + table_names = list(db.table_names(namespace=["test_ns"])) assert "table1" not in table_names assert "table2" in table_names assert len(table_names) == 1 - # Test that drop_table works without explicit namespace parameter - db.drop_table("table2") - assert len(list(db.table_names())) == 0 + # Drop the second table + db.drop_table("table2", namespace=["test_ns"]) + assert len(list(db.table_names(namespace=["test_ns"]))) == 0 # Should not be able to open dropped table with pytest.raises(RuntimeError): - db.open_table("table1") + db.open_table("table1", namespace=["test_ns"]) def test_create_table_with_schema(self): """Test creating a table with explicit schema through namespace.""" - db = lancedb.connect_namespace("temp", {"root": self.temp_dir}) + db = lancedb.connect_namespace("dir", {"root": self.temp_dir}) + + # Create a child namespace first + db.create_namespace(["test_ns"]) # Define schema schema = pa.schema( @@ -449,9 +149,10 @@ class TestNamespaceConnection: ] ) - # Create table with schema - table = db.create_table("test_table", schema=schema) + # Create table with schema in child namespace + table = db.create_table("test_table", schema=schema, namespace=["test_ns"]) assert table is not None + assert table.namespace == ["test_ns"] # Verify schema table_schema = table.schema @@ -461,16 +162,19 @@ class TestNamespaceConnection: def test_rename_table_not_supported(self): """Test that rename_table raises 
NotImplementedError.""" - db = lancedb.connect_namespace("temp", {"root": self.temp_dir}) + db = lancedb.connect_namespace("dir", {"root": self.temp_dir}) - # Create a table + # Create a child namespace first + db.create_namespace(["test_ns"]) + + # Create a table in child namespace schema = pa.schema( [ pa.field("id", pa.int64()), pa.field("vector", pa.list_(pa.float32(), 2)), ] ) - db.create_table("old_name", schema=schema) + db.create_table("old_name", schema=schema, namespace=["test_ns"]) # Rename should raise NotImplementedError with pytest.raises(NotImplementedError, match="rename_table is not supported"): @@ -478,9 +182,12 @@ class TestNamespaceConnection: def test_drop_all_tables(self): """Test dropping all tables through namespace.""" - db = lancedb.connect_namespace("temp", {"root": self.temp_dir}) + db = lancedb.connect_namespace("dir", {"root": self.temp_dir}) - # Create multiple tables + # Create a child namespace first + db.create_namespace(["test_ns"]) + + # Create multiple tables in child namespace schema = pa.schema( [ pa.field("id", pa.int64()), @@ -488,27 +195,30 @@ class TestNamespaceConnection: ] ) for i in range(3): - db.create_table(f"table{i}", schema=schema) + db.create_table(f"table{i}", schema=schema, namespace=["test_ns"]) - # Verify tables exist - assert len(list(db.table_names())) == 3 + # Verify tables exist in child namespace + assert len(list(db.table_names(namespace=["test_ns"]))) == 3 - # Drop all tables - db.drop_all_tables() + # Drop all tables in child namespace + db.drop_all_tables(namespace=["test_ns"]) - # Verify all tables are gone - assert len(list(db.table_names())) == 0 + # Verify all tables are gone from child namespace + assert len(list(db.table_names(namespace=["test_ns"]))) == 0 # Test that table_names works with keyword-only namespace parameter - db.create_table("test_table", schema=schema) - result = list(db.table_names(namespace=[])) + db.create_table("test_table", schema=schema, namespace=["test_ns"]) + result = list(db.table_names(namespace=["test_ns"])) assert "test_table" in result def test_table_operations(self): """Test various table operations through namespace.""" - db = lancedb.connect_namespace("temp", {"root": self.temp_dir}) + db = lancedb.connect_namespace("dir", {"root": self.temp_dir}) - # Create a table with schema + # Create a child namespace first + db.create_namespace(["test_ns"]) + + # Create a table with schema in child namespace schema = pa.schema( [ pa.field("id", pa.int64()), @@ -516,7 +226,7 @@ class TestNamespaceConnection: pa.field("text", pa.string()), ] ) - table = db.create_table("test_table", schema=schema) + table = db.create_table("test_table", schema=schema, namespace=["test_ns"]) # Verify empty table was created result = table.to_pandas() @@ -548,7 +258,7 @@ class TestNamespaceConnection: # Connect with storage options storage_opts = {"test_option": "test_value"} db = lancedb.connect_namespace( - "temp", {"root": self.temp_dir}, storage_options=storage_opts + "dir", {"root": self.temp_dir}, storage_options=storage_opts ) # Storage options should be preserved @@ -566,7 +276,7 @@ class TestNamespaceConnection: def test_namespace_operations(self): """Test namespace management operations.""" - db = lancedb.connect_namespace("temp", {"root": self.temp_dir}) + db = lancedb.connect_namespace("dir", {"root": self.temp_dir}) # Initially no namespaces assert len(list(db.list_namespaces())) == 0 @@ -617,7 +327,7 @@ class TestNamespaceConnection: def test_namespace_with_tables_cannot_be_dropped(self): """Test that 
namespaces containing tables cannot be dropped.""" - db = lancedb.connect_namespace("temp", {"root": self.temp_dir}) + db = lancedb.connect_namespace("dir", {"root": self.temp_dir}) # Create namespace and table db.create_namespace(["test_namespace"]) @@ -630,7 +340,7 @@ class TestNamespaceConnection: db.create_table("test_table", schema=schema, namespace=["test_namespace"]) # Try to drop namespace with tables - should fail - with pytest.raises(RuntimeError, match="contains tables"): + with pytest.raises(RuntimeError, match="is not empty"): db.drop_namespace(["test_namespace"]) # Drop table first @@ -640,7 +350,7 @@ class TestNamespaceConnection: db.drop_namespace(["test_namespace"]) def test_same_table_name_different_namespaces(self): - db = lancedb.connect_namespace("temp", {"root": self.temp_dir}) + db = lancedb.connect_namespace("dir", {"root": self.temp_dir}) # Create two namespaces db.create_namespace(["namespace_a"]) diff --git a/python/python/tests/test_namespace_integration.py b/python/python/tests/test_namespace_integration.py new file mode 100644 index 00000000..d3e08ee1 --- /dev/null +++ b/python/python/tests/test_namespace_integration.py @@ -0,0 +1,632 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright The LanceDB Authors + +""" +Integration tests for LanceDB Namespace with S3 and credential refresh. + +This test simulates a namespace server that returns incrementing credentials +and verifies that the credential refresh mechanism works correctly for both +create_table and open_table operations. + +Tests verify: +- Storage options provider is auto-created and used +- Credentials are properly cached during reads +- Credentials refresh when they expire +- Both create and open operations work with credential rotation +""" + +import copy +import time +import uuid +from threading import Lock +from typing import Dict + +import pyarrow as pa +import pytest +from lance_namespace import ( + CreateEmptyTableRequest, + CreateEmptyTableResponse, + DescribeTableRequest, + DescribeTableResponse, + LanceNamespace, +) +from lancedb.namespace import LanceNamespaceDBConnection + +# LocalStack S3 configuration +CONFIG = { + "allow_http": "true", + "aws_access_key_id": "ACCESSKEY", + "aws_secret_access_key": "SECRETKEY", + "aws_endpoint": "http://localhost:4566", + "aws_region": "us-east-1", +} + + +def get_boto3_client(*args, **kwargs): + import boto3 + + return boto3.client( + *args, + region_name=CONFIG["aws_region"], + aws_access_key_id=CONFIG["aws_access_key_id"], + aws_secret_access_key=CONFIG["aws_secret_access_key"], + **kwargs, + ) + + +@pytest.fixture(scope="module") +def s3_bucket(): + """Create and cleanup S3 bucket for integration tests.""" + s3 = get_boto3_client("s3", endpoint_url=CONFIG["aws_endpoint"]) + bucket_name = "lancedb-namespace-integtest" + + # Clean up existing bucket if it exists + try: + delete_bucket(s3, bucket_name) + except s3.exceptions.NoSuchBucket: + pass + + s3.create_bucket(Bucket=bucket_name) + yield bucket_name + + # Cleanup after tests + delete_bucket(s3, bucket_name) + + +def delete_bucket(s3, bucket_name): + """Delete S3 bucket and all its contents.""" + try: + # Delete all objects first + paginator = s3.get_paginator("list_objects_v2") + for page in paginator.paginate(Bucket=bucket_name): + if "Contents" in page: + for obj in page["Contents"]: + s3.delete_object(Bucket=bucket_name, Key=obj["Key"]) + s3.delete_bucket(Bucket=bucket_name) + except Exception: + pass + + +class TrackingNamespace(LanceNamespace): + """ + Mock 
namespace that wraps DirectoryNamespace and tracks API calls. + + This namespace returns incrementing credentials with each API call to simulate + credential rotation. It also tracks the number of times each API is called + to verify caching behavior. + """ + + def __init__( + self, + bucket_name: str, + storage_options: Dict[str, str], + credential_expires_in_seconds: int = 60, + ): + from lance.namespace import DirectoryNamespace + + self.bucket_name = bucket_name + self.base_storage_options = storage_options + self.credential_expires_in_seconds = credential_expires_in_seconds + self.describe_call_count = 0 + self.create_call_count = 0 + self.lock = Lock() + + # Create underlying DirectoryNamespace with storage options + dir_props = {f"storage.{k}": v for k, v in storage_options.items()} + + # Use S3 path for bucket name, local path for file paths + if bucket_name.startswith("/") or bucket_name.startswith("file://"): + dir_props["root"] = f"{bucket_name}/namespace_root" + else: + dir_props["root"] = f"s3://{bucket_name}/namespace_root" + + self.inner = DirectoryNamespace(**dir_props) + + def get_describe_call_count(self) -> int: + """Thread-safe getter for describe call count.""" + with self.lock: + return self.describe_call_count + + def get_create_call_count(self) -> int: + """Thread-safe getter for create call count.""" + with self.lock: + return self.create_call_count + + def namespace_id(self) -> str: + """Return namespace identifier.""" + return f"TrackingNamespace {{ inner: {self.inner.namespace_id()} }}" + + def _modify_storage_options( + self, storage_options: Dict[str, str], count: int + ) -> Dict[str, str]: + """ + Add incrementing credentials with expiration timestamp. + + This simulates a credential rotation system where each call returns + new credentials that expire after credential_expires_in_seconds. 
+ """ + modified = copy.deepcopy(storage_options) if storage_options else {} + + # Increment credentials to simulate rotation + modified["aws_access_key_id"] = f"AKID_{count}" + modified["aws_secret_access_key"] = f"SECRET_{count}" + modified["aws_session_token"] = f"TOKEN_{count}" + + # Set expiration time + expires_at_millis = int( + (time.time() + self.credential_expires_in_seconds) * 1000 + ) + modified["expires_at_millis"] = str(expires_at_millis) + + return modified + + def create_empty_table( + self, request: CreateEmptyTableRequest + ) -> CreateEmptyTableResponse: + """Track create_empty_table calls and inject rotating credentials.""" + with self.lock: + self.create_call_count += 1 + count = self.create_call_count + + response = self.inner.create_empty_table(request) + response.storage_options = self._modify_storage_options( + response.storage_options, count + ) + + return response + + def describe_table(self, request: DescribeTableRequest) -> DescribeTableResponse: + """Track describe_table calls and inject rotating credentials.""" + with self.lock: + self.describe_call_count += 1 + count = self.describe_call_count + + response = self.inner.describe_table(request) + response.storage_options = self._modify_storage_options( + response.storage_options, count + ) + + return response + + # Pass through other methods to inner namespace + def list_tables(self, request): + return self.inner.list_tables(request) + + def drop_table(self, request): + return self.inner.drop_table(request) + + def list_namespaces(self, request): + return self.inner.list_namespaces(request) + + def create_namespace(self, request): + return self.inner.create_namespace(request) + + def drop_namespace(self, request): + return self.inner.drop_namespace(request) + + +@pytest.mark.s3_test +def test_namespace_create_table_with_provider(s3_bucket: str): + """ + Test creating a table through namespace with storage options provider. + + Verifies: + - create_empty_table is called once to reserve location + - Storage options provider is auto-created + - Table can be written successfully + - Credentials are cached during write operations + """ + storage_options = copy.deepcopy(CONFIG) + + namespace = TrackingNamespace( + bucket_name=s3_bucket, + storage_options=storage_options, + credential_expires_in_seconds=3600, # 1 hour + ) + + db = LanceNamespaceDBConnection(namespace) + + # Create unique namespace for this test + namespace_name = f"test_ns_{uuid.uuid4().hex[:8]}" + db.create_namespace([namespace_name]) + + table_name = f"test_table_{uuid.uuid4().hex}" + namespace_path = [namespace_name] + + # Verify initial state + assert namespace.get_create_call_count() == 0 + assert namespace.get_describe_call_count() == 0 + + # Create table with data + data = pa.table( + { + "id": [1, 2, 3], + "vector": [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]], + "text": ["hello", "world", "test"], + } + ) + + table = db.create_table(table_name, data, namespace=namespace_path) + + # Verify create_empty_table was called exactly once + assert namespace.get_create_call_count() == 1 + # describe_table should NOT be called during create in create mode + assert namespace.get_describe_call_count() == 0 + + # Verify table was created successfully + assert table.name == table_name + result = table.to_pandas() + assert len(result) == 3 + assert list(result["id"]) == [1, 2, 3] + + +@pytest.mark.s3_test +def test_namespace_open_table_with_provider(s3_bucket: str): + """ + Test opening a table through namespace with storage options provider. 
+ + Verifies: + - describe_table is called once when opening + - Storage options provider is auto-created + - Table can be read successfully + - Credentials are cached during read operations + """ + storage_options = copy.deepcopy(CONFIG) + + namespace = TrackingNamespace( + bucket_name=s3_bucket, + storage_options=storage_options, + credential_expires_in_seconds=3600, + ) + + db = LanceNamespaceDBConnection(namespace) + + # Create unique namespace for this test + namespace_name = f"test_ns_{uuid.uuid4().hex[:8]}" + db.create_namespace([namespace_name]) + + table_name = f"test_table_{uuid.uuid4().hex}" + namespace_path = [namespace_name] + + # Create table first + data = pa.table( + { + "id": [1, 2, 3, 4, 5], + "vector": [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0], [7.0, 8.0], [9.0, 10.0]], + "value": [10, 20, 30, 40, 50], + } + ) + + db.create_table(table_name, data, namespace=namespace_path) + + initial_create_count = namespace.get_create_call_count() + assert initial_create_count == 1 + + # Open the table + opened_table = db.open_table(table_name, namespace=namespace_path) + + # Verify describe_table was called exactly once + assert namespace.get_describe_call_count() == 1 + # create_empty_table should not be called again + assert namespace.get_create_call_count() == initial_create_count + + # Perform multiple read operations + describe_count_after_open = namespace.get_describe_call_count() + + for _ in range(3): + result = opened_table.to_pandas() + assert len(result) == 5 + count = opened_table.count_rows() + assert count == 5 + + # Verify credentials were cached (no additional describe_table calls) + assert namespace.get_describe_call_count() == describe_count_after_open + + +@pytest.mark.s3_test +def test_namespace_credential_refresh_on_read(s3_bucket: str): + """ + Test credential refresh when credentials expire during read operations. 
+ + Verifies: + - Credentials are cached initially (no additional describe_table calls) + - After expiration, credentials are refreshed (describe_table called again) + - Read operations continue to work with refreshed credentials + """ + storage_options = copy.deepcopy(CONFIG) + + namespace = TrackingNamespace( + bucket_name=s3_bucket, + storage_options=storage_options, + credential_expires_in_seconds=3, # Short expiration for testing + ) + + db = LanceNamespaceDBConnection(namespace) + + # Create unique namespace for this test + namespace_name = f"test_ns_{uuid.uuid4().hex[:8]}" + db.create_namespace([namespace_name]) + + table_name = f"test_table_{uuid.uuid4().hex}" + namespace_path = [namespace_name] + + # Create table + data = pa.table( + { + "id": [1, 2, 3], + "vector": [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]], + } + ) + + db.create_table(table_name, data, namespace=namespace_path) + + # Open table (triggers describe_table) + opened_table = db.open_table(table_name, namespace=namespace_path) + + # Perform an immediate read (should use credentials from open) + result = opened_table.to_pandas() + assert len(result) == 3 + + describe_count_after_first_read = namespace.get_describe_call_count() + + # Wait for credentials to expire (3 seconds + buffer) + time.sleep(5) + + # Perform read after expiration (should trigger credential refresh) + result = opened_table.to_pandas() + assert len(result) == 3 + + describe_count_after_refresh = namespace.get_describe_call_count() + # Verify describe_table was called again (credential refresh) + refresh_delta = describe_count_after_refresh - describe_count_after_first_read + + # Verify the exact count: credential refresh should call describe_table exactly + # once + assert refresh_delta == 1, ( + f"Credential refresh should call describe_table exactly once " + f"(got {refresh_delta})" + ) + + +@pytest.mark.s3_test +def test_namespace_credential_refresh_on_write(s3_bucket: str): + """ + Test credential refresh when credentials expire during write operations. + + Verifies: + - Credentials are cached during initial writes + - After expiration, new credentials are fetched before writes + - Write operations continue to work with refreshed credentials + """ + storage_options = copy.deepcopy(CONFIG) + + namespace = TrackingNamespace( + bucket_name=s3_bucket, + storage_options=storage_options, + credential_expires_in_seconds=3, # Short expiration + ) + + db = LanceNamespaceDBConnection(namespace) + + # Create unique namespace for this test + namespace_name = f"test_ns_{uuid.uuid4().hex[:8]}" + db.create_namespace([namespace_name]) + + table_name = f"test_table_{uuid.uuid4().hex}" + namespace_path = [namespace_name] + + # Create table + initial_data = pa.table( + { + "id": [1, 2], + "vector": [[1.0, 2.0], [3.0, 4.0]], + } + ) + + table = db.create_table(table_name, initial_data, namespace=namespace_path) + + # Add more data (should use cached credentials) + new_data = pa.table( + { + "id": [3, 4], + "vector": [[5.0, 6.0], [7.0, 8.0]], + } + ) + table.add(new_data) + + # Wait for credentials to expire + time.sleep(5) + + # Add more data (should trigger credential refresh) + more_data = pa.table( + { + "id": [5, 6], + "vector": [[9.0, 10.0], [11.0, 12.0]], + } + ) + table.add(more_data) + + # Verify final row count + assert table.count_rows() == 6 + + +@pytest.mark.s3_test +def test_namespace_overwrite_mode(s3_bucket: str): + """ + Test creating table in overwrite mode with credential tracking. 
+ + Verifies: + - First create calls create_empty_table exactly once + - Overwrite mode calls describe_table exactly once to check existence + - Storage options provider works in overwrite mode + """ + storage_options = copy.deepcopy(CONFIG) + + namespace = TrackingNamespace( + bucket_name=s3_bucket, + storage_options=storage_options, + credential_expires_in_seconds=3600, + ) + + db = LanceNamespaceDBConnection(namespace) + + # Create unique namespace for this test + namespace_name = f"test_ns_{uuid.uuid4().hex[:8]}" + db.create_namespace([namespace_name]) + + table_name = f"test_table_{uuid.uuid4().hex}" + namespace_path = [namespace_name] + + # Create initial table + data1 = pa.table( + { + "id": [1, 2], + "vector": [[1.0, 2.0], [3.0, 4.0]], + } + ) + + table = db.create_table(table_name, data1, namespace=namespace_path) + # Exactly one create_empty_table call for initial create + assert namespace.get_create_call_count() == 1 + # No describe_table calls in create mode + assert namespace.get_describe_call_count() == 0 + assert table.count_rows() == 2 + + # Overwrite the table + data2 = pa.table( + { + "id": [10, 20, 30], + "vector": [[10.0, 20.0], [30.0, 40.0], [50.0, 60.0]], + } + ) + + table2 = db.create_table( + table_name, data2, namespace=namespace_path, mode="overwrite" + ) + + # Should still have only 1 create_empty_table call + # (overwrite reuses location from describe_table) + assert namespace.get_create_call_count() == 1 + # Should have called describe_table exactly once to get existing table location + assert namespace.get_describe_call_count() == 1 + + # Verify new data + assert table2.count_rows() == 3 + result = table2.to_pandas() + assert list(result["id"]) == [10, 20, 30] + + +@pytest.mark.s3_test +def test_namespace_multiple_tables(s3_bucket: str): + """ + Test creating and opening multiple tables in the same namespace. 
+ + Verifies: + - Each table gets its own storage options provider + - Credentials are tracked independently per table + - Multiple tables can coexist in the same namespace + """ + storage_options = copy.deepcopy(CONFIG) + + namespace = TrackingNamespace( + bucket_name=s3_bucket, + storage_options=storage_options, + credential_expires_in_seconds=3600, + ) + + db = LanceNamespaceDBConnection(namespace) + + # Create unique namespace for this test + namespace_name = f"test_ns_{uuid.uuid4().hex[:8]}" + db.create_namespace([namespace_name]) + namespace_path = [namespace_name] + + # Create first table + table1_name = f"table1_{uuid.uuid4().hex}" + data1 = pa.table({"id": [1, 2], "value": [10, 20]}) + db.create_table(table1_name, data1, namespace=namespace_path) + + # Create second table + table2_name = f"table2_{uuid.uuid4().hex}" + data2 = pa.table({"id": [3, 4], "value": [30, 40]}) + db.create_table(table2_name, data2, namespace=namespace_path) + + # Should have 2 create calls (one per table) + assert namespace.get_create_call_count() == 2 + + # Open both tables + opened1 = db.open_table(table1_name, namespace=namespace_path) + opened2 = db.open_table(table2_name, namespace=namespace_path) + + # Should have 2 describe calls (one per open) + assert namespace.get_describe_call_count() == 2 + + # Verify both tables work independently + assert opened1.count_rows() == 2 + assert opened2.count_rows() == 2 + + result1 = opened1.to_pandas() + result2 = opened2.to_pandas() + + assert list(result1["id"]) == [1, 2] + assert list(result2["id"]) == [3, 4] + + +@pytest.mark.s3_test +def test_namespace_with_schema_only(s3_bucket: str): + """ + Test creating empty table with schema only (no data). + + Verifies: + - Empty table creation works with storage options provider + - describe_table is NOT called during create + - Data can be added later + """ + storage_options = copy.deepcopy(CONFIG) + + namespace = TrackingNamespace( + bucket_name=s3_bucket, + storage_options=storage_options, + credential_expires_in_seconds=3600, + ) + + db = LanceNamespaceDBConnection(namespace) + + # Create unique namespace for this test + namespace_name = f"test_ns_{uuid.uuid4().hex[:8]}" + db.create_namespace([namespace_name]) + + table_name = f"test_table_{uuid.uuid4().hex}" + namespace_path = [namespace_name] + + # Create empty table with schema + schema = pa.schema( + [ + pa.field("id", pa.int64()), + pa.field("vector", pa.list_(pa.float32(), 2)), + pa.field("text", pa.utf8()), + ] + ) + + table = db.create_table(table_name, schema=schema, namespace=namespace_path) + + # Should have called create_empty_table once + assert namespace.get_create_call_count() == 1 + # Should NOT have called describe_table in create mode + assert namespace.get_describe_call_count() == 0 + + # Verify empty table + assert table.count_rows() == 0 + + # Add data + data = pa.table( + { + "id": [1, 2], + "vector": [[1.0, 2.0], [3.0, 4.0]], + "text": ["hello", "world"], + } + ) + table.add(data) + + # Verify data was added + assert table.count_rows() == 2 diff --git a/python/src/connection.rs b/python/src/connection.rs index 148ab100..981b9cb6 100644 --- a/python/src/connection.rs +++ b/python/src/connection.rs @@ -10,11 +10,14 @@ use lancedb::{ }; use pyo3::{ exceptions::{PyRuntimeError, PyValueError}, - pyclass, pyfunction, pymethods, Bound, FromPyObject, Py, PyAny, PyRef, PyResult, Python, + pyclass, pyfunction, pymethods, Bound, FromPyObject, Py, PyAny, PyObject, PyRef, PyResult, + Python, }; use pyo3_async_runtimes::tokio::future_into_py; -use 
crate::{error::PythonErrorExt, table::Table}; +use crate::{ + error::PythonErrorExt, storage_options::py_object_to_storage_options_provider, table::Table, +}; #[pyclass] pub struct Connection { @@ -101,7 +104,8 @@ impl Connection { future_into_py(self_.py(), async move { op.execute().await.infer_error() }) } - #[pyo3(signature = (name, mode, data, namespace=vec![], storage_options=None))] + #[allow(clippy::too_many_arguments)] + #[pyo3(signature = (name, mode, data, namespace=vec![], storage_options=None, storage_options_provider=None, location=None))] pub fn create_table<'a>( self_: PyRef<'a, Self>, name: String, @@ -109,6 +113,8 @@ impl Connection { data: Bound<'_, PyAny>, namespace: Vec, storage_options: Option>, + storage_options_provider: Option, + location: Option, ) -> PyResult> { let inner = self_.get_inner()?.clone(); @@ -122,6 +128,13 @@ impl Connection { if let Some(storage_options) = storage_options { builder = builder.storage_options(storage_options); } + if let Some(provider_obj) = storage_options_provider { + let provider = py_object_to_storage_options_provider(provider_obj)?; + builder = builder.storage_options_provider(provider); + } + if let Some(location) = location { + builder = builder.location(location); + } future_into_py(self_.py(), async move { let table = builder.execute().await.infer_error()?; @@ -129,7 +142,8 @@ impl Connection { }) } - #[pyo3(signature = (name, mode, schema, namespace=vec![], storage_options=None))] + #[allow(clippy::too_many_arguments)] + #[pyo3(signature = (name, mode, schema, namespace=vec![], storage_options=None, storage_options_provider=None, location=None))] pub fn create_empty_table<'a>( self_: PyRef<'a, Self>, name: String, @@ -137,6 +151,8 @@ impl Connection { schema: Bound<'_, PyAny>, namespace: Vec, storage_options: Option>, + storage_options_provider: Option, + location: Option, ) -> PyResult> { let inner = self_.get_inner()?.clone(); @@ -150,6 +166,13 @@ impl Connection { if let Some(storage_options) = storage_options { builder = builder.storage_options(storage_options); } + if let Some(provider_obj) = storage_options_provider { + let provider = py_object_to_storage_options_provider(provider_obj)?; + builder = builder.storage_options_provider(provider); + } + if let Some(location) = location { + builder = builder.location(location); + } future_into_py(self_.py(), async move { let table = builder.execute().await.infer_error()?; @@ -157,13 +180,15 @@ impl Connection { }) } - #[pyo3(signature = (name, namespace=vec![], storage_options = None, index_cache_size = None))] + #[pyo3(signature = (name, namespace=vec![], storage_options = None, storage_options_provider=None, index_cache_size = None, location=None))] pub fn open_table( self_: PyRef<'_, Self>, name: String, namespace: Vec, storage_options: Option>, + storage_options_provider: Option, index_cache_size: Option, + location: Option, ) -> PyResult> { let inner = self_.get_inner()?.clone(); @@ -172,9 +197,16 @@ impl Connection { if let Some(storage_options) = storage_options { builder = builder.storage_options(storage_options); } + if let Some(provider_obj) = storage_options_provider { + let provider = py_object_to_storage_options_provider(provider_obj)?; + builder = builder.storage_options_provider(provider); + } if let Some(index_cache_size) = index_cache_size { builder = builder.index_cache_size(index_cache_size); } + if let Some(location) = location { + builder = builder.location(location); + } future_into_py(self_.py(), async move { let table = 
builder.execute().await.infer_error()?; diff --git a/python/src/lib.rs b/python/src/lib.rs index 8c4cedb1..3c7289ca 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -26,6 +26,7 @@ pub mod index; pub mod permutation; pub mod query; pub mod session; +pub mod storage_options; pub mod table; pub mod util; diff --git a/python/src/storage_options.rs b/python/src/storage_options.rs new file mode 100644 index 00000000..ca604db3 --- /dev/null +++ b/python/src/storage_options.rs @@ -0,0 +1,150 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The LanceDB Authors + +//! PyO3 bindings for StorageOptionsProvider +//! +//! This module provides the bridge between Python StorageOptionsProvider objects +//! and Rust's StorageOptionsProvider trait, enabling automatic credential refresh. + +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; +use lance_io::object_store::StorageOptionsProvider; +use pyo3::prelude::*; +use pyo3::types::PyDict; + +/// Internal wrapper around a Python object implementing StorageOptionsProvider +pub struct PyStorageOptionsProvider { + /// The Python object implementing fetch_storage_options() + inner: PyObject, +} + +impl Clone for PyStorageOptionsProvider { + fn clone(&self) -> Self { + Python::with_gil(|py| Self { + inner: self.inner.clone_ref(py), + }) + } +} + +impl PyStorageOptionsProvider { + pub fn new(obj: PyObject) -> PyResult { + Python::with_gil(|py| { + // Verify the object has a fetch_storage_options method + if !obj.bind(py).hasattr("fetch_storage_options")? { + return Err(pyo3::exceptions::PyTypeError::new_err( + "StorageOptionsProvider must implement fetch_storage_options() method", + )); + } + Ok(Self { inner: obj }) + }) + } +} + +/// Wrapper that implements the Rust StorageOptionsProvider trait +pub struct PyStorageOptionsProviderWrapper { + py_provider: PyStorageOptionsProvider, +} + +impl PyStorageOptionsProviderWrapper { + pub fn new(py_provider: PyStorageOptionsProvider) -> Self { + Self { py_provider } + } +} + +#[async_trait] +impl StorageOptionsProvider for PyStorageOptionsProviderWrapper { + async fn fetch_storage_options(&self) -> lance_core::Result>> { + // Call Python method from async context using spawn_blocking + let py_provider = self.py_provider.clone(); + + tokio::task::spawn_blocking(move || { + Python::with_gil(|py| { + // Call the Python fetch_storage_options method + let result = py_provider + .inner + .bind(py) + .call_method0("fetch_storage_options") + .map_err(|e| lance_core::Error::IO { + source: Box::new(std::io::Error::other(format!( + "Failed to call fetch_storage_options: {}", + e + ))), + location: snafu::location!(), + })?; + + // If result is None, return None + if result.is_none() { + return Ok(None); + } + + // Extract the result dict - should be a flat Map + let result_dict = result.downcast::().map_err(|_| { + lance_core::Error::InvalidInput { + source: "fetch_storage_options() must return None or a dict of string key-value pairs".into(), + location: snafu::location!(), + } + })?; + + // Convert all entries to HashMap + let mut storage_options = HashMap::new(); + for (key, value) in result_dict.iter() { + let key_str: String = key.extract().map_err(|e| { + lance_core::Error::InvalidInput { + source: format!("Storage option key must be a string: {}", e).into(), + location: snafu::location!(), + } + })?; + let value_str: String = value.extract().map_err(|e| { + lance_core::Error::InvalidInput { + source: format!("Storage option value must be a string: {}", 
e).into(), + location: snafu::location!(), + } + })?; + storage_options.insert(key_str, value_str); + } + + Ok(Some(storage_options)) + }) + }) + .await + .map_err(|e| lance_core::Error::IO { + source: Box::new(std::io::Error::other(format!( + "Task join error: {}", + e + ))), + location: snafu::location!(), + })? + } + + fn provider_id(&self) -> String { + Python::with_gil(|py| { + // Call provider_id() method on the Python object + let obj = self.py_provider.inner.bind(py); + obj.call_method0("provider_id") + .and_then(|result| result.extract::()) + .unwrap_or_else(|e| { + // If provider_id() fails, construct a fallback ID + format!("PyStorageOptionsProvider(error: {})", e) + }) + }) + } +} + +impl std::fmt::Debug for PyStorageOptionsProviderWrapper { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "PyStorageOptionsProviderWrapper({})", self.provider_id()) + } +} + +/// Convert a Python object to an Arc +/// +/// This is the main entry point for converting Python StorageOptionsProvider objects +/// to Rust trait objects that can be used by the Lance ecosystem. +pub fn py_object_to_storage_options_provider( + py_obj: PyObject, +) -> PyResult> { + let py_provider = PyStorageOptionsProvider::new(py_obj)?; + Ok(Arc::new(PyStorageOptionsProviderWrapper::new(py_provider))) +} diff --git a/rust/lancedb/src/connection.rs b/rust/lancedb/src/connection.rs index 0c60121b..3102099f 100644 --- a/rust/lancedb/src/connection.rs +++ b/rust/lancedb/src/connection.rs @@ -35,6 +35,7 @@ use crate::Table; pub use lance_encoding::version::LanceFileVersion; #[cfg(feature = "remote")] use lance_io::object_store::StorageOptions; +use lance_io::object_store::StorageOptionsProvider; /// A builder for configuring a [`Connection::table_names`] operation pub struct TableNamesBuilder { @@ -360,6 +361,30 @@ impl CreateTableBuilder { self.request.namespace = namespace; self } + + /// Set a custom location for the table. + /// + /// If not set, the database will derive a location from its URI and the table name. + /// This is useful when integrating with namespace systems that manage table locations. + pub fn location(mut self, location: impl Into) -> Self { + self.request.location = Some(location.into()); + self + } + + /// Set a storage options provider for automatic credential refresh. + /// + /// This allows tables to automatically refresh cloud storage credentials + /// when they expire, enabling long-running operations on remote storage. + pub fn storage_options_provider(mut self, provider: Arc) -> Self { + self.request + .write_options + .lance_write_params + .get_or_insert(Default::default()) + .store_params + .get_or_insert(Default::default()) + .storage_options_provider = Some(provider); + self + } } #[derive(Clone, Debug)] @@ -382,6 +407,7 @@ impl OpenTableBuilder { namespace: vec![], index_cache_size: None, lance_read_params: None, + location: None, }, embedding_registry, } @@ -461,6 +487,29 @@ impl OpenTableBuilder { self } + /// Set a custom location for the table. + /// + /// If not set, the database will derive a location from its URI and the table name. + /// This is useful when integrating with namespace systems that manage table locations. + pub fn location(mut self, location: impl Into) -> Self { + self.request.location = Some(location.into()); + self + } + + /// Set a storage options provider for automatic credential refresh. 
+ /// + /// This allows tables to automatically refresh cloud storage credentials + /// when they expire, enabling long-running operations on remote storage. + pub fn storage_options_provider(mut self, provider: Arc) -> Self { + self.request + .lance_read_params + .get_or_insert(Default::default()) + .store_options + .get_or_insert(Default::default()) + .storage_options_provider = Some(provider); + self + } + /// Open the table pub async fn execute(self) -> Result { let table = self.parent.open_table(self.request).await?; diff --git a/rust/lancedb/src/database.rs b/rust/lancedb/src/database.rs index 7816518e..b7ce3613 100644 --- a/rust/lancedb/src/database.rs +++ b/rust/lancedb/src/database.rs @@ -84,6 +84,9 @@ pub struct OpenTableRequest { pub namespace: Vec, pub index_cache_size: Option, pub lance_read_params: Option, + /// Optional custom location for the table. If not provided, the database will + /// derive a location based on its URI and the table name. + pub location: Option, } pub type TableBuilderCallback = Box OpenTableRequest + Send>; @@ -164,6 +167,9 @@ pub struct CreateTableRequest { pub mode: CreateTableMode, /// Options to use when writing data (only used if `data` is not None) pub write_options: WriteOptions, + /// Optional custom location for the table. If not provided, the database will + /// derive a location based on its URI and the table name. + pub location: Option, } impl CreateTableRequest { @@ -174,6 +180,7 @@ impl CreateTableRequest { data, mode: CreateTableMode::default(), write_options: WriteOptions::default(), + location: None, } } } diff --git a/rust/lancedb/src/database/listing.rs b/rust/lancedb/src/database/listing.rs index f3d24740..b30c021e 100644 --- a/rust/lancedb/src/database/listing.rs +++ b/rust/lancedb/src/database/listing.rs @@ -12,6 +12,7 @@ use lance::dataset::{builder::DatasetBuilder, ReadParams, WriteMode}; use lance::io::{ObjectStore, ObjectStoreParams, WrappingObjectStore}; use lance_datafusion::utils::StreamingWriteSource; use lance_encoding::version::LanceFileVersion; +use lance_io::object_store::StorageOptionsProvider; use lance_table::io::commit::commit_handler_from_url; use object_store::local::LocalFileSystem; use snafu::ResultExt; @@ -218,6 +219,9 @@ pub struct ListingDatabase { // Storage options to be inherited by tables created from this connection storage_options: HashMap, + // Dynamic storage options provider for automatic credential refresh + pub(crate) storage_options_provider: Option>, + // Options for tables created by this connection new_table_config: NewTableConfig, @@ -335,7 +339,9 @@ impl ListingDatabase { ) .await?; if object_store.is_local() { - Self::try_create_dir(&plain_uri).context(CreateDirSnafu { path: plain_uri })?; + Self::try_create_dir(&plain_uri).context(CreateDirSnafu { + path: plain_uri.clone(), + })?; } let write_store_wrapper = match mirrored_store { @@ -355,6 +361,7 @@ impl ListingDatabase { store_wrapper: write_store_wrapper, read_consistency_interval: request.read_consistency_interval, storage_options: options.storage_options, + storage_options_provider: None, new_table_config: options.new_table_config, session, }) @@ -396,6 +403,7 @@ impl ListingDatabase { store_wrapper: None, read_consistency_interval, storage_options: HashMap::new(), + storage_options_provider: None, new_table_config, session, }) @@ -403,7 +411,20 @@ impl ListingDatabase { /// Try to create a local directory to store the lancedb dataset fn try_create_dir(path: &str) -> core::result::Result<(), std::io::Error> { - let path = 
Path::new(path); + // Strip file:// or file:/ scheme if present to get the actual filesystem path + // Note: file:///path becomes file:/path after url.to_string(), so we need to handle both + let fs_path = if let Some(stripped) = path.strip_prefix("file://") { + // file:///path or file://host/path format + stripped + } else if let Some(stripped) = path.strip_prefix("file:") { + // file:/path format (from url.to_string() on file:///path) + // The path after "file:" should already start with "/" for absolute paths + stripped + } else { + path + }; + + let path = Path::new(fs_path); if !path.try_exists()? { create_dir_all(path)?; } @@ -529,6 +550,14 @@ impl ListingDatabase { self.inherit_storage_options(storage_options); } + // Set storage options provider if available + if self.storage_options_provider.is_some() { + write_params + .store_params + .get_or_insert_with(Default::default) + .storage_options_provider = self.storage_options_provider.clone(); + } + write_params.data_storage_version = self .new_table_config .data_storage_version @@ -569,6 +598,7 @@ impl ListingDatabase { namespace: namespace.clone(), index_cache_size: None, lance_read_params: None, + location: None, }; let req = (callback)(req); let table = self.open_table(req).await?; @@ -664,12 +694,17 @@ impl Database for ListingDatabase { } async fn create_table(&self, request: CreateTableRequest) -> Result> { - if !request.namespace.is_empty() { - return Err(Error::NotSupported { - message: "Namespace parameter is not supported for listing database. Only root namespace is supported.".into(), + // When namespace is not empty, location must be provided + if !request.namespace.is_empty() && request.location.is_none() { + return Err(Error::InvalidInput { + message: "Location must be provided when namespace is not empty".into(), }); } - let table_uri = self.table_uri(&request.name)?; + // Use provided location if available, otherwise derive from table name + let table_uri = request + .location + .clone() + .unwrap_or_else(|| self.table_uri(&request.name).unwrap()); let (storage_version_override, v2_manifest_override) = self.extract_storage_overrides(&request)?; @@ -682,6 +717,7 @@ impl Database for ListingDatabase { match NativeTable::create( &table_uri, &request.name, + request.namespace.clone(), request.data, self.store_wrapper.clone(), Some(write_params), @@ -753,6 +789,7 @@ impl Database for ListingDatabase { let cloned_table = NativeTable::open_with_params( &target_uri, &request.target_table_name, + request.target_namespace, self.store_wrapper.clone(), None, self.read_consistency_interval, @@ -763,12 +800,17 @@ impl Database for ListingDatabase { } async fn open_table(&self, mut request: OpenTableRequest) -> Result> { - if !request.namespace.is_empty() { - return Err(Error::NotSupported { - message: "Namespace parameter is not supported for listing database. Only root namespace is supported.".into(), + // When namespace is not empty, location must be provided + if !request.namespace.is_empty() && request.location.is_none() { + return Err(Error::InvalidInput { + message: "Location must be provided when namespace is not empty".into(), }); } - let table_uri = self.table_uri(&request.name)?; + // Use provided location if available, otherwise derive from table name + let table_uri = request + .location + .clone() + .unwrap_or_else(|| self.table_uri(&request.name).unwrap()); // Only modify the storage options if we actually have something to // inherit. 
There is a difference between storage_options=None and @@ -788,6 +830,16 @@ impl Database for ListingDatabase { self.inherit_storage_options(storage_options); } + // Set storage options provider if available + if self.storage_options_provider.is_some() { + request + .lance_read_params + .get_or_insert_with(Default::default) + .store_options + .get_or_insert_with(Default::default) + .storage_options_provider = self.storage_options_provider.clone(); + } + // Some ReadParams are exposed in the OpenTableBuilder, but we also // let the user provide their own ReadParams. // @@ -808,6 +860,7 @@ impl Database for ListingDatabase { NativeTable::open_with_params( &table_uri, &request.name, + request.namespace, self.store_wrapper.clone(), Some(read_params), self.read_consistency_interval, @@ -911,6 +964,7 @@ mod tests { data: CreateTableData::Empty(TableDefinition::new_from_schema(schema.clone())), mode: CreateTableMode::Create, write_options: Default::default(), + location: None, }) .await .unwrap(); @@ -974,6 +1028,7 @@ mod tests { data: CreateTableData::Data(reader), mode: CreateTableMode::Create, write_options: Default::default(), + location: None, }) .await .unwrap(); @@ -1031,6 +1086,7 @@ mod tests { data: CreateTableData::Empty(TableDefinition::new_from_schema(schema)), mode: CreateTableMode::Create, write_options: Default::default(), + location: None, }) .await .unwrap(); @@ -1065,6 +1121,7 @@ mod tests { data: CreateTableData::Empty(TableDefinition::new_from_schema(schema)), mode: CreateTableMode::Create, write_options: Default::default(), + location: None, }) .await .unwrap(); @@ -1103,6 +1160,7 @@ mod tests { data: CreateTableData::Empty(TableDefinition::new_from_schema(schema)), mode: CreateTableMode::Create, write_options: Default::default(), + location: None, }) .await .unwrap(); @@ -1141,6 +1199,7 @@ mod tests { data: CreateTableData::Empty(TableDefinition::new_from_schema(schema)), mode: CreateTableMode::Create, write_options: Default::default(), + location: None, }) .await .unwrap(); @@ -1194,6 +1253,7 @@ mod tests { data: CreateTableData::Empty(TableDefinition::new_from_schema(schema)), mode: CreateTableMode::Create, write_options: Default::default(), + location: None, }) .await .unwrap(); @@ -1250,6 +1310,7 @@ mod tests { data: CreateTableData::Data(reader), mode: CreateTableMode::Create, write_options: Default::default(), + location: None, }) .await .unwrap(); @@ -1334,6 +1395,7 @@ mod tests { data: CreateTableData::Data(reader), mode: CreateTableMode::Create, write_options: Default::default(), + location: None, }) .await .unwrap(); @@ -1419,6 +1481,7 @@ mod tests { data: CreateTableData::Data(reader), mode: CreateTableMode::Create, write_options: Default::default(), + location: None, }) .await .unwrap(); @@ -1511,6 +1574,7 @@ mod tests { data: CreateTableData::Data(reader), mode: CreateTableMode::Create, write_options: Default::default(), + location: None, }) .await .unwrap(); diff --git a/rust/lancedb/src/database/namespace.rs b/rust/lancedb/src/database/namespace.rs index ac1ee897..3e085622 100644 --- a/rust/lancedb/src/database/namespace.rs +++ b/rust/lancedb/src/database/namespace.rs @@ -7,6 +7,7 @@ use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; +use lance_io::object_store::{LanceNamespaceStorageOptionsProvider, StorageOptionsProvider}; use lance_namespace::{ models::{ CreateEmptyTableRequest, CreateNamespaceRequest, DescribeTableRequest, @@ -16,13 +17,14 @@ use lance_namespace::{ }; use lance_namespace_impls::ConnectBuilder; -use 
crate::database::listing::ListingDatabase; +use crate::connection::ConnectRequest; +use crate::database::ReadConsistency; use crate::error::{Error, Result}; -use crate::{connection::ConnectRequest, database::ReadConsistency}; use super::{ - BaseTable, CloneTableRequest, CreateNamespaceRequest as DbCreateNamespaceRequest, - CreateTableMode, CreateTableRequest as DbCreateTableRequest, Database, + listing::ListingDatabase, BaseTable, CloneTableRequest, + CreateNamespaceRequest as DbCreateNamespaceRequest, CreateTableMode, + CreateTableRequest as DbCreateTableRequest, Database, DropNamespaceRequest as DbDropNamespaceRequest, ListNamespacesRequest as DbListNamespacesRequest, OpenTableRequest, TableNamesRequest, }; @@ -67,58 +69,6 @@ impl LanceNamespaceDatabase { uri: format!("namespace://{}", ns_impl), }) } - - /// Helper method to create a ListingDatabase from a table location - /// - /// This method: - /// 1. Validates that the location ends with .lance - /// 2. Extracts the parent directory from the location - /// 3. Creates a ListingDatabase at that parent directory - async fn create_listing_database( - &self, - table_name: &str, - location: &str, - additional_storage_options: Option>, - ) -> Result> { - let expected_suffix = format!("{}.lance", table_name); - if !location.ends_with(&expected_suffix) { - return Err(Error::Runtime { - message: format!( - "Invalid table location '{}': expected to end with '{}'", - location, expected_suffix - ), - }); - } - - let parent_dir = location - .rsplit_once('/') - .map(|(parent, _)| parent.to_string()) - .ok_or_else(|| Error::Runtime { - message: format!("Invalid table location '{}': no parent directory", location), - })?; - - let mut merged_storage_options = self.storage_options.clone(); - if let Some(opts) = additional_storage_options { - merged_storage_options.extend(opts); - } - - let connect_request = ConnectRequest { - uri: parent_dir, - options: merged_storage_options, - read_consistency_interval: self.read_consistency_interval, - session: self.session.clone(), - #[cfg(feature = "remote")] - client_config: Default::default(), - }; - - let listing_db = ListingDatabase::connect_with_options(&connect_request) - .await - .map_err(|e| Error::Runtime { - message: format!("Failed to create listing database: {}", e), - })?; - - Ok(Arc::new(listing_db)) - } } impl std::fmt::Debug for LanceNamespaceDatabase { @@ -136,6 +86,51 @@ impl std::fmt::Display for LanceNamespaceDatabase { } } +impl LanceNamespaceDatabase { + /// Create a temporary listing database for the given location + /// + /// Merges storage options with priority: connection < user < namespace + async fn create_listing_database( + &self, + location: &str, + table_id: Vec, + user_storage_options: Option<&HashMap>, + response_storage_options: Option<&HashMap>, + ) -> Result { + // Merge storage options: connection < user < namespace + let mut merged_storage_options = self.storage_options.clone(); + if let Some(opts) = user_storage_options { + merged_storage_options.extend(opts.clone()); + } + if let Some(opts) = response_storage_options { + merged_storage_options.extend(opts.clone()); + } + + let request = ConnectRequest { + uri: location.to_string(), + #[cfg(feature = "remote")] + client_config: Default::default(), + options: merged_storage_options, + read_consistency_interval: self.read_consistency_interval, + session: self.session.clone(), + }; + + let mut listing_db = ListingDatabase::connect_with_options(&request).await?; + + // Create storage options provider only if namespace returned 
storage options + // (not just user-provided options) + if response_storage_options.is_some() { + let provider = Arc::new(LanceNamespaceStorageOptionsProvider::new( + self.namespace.clone(), + table_id, + )) as Arc; + listing_db.storage_options_provider = Some(provider); + } + + Ok(listing_db) + } +} + #[async_trait] impl Database for LanceNamespaceDatabase { fn uri(&self) -> &str { @@ -241,6 +236,14 @@ impl Database for LanceNamespaceDatabase { } async fn create_table(&self, request: DbCreateTableRequest) -> Result> { + // Extract user-provided storage options from request + let user_storage_options = request + .write_options + .lance_write_params + .as_ref() + .and_then(|lwp| lwp.store_params.as_ref()) + .and_then(|sp| sp.storage_options.as_ref()); + let mut table_id = request.namespace.clone(); table_id.push(request.name.clone()); let describe_request = DescribeTableRequest { @@ -279,15 +282,21 @@ impl Database for LanceNamespaceDatabase { })?; let listing_db = self - .create_listing_database(&request.name, &location, response.storage_options) + .create_listing_database( + &location, + table_id.clone(), + user_storage_options, + response.storage_options.as_ref(), + ) .await?; return listing_db .open_table(OpenTableRequest { name: request.name.clone(), - namespace: vec![], + namespace: request.namespace.clone(), index_cache_size: None, lance_read_params: None, + location: Some(location), }) .await; } @@ -298,7 +307,7 @@ impl Database for LanceNamespaceDatabase { table_id.push(request.name.clone()); let create_empty_request = CreateEmptyTableRequest { - id: Some(table_id), + id: Some(table_id.clone()), location: None, properties: if self.storage_options.is_empty() { None @@ -323,28 +332,37 @@ impl Database for LanceNamespaceDatabase { let listing_db = self .create_listing_database( - &request.name, &location, - create_empty_response.storage_options, + table_id, + user_storage_options, + create_empty_response.storage_options.as_ref(), ) .await?; let create_request = DbCreateTableRequest { name: request.name, - namespace: vec![], + namespace: request.namespace, data: request.data, mode: request.mode, write_options: request.write_options, + location: Some(location), }; listing_db.create_table(create_request).await } async fn open_table(&self, request: OpenTableRequest) -> Result> { + // Extract user-provided storage options from request + let user_storage_options = request + .lance_read_params + .as_ref() + .and_then(|lrp| lrp.store_options.as_ref()) + .and_then(|so| so.storage_options.as_ref()); + let mut table_id = request.namespace.clone(); table_id.push(request.name.clone()); let describe_request = DescribeTableRequest { - id: Some(table_id), + id: Some(table_id.clone()), version: None, }; let response = self @@ -360,14 +378,20 @@ impl Database for LanceNamespaceDatabase { })?; let listing_db = self - .create_listing_database(&request.name, &location, response.storage_options) + .create_listing_database( + &location, + table_id, + user_storage_options, + response.storage_options.as_ref(), + ) .await?; let open_request = OpenTableRequest { name: request.name.clone(), - namespace: vec![], + namespace: request.namespace.clone(), index_cache_size: request.index_cache_size, lance_read_params: request.lance_read_params, + location: Some(location), }; listing_db.open_table(open_request).await } @@ -431,6 +455,7 @@ impl Database for LanceNamespaceDatabase { mod tests { use super::*; use crate::connect_namespace; + use crate::database::CreateNamespaceRequest; use crate::query::ExecutableQuery; 
use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, StringArray}; use arrow_schema::{DataType, Field, Schema}; @@ -541,10 +566,18 @@ mod tests { .await .expect("Failed to connect to namespace"); - // Test: Create a table + // Create a child namespace first + conn.create_namespace(CreateNamespaceRequest { + namespace: vec!["test_ns".into()], + }) + .await + .expect("Failed to create namespace"); + + // Test: Create a table in the child namespace let test_data = create_test_data(); let table = conn .create_table("test_table", test_data) + .namespace(vec!["test_ns".into()]) .execute() .await .expect("Failed to create table"); @@ -562,9 +595,15 @@ mod tests { assert_eq!(results.len(), 1); assert_eq!(results[0].num_rows(), 5); - // Verify: Table appears in table_names + // Verify: Table namespace is correct + assert_eq!(table.namespace(), &["test_ns"]); + assert_eq!(table.name(), "test_table"); + assert_eq!(table.id(), "test_ns$test_table"); + + // Verify: Table appears in table_names for the child namespace let table_names = conn .table_names() + .namespace(vec!["test_ns".into()]) .execute() .await .expect("Failed to list tables"); @@ -586,10 +625,18 @@ mod tests { .await .expect("Failed to connect to namespace"); - // Create a table first + // Create a child namespace first + conn.create_namespace(CreateNamespaceRequest { + namespace: vec!["test_ns".into()], + }) + .await + .expect("Failed to create namespace"); + + // Create a table in child namespace let test_data = create_test_data(); let _table = conn .create_table("describe_test", test_data) + .namespace(vec!["test_ns".into()]) .execute() .await .expect("Failed to create table"); @@ -597,6 +644,7 @@ mod tests { // Test: Open the table (which internally uses describe_table) let opened_table = conn .open_table("describe_test") + .namespace(vec!["test_ns".into()]) .execute() .await .expect("Failed to open table"); @@ -619,6 +667,10 @@ mod tests { assert_eq!(schema.fields.len(), 2); assert_eq!(schema.field(0).name(), "id"); assert_eq!(schema.field(1).name(), "name"); + + // Verify namespace and id + assert_eq!(opened_table.namespace(), &["test_ns"]); + assert_eq!(opened_table.id(), "test_ns$describe_test"); } #[tokio::test] @@ -635,10 +687,18 @@ mod tests { .await .expect("Failed to connect to namespace"); - // Create initial table with 5 rows + // Create a child namespace first + conn.create_namespace(CreateNamespaceRequest { + namespace: vec!["test_ns".into()], + }) + .await + .expect("Failed to create namespace"); + + // Create initial table with 5 rows in child namespace let test_data1 = create_test_data(); let _table1 = conn .create_table("overwrite_test", test_data1) + .namespace(vec!["test_ns".into()]) .execute() .await .expect("Failed to create table"); @@ -665,6 +725,7 @@ mod tests { schema, ), ) + .namespace(vec!["test_ns".into()]) .mode(CreateTableMode::Overwrite) .execute() .await @@ -708,10 +769,18 @@ mod tests { .await .expect("Failed to connect to namespace"); - // Create initial table with test data + // Create a child namespace first + conn.create_namespace(CreateNamespaceRequest { + namespace: vec!["test_ns".into()], + }) + .await + .expect("Failed to create namespace"); + + // Create initial table with test data in child namespace let test_data1 = create_test_data(); let _table1 = conn .create_table("exist_ok_test", test_data1) + .namespace(vec!["test_ns".into()]) .execute() .await .expect("Failed to create table"); @@ -720,6 +789,7 @@ mod tests { let test_data2 = create_test_data(); let table2 = conn 
.create_table("exist_ok_test", test_data2) + .namespace(vec!["test_ns".into()]) .mode(CreateTableMode::exist_ok(|req| req)) .execute() .await @@ -753,25 +823,35 @@ mod tests { .await .expect("Failed to connect to namespace"); - // Create first table + // Create a child namespace first + conn.create_namespace(CreateNamespaceRequest { + namespace: vec!["test_ns".into()], + }) + .await + .expect("Failed to create namespace"); + + // Create first table in child namespace let test_data1 = create_test_data(); let _table1 = conn .create_table("table1", test_data1) + .namespace(vec!["test_ns".into()]) .execute() .await .expect("Failed to create first table"); - // Create second table + // Create second table in child namespace let test_data2 = create_test_data(); let _table2 = conn .create_table("table2", test_data2) + .namespace(vec!["test_ns".into()]) .execute() .await .expect("Failed to create second table"); - // Verify: Both tables appear in table list + // Verify: Both tables appear in table list for the child namespace let table_names = conn .table_names() + .namespace(vec!["test_ns".into()]) .execute() .await .expect("Failed to list tables"); @@ -782,12 +862,14 @@ mod tests { // Verify: Can open both tables let opened_table1 = conn .open_table("table1") + .namespace(vec!["test_ns".into()]) .execute() .await .expect("Failed to open table1"); let opened_table2 = conn .open_table("table2") + .namespace(vec!["test_ns".into()]) .execute() .await .expect("Failed to open table2"); @@ -820,8 +902,19 @@ mod tests { .await .expect("Failed to connect to namespace"); - // Test: Try to open a non-existent table - let result = conn.open_table("non_existent_table").execute().await; + // Create a child namespace first + conn.create_namespace(CreateNamespaceRequest { + namespace: vec!["test_ns".into()], + }) + .await + .expect("Failed to create namespace"); + + // Test: Try to open a non-existent table in the child namespace + let result = conn + .open_table("non_existent_table") + .namespace(vec!["test_ns".into()]) + .execute() + .await; // Verify: Should return an error assert!(result.is_err()); @@ -841,30 +934,40 @@ mod tests { .await .expect("Failed to connect to namespace"); - // Create a table first + // Create a child namespace first + conn.create_namespace(CreateNamespaceRequest { + namespace: vec!["test_ns".into()], + }) + .await + .expect("Failed to create namespace"); + + // Create a table in child namespace let test_data = create_test_data(); let _table = conn .create_table("drop_test", test_data) + .namespace(vec!["test_ns".into()]) .execute() .await .expect("Failed to create table"); - // Verify table exists + // Verify table exists in child namespace let table_names_before = conn .table_names() + .namespace(vec!["test_ns".into()]) .execute() .await .expect("Failed to list tables"); assert!(table_names_before.contains(&"drop_test".to_string())); // Test: Drop the table - conn.drop_table("drop_test", &[]) + conn.drop_table("drop_test", &["test_ns".into()]) .await .expect("Failed to drop table"); // Verify: Table no longer exists let table_names_after = conn .table_names() + .namespace(vec!["test_ns".into()]) .execute() .await .expect("Failed to list tables"); diff --git a/rust/lancedb/src/dataloader/permutation/shuffle.rs b/rust/lancedb/src/dataloader/permutation/shuffle.rs index 2c2b10fc..0d1ec43e 100644 --- a/rust/lancedb/src/dataloader/permutation/shuffle.rs +++ b/rust/lancedb/src/dataloader/permutation/shuffle.rs @@ -9,7 +9,7 @@ use futures::{StreamExt, TryStreamExt}; use 
lance::io::ObjectStore; use lance_core::{cache::LanceCache, utils::futures::FinallyStreamExt}; use lance_encoding::decoder::DecoderPlugins; -use lance_file::v2::{ +use lance_file::{ reader::{FileReader, FileReaderOptions}, writer::{FileWriter, FileWriterOptions}, }; diff --git a/rust/lancedb/src/remote/db.rs b/rust/lancedb/src/remote/db.rs index 2f99ce4b..09c2c60d 100644 --- a/rust/lancedb/src/remote/db.rs +++ b/rust/lancedb/src/remote/db.rs @@ -416,6 +416,7 @@ impl Database for RemoteDatabase { namespace: request.namespace.clone(), index_cache_size: None, lance_read_params: None, + location: None, }; let req = (callback)(req); self.open_table(req).await diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index 17d9fb74..6cb28286 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -511,7 +511,7 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync { /// Get the id of the table /// /// This is the namespace of the table concatenated with the name - /// separated by a dot (".") + /// separated by $ fn id(&self) -> &str; /// Get the arrow [Schema] of the table. async fn schema(&self) -> Result; @@ -734,6 +734,16 @@ impl Table { self.inner.name() } + /// Get the namespace of the table. + pub fn namespace(&self) -> &[String] { + self.inner.namespace() + } + + /// Get the ID of the table (namespace + name joined by '$'). + pub fn id(&self) -> &str { + self.inner.id() + } + /// Get the dataset of the table if it is a native table /// /// Returns None otherwise @@ -1468,6 +1478,8 @@ impl NativeTableExt for Arc { #[derive(Debug, Clone)] pub struct NativeTable { name: String, + namespace: Vec, + id: String, uri: String, pub(crate) dataset: dataset::DatasetConsistencyWrapper, // This comes from the connection options. We store here so we can pass down @@ -1507,7 +1519,7 @@ impl NativeTable { /// * A [NativeTable] object. pub async fn open(uri: &str) -> Result { let name = Self::get_table_name(uri)?; - Self::open_with_params(uri, &name, None, None, None).await + Self::open_with_params(uri, &name, vec![], None, None, None).await } /// Opens an existing Table @@ -1524,6 +1536,7 @@ impl NativeTable { pub async fn open_with_params( uri: &str, name: &str, + namespace: Vec, write_store_wrapper: Option>, params: Option, read_consistency_interval: Option, @@ -1548,9 +1561,12 @@ impl NativeTable { })?; let dataset = DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval); + let id = Self::build_id(&namespace, name); Ok(Self { name: name.to_string(), + namespace, + id, uri: uri.to_string(), dataset, read_consistency_interval, @@ -1573,12 +1589,24 @@ impl NativeTable { Ok(name.to_string()) } + fn build_id(namespace: &[String], name: &str) -> String { + if namespace.is_empty() { + name.to_string() + } else { + let mut parts = namespace.to_vec(); + parts.push(name.to_string()); + parts.join("$") + } + } + /// Creates a new Table /// /// # Arguments /// - /// * `uri` - The URI to the table. + /// * `uri` - The URI to the table. When namespace is not empty, the caller must + /// provide an explicit URI (location) rather than deriving it from the table name. /// * `name` The Table name + /// * `namespace` - The namespace path. When non-empty, an explicit URI must be provided. /// * `batches` RecordBatch to be saved in the database. /// * `params` - Write parameters. 
/// @@ -1588,6 +1616,7 @@ impl NativeTable { pub async fn create( uri: &str, name: &str, + namespace: Vec, batches: impl StreamingWriteSource, write_store_wrapper: Option>, params: Option, @@ -1614,8 +1643,12 @@ impl NativeTable { source => Error::Lance { source }, })?; + let id = Self::build_id(&namespace, name); + Ok(Self { name: name.to_string(), + namespace, + id, uri: uri.to_string(), dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval), read_consistency_interval, @@ -1625,6 +1658,7 @@ impl NativeTable { pub async fn create_empty( uri: &str, name: &str, + namespace: Vec, schema: SchemaRef, write_store_wrapper: Option>, params: Option, @@ -1634,6 +1668,7 @@ impl NativeTable { Self::create( uri, name, + namespace, batches, write_store_wrapper, params, @@ -2078,13 +2113,11 @@ impl BaseTable for NativeTable { } fn namespace(&self) -> &[String] { - // Native tables don't support namespaces yet, return empty slice for root namespace - &[] + &self.namespace } fn id(&self) -> &str { - // For native tables, id is same as name since no namespace support - self.name.as_str() + &self.id } async fn version(&self) -> Result { @@ -2884,7 +2917,7 @@ mod tests { let batches = make_test_batches(); let batches = Box::new(batches) as Box; - let table = NativeTable::create(uri, "test", batches, None, None, None) + let table = NativeTable::create(uri, "test", vec![], batches, None, None, None) .await .unwrap();
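
Taken together, the Python-side changes above describe a small end-to-end workflow: connect through a namespace, create a child namespace, create and open tables scoped to that namespace path, and read back the new `table.namespace` / `table.id` accessors (the id joins the path and name with `$`). The following is a minimal sketch of that workflow against a local `dir` namespace, using only calls that appear in the tests in this diff; the directory and the `prod`/`items` names are illustrative.

```python
# Minimal sketch of the namespace workflow exercised by the tests above.
import tempfile

import lancedb
import pyarrow as pa

tmp = tempfile.mkdtemp()
db = lancedb.connect_namespace("dir", {"root": tmp})

# The tests create tables inside a child namespace, so the sketch does the same.
db.create_namespace(["prod"])

schema = pa.schema(
    [
        pa.field("id", pa.int64()),
        pa.field("vector", pa.list_(pa.float32(), 2)),
    ]
)
table = db.create_table("items", schema=schema, namespace=["prod"])

# The table id is the namespace path plus the name, joined by '$'.
assert table.namespace == ["prod"]
assert table.id == "prod$items"

# Listing, opening and dropping are all scoped by the namespace path.
assert "items" in list(db.table_names(namespace=["prod"]))
reopened = db.open_table("items", namespace=["prod"])
db.drop_table("items", namespace=["prod"])
assert list(db.table_names(namespace=["prod"])) == []
```

The new `python/src/storage_options.rs` bridge accepts any Python object that implements `fetch_storage_options()` returning either `None` or a flat dict of string keys and string values, and that optionally implements `provider_id()` for diagnostics (the Rust wrapper falls back to a placeholder id if the call fails). A hypothetical provider satisfying that protocol might look like the sketch below; the `expires_at_millis` key follows the convention used by `TrackingNamespace` in the integration tests, and how such a provider is threaded through the high-level Python API is outside the hunks shown here, so treat the class purely as an illustration of the expected shape.

```python
# Hypothetical provider matching the protocol expected by the PyO3 bridge:
# fetch_storage_options() -> dict[str, str] | None, plus optional provider_id().
import time


class StaticCredentialProvider:
    """Returns short-lived credentials; fetched again after they expire."""

    def __init__(self, access_key: str, secret_key: str, ttl_seconds: int = 300):
        self._access_key = access_key
        self._secret_key = secret_key
        self._ttl_seconds = ttl_seconds

    def fetch_storage_options(self):
        # Must return a flat dict of string keys and string values (or None).
        expires_at_millis = int((time.time() + self._ttl_seconds) * 1000)
        return {
            "aws_access_key_id": self._access_key,
            "aws_secret_access_key": self._secret_key,
            "expires_at_millis": str(expires_at_millis),
        }

    def provider_id(self) -> str:
        # Used by the Rust wrapper when formatting the provider for debugging.
        return f"StaticCredentialProvider({self._access_key})"
```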