mirror of
https://github.com/lancedb/lancedb.git
synced 2026-06-05 05:10:41 +00:00
Compare commits
1 Commits
codex/upda
...
codex/upda
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d1d65bd6d2 |
4
.github/workflows/rust.yml
vendored
4
.github/workflows/rust.yml
vendored
@@ -167,13 +167,13 @@ jobs:
|
||||
- name: Build
|
||||
run: |
|
||||
$env:VCPKG_ROOT = $env:VCPKG_INSTALLATION_ROOT
|
||||
cargo build --profile ci --features aws,remote --tests --locked --target ${{ matrix.target }}
|
||||
cargo build --profile ci --features remote --tests --locked --target ${{ matrix.target }}
|
||||
- name: Run tests
|
||||
# Can only run tests when target matches host
|
||||
if: ${{ matrix.target == 'x86_64-pc-windows-msvc' }}
|
||||
run: |
|
||||
$env:VCPKG_ROOT = $env:VCPKG_INSTALLATION_ROOT
|
||||
cargo test --profile ci --features aws,remote --locked
|
||||
cargo test --profile ci --features remote --locked
|
||||
|
||||
msrv:
|
||||
# Check the minimum supported Rust version
|
||||
|
||||
1849
Cargo.lock
generated
1849
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
56
Cargo.toml
56
Cargo.toml
@@ -15,37 +15,37 @@ categories = ["database-implementations"]
|
||||
rust-version = "1.78.0"
|
||||
|
||||
[workspace.dependencies]
|
||||
lance = { "version" = "=1.0.3-rc.1", default-features = false, "tag" = "v1.0.3-rc.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-core = { "version" = "=1.0.3-rc.1", "tag" = "v1.0.3-rc.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datagen = { "version" = "=1.0.3-rc.1", "tag" = "v1.0.3-rc.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-file = { "version" = "=1.0.3-rc.1", "tag" = "v1.0.3-rc.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-io = { "version" = "=1.0.3-rc.1", default-features = false, "tag" = "v1.0.3-rc.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-index = { "version" = "=1.0.3-rc.1", "tag" = "v1.0.3-rc.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-linalg = { "version" = "=1.0.3-rc.1", "tag" = "v1.0.3-rc.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace = { "version" = "=1.0.3-rc.1", "tag" = "v1.0.3-rc.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace-impls = { "version" = "=1.0.3-rc.1", default-features = false, "tag" = "v1.0.3-rc.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-table = { "version" = "=1.0.3-rc.1", "tag" = "v1.0.3-rc.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-testing = { "version" = "=1.0.3-rc.1", "tag" = "v1.0.3-rc.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datafusion = { "version" = "=1.0.3-rc.1", "tag" = "v1.0.3-rc.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-encoding = { "version" = "=1.0.3-rc.1", "tag" = "v1.0.3-rc.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-arrow = { "version" = "=1.0.3-rc.1", "tag" = "v1.0.3-rc.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance = { "version" = "=2.0.0-beta.7", default-features = false, "tag" = "v2.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-core = { "version" = "=2.0.0-beta.7", "tag" = "v2.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datagen = { "version" = "=2.0.0-beta.7", "tag" = "v2.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-file = { "version" = "=2.0.0-beta.7", "tag" = "v2.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-io = { "version" = "=2.0.0-beta.7", default-features = false, "tag" = "v2.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-index = { "version" = "=2.0.0-beta.7", "tag" = "v2.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-linalg = { "version" = "=2.0.0-beta.7", "tag" = "v2.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace = { "version" = "=2.0.0-beta.7", "tag" = "v2.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace-impls = { "version" = "=2.0.0-beta.7", default-features = false, "tag" = "v2.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-table = { "version" = "=2.0.0-beta.7", "tag" = "v2.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-testing = { "version" = "=2.0.0-beta.7", "tag" = "v2.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datafusion = { "version" = "=2.0.0-beta.7", "tag" = "v2.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-encoding = { "version" = "=2.0.0-beta.7", "tag" = "v2.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-arrow = { "version" = "=2.0.0-beta.7", "tag" = "v2.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
ahash = "0.8"
|
||||
# Note that this one does not include pyarrow
|
||||
arrow = { version = "56.1", optional = false }
|
||||
arrow-array = "56.1"
|
||||
arrow-data = "56.1"
|
||||
arrow-ipc = "56.1"
|
||||
arrow-ord = "56.1"
|
||||
arrow-schema = "56.1"
|
||||
arrow-select = "56.1"
|
||||
arrow-cast = "56.1"
|
||||
arrow = { version = "57.2.0", optional = false }
|
||||
arrow-array = "57.2.0"
|
||||
arrow-data = "57.2.0"
|
||||
arrow-ipc = "57.2.0"
|
||||
arrow-ord = "57.2.0"
|
||||
arrow-schema = "57.2.0"
|
||||
arrow-select = "57.2.0"
|
||||
arrow-cast = "57.2.0"
|
||||
async-trait = "0"
|
||||
datafusion = { version = "50.0.0", default-features = false }
|
||||
datafusion-catalog = "50.0.0"
|
||||
datafusion-common = { version = "50.0.0", default-features = false }
|
||||
datafusion-execution = "50.0.0"
|
||||
datafusion-expr = "50.0.0"
|
||||
datafusion-physical-plan = "50.0.0"
|
||||
datafusion = { version = "51.0.0", default-features = false }
|
||||
datafusion-catalog = "51.0.0"
|
||||
datafusion-common = { version = "51.0.0", default-features = false }
|
||||
datafusion-execution = "51.0.0"
|
||||
datafusion-expr = "51.0.0"
|
||||
datafusion-physical-plan = "51.0.0"
|
||||
env_logger = "0.11"
|
||||
half = { "version" = "2.7.1", default-features = false, features = [
|
||||
"num-traits",
|
||||
|
||||
@@ -36,6 +36,6 @@ aws-lc-rs = "=1.13.0"
|
||||
napi-build = "2.1"
|
||||
|
||||
[features]
|
||||
default = ["remote", "lancedb/aws", "lancedb/gcs", "lancedb/azure", "lancedb/dynamodb", "lancedb/oss", "lancedb/huggingface"]
|
||||
default = ["remote", "lancedb/default"]
|
||||
fp16kernels = ["lancedb/fp16kernels"]
|
||||
remote = ["lancedb/remote"]
|
||||
|
||||
@@ -14,15 +14,15 @@ name = "_lancedb"
|
||||
crate-type = ["cdylib"]
|
||||
|
||||
[dependencies]
|
||||
arrow = { version = "56.1", features = ["pyarrow"] }
|
||||
arrow = { version = "57.2.0", features = ["pyarrow"] }
|
||||
async-trait = "0.1"
|
||||
lancedb = { path = "../rust/lancedb", default-features = false }
|
||||
lance-core.workspace = true
|
||||
lance-namespace.workspace = true
|
||||
lance-io.workspace = true
|
||||
env_logger.workspace = true
|
||||
pyo3 = { version = "0.25.1", features = ["extension-module", "abi3-py39"] }
|
||||
pyo3-async-runtimes = { version = "0.25.0", features = [
|
||||
pyo3 = { version = "0.26", features = ["extension-module", "abi3-py39"] }
|
||||
pyo3-async-runtimes = { version = "0.26", features = [
|
||||
"attributes",
|
||||
"tokio-runtime",
|
||||
] }
|
||||
@@ -32,12 +32,12 @@ snafu.workspace = true
|
||||
tokio = { version = "1.40", features = ["sync"] }
|
||||
|
||||
[build-dependencies]
|
||||
pyo3-build-config = { version = "0.25.0", features = [
|
||||
pyo3-build-config = { version = "0.26", features = [
|
||||
"extension-module",
|
||||
"abi3-py39",
|
||||
] }
|
||||
|
||||
[features]
|
||||
default = ["remote", "lancedb/aws", "lancedb/gcs", "lancedb/azure", "lancedb/dynamodb", "lancedb/oss", "lancedb/huggingface"]
|
||||
default = ["remote", "lancedb/default"]
|
||||
fp16kernels = ["lancedb/fp16kernels"]
|
||||
remote = ["lancedb/remote"]
|
||||
|
||||
@@ -179,7 +179,6 @@ class Table:
|
||||
cleanup_since_ms: Optional[int] = None,
|
||||
delete_unverified: Optional[bool] = None,
|
||||
) -> OptimizeStats: ...
|
||||
async def uri(self) -> str: ...
|
||||
@property
|
||||
def tags(self) -> Tags: ...
|
||||
def query(self) -> Query: ...
|
||||
|
||||
@@ -961,27 +961,22 @@ class LanceQueryBuilder(ABC):
|
||||
>>> query = [100, 100]
|
||||
>>> plan = table.search(query).analyze_plan()
|
||||
>>> print(plan) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
|
||||
AnalyzeExec verbose=true, elapsed=..., metrics=...
|
||||
TracedExec, elapsed=..., metrics=...
|
||||
ProjectionExec: elapsed=..., expr=[...],
|
||||
metrics=[output_rows=..., elapsed_compute=..., output_bytes=...]
|
||||
GlobalLimitExec: elapsed=..., skip=0, fetch=10,
|
||||
metrics=[output_rows=..., elapsed_compute=..., output_bytes=...]
|
||||
FilterExec: elapsed=..., _distance@2 IS NOT NULL, metrics=[...]
|
||||
SortExec: elapsed=..., TopK(fetch=10), expr=[...],
|
||||
AnalyzeExec verbose=true, metrics=[], cumulative_cpu=...
|
||||
TracedExec, metrics=[], cumulative_cpu=...
|
||||
ProjectionExec: expr=[...], metrics=[...], cumulative_cpu=...
|
||||
GlobalLimitExec: skip=0, fetch=10, metrics=[...], cumulative_cpu=...
|
||||
FilterExec: _distance@2 IS NOT NULL,
|
||||
metrics=[output_rows=..., elapsed_compute=...], cumulative_cpu=...
|
||||
SortExec: TopK(fetch=10), expr=[...],
|
||||
preserve_partitioning=[...],
|
||||
metrics=[output_rows=..., elapsed_compute=...,
|
||||
output_bytes=..., row_replacements=...]
|
||||
KNNVectorDistance: elapsed=..., metric=l2,
|
||||
metrics=[output_rows=..., elapsed_compute=...,
|
||||
output_bytes=..., output_batches=...]
|
||||
LanceRead: elapsed=..., uri=..., projection=[vector],
|
||||
num_fragments=..., range_before=None, range_after=None,
|
||||
row_id=true, row_addr=false,
|
||||
full_filter=--, refine_filter=--,
|
||||
metrics=[output_rows=..., elapsed_compute=..., output_bytes=...,
|
||||
fragments_scanned=..., ranges_scanned=1, rows_scanned=1,
|
||||
bytes_read=..., iops=..., requests=..., task_wait_time=...]
|
||||
metrics=[output_rows=..., elapsed_compute=..., row_replacements=...],
|
||||
cumulative_cpu=...
|
||||
KNNVectorDistance: metric=l2,
|
||||
metrics=[output_rows=..., elapsed_compute=..., output_batches=...],
|
||||
cumulative_cpu=...
|
||||
LanceRead: uri=..., projection=[vector], ...
|
||||
metrics=[output_rows=..., elapsed_compute=...,
|
||||
bytes_read=..., iops=..., requests=...], cumulative_cpu=...
|
||||
|
||||
Returns
|
||||
-------
|
||||
|
||||
@@ -655,14 +655,6 @@ class RemoteTable(Table):
|
||||
def stats(self):
|
||||
return LOOP.run(self._table.stats())
|
||||
|
||||
@property
|
||||
def uri(self) -> str:
|
||||
"""The table URI (storage location).
|
||||
|
||||
For remote tables, this fetches the location from the server via describe.
|
||||
"""
|
||||
return LOOP.run(self._table.uri())
|
||||
|
||||
def take_offsets(self, offsets: list[int]) -> LanceTakeQueryBuilder:
|
||||
return LanceTakeQueryBuilder(self._table.take_offsets(offsets))
|
||||
|
||||
|
||||
@@ -2218,10 +2218,6 @@ class LanceTable(Table):
|
||||
def stats(self) -> TableStatistics:
|
||||
return LOOP.run(self._table.stats())
|
||||
|
||||
@property
|
||||
def uri(self) -> str:
|
||||
return LOOP.run(self._table.uri())
|
||||
|
||||
def create_scalar_index(
|
||||
self,
|
||||
column: str,
|
||||
@@ -3610,20 +3606,6 @@ class AsyncTable:
|
||||
"""
|
||||
return await self._inner.stats()
|
||||
|
||||
async def uri(self) -> str:
|
||||
"""
|
||||
Get the table URI (storage location).
|
||||
|
||||
For remote tables, this fetches the location from the server via describe.
|
||||
For local tables, this returns the dataset URI.
|
||||
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
The full storage location of the table (e.g., S3/GCS path).
|
||||
"""
|
||||
return await self._inner.uri()
|
||||
|
||||
async def add(
|
||||
self,
|
||||
data: DATA,
|
||||
|
||||
@@ -1967,9 +1967,3 @@ def test_add_table_with_empty_embeddings(tmp_path):
|
||||
on_bad_vectors="drop",
|
||||
)
|
||||
assert table.count_rows() == 1
|
||||
|
||||
|
||||
def test_table_uri(tmp_path):
|
||||
db = lancedb.connect(tmp_path)
|
||||
table = db.create_table("my_table", data=[{"x": 0}])
|
||||
assert table.uri == str(tmp_path / "my_table.lance")
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
#![allow(deprecated)]
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
@@ -36,7 +37,10 @@ impl RecordBatchStream {
|
||||
impl RecordBatchStream {
|
||||
#[getter]
|
||||
pub fn schema(&self, py: Python) -> PyResult<Py<PyAny>> {
|
||||
(*self.schema).clone().into_pyarrow(py)
|
||||
(*self.schema)
|
||||
.clone()
|
||||
.into_pyarrow(py)
|
||||
.map(|bound| bound.unbind())
|
||||
}
|
||||
|
||||
pub fn __aiter__(self_: PyRef<'_, Self>) -> PyRef<'_, Self> {
|
||||
@@ -52,11 +56,12 @@ impl RecordBatchStream {
|
||||
.next()
|
||||
.await
|
||||
.ok_or_else(|| PyStopAsyncIteration::new_err(""))?;
|
||||
#[allow(deprecated)]
|
||||
let py_obj: Py<PyAny> = Python::with_gil(|py| -> PyResult<Py<PyAny>> {
|
||||
inner_next.infer_error()?.to_pyarrow(py)
|
||||
})?;
|
||||
Ok(py_obj)
|
||||
Python::with_gil(|py| {
|
||||
inner_next
|
||||
.infer_error()?
|
||||
.to_pyarrow(py)
|
||||
.map(|bound| bound.unbind())
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
#![allow(deprecated)]
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
@@ -12,7 +13,7 @@ use pyo3::{
|
||||
exceptions::{PyRuntimeError, PyValueError},
|
||||
pyclass, pyfunction, pymethods,
|
||||
types::{PyDict, PyDictMethods},
|
||||
Bound, FromPyObject, Py, PyAny, PyRef, PyResult, Python,
|
||||
Bound, FromPyObject, Py, PyAny, PyObject, PyRef, PyResult, Python,
|
||||
};
|
||||
use pyo3_async_runtimes::tokio::future_into_py;
|
||||
|
||||
@@ -114,7 +115,7 @@ impl Connection {
|
||||
data: Bound<'_, PyAny>,
|
||||
namespace: Vec<String>,
|
||||
storage_options: Option<HashMap<String, String>>,
|
||||
storage_options_provider: Option<Py<PyAny>>,
|
||||
storage_options_provider: Option<PyObject>,
|
||||
location: Option<String>,
|
||||
) -> PyResult<Bound<'a, PyAny>> {
|
||||
let inner = self_.get_inner()?.clone();
|
||||
@@ -152,7 +153,7 @@ impl Connection {
|
||||
schema: Bound<'_, PyAny>,
|
||||
namespace: Vec<String>,
|
||||
storage_options: Option<HashMap<String, String>>,
|
||||
storage_options_provider: Option<Py<PyAny>>,
|
||||
storage_options_provider: Option<PyObject>,
|
||||
location: Option<String>,
|
||||
) -> PyResult<Bound<'a, PyAny>> {
|
||||
let inner = self_.get_inner()?.clone();
|
||||
@@ -187,7 +188,7 @@ impl Connection {
|
||||
name: String,
|
||||
namespace: Vec<String>,
|
||||
storage_options: Option<HashMap<String, String>>,
|
||||
storage_options_provider: Option<Py<PyAny>>,
|
||||
storage_options_provider: Option<PyObject>,
|
||||
index_cache_size: Option<u32>,
|
||||
location: Option<String>,
|
||||
) -> PyResult<Bound<'_, PyAny>> {
|
||||
@@ -307,7 +308,6 @@ impl Connection {
|
||||
..Default::default()
|
||||
};
|
||||
let response = inner.list_namespaces(request).await.infer_error()?;
|
||||
#[allow(deprecated)]
|
||||
Python::with_gil(|py| -> PyResult<Py<PyDict>> {
|
||||
let dict = PyDict::new(py);
|
||||
dict.set_item("namespaces", response.namespaces)?;
|
||||
@@ -328,24 +328,17 @@ impl Connection {
|
||||
let py = self_.py();
|
||||
future_into_py(py, async move {
|
||||
use lance_namespace::models::CreateNamespaceRequest;
|
||||
let mode_enum = mode.and_then(|m| match m.to_lowercase().as_str() {
|
||||
"create" => Some("Create".to_string()),
|
||||
"exist_ok" => Some("ExistOk".to_string()),
|
||||
"overwrite" => Some("Overwrite".to_string()),
|
||||
_ => None,
|
||||
});
|
||||
let request = CreateNamespaceRequest {
|
||||
id: if namespace.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(namespace)
|
||||
},
|
||||
mode: mode_enum,
|
||||
mode,
|
||||
properties,
|
||||
..Default::default()
|
||||
};
|
||||
let response = inner.create_namespace(request).await.infer_error()?;
|
||||
#[allow(deprecated)]
|
||||
Python::with_gil(|py| -> PyResult<Py<PyDict>> {
|
||||
let dict = PyDict::new(py);
|
||||
dict.set_item("properties", response.properties)?;
|
||||
@@ -365,28 +358,17 @@ impl Connection {
|
||||
let py = self_.py();
|
||||
future_into_py(py, async move {
|
||||
use lance_namespace::models::DropNamespaceRequest;
|
||||
let mode_enum = mode.and_then(|m| match m.to_uppercase().as_str() {
|
||||
"SKIP" => Some("Skip".to_string()),
|
||||
"FAIL" => Some("Fail".to_string()),
|
||||
_ => None,
|
||||
});
|
||||
let behavior_enum = behavior.and_then(|b| match b.to_uppercase().as_str() {
|
||||
"RESTRICT" => Some("Restrict".to_string()),
|
||||
"CASCADE" => Some("Cascade".to_string()),
|
||||
_ => None,
|
||||
});
|
||||
let request = DropNamespaceRequest {
|
||||
id: if namespace.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(namespace)
|
||||
},
|
||||
mode: mode_enum,
|
||||
behavior: behavior_enum,
|
||||
mode,
|
||||
behavior,
|
||||
..Default::default()
|
||||
};
|
||||
let response = inner.drop_namespace(request).await.infer_error()?;
|
||||
#[allow(deprecated)]
|
||||
Python::with_gil(|py| -> PyResult<Py<PyDict>> {
|
||||
let dict = PyDict::new(py);
|
||||
dict.set_item("properties", response.properties)?;
|
||||
@@ -414,7 +396,6 @@ impl Connection {
|
||||
..Default::default()
|
||||
};
|
||||
let response = inner.describe_namespace(request).await.infer_error()?;
|
||||
#[allow(deprecated)]
|
||||
Python::with_gil(|py| -> PyResult<Py<PyDict>> {
|
||||
let dict = PyDict::new(py);
|
||||
dict.set_item("properties", response.properties)?;
|
||||
@@ -445,7 +426,6 @@ impl Connection {
|
||||
..Default::default()
|
||||
};
|
||||
let response = inner.list_tables(request).await.infer_error()?;
|
||||
#[allow(deprecated)]
|
||||
Python::with_gil(|py| -> PyResult<Py<PyDict>> {
|
||||
let dict = PyDict::new(py);
|
||||
dict.set_item("tables", response.tables)?;
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
#![allow(deprecated)]
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
@@ -40,34 +41,31 @@ impl<T> PythonErrorExt<T> for std::result::Result<T, LanceError> {
|
||||
request_id,
|
||||
source,
|
||||
status_code,
|
||||
} => {
|
||||
#[allow(deprecated)]
|
||||
Python::with_gil(|py| {
|
||||
let message = err.to_string();
|
||||
let http_err_cls = py
|
||||
.import(intern!(py, "lancedb.remote.errors"))?
|
||||
.getattr(intern!(py, "HttpError"))?;
|
||||
let err = http_err_cls.call1((
|
||||
message,
|
||||
} => Python::with_gil(|py| {
|
||||
let message = err.to_string();
|
||||
let http_err_cls = py
|
||||
.import(intern!(py, "lancedb.remote.errors"))?
|
||||
.getattr(intern!(py, "HttpError"))?;
|
||||
let err = http_err_cls.call1((
|
||||
message,
|
||||
request_id,
|
||||
status_code.map(|s| s.as_u16()),
|
||||
))?;
|
||||
|
||||
if let Some(cause) = source.source() {
|
||||
// The HTTP error already includes the first cause. But
|
||||
// we can add the rest of the chain if there is any more.
|
||||
let cause_err = http_from_rust_error(
|
||||
py,
|
||||
cause,
|
||||
request_id,
|
||||
status_code.map(|s| s.as_u16()),
|
||||
))?;
|
||||
)?;
|
||||
err.setattr(intern!(py, "__cause__"), cause_err)?;
|
||||
}
|
||||
|
||||
if let Some(cause) = source.source() {
|
||||
// The HTTP error already includes the first cause. But
|
||||
// we can add the rest of the chain if there is any more.
|
||||
let cause_err = http_from_rust_error(
|
||||
py,
|
||||
cause,
|
||||
request_id,
|
||||
status_code.map(|s| s.as_u16()),
|
||||
)?;
|
||||
err.setattr(intern!(py, "__cause__"), cause_err)?;
|
||||
}
|
||||
|
||||
Err(PyErr::from_value(err))
|
||||
})
|
||||
}
|
||||
Err(PyErr::from_value(err))
|
||||
}),
|
||||
LanceError::Retry {
|
||||
request_id,
|
||||
request_failures,
|
||||
@@ -78,37 +76,33 @@ impl<T> PythonErrorExt<T> for std::result::Result<T, LanceError> {
|
||||
max_read_failures,
|
||||
source,
|
||||
status_code,
|
||||
} =>
|
||||
{
|
||||
#[allow(deprecated)]
|
||||
Python::with_gil(|py| {
|
||||
let cause_err = http_from_rust_error(
|
||||
py,
|
||||
source.as_ref(),
|
||||
request_id,
|
||||
status_code.map(|s| s.as_u16()),
|
||||
)?;
|
||||
} => Python::with_gil(|py| {
|
||||
let cause_err = http_from_rust_error(
|
||||
py,
|
||||
source.as_ref(),
|
||||
request_id,
|
||||
status_code.map(|s| s.as_u16()),
|
||||
)?;
|
||||
|
||||
let message = err.to_string();
|
||||
let retry_error_cls = py
|
||||
.import(intern!(py, "lancedb.remote.errors"))?
|
||||
.getattr("RetryError")?;
|
||||
let err = retry_error_cls.call1((
|
||||
message,
|
||||
request_id,
|
||||
*request_failures,
|
||||
*connect_failures,
|
||||
*read_failures,
|
||||
*max_request_failures,
|
||||
*max_connect_failures,
|
||||
*max_read_failures,
|
||||
status_code.map(|s| s.as_u16()),
|
||||
))?;
|
||||
let message = err.to_string();
|
||||
let retry_error_cls = py
|
||||
.import(intern!(py, "lancedb.remote.errors"))?
|
||||
.getattr("RetryError")?;
|
||||
let err = retry_error_cls.call1((
|
||||
message,
|
||||
request_id,
|
||||
*request_failures,
|
||||
*connect_failures,
|
||||
*read_failures,
|
||||
*max_request_failures,
|
||||
*max_connect_failures,
|
||||
*max_read_failures,
|
||||
status_code.map(|s| s.as_u16()),
|
||||
))?;
|
||||
|
||||
err.setattr(intern!(py, "__cause__"), cause_err)?;
|
||||
Err(PyErr::from_value(err))
|
||||
})
|
||||
}
|
||||
err.setattr(intern!(py, "__cause__"), cause_err)?;
|
||||
Err(PyErr::from_value(err))
|
||||
}),
|
||||
_ => self.runtime_error(),
|
||||
},
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
#![allow(deprecated)]
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
@@ -12,7 +13,6 @@ pub struct PyHeaderProvider {
|
||||
|
||||
impl Clone for PyHeaderProvider {
|
||||
fn clone(&self) -> Self {
|
||||
#[allow(deprecated)]
|
||||
Python::with_gil(|py| Self {
|
||||
provider: self.provider.clone_ref(py),
|
||||
})
|
||||
@@ -26,7 +26,6 @@ impl PyHeaderProvider {
|
||||
|
||||
/// Get headers from the Python provider (internal implementation)
|
||||
fn get_headers_internal(&self) -> Result<HashMap<String, String>, String> {
|
||||
#[allow(deprecated)]
|
||||
Python::with_gil(|py| {
|
||||
// Call the get_headers method
|
||||
let result = self.provider.call_method0(py, "get_headers");
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
#![allow(deprecated)]
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
@@ -19,7 +20,7 @@ use pyo3::{
|
||||
exceptions::PyRuntimeError,
|
||||
pyclass, pymethods,
|
||||
types::{PyAnyMethods, PyDict, PyDictMethods, PyType},
|
||||
Bound, Py, PyAny, PyRef, PyRefMut, PyResult, Python,
|
||||
Bound, PyAny, PyRef, PyRefMut, PyResult, Python,
|
||||
};
|
||||
use pyo3_async_runtimes::tokio::future_into_py;
|
||||
|
||||
@@ -281,10 +282,7 @@ impl PyPermutationReader {
|
||||
let reader = slf.reader.clone();
|
||||
future_into_py(slf.py(), async move {
|
||||
let schema = reader.output_schema(selection).await.infer_error()?;
|
||||
#[allow(deprecated)]
|
||||
let py_obj: Py<PyAny> =
|
||||
Python::with_gil(|py| -> PyResult<Py<PyAny>> { schema.to_pyarrow(py) })?;
|
||||
Ok(py_obj)
|
||||
Python::with_gil(|py| schema.to_pyarrow(py).map(|b| b.unbind()))
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
#![allow(deprecated)]
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
@@ -29,7 +30,6 @@ use pyo3::types::PyList;
|
||||
use pyo3::types::{PyDict, PyString};
|
||||
use pyo3::Bound;
|
||||
use pyo3::IntoPyObject;
|
||||
use pyo3::Py;
|
||||
use pyo3::PyAny;
|
||||
use pyo3::PyRef;
|
||||
use pyo3::PyResult;
|
||||
@@ -217,7 +217,7 @@ impl<'py> IntoPyObject<'py> for PyQueryVectors {
|
||||
let py_objs = self
|
||||
.0
|
||||
.into_iter()
|
||||
.map(|v| v.to_data().into_pyarrow(py))
|
||||
.map(|v| v.to_data().into_pyarrow(py).map(|b| b.unbind()))
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
PyList::new(py, py_objs)
|
||||
}
|
||||
@@ -454,10 +454,7 @@ impl Query {
|
||||
let inner = self_.inner.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
let schema = inner.output_schema().await.infer_error()?;
|
||||
#[allow(deprecated)]
|
||||
let py_obj: Py<PyAny> =
|
||||
Python::with_gil(|py| -> PyResult<Py<PyAny>> { schema.to_pyarrow(py) })?;
|
||||
Ok(py_obj)
|
||||
Python::with_gil(|py| schema.to_pyarrow(py).map(|b| b.unbind()))
|
||||
})
|
||||
}
|
||||
|
||||
@@ -536,10 +533,7 @@ impl TakeQuery {
|
||||
let inner = self_.inner.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
let schema = inner.output_schema().await.infer_error()?;
|
||||
#[allow(deprecated)]
|
||||
let py_obj: Py<PyAny> =
|
||||
Python::with_gil(|py| -> PyResult<Py<PyAny>> { schema.to_pyarrow(py) })?;
|
||||
Ok(py_obj)
|
||||
Python::with_gil(|py| schema.to_pyarrow(py).map(|b| b.unbind()))
|
||||
})
|
||||
}
|
||||
|
||||
@@ -634,10 +628,7 @@ impl FTSQuery {
|
||||
let inner = self_.inner.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
let schema = inner.output_schema().await.infer_error()?;
|
||||
#[allow(deprecated)]
|
||||
let py_obj: Py<PyAny> =
|
||||
Python::with_gil(|py| -> PyResult<Py<PyAny>> { schema.to_pyarrow(py) })?;
|
||||
Ok(py_obj)
|
||||
Python::with_gil(|py| schema.to_pyarrow(py).map(|b| b.unbind()))
|
||||
})
|
||||
}
|
||||
|
||||
@@ -816,10 +807,7 @@ impl VectorQuery {
|
||||
let inner = self_.inner.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
let schema = inner.output_schema().await.infer_error()?;
|
||||
#[allow(deprecated)]
|
||||
let py_obj: Py<PyAny> =
|
||||
Python::with_gil(|py| -> PyResult<Py<PyAny>> { schema.to_pyarrow(py) })?;
|
||||
Ok(py_obj)
|
||||
Python::with_gil(|py| schema.to_pyarrow(py).map(|b| b.unbind()))
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
#![allow(deprecated)]
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
@@ -17,12 +18,11 @@ use pyo3::types::PyDict;
|
||||
/// Internal wrapper around a Python object implementing StorageOptionsProvider
|
||||
pub struct PyStorageOptionsProvider {
|
||||
/// The Python object implementing fetch_storage_options()
|
||||
inner: Py<PyAny>,
|
||||
inner: PyObject,
|
||||
}
|
||||
|
||||
impl Clone for PyStorageOptionsProvider {
|
||||
fn clone(&self) -> Self {
|
||||
#[allow(deprecated)]
|
||||
Python::with_gil(|py| Self {
|
||||
inner: self.inner.clone_ref(py),
|
||||
})
|
||||
@@ -30,8 +30,7 @@ impl Clone for PyStorageOptionsProvider {
|
||||
}
|
||||
|
||||
impl PyStorageOptionsProvider {
|
||||
pub fn new(obj: Py<PyAny>) -> PyResult<Self> {
|
||||
#[allow(deprecated)]
|
||||
pub fn new(obj: PyObject) -> PyResult<Self> {
|
||||
Python::with_gil(|py| {
|
||||
// Verify the object has a fetch_storage_options method
|
||||
if !obj.bind(py).hasattr("fetch_storage_options")? {
|
||||
@@ -39,9 +38,7 @@ impl PyStorageOptionsProvider {
|
||||
"StorageOptionsProvider must implement fetch_storage_options() method",
|
||||
));
|
||||
}
|
||||
Ok(Self {
|
||||
inner: obj.clone_ref(py),
|
||||
})
|
||||
Ok(Self { inner: obj })
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -64,7 +61,6 @@ impl StorageOptionsProvider for PyStorageOptionsProviderWrapper {
|
||||
let py_provider = self.py_provider.clone();
|
||||
|
||||
tokio::task::spawn_blocking(move || {
|
||||
#[allow(deprecated)]
|
||||
Python::with_gil(|py| {
|
||||
// Call the Python fetch_storage_options method
|
||||
let result = py_provider
|
||||
@@ -124,7 +120,6 @@ impl StorageOptionsProvider for PyStorageOptionsProviderWrapper {
|
||||
}
|
||||
|
||||
fn provider_id(&self) -> String {
|
||||
#[allow(deprecated)]
|
||||
Python::with_gil(|py| {
|
||||
// Call provider_id() method on the Python object
|
||||
let obj = self.py_provider.inner.bind(py);
|
||||
@@ -149,7 +144,7 @@ impl std::fmt::Debug for PyStorageOptionsProviderWrapper {
|
||||
/// This is the main entry point for converting Python StorageOptionsProvider objects
|
||||
/// to Rust trait objects that can be used by the Lance ecosystem.
|
||||
pub fn py_object_to_storage_options_provider(
|
||||
py_obj: Py<PyAny>,
|
||||
py_obj: PyObject,
|
||||
) -> PyResult<Arc<dyn StorageOptionsProvider>> {
|
||||
let py_provider = PyStorageOptionsProvider::new(py_obj)?;
|
||||
Ok(Arc::new(PyStorageOptionsProviderWrapper::new(py_provider)))
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
#![allow(deprecated)]
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
@@ -21,7 +22,7 @@ use pyo3::{
|
||||
exceptions::{PyKeyError, PyRuntimeError, PyValueError},
|
||||
pyclass, pymethods,
|
||||
types::{IntoPyDict, PyAnyMethods, PyDict, PyDictMethods},
|
||||
Bound, FromPyObject, Py, PyAny, PyRef, PyResult, Python,
|
||||
Bound, FromPyObject, PyAny, PyRef, PyResult, Python,
|
||||
};
|
||||
use pyo3_async_runtimes::tokio::future_into_py;
|
||||
|
||||
@@ -287,10 +288,7 @@ impl Table {
|
||||
let inner = self_.inner_ref()?.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
let schema = inner.schema().await.infer_error()?;
|
||||
#[allow(deprecated)]
|
||||
let py_obj: Py<PyAny> =
|
||||
Python::with_gil(|py| -> PyResult<Py<PyAny>> { schema.to_pyarrow(py) })?;
|
||||
Ok(py_obj)
|
||||
Python::with_gil(|py| schema.to_pyarrow(py).map(|b| b.unbind()))
|
||||
})
|
||||
}
|
||||
|
||||
@@ -440,7 +438,6 @@ impl Table {
|
||||
future_into_py(self_.py(), async move {
|
||||
let stats = inner.index_stats(&index_name).await.infer_error()?;
|
||||
if let Some(stats) = stats {
|
||||
#[allow(deprecated)]
|
||||
Python::with_gil(|py| {
|
||||
let dict = PyDict::new(py);
|
||||
dict.set_item("num_indexed_rows", stats.num_indexed_rows)?;
|
||||
@@ -471,7 +468,6 @@ impl Table {
|
||||
let inner = self_.inner_ref()?.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
let stats = inner.stats().await.infer_error()?;
|
||||
#[allow(deprecated)]
|
||||
Python::with_gil(|py| {
|
||||
let dict = PyDict::new(py);
|
||||
dict.set_item("total_bytes", stats.total_bytes)?;
|
||||
@@ -502,11 +498,6 @@ impl Table {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn uri(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.inner_ref()?.clone();
|
||||
future_into_py(self_.py(), async move { inner.uri().await.infer_error() })
|
||||
}
|
||||
|
||||
pub fn __repr__(&self) -> String {
|
||||
match &self.inner {
|
||||
None => format!("ClosedTable({})", self.name),
|
||||
@@ -526,7 +517,6 @@ impl Table {
|
||||
let inner = self_.inner_ref()?.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
let versions = inner.list_versions().await.infer_error()?;
|
||||
#[allow(deprecated)]
|
||||
let versions_as_dict = Python::with_gil(|py| {
|
||||
versions
|
||||
.iter()
|
||||
@@ -878,7 +868,6 @@ impl Tags {
|
||||
let tags = inner.tags().await.infer_error()?;
|
||||
let res = tags.list().await.infer_error()?;
|
||||
|
||||
#[allow(deprecated)]
|
||||
Python::with_gil(|py| {
|
||||
let py_dict = PyDict::new(py);
|
||||
for (key, contents) in res {
|
||||
|
||||
@@ -104,16 +104,11 @@ test-log = "0.2"
|
||||
|
||||
|
||||
[features]
|
||||
default = []
|
||||
default = ["aws", "gcs", "azure", "dynamodb", "oss"]
|
||||
aws = ["lance/aws", "lance-io/aws", "lance-namespace-impls/dir-aws"]
|
||||
oss = ["lance/oss", "lance-io/oss", "lance-namespace-impls/dir-oss"]
|
||||
gcs = ["lance/gcp", "lance-io/gcp", "lance-namespace-impls/dir-gcp"]
|
||||
azure = ["lance/azure", "lance-io/azure", "lance-namespace-impls/dir-azure"]
|
||||
huggingface = [
|
||||
"lance/huggingface",
|
||||
"lance-io/huggingface",
|
||||
"lance-namespace-impls/dir-huggingface",
|
||||
]
|
||||
dynamodb = ["lance/dynamodb", "aws"]
|
||||
remote = ["dep:reqwest", "dep:http", "lance-namespace-impls/rest", "lance-namespace-impls/rest-adapter"]
|
||||
fp16kernels = ["lance-linalg/fp16kernels"]
|
||||
@@ -153,6 +148,3 @@ name = "ivf_pq"
|
||||
[[example]]
|
||||
name = "hybrid_search"
|
||||
required-features = ["sentence-transformers"]
|
||||
|
||||
[package.metadata.docs.rs]
|
||||
all-features = true
|
||||
|
||||
@@ -9,7 +9,6 @@ use std::sync::Arc;
|
||||
use arrow_array::RecordBatchReader;
|
||||
use arrow_schema::{Field, SchemaRef};
|
||||
use lance::dataset::ReadParams;
|
||||
use lance::io::ObjectStoreParams;
|
||||
use lance_namespace::models::{
|
||||
CreateNamespaceRequest, CreateNamespaceResponse, DescribeNamespaceRequest,
|
||||
DescribeNamespaceResponse, DropNamespaceRequest, DropNamespaceResponse, ListNamespacesRequest,
|
||||
@@ -40,18 +39,7 @@ use crate::Table;
|
||||
pub use lance_encoding::version::LanceFileVersion;
|
||||
#[cfg(feature = "remote")]
|
||||
use lance_io::object_store::StorageOptions;
|
||||
use lance_io::object_store::{StorageOptionsAccessor, StorageOptionsProvider};
|
||||
|
||||
fn update_storage_options<F>(store_params: &mut ObjectStoreParams, update: F)
|
||||
where
|
||||
F: FnOnce(&mut HashMap<String, String>),
|
||||
{
|
||||
let mut options = store_params.storage_options().cloned().unwrap_or_default();
|
||||
update(&mut options);
|
||||
store_params.storage_options_accessor = Some(Arc::new(
|
||||
StorageOptionsAccessor::with_static_options(options),
|
||||
));
|
||||
}
|
||||
use lance_io::object_store::StorageOptionsProvider;
|
||||
|
||||
/// A builder for configuring a [`Connection::table_names`] operation
|
||||
pub struct TableNamesBuilder {
|
||||
@@ -258,16 +246,16 @@ impl<const HAS_DATA: bool> CreateTableBuilder<HAS_DATA> {
|
||||
///
|
||||
/// See available options at <https://lancedb.com/docs/storage/>
|
||||
pub fn storage_option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
|
||||
let store_params = self
|
||||
let store_options = self
|
||||
.request
|
||||
.write_options
|
||||
.lance_write_params
|
||||
.get_or_insert(Default::default())
|
||||
.store_params
|
||||
.get_or_insert(Default::default())
|
||||
.storage_options
|
||||
.get_or_insert(Default::default());
|
||||
update_storage_options(store_params, |options| {
|
||||
options.insert(key.into(), value.into());
|
||||
});
|
||||
store_options.insert(key.into(), value.into());
|
||||
self
|
||||
}
|
||||
|
||||
@@ -281,18 +269,19 @@ impl<const HAS_DATA: bool> CreateTableBuilder<HAS_DATA> {
|
||||
mut self,
|
||||
pairs: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
|
||||
) -> Self {
|
||||
let store_params = self
|
||||
let store_options = self
|
||||
.request
|
||||
.write_options
|
||||
.lance_write_params
|
||||
.get_or_insert(Default::default())
|
||||
.store_params
|
||||
.get_or_insert(Default::default())
|
||||
.storage_options
|
||||
.get_or_insert(Default::default());
|
||||
update_storage_options(store_params, |options| {
|
||||
for (key, value) in pairs {
|
||||
options.insert(key.into(), value.into());
|
||||
}
|
||||
});
|
||||
|
||||
for (key, value) in pairs {
|
||||
store_options.insert(key.into(), value.into());
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
@@ -329,23 +318,24 @@ impl<const HAS_DATA: bool> CreateTableBuilder<HAS_DATA> {
|
||||
/// This has no effect in LanceDB Cloud.
|
||||
#[deprecated(since = "0.15.1", note = "Use `database_options` instead")]
|
||||
pub fn enable_v2_manifest_paths(mut self, use_v2_manifest_paths: bool) -> Self {
|
||||
let store_params = self
|
||||
let storage_options = self
|
||||
.request
|
||||
.write_options
|
||||
.lance_write_params
|
||||
.get_or_insert_with(Default::default)
|
||||
.store_params
|
||||
.get_or_insert_with(Default::default)
|
||||
.storage_options
|
||||
.get_or_insert_with(Default::default);
|
||||
update_storage_options(store_params, |options| {
|
||||
options.insert(
|
||||
OPT_NEW_TABLE_V2_MANIFEST_PATHS.to_string(),
|
||||
if use_v2_manifest_paths {
|
||||
"true".to_string()
|
||||
} else {
|
||||
"false".to_string()
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
storage_options.insert(
|
||||
OPT_NEW_TABLE_V2_MANIFEST_PATHS.to_string(),
|
||||
if use_v2_manifest_paths {
|
||||
"true".to_string()
|
||||
} else {
|
||||
"false".to_string()
|
||||
},
|
||||
);
|
||||
self
|
||||
}
|
||||
|
||||
@@ -354,19 +344,20 @@ impl<const HAS_DATA: bool> CreateTableBuilder<HAS_DATA> {
|
||||
/// The default is `LanceFileVersion::Stable`.
|
||||
#[deprecated(since = "0.15.1", note = "Use `database_options` instead")]
|
||||
pub fn data_storage_version(mut self, data_storage_version: LanceFileVersion) -> Self {
|
||||
let store_params = self
|
||||
let storage_options = self
|
||||
.request
|
||||
.write_options
|
||||
.lance_write_params
|
||||
.get_or_insert_with(Default::default)
|
||||
.store_params
|
||||
.get_or_insert_with(Default::default)
|
||||
.storage_options
|
||||
.get_or_insert_with(Default::default);
|
||||
update_storage_options(store_params, |options| {
|
||||
options.insert(
|
||||
OPT_NEW_TABLE_STORAGE_VERSION.to_string(),
|
||||
data_storage_version.to_string(),
|
||||
);
|
||||
});
|
||||
|
||||
storage_options.insert(
|
||||
OPT_NEW_TABLE_STORAGE_VERSION.to_string(),
|
||||
data_storage_version.to_string(),
|
||||
);
|
||||
self
|
||||
}
|
||||
|
||||
@@ -390,17 +381,13 @@ impl<const HAS_DATA: bool> CreateTableBuilder<HAS_DATA> {
|
||||
/// This allows tables to automatically refresh cloud storage credentials
|
||||
/// when they expire, enabling long-running operations on remote storage.
|
||||
pub fn storage_options_provider(mut self, provider: Arc<dyn StorageOptionsProvider>) -> Self {
|
||||
let store_params = self
|
||||
.request
|
||||
self.request
|
||||
.write_options
|
||||
.lance_write_params
|
||||
.get_or_insert(Default::default())
|
||||
.store_params
|
||||
.get_or_insert(Default::default());
|
||||
let initial = store_params.storage_options().cloned().unwrap_or_default();
|
||||
store_params.storage_options_accessor = Some(Arc::new(
|
||||
StorageOptionsAccessor::with_initial_and_provider(initial, provider),
|
||||
));
|
||||
.get_or_insert(Default::default())
|
||||
.storage_options_provider = Some(provider);
|
||||
self
|
||||
}
|
||||
}
|
||||
@@ -463,15 +450,15 @@ impl OpenTableBuilder {
|
||||
///
|
||||
/// See available options at <https://lancedb.com/docs/storage/>
|
||||
pub fn storage_option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
|
||||
let store_params = self
|
||||
let storage_options = self
|
||||
.request
|
||||
.lance_read_params
|
||||
.get_or_insert(Default::default())
|
||||
.store_options
|
||||
.get_or_insert(Default::default())
|
||||
.storage_options
|
||||
.get_or_insert(Default::default());
|
||||
update_storage_options(store_params, |options| {
|
||||
options.insert(key.into(), value.into());
|
||||
});
|
||||
storage_options.insert(key.into(), value.into());
|
||||
self
|
||||
}
|
||||
|
||||
@@ -485,17 +472,18 @@ impl OpenTableBuilder {
|
||||
mut self,
|
||||
pairs: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
|
||||
) -> Self {
|
||||
let store_params = self
|
||||
let storage_options = self
|
||||
.request
|
||||
.lance_read_params
|
||||
.get_or_insert(Default::default())
|
||||
.store_options
|
||||
.get_or_insert(Default::default())
|
||||
.storage_options
|
||||
.get_or_insert(Default::default());
|
||||
update_storage_options(store_params, |options| {
|
||||
for (key, value) in pairs {
|
||||
options.insert(key.into(), value.into());
|
||||
}
|
||||
});
|
||||
|
||||
for (key, value) in pairs {
|
||||
storage_options.insert(key.into(), value.into());
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
@@ -519,16 +507,12 @@ impl OpenTableBuilder {
|
||||
/// This allows tables to automatically refresh cloud storage credentials
|
||||
/// when they expire, enabling long-running operations on remote storage.
|
||||
pub fn storage_options_provider(mut self, provider: Arc<dyn StorageOptionsProvider>) -> Self {
|
||||
let store_params = self
|
||||
.request
|
||||
self.request
|
||||
.lance_read_params
|
||||
.get_or_insert(Default::default())
|
||||
.store_options
|
||||
.get_or_insert(Default::default());
|
||||
let initial = store_params.storage_options().cloned().unwrap_or_default();
|
||||
store_params.storage_options_accessor = Some(Arc::new(
|
||||
StorageOptionsAccessor::with_initial_and_provider(initial, provider),
|
||||
));
|
||||
.get_or_insert(Default::default())
|
||||
.storage_options_provider = Some(provider);
|
||||
self
|
||||
}
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ use lance::dataset::{builder::DatasetBuilder, ReadParams, WriteMode};
|
||||
use lance::io::{ObjectStore, ObjectStoreParams, WrappingObjectStore};
|
||||
use lance_datafusion::utils::StreamingWriteSource;
|
||||
use lance_encoding::version::LanceFileVersion;
|
||||
use lance_io::object_store::{StorageOptionsAccessor, StorageOptionsProvider};
|
||||
use lance_io::object_store::StorageOptionsProvider;
|
||||
use lance_table::io::commit::commit_handler_from_url;
|
||||
use object_store::local::LocalFileSystem;
|
||||
use snafu::ResultExt;
|
||||
@@ -356,11 +356,7 @@ impl ListingDatabase {
|
||||
.clone()
|
||||
.unwrap_or_else(|| Arc::new(lance::session::Session::default()));
|
||||
let os_params = ObjectStoreParams {
|
||||
storage_options_accessor: Some(Arc::new(
|
||||
StorageOptionsAccessor::with_static_options(
|
||||
options.storage_options.clone(),
|
||||
),
|
||||
)),
|
||||
storage_options: Some(options.storage_options.clone()),
|
||||
..Default::default()
|
||||
};
|
||||
let (object_store, base_path) = ObjectStore::from_uri_and_params(
|
||||
@@ -467,20 +463,9 @@ impl ListingDatabase {
|
||||
validate_table_name(name)?;
|
||||
|
||||
let mut uri = self.uri.clone();
|
||||
// If the URI does not end with a path separator, add one
|
||||
// Use forward slash for URIs (http://, s3://, gs://, file://, etc.)
|
||||
// Use platform-specific separator for local paths without scheme
|
||||
let has_scheme = uri.contains("://");
|
||||
let ends_with_separator = uri.ends_with('/') || uri.ends_with('\\');
|
||||
|
||||
if !ends_with_separator {
|
||||
if has_scheme {
|
||||
// URIs always use forward slash
|
||||
uri.push('/');
|
||||
} else {
|
||||
// Local path without scheme - use platform separator
|
||||
uri.push(std::path::MAIN_SEPARATOR);
|
||||
}
|
||||
// If the URI does not end with a slash, add one
|
||||
if !uri.ends_with('/') {
|
||||
uri.push('/');
|
||||
}
|
||||
// Append the table name with the lance file extension
|
||||
uri.push_str(&format!("{}.{}", name, LANCE_FILE_EXTENSION));
|
||||
@@ -496,9 +481,7 @@ impl ListingDatabase {
|
||||
|
||||
async fn drop_tables(&self, names: Vec<String>) -> Result<()> {
|
||||
let object_store_params = ObjectStoreParams {
|
||||
storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
|
||||
self.storage_options.clone(),
|
||||
))),
|
||||
storage_options: Some(self.storage_options.clone()),
|
||||
..Default::default()
|
||||
};
|
||||
let mut uri = self.uri.clone();
|
||||
@@ -547,7 +530,7 @@ impl ListingDatabase {
|
||||
.lance_write_params
|
||||
.as_ref()
|
||||
.and_then(|p| p.store_params.as_ref())
|
||||
.and_then(|sp| sp.storage_options());
|
||||
.and_then(|sp| sp.storage_options.as_ref());
|
||||
|
||||
let storage_version_override = storage_options
|
||||
.and_then(|opts| opts.get(OPT_NEW_TABLE_STORAGE_VERSION))
|
||||
@@ -599,25 +582,20 @@ impl ListingDatabase {
|
||||
// be dropped from the cache when python GCs the table object, which
|
||||
// confounds reuse across tables.
|
||||
if !self.storage_options.is_empty() {
|
||||
let store_params = write_params
|
||||
let storage_options = write_params
|
||||
.store_params
|
||||
.get_or_insert_with(Default::default)
|
||||
.storage_options
|
||||
.get_or_insert_with(Default::default);
|
||||
let mut storage_options = store_params.storage_options().cloned().unwrap_or_default();
|
||||
self.inherit_storage_options(&mut storage_options);
|
||||
store_params.storage_options_accessor = Some(Arc::new(
|
||||
StorageOptionsAccessor::with_static_options(storage_options),
|
||||
));
|
||||
self.inherit_storage_options(storage_options);
|
||||
}
|
||||
|
||||
// Set storage options provider if available
|
||||
if let Some(provider) = self.storage_options_provider.clone() {
|
||||
let store_params = write_params
|
||||
if self.storage_options_provider.is_some() {
|
||||
write_params
|
||||
.store_params
|
||||
.get_or_insert_with(Default::default);
|
||||
let initial = store_params.storage_options().cloned().unwrap_or_default();
|
||||
store_params.storage_options_accessor = Some(Arc::new(
|
||||
StorageOptionsAccessor::with_initial_and_provider(initial, provider),
|
||||
));
|
||||
.get_or_insert_with(Default::default)
|
||||
.storage_options_provider = self.storage_options_provider.clone();
|
||||
}
|
||||
|
||||
write_params.data_storage_version = self
|
||||
@@ -903,9 +881,7 @@ impl Database for ListingDatabase {
|
||||
validate_table_name(&request.target_table_name)?;
|
||||
|
||||
let storage_params = ObjectStoreParams {
|
||||
storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
|
||||
self.storage_options.clone(),
|
||||
))),
|
||||
storage_options: Some(self.storage_options.clone()),
|
||||
..Default::default()
|
||||
};
|
||||
let read_params = ReadParams {
|
||||
@@ -970,29 +946,24 @@ impl Database for ListingDatabase {
|
||||
// be dropped from the cache when python GCs the table object, which
|
||||
// confounds reuse across tables.
|
||||
if !self.storage_options.is_empty() {
|
||||
let store_params = request
|
||||
let storage_options = request
|
||||
.lance_read_params
|
||||
.get_or_insert_with(Default::default)
|
||||
.store_options
|
||||
.get_or_insert_with(Default::default)
|
||||
.storage_options
|
||||
.get_or_insert_with(Default::default);
|
||||
let mut storage_options = store_params.storage_options().cloned().unwrap_or_default();
|
||||
self.inherit_storage_options(&mut storage_options);
|
||||
store_params.storage_options_accessor = Some(Arc::new(
|
||||
StorageOptionsAccessor::with_static_options(storage_options),
|
||||
));
|
||||
self.inherit_storage_options(storage_options);
|
||||
}
|
||||
|
||||
// Set storage options provider if available
|
||||
if let Some(provider) = self.storage_options_provider.clone() {
|
||||
let store_params = request
|
||||
if self.storage_options_provider.is_some() {
|
||||
request
|
||||
.lance_read_params
|
||||
.get_or_insert_with(Default::default)
|
||||
.store_options
|
||||
.get_or_insert_with(Default::default);
|
||||
let initial = store_params.storage_options().cloned().unwrap_or_default();
|
||||
store_params.storage_options_accessor = Some(Arc::new(
|
||||
StorageOptionsAccessor::with_initial_and_provider(initial, provider),
|
||||
));
|
||||
.get_or_insert_with(Default::default)
|
||||
.storage_options_provider = self.storage_options_provider.clone();
|
||||
}
|
||||
|
||||
// Some ReadParams are exposed in the OpenTableBuilder, but we also
|
||||
@@ -1100,7 +1071,6 @@ mod tests {
|
||||
use crate::table::{Table, TableDefinition};
|
||||
use arrow_array::{Int32Array, RecordBatch, StringArray};
|
||||
use arrow_schema::{DataType, Field, Schema};
|
||||
use std::path::PathBuf;
|
||||
use tempfile::tempdir;
|
||||
|
||||
async fn setup_database() -> (tempfile::TempDir, ListingDatabase) {
|
||||
@@ -1899,9 +1869,7 @@ mod tests {
|
||||
let write_options = WriteOptions {
|
||||
lance_write_params: Some(lance::dataset::WriteParams {
|
||||
store_params: Some(lance::io::ObjectStoreParams {
|
||||
storage_options_accessor: Some(Arc::new(
|
||||
StorageOptionsAccessor::with_static_options(storage_options),
|
||||
)),
|
||||
storage_options: Some(storage_options),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
@@ -1975,9 +1943,7 @@ mod tests {
|
||||
let write_options = WriteOptions {
|
||||
lance_write_params: Some(lance::dataset::WriteParams {
|
||||
store_params: Some(lance::io::ObjectStoreParams {
|
||||
storage_options_accessor: Some(Arc::new(
|
||||
StorageOptionsAccessor::with_static_options(storage_options),
|
||||
)),
|
||||
storage_options: Some(storage_options),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
@@ -2080,19 +2046,6 @@ mod tests {
|
||||
assert_eq!(db_options.new_table_config.enable_stable_row_ids, None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_table_uri() {
|
||||
let (_tempdir, db) = setup_database().await;
|
||||
|
||||
let mut pb = PathBuf::new();
|
||||
pb.push(db.uri.clone());
|
||||
pb.push("test.lance");
|
||||
|
||||
let expected = pb.to_str().unwrap();
|
||||
let uri = db.table_uri("test").ok().unwrap();
|
||||
assert_eq!(uri, expected);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_client() {
|
||||
let (_tempdir, db) = setup_database().await;
|
||||
|
||||
@@ -205,26 +205,24 @@ impl Database for LanceNamespaceDatabase {
|
||||
let mut table_id = request.namespace.clone();
|
||||
table_id.push(request.name.clone());
|
||||
|
||||
let create_empty_request = DeclareTableRequest {
|
||||
let declare_request = DeclareTableRequest {
|
||||
id: Some(table_id.clone()),
|
||||
location: None,
|
||||
vend_credentials: None,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let create_empty_response = self
|
||||
let declare_response = self
|
||||
.namespace
|
||||
.declare_table(create_empty_request)
|
||||
.declare_table(declare_request)
|
||||
.await
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to declare table: {}", e),
|
||||
})?;
|
||||
|
||||
let location = create_empty_response
|
||||
.location
|
||||
.ok_or_else(|| Error::Runtime {
|
||||
message: "Table location is missing from create_empty_table response".to_string(),
|
||||
})?;
|
||||
let location = declare_response.location.ok_or_else(|| Error::Runtime {
|
||||
message: "Table location is missing from declare_table response".to_string(),
|
||||
})?;
|
||||
|
||||
let native_table = NativeTable::create_from_namespace(
|
||||
self.namespace.clone(),
|
||||
|
||||
@@ -8,10 +8,9 @@ use datafusion_execution::{disk_manager::DiskManagerBuilder, runtime_env::Runtim
|
||||
use datafusion_expr::col;
|
||||
use futures::TryStreamExt;
|
||||
use lance_core::ROW_ID;
|
||||
use lance_datafusion::exec::SessionContextExt;
|
||||
|
||||
use crate::{
|
||||
arrow::{SendableRecordBatchStream, SendableRecordBatchStreamExt, SimpleRecordBatchStream},
|
||||
arrow::{SendableRecordBatchStream, SimpleRecordBatchStream},
|
||||
connect,
|
||||
database::{CreateTableData, CreateTableRequest, Database},
|
||||
dataloader::permutation::{
|
||||
@@ -178,12 +177,17 @@ impl PermutationBuilder {
|
||||
.build_arc()
|
||||
.unwrap(),
|
||||
);
|
||||
let df = ctx
|
||||
.read_one_shot(data.into_df_stream())
|
||||
let batches = data
|
||||
.map_err(|e| Error::Other {
|
||||
message: format!("Failed to setup sort by split id: {}", e),
|
||||
source: Some(e.into()),
|
||||
})?;
|
||||
})
|
||||
.try_collect::<Vec<_>>()
|
||||
.await?;
|
||||
let df = ctx.read_batches(batches).map_err(|e| Error::Other {
|
||||
message: format!("Failed to setup sort by split id: {}", e),
|
||||
source: Some(e.into()),
|
||||
})?;
|
||||
let df_stream = df
|
||||
.sort_by(vec![col(SPLIT_ID_COLUMN)])
|
||||
.map_err(|e| Error::Other {
|
||||
|
||||
@@ -25,14 +25,13 @@
|
||||
//!
|
||||
//! ## Crate Features
|
||||
//!
|
||||
//! - `aws` - Enable AWS S3 object store support.
|
||||
//! - `dynamodb` - Enable DynamoDB manifest store support.
|
||||
//! - `azure` - Enable Azure Blob Storage object store support.
|
||||
//! - `gcs` - Enable Google Cloud Storage object store support.
|
||||
//! - `oss` - Enable Alibaba Cloud OSS object store support.
|
||||
//! - `remote` - Enable remote client to connect to LanceDB cloud.
|
||||
//! - `huggingface` - Enable HuggingFace Hub integration for loading datasets from the Hub.
|
||||
//! - `fp16kernels` - Enable FP16 kernels for faster vector search on CPU.
|
||||
//! ### Experimental Features
|
||||
//!
|
||||
//! These features are not enabled by default. They are experimental or in-development features that
|
||||
//! are not yet ready to be released.
|
||||
//!
|
||||
//! - `remote` - Enable remote client to connect to LanceDB cloud. This is not yet fully implemented
|
||||
//! and should not be enabled.
|
||||
//!
|
||||
//! ### Quick Start
|
||||
//!
|
||||
@@ -54,8 +53,6 @@
|
||||
//! You can also use [`ConnectOptions`] to configure the connection to the database.
|
||||
//!
|
||||
//! ```rust
|
||||
//! # #[cfg(feature = "aws")]
|
||||
//! # {
|
||||
//! use object_store::aws::AwsCredential;
|
||||
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
|
||||
//! let db = lancedb::connect("data/sample-lancedb")
|
||||
@@ -68,7 +65,6 @@
|
||||
//! .await
|
||||
//! .unwrap();
|
||||
//! # });
|
||||
//! # }
|
||||
//! ```
|
||||
//!
|
||||
//! LanceDB uses [arrow-rs](https://github.com/apache/arrow-rs) to define schema, data types and array itself.
|
||||
|
||||
@@ -204,7 +204,6 @@ pub struct RemoteTable<S: HttpSend = Sender> {
|
||||
server_version: ServerVersion,
|
||||
|
||||
version: RwLock<Option<u64>>,
|
||||
location: RwLock<Option<String>>,
|
||||
}
|
||||
|
||||
impl<S: HttpSend> RemoteTable<S> {
|
||||
@@ -222,7 +221,6 @@ impl<S: HttpSend> RemoteTable<S> {
|
||||
identifier,
|
||||
server_version,
|
||||
version: RwLock::new(None),
|
||||
location: RwLock::new(None),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -641,7 +639,6 @@ impl<S: HttpSend> RemoteTable<S> {
|
||||
struct TableDescription {
|
||||
version: u64,
|
||||
schema: JsonSchema,
|
||||
location: Option<String>,
|
||||
}
|
||||
|
||||
impl<S: HttpSend> std::fmt::Display for RemoteTable<S> {
|
||||
@@ -670,7 +667,6 @@ mod test_utils {
|
||||
identifier: name,
|
||||
server_version: version.map(ServerVersion).unwrap_or_default(),
|
||||
version: RwLock::new(None),
|
||||
location: RwLock::new(None),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1465,28 +1461,8 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
message: "table_definition is not supported on LanceDB cloud.".into(),
|
||||
})
|
||||
}
|
||||
async fn uri(&self) -> Result<String> {
|
||||
// Check if we already have the location cached
|
||||
{
|
||||
let location = self.location.read().await;
|
||||
if let Some(ref loc) = *location {
|
||||
return Ok(loc.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch from server via describe
|
||||
let description = self.describe().await?;
|
||||
let location = description.location.ok_or_else(|| Error::NotSupported {
|
||||
message: "Table URI not supported by the server".into(),
|
||||
})?;
|
||||
|
||||
// Cache the location for future use
|
||||
{
|
||||
let mut cached_location = self.location.write().await;
|
||||
*cached_location = Some(location.clone());
|
||||
}
|
||||
|
||||
Ok(location)
|
||||
fn dataset_uri(&self) -> &str {
|
||||
"NOT_SUPPORTED"
|
||||
}
|
||||
|
||||
async fn storage_options(&self) -> Option<HashMap<String, String>> {
|
||||
@@ -3356,69 +3332,4 @@ mod tests {
|
||||
let result = table.drop_columns(&["old_col1", "old_col2"]).await.unwrap();
|
||||
assert_eq!(result.version, 5);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_uri() {
|
||||
let table = Table::new_with_handler("my_table", |request| {
|
||||
assert_eq!(request.method(), "POST");
|
||||
assert_eq!(request.url().path(), "/v1/table/my_table/describe/");
|
||||
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body(r#"{"version": 1, "schema": {"fields": []}, "location": "s3://bucket/path/to/table"}"#)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
let uri = table.uri().await.unwrap();
|
||||
assert_eq!(uri, "s3://bucket/path/to/table");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_uri_missing_location() {
|
||||
let table = Table::new_with_handler("my_table", |request| {
|
||||
assert_eq!(request.method(), "POST");
|
||||
assert_eq!(request.url().path(), "/v1/table/my_table/describe/");
|
||||
|
||||
// Server returns response without location field
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body(r#"{"version": 1, "schema": {"fields": []}}"#)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
let result = table.uri().await;
|
||||
assert!(result.is_err());
|
||||
assert!(matches!(&result, Err(Error::NotSupported { .. })));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_uri_caching() {
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
let call_count = Arc::new(AtomicUsize::new(0));
|
||||
let call_count_clone = call_count.clone();
|
||||
|
||||
let table = Table::new_with_handler("my_table", move |request| {
|
||||
assert_eq!(request.url().path(), "/v1/table/my_table/describe/");
|
||||
call_count_clone.fetch_add(1, Ordering::SeqCst);
|
||||
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body(
|
||||
r#"{"version": 1, "schema": {"fields": []}, "location": "gs://bucket/table"}"#,
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
// First call should fetch from server
|
||||
let uri1 = table.uri().await.unwrap();
|
||||
assert_eq!(uri1, "gs://bucket/table");
|
||||
assert_eq!(call_count.load(Ordering::SeqCst), 1);
|
||||
|
||||
// Second call should use cached value
|
||||
let uri2 = table.uri().await.unwrap();
|
||||
assert_eq!(uri2, "gs://bucket/table");
|
||||
assert_eq!(call_count.load(Ordering::SeqCst), 1); // Still 1, no new call
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,7 +40,7 @@ use lance_index::vector::pq::PQBuildParams;
|
||||
use lance_index::vector::sq::builder::SQBuildParams;
|
||||
use lance_index::DatasetIndexExt;
|
||||
use lance_index::IndexType;
|
||||
use lance_io::object_store::{LanceNamespaceStorageOptionsProvider, StorageOptionsAccessor};
|
||||
use lance_io::object_store::LanceNamespaceStorageOptionsProvider;
|
||||
use lance_namespace::models::{
|
||||
QueryTableRequest as NsQueryTableRequest, QueryTableRequestColumns,
|
||||
QueryTableRequestFullTextQuery, QueryTableRequestVector, StringFtsQuery,
|
||||
@@ -608,8 +608,8 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
|
||||
async fn list_versions(&self) -> Result<Vec<Version>>;
|
||||
/// Get the table definition.
|
||||
async fn table_definition(&self) -> Result<TableDefinition>;
|
||||
/// Get the table URI (storage location)
|
||||
async fn uri(&self) -> Result<String>;
|
||||
/// Get the table URI
|
||||
fn dataset_uri(&self) -> &str;
|
||||
/// Get the storage options used when opening this table, if any.
|
||||
async fn storage_options(&self) -> Option<HashMap<String, String>>;
|
||||
/// Poll until the columns are fully indexed. Will return Error::Timeout if the columns
|
||||
@@ -1317,12 +1317,11 @@ impl Table {
|
||||
self.inner.list_indices().await
|
||||
}
|
||||
|
||||
/// Get the table URI (storage location)
|
||||
/// Get the underlying dataset URI
|
||||
///
|
||||
/// Returns the full storage location of the table (e.g., S3/GCS path).
|
||||
/// For remote tables, this fetches the location from the server via describe.
|
||||
pub async fn uri(&self) -> Result<String> {
|
||||
self.inner.uri().await
|
||||
/// Warning: This is an internal API and the return value is subject to change.
|
||||
pub fn dataset_uri(&self) -> &str {
|
||||
self.inner.dataset_uri()
|
||||
}
|
||||
|
||||
/// Get the storage options used when opening this table, if any.
|
||||
@@ -1412,26 +1411,35 @@ impl Table {
|
||||
let projected_plans = plans
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(plan_i, plan)| {
|
||||
let query_index = datafusion_common::ScalarValue::Int32(Some(plan_i as i32));
|
||||
let query_index_expr =
|
||||
datafusion_physical_plan::expressions::Literal::new(query_index);
|
||||
let query_index_expr =
|
||||
Arc::new(query_index_expr) as Arc<dyn datafusion_physical_plan::PhysicalExpr>;
|
||||
let mut projections = vec![(query_index_expr, "query_index".to_string())];
|
||||
projections.extend_from_slice(&project_all_columns);
|
||||
let projection = ProjectionExec::try_new(projections, plan).unwrap();
|
||||
Arc::new(projection) as Arc<dyn datafusion_physical_plan::ExecutionPlan>
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
.map(
|
||||
|(plan_i, plan)| -> Result<Arc<dyn datafusion_physical_plan::ExecutionPlan>> {
|
||||
let query_index = datafusion_common::ScalarValue::Int32(Some(plan_i as i32));
|
||||
let query_index_expr =
|
||||
datafusion_physical_plan::expressions::Literal::new(query_index);
|
||||
let query_index_expr = Arc::new(query_index_expr)
|
||||
as Arc<dyn datafusion_physical_plan::PhysicalExpr>;
|
||||
let mut projections = vec![(query_index_expr, "query_index".to_string())];
|
||||
projections.extend_from_slice(&project_all_columns);
|
||||
let projection =
|
||||
ProjectionExec::try_new(projections, plan).map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to build projection plan: {e}"),
|
||||
})?;
|
||||
Ok(Arc::new(projection) as Arc<dyn datafusion_physical_plan::ExecutionPlan>)
|
||||
},
|
||||
)
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
let unioned = Arc::new(UnionExec::new(projected_plans));
|
||||
let unioned = UnionExec::try_new(projected_plans).map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to union query plans: {e}"),
|
||||
})?;
|
||||
// We require 1 partition in the final output
|
||||
let repartitioned = RepartitionExec::try_new(
|
||||
unioned,
|
||||
datafusion_physical_plan::Partitioning::RoundRobinBatch(1),
|
||||
)
|
||||
.unwrap();
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to repartition query plans: {e}"),
|
||||
})?;
|
||||
Ok(Arc::new(repartitioned))
|
||||
}
|
||||
|
||||
@@ -1666,14 +1674,18 @@ impl NativeTable {
|
||||
|
||||
// Use DatasetBuilder::from_namespace which automatically fetches location
|
||||
// and storage options from the namespace
|
||||
let builder = DatasetBuilder::from_namespace(namespace_client.clone(), table_id)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
lance::Error::Namespace { source, .. } => Error::Runtime {
|
||||
message: format!("Failed to get table info from namespace: {:?}", source),
|
||||
},
|
||||
source => Error::Lance { source },
|
||||
})?;
|
||||
let builder = DatasetBuilder::from_namespace(
|
||||
namespace_client.clone(),
|
||||
table_id,
|
||||
false, // Don't ignore namespace storage options
|
||||
)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
lance::Error::Namespace { source, .. } => Error::Runtime {
|
||||
message: format!("Failed to get table info from namespace: {:?}", source),
|
||||
},
|
||||
source => Error::Lance { source },
|
||||
})?;
|
||||
|
||||
let dataset = builder
|
||||
.with_read_params(params)
|
||||
@@ -1877,10 +1889,7 @@ impl NativeTable {
|
||||
let store_params = params
|
||||
.store_params
|
||||
.get_or_insert_with(ObjectStoreParams::default);
|
||||
let initial = store_params.storage_options().cloned().unwrap_or_default();
|
||||
store_params.storage_options_accessor = Some(Arc::new(
|
||||
StorageOptionsAccessor::with_initial_and_provider(initial, storage_options_provider),
|
||||
));
|
||||
store_params.storage_options_provider = Some(storage_options_provider);
|
||||
|
||||
// Patch the params if we have a write store wrapper
|
||||
let params = match write_store_wrapper.clone() {
|
||||
@@ -2334,6 +2343,23 @@ impl NativeTable {
|
||||
|
||||
/// Convert an AnyQuery to the namespace QueryTableRequest format.
|
||||
fn convert_to_namespace_query(&self, query: &AnyQuery) -> Result<NsQueryTableRequest> {
|
||||
let to_namespace_columns =
|
||||
|select: &Select| -> Result<Option<Box<QueryTableRequestColumns>>> {
|
||||
match select {
|
||||
Select::All => Ok(None),
|
||||
Select::Columns(cols) => {
|
||||
let mut columns = QueryTableRequestColumns::new();
|
||||
columns.column_names = Some(cols.clone());
|
||||
Ok(Some(Box::new(columns)))
|
||||
}
|
||||
Select::Dynamic(_) => Err(Error::NotSupported {
|
||||
message:
|
||||
"Dynamic column selection is not supported for server-side queries"
|
||||
.to_string(),
|
||||
}),
|
||||
}
|
||||
};
|
||||
|
||||
match query {
|
||||
AnyQuery::VectorQuery(vq) => {
|
||||
// Extract the query vector(s)
|
||||
@@ -2345,22 +2371,6 @@ impl NativeTable {
|
||||
None => None,
|
||||
};
|
||||
|
||||
// Convert select to columns list
|
||||
let columns: Option<Box<QueryTableRequestColumns>> = match &vq.base.select {
|
||||
Select::All => None,
|
||||
Select::Columns(cols) => Some(Box::new(QueryTableRequestColumns {
|
||||
column_names: Some(cols.clone()),
|
||||
column_aliases: None,
|
||||
})),
|
||||
Select::Dynamic(_) => {
|
||||
return Err(Error::NotSupported {
|
||||
message:
|
||||
"Dynamic column selection is not supported for server-side queries"
|
||||
.to_string(),
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
// Check for unsupported features
|
||||
if vq.base.reranker.is_some() {
|
||||
return Err(Error::NotSupported {
|
||||
@@ -2368,6 +2378,8 @@ impl NativeTable {
|
||||
});
|
||||
}
|
||||
|
||||
let columns = to_namespace_columns(&vq.base.select)?;
|
||||
|
||||
// Convert FTS query if present
|
||||
let full_text_query = vq.base.full_text_search.as_ref().map(|fts| {
|
||||
let columns = fts.columns();
|
||||
@@ -2423,19 +2435,7 @@ impl NativeTable {
|
||||
.map(|f| self.filter_to_sql(f))
|
||||
.transpose()?;
|
||||
|
||||
let columns: Option<Box<QueryTableRequestColumns>> = match &q.select {
|
||||
Select::All => None,
|
||||
Select::Columns(cols) => Some(Box::new(QueryTableRequestColumns {
|
||||
column_names: Some(cols.clone()),
|
||||
column_aliases: None,
|
||||
})),
|
||||
Select::Dynamic(_) => {
|
||||
return Err(Error::NotSupported {
|
||||
message: "Dynamic columns are not supported for server-side query"
|
||||
.to_string(),
|
||||
});
|
||||
}
|
||||
};
|
||||
let columns = to_namespace_columns(&q.select)?;
|
||||
|
||||
// Handle full text search if present
|
||||
let full_text_query = q.full_text_search.as_ref().map(|fts| {
|
||||
@@ -3232,8 +3232,8 @@ impl BaseTable for NativeTable {
|
||||
Ok(results.into_iter().flatten().collect())
|
||||
}
|
||||
|
||||
async fn uri(&self) -> Result<String> {
|
||||
Ok(self.uri.clone())
|
||||
fn dataset_uri(&self) -> &str {
|
||||
self.uri.as_str()
|
||||
}
|
||||
|
||||
async fn storage_options(&self) -> Option<HashMap<String, String>> {
|
||||
@@ -3241,7 +3241,7 @@ impl BaseTable for NativeTable {
|
||||
.get()
|
||||
.await
|
||||
.ok()
|
||||
.and_then(|dataset| dataset.initial_storage_options().cloned())
|
||||
.and_then(|dataset| dataset.storage_options().cloned())
|
||||
}
|
||||
|
||||
async fn index_stats(&self, index_name: &str) -> Result<Option<IndexStatistics>> {
|
||||
@@ -5151,15 +5151,16 @@ mod tests {
|
||||
let any_query = AnyQuery::VectorQuery(vq);
|
||||
let ns_request = table.convert_to_namespace_query(&any_query).unwrap();
|
||||
|
||||
let column_names = ns_request
|
||||
.columns
|
||||
.as_ref()
|
||||
.and_then(|cols| cols.column_names.clone());
|
||||
|
||||
assert_eq!(ns_request.k, 10);
|
||||
assert_eq!(ns_request.offset, Some(5));
|
||||
assert_eq!(ns_request.filter, Some("id > 0".to_string()));
|
||||
assert_eq!(column_names, Some(vec!["id".to_string()]));
|
||||
assert_eq!(
|
||||
ns_request
|
||||
.columns
|
||||
.as_ref()
|
||||
.and_then(|cols| cols.column_names.clone()),
|
||||
Some(vec!["id".to_string()])
|
||||
);
|
||||
assert_eq!(ns_request.vector_column, Some("vector".to_string()));
|
||||
assert_eq!(ns_request.distance_type, Some("l2".to_string()));
|
||||
assert!(ns_request.vector.single_vector.is_some());
|
||||
@@ -5196,16 +5197,17 @@ mod tests {
|
||||
let any_query = AnyQuery::Query(q);
|
||||
let ns_request = table.convert_to_namespace_query(&any_query).unwrap();
|
||||
|
||||
let column_names = ns_request
|
||||
.columns
|
||||
.as_ref()
|
||||
.and_then(|cols| cols.column_names.clone());
|
||||
|
||||
// Plain queries should pass an empty vector
|
||||
assert_eq!(ns_request.k, 20);
|
||||
assert_eq!(ns_request.offset, Some(5));
|
||||
assert_eq!(ns_request.filter, Some("id > 5".to_string()));
|
||||
assert_eq!(column_names, Some(vec!["id".to_string()]));
|
||||
assert_eq!(
|
||||
ns_request
|
||||
.columns
|
||||
.as_ref()
|
||||
.and_then(|cols| cols.column_names.clone()),
|
||||
Some(vec!["id".to_string()])
|
||||
);
|
||||
assert_eq!(ns_request.with_row_id, Some(true));
|
||||
assert_eq!(ns_request.bypass_vector_index, Some(true));
|
||||
assert!(ns_request.vector_column.is_none()); // No vector column for plain queries
|
||||
|
||||
@@ -101,6 +101,7 @@ impl DatasetRef {
|
||||
refs::Ref::Version(_, Some(target_ver)) => version != target_ver,
|
||||
refs::Ref::Version(_, None) => true, // No specific version, always checkout
|
||||
refs::Ref::Tag(_) => true, // Always checkout for tags
|
||||
refs::Ref::VersionNumber(target_ver) => version != target_ver,
|
||||
};
|
||||
|
||||
if should_checkout {
|
||||
|
||||
Reference in New Issue
Block a user