Mirror of https://github.com/lancedb/lancedb.git (synced 2026-01-05 19:32:56 +00:00)

Compare commits
2 Commits
codex/upda
...
python-v0.
| Author | SHA1 | Date |
|---|---|---|
|  | 57ed302a61 |  |
|  | e47f552a86 |  |
@@ -31,6 +31,7 @@ runs:
     with:
       command: build
       working-directory: python
+      docker-options: "-e PIP_EXTRA_INDEX_URL=https://pypi.fury.io/lancedb/"
       target: x86_64-unknown-linux-gnu
       manylinux: ${{ inputs.manylinux }}
       args: ${{ inputs.args }}
.github/workflows/docs.yml (vendored): 1 changed line

@@ -24,6 +24,7 @@ env:
   # according to: https://matklad.github.io/2021/09/04/fast-rust-builds.html
   # CI builds are faster with incremental disabled.
   CARGO_INCREMENTAL: "0"
+  PIP_EXTRA_INDEX_URL: "https://pypi.fury.io/lancedb/"
 
 jobs:
   # Single deploy job since we're just deploying
.github/workflows/pypi-publish.yml (vendored): 3 changed lines

@@ -10,6 +10,9 @@ on:
       - .github/workflows/pypi-publish.yml
       - Cargo.toml # Change in dependency frequently breaks builds
 
+env:
+  PIP_EXTRA_INDEX_URL: "https://pypi.fury.io/lancedb/"
+
 jobs:
   linux:
     name: Python ${{ matrix.config.platform }} manylinux${{ matrix.config.manylinux }}
.github/workflows/python.yml (vendored): 1 changed line

@@ -18,6 +18,7 @@ env:
   # Color output for pytest is off by default.
   PYTEST_ADDOPTS: "--color=yes"
   FORCE_COLOR: "1"
+  PIP_EXTRA_INDEX_URL: "https://pypi.fury.io/lancedb/"
 
 jobs:
   lint:
Cargo.lock (generated): 73 changed lines

@@ -3032,8 +3032,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
 
 [[package]]
 name = "fsst"
-version = "0.40.0-beta.2"
+version = "1.0.0-beta.2"
-source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc"
+source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f"
 dependencies = [
  "arrow-array",
  "rand 0.9.2",

@@ -4217,8 +4217,8 @@ dependencies = [
 
 [[package]]
 name = "lance"
-version = "0.40.0-beta.2"
+version = "1.0.0-beta.2"
-source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc"
+source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f"
 dependencies = [
  "arrow",
  "arrow-arith",

@@ -4282,8 +4282,8 @@ dependencies = [
 
 [[package]]
 name = "lance-arrow"
-version = "0.40.0-beta.2"
+version = "1.0.0-beta.2"
-source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc"
+source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f"
 dependencies = [
  "arrow-array",
  "arrow-buffer",

@@ -4301,8 +4301,8 @@ dependencies = [
 
 [[package]]
 name = "lance-bitpacking"
-version = "0.40.0-beta.2"
+version = "1.0.0-beta.2"
-source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc"
+source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f"
 dependencies = [
  "arrayref",
  "paste",

@@ -4311,8 +4311,8 @@ dependencies = [
 
 [[package]]
 name = "lance-core"
-version = "0.40.0-beta.2"
+version = "1.0.0-beta.2"
-source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc"
+source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f"
 dependencies = [
  "arrow-array",
  "arrow-buffer",

@@ -4348,8 +4348,8 @@ dependencies = [
 
 [[package]]
 name = "lance-datafusion"
-version = "0.40.0-beta.2"
+version = "1.0.0-beta.2"
-source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc"
+source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f"
 dependencies = [
  "arrow",
  "arrow-array",

@@ -4378,8 +4378,8 @@ dependencies = [
 
 [[package]]
 name = "lance-datagen"
-version = "0.40.0-beta.2"
+version = "1.0.0-beta.2"
-source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc"
+source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f"
 dependencies = [
  "arrow",
  "arrow-array",

@@ -4396,8 +4396,8 @@ dependencies = [
 
 [[package]]
 name = "lance-encoding"
-version = "0.40.0-beta.2"
+version = "1.0.0-beta.2"
-source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc"
+source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f"
 dependencies = [
  "arrow-arith",
  "arrow-array",

@@ -4434,8 +4434,8 @@ dependencies = [
 
 [[package]]
 name = "lance-file"
-version = "0.40.0-beta.2"
+version = "1.0.0-beta.2"
-source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc"
+source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f"
 dependencies = [
  "arrow-arith",
  "arrow-array",

@@ -4467,8 +4467,8 @@ dependencies = [
 
 [[package]]
 name = "lance-index"
-version = "0.40.0-beta.2"
+version = "1.0.0-beta.2"
-source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc"
+source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f"
 dependencies = [
  "arrow",
  "arrow-arith",

@@ -4529,8 +4529,8 @@ dependencies = [
 
 [[package]]
 name = "lance-io"
-version = "0.40.0-beta.2"
+version = "1.0.0-beta.2"
-source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc"
+source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f"
 dependencies = [
  "arrow",
  "arrow-arith",

@@ -4570,8 +4570,8 @@ dependencies = [
 
 [[package]]
 name = "lance-linalg"
-version = "0.40.0-beta.2"
+version = "1.0.0-beta.2"
-source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc"
+source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f"
 dependencies = [
  "arrow-array",
  "arrow-buffer",

@@ -4587,8 +4587,8 @@ dependencies = [
 
 [[package]]
 name = "lance-namespace"
-version = "0.40.0-beta.2"
+version = "1.0.0-beta.2"
-source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc"
+source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f"
 dependencies = [
  "arrow",
  "async-trait",
@@ -4600,22 +4600,27 @@ dependencies = [
 
 [[package]]
 name = "lance-namespace-impls"
-version = "0.40.0-beta.2"
+version = "1.0.0-beta.2"
-source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc"
+source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f"
 dependencies = [
  "arrow",
  "arrow-ipc",
  "arrow-schema",
  "async-trait",
  "bytes",
+ "futures",
  "lance",
  "lance-core",
+ "lance-index",
  "lance-io",
  "lance-namespace",
+ "log",
  "object_store",
+ "rand 0.9.2",
  "reqwest",
  "serde_json",
  "snafu",
+ "tokio",
  "url",
 ]
 

@@ -4634,8 +4639,8 @@ dependencies = [
 
 [[package]]
 name = "lance-table"
-version = "0.40.0-beta.2"
+version = "1.0.0-beta.2"
-source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc"
+source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f"
 dependencies = [
  "arrow",
  "arrow-array",

@@ -4674,8 +4679,8 @@ dependencies = [
 
 [[package]]
 name = "lance-testing"
-version = "0.40.0-beta.2"
+version = "1.0.0-beta.2"
-source = "git+https://github.com/lancedb/lance.git?tag=v0.40.0-beta.2#9f368faab85fa41799c42e05b07596d060da3ebc"
+source = "git+https://github.com/lancedb/lance.git?tag=v1.0.0-beta.2#254a8217ac26666585983aa7ec8c4234f4c3f99f"
 dependencies = [
  "arrow-array",
  "arrow-schema",

@@ -4807,11 +4812,14 @@ dependencies = [
  "async-trait",
  "env_logger",
  "futures",
+ "lance-core",
+ "lance-io",
  "lancedb",
  "pin-project",
  "pyo3",
  "pyo3-async-runtimes",
  "pyo3-build-config",
+ "snafu",
  "tokio",
 ]
 

@@ -8408,6 +8416,7 @@ dependencies = [
  "io-uring",
  "libc",
  "mio",
+ "parking_lot",
  "pin-project-lite",
  "signal-hook-registry",
  "slab",
Cargo.toml: 28 changed lines

@@ -15,20 +15,20 @@ categories = ["database-implementations"]
 rust-version = "1.78.0"
 
 [workspace.dependencies]
-lance = { "version" = "=0.40.0-beta.2", default-features = false, "tag" = "v0.40.0-beta.2", "git" = "https://github.com/lancedb/lance.git" }
+lance = { "version" = "=1.0.0-beta.2", default-features = false, "tag" = "v1.0.0-beta.2", "git" = "https://github.com/lancedb/lance.git" }
-lance-core = { "version" = "=0.40.0-beta.2", "tag" = "v0.40.0-beta.2", "git" = "https://github.com/lancedb/lance.git" }
+lance-core = { "version" = "=1.0.0-beta.2", "tag" = "v1.0.0-beta.2", "git" = "https://github.com/lancedb/lance.git" }
-lance-datagen = { "version" = "=0.40.0-beta.2", "tag" = "v0.40.0-beta.2", "git" = "https://github.com/lancedb/lance.git" }
+lance-datagen = { "version" = "=1.0.0-beta.2", "tag" = "v1.0.0-beta.2", "git" = "https://github.com/lancedb/lance.git" }
-lance-file = { "version" = "=0.40.0-beta.2", "tag" = "v0.40.0-beta.2", "git" = "https://github.com/lancedb/lance.git" }
+lance-file = { "version" = "=1.0.0-beta.2", "tag" = "v1.0.0-beta.2", "git" = "https://github.com/lancedb/lance.git" }
-lance-io = { "version" = "=0.40.0-beta.2", default-features = false, "tag" = "v0.40.0-beta.2", "git" = "https://github.com/lancedb/lance.git" }
+lance-io = { "version" = "=1.0.0-beta.2", default-features = false, "tag" = "v1.0.0-beta.2", "git" = "https://github.com/lancedb/lance.git" }
-lance-index = { "version" = "=0.40.0-beta.2", "tag" = "v0.40.0-beta.2", "git" = "https://github.com/lancedb/lance.git" }
+lance-index = { "version" = "=1.0.0-beta.2", "tag" = "v1.0.0-beta.2", "git" = "https://github.com/lancedb/lance.git" }
-lance-linalg = { "version" = "=0.40.0-beta.2", "tag" = "v0.40.0-beta.2", "git" = "https://github.com/lancedb/lance.git" }
+lance-linalg = { "version" = "=1.0.0-beta.2", "tag" = "v1.0.0-beta.2", "git" = "https://github.com/lancedb/lance.git" }
-lance-namespace = { "version" = "=0.40.0-beta.2", "tag" = "v0.40.0-beta.2", "git" = "https://github.com/lancedb/lance.git" }
+lance-namespace = { "version" = "=1.0.0-beta.2", "tag" = "v1.0.0-beta.2", "git" = "https://github.com/lancedb/lance.git" }
-lance-namespace-impls = { "version" = "=0.40.0-beta.2", "features" = ["dir-aws", "dir-gcp", "dir-azure", "dir-oss", "rest"], "tag" = "v0.40.0-beta.2", "git" = "https://github.com/lancedb/lance.git" }
+lance-namespace-impls = { "version" = "=1.0.0-beta.2", "features" = ["dir-aws", "dir-gcp", "dir-azure", "dir-oss", "rest"], "tag" = "v1.0.0-beta.2", "git" = "https://github.com/lancedb/lance.git" }
-lance-table = { "version" = "=0.40.0-beta.2", "tag" = "v0.40.0-beta.2", "git" = "https://github.com/lancedb/lance.git" }
+lance-table = { "version" = "=1.0.0-beta.2", "tag" = "v1.0.0-beta.2", "git" = "https://github.com/lancedb/lance.git" }
-lance-testing = { "version" = "=0.40.0-beta.2", "tag" = "v0.40.0-beta.2", "git" = "https://github.com/lancedb/lance.git" }
+lance-testing = { "version" = "=1.0.0-beta.2", "tag" = "v1.0.0-beta.2", "git" = "https://github.com/lancedb/lance.git" }
-lance-datafusion = { "version" = "=0.40.0-beta.2", "tag" = "v0.40.0-beta.2", "git" = "https://github.com/lancedb/lance.git" }
+lance-datafusion = { "version" = "=1.0.0-beta.2", "tag" = "v1.0.0-beta.2", "git" = "https://github.com/lancedb/lance.git" }
-lance-encoding = { "version" = "=0.40.0-beta.2", "tag" = "v0.40.0-beta.2", "git" = "https://github.com/lancedb/lance.git" }
+lance-encoding = { "version" = "=1.0.0-beta.2", "tag" = "v1.0.0-beta.2", "git" = "https://github.com/lancedb/lance.git" }
-lance-arrow = { "version" = "=0.40.0-beta.2", "tag" = "v0.40.0-beta.2", "git" = "https://github.com/lancedb/lance.git" }
+lance-arrow = { "version" = "=1.0.0-beta.2", "tag" = "v1.0.0-beta.2", "git" = "https://github.com/lancedb/lance.git" }
 ahash = "0.8"
 # Note that this one does not include pyarrow
 arrow = { version = "56.2", optional = false }
@@ -1,5 +1,5 @@
 [tool.bumpversion]
-current_version = "0.25.3"
+current_version = "0.25.4-beta.0"
 parse = """(?x)
     (?P<major>0|[1-9]\\d*)\\.
     (?P<minor>0|[1-9]\\d*)\\.
@@ -1,6 +1,6 @@
 [package]
 name = "lancedb-python"
-version = "0.25.3"
+version = "0.25.4-beta.0"
 edition.workspace = true
 description = "Python bindings for LanceDB"
 license.workspace = true
@@ -17,6 +17,8 @@ crate-type = ["cdylib"]
 arrow = { version = "56.2", features = ["pyarrow"] }
 async-trait = "0.1"
 lancedb = { path = "../rust/lancedb", default-features = false }
+lance-core.workspace = true
+lance-io.workspace = true
 env_logger.workspace = true
 pyo3 = { version = "0.25", features = ["extension-module", "abi3-py39"] }
 pyo3-async-runtimes = { version = "0.25", features = [
@@ -25,6 +27,7 @@ pyo3-async-runtimes = { version = "0.25", features = [
 ] }
 pin-project = "1.1.5"
 futures.workspace = true
+snafu.workspace = true
 tokio = { version = "1.40", features = ["sync"] }
 
 [build-dependencies]
@@ -10,7 +10,7 @@ dependencies = [
     "pyarrow>=16",
     "pydantic>=1.10",
     "tqdm>=4.27.0",
-    "lance-namespace>=0.0.16"
+    "lance-namespace>=0.0.21"
 ]
 description = "lancedb"
 authors = [{ name = "LanceDB Devs", email = "dev@lancedb.com" }]
@@ -59,7 +59,7 @@ tests = [
     "polars>=0.19, <=1.3.0",
     "tantivy",
     "pyarrow-stubs",
-    "pylance>=0.25",
+    "pylance>=1.0.0b2",
     "requests",
     "datafusion",
 ]
@@ -14,6 +14,7 @@ __version__ = importlib.metadata.version("lancedb")
 from ._lancedb import connect as lancedb_connect
 from .common import URI, sanitize_uri
 from .db import AsyncConnection, DBConnection, LanceDBConnection
+from .io import StorageOptionsProvider
 from .remote import ClientConfig
 from .remote.db import RemoteDBConnection
 from .schema import vector
@@ -233,6 +234,7 @@ __all__ = [
     "LanceNamespaceDBConnection",
     "RemoteDBConnection",
     "Session",
+    "StorageOptionsProvider",
     "Table",
     "__version__",
 ]
@@ -4,6 +4,7 @@ from typing import Dict, List, Optional, Tuple, Any, TypedDict, Union, Literal
 import pyarrow as pa
 
 from .index import BTree, IvfFlat, IvfPq, Bitmap, LabelList, HnswPq, HnswSq, FTS
+from .io import StorageOptionsProvider
 from .remote import ClientConfig
 
 class Session:
@@ -44,6 +45,8 @@ class Connection(object):
         data: pa.RecordBatchReader,
         namespace: List[str] = [],
         storage_options: Optional[Dict[str, str]] = None,
+        storage_options_provider: Optional[StorageOptionsProvider] = None,
+        location: Optional[str] = None,
     ) -> Table: ...
     async def create_empty_table(
         self,
@@ -52,13 +55,17 @@ class Connection(object):
         schema: pa.Schema,
         namespace: List[str] = [],
         storage_options: Optional[Dict[str, str]] = None,
+        storage_options_provider: Optional[StorageOptionsProvider] = None,
+        location: Optional[str] = None,
     ) -> Table: ...
     async def open_table(
         self,
         name: str,
         namespace: List[str] = [],
         storage_options: Optional[Dict[str, str]] = None,
+        storage_options_provider: Optional[StorageOptionsProvider] = None,
         index_cache_size: Optional[int] = None,
+        location: Optional[str] = None,
     ) -> Table: ...
     async def clone_table(
         self,
@@ -45,6 +45,7 @@ if TYPE_CHECKING:
     from ._lancedb import Connection as LanceDbConnection
     from .common import DATA, URI
     from .embeddings import EmbeddingFunctionConfig
+    from .io import StorageOptionsProvider
     from ._lancedb import Session
 
 
@@ -143,6 +144,7 @@
         *,
         namespace: List[str] = [],
         storage_options: Optional[Dict[str, str]] = None,
+        storage_options_provider: Optional["StorageOptionsProvider"] = None,
         data_storage_version: Optional[str] = None,
         enable_v2_manifest_paths: Optional[bool] = None,
     ) -> Table:
@@ -308,6 +310,7 @@
         *,
         namespace: List[str] = [],
         storage_options: Optional[Dict[str, str]] = None,
+        storage_options_provider: Optional["StorageOptionsProvider"] = None,
         index_cache_size: Optional[int] = None,
     ) -> Table:
         """Open a Lance Table in the database.
@@ -463,6 +466,12 @@
         is_local = isinstance(uri, Path) or scheme == "file"
         if is_local:
             if isinstance(uri, str):
+                # Strip file:// or file:/ scheme if present
+                # file:///path becomes file:/path after URL normalization
+                if uri.startswith("file://"):
+                    uri = uri[7:]  # Remove "file://"
+                elif uri.startswith("file:/"):
+                    uri = uri[5:]  # Remove "file:"
                 uri = Path(uri)
             uri = uri.expanduser().absolute()
             Path(uri).mkdir(parents=True, exist_ok=True)
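Not part of the diff: a minimal sketch of what the `file://` stripping above enables, assuming the synchronous `lancedb.connect` entry point (used elsewhere in this diff) is the caller that reaches this normalization.

```python
import lancedb

# Both calls should now resolve to the same local directory: the file:// / file:/
# prefix is stripped before the path is expanded and the directory is created.
db_plain = lancedb.connect("/tmp/lancedb-demo")
db_uri = lancedb.connect("file:///tmp/lancedb-demo")  # previously kept the scheme in the path
```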
@@ -625,6 +634,7 @@
         *,
         namespace: List[str] = [],
         storage_options: Optional[Dict[str, str]] = None,
+        storage_options_provider: Optional["StorageOptionsProvider"] = None,
         data_storage_version: Optional[str] = None,
         enable_v2_manifest_paths: Optional[bool] = None,
     ) -> LanceTable:
@@ -655,6 +665,7 @@
             embedding_functions=embedding_functions,
             namespace=namespace,
             storage_options=storage_options,
+            storage_options_provider=storage_options_provider,
         )
         return tbl
 
@@ -665,6 +676,7 @@
         *,
         namespace: List[str] = [],
         storage_options: Optional[Dict[str, str]] = None,
+        storage_options_provider: Optional["StorageOptionsProvider"] = None,
         index_cache_size: Optional[int] = None,
     ) -> LanceTable:
         """Open a table in the database.
@@ -696,6 +708,7 @@
             name,
             namespace=namespace,
             storage_options=storage_options,
+            storage_options_provider=storage_options_provider,
             index_cache_size=index_cache_size,
         )
 
@@ -977,9 +990,11 @@
         on_bad_vectors: Optional[str] = None,
         fill_value: Optional[float] = None,
         storage_options: Optional[Dict[str, str]] = None,
+        storage_options_provider: Optional["StorageOptionsProvider"] = None,
         *,
         namespace: List[str] = [],
         embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None,
+        location: Optional[str] = None,
     ) -> AsyncTable:
         """Create an [AsyncTable][lancedb.table.AsyncTable] in the database.
 
@@ -1170,6 +1185,8 @@
                 schema,
                 namespace=namespace,
                 storage_options=storage_options,
+                storage_options_provider=storage_options_provider,
+                location=location,
             )
         else:
             data = data_to_reader(data, schema)
@@ -1179,6 +1196,8 @@
                 data,
                 namespace=namespace,
                 storage_options=storage_options,
+                storage_options_provider=storage_options_provider,
+                location=location,
             )
 
         return AsyncTable(new_table)
@@ -1189,7 +1208,9 @@
         *,
         namespace: List[str] = [],
         storage_options: Optional[Dict[str, str]] = None,
+        storage_options_provider: Optional["StorageOptionsProvider"] = None,
         index_cache_size: Optional[int] = None,
+        location: Optional[str] = None,
     ) -> AsyncTable:
         """Open a Lance Table in the database.
 
@@ -1218,6 +1239,10 @@
             This cache applies to the entire opened table, across all indices.
             Setting this value higher will increase performance on larger datasets
             at the expense of more RAM
+        location: str, optional
+            The explicit location (URI) of the table. If provided, the table will be
+            opened from this location instead of deriving it from the database URI
+            and table name.
 
         Returns
         -------
@@ -1227,7 +1252,9 @@
             name,
             namespace=namespace,
             storage_options=storage_options,
+            storage_options_provider=storage_options_provider,
             index_cache_size=index_cache_size,
+            location=location,
         )
         return AsyncTable(table)
 
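Not in the diff: a hedged sketch of the new `location=` override on the async `open_table`. Only the keyword and its docstring are confirmed by the hunks above; `connect_async` as the entry point and the bucket URI are assumptions.

```python
import asyncio
import lancedb

async def main():
    db = await lancedb.connect_async("/tmp/lancedb-demo")  # assumed async entry point
    # Open the table from an explicit URI instead of deriving
    # <db_uri>/<name>.lance from the connection and the table name.
    tbl = await db.open_table(
        "events",
        location="s3://example-bucket/warehouse/events.lance",  # hypothetical location
    )
    return tbl

asyncio.run(main())
```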
python/python/lancedb/io.py (new file): 71 lines

@@ -0,0 +1,71 @@
+# SPDX-License-Identifier: Apache-2.0
+# SPDX-FileCopyrightText: Copyright The LanceDB Authors
+
+"""I/O utilities and interfaces for LanceDB."""
+
+from abc import ABC, abstractmethod
+from typing import Dict
+
+
+class StorageOptionsProvider(ABC):
+    """Abstract base class for providing storage options to LanceDB tables.
+
+    Storage options providers enable automatic credential refresh for cloud
+    storage backends (e.g., AWS S3, Azure Blob Storage, GCS). When credentials
+    have an expiration time, the provider's fetch_storage_options() method will
+    be called periodically to get fresh credentials before they expire.
+
+    Example
+    -------
+    >>> class MyProvider(StorageOptionsProvider):
+    ...     def fetch_storage_options(self) -> Dict[str, str]:
+    ...         # Fetch fresh credentials from your credential manager
+    ...         return {
+    ...             "aws_access_key_id": "...",
+    ...             "aws_secret_access_key": "...",
+    ...             "expires_at_millis": "1234567890000"  # Optional
+    ...         }
+    """
+
+    @abstractmethod
+    def fetch_storage_options(self) -> Dict[str, str]:
+        """Fetch fresh storage credentials.
+
+        This method is called by LanceDB when credentials need to be refreshed.
+        If the returned dictionary contains an "expires_at_millis" key with a
+        Unix timestamp in milliseconds, LanceDB will automatically refresh the
+        credentials before that time. If the key is not present, credentials
+        are assumed to not expire.
+
+        Returns
+        -------
+        Dict[str, str]
+            Dictionary containing cloud storage credentials and optionally an
+            expiration time:
+            - "expires_at_millis" (optional): Unix timestamp in milliseconds when
+              credentials expire
+            - Provider-specific credential keys (e.g., aws_access_key_id,
+              aws_secret_access_key, etc.)
+
+        Raises
+        ------
+        RuntimeError
+            If credentials cannot be fetched or are invalid
+        """
+        pass
+
+    def provider_id(self) -> str:
+        """Return a human-readable unique identifier for this provider instance.
+
+        This identifier is used for caching and equality comparison. Two providers
+        with the same ID will share the same cached object store connection.
+
+        The default implementation uses the class name and string representation.
+        Override this method if you need custom identification logic.
+
+        Returns
+        -------
+        str
+            A unique identifier for this provider instance
+        """
+        return f"{self.__class__.__name__} {{ repr: {str(self)!r} }}"
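Not part of the diff: a minimal sketch of implementing the provider contract defined in the new io.py above. The credential values are placeholders; only the method name, the return shape, and the optional expires_at_millis key come from the interface.

```python
import time
from typing import Dict

from lancedb.io import StorageOptionsProvider


class RotatingS3Provider(StorageOptionsProvider):
    """Hypothetical provider that re-issues short-lived S3 credentials."""

    def fetch_storage_options(self) -> Dict[str, str]:
        # In a real provider these would come from STS or a secrets manager.
        creds = {
            "aws_access_key_id": "AKIA...",        # placeholder
            "aws_secret_access_key": "secret...",  # placeholder
            "aws_session_token": "token...",       # placeholder
        }
        # Ask LanceDB to refresh roughly 15 minutes from now (Unix millis).
        creds["expires_at_millis"] = str(int((time.time() + 15 * 60) * 1000))
        return creds
```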
@@ -10,42 +10,40 @@ through a namespace abstraction.
 
 from __future__ import annotations
 
-from typing import Dict, Iterable, List, Optional, Union
-import os
 import sys
+from typing import Dict, Iterable, List, Optional, Union
 
 if sys.version_info >= (3, 12):
     from typing import override
 else:
     from overrides import override
 
-from lancedb.db import DBConnection
+from datetime import timedelta
+
+import pyarrow as pa
+
+from lancedb.db import DBConnection, LanceDBConnection
+from lancedb.io import StorageOptionsProvider
 from lancedb.table import LanceTable, Table
 from lancedb.util import validate_table_name
-from lancedb.common import validate_schema
-from lancedb.table import sanitize_create_table
+from lancedb.common import DATA
+from lancedb.pydantic import LanceModel
+from lancedb.embeddings import EmbeddingFunctionConfig
+from ._lancedb import Session
 
 from lance_namespace import LanceNamespace, connect as namespace_connect
 from lance_namespace_urllib3_client.models import (
     ListTablesRequest,
     DescribeTableRequest,
-    CreateTableRequest,
     DropTableRequest,
     ListNamespacesRequest,
     CreateNamespaceRequest,
     DropNamespaceRequest,
+    CreateEmptyTableRequest,
     JsonArrowSchema,
     JsonArrowField,
     JsonArrowDataType,
 )
 
-import pyarrow as pa
-from datetime import timedelta
-from lancedb.pydantic import LanceModel
-from lancedb.common import DATA
-from lancedb.embeddings import EmbeddingFunctionConfig
-from ._lancedb import Session
-
 
 def _convert_pyarrow_type_to_json(arrow_type: pa.DataType) -> JsonArrowDataType:
     """Convert PyArrow DataType to JsonArrowDataType."""
@@ -104,6 +102,89 @@ def _convert_pyarrow_schema_to_json(schema: pa.Schema) -> JsonArrowSchema:
     return JsonArrowSchema(fields=fields, metadata=schema.metadata)
 
 
+class LanceNamespaceStorageOptionsProvider(StorageOptionsProvider):
+    """Storage options provider that fetches storage options from a LanceNamespace.
+
+    This provider automatically fetches fresh storage options by calling the
+    namespace's describe_table() method, which returns both the table location
+    and time-limited storage options. This enables automatic credential refresh
+    for tables accessed through namespace connections.
+
+    Parameters
+    ----------
+    namespace : LanceNamespace
+        The namespace instance with a describe_table() method
+    table_id : List[str]
+        The table identifier (namespace path + table name)
+
+    Examples
+    --------
+    >>> from lance_namespace import connect as namespace_connect
+    >>> namespace = namespace_connect("rest", {"url": "https://..."})
+    >>> provider = LanceNamespaceStorageOptionsProvider(
+    ...     namespace=namespace,
+    ...     table_id=["my_namespace", "my_table"]
+    ... )
+    >>> options = provider.fetch_storage_options()
+    """
+
+    def __init__(self, namespace: LanceNamespace, table_id: List[str]):
+        """Initialize with namespace and table ID.
+
+        Parameters
+        ----------
+        namespace : LanceNamespace
+            The namespace instance with a describe_table() method
+        table_id : List[str]
+            The table identifier
+        """
+        self._namespace = namespace
+        self._table_id = table_id
+
+    def fetch_storage_options(self) -> Dict[str, str]:
+        """Fetch storage options from the namespace.
+
+        This calls namespace.describe_table() to get the latest storage options
+        and their expiration time.
+
+        Returns
+        -------
+        Dict[str, str]
+            Flat dictionary of string key-value pairs containing storage options.
+            May include "expires_at_millis" key for automatic refresh.
+
+        Raises
+        ------
+        RuntimeError
+            If namespace does not return storage_options
+        """
+        request = DescribeTableRequest(id=self._table_id, version=None)
+        response = self._namespace.describe_table(request)
+        storage_options = response.storage_options
+        if storage_options is None:
+            raise RuntimeError(
+                "Namespace did not return storage_options. "
+                "Ensure the namespace supports storage options providing."
+            )
+
+        # Return the storage_options directly - it's already a flat Map<String, String>
+        return storage_options
+
+    def provider_id(self) -> str:
+        """Return a human-readable unique identifier for this provider instance."""
+        # Try to call namespace_id() if available (lance-namespace >= 0.0.20)
+        if hasattr(self._namespace, "namespace_id"):
+            namespace_id = self._namespace.namespace_id()
+        else:
+            # Fallback for older namespace versions
+            namespace_id = str(self._namespace)
+
+        return (
+            f"LanceNamespaceStorageOptionsProvider {{ "
+            f"namespace: {namespace_id}, table_id: {self._table_id!r} }}"
+        )
+
+
 class LanceNamespaceDBConnection(DBConnection):
     """
     A LanceDB connection that uses a namespace for table management.
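Not in the diff: a sketch of using the namespace-backed provider above on its own, mirroring its docstring example. The REST endpoint is hypothetical and the import path of the class is assumed; the automatic wiring inside open_table/create_table appears in the hunks that follow.

```python
from lance_namespace import connect as namespace_connect
from lancedb.namespace import LanceNamespaceStorageOptionsProvider  # module path assumed

ns = namespace_connect("rest", {"url": "https://namespace.example.com"})  # hypothetical endpoint
provider = LanceNamespaceStorageOptionsProvider(
    namespace=ns,
    table_id=["analytics", "events"],
)

options = provider.fetch_storage_options()   # calls ns.describe_table() under the hood
deadline = options.get("expires_at_millis")  # absent when credentials never expire
```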
@@ -166,6 +247,7 @@
         *,
         namespace: List[str] = [],
         storage_options: Optional[Dict[str, str]] = None,
+        storage_options_provider: Optional[StorageOptionsProvider] = None,
         data_storage_version: Optional[str] = None,
         enable_v2_manifest_paths: Optional[bool] = None,
     ) -> Table:
@@ -173,48 +255,84 @@
             raise ValueError("mode must be either 'create' or 'overwrite'")
         validate_table_name(name)
 
-        # TODO: support passing data
-        if data is not None:
-            raise ValueError(
-                "create_table currently only supports creating empty tables (data=None)"
-            )
-
-        # Prepare schema
-        metadata = None
-        if embedding_functions is not None:
-            from lancedb.embeddings.registry import EmbeddingFunctionRegistry
-
-            registry = EmbeddingFunctionRegistry.get_instance()
-            metadata = registry.get_table_metadata(embedding_functions)
-
-        data, schema = sanitize_create_table(
-            data, schema, metadata, on_bad_vectors, fill_value
-        )
-        validate_schema(schema)
-
-        # Convert PyArrow schema to JsonArrowSchema
-        json_schema = _convert_pyarrow_schema_to_json(schema)
-
-        # Create table request with namespace
-        table_id = namespace + [name]
-        request = CreateTableRequest(id=table_id, var_schema=json_schema)
-
-        # Create empty Arrow IPC stream bytes
-        import pyarrow.ipc as ipc
-        import io
-
-        empty_table = pa.Table.from_arrays(
-            [pa.array([], type=field.type) for field in schema], schema=schema
-        )
-        buffer = io.BytesIO()
-        with ipc.new_stream(buffer, schema) as writer:
-            writer.write_table(empty_table)
-        request_data = buffer.getvalue()
-
-        self._ns.create_table(request, request_data)
-        return self.open_table(
-            name, namespace=namespace, storage_options=storage_options
-        )
+        # Get location from namespace
+        table_id = namespace + [name]
+
+        # Step 1: Get the table location and storage options from namespace
+        # In overwrite mode, if table exists, use describe_table to get
+        # existing location. Otherwise, call create_empty_table to reserve
+        # a new location
+        location = None
+        namespace_storage_options = None
+        if mode.lower() == "overwrite":
+            # Try to describe the table first to see if it exists
+            try:
+                describe_request = DescribeTableRequest(id=table_id)
+                describe_response = self._ns.describe_table(describe_request)
+                location = describe_response.location
+                namespace_storage_options = describe_response.storage_options
+            except Exception:
+                # Table doesn't exist, will create a new one below
+                pass
+
+        if location is None:
+            # Table doesn't exist or mode is "create", reserve a new location
+            create_empty_request = CreateEmptyTableRequest(
+                id=table_id,
+                location=None,
+                properties=self.storage_options if self.storage_options else None,
+            )
+            create_empty_response = self._ns.create_empty_table(create_empty_request)
+
+            if not create_empty_response.location:
+                raise ValueError(
+                    "Table location is missing from create_empty_table response"
+                )
+
+            location = create_empty_response.location
+            namespace_storage_options = create_empty_response.storage_options
+
+        # Merge storage options: self.storage_options < user options < namespace options
+        merged_storage_options = dict(self.storage_options)
+        if storage_options:
+            merged_storage_options.update(storage_options)
+        if namespace_storage_options:
+            merged_storage_options.update(namespace_storage_options)
+
+        # Step 2: Create table using LanceTable.create with the location
+        # We need a temporary connection for the LanceTable.create method
+        temp_conn = LanceDBConnection(
+            location,  # Use the actual location as the connection URI
+            read_consistency_interval=self.read_consistency_interval,
+            storage_options=merged_storage_options,
+            session=self.session,
+        )
+
+        # Create a storage options provider if not provided by user
+        # Only create if namespace returned storage_options (not None)
+        if storage_options_provider is None and namespace_storage_options is not None:
+            storage_options_provider = LanceNamespaceStorageOptionsProvider(
+                namespace=self._ns,
+                table_id=table_id,
+            )
+
+        tbl = LanceTable.create(
+            temp_conn,
+            name,
+            data,
+            schema,
+            mode=mode,
+            exist_ok=exist_ok,
+            on_bad_vectors=on_bad_vectors,
+            fill_value=fill_value,
+            embedding_functions=embedding_functions,
+            namespace=namespace,
+            storage_options=merged_storage_options,
+            storage_options_provider=storage_options_provider,
+            location=location,
+        )
+
+        return tbl
 
     @override
     def open_table(
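Not part of the diff: a hedged sketch of the flow the rewritten create_table enables. How the LanceNamespaceDBConnection itself is constructed is not shown here, so `db` is assumed to already exist.

```python
# `db` is assumed to be a LanceNamespaceDBConnection obtained elsewhere.
tbl = db.create_table(
    "events",
    data=[{"vector": [1.0, 2.0], "item": "a"}],  # data is now written; previously only data=None was allowed
    namespace=["analytics"],
    mode="overwrite",  # reuses the existing namespace location when the table already exists
)
# A LanceNamespaceStorageOptionsProvider is attached automatically when the
# namespace returns storage options, so credentials can refresh on expiry.
```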
@@ -223,21 +341,34 @@
         *,
         namespace: List[str] = [],
         storage_options: Optional[Dict[str, str]] = None,
+        storage_options_provider: Optional[StorageOptionsProvider] = None,
         index_cache_size: Optional[int] = None,
     ) -> Table:
         table_id = namespace + [name]
         request = DescribeTableRequest(id=table_id)
         response = self._ns.describe_table(request)
 
-        merged_storage_options = dict()
+        # Merge storage options: self.storage_options < user options < namespace options
+        merged_storage_options = dict(self.storage_options)
         if storage_options:
             merged_storage_options.update(storage_options)
         if response.storage_options:
             merged_storage_options.update(response.storage_options)
 
+        # Create a storage options provider if not provided by user
+        # Only create if namespace returned storage_options (not None)
+        if storage_options_provider is None and response.storage_options is not None:
+            storage_options_provider = LanceNamespaceStorageOptionsProvider(
+                namespace=self._ns,
+                table_id=table_id,
+            )
+
         return self._lance_table_from_uri(
+            name,
             response.location,
+            namespace=namespace,
             storage_options=merged_storage_options,
+            storage_options_provider=storage_options_provider,
             index_cache_size=index_cache_size,
         )
 
@@ -330,33 +461,32 @@
 
     def _lance_table_from_uri(
         self,
+        name: str,
         table_uri: str,
         *,
+        namespace: List[str] = [],
         storage_options: Optional[Dict[str, str]] = None,
+        storage_options_provider: Optional[StorageOptionsProvider] = None,
         index_cache_size: Optional[int] = None,
     ) -> LanceTable:
-        # Extract the base path and table name from the URI
-        if table_uri.endswith(".lance"):
-            base_path = os.path.dirname(table_uri)
-            table_name = os.path.basename(table_uri)[:-6]  # Remove .lance
-        else:
-            raise ValueError(f"Invalid table URI: {table_uri}")
-
-        from lancedb.db import LanceDBConnection
-
+        # Open a table directly from a URI using the location parameter
+        # Note: storage_options should already be merged by the caller
         temp_conn = LanceDBConnection(
-            base_path,
+            table_uri,  # Use the table location as the connection URI
             read_consistency_interval=self.read_consistency_interval,
-            storage_options={**self.storage_options, **(storage_options or {})},
+            storage_options=storage_options if storage_options is not None else {},
             session=self.session,
         )
 
-        # Open the table using the temporary connection
+        # Open the table using the temporary connection with the location parameter
        return LanceTable.open(
             temp_conn,
-            table_name,
+            name,
+            namespace=namespace,
             storage_options=storage_options,
+            storage_options_provider=storage_options_provider,
             index_cache_size=index_cache_size,
+            location=table_uri,
         )
 
 
@@ -75,6 +75,7 @@ from .index import lang_mapping
 
 if TYPE_CHECKING:
     from .db import LanceDBConnection
+    from .io import StorageOptionsProvider
     from ._lancedb import (
         Table as LanceDBTable,
         OptimizeStats,
@@ -1709,7 +1710,9 @@
         *,
         namespace: List[str] = [],
         storage_options: Optional[Dict[str, str]] = None,
+        storage_options_provider: Optional["StorageOptionsProvider"] = None,
         index_cache_size: Optional[int] = None,
+        location: Optional[str] = None,
         _async: AsyncTable = None,
     ):
         self._conn = connection
@@ -1722,7 +1725,9 @@
                 name,
                 namespace=namespace,
                 storage_options=storage_options,
+                storage_options_provider=storage_options_provider,
                 index_cache_size=index_cache_size,
+                location=location,
             )
         )
 
@@ -1730,6 +1735,18 @@
     def name(self) -> str:
         return self._table.name
 
+    @property
+    def namespace(self) -> List[str]:
+        """Return the namespace path of the table."""
+        return self._namespace
+
+    @property
+    def id(self) -> str:
+        """Return the full identifier of the table (namespace$name)."""
+        if self._namespace:
+            return "$".join(self._namespace + [self.name])
+        return self.name
+
     @classmethod
     def from_inner(cls, tbl: LanceDBTable):
         from .db import LanceDBConnection
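Not in the diff: a tiny illustration of the two properties added above, assuming a LanceTable opened under a one-level namespace.

```python
# Assuming `tbl` was opened with namespace=["analytics"] and name "events":
assert tbl.namespace == ["analytics"]
assert tbl.id == "analytics$events"  # "$"-joined namespace path plus the table name

# Without a namespace, id falls back to the plain table name.
```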
@@ -1743,8 +1760,26 @@
         )
 
     @classmethod
-    def open(cls, db, name, *, namespace: List[str] = [], **kwargs):
-        tbl = cls(db, name, namespace=namespace, **kwargs)
+    def open(
+        cls,
+        db,
+        name,
+        *,
+        namespace: List[str] = [],
+        storage_options: Optional[Dict[str, str]] = None,
+        storage_options_provider: Optional["StorageOptionsProvider"] = None,
+        index_cache_size: Optional[int] = None,
+        location: Optional[str] = None,
+    ):
+        tbl = cls(
+            db,
+            name,
+            namespace=namespace,
+            storage_options=storage_options,
+            storage_options_provider=storage_options_provider,
+            index_cache_size=index_cache_size,
+            location=location,
+        )
 
         # check the dataset exists
         try:
@@ -2585,8 +2620,10 @@
         *,
         namespace: List[str] = [],
         storage_options: Optional[Dict[str, str | bool]] = None,
+        storage_options_provider: Optional["StorageOptionsProvider"] = None,
         data_storage_version: Optional[str] = None,
         enable_v2_manifest_paths: Optional[bool] = None,
+        location: Optional[str] = None,
     ):
         """
         Create a new table.
@@ -2678,6 +2715,8 @@
             embedding_functions=embedding_functions,
             namespace=namespace,
             storage_options=storage_options,
+            storage_options_provider=storage_options_provider,
+            location=location,
             )
         )
         return self
@@ -781,58 +781,6 @@ def test_local_drop_namespace_not_supported(tmp_path):
         db.drop_namespace(["test_namespace"])
 
 
-def test_local_table_operations_with_namespace_raise_error(tmp_path):
-    """
-    Test that table operations with namespace parameter
-    raise ValueError in local mode.
-    """
-    db = lancedb.connect(tmp_path)
-
-    # Create some test data
-    data = [{"vector": [1.0, 2.0], "item": "test"}]
-    schema = pa.schema(
-        [pa.field("vector", pa.list_(pa.float32(), 2)), pa.field("item", pa.string())]
-    )
-
-    # Test create_table with namespace - should raise ValueError
-    with pytest.raises(
-        NotImplementedError,
-        match="Namespace parameter is not supported for listing database",
-    ):
-        db.create_table(
-            "test_table_with_ns", data=data, schema=schema, namespace=["test_ns"]
-        )
-
-    # Create table normally for other tests
-    db.create_table("test_table", data=data, schema=schema)
-    assert "test_table" in db.table_names()
-
-    # Test open_table with namespace - should raise ValueError
-    with pytest.raises(
-        NotImplementedError,
-        match="Namespace parameter is not supported for listing database",
-    ):
-        db.open_table("test_table", namespace=["test_ns"])
-
-    # Test table_names with namespace - should raise ValueError
-    with pytest.raises(
-        NotImplementedError,
-        match="Namespace parameter is not supported for listing database",
-    ):
-        list(db.table_names(namespace=["test_ns"]))
-
-    # Test drop_table with namespace - should raise ValueError
-    with pytest.raises(
-        NotImplementedError,
-        match="Namespace parameter is not supported for listing database",
-    ):
-        db.drop_table("test_table", namespace=["test_ns"])
-
-    # Test table_names without namespace - should work normally
-    tables_root = list(db.table_names())
-    assert "test_table" in tables_root
-
-
 def test_clone_table_latest_version(tmp_path):
     """Test cloning a table with the latest version (default behavior)"""
     import os
@@ -5,352 +5,39 @@
 
 import tempfile
 import shutil
-from typing import Dict, Optional
 import pytest
 import pyarrow as pa
 import lancedb
-from lance_namespace.namespace import NATIVE_IMPLS, LanceNamespace
-from lance_namespace_urllib3_client.models import (
-    ListTablesRequest,
-    ListTablesResponse,
-    DescribeTableRequest,
-    DescribeTableResponse,
-    RegisterTableRequest,
-    RegisterTableResponse,
-    DeregisterTableRequest,
-    DeregisterTableResponse,
-    CreateTableRequest,
-    CreateTableResponse,
-    DropTableRequest,
-    DropTableResponse,
-    ListNamespacesRequest,
-    ListNamespacesResponse,
-    CreateNamespaceRequest,
-    CreateNamespaceResponse,
-    DropNamespaceRequest,
-    DropNamespaceResponse,
-)
-
-
-class TempNamespace(LanceNamespace):
-    """A simple dictionary-backed namespace for testing."""
-
-    # Class-level storage to persist table registry across instances
-    _global_registry: Dict[str, Dict[str, str]] = {}
-    # Class-level storage for namespaces (supporting 1-level namespace)
-    _global_namespaces: Dict[str, set] = {}
-
-    def __init__(self, **properties):
-        """Initialize the test namespace.
-
-        Args:
-            root: The root directory for tables (optional)
-            **properties: Additional configuration properties
-        """
-        self.config = TempNamespaceConfig(properties)
-        # Use the root as a key to maintain separate registries per root
-        root = self.config.root
-        if root not in self._global_registry:
-            self._global_registry[root] = {}
-        if root not in self._global_namespaces:
-            self._global_namespaces[root] = set()
-        self.tables = self._global_registry[root]  # Reference to shared registry
-        self.namespaces = self._global_namespaces[
-            root
-        ]  # Reference to shared namespaces
-
-    def namespace_id(self) -> str:
-        """Return a human-readable unique identifier for this namespace instance.
-
-        Returns:
-            A unique identifier string based on the root directory
-        """
-        return f"TempNamespace {{ root: '{self.config.root}' }}"
-
-    def list_tables(self, request: ListTablesRequest) -> ListTablesResponse:
-        """List all tables in the namespace."""
-        if not request.id:
-            # List all tables in root namespace
-            tables = [name for name in self.tables.keys() if "." not in name]
-        else:
-            # List tables in specific namespace (1-level only)
-            if len(request.id) == 1:
-                namespace_name = request.id[0]
-                prefix = f"{namespace_name}."
-                tables = [
-                    name[len(prefix) :]
-                    for name in self.tables.keys()
-                    if name.startswith(prefix)
-                ]
-            else:
-                # Multi-level namespaces not supported
-                raise ValueError("Only 1-level namespaces are supported")
-        return ListTablesResponse(tables=tables)
-
-    def describe_table(self, request: DescribeTableRequest) -> DescribeTableResponse:
-        """Describe a table by returning its location."""
-        if not request.id:
-            raise ValueError("Invalid table ID")
-
-        if len(request.id) == 1:
-            # Root namespace table
-            table_name = request.id[0]
-        elif len(request.id) == 2:
-            # Namespaced table (1-level namespace)
-            namespace_name, table_name = request.id
-            table_name = f"{namespace_name}.{table_name}"
-        else:
-            raise ValueError("Only 1-level namespaces are supported")
-
-        if table_name not in self.tables:
-            raise RuntimeError(f"Table does not exist: {table_name}")
-
-        table_uri = self.tables[table_name]
-        return DescribeTableResponse(location=table_uri)
-
-    def create_table(
-        self, request: CreateTableRequest, request_data: bytes
-    ) -> CreateTableResponse:
-        """Create a table in the namespace."""
-        if not request.id:
-            raise ValueError("Invalid table ID")
-
-        if len(request.id) == 1:
-            # Root namespace table
-            table_name = request.id[0]
-            table_uri = f"{self.config.root}/{table_name}.lance"
-        elif len(request.id) == 2:
-            # Namespaced table (1-level namespace)
-            namespace_name, base_table_name = request.id
-            # Add namespace to our namespace set
-            self.namespaces.add(namespace_name)
-            table_name = f"{namespace_name}.{base_table_name}"
-            table_uri = f"{self.config.root}/{namespace_name}/{base_table_name}.lance"
-        else:
-            raise ValueError("Only 1-level namespaces are supported")
-
-        # Check if table already exists
-        if table_name in self.tables:
-            if request.mode == "overwrite":
-                # Drop existing table for overwrite mode
-                del self.tables[table_name]
-            else:
-                raise RuntimeError(f"Table already exists: {table_name}")
-
-        # Parse the Arrow IPC stream to get the schema and create the actual table
-        import pyarrow.ipc as ipc
-        import io
-        import lance
-        import os
-
-        # Create directory if needed for namespaced tables
-        os.makedirs(os.path.dirname(table_uri), exist_ok=True)
-
-        # Read the IPC stream
-        reader = ipc.open_stream(io.BytesIO(request_data))
-        table = reader.read_all()
-
-        # Create the actual Lance table
-        lance.write_dataset(table, table_uri)
-
-        # Store the table mapping
-        self.tables[table_name] = table_uri
-
-        return CreateTableResponse(location=table_uri)
-
-    def drop_table(self, request: DropTableRequest) -> DropTableResponse:
-        """Drop a table from the namespace."""
-        if not request.id:
-            raise ValueError("Invalid table ID")
-
-        if len(request.id) == 1:
-            # Root namespace table
-            table_name = request.id[0]
-        elif len(request.id) == 2:
-            # Namespaced table (1-level namespace)
-            namespace_name, base_table_name = request.id
-            table_name = f"{namespace_name}.{base_table_name}"
-        else:
-            raise ValueError("Only 1-level namespaces are supported")
-
-        if table_name not in self.tables:
-            raise RuntimeError(f"Table does not exist: {table_name}")
-
-        # Get the table URI
-        table_uri = self.tables[table_name]
-
-        # Delete the actual table files
-        import shutil
-        import os
-
-        if os.path.exists(table_uri):
-            shutil.rmtree(table_uri, ignore_errors=True)
-
-        # Remove from registry
-        del self.tables[table_name]
-
-        return DropTableResponse()
-
-    def register_table(self, request: RegisterTableRequest) -> RegisterTableResponse:
-        """Register a table with the namespace."""
-        if not request.id or len(request.id) != 1:
-            raise ValueError("Invalid table ID")
-
-        if not request.location:
-            raise ValueError("Table location is required")
-
-        table_name = request.id[0]
-        self.tables[table_name] = request.location
-
-        return RegisterTableResponse()
-
-    def deregister_table(
-        self, request: DeregisterTableRequest
-    ) -> DeregisterTableResponse:
-        """Deregister a table from the namespace."""
-        if not request.id or len(request.id) != 1:
-            raise ValueError("Invalid table ID")
-
-        table_name = request.id[0]
-        if table_name not in self.tables:
-            raise RuntimeError(f"Table does not exist: {table_name}")
-
-        del self.tables[table_name]
-        return DeregisterTableResponse()
-
-    def list_namespaces(self, request: ListNamespacesRequest) -> ListNamespacesResponse:
-        """List child namespaces."""
-        if not request.id:
-            # List root-level namespaces
-            namespaces = list(self.namespaces)
-        elif len(request.id) == 1:
-            # For 1-level namespace, there are no child namespaces
-            namespaces = []
-        else:
-            raise ValueError("Only 1-level namespaces are supported")
-
-        return ListNamespacesResponse(namespaces=namespaces)
-
-    def create_namespace(
-        self, request: CreateNamespaceRequest
-    ) -> CreateNamespaceResponse:
-        """Create a namespace."""
-        if not request.id:
-            raise ValueError("Invalid namespace ID")
-
-        if len(request.id) == 1:
-            # Create 1-level namespace
-            namespace_name = request.id[0]
-            self.namespaces.add(namespace_name)
-
-            # Create directory for the namespace
-            import os
-
-            namespace_dir = f"{self.config.root}/{namespace_name}"
-            os.makedirs(namespace_dir, exist_ok=True)
-        else:
-            raise ValueError("Only 1-level namespaces are supported")
-
-        return CreateNamespaceResponse()
-
-    def drop_namespace(self, request: DropNamespaceRequest) -> DropNamespaceResponse:
-        """Drop a namespace."""
-        if not request.id:
-            raise ValueError("Invalid namespace ID")
-
-        if len(request.id) == 1:
-            # Drop 1-level namespace
-            namespace_name = request.id[0]
-
-            if namespace_name not in self.namespaces:
-                raise RuntimeError(f"Namespace does not exist: {namespace_name}")
-
-            # Check if namespace has any tables
-            prefix = f"{namespace_name}."
-            tables_in_namespace = [
-                name for name in self.tables.keys() if name.startswith(prefix)
-            ]
-            if tables_in_namespace:
-                raise RuntimeError(
-                    f"Cannot drop namespace '{namespace_name}': contains tables"
-                )
-
-            # Remove namespace
-            self.namespaces.remove(namespace_name)
-
-            # Remove directory
-            import shutil
-            import os
-
-            namespace_dir = f"{self.config.root}/{namespace_name}"
-            if os.path.exists(namespace_dir):
-                shutil.rmtree(namespace_dir, ignore_errors=True)
-        else:
-            raise ValueError("Only 1-level namespaces are supported")
-
-        return DropNamespaceResponse()
-
-
-class TempNamespaceConfig:
-    """Configuration for TestNamespace."""
-
-    ROOT = "root"
-
-    def __init__(self, properties: Optional[Dict[str, str]] = None):
-        """Initialize configuration from properties.
-
-        Args:
-            properties: Dictionary of configuration properties
-        """
-        if properties is None:
-            properties = {}
-
-        self._root = properties.get(self.ROOT, "/tmp")
-
-    @property
-    def root(self) -> str:
-        """Get the namespace root directory."""
-        return self._root
-
-
-NATIVE_IMPLS["temp"] = f"{TempNamespace.__module__}.TempNamespace"
-
-
 class TestNamespaceConnection:
-    """Test namespace-based LanceDB connection."""
+    """Test namespace-based LanceDB connection using DirectoryNamespace."""
 
     def setup_method(self):
         """Set up test fixtures."""
         self.temp_dir = tempfile.mkdtemp()
-        # Clear the TestNamespace registry for this test
-        if self.temp_dir in TempNamespace._global_registry:
-            TempNamespace._global_registry[self.temp_dir].clear()
-        if self.temp_dir in TempNamespace._global_namespaces:
-            TempNamespace._global_namespaces[self.temp_dir].clear()
 
     def teardown_method(self):
         """Clean up test fixtures."""
-        # Clear the TestNamespace registry
-        if self.temp_dir in TempNamespace._global_registry:
-            del TempNamespace._global_registry[self.temp_dir]
-        if self.temp_dir in TempNamespace._global_namespaces:
-            del TempNamespace._global_namespaces[self.temp_dir]
         shutil.rmtree(self.temp_dir, ignore_errors=True)
 
     def test_connect_namespace_test(self):
-        """Test connecting to LanceDB through TestNamespace."""
-        # Connect using TestNamespace
-        db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
+        """Test connecting to LanceDB through DirectoryNamespace."""
+        # Connect using DirectoryNamespace
+        db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
 
         # Should be a LanceNamespaceDBConnection
         assert isinstance(db, lancedb.LanceNamespaceDBConnection)
 
-        # Initially no tables
+        # Initially no tables in root
         assert len(list(db.table_names())) == 0
 
     def test_create_table_through_namespace(self):
         """Test creating a table through namespace."""
-        db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
+        db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
 
+        # Create a child namespace first
+        db.create_namespace(["test_ns"])
+
         # Define schema for empty table
         schema = pa.schema(
@@ -361,13 +48,15 @@ class TestNamespaceConnection:
             ]
         )
 
-        # Create empty table
-        table = db.create_table("test_table", schema=schema)
+        # Create empty table in child namespace
+        table = db.create_table("test_table", schema=schema, namespace=["test_ns"])
         assert table is not None
         assert table.name == "test_table"
+        assert table.namespace == ["test_ns"]
+        assert table.id == "test_ns$test_table"
 
-        # Table should appear in namespace
-        table_names = list(db.table_names())
+        # Table should appear in child namespace
+        table_names = list(db.table_names(namespace=["test_ns"]))
         assert "test_table" in table_names
         assert len(table_names) == 1
 
@@ -378,21 +67,26 @@ class TestNamespaceConnection:
 
     def test_open_table_through_namespace(self):
         """Test opening an existing table through namespace."""
-        db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
+        db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
 
-        # Create a table with schema
+        # Create a child namespace first
+        db.create_namespace(["test_ns"])
+
+        # Create a table with schema in child namespace
         schema = pa.schema(
             [
                 pa.field("id", pa.int64()),
                 pa.field("vector", pa.list_(pa.float32(), 2)),
             ]
         )
-        db.create_table("test_table", schema=schema)
+        db.create_table("test_table", schema=schema, namespace=["test_ns"])
 
         # Open the table
-        table = db.open_table("test_table")
+        table = db.open_table("test_table", namespace=["test_ns"])
         assert table is not None
         assert table.name == "test_table"
+        assert table.namespace == ["test_ns"]
+        assert table.id == "test_ns$test_table"
 
         # Verify empty table with correct schema
         result = table.to_pandas()
@@ -401,44 +95,50 @@ class TestNamespaceConnection:
 
     def test_drop_table_through_namespace(self):
         """Test dropping a table through namespace."""
-        db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
+        db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
 
-        # Create tables
+        # Create a child namespace first
+        db.create_namespace(["test_ns"])
+
+        # Create tables in child namespace
         schema = pa.schema(
             [
                 pa.field("id", pa.int64()),
                 pa.field("vector", pa.list_(pa.float32(), 2)),
             ]
         )
-        db.create_table("table1", schema=schema)
-        db.create_table("table2", schema=schema)
+        db.create_table("table1", schema=schema, namespace=["test_ns"])
+        db.create_table("table2", schema=schema, namespace=["test_ns"])
 
-        # Verify both tables exist
-        table_names = list(db.table_names())
+        # Verify both tables exist in child namespace
+        table_names = list(db.table_names(namespace=["test_ns"]))
         assert "table1" in table_names
         assert "table2" in table_names
         assert len(table_names) == 2
 
         # Drop one table
-        db.drop_table("table1")
+        db.drop_table("table1", namespace=["test_ns"])
 
         # Verify only table2 remains
-        table_names = list(db.table_names())
+        table_names = list(db.table_names(namespace=["test_ns"]))
         assert "table1" not in table_names
         assert "table2" in table_names
         assert len(table_names) == 1
 
-        # Test that drop_table works without explicit namespace parameter
-        db.drop_table("table2")
-        assert len(list(db.table_names())) == 0
+        # Drop the second table
+        db.drop_table("table2", namespace=["test_ns"])
+        assert len(list(db.table_names(namespace=["test_ns"]))) == 0
 
         # Should not be able to open dropped table
         with pytest.raises(RuntimeError):
-            db.open_table("table1")
+            db.open_table("table1", namespace=["test_ns"])
 
     def test_create_table_with_schema(self):
         """Test creating a table with explicit schema through namespace."""
-        db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
+        db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
 
+        # Create a child namespace first
+        db.create_namespace(["test_ns"])
+
         # Define schema
         schema = pa.schema(
@@ -449,9 +149,10 @@ class TestNamespaceConnection:
             ]
         )
 
-        # Create table with schema
-        table = db.create_table("test_table", schema=schema)
+        # Create table with schema in child namespace
+        table = db.create_table("test_table", schema=schema, namespace=["test_ns"])
         assert table is not None
+        assert table.namespace == ["test_ns"]
 
         # Verify schema
         table_schema = table.schema
@@ -461,16 +162,19 @@ class TestNamespaceConnection:
 
     def test_rename_table_not_supported(self):
         """Test that rename_table raises NotImplementedError."""
-        db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
+        db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
 
-        # Create a table
+        # Create a child namespace first
+        db.create_namespace(["test_ns"])
+
+        # Create a table in child namespace
         schema = pa.schema(
             [
                 pa.field("id", pa.int64()),
                 pa.field("vector", pa.list_(pa.float32(), 2)),
             ]
         )
-        db.create_table("old_name", schema=schema)
+        db.create_table("old_name", schema=schema, namespace=["test_ns"])
 
         # Rename should raise NotImplementedError
         with pytest.raises(NotImplementedError, match="rename_table is not supported"):
@@ -478,9 +182,12 @@ class TestNamespaceConnection:
 
     def test_drop_all_tables(self):
         """Test dropping all tables through namespace."""
-        db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
+        db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
 
-        # Create multiple tables
+        # Create a child namespace first
+        db.create_namespace(["test_ns"])
+
+        # Create multiple tables in child namespace
        schema = pa.schema(
             [
                 pa.field("id", pa.int64()),
@@ -488,27 +195,30 @@ class TestNamespaceConnection:
             ]
         )
         for i in range(3):
-            db.create_table(f"table{i}", schema=schema)
+            db.create_table(f"table{i}", schema=schema, namespace=["test_ns"])
 
-        # Verify tables exist
-        assert len(list(db.table_names())) == 3
+        # Verify tables exist in child namespace
+        assert len(list(db.table_names(namespace=["test_ns"]))) == 3
 
-        # Drop all tables
-        db.drop_all_tables()
+        # Drop all tables in child namespace
+        db.drop_all_tables(namespace=["test_ns"])
 
-        # Verify all tables are gone
-        assert len(list(db.table_names())) == 0
+        # Verify all tables are gone from child namespace
+        assert len(list(db.table_names(namespace=["test_ns"]))) == 0
 
         # Test that table_names works with keyword-only namespace parameter
-        db.create_table("test_table", schema=schema)
-        result = list(db.table_names(namespace=[]))
+        db.create_table("test_table", schema=schema, namespace=["test_ns"])
+        result = list(db.table_names(namespace=["test_ns"]))
         assert "test_table" in result
 
     def test_table_operations(self):
         """Test various table operations through namespace."""
-        db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
+        db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
 
-        # Create a table with schema
+        # Create a child namespace first
+        db.create_namespace(["test_ns"])
+
+        # Create a table with schema in child namespace
         schema = pa.schema(
             [
                 pa.field("id", pa.int64()),
@@ -516,7 +226,7 @@ class TestNamespaceConnection:
                 pa.field("text", pa.string()),
             ]
         )
-        table = db.create_table("test_table", schema=schema)
+        table = db.create_table("test_table", schema=schema, namespace=["test_ns"])
 
         # Verify empty table was created
         result = table.to_pandas()
@@ -548,7 +258,7 @@ class TestNamespaceConnection:
         # Connect with storage options
         storage_opts = {"test_option": "test_value"}
         db = lancedb.connect_namespace(
-            "temp", {"root": self.temp_dir}, storage_options=storage_opts
+            "dir", {"root": self.temp_dir}, storage_options=storage_opts
         )
 
         # Storage options should be preserved
@@ -566,7 +276,7 @@ class TestNamespaceConnection:
 
     def test_namespace_operations(self):
         """Test namespace management operations."""
-        db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
+        db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
 
         # Initially no namespaces
         assert len(list(db.list_namespaces())) == 0
@@ -617,7 +327,7 @@ class TestNamespaceConnection:
 
     def test_namespace_with_tables_cannot_be_dropped(self):
         """Test that namespaces containing tables cannot be dropped."""
-        db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
+        db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
 
         # Create namespace and table
         db.create_namespace(["test_namespace"])
@@ -630,7 +340,7 @@ class TestNamespaceConnection:
         db.create_table("test_table", schema=schema, namespace=["test_namespace"])
 
         # Try to drop namespace with tables - should fail
-        with pytest.raises(RuntimeError, match="contains tables"):
+        with pytest.raises(RuntimeError, match="is not empty"):
             db.drop_namespace(["test_namespace"])
 
         # Drop table first
@@ -640,7 +350,7 @@ class TestNamespaceConnection:
         db.drop_namespace(["test_namespace"])
 
     def test_same_table_name_different_namespaces(self):
-        db = lancedb.connect_namespace("temp", {"root": self.temp_dir})
+        db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
 
         # Create two namespaces
         db.create_namespace(["namespace_a"])
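The updated tests above connect through the built-in "dir" (DirectoryNamespace) implementation and always create a child namespace before putting tables in it. A condensed sketch of that pattern, with the root path as a placeholder:

    # Hedged sketch of the namespace flow the updated tests exercise.
    import lancedb
    import pyarrow as pa

    db = lancedb.connect_namespace("dir", {"root": "/tmp/lancedb_ns"})
    db.create_namespace(["test_ns"])

    schema = pa.schema([pa.field("id", pa.int64())])
    tbl = db.create_table("test_table", schema=schema, namespace=["test_ns"])

    assert tbl.namespace == ["test_ns"]
    assert tbl.id == "test_ns$test_table"
    assert "test_table" in list(db.table_names(namespace=["test_ns"]))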
632
python/python/tests/test_namespace_integration.py
Normal file
@@ -0,0 +1,632 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors

"""
Integration tests for LanceDB Namespace with S3 and credential refresh.

This test simulates a namespace server that returns incrementing credentials
and verifies that the credential refresh mechanism works correctly for both
create_table and open_table operations.

Tests verify:
- Storage options provider is auto-created and used
- Credentials are properly cached during reads
- Credentials refresh when they expire
- Both create and open operations work with credential rotation
"""

import copy
import time
import uuid
from threading import Lock
from typing import Dict

import pyarrow as pa
import pytest
from lance_namespace import (
    CreateEmptyTableRequest,
    CreateEmptyTableResponse,
    DescribeTableRequest,
    DescribeTableResponse,
    LanceNamespace,
)
from lancedb.namespace import LanceNamespaceDBConnection

# LocalStack S3 configuration
CONFIG = {
    "allow_http": "true",
    "aws_access_key_id": "ACCESSKEY",
    "aws_secret_access_key": "SECRETKEY",
    "aws_endpoint": "http://localhost:4566",
    "aws_region": "us-east-1",
}


def get_boto3_client(*args, **kwargs):
    import boto3

    return boto3.client(
        *args,
        region_name=CONFIG["aws_region"],
        aws_access_key_id=CONFIG["aws_access_key_id"],
        aws_secret_access_key=CONFIG["aws_secret_access_key"],
        **kwargs,
    )


@pytest.fixture(scope="module")
def s3_bucket():
    """Create and cleanup S3 bucket for integration tests."""
    s3 = get_boto3_client("s3", endpoint_url=CONFIG["aws_endpoint"])
    bucket_name = "lancedb-namespace-integtest"

    # Clean up existing bucket if it exists
    try:
        delete_bucket(s3, bucket_name)
    except s3.exceptions.NoSuchBucket:
        pass

    s3.create_bucket(Bucket=bucket_name)
    yield bucket_name

    # Cleanup after tests
    delete_bucket(s3, bucket_name)


def delete_bucket(s3, bucket_name):
    """Delete S3 bucket and all its contents."""
    try:
        # Delete all objects first
        paginator = s3.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=bucket_name):
            if "Contents" in page:
                for obj in page["Contents"]:
                    s3.delete_object(Bucket=bucket_name, Key=obj["Key"])
        s3.delete_bucket(Bucket=bucket_name)
    except Exception:
        pass


class TrackingNamespace(LanceNamespace):
    """
    Mock namespace that wraps DirectoryNamespace and tracks API calls.

    This namespace returns incrementing credentials with each API call to simulate
    credential rotation. It also tracks the number of times each API is called
    to verify caching behavior.
    """

    def __init__(
        self,
        bucket_name: str,
        storage_options: Dict[str, str],
        credential_expires_in_seconds: int = 60,
    ):
        from lance.namespace import DirectoryNamespace

        self.bucket_name = bucket_name
        self.base_storage_options = storage_options
        self.credential_expires_in_seconds = credential_expires_in_seconds
        self.describe_call_count = 0
        self.create_call_count = 0
        self.lock = Lock()

        # Create underlying DirectoryNamespace with storage options
        dir_props = {f"storage.{k}": v for k, v in storage_options.items()}

        # Use S3 path for bucket name, local path for file paths
        if bucket_name.startswith("/") or bucket_name.startswith("file://"):
            dir_props["root"] = f"{bucket_name}/namespace_root"
        else:
            dir_props["root"] = f"s3://{bucket_name}/namespace_root"

        self.inner = DirectoryNamespace(**dir_props)

    def get_describe_call_count(self) -> int:
        """Thread-safe getter for describe call count."""
        with self.lock:
            return self.describe_call_count

    def get_create_call_count(self) -> int:
        """Thread-safe getter for create call count."""
        with self.lock:
            return self.create_call_count

    def namespace_id(self) -> str:
        """Return namespace identifier."""
        return f"TrackingNamespace {{ inner: {self.inner.namespace_id()} }}"

    def _modify_storage_options(
        self, storage_options: Dict[str, str], count: int
    ) -> Dict[str, str]:
        """
        Add incrementing credentials with expiration timestamp.

        This simulates a credential rotation system where each call returns
        new credentials that expire after credential_expires_in_seconds.
        """
        modified = copy.deepcopy(storage_options) if storage_options else {}

        # Increment credentials to simulate rotation
        modified["aws_access_key_id"] = f"AKID_{count}"
        modified["aws_secret_access_key"] = f"SECRET_{count}"
        modified["aws_session_token"] = f"TOKEN_{count}"

        # Set expiration time
        expires_at_millis = int(
            (time.time() + self.credential_expires_in_seconds) * 1000
        )
        modified["expires_at_millis"] = str(expires_at_millis)

        return modified

    def create_empty_table(
        self, request: CreateEmptyTableRequest
    ) -> CreateEmptyTableResponse:
        """Track create_empty_table calls and inject rotating credentials."""
        with self.lock:
            self.create_call_count += 1
            count = self.create_call_count

        response = self.inner.create_empty_table(request)
        response.storage_options = self._modify_storage_options(
            response.storage_options, count
        )

        return response

    def describe_table(self, request: DescribeTableRequest) -> DescribeTableResponse:
        """Track describe_table calls and inject rotating credentials."""
        with self.lock:
            self.describe_call_count += 1
            count = self.describe_call_count

        response = self.inner.describe_table(request)
        response.storage_options = self._modify_storage_options(
            response.storage_options, count
        )

        return response

    # Pass through other methods to inner namespace
    def list_tables(self, request):
        return self.inner.list_tables(request)

    def drop_table(self, request):
        return self.inner.drop_table(request)

    def list_namespaces(self, request):
        return self.inner.list_namespaces(request)

    def create_namespace(self, request):
        return self.inner.create_namespace(request)

    def drop_namespace(self, request):
        return self.inner.drop_namespace(request)


@pytest.mark.s3_test
def test_namespace_create_table_with_provider(s3_bucket: str):
    """
    Test creating a table through namespace with storage options provider.

    Verifies:
    - create_empty_table is called once to reserve location
    - Storage options provider is auto-created
    - Table can be written successfully
    - Credentials are cached during write operations
    """
    storage_options = copy.deepcopy(CONFIG)

    namespace = TrackingNamespace(
        bucket_name=s3_bucket,
        storage_options=storage_options,
        credential_expires_in_seconds=3600,  # 1 hour
    )

    db = LanceNamespaceDBConnection(namespace)

    # Create unique namespace for this test
    namespace_name = f"test_ns_{uuid.uuid4().hex[:8]}"
    db.create_namespace([namespace_name])

    table_name = f"test_table_{uuid.uuid4().hex}"
    namespace_path = [namespace_name]

    # Verify initial state
    assert namespace.get_create_call_count() == 0
    assert namespace.get_describe_call_count() == 0

    # Create table with data
    data = pa.table(
        {
            "id": [1, 2, 3],
            "vector": [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]],
            "text": ["hello", "world", "test"],
        }
    )

    table = db.create_table(table_name, data, namespace=namespace_path)

    # Verify create_empty_table was called exactly once
    assert namespace.get_create_call_count() == 1
    # describe_table should NOT be called during create in create mode
    assert namespace.get_describe_call_count() == 0

    # Verify table was created successfully
    assert table.name == table_name
    result = table.to_pandas()
    assert len(result) == 3
    assert list(result["id"]) == [1, 2, 3]


@pytest.mark.s3_test
def test_namespace_open_table_with_provider(s3_bucket: str):
    """
    Test opening a table through namespace with storage options provider.

    Verifies:
    - describe_table is called once when opening
    - Storage options provider is auto-created
    - Table can be read successfully
    - Credentials are cached during read operations
    """
    storage_options = copy.deepcopy(CONFIG)

    namespace = TrackingNamespace(
        bucket_name=s3_bucket,
        storage_options=storage_options,
        credential_expires_in_seconds=3600,
    )

    db = LanceNamespaceDBConnection(namespace)

    # Create unique namespace for this test
    namespace_name = f"test_ns_{uuid.uuid4().hex[:8]}"
    db.create_namespace([namespace_name])

    table_name = f"test_table_{uuid.uuid4().hex}"
    namespace_path = [namespace_name]

    # Create table first
    data = pa.table(
        {
            "id": [1, 2, 3, 4, 5],
            "vector": [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0], [7.0, 8.0], [9.0, 10.0]],
            "value": [10, 20, 30, 40, 50],
        }
    )

    db.create_table(table_name, data, namespace=namespace_path)

    initial_create_count = namespace.get_create_call_count()
    assert initial_create_count == 1

    # Open the table
    opened_table = db.open_table(table_name, namespace=namespace_path)

    # Verify describe_table was called exactly once
    assert namespace.get_describe_call_count() == 1
    # create_empty_table should not be called again
    assert namespace.get_create_call_count() == initial_create_count

    # Perform multiple read operations
    describe_count_after_open = namespace.get_describe_call_count()

    for _ in range(3):
        result = opened_table.to_pandas()
        assert len(result) == 5
        count = opened_table.count_rows()
        assert count == 5

    # Verify credentials were cached (no additional describe_table calls)
    assert namespace.get_describe_call_count() == describe_count_after_open


@pytest.mark.s3_test
def test_namespace_credential_refresh_on_read(s3_bucket: str):
    """
    Test credential refresh when credentials expire during read operations.

    Verifies:
    - Credentials are cached initially (no additional describe_table calls)
    - After expiration, credentials are refreshed (describe_table called again)
    - Read operations continue to work with refreshed credentials
    """
    storage_options = copy.deepcopy(CONFIG)

    namespace = TrackingNamespace(
        bucket_name=s3_bucket,
        storage_options=storage_options,
        credential_expires_in_seconds=3,  # Short expiration for testing
    )

    db = LanceNamespaceDBConnection(namespace)

    # Create unique namespace for this test
    namespace_name = f"test_ns_{uuid.uuid4().hex[:8]}"
    db.create_namespace([namespace_name])

    table_name = f"test_table_{uuid.uuid4().hex}"
    namespace_path = [namespace_name]

    # Create table
    data = pa.table(
        {
            "id": [1, 2, 3],
            "vector": [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]],
        }
    )

    db.create_table(table_name, data, namespace=namespace_path)

    # Open table (triggers describe_table)
    opened_table = db.open_table(table_name, namespace=namespace_path)

    # Perform an immediate read (should use credentials from open)
    result = opened_table.to_pandas()
    assert len(result) == 3

    describe_count_after_first_read = namespace.get_describe_call_count()

    # Wait for credentials to expire (3 seconds + buffer)
    time.sleep(5)

    # Perform read after expiration (should trigger credential refresh)
    result = opened_table.to_pandas()
    assert len(result) == 3

    describe_count_after_refresh = namespace.get_describe_call_count()
    # Verify describe_table was called again (credential refresh)
    refresh_delta = describe_count_after_refresh - describe_count_after_first_read

    # Verify the exact count: credential refresh should call describe_table exactly
    # once
    assert refresh_delta == 1, (
        f"Credential refresh should call describe_table exactly once "
        f"(got {refresh_delta})"
    )


@pytest.mark.s3_test
def test_namespace_credential_refresh_on_write(s3_bucket: str):
    """
    Test credential refresh when credentials expire during write operations.

    Verifies:
    - Credentials are cached during initial writes
    - After expiration, new credentials are fetched before writes
    - Write operations continue to work with refreshed credentials
    """
    storage_options = copy.deepcopy(CONFIG)

    namespace = TrackingNamespace(
        bucket_name=s3_bucket,
        storage_options=storage_options,
        credential_expires_in_seconds=3,  # Short expiration
    )

    db = LanceNamespaceDBConnection(namespace)

    # Create unique namespace for this test
    namespace_name = f"test_ns_{uuid.uuid4().hex[:8]}"
    db.create_namespace([namespace_name])

    table_name = f"test_table_{uuid.uuid4().hex}"
    namespace_path = [namespace_name]

    # Create table
    initial_data = pa.table(
        {
            "id": [1, 2],
            "vector": [[1.0, 2.0], [3.0, 4.0]],
        }
    )

    table = db.create_table(table_name, initial_data, namespace=namespace_path)

    # Add more data (should use cached credentials)
    new_data = pa.table(
        {
            "id": [3, 4],
            "vector": [[5.0, 6.0], [7.0, 8.0]],
        }
    )
    table.add(new_data)

    # Wait for credentials to expire
    time.sleep(5)

    # Add more data (should trigger credential refresh)
    more_data = pa.table(
        {
            "id": [5, 6],
            "vector": [[9.0, 10.0], [11.0, 12.0]],
        }
    )
    table.add(more_data)

    # Verify final row count
    assert table.count_rows() == 6


@pytest.mark.s3_test
def test_namespace_overwrite_mode(s3_bucket: str):
    """
    Test creating table in overwrite mode with credential tracking.

    Verifies:
    - First create calls create_empty_table exactly once
    - Overwrite mode calls describe_table exactly once to check existence
    - Storage options provider works in overwrite mode
    """
    storage_options = copy.deepcopy(CONFIG)

    namespace = TrackingNamespace(
        bucket_name=s3_bucket,
        storage_options=storage_options,
        credential_expires_in_seconds=3600,
    )

    db = LanceNamespaceDBConnection(namespace)

    # Create unique namespace for this test
    namespace_name = f"test_ns_{uuid.uuid4().hex[:8]}"
    db.create_namespace([namespace_name])

    table_name = f"test_table_{uuid.uuid4().hex}"
    namespace_path = [namespace_name]

    # Create initial table
    data1 = pa.table(
        {
            "id": [1, 2],
            "vector": [[1.0, 2.0], [3.0, 4.0]],
        }
    )

    table = db.create_table(table_name, data1, namespace=namespace_path)
    # Exactly one create_empty_table call for initial create
    assert namespace.get_create_call_count() == 1
    # No describe_table calls in create mode
    assert namespace.get_describe_call_count() == 0
    assert table.count_rows() == 2

    # Overwrite the table
    data2 = pa.table(
        {
            "id": [10, 20, 30],
            "vector": [[10.0, 20.0], [30.0, 40.0], [50.0, 60.0]],
        }
    )

    table2 = db.create_table(
        table_name, data2, namespace=namespace_path, mode="overwrite"
    )

    # Should still have only 1 create_empty_table call
    # (overwrite reuses location from describe_table)
    assert namespace.get_create_call_count() == 1
    # Should have called describe_table exactly once to get existing table location
    assert namespace.get_describe_call_count() == 1

    # Verify new data
    assert table2.count_rows() == 3
    result = table2.to_pandas()
    assert list(result["id"]) == [10, 20, 30]


@pytest.mark.s3_test
def test_namespace_multiple_tables(s3_bucket: str):
    """
    Test creating and opening multiple tables in the same namespace.

    Verifies:
    - Each table gets its own storage options provider
    - Credentials are tracked independently per table
    - Multiple tables can coexist in the same namespace
    """
    storage_options = copy.deepcopy(CONFIG)

    namespace = TrackingNamespace(
        bucket_name=s3_bucket,
        storage_options=storage_options,
        credential_expires_in_seconds=3600,
    )

    db = LanceNamespaceDBConnection(namespace)

    # Create unique namespace for this test
    namespace_name = f"test_ns_{uuid.uuid4().hex[:8]}"
    db.create_namespace([namespace_name])
    namespace_path = [namespace_name]

    # Create first table
    table1_name = f"table1_{uuid.uuid4().hex}"
    data1 = pa.table({"id": [1, 2], "value": [10, 20]})
    db.create_table(table1_name, data1, namespace=namespace_path)

    # Create second table
    table2_name = f"table2_{uuid.uuid4().hex}"
    data2 = pa.table({"id": [3, 4], "value": [30, 40]})
    db.create_table(table2_name, data2, namespace=namespace_path)

    # Should have 2 create calls (one per table)
    assert namespace.get_create_call_count() == 2

    # Open both tables
    opened1 = db.open_table(table1_name, namespace=namespace_path)
    opened2 = db.open_table(table2_name, namespace=namespace_path)

    # Should have 2 describe calls (one per open)
    assert namespace.get_describe_call_count() == 2

    # Verify both tables work independently
    assert opened1.count_rows() == 2
    assert opened2.count_rows() == 2

    result1 = opened1.to_pandas()
    result2 = opened2.to_pandas()

    assert list(result1["id"]) == [1, 2]
    assert list(result2["id"]) == [3, 4]


@pytest.mark.s3_test
def test_namespace_with_schema_only(s3_bucket: str):
    """
    Test creating empty table with schema only (no data).

    Verifies:
    - Empty table creation works with storage options provider
    - describe_table is NOT called during create
    - Data can be added later
    """
    storage_options = copy.deepcopy(CONFIG)

    namespace = TrackingNamespace(
        bucket_name=s3_bucket,
        storage_options=storage_options,
        credential_expires_in_seconds=3600,
    )

    db = LanceNamespaceDBConnection(namespace)

    # Create unique namespace for this test
    namespace_name = f"test_ns_{uuid.uuid4().hex[:8]}"
    db.create_namespace([namespace_name])

    table_name = f"test_table_{uuid.uuid4().hex}"
    namespace_path = [namespace_name]

    # Create empty table with schema
    schema = pa.schema(
        [
            pa.field("id", pa.int64()),
            pa.field("vector", pa.list_(pa.float32(), 2)),
            pa.field("text", pa.utf8()),
        ]
    )

    table = db.create_table(table_name, schema=schema, namespace=namespace_path)

    # Should have called create_empty_table once
    assert namespace.get_create_call_count() == 1
    # Should NOT have called describe_table in create mode
    assert namespace.get_describe_call_count() == 0

    # Verify empty table
    assert table.count_rows() == 0

    # Add data
    data = pa.table(
        {
            "id": [1, 2],
            "vector": [[1.0, 2.0], [3.0, 4.0]],
            "text": ["hello", "world"],
        }
    )
    table.add(data)

    # Verify data was added
    assert table.count_rows() == 2
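The integration tests above rely on a simple convention for credential rotation: every namespace response carries freshly rotated credentials plus an expires_at_millis timestamp, and the connection is expected to call describe_table again once that timestamp has passed. A hedged, standalone sketch of just that response-mutation step, mirroring the TrackingNamespace helper above:

    # Hedged sketch of the credential-rotation convention exercised by the tests above.
    import time

    def rotated_storage_options(base: dict, count: int, ttl_seconds: int) -> dict:
        opts = dict(base or {})
        opts["aws_access_key_id"] = f"AKID_{count}"
        opts["aws_secret_access_key"] = f"SECRET_{count}"
        opts["aws_session_token"] = f"TOKEN_{count}"
        # After this timestamp the connection should re-fetch credentials
        # from the namespace before issuing further reads or writes.
        opts["expires_at_millis"] = str(int((time.time() + ttl_seconds) * 1000))
        return opts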
@@ -10,11 +10,14 @@ use lancedb::{
 };
 use pyo3::{
     exceptions::{PyRuntimeError, PyValueError},
-    pyclass, pyfunction, pymethods, Bound, FromPyObject, Py, PyAny, PyRef, PyResult, Python,
+    pyclass, pyfunction, pymethods, Bound, FromPyObject, Py, PyAny, PyObject, PyRef, PyResult,
+    Python,
 };
 use pyo3_async_runtimes::tokio::future_into_py;
 
-use crate::{error::PythonErrorExt, table::Table};
+use crate::{
+    error::PythonErrorExt, storage_options::py_object_to_storage_options_provider, table::Table,
+};
 
 #[pyclass]
 pub struct Connection {
@@ -101,7 +104,8 @@ impl Connection {
         future_into_py(self_.py(), async move { op.execute().await.infer_error() })
     }
 
-    #[pyo3(signature = (name, mode, data, namespace=vec![], storage_options=None))]
+    #[allow(clippy::too_many_arguments)]
+    #[pyo3(signature = (name, mode, data, namespace=vec![], storage_options=None, storage_options_provider=None, location=None))]
     pub fn create_table<'a>(
         self_: PyRef<'a, Self>,
         name: String,
@@ -109,6 +113,8 @@ impl Connection {
         data: Bound<'_, PyAny>,
         namespace: Vec<String>,
         storage_options: Option<HashMap<String, String>>,
+        storage_options_provider: Option<PyObject>,
+        location: Option<String>,
     ) -> PyResult<Bound<'a, PyAny>> {
         let inner = self_.get_inner()?.clone();
 
@@ -122,6 +128,13 @@ impl Connection {
         if let Some(storage_options) = storage_options {
             builder = builder.storage_options(storage_options);
         }
+        if let Some(provider_obj) = storage_options_provider {
+            let provider = py_object_to_storage_options_provider(provider_obj)?;
+            builder = builder.storage_options_provider(provider);
+        }
+        if let Some(location) = location {
+            builder = builder.location(location);
+        }
 
         future_into_py(self_.py(), async move {
             let table = builder.execute().await.infer_error()?;
@@ -129,7 +142,8 @@ impl Connection {
         })
     }
 
-    #[pyo3(signature = (name, mode, schema, namespace=vec![], storage_options=None))]
+    #[allow(clippy::too_many_arguments)]
+    #[pyo3(signature = (name, mode, schema, namespace=vec![], storage_options=None, storage_options_provider=None, location=None))]
     pub fn create_empty_table<'a>(
         self_: PyRef<'a, Self>,
         name: String,
@@ -137,6 +151,8 @@ impl Connection {
         schema: Bound<'_, PyAny>,
         namespace: Vec<String>,
         storage_options: Option<HashMap<String, String>>,
+        storage_options_provider: Option<PyObject>,
+        location: Option<String>,
     ) -> PyResult<Bound<'a, PyAny>> {
         let inner = self_.get_inner()?.clone();
 
@@ -150,6 +166,13 @@ impl Connection {
         if let Some(storage_options) = storage_options {
             builder = builder.storage_options(storage_options);
         }
+        if let Some(provider_obj) = storage_options_provider {
+            let provider = py_object_to_storage_options_provider(provider_obj)?;
+            builder = builder.storage_options_provider(provider);
+        }
+        if let Some(location) = location {
+            builder = builder.location(location);
+        }
 
         future_into_py(self_.py(), async move {
             let table = builder.execute().await.infer_error()?;
@@ -157,13 +180,15 @@ impl Connection {
         })
     }
 
-    #[pyo3(signature = (name, namespace=vec![], storage_options = None, index_cache_size = None))]
+    #[pyo3(signature = (name, namespace=vec![], storage_options = None, storage_options_provider=None, index_cache_size = None, location=None))]
     pub fn open_table(
         self_: PyRef<'_, Self>,
         name: String,
         namespace: Vec<String>,
         storage_options: Option<HashMap<String, String>>,
+        storage_options_provider: Option<PyObject>,
         index_cache_size: Option<u32>,
+        location: Option<String>,
     ) -> PyResult<Bound<'_, PyAny>> {
         let inner = self_.get_inner()?.clone();
 
@@ -172,9 +197,16 @@ impl Connection {
         if let Some(storage_options) = storage_options {
             builder = builder.storage_options(storage_options);
         }
+        if let Some(provider_obj) = storage_options_provider {
+            let provider = py_object_to_storage_options_provider(provider_obj)?;
+            builder = builder.storage_options_provider(provider);
+        }
         if let Some(index_cache_size) = index_cache_size {
             builder = builder.index_cache_size(index_cache_size);
         }
+        if let Some(location) = location {
+            builder = builder.location(location);
+        }
 
         future_into_py(self_.py(), async move {
             let table = builder.execute().await.infer_error()?;
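Note: the two new optional parameters above, storage_options_provider and location, are threaded through create_table, create_empty_table, and open_table. A minimal, hypothetical sketch of how this could look from Python, assuming the high-level synchronous wrapper forwards these keyword arguments to the bindings shown above; the connection URI, table name, and `provider` variable are placeholders, and the provider object only needs the fetch_storage_options()/provider_id() contract described in the new python/src/storage_options.rs module below:

import lancedb

db = lancedb.connect("s3://example-bucket/db")  # placeholder URI

# `provider` is any object exposing fetch_storage_options() and provider_id();
# see the sketch after python/src/storage_options.rs below.
table = db.create_table(
    "events",
    data=[{"id": 1, "vector": [1.0, 2.0], "text": "hello"}],
    storage_options_provider=provider,
    location="s3://example-bucket/custom/events.lance",
)

# The same keywords are accepted when opening an existing table.
table = db.open_table(
    "events",
    storage_options_provider=provider,
    location="s3://example-bucket/custom/events.lance",
)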
@@ -26,6 +26,7 @@ pub mod index;
 pub mod permutation;
 pub mod query;
 pub mod session;
+pub mod storage_options;
 pub mod table;
 pub mod util;
 
python/src/storage_options.rs (new file, 150 lines)
@@ -0,0 +1,150 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright The LanceDB Authors
+
+//! PyO3 bindings for StorageOptionsProvider
+//!
+//! This module provides the bridge between Python StorageOptionsProvider objects
+//! and Rust's StorageOptionsProvider trait, enabling automatic credential refresh.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use lance_io::object_store::StorageOptionsProvider;
+use pyo3::prelude::*;
+use pyo3::types::PyDict;
+
+/// Internal wrapper around a Python object implementing StorageOptionsProvider
+pub struct PyStorageOptionsProvider {
+    /// The Python object implementing fetch_storage_options()
+    inner: PyObject,
+}
+
+impl Clone for PyStorageOptionsProvider {
+    fn clone(&self) -> Self {
+        Python::with_gil(|py| Self {
+            inner: self.inner.clone_ref(py),
+        })
+    }
+}
+
+impl PyStorageOptionsProvider {
+    pub fn new(obj: PyObject) -> PyResult<Self> {
+        Python::with_gil(|py| {
+            // Verify the object has a fetch_storage_options method
+            if !obj.bind(py).hasattr("fetch_storage_options")? {
+                return Err(pyo3::exceptions::PyTypeError::new_err(
+                    "StorageOptionsProvider must implement fetch_storage_options() method",
+                ));
+            }
+            Ok(Self { inner: obj })
+        })
+    }
+}
+
+/// Wrapper that implements the Rust StorageOptionsProvider trait
+pub struct PyStorageOptionsProviderWrapper {
+    py_provider: PyStorageOptionsProvider,
+}
+
+impl PyStorageOptionsProviderWrapper {
+    pub fn new(py_provider: PyStorageOptionsProvider) -> Self {
+        Self { py_provider }
+    }
+}
+
+#[async_trait]
+impl StorageOptionsProvider for PyStorageOptionsProviderWrapper {
+    async fn fetch_storage_options(&self) -> lance_core::Result<Option<HashMap<String, String>>> {
+        // Call Python method from async context using spawn_blocking
+        let py_provider = self.py_provider.clone();
+
+        tokio::task::spawn_blocking(move || {
+            Python::with_gil(|py| {
+                // Call the Python fetch_storage_options method
+                let result = py_provider
+                    .inner
+                    .bind(py)
+                    .call_method0("fetch_storage_options")
+                    .map_err(|e| lance_core::Error::IO {
+                        source: Box::new(std::io::Error::other(format!(
+                            "Failed to call fetch_storage_options: {}",
+                            e
+                        ))),
+                        location: snafu::location!(),
+                    })?;
+
+                // If result is None, return None
+                if result.is_none() {
+                    return Ok(None);
+                }
+
+                // Extract the result dict - should be a flat Map<String, String>
+                let result_dict = result.downcast::<PyDict>().map_err(|_| {
+                    lance_core::Error::InvalidInput {
+                        source: "fetch_storage_options() must return None or a dict of string key-value pairs".into(),
+                        location: snafu::location!(),
+                    }
+                })?;
+
+                // Convert all entries to HashMap<String, String>
+                let mut storage_options = HashMap::new();
+                for (key, value) in result_dict.iter() {
+                    let key_str: String = key.extract().map_err(|e| {
+                        lance_core::Error::InvalidInput {
+                            source: format!("Storage option key must be a string: {}", e).into(),
+                            location: snafu::location!(),
+                        }
+                    })?;
+                    let value_str: String = value.extract().map_err(|e| {
+                        lance_core::Error::InvalidInput {
+                            source: format!("Storage option value must be a string: {}", e).into(),
+                            location: snafu::location!(),
+                        }
+                    })?;
+                    storage_options.insert(key_str, value_str);
+                }
+
+                Ok(Some(storage_options))
+            })
+        })
+        .await
+        .map_err(|e| lance_core::Error::IO {
+            source: Box::new(std::io::Error::other(format!(
+                "Task join error: {}",
+                e
+            ))),
+            location: snafu::location!(),
+        })?
+    }
+
+    fn provider_id(&self) -> String {
+        Python::with_gil(|py| {
+            // Call provider_id() method on the Python object
+            let obj = self.py_provider.inner.bind(py);
+            obj.call_method0("provider_id")
+                .and_then(|result| result.extract::<String>())
+                .unwrap_or_else(|e| {
+                    // If provider_id() fails, construct a fallback ID
+                    format!("PyStorageOptionsProvider(error: {})", e)
+                })
+        })
+    }
+}
+
+impl std::fmt::Debug for PyStorageOptionsProviderWrapper {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "PyStorageOptionsProviderWrapper({})", self.provider_id())
+    }
+}
+
+/// Convert a Python object to an Arc<dyn StorageOptionsProvider>
+///
+/// This is the main entry point for converting Python StorageOptionsProvider objects
+/// to Rust trait objects that can be used by the Lance ecosystem.
+pub fn py_object_to_storage_options_provider(
+    py_obj: PyObject,
+) -> PyResult<Arc<dyn StorageOptionsProvider>> {
+    let py_provider = PyStorageOptionsProvider::new(py_obj)?;
+    Ok(Arc::new(PyStorageOptionsProviderWrapper::new(py_provider)))
+}
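Note: the bridge above is duck-typed — any Python object with a fetch_storage_options() method (returning None or a flat dict of string keys and values) and a provider_id() method (returning a string) can be passed where a StorageOptionsProvider is expected. A minimal sketch of such an object; the class name and credential values are hypothetical, and a real implementation would call a credential service (for example STS) to return fresh options on each call:

class RefreshingStorageOptionsProvider:
    """Hypothetical provider that hands out refreshed object-store credentials."""

    def fetch_storage_options(self):
        # Return None when nothing needs to change, otherwise a flat
        # dict of string key/value pairs (e.g. refreshed AWS credentials).
        return {
            "aws_access_key_id": "AKIA...",
            "aws_secret_access_key": "<secret>",
            "aws_session_token": "<token>",
        }

    def provider_id(self):
        # Used on the Rust side for identification and Debug output.
        return "refreshing-provider"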
@@ -35,6 +35,7 @@ use crate::Table;
 pub use lance_encoding::version::LanceFileVersion;
 #[cfg(feature = "remote")]
 use lance_io::object_store::StorageOptions;
+use lance_io::object_store::StorageOptionsProvider;
 
 /// A builder for configuring a [`Connection::table_names`] operation
 pub struct TableNamesBuilder {
@@ -360,6 +361,30 @@ impl<const HAS_DATA: bool> CreateTableBuilder<HAS_DATA> {
         self.request.namespace = namespace;
         self
     }
+
+    /// Set a custom location for the table.
+    ///
+    /// If not set, the database will derive a location from its URI and the table name.
+    /// This is useful when integrating with namespace systems that manage table locations.
+    pub fn location(mut self, location: impl Into<String>) -> Self {
+        self.request.location = Some(location.into());
+        self
+    }
+
+    /// Set a storage options provider for automatic credential refresh.
+    ///
+    /// This allows tables to automatically refresh cloud storage credentials
+    /// when they expire, enabling long-running operations on remote storage.
+    pub fn storage_options_provider(mut self, provider: Arc<dyn StorageOptionsProvider>) -> Self {
+        self.request
+            .write_options
+            .lance_write_params
+            .get_or_insert(Default::default())
+            .store_params
+            .get_or_insert(Default::default())
+            .storage_options_provider = Some(provider);
+        self
+    }
 }
 
 #[derive(Clone, Debug)]
@@ -382,6 +407,7 @@ impl OpenTableBuilder {
                 namespace: vec![],
                 index_cache_size: None,
                 lance_read_params: None,
+                location: None,
             },
             embedding_registry,
         }
@@ -461,6 +487,29 @@ impl OpenTableBuilder {
         self
     }
+
+    /// Set a custom location for the table.
+    ///
+    /// If not set, the database will derive a location from its URI and the table name.
+    /// This is useful when integrating with namespace systems that manage table locations.
+    pub fn location(mut self, location: impl Into<String>) -> Self {
+        self.request.location = Some(location.into());
+        self
+    }
+
+    /// Set a storage options provider for automatic credential refresh.
+    ///
+    /// This allows tables to automatically refresh cloud storage credentials
+    /// when they expire, enabling long-running operations on remote storage.
+    pub fn storage_options_provider(mut self, provider: Arc<dyn StorageOptionsProvider>) -> Self {
+        self.request
+            .lance_read_params
+            .get_or_insert(Default::default())
+            .store_options
+            .get_or_insert(Default::default())
+            .storage_options_provider = Some(provider);
+        self
+    }
 
     /// Open the table
     pub async fn execute(self) -> Result<Table> {
         let table = self.parent.open_table(self.request).await?;
@@ -84,6 +84,9 @@ pub struct OpenTableRequest {
     pub namespace: Vec<String>,
     pub index_cache_size: Option<u32>,
     pub lance_read_params: Option<ReadParams>,
+    /// Optional custom location for the table. If not provided, the database will
+    /// derive a location based on its URI and the table name.
+    pub location: Option<String>,
 }
 
 pub type TableBuilderCallback = Box<dyn FnOnce(OpenTableRequest) -> OpenTableRequest + Send>;
@@ -164,6 +167,9 @@ pub struct CreateTableRequest {
     pub mode: CreateTableMode,
     /// Options to use when writing data (only used if `data` is not None)
     pub write_options: WriteOptions,
+    /// Optional custom location for the table. If not provided, the database will
+    /// derive a location based on its URI and the table name.
+    pub location: Option<String>,
 }
 
 impl CreateTableRequest {
@@ -174,6 +180,7 @@ impl CreateTableRequest {
             data,
             mode: CreateTableMode::default(),
             write_options: WriteOptions::default(),
+            location: None,
         }
     }
 }
@@ -12,6 +12,7 @@ use lance::dataset::{builder::DatasetBuilder, ReadParams, WriteMode};
 use lance::io::{ObjectStore, ObjectStoreParams, WrappingObjectStore};
 use lance_datafusion::utils::StreamingWriteSource;
 use lance_encoding::version::LanceFileVersion;
+use lance_io::object_store::StorageOptionsProvider;
 use lance_table::io::commit::commit_handler_from_url;
 use object_store::local::LocalFileSystem;
 use snafu::ResultExt;
@@ -218,6 +219,9 @@ pub struct ListingDatabase {
     // Storage options to be inherited by tables created from this connection
     storage_options: HashMap<String, String>,
 
+    // Dynamic storage options provider for automatic credential refresh
+    pub(crate) storage_options_provider: Option<Arc<dyn StorageOptionsProvider>>,
+
     // Options for tables created by this connection
     new_table_config: NewTableConfig,
 
@@ -335,7 +339,9 @@ impl ListingDatabase {
         )
         .await?;
         if object_store.is_local() {
-            Self::try_create_dir(&plain_uri).context(CreateDirSnafu { path: plain_uri })?;
+            Self::try_create_dir(&plain_uri).context(CreateDirSnafu {
+                path: plain_uri.clone(),
+            })?;
         }
 
         let write_store_wrapper = match mirrored_store {
@@ -355,6 +361,7 @@ impl ListingDatabase {
             store_wrapper: write_store_wrapper,
             read_consistency_interval: request.read_consistency_interval,
             storage_options: options.storage_options,
+            storage_options_provider: None,
             new_table_config: options.new_table_config,
             session,
         })
@@ -396,6 +403,7 @@ impl ListingDatabase {
             store_wrapper: None,
            read_consistency_interval,
             storage_options: HashMap::new(),
+            storage_options_provider: None,
             new_table_config,
             session,
         })
@@ -403,7 +411,20 @@ impl ListingDatabase {
 
     /// Try to create a local directory to store the lancedb dataset
     fn try_create_dir(path: &str) -> core::result::Result<(), std::io::Error> {
-        let path = Path::new(path);
+        // Strip file:// or file:/ scheme if present to get the actual filesystem path
+        // Note: file:///path becomes file:/path after url.to_string(), so we need to handle both
+        let fs_path = if let Some(stripped) = path.strip_prefix("file://") {
+            // file:///path or file://host/path format
+            stripped
+        } else if let Some(stripped) = path.strip_prefix("file:") {
+            // file:/path format (from url.to_string() on file:///path)
+            // The path after "file:" should already start with "/" for absolute paths
+            stripped
+        } else {
+            path
+        };
+
+        let path = Path::new(fs_path);
         if !path.try_exists()? {
             create_dir_all(path)?;
         }
@@ -529,6 +550,14 @@ impl ListingDatabase {
             self.inherit_storage_options(storage_options);
         }
 
+        // Set storage options provider if available
+        if self.storage_options_provider.is_some() {
+            write_params
+                .store_params
+                .get_or_insert_with(Default::default)
+                .storage_options_provider = self.storage_options_provider.clone();
+        }
+
         write_params.data_storage_version = self
             .new_table_config
             .data_storage_version
@@ -569,6 +598,7 @@ impl ListingDatabase {
             namespace: namespace.clone(),
             index_cache_size: None,
             lance_read_params: None,
+            location: None,
         };
         let req = (callback)(req);
         let table = self.open_table(req).await?;
@@ -664,12 +694,17 @@ impl Database for ListingDatabase {
     }
 
     async fn create_table(&self, request: CreateTableRequest) -> Result<Arc<dyn BaseTable>> {
-        if !request.namespace.is_empty() {
-            return Err(Error::NotSupported {
-                message: "Namespace parameter is not supported for listing database. Only root namespace is supported.".into(),
+        // When namespace is not empty, location must be provided
+        if !request.namespace.is_empty() && request.location.is_none() {
+            return Err(Error::InvalidInput {
+                message: "Location must be provided when namespace is not empty".into(),
             });
         }
-        let table_uri = self.table_uri(&request.name)?;
+        // Use provided location if available, otherwise derive from table name
+        let table_uri = request
+            .location
+            .clone()
+            .unwrap_or_else(|| self.table_uri(&request.name).unwrap());
+
         let (storage_version_override, v2_manifest_override) =
             self.extract_storage_overrides(&request)?;
@@ -682,6 +717,7 @@ impl Database for ListingDatabase {
         match NativeTable::create(
             &table_uri,
             &request.name,
+            request.namespace.clone(),
             request.data,
             self.store_wrapper.clone(),
             Some(write_params),
@@ -753,6 +789,7 @@ impl Database for ListingDatabase {
         let cloned_table = NativeTable::open_with_params(
             &target_uri,
             &request.target_table_name,
+            request.target_namespace,
             self.store_wrapper.clone(),
             None,
             self.read_consistency_interval,
@@ -763,12 +800,17 @@ impl Database for ListingDatabase {
     }
 
     async fn open_table(&self, mut request: OpenTableRequest) -> Result<Arc<dyn BaseTable>> {
-        if !request.namespace.is_empty() {
-            return Err(Error::NotSupported {
-                message: "Namespace parameter is not supported for listing database. Only root namespace is supported.".into(),
+        // When namespace is not empty, location must be provided
+        if !request.namespace.is_empty() && request.location.is_none() {
+            return Err(Error::InvalidInput {
+                message: "Location must be provided when namespace is not empty".into(),
             });
         }
-        let table_uri = self.table_uri(&request.name)?;
+        // Use provided location if available, otherwise derive from table name
+        let table_uri = request
+            .location
+            .clone()
+            .unwrap_or_else(|| self.table_uri(&request.name).unwrap());
+
         // Only modify the storage options if we actually have something to
         // inherit. There is a difference between storage_options=None and
@@ -788,6 +830,16 @@ impl Database for ListingDatabase {
             self.inherit_storage_options(storage_options);
         }
 
+        // Set storage options provider if available
+        if self.storage_options_provider.is_some() {
+            request
+                .lance_read_params
+                .get_or_insert_with(Default::default)
+                .store_options
+                .get_or_insert_with(Default::default)
+                .storage_options_provider = self.storage_options_provider.clone();
+        }
+
         // Some ReadParams are exposed in the OpenTableBuilder, but we also
         // let the user provide their own ReadParams.
         //
@@ -808,6 +860,7 @@ impl Database for ListingDatabase {
         NativeTable::open_with_params(
             &table_uri,
             &request.name,
+            request.namespace,
             self.store_wrapper.clone(),
             Some(read_params),
             self.read_consistency_interval,
@@ -911,6 +964,7 @@ mod tests {
                 data: CreateTableData::Empty(TableDefinition::new_from_schema(schema.clone())),
                 mode: CreateTableMode::Create,
                 write_options: Default::default(),
+                location: None,
             })
             .await
             .unwrap();
@@ -974,6 +1028,7 @@ mod tests {
                 data: CreateTableData::Data(reader),
                 mode: CreateTableMode::Create,
                 write_options: Default::default(),
+                location: None,
             })
             .await
             .unwrap();
@@ -1031,6 +1086,7 @@ mod tests {
                 data: CreateTableData::Empty(TableDefinition::new_from_schema(schema)),
                 mode: CreateTableMode::Create,
                 write_options: Default::default(),
+                location: None,
             })
             .await
             .unwrap();
@@ -1065,6 +1121,7 @@ mod tests {
                 data: CreateTableData::Empty(TableDefinition::new_from_schema(schema)),
                 mode: CreateTableMode::Create,
                 write_options: Default::default(),
+                location: None,
             })
             .await
             .unwrap();
@@ -1103,6 +1160,7 @@ mod tests {
                 data: CreateTableData::Empty(TableDefinition::new_from_schema(schema)),
                 mode: CreateTableMode::Create,
                 write_options: Default::default(),
+                location: None,
             })
             .await
             .unwrap();
@@ -1141,6 +1199,7 @@ mod tests {
                 data: CreateTableData::Empty(TableDefinition::new_from_schema(schema)),
                 mode: CreateTableMode::Create,
                 write_options: Default::default(),
+                location: None,
             })
             .await
             .unwrap();
@@ -1194,6 +1253,7 @@ mod tests {
                 data: CreateTableData::Empty(TableDefinition::new_from_schema(schema)),
                 mode: CreateTableMode::Create,
                 write_options: Default::default(),
+                location: None,
             })
             .await
             .unwrap();
@@ -1250,6 +1310,7 @@ mod tests {
                 data: CreateTableData::Data(reader),
                 mode: CreateTableMode::Create,
                 write_options: Default::default(),
+                location: None,
             })
             .await
             .unwrap();
@@ -1334,6 +1395,7 @@ mod tests {
                 data: CreateTableData::Data(reader),
                 mode: CreateTableMode::Create,
                 write_options: Default::default(),
+                location: None,
             })
             .await
             .unwrap();
@@ -1419,6 +1481,7 @@ mod tests {
                 data: CreateTableData::Data(reader),
                 mode: CreateTableMode::Create,
                 write_options: Default::default(),
+                location: None,
             })
             .await
             .unwrap();
@@ -1511,6 +1574,7 @@ mod tests {
                 data: CreateTableData::Data(reader),
                 mode: CreateTableMode::Create,
                 write_options: Default::default(),
+                location: None,
             })
             .await
             .unwrap();
@@ -7,6 +7,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use lance_io::object_store::{LanceNamespaceStorageOptionsProvider, StorageOptionsProvider};
 use lance_namespace::{
     models::{
         CreateEmptyTableRequest, CreateNamespaceRequest, DescribeTableRequest,
@@ -16,13 +17,14 @@ use lance_namespace::{
 };
 use lance_namespace_impls::ConnectBuilder;
 
-use crate::database::listing::ListingDatabase;
+use crate::connection::ConnectRequest;
+use crate::database::ReadConsistency;
 use crate::error::{Error, Result};
-use crate::{connection::ConnectRequest, database::ReadConsistency};
 
 use super::{
-    BaseTable, CloneTableRequest, CreateNamespaceRequest as DbCreateNamespaceRequest,
-    CreateTableMode, CreateTableRequest as DbCreateTableRequest, Database,
+    listing::ListingDatabase, BaseTable, CloneTableRequest,
+    CreateNamespaceRequest as DbCreateNamespaceRequest, CreateTableMode,
+    CreateTableRequest as DbCreateTableRequest, Database,
     DropNamespaceRequest as DbDropNamespaceRequest,
     ListNamespacesRequest as DbListNamespacesRequest, OpenTableRequest, TableNamesRequest,
 };
@@ -67,58 +69,6 @@ impl LanceNamespaceDatabase {
             uri: format!("namespace://{}", ns_impl),
         })
     }
-
-    /// Helper method to create a ListingDatabase from a table location
-    ///
-    /// This method:
-    /// 1. Validates that the location ends with <table_name>.lance
-    /// 2. Extracts the parent directory from the location
-    /// 3. Creates a ListingDatabase at that parent directory
-    async fn create_listing_database(
-        &self,
-        table_name: &str,
-        location: &str,
-        additional_storage_options: Option<HashMap<String, String>>,
-    ) -> Result<Arc<ListingDatabase>> {
-        let expected_suffix = format!("{}.lance", table_name);
-        if !location.ends_with(&expected_suffix) {
-            return Err(Error::Runtime {
-                message: format!(
-                    "Invalid table location '{}': expected to end with '{}'",
-                    location, expected_suffix
-                ),
-            });
-        }
-
-        let parent_dir = location
-            .rsplit_once('/')
-            .map(|(parent, _)| parent.to_string())
-            .ok_or_else(|| Error::Runtime {
-                message: format!("Invalid table location '{}': no parent directory", location),
-            })?;
-
-        let mut merged_storage_options = self.storage_options.clone();
-        if let Some(opts) = additional_storage_options {
-            merged_storage_options.extend(opts);
-        }
-
-        let connect_request = ConnectRequest {
-            uri: parent_dir,
-            options: merged_storage_options,
-            read_consistency_interval: self.read_consistency_interval,
-            session: self.session.clone(),
-            #[cfg(feature = "remote")]
-            client_config: Default::default(),
-        };
-
-        let listing_db = ListingDatabase::connect_with_options(&connect_request)
-            .await
-            .map_err(|e| Error::Runtime {
-                message: format!("Failed to create listing database: {}", e),
-            })?;
-
-        Ok(Arc::new(listing_db))
-    }
 }
 
 impl std::fmt::Debug for LanceNamespaceDatabase {
@@ -136,6 +86,51 @@ impl std::fmt::Display for LanceNamespaceDatabase {
     }
 }
 
+impl LanceNamespaceDatabase {
+    /// Create a temporary listing database for the given location
+    ///
+    /// Merges storage options with priority: connection < user < namespace
+    async fn create_listing_database(
+        &self,
+        location: &str,
+        table_id: Vec<String>,
+        user_storage_options: Option<&HashMap<String, String>>,
+        response_storage_options: Option<&HashMap<String, String>>,
+    ) -> Result<ListingDatabase> {
+        // Merge storage options: connection < user < namespace
+        let mut merged_storage_options = self.storage_options.clone();
+        if let Some(opts) = user_storage_options {
+            merged_storage_options.extend(opts.clone());
+        }
+        if let Some(opts) = response_storage_options {
+            merged_storage_options.extend(opts.clone());
+        }
+
+        let request = ConnectRequest {
+            uri: location.to_string(),
+            #[cfg(feature = "remote")]
+            client_config: Default::default(),
+            options: merged_storage_options,
+            read_consistency_interval: self.read_consistency_interval,
+            session: self.session.clone(),
+        };
+
+        let mut listing_db = ListingDatabase::connect_with_options(&request).await?;
+
+        // Create storage options provider only if namespace returned storage options
+        // (not just user-provided options)
+        if response_storage_options.is_some() {
+            let provider = Arc::new(LanceNamespaceStorageOptionsProvider::new(
+                self.namespace.clone(),
+                table_id,
+            )) as Arc<dyn StorageOptionsProvider>;
+            listing_db.storage_options_provider = Some(provider);
+        }
+
+        Ok(listing_db)
+    }
+}
+
 #[async_trait]
 impl Database for LanceNamespaceDatabase {
     fn uri(&self) -> &str {
@@ -241,6 +236,14 @@ impl Database for LanceNamespaceDatabase {
     }
 
     async fn create_table(&self, request: DbCreateTableRequest) -> Result<Arc<dyn BaseTable>> {
+        // Extract user-provided storage options from request
+        let user_storage_options = request
+            .write_options
+            .lance_write_params
+            .as_ref()
+            .and_then(|lwp| lwp.store_params.as_ref())
+            .and_then(|sp| sp.storage_options.as_ref());
+
         let mut table_id = request.namespace.clone();
         table_id.push(request.name.clone());
         let describe_request = DescribeTableRequest {
@@ -279,15 +282,21 @@ impl Database for LanceNamespaceDatabase {
             })?;
 
             let listing_db = self
-                .create_listing_database(&request.name, &location, response.storage_options)
+                .create_listing_database(
+                    &location,
+                    table_id.clone(),
+                    user_storage_options,
+                    response.storage_options.as_ref(),
+                )
                 .await?;
 
             return listing_db
                 .open_table(OpenTableRequest {
                     name: request.name.clone(),
-                    namespace: vec![],
+                    namespace: request.namespace.clone(),
                     index_cache_size: None,
                     lance_read_params: None,
+                    location: Some(location),
                 })
                 .await;
         }
@@ -298,7 +307,7 @@ impl Database for LanceNamespaceDatabase {
         table_id.push(request.name.clone());
 
         let create_empty_request = CreateEmptyTableRequest {
-            id: Some(table_id),
+            id: Some(table_id.clone()),
             location: None,
             properties: if self.storage_options.is_empty() {
                 None
@@ -323,28 +332,37 @@ impl Database for LanceNamespaceDatabase {
 
         let listing_db = self
             .create_listing_database(
-                &request.name,
                 &location,
-                create_empty_response.storage_options,
+                table_id,
+                user_storage_options,
+                create_empty_response.storage_options.as_ref(),
             )
             .await?;
 
         let create_request = DbCreateTableRequest {
             name: request.name,
-            namespace: vec![],
+            namespace: request.namespace,
             data: request.data,
             mode: request.mode,
             write_options: request.write_options,
+            location: Some(location),
         };
         listing_db.create_table(create_request).await
     }
 
     async fn open_table(&self, request: OpenTableRequest) -> Result<Arc<dyn BaseTable>> {
+        // Extract user-provided storage options from request
+        let user_storage_options = request
+            .lance_read_params
+            .as_ref()
+            .and_then(|lrp| lrp.store_options.as_ref())
+            .and_then(|so| so.storage_options.as_ref());
+
         let mut table_id = request.namespace.clone();
         table_id.push(request.name.clone());
 
         let describe_request = DescribeTableRequest {
-            id: Some(table_id),
+            id: Some(table_id.clone()),
             version: None,
         };
         let response = self
@@ -360,14 +378,20 @@ impl Database for LanceNamespaceDatabase {
             })?;
 
         let listing_db = self
-            .create_listing_database(&request.name, &location, response.storage_options)
+            .create_listing_database(
+                &location,
+                table_id,
+                user_storage_options,
+                response.storage_options.as_ref(),
+            )
             .await?;
 
         let open_request = OpenTableRequest {
             name: request.name.clone(),
-            namespace: vec![],
+            namespace: request.namespace.clone(),
             index_cache_size: request.index_cache_size,
             lance_read_params: request.lance_read_params,
+            location: Some(location),
         };
         listing_db.open_table(open_request).await
     }
@@ -431,6 +455,7 @@ impl Database for LanceNamespaceDatabase {
 mod tests {
     use super::*;
     use crate::connect_namespace;
+    use crate::database::CreateNamespaceRequest;
     use crate::query::ExecutableQuery;
     use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, StringArray};
     use arrow_schema::{DataType, Field, Schema};
@@ -541,10 +566,18 @@ mod tests {
             .await
             .expect("Failed to connect to namespace");
 
-        // Test: Create a table
+        // Create a child namespace first
+        conn.create_namespace(CreateNamespaceRequest {
+            namespace: vec!["test_ns".into()],
+        })
+        .await
+        .expect("Failed to create namespace");
+
+        // Test: Create a table in the child namespace
         let test_data = create_test_data();
         let table = conn
             .create_table("test_table", test_data)
+            .namespace(vec!["test_ns".into()])
             .execute()
             .await
             .expect("Failed to create table");
@@ -562,9 +595,15 @@ mod tests {
         assert_eq!(results.len(), 1);
         assert_eq!(results[0].num_rows(), 5);
 
-        // Verify: Table appears in table_names
+        // Verify: Table namespace is correct
+        assert_eq!(table.namespace(), &["test_ns"]);
+        assert_eq!(table.name(), "test_table");
+        assert_eq!(table.id(), "test_ns$test_table");
+
+        // Verify: Table appears in table_names for the child namespace
         let table_names = conn
             .table_names()
+            .namespace(vec!["test_ns".into()])
            .execute()
             .await
             .expect("Failed to list tables");
@@ -586,10 +625,18 @@ mod tests {
             .await
             .expect("Failed to connect to namespace");
 
-        // Create a table first
+        // Create a child namespace first
+        conn.create_namespace(CreateNamespaceRequest {
+            namespace: vec!["test_ns".into()],
+        })
+        .await
+        .expect("Failed to create namespace");
+
+        // Create a table in child namespace
         let test_data = create_test_data();
         let _table = conn
             .create_table("describe_test", test_data)
+            .namespace(vec!["test_ns".into()])
             .execute()
             .await
             .expect("Failed to create table");
@@ -597,6 +644,7 @@ mod tests {
         // Test: Open the table (which internally uses describe_table)
         let opened_table = conn
             .open_table("describe_test")
+            .namespace(vec!["test_ns".into()])
             .execute()
             .await
             .expect("Failed to open table");
@@ -619,6 +667,10 @@ mod tests {
         assert_eq!(schema.fields.len(), 2);
         assert_eq!(schema.field(0).name(), "id");
         assert_eq!(schema.field(1).name(), "name");
+
+        // Verify namespace and id
+        assert_eq!(opened_table.namespace(), &["test_ns"]);
+        assert_eq!(opened_table.id(), "test_ns$describe_test");
     }
 
     #[tokio::test]
@@ -635,10 +687,18 @@ mod tests {
             .await
             .expect("Failed to connect to namespace");
 
-        // Create initial table with 5 rows
+        // Create a child namespace first
+        conn.create_namespace(CreateNamespaceRequest {
+            namespace: vec!["test_ns".into()],
+        })
+        .await
+        .expect("Failed to create namespace");
+
+        // Create initial table with 5 rows in child namespace
         let test_data1 = create_test_data();
         let _table1 = conn
             .create_table("overwrite_test", test_data1)
+            .namespace(vec!["test_ns".into()])
             .execute()
             .await
             .expect("Failed to create table");
@@ -665,6 +725,7 @@ mod tests {
                 schema,
             ),
         )
+        .namespace(vec!["test_ns".into()])
         .mode(CreateTableMode::Overwrite)
         .execute()
         .await
@@ -708,10 +769,18 @@ mod tests {
             .await
             .expect("Failed to connect to namespace");
 
-        // Create initial table with test data
+        // Create a child namespace first
+        conn.create_namespace(CreateNamespaceRequest {
+            namespace: vec!["test_ns".into()],
+        })
+        .await
+        .expect("Failed to create namespace");
+
+        // Create initial table with test data in child namespace
         let test_data1 = create_test_data();
         let _table1 = conn
             .create_table("exist_ok_test", test_data1)
+            .namespace(vec!["test_ns".into()])
             .execute()
             .await
             .expect("Failed to create table");
@@ -720,6 +789,7 @@ mod tests {
         let test_data2 = create_test_data();
         let table2 = conn
             .create_table("exist_ok_test", test_data2)
+            .namespace(vec!["test_ns".into()])
             .mode(CreateTableMode::exist_ok(|req| req))
             .execute()
             .await
@@ -753,25 +823,35 @@ mod tests {
             .await
             .expect("Failed to connect to namespace");
 
-        // Create first table
+        // Create a child namespace first
+        conn.create_namespace(CreateNamespaceRequest {
+            namespace: vec!["test_ns".into()],
+        })
+        .await
+        .expect("Failed to create namespace");
+
+        // Create first table in child namespace
         let test_data1 = create_test_data();
         let _table1 = conn
             .create_table("table1", test_data1)
+            .namespace(vec!["test_ns".into()])
             .execute()
             .await
             .expect("Failed to create first table");
 
-        // Create second table
+        // Create second table in child namespace
         let test_data2 = create_test_data();
         let _table2 = conn
             .create_table("table2", test_data2)
+            .namespace(vec!["test_ns".into()])
             .execute()
             .await
             .expect("Failed to create second table");
 
-        // Verify: Both tables appear in table list
+        // Verify: Both tables appear in table list for the child namespace
         let table_names = conn
             .table_names()
+            .namespace(vec!["test_ns".into()])
             .execute()
             .await
             .expect("Failed to list tables");
@@ -782,12 +862,14 @@ mod tests {
         // Verify: Can open both tables
         let opened_table1 = conn
             .open_table("table1")
+            .namespace(vec!["test_ns".into()])
             .execute()
             .await
             .expect("Failed to open table1");
 
         let opened_table2 = conn
             .open_table("table2")
+            .namespace(vec!["test_ns".into()])
             .execute()
             .await
             .expect("Failed to open table2");
@@ -820,8 +902,19 @@ mod tests {
             .await
             .expect("Failed to connect to namespace");
 
-        // Test: Try to open a non-existent table
-        let result = conn.open_table("non_existent_table").execute().await;
+        // Create a child namespace first
+        conn.create_namespace(CreateNamespaceRequest {
+            namespace: vec!["test_ns".into()],
+        })
+        .await
+        .expect("Failed to create namespace");
+
+        // Test: Try to open a non-existent table in the child namespace
+        let result = conn
+            .open_table("non_existent_table")
+            .namespace(vec!["test_ns".into()])
+            .execute()
+            .await;
 
         // Verify: Should return an error
         assert!(result.is_err());
@@ -841,30 +934,40 @@ mod tests {
             .await
             .expect("Failed to connect to namespace");
 
-        // Create a table first
+        // Create a child namespace first
+        conn.create_namespace(CreateNamespaceRequest {
+            namespace: vec!["test_ns".into()],
+        })
+        .await
+        .expect("Failed to create namespace");
+
+        // Create a table in child namespace
        let test_data = create_test_data();
         let _table = conn
             .create_table("drop_test", test_data)
+            .namespace(vec!["test_ns".into()])
             .execute()
             .await
             .expect("Failed to create table");
 
-        // Verify table exists
+        // Verify table exists in child namespace
         let table_names_before = conn
             .table_names()
+            .namespace(vec!["test_ns".into()])
             .execute()
             .await
             .expect("Failed to list tables");
         assert!(table_names_before.contains(&"drop_test".to_string()));
 
         // Test: Drop the table
-        conn.drop_table("drop_test", &[])
+        conn.drop_table("drop_test", &["test_ns".into()])
             .await
             .expect("Failed to drop table");
 
         // Verify: Table no longer exists
         let table_names_after = conn
             .table_names()
+            .namespace(vec!["test_ns".into()])
             .execute()
             .await
             .expect("Failed to list tables");
@@ -9,7 +9,7 @@ use futures::{StreamExt, TryStreamExt};
 use lance::io::ObjectStore;
 use lance_core::{cache::LanceCache, utils::futures::FinallyStreamExt};
 use lance_encoding::decoder::DecoderPlugins;
-use lance_file::v2::{
+use lance_file::{
     reader::{FileReader, FileReaderOptions},
     writer::{FileWriter, FileWriterOptions},
 };
@@ -416,6 +416,7 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
             namespace: request.namespace.clone(),
             index_cache_size: None,
             lance_read_params: None,
+            location: None,
         };
         let req = (callback)(req);
         self.open_table(req).await
@@ -511,7 +511,7 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
|
|||||||
/// Get the id of the table
|
/// Get the id of the table
|
||||||
///
|
///
|
||||||
/// This is the namespace of the table concatenated with the name
|
/// This is the namespace of the table concatenated with the name
|
||||||
/// separated by a dot (".")
|
/// separated by $
|
||||||
fn id(&self) -> &str;
|
fn id(&self) -> &str;
|
||||||
/// Get the arrow [Schema] of the table.
|
/// Get the arrow [Schema] of the table.
|
||||||
async fn schema(&self) -> Result<SchemaRef>;
|
async fn schema(&self) -> Result<SchemaRef>;
|
||||||
@@ -734,6 +734,16 @@ impl Table {
|
|||||||
self.inner.name()
|
self.inner.name()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get the namespace of the table.
|
||||||
|
pub fn namespace(&self) -> &[String] {
|
||||||
|
self.inner.namespace()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get the ID of the table (namespace + name joined by '$').
|
||||||
|
pub fn id(&self) -> &str {
|
||||||
|
self.inner.id()
|
||||||
|
}
|
||||||
|
|
||||||
/// Get the dataset of the table if it is a native table
|
/// Get the dataset of the table if it is a native table
|
||||||
///
|
///
|
||||||
/// Returns None otherwise
|
/// Returns None otherwise
|
||||||
@@ -1468,6 +1478,8 @@ impl NativeTableExt for Arc<dyn BaseTable> {
 #[derive(Debug, Clone)]
 pub struct NativeTable {
     name: String,
+    namespace: Vec<String>,
+    id: String,
     uri: String,
     pub(crate) dataset: dataset::DatasetConsistencyWrapper,
     // This comes from the connection options. We store here so we can pass down
@@ -1507,7 +1519,7 @@ impl NativeTable {
     /// * A [NativeTable] object.
     pub async fn open(uri: &str) -> Result<Self> {
         let name = Self::get_table_name(uri)?;
-        Self::open_with_params(uri, &name, None, None, None).await
+        Self::open_with_params(uri, &name, vec![], None, None, None).await
     }

     /// Opens an existing Table
@@ -1524,6 +1536,7 @@ impl NativeTable {
     pub async fn open_with_params(
         uri: &str,
         name: &str,
+        namespace: Vec<String>,
         write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
         params: Option<ReadParams>,
         read_consistency_interval: Option<std::time::Duration>,
@@ -1548,9 +1561,12 @@ impl NativeTable {
         })?;

         let dataset = DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval);
+        let id = Self::build_id(&namespace, name);

         Ok(Self {
             name: name.to_string(),
+            namespace,
+            id,
             uri: uri.to_string(),
             dataset,
             read_consistency_interval,
@@ -1573,12 +1589,24 @@ impl NativeTable {
         Ok(name.to_string())
     }

+    fn build_id(namespace: &[String], name: &str) -> String {
+        if namespace.is_empty() {
+            name.to_string()
+        } else {
+            let mut parts = namespace.to_vec();
+            parts.push(name.to_string());
+            parts.join("$")
+        }
+    }
+
     /// Creates a new Table
     ///
     /// # Arguments
     ///
-    /// * `uri` - The URI to the table.
+    /// * `uri` - The URI to the table. When namespace is not empty, the caller must
+    ///   provide an explicit URI (location) rather than deriving it from the table name.
     /// * `name` The Table name
+    /// * `namespace` - The namespace path. When non-empty, an explicit URI must be provided.
     /// * `batches` RecordBatch to be saved in the database.
     /// * `params` - Write parameters.
     ///
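As a sanity check on build_id, a test-style sketch of both branches; since the helper is private it is only callable from code in the same module, and the namespace and name values below are illustrative:

    // Empty namespace: the id is just the table name.
    assert_eq!(NativeTable::build_id(&[], "events"), "events");
    // Non-empty namespace: segments and name are joined with '$'.
    assert_eq!(
        NativeTable::build_id(&["prod".to_string(), "metrics".to_string()], "events"),
        "prod$metrics$events"
    );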
@@ -1588,6 +1616,7 @@ impl NativeTable {
     pub async fn create(
         uri: &str,
         name: &str,
+        namespace: Vec<String>,
         batches: impl StreamingWriteSource,
         write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
         params: Option<WriteParams>,
@@ -1614,8 +1643,12 @@ impl NativeTable {
             source => Error::Lance { source },
         })?;

+        let id = Self::build_id(&namespace, name);
+
         Ok(Self {
             name: name.to_string(),
+            namespace,
+            id,
             uri: uri.to_string(),
             dataset: DatasetConsistencyWrapper::new_latest(dataset, read_consistency_interval),
             read_consistency_interval,
@@ -1625,6 +1658,7 @@ impl NativeTable {
     pub async fn create_empty(
         uri: &str,
         name: &str,
+        namespace: Vec<String>,
         schema: SchemaRef,
         write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
         params: Option<WriteParams>,
@@ -1634,6 +1668,7 @@ impl NativeTable {
         Self::create(
             uri,
             name,
+            namespace,
             batches,
             write_store_wrapper,
             params,
@@ -2078,13 +2113,11 @@ impl BaseTable for NativeTable {
     }

     fn namespace(&self) -> &[String] {
-        // Native tables don't support namespaces yet, return empty slice for root namespace
-        &[]
+        &self.namespace
     }

     fn id(&self) -> &str {
-        // For native tables, id is same as name since no namespace support
-        self.name.as_str()
+        &self.id
     }

     async fn version(&self) -> Result<u64> {
@@ -2884,7 +2917,7 @@ mod tests {

         let batches = make_test_batches();
         let batches = Box::new(batches) as Box<dyn RecordBatchReader + Send>;
-        let table = NativeTable::create(uri, "test", batches, None, None, None)
+        let table = NativeTable::create(uri, "test", vec![], batches, None, None, None)
             .await
             .unwrap();

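For contrast with the test above (which passes an empty namespace), creating a NativeTable under a namespace only changes the third argument. This is a sketch under assumptions: the URI, table name, and namespace below are made-up values, and per the updated doc comment an explicit URI must be supplied when the namespace is non-empty.

    // Sketch only: "memory://demo/ns_table.lance", "demo_table" and the namespace are assumed values;
    // `batches` is a RecordBatch source prepared as in the test above.
    let table = NativeTable::create(
        "memory://demo/ns_table.lance",
        "demo_table",
        vec!["test_ns".into()],
        batches,
        None,
        None,
        None,
    )
    .await
    .unwrap();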