Compare commits

..

1 Commits

Author SHA1 Message Date
lancedb automation
2974b7e5c6 chore: update lance dependency to v6.0.0-beta.6 2026-04-29 07:27:24 +00:00
31 changed files with 806 additions and 1352 deletions

1277
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -13,40 +13,40 @@ categories = ["database-implementations"]
rust-version = "1.91.0"
[workspace.dependencies]
lance = { "version" = "=6.0.0-beta.4", default-features = false, "tag" = "v6.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=6.0.0-beta.4", "tag" = "v6.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=6.0.0-beta.4", "tag" = "v6.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=6.0.0-beta.4", "tag" = "v6.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=6.0.0-beta.4", default-features = false, "tag" = "v6.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=6.0.0-beta.4", "tag" = "v6.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=6.0.0-beta.4", "tag" = "v6.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=6.0.0-beta.4", "tag" = "v6.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=6.0.0-beta.4", default-features = false, "tag" = "v6.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=6.0.0-beta.4", "tag" = "v6.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=6.0.0-beta.4", "tag" = "v6.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=6.0.0-beta.4", "tag" = "v6.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=6.0.0-beta.4", "tag" = "v6.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=6.0.0-beta.4", "tag" = "v6.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance = { "version" = "=6.0.0-beta.6", default-features = false, "tag" = "v6.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=6.0.0-beta.6", "tag" = "v6.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=6.0.0-beta.6", "tag" = "v6.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=6.0.0-beta.6", "tag" = "v6.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=6.0.0-beta.6", default-features = false, "tag" = "v6.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=6.0.0-beta.6", "tag" = "v6.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=6.0.0-beta.6", "tag" = "v6.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=6.0.0-beta.6", "tag" = "v6.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=6.0.0-beta.6", default-features = false, "tag" = "v6.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=6.0.0-beta.6", "tag" = "v6.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=6.0.0-beta.6", "tag" = "v6.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=6.0.0-beta.6", "tag" = "v6.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=6.0.0-beta.6", "tag" = "v6.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=6.0.0-beta.6", "tag" = "v6.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
ahash = "0.8"
# Note that this one does not include pyarrow
arrow = { version = "57.2", optional = false }
arrow-array = "57.2"
arrow-data = "57.2"
arrow-ipc = "57.2"
arrow-ord = "57.2"
arrow-schema = "57.2"
arrow-select = "57.2"
arrow-cast = "57.2"
arrow = { version = "58.1", optional = false }
arrow-array = "58.1"
arrow-data = "58.1"
arrow-ipc = "58.1"
arrow-ord = "58.1"
arrow-schema = "58.1"
arrow-select = "58.1"
arrow-cast = "58.1"
async-trait = "0"
datafusion = { version = "52.1", default-features = false }
datafusion-catalog = "52.1"
datafusion-common = { version = "52.1", default-features = false }
datafusion-execution = "52.1"
datafusion-expr = "52.1"
datafusion-functions = "52.1"
datafusion-physical-plan = "52.1"
datafusion-physical-expr = "52.1"
datafusion-sql = "52.1"
datafusion = { version = "53.1", default-features = false }
datafusion-catalog = "53.1"
datafusion-common = { version = "53.1", default-features = false }
datafusion-execution = "53.1"
datafusion-expr = "53.1"
datafusion-functions = "53.1"
datafusion-physical-plan = "53.1"
datafusion-physical-expr = "53.1"
datafusion-sql = "53.1"
env_logger = "0.11"
half = { "version" = "2.7.1", default-features = false, features = [
"num-traits",

View File

@@ -41,29 +41,6 @@ for testing purposes.
***
### manifestEnabled?
```ts
optional manifestEnabled: boolean;
```
(For LanceDB OSS only): use directory namespace manifests as the source
of truth for table metadata. Existing directory-listed root tables are
migrated into the manifest on access.
***
### namespaceClientProperties?
```ts
optional namespaceClientProperties: Record<string, string>;
```
(For LanceDB OSS only): extra properties for the backing namespace
client used by manifest-enabled native connections.
***
### readConsistencyInterval?
```ts

View File

@@ -28,7 +28,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<arrow.version>15.0.0</arrow.version>
<lance-core.version>6.0.0-beta.4</lance-core.version>
<lance-core.version>6.0.0-beta.6</lance-core.version>
<spotless.skip>false</spotless.skip>
<spotless.version>2.30.0</spotless.version>
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>

View File

@@ -16,7 +16,7 @@ crate-type = ["cdylib"]
async-trait.workspace = true
arrow-ipc.workspace = true
arrow-array.workspace = true
arrow-buffer = "57.2"
arrow-buffer = "58.1"
half.workspace = true
arrow-schema.workspace = true
env_logger.workspace = true

View File

@@ -67,12 +67,6 @@ impl Connection {
builder = builder.storage_option(key, value);
}
}
if let Some(manifest_enabled) = options.manifest_enabled {
builder = builder.manifest_enabled(manifest_enabled);
}
if let Some(namespace_client_properties) = options.namespace_client_properties {
builder = builder.namespace_client_properties(namespace_client_properties);
}
// Create client config, optionally with header provider
let client_config = options.client_config.unwrap_or_default();

View File

@@ -37,13 +37,6 @@ pub struct ConnectionOptions {
///
/// The available options are described at https://docs.lancedb.com/storage/
pub storage_options: Option<HashMap<String, String>>,
/// (For LanceDB OSS only): use directory namespace manifests as the source
/// of truth for table metadata. Existing directory-listed root tables are
/// migrated into the manifest on access.
pub manifest_enabled: Option<bool>,
/// (For LanceDB OSS only): extra properties for the backing namespace
/// client used by manifest-enabled native connections.
pub namespace_client_properties: Option<HashMap<String, String>>,
/// (For LanceDB OSS only): the session to use for this connection. Holds
/// shared caches and other session-specific state.
pub session: Option<session::Session>,

View File

@@ -15,7 +15,7 @@ name = "_lancedb"
crate-type = ["cdylib"]
[dependencies]
arrow = { version = "57.2", features = ["pyarrow"] }
arrow = { workspace = true, features = ["pyarrow"] }
async-trait = "0.1"
bytes = "1"
lancedb = { path = "../rust/lancedb", default-features = false }
@@ -25,8 +25,8 @@ lance-namespace-impls.workspace = true
lance-io.workspace = true
env_logger.workspace = true
log.workspace = true
pyo3 = { version = "0.26", features = ["extension-module", "abi3-py39"] }
pyo3-async-runtimes = { version = "0.26", features = [
pyo3 = { version = "0.28", features = ["extension-module", "abi3-py39"] }
pyo3-async-runtimes = { version = "0.28", features = [
"attributes",
"tokio-runtime",
] }
@@ -38,7 +38,7 @@ snafu.workspace = true
tokio = { version = "1.40", features = ["sync"] }
[build-dependencies]
pyo3-build-config = { version = "0.26", features = [
pyo3-build-config = { version = "0.28", features = [
"extension-module",
"abi3-py39",
] }

View File

@@ -73,7 +73,6 @@ def connect(
client_config: Union[ClientConfig, Dict[str, Any], None] = None,
storage_options: Optional[Dict[str, str]] = None,
session: Optional[Session] = None,
manifest_enabled: bool = False,
namespace_client_impl: Optional[str] = None,
namespace_client_properties: Optional[Dict[str, str]] = None,
namespace_client_pushdown_operations: Optional[List[str]] = None,
@@ -112,10 +111,6 @@ def connect(
storage_options: dict, optional
Additional options for the storage backend. See available options at
<https://docs.lancedb.com/storage/>
manifest_enabled : bool, default False
When true for local/native connections, use directory namespace
manifests as the source of truth for table metadata. Existing
directory-listed root tables are migrated into the manifest on access.
session: Session, optional
(For LanceDB OSS only)
A session to use for this connection. Sessions allow you to configure
@@ -163,11 +158,11 @@ def connect(
conn : DBConnection
A connection to a LanceDB database.
"""
if namespace_client_impl is not None:
if namespace_client_properties is None:
if namespace_client_impl is not None or namespace_client_properties is not None:
if namespace_client_impl is None or namespace_client_properties is None:
raise ValueError(
"namespace_client_properties must be provided when "
"namespace_client_impl is set"
"Both namespace_client_impl and "
"namespace_client_properties must be provided"
)
if kwargs:
raise ValueError(f"Unknown keyword arguments: {kwargs}")
@@ -180,12 +175,6 @@ def connect(
namespace_client_pushdown_operations=namespace_client_pushdown_operations,
)
if namespace_client_properties is not None and not manifest_enabled:
raise ValueError(
"namespace_client_impl must be provided when using "
"namespace_client_properties unless manifest_enabled=True"
)
if namespace_client_pushdown_operations is not None:
raise ValueError(
"namespace_client_pushdown_operations is only valid when "
@@ -223,8 +212,6 @@ def connect(
read_consistency_interval=read_consistency_interval,
storage_options=storage_options,
session=session,
manifest_enabled=manifest_enabled,
namespace_client_properties=namespace_client_properties,
)
@@ -302,8 +289,6 @@ def deserialize_conn(
parsed["uri"],
read_consistency_interval=rci,
storage_options=storage_options,
manifest_enabled=parsed.get("manifest_enabled", False),
namespace_client_properties=parsed.get("namespace_client_properties"),
)
else:
raise ValueError(f"Unknown connection_type: {connection_type}")
@@ -319,8 +304,6 @@ async def connect_async(
client_config: Optional[Union[ClientConfig, Dict[str, Any]]] = None,
storage_options: Optional[Dict[str, str]] = None,
session: Optional[Session] = None,
manifest_enabled: bool = False,
namespace_client_properties: Optional[Dict[str, str]] = None,
) -> AsyncConnection:
"""Connect to a LanceDB database.
@@ -360,13 +343,6 @@ async def connect_async(
cache sizes for index and metadata caches, which can significantly
impact memory use and performance. They can also be re-used across
multiple connections to share the same cache state.
manifest_enabled : bool, default False
When true for local/native connections, use directory namespace
manifests as the source of truth for table metadata. Existing
directory-listed root tables are migrated into the manifest on access.
namespace_client_properties : dict, optional
Additional directory namespace client properties to use with
``manifest_enabled=True``.
Examples
--------
@@ -409,8 +385,6 @@ async def connect_async(
client_config,
storage_options,
session,
manifest_enabled,
namespace_client_properties,
)
)

View File

@@ -242,8 +242,6 @@ async def connect(
client_config: Optional[Union[ClientConfig, Dict[str, Any]]],
storage_options: Optional[Dict[str, str]],
session: Optional[Session],
manifest_enabled: bool = False,
namespace_client_properties: Optional[Dict[str, str]] = None,
) -> Connection: ...
class RecordBatchStream:

View File

@@ -590,13 +590,8 @@ class LanceDBConnection(DBConnection):
read_consistency_interval: Optional[timedelta] = None,
storage_options: Optional[Dict[str, str]] = None,
session: Optional[Session] = None,
manifest_enabled: bool = False,
namespace_client_properties: Optional[Dict[str, str]] = None,
_inner: Optional[LanceDbConnection] = None,
):
self.storage_options = storage_options
self._manifest_enabled = manifest_enabled
self._namespace_client_properties = namespace_client_properties
if _inner is not None:
self._conn = _inner
self._cached_namespace_client = None
@@ -638,8 +633,6 @@ class LanceDBConnection(DBConnection):
None,
storage_options,
session,
manifest_enabled,
namespace_client_properties,
)
# TODO: It would be nice if we didn't store self.storage_options but it is
@@ -647,6 +640,7 @@ class LanceDBConnection(DBConnection):
# work because some paths like LanceDBConnection.from_inner will lose the
# storage_options. Also, this class really shouldn't be holding any state
# beyond _conn.
self.storage_options = storage_options
self._conn = AsyncConnection(LOOP.run(do_connect()))
self._cached_namespace_client: Optional[LanceNamespace] = None
@@ -683,8 +677,6 @@ class LanceDBConnection(DBConnection):
"connection_type": "local",
"uri": self.uri,
"storage_options": self.storage_options,
"manifest_enabled": self._manifest_enabled,
"namespace_client_properties": self._namespace_client_properties,
"read_consistency_interval_seconds": (
rci.total_seconds() if rci else None
),

View File

@@ -425,35 +425,6 @@ class Permutation:
"""
return Permutation.from_tables(table, None, None)
@classmethod
def from_table(cls, table: LanceTable) -> "_PermutationFromTable":
    """
    Build a lazily evaluated permutation on top of ``table``.

    The object handed back supports HuggingFace / PyTorch-style chaining of
    the builder operations ``shuffle``, ``filter``, and ``split_*``. It
    replaces the manual two-step recipe of running
    ``permutation_builder(table).shuffle().execute()`` and then calling
    ``Permutation.from_tables(table, perm_tbl)``.

    Builder calls are only recorded; the permutation table is produced the
    first time something is read from the result (any attribute that is not
    itself a builder operation), so a chain of several calls does not run
    ``execute()`` once per step.

    Once that first read has happened, every ``Permutation`` method
    (``select_columns``, ``with_format``, ``map``, ``__iter__``, ``fetch``,
    ``num_rows``, ...) is forwarded transparently.

    Parameters
    ----------
    table : LanceTable
        The base table the permutation is built over.

    Returns
    -------
    _PermutationFromTable
        A chainable wrapper with no builder operations recorded yet.

    Examples
    --------
    >>> import lancedb
    >>> db = lancedb.connect("memory:///")
    >>> tbl = db.create_table("tbl", data=[{"x": x} for x in range(100)])
    >>> perm = Permutation.from_table(tbl).shuffle(seed=42)
    >>> perm.num_rows
    100
    """
    lazy_permutation = _PermutationFromTable(table)
    return lazy_permutation
@classmethod
def from_tables(
cls,
@@ -871,85 +842,3 @@ class Permutation:
Repeat the permutation `times` times
"""
raise Exception("with_repeat is not yet implemented")
class _PermutationFromTable:
    """
    Lazy, chainable object returned by [Permutation.from_table](#from_table).

    Builder operations (``shuffle``, ``filter``, ``split_*``) are queued
    rather than run. The queue is executed the first time the object is read
    from; from then on every Permutation read / transform
    (``select_columns``, ``with_format``, ``map``, ``__iter__``, ``fetch``,
    ``num_rows``, ...) is delegated to the resulting [Permutation].
    """

    __slots__ = ("_base_table", "_pending_ops", "_materialized")

    def __init__(
        self,
        base_table: LanceTable,
        _pending_ops: Optional[list[tuple[str, tuple, dict]]] = None,
    ):
        self._base_table = base_table
        # Always keep a private copy of the queue so that two chains branching
        # from the same parent never share one list object.
        self._pending_ops: list[tuple[str, tuple, dict]] = (
            [] if _pending_ops is None else list(_pending_ops)
        )
        # Filled in by _materialize() on first read.
        self._materialized: Optional[Permutation] = None

    def _with_op(
        self, name: str, args: tuple, kwargs: dict
    ) -> "_PermutationFromTable":
        """Return a new wrapper whose queue is this one's plus one more call."""
        extended_queue = [*self._pending_ops, (name, args, kwargs)]
        return _PermutationFromTable(self._base_table, _pending_ops=extended_queue)

    # Each builder operation below only records its name and arguments; the
    # arguments are replayed unchanged against the real builder later.

    def shuffle(self, *args, **kwargs) -> "_PermutationFromTable":
        return self._with_op("shuffle", args, kwargs)

    def filter(self, *args, **kwargs) -> "_PermutationFromTable":
        return self._with_op("filter", args, kwargs)

    def split_random(self, *args, **kwargs) -> "_PermutationFromTable":
        return self._with_op("split_random", args, kwargs)

    def split_sequential(self, *args, **kwargs) -> "_PermutationFromTable":
        return self._with_op("split_sequential", args, kwargs)

    def split_hash(self, *args, **kwargs) -> "_PermutationFromTable":
        return self._with_op("split_hash", args, kwargs)

    def split_calculated(self, *args, **kwargs) -> "_PermutationFromTable":
        return self._with_op("split_calculated", args, kwargs)

    def _materialize(self) -> Permutation:
        """Run the queued operations once and cache the resulting Permutation."""
        if self._materialized is not None:
            return self._materialized

        if not self._pending_ops:
            # Nothing was queued: fall back to the identity permutation.
            self._materialized = Permutation.identity(self._base_table)
            return self._materialized

        builder = permutation_builder(self._base_table)
        for op_name, op_args, op_kwargs in self._pending_ops:
            apply_step = getattr(builder, op_name)
            builder = apply_step(*op_args, **op_kwargs)
        self._materialized = Permutation.from_tables(
            self._base_table, builder.execute()
        )
        return self._materialized

    def __getattr__(self, name: str) -> Any:
        # Only reached when normal lookup fails. Underscore-prefixed names are
        # never forwarded, which prevents recursion on dunder/private state.
        if name.startswith("_"):
            raise AttributeError(name)
        target = self._materialize()
        return getattr(target, name)

    def __iter__(self):
        return iter(self._materialize())

    def __len__(self) -> int:
        return len(self._materialize())

    def __getitem__(self, index):
        return self._materialize()[index]

    def __getitems__(self, indices):
        return self._materialize().__getitems__(indices)

View File

@@ -1095,60 +1095,3 @@ def test_getitems_invalid_offset(some_permutation: Permutation):
"""Test __getitems__ with an out-of-range offset raises an error."""
with pytest.raises(Exception):
some_permutation.__getitems__([999999])
def test_from_table_identity(mem_db):
    """With no builder operations, from_table exposes every row and column."""
    source = mem_db.create_table("tbl", pa.table({"x": range(10)}))
    untouched = Permutation.from_table(source)
    assert untouched.num_rows == 10
    assert untouched.column_names == ["x"]
def test_from_table_shuffle_seeded(mem_db):
    """A seeded shuffle keeps all rows, changes their order, and is repeatable."""
    source = mem_db.create_table("tbl", pa.table({"x": range(100)}))
    natural_order = list(range(100))

    first_perm = Permutation.from_table(source).shuffle(seed=42)
    first_order = [row["x"] for row in first_perm.__getitems__(list(natural_order))]
    # Same set of values as the table, but not in insertion order.
    assert sorted(first_order) == natural_order
    assert first_order != natural_order

    # A second, independent chain with the same seed must give the same order.
    second_perm = Permutation.from_table(source).shuffle(seed=42)
    second_order = [
        row["x"] for row in second_perm.__getitems__(list(natural_order))
    ]
    assert first_order == second_order
def test_from_table_filter(mem_db):
    """A filter queued through from_table restricts the visible rows."""
    source = mem_db.create_table("tbl", pa.table({"x": range(100)}))
    below_25 = Permutation.from_table(source).filter("x < 25")
    assert below_25.num_rows == 25
def test_from_table_chained_ops(mem_db):
    """filter followed by shuffle yields exactly the rows that pass the filter."""
    source = mem_db.create_table("tbl", pa.table({"x": range(100)}))
    chained = Permutation.from_table(source).filter("x >= 50").shuffle(seed=7)
    assert chained.num_rows == 50
    fetched = chained.__getitems__(list(range(50)))
    values = [row["x"] for row in fetched]
    assert sorted(values) == list(range(50, 100))
def test_from_table_forwards_read_methods(mem_db):
    """Permutation read methods are reachable on the from_table() result."""
    two_columns = pa.table({"x": range(10), "y": range(10)})
    source = mem_db.create_table("tbl", two_columns)
    only_x = Permutation.from_table(source).select_columns(["x"])
    assert only_x.column_names == ["x"]
def test_from_table_split_random(mem_db):
    """split_random through from_table exposes the rows of the first split."""
    source = mem_db.create_table("tbl", pa.table({"x": range(100)}))
    split_perm = Permutation.from_table(source).split_random(
        ratios=[0.3, 0.7], seed=1
    )
    # Per the original test note, split 0 is the one read by default; its
    # ratio is 0.3, so roughly 30 of the 100 rows are expected.
    row_count = split_perm.num_rows
    assert 25 <= row_count <= 35

View File

@@ -525,7 +525,7 @@ impl Connection {
}
#[pyfunction]
#[pyo3(signature = (uri, api_key=None, region=None, host_override=None, read_consistency_interval=None, client_config=None, storage_options=None, session=None, manifest_enabled=false, namespace_client_properties=None))]
#[pyo3(signature = (uri, api_key=None, region=None, host_override=None, read_consistency_interval=None, client_config=None, storage_options=None, session=None))]
#[allow(clippy::too_many_arguments)]
pub fn connect(
py: Python<'_>,
@@ -537,8 +537,6 @@ pub fn connect(
client_config: Option<PyClientConfig>,
storage_options: Option<HashMap<String, String>>,
session: Option<crate::session::Session>,
manifest_enabled: bool,
namespace_client_properties: Option<HashMap<String, String>>,
) -> PyResult<Bound<'_, PyAny>> {
future_into_py(py, async move {
let mut builder = lancedb::connect(&uri);
@@ -558,12 +556,6 @@ pub fn connect(
if let Some(storage_options) = storage_options {
builder = builder.storage_options(storage_options);
}
if manifest_enabled {
builder = builder.manifest_enabled(true);
}
if let Some(namespace_client_properties) = namespace_client_properties {
builder = builder.namespace_client_properties(namespace_client_properties);
}
#[cfg(feature = "remote")]
if let Some(client_config) = client_config {
builder = builder.client_config(client_config.into());

View File

@@ -17,7 +17,7 @@ use pyo3::{Bound, PyAny, PyResult, exceptions::PyValueError, prelude::*, pyfunct
/// [`expr_lit`] and combined with the methods on this struct. On the Python
/// side a thin wrapper class (`lancedb.expr.Expr`) delegates to these methods
/// and adds Python operator overloads.
#[pyclass(name = "PyExpr")]
#[pyclass(name = "PyExpr", from_py_object)]
#[derive(Clone)]
pub struct PyExpr(pub DfExpr);

View File

@@ -33,7 +33,7 @@ impl PyHeaderProvider {
Ok(headers_py) => {
// Convert Python dict to Rust HashMap
let bound_headers = headers_py.bind(py);
let dict: &Bound<PyDict> = bound_headers.downcast().map_err(|e| {
let dict: &Bound<PyDict> = bound_headers.cast().map_err(|e| {
format!("HeaderProvider.get_headers must return a dict: {}", e)
})?;

View File

@@ -22,7 +22,7 @@ pub fn class_name(ob: &'_ Bound<'_, PyAny>) -> PyResult<String> {
let full_name = ob
.getattr(intern!(ob.py(), "__class__"))?
.getattr(intern!(ob.py(), "__name__"))?;
let full_name = full_name.downcast()?.to_string_lossy();
let full_name = full_name.cast()?.to_string_lossy();
match full_name.rsplit_once('.') {
Some((_, name)) => Ok(name.to_string()),

View File

@@ -183,7 +183,7 @@ async fn call_py_method_primitive<Req, Resp>(
) -> lance_core::Result<Resp>
where
Req: serde::Serialize + Send + 'static,
Resp: for<'py> pyo3::FromPyObject<'py> + Send + 'static,
Resp: for<'a, 'py> pyo3::FromPyObject<'a, 'py> + Send + 'static,
{
let request_json = serde_json::to_string(&request).map_err(|e| {
lance_core::Error::io(format!(
@@ -203,7 +203,7 @@ where
// Call the Python method
let result = py_namespace.call_method1(py, method_name, (request_arg,))?;
let value: Resp = result.extract(py)?;
let value: Resp = result.extract(py).map_err(Into::into)?;
Ok::<_, PyErr>(value)
})
})

View File

@@ -25,12 +25,12 @@ use pyo3_async_runtimes::tokio::future_into_py;
fn table_from_py<'a>(table: Bound<'a, PyAny>) -> PyResult<Bound<'a, Table>> {
if table.hasattr("_inner")? {
Ok(table.getattr("_inner")?.downcast_into::<Table>()?)
Ok(table.getattr("_inner")?.cast_into::<Table>()?)
} else if table.hasattr("_table")? {
Ok(table
.getattr("_table")?
.getattr("_inner")?
.downcast_into::<Table>()?)
.cast_into::<Table>()?)
} else {
Err(PyRuntimeError::new_err(
"Provided table does not appear to be a Table or RemoteTable instance",
@@ -90,9 +90,9 @@ impl PyAsyncPermutationBuilder {
database
.getattr("_conn")?
.getattr("_inner")?
.downcast_into::<Connection>()?
.cast_into::<Connection>()?
} else {
database.getattr("_inner")?.downcast_into::<Connection>()?
database.getattr("_inner")?.cast_into::<Connection>()?
};
let database = conn.borrow().database()?;
slf.modify(|builder| builder.persist(database, table_name))
@@ -243,7 +243,7 @@ impl PyPermutationReader {
let Some(selection) = selection else {
return Ok(Select::All);
};
let selection = selection.downcast_into::<PyDict>()?;
let selection = selection.cast_into::<PyDict>()?;
let selection = selection
.iter()
.map(|(key, value)| {

View File

@@ -22,6 +22,7 @@ use lancedb::query::{
VectorQuery as LanceDbVectorQuery,
};
use lancedb::table::AnyQuery;
use pyo3::Borrowed;
use pyo3::Bound;
use pyo3::IntoPyObject;
use pyo3::PyAny;
@@ -43,9 +44,11 @@ use crate::util::parse_distance_type;
use crate::{arrow::RecordBatchStream, util::PyLanceDB};
use crate::{error::PythonErrorExt, index::class_name};
impl FromPyObject<'_> for PyLanceDB<FtsQuery> {
fn extract_bound(ob: &Bound<'_, PyAny>) -> PyResult<Self> {
match class_name(ob)?.as_str() {
impl FromPyObject<'_, '_> for PyLanceDB<FtsQuery> {
type Error = PyErr;
fn extract(ob: Borrowed<'_, '_, PyAny>) -> PyResult<Self> {
match class_name(&ob)?.as_str() {
"MatchQuery" => {
let query = ob.getattr("query")?.extract()?;
let column = ob.getattr("column")?.extract()?;
@@ -424,7 +427,7 @@ impl Query {
"Query text is required for nearest_to_text",
))?;
let query = if let Ok(query_text) = fts_query.downcast::<PyString>() {
let query = if let Ok(query_text) = fts_query.cast::<PyString>() {
let mut query_text = query_text.to_string();
let columns = query
.get_item("columns")?
@@ -606,7 +609,7 @@ impl TakeQuery {
}
}
#[pyclass]
#[pyclass(from_py_object)]
#[derive(Clone)]
pub struct FTSQuery {
inner: LanceDbQuery,
@@ -735,7 +738,7 @@ impl FTSQuery {
}
}
#[pyclass]
#[pyclass(from_py_object)]
#[derive(Clone)]
pub struct VectorQuery {
inner: LanceDbVectorQuery,

View File

@@ -11,7 +11,7 @@ use pyo3::{PyResult, pyclass, pymethods};
/// Sessions allow you to configure cache sizes for index and metadata caches,
/// which can significantly impact memory use and performance. They can
/// also be re-used across multiple connections to share the same cache state.
#[pyclass]
#[pyclass(from_py_object)]
#[derive(Clone)]
pub struct Session {
pub(crate) inner: Arc<LanceSession>,

View File

@@ -29,7 +29,7 @@ use pyo3_async_runtimes::tokio::future_into_py;
mod scannable;
/// Statistics about a compaction operation.
#[pyclass(get_all)]
#[pyclass(get_all, from_py_object)]
#[derive(Clone, Debug)]
pub struct CompactionStats {
/// The number of fragments removed
@@ -43,7 +43,7 @@ pub struct CompactionStats {
}
/// Statistics about a cleanup operation
#[pyclass(get_all)]
#[pyclass(get_all, from_py_object)]
#[derive(Clone, Debug)]
pub struct RemovalStats {
/// The number of bytes removed
@@ -53,7 +53,7 @@ pub struct RemovalStats {
}
/// Statistics about an optimize operation
#[pyclass(get_all)]
#[pyclass(get_all, from_py_object)]
#[derive(Clone, Debug)]
pub struct OptimizeStats {
/// Statistics about the compaction operation
@@ -62,7 +62,7 @@ pub struct OptimizeStats {
pub prune: RemovalStats,
}
#[pyclass(get_all)]
#[pyclass(get_all, from_py_object)]
#[derive(Clone, Debug)]
pub struct UpdateResult {
pub rows_updated: u64,
@@ -88,7 +88,7 @@ impl From<lancedb::table::UpdateResult> for UpdateResult {
}
}
#[pyclass(get_all)]
#[pyclass(get_all, from_py_object)]
#[derive(Clone, Debug)]
pub struct AddResult {
pub version: u64,
@@ -109,7 +109,7 @@ impl From<lancedb::table::AddResult> for AddResult {
}
}
#[pyclass(get_all)]
#[pyclass(get_all, from_py_object)]
#[derive(Clone, Debug)]
pub struct DeleteResult {
pub num_deleted_rows: u64,
@@ -135,7 +135,7 @@ impl From<lancedb::table::DeleteResult> for DeleteResult {
}
}
#[pyclass(get_all)]
#[pyclass(get_all, from_py_object)]
#[derive(Clone, Debug)]
pub struct MergeResult {
pub version: u64,
@@ -171,7 +171,7 @@ impl From<lancedb::table::MergeResult> for MergeResult {
}
}
#[pyclass(get_all)]
#[pyclass(get_all, from_py_object)]
#[derive(Clone, Debug)]
pub struct AddColumnsResult {
pub version: u64,
@@ -192,7 +192,7 @@ impl From<lancedb::table::AddColumnsResult> for AddColumnsResult {
}
}
#[pyclass(get_all)]
#[pyclass(get_all, from_py_object)]
#[derive(Clone, Debug)]
pub struct AlterColumnsResult {
pub version: u64,
@@ -213,7 +213,7 @@ impl From<lancedb::table::AlterColumnsResult> for AlterColumnsResult {
}
}
#[pyclass(get_all)]
#[pyclass(get_all, from_py_object)]
#[derive(Clone, Debug)]
pub struct DropColumnsResult {
pub version: u64,

View File

@@ -14,7 +14,7 @@ use lancedb::{
arrow::{SendableRecordBatchStream, SimpleRecordBatchStream},
data::scannable::Scannable,
};
use pyo3::{FromPyObject, Py, PyAny, Python, types::PyAnyMethods};
use pyo3::{Borrowed, FromPyObject, Py, PyAny, Python, types::PyAnyMethods};
/// Adapter that implements Scannable for a Python reader factory callable.
///
@@ -126,8 +126,10 @@ impl Scannable for PyScannable {
}
}
impl<'py> FromPyObject<'py> for PyScannable {
fn extract_bound(ob: &pyo3::Bound<'py, PyAny>) -> pyo3::PyResult<Self> {
impl FromPyObject<'_, '_> for PyScannable {
type Error = pyo3::PyErr;
fn extract(ob: Borrowed<'_, '_, PyAny>) -> pyo3::PyResult<Self> {
// Convert from Scannable dataclass.
let schema: PyArrowType<Schema> = ob.getattr("schema")?.extract()?;
let schema = Arc::new(schema.0);

View File

@@ -111,12 +111,7 @@ default = []
aws = ["lance/aws", "lance-io/aws", "lance-namespace-impls/dir-aws"]
oss = ["lance/oss", "lance-io/oss", "lance-namespace-impls/dir-oss"]
gcs = ["lance/gcp", "lance-io/gcp", "lance-namespace-impls/dir-gcp"]
azure = [
"lance/azure",
"lance-io/azure",
"lance-namespace-impls/dir-azure",
"lance-namespace-impls/credential-vendor-azure",
]
azure = ["lance/azure", "lance-io/azure", "lance-namespace-impls/dir-azure"]
huggingface = [
"lance/huggingface",
"lance-io/huggingface",

View File

@@ -590,15 +590,6 @@ pub struct ConnectRequest {
/// storage options.
pub namespace_client_properties: HashMap<String, String>,
/// Use directory namespace manifests as the source of truth for native
/// LanceDB table metadata.
///
/// When enabled for a local/native connection, LanceDB returns a
/// namespace-backed database directly. Directory listing fallback remains
/// enabled for migration, and directory-listing-to-manifest migration is
/// forced on.
pub manifest_enabled: bool,
/// The interval at which to check for updates from other processes.
///
/// If None, then consistency is not checked. For performance
@@ -639,7 +630,6 @@ impl ConnectBuilder {
read_consistency_interval: None,
options: HashMap::new(),
namespace_client_properties: HashMap::new(),
manifest_enabled: false,
session: None,
},
embedding_registry: None,
@@ -801,17 +791,6 @@ impl ConnectBuilder {
self
}
/// Turn manifest-backed directory namespace mode on or off for local
/// native connections.
///
/// With the flag set, all table operations go straight through the
/// directory namespace database, and
/// `dir_listing_to_manifest_migration_enabled=true` is forced.
pub fn manifest_enabled(self, enabled: bool) -> Self {
    let mut builder = self;
    builder.request.manifest_enabled = enabled;
    builder
}
/// The interval at which to check for updates from other processes. This
/// only affects LanceDB OSS.
///
@@ -907,16 +886,6 @@ impl ConnectBuilder {
pub async fn execute(self) -> Result<Connection> {
if self.request.uri.starts_with("db") {
self.execute_remote()
} else if self.request.manifest_enabled {
let internal = Arc::new(
ListingDatabase::connect_manifest_enabled_namespace_database(&self.request).await?,
);
Ok(Connection {
internal,
embedding_registry: self
.embedding_registry
.unwrap_or_else(|| Arc::new(MemoryRegistry::new())),
})
} else {
let internal = Arc::new(ListingDatabase::connect_with_options(&self.request).await?);
Ok(Connection {
@@ -1163,9 +1132,6 @@ mod tests {
use lance_testing::datagen::{BatchGenerator, IncrementingInt32};
use tempfile::tempdir;
use crate::database::listing::{ListingDatabaseOptions, OPT_NEW_TABLE_V2_MANIFEST_PATHS};
use crate::database::namespace::LanceNamespaceDatabase;
use crate::table::NativeTable;
use crate::test_utils::connection::new_test_connection;
use super::*;
@@ -1238,147 +1204,6 @@ mod tests {
);
}
// Checks that a manifest-enabled connection is served by a
// `LanceNamespaceDatabase` using the "dir" namespace implementation, keeps
// reporting the URI the caller passed in, and exposes the expected namespace
// client properties.
#[tokio::test]
async fn test_connect_with_manifest_enabled_uses_directory_namespace() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
// Both manifest-related properties are deliberately supplied as "false";
// the assertions below expect the connection to report them as "true".
let db = connect(uri)
.manifest_enabled(true)
.storage_option("timeout", "30s")
.namespace_client_property("manifest_enabled", "false")
.namespace_client_property("dir_listing_to_manifest_migration_enabled", "false")
.execute()
.await
.unwrap();
// The concrete database type behind the connection must be the namespace one.
assert!(
db.database()
.as_any()
.downcast_ref::<LanceNamespaceDatabase>()
.is_some()
);
assert_eq!(db.uri(), uri);
let (ns_impl, properties) = db.namespace_client_config().await.unwrap();
assert_eq!(ns_impl, "dir");
assert_eq!(properties.get("root"), Some(&uri.to_string()));
assert_eq!(
properties.get("manifest_enabled"),
Some(&"true".to_string())
);
assert_eq!(
properties.get("dir_listing_to_manifest_migration_enabled"),
Some(&"true".to_string())
);
// The `timeout` storage option is expected under the `storage.` prefix.
assert_eq!(properties.get("storage.timeout"), Some(&"30s".to_string()));
}
// Checks that a manifest-enabled connection refuses URIs that select a commit
// engine, whether through a `+` in the scheme or through an `engine` query
// parameter, and that the failure is `Error::NotSupported`.
#[tokio::test]
async fn test_manifest_enabled_rejects_commit_engine_uri() {
// Case 1: commit engine encoded in the URI scheme (`s3+ddb`).
let Err(err) = connect("s3+ddb://bucket/db?ddbTableName=manifest")
.manifest_enabled(true)
.execute()
.await
else {
panic!("expected manifest-enabled s3+ddb connection to fail");
};
assert!(
matches!(err, Error::NotSupported { message } if message.contains("commit engine URI schemes"))
);
// Case 2: commit engine requested via the `engine` query parameter.
let Err(err) = connect("s3://bucket/db?engine=ddb&ddbTableName=manifest")
.manifest_enabled(true)
.execute()
.await
else {
panic!("expected manifest-enabled engine query connection to fail");
};
assert!(
matches!(err, Error::NotSupported { message } if message.contains("commit engine"))
);
}
// Checks that a table created through a plain connection is still listed and
// can be opened after reconnecting to the same directory with
// `manifest_enabled(true)`.
#[tokio::test]
async fn test_manifest_enabled_connection_migrates_root_listing_table() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
// Step 1: create the "legacy" table without manifest mode.
connect(uri)
.execute()
.await
.unwrap()
.create_empty_table("legacy", schema)
.execute()
.await
.unwrap();
// Step 2: reconnect with manifest mode and look the table up again.
let db = connect(uri).manifest_enabled(true).execute().await.unwrap();
let tables = db.table_names().execute().await.unwrap();
assert_eq!(tables, vec!["legacy".to_string()]);
db.open_table("legacy").execute().await.unwrap();
}
// Checks how new-table options are resolved on a manifest-enabled connection:
// the connection-level options enable V2 manifest paths, the table-level
// storage option asks for "false", and the created table is expected not to
// use V2 manifest paths.
#[tokio::test]
async fn test_manifest_enabled_preserves_new_table_options() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
// Connection-level default: V2 manifest paths on.
let options = ListingDatabaseOptions::builder()
.enable_v2_manifest_paths(true)
.build();
let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
// The per-table storage option below requests the opposite of the
// connection-level default.
let table = connect(uri)
.manifest_enabled(true)
.database_options(&options)
.execute()
.await
.unwrap()
.create_empty_table("v1_manifest", schema)
.storage_option(OPT_NEW_TABLE_V2_MANIFEST_PATHS, "false")
.execute()
.await
.unwrap();
// Downcast to the native table to inspect its manifest path setting.
let native_table = table
.base_table()
.as_any()
.downcast_ref::<NativeTable>()
.unwrap();
assert!(!native_table.uses_v2_manifest_paths().await.unwrap());
}
// Checks that, with `vend_input_storage_options` turned on through the
// namespace client properties, a table created on a manifest-enabled
// connection reports the connection's storage option together with an
// `expires_at_millis` entry.
#[tokio::test]
async fn test_manifest_enabled_vend_input_storage_options() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
let table = connect(uri)
.manifest_enabled(true)
.storage_option("test_storage_option", "test_value")
.namespace_client_property("vend_input_storage_options", "true")
.namespace_client_property(
"vend_input_storage_options_refresh_interval_millis",
"60000",
)
.execute()
.await
.unwrap()
.create_empty_table("vended", schema)
.execute()
.await
.unwrap();
// The outer unwrap handles the call result, the inner one the optional map.
let storage_options = table.latest_storage_options().await.unwrap().unwrap();
assert_eq!(
storage_options.get("test_storage_option"),
Some(&"test_value".to_string())
);
assert!(storage_options.contains_key("expires_at_millis"));
}
#[tokio::test]
async fn test_table_names() {
let tc = new_test_connection().await.unwrap();

View File

@@ -285,7 +285,7 @@ const MIRRORED_STORE: &str = "mirroredStore";
/// A connection to LanceDB
impl ListingDatabase {
pub(crate) fn build_namespace_client_properties(
fn build_namespace_client_properties(
uri: &str,
storage_options: &HashMap<String, String>,
namespace_client_properties: HashMap<String, String>,
@@ -298,24 +298,6 @@ impl ListingDatabase {
properties
}
/// Build the namespace client properties for a manifest-enabled connection.
///
/// Starts from the map produced by
/// [`Self::build_namespace_client_properties`] and then sets both
/// `manifest_enabled` and `dir_listing_to_manifest_migration_enabled` to
/// `"true"`, replacing whatever value the caller supplied for those keys.
pub(crate) fn build_manifest_enabled_namespace_client_properties(
    uri: &str,
    storage_options: &HashMap<String, String>,
    namespace_client_properties: HashMap<String, String>,
) -> HashMap<String, String> {
    // Keys that are always forced on, regardless of caller input.
    const FORCED_ON: [&str; 2] = [
        "manifest_enabled",
        "dir_listing_to_manifest_migration_enabled",
    ];

    let mut merged =
        Self::build_namespace_client_properties(uri, storage_options, namespace_client_properties);
    for key in FORCED_ON {
        merged.insert(key.to_string(), "true".to_string());
    }
    merged
}
async fn connect_namespace_database(
uri: &str,
storage_options: HashMap<String, String>,
@@ -341,119 +323,6 @@ impl ListingDatabase {
))
}
/// Validate `uri` as a namespace root and create it when it is local.
///
/// Returns the root to hand to the namespace client: the input unchanged
/// when it is not treated as a URL, otherwise the URL with its query string
/// removed.
///
/// # Errors
///
/// Returns [`Error::NotSupported`] when the URL scheme contains `+`, or when
/// the query string carries the `ENGINE` or `MIRRORED_STORE` key. Failures
/// from opening the object store or creating the directory are propagated.
async fn prepare_namespace_root(
    uri: &str,
    storage_options: &HashMap<String, String>,
    session: Arc<lance::session::Session>,
) -> Result<String> {
    // Input that fails to parse is used verbatim, and so is a parse result
    // with a one-character scheme on Windows (presumably a drive letter such
    // as `C:\...` -- confirm).
    let as_url = url::Url::parse(uri)
        .ok()
        .filter(|candidate| !(candidate.scheme().len() == 1 && cfg!(windows)));

    let (root, store_params) = match as_url {
        None => (uri.to_string(), ObjectStoreParams::default()),
        Some(mut url) => {
            if url.scheme().contains('+') {
                return Err(Error::NotSupported {
                    message: "commit engine URI schemes are not supported for manifest-enabled namespace connections".to_string(),
                });
            }

            for (key, value) in url.query_pairs() {
                if key == ENGINE {
                    return Err(Error::NotSupported {
                        message: format!(
                            "commit engine '{}' is not supported for manifest-enabled namespace connections",
                            value
                        ),
                    });
                }
                if key == MIRRORED_STORE {
                    return Err(Error::NotSupported {
                        message: "mirrored store is not supported for manifest-enabled namespace connections"
                            .to_string(),
                    });
                }
            }

            // The remaining query parameters are dropped from the root.
            url.set_query(None);
            let stripped = url.to_string();

            // Only URL-style roots are opened with the caller's storage options.
            let params = ObjectStoreParams {
                storage_options_accessor: if storage_options.is_empty() {
                    None
                } else {
                    Some(Arc::new(StorageOptionsAccessor::with_static_options(
                        storage_options.clone(),
                    )))
                },
                ..Default::default()
            };
            (stripped, params)
        }
    };

    // Shared tail: open the store and create the directory for local roots.
    let (object_store, _) =
        ObjectStore::from_uri_and_params(session.store_registry(), &root, &store_params).await?;
    if object_store.is_local() {
        Self::try_create_dir(&root).context(CreateDirSnafu {
            path: root.as_str(),
        })?;
    }
    Ok(root)
}
/// Open a manifest-enabled directory namespace database for `request`.
///
/// Parses the listing-database options from `request.options`, prepares the
/// namespace root, builds the manifest-enabled namespace properties and
/// connects through the `"dir"` namespace implementation. The returned
/// database reports `request.uri` as its URI rather than the prepared root.
pub(crate) async fn connect_manifest_enabled_namespace_database(
    request: &ConnectRequest,
) -> Result<LanceNamespaceDatabase> {
    let parsed = ListingDatabaseOptions::parse_from_map(&request.options)?;

    // Reuse the caller's session when one was supplied.
    let session = match &request.session {
        Some(shared) => Arc::clone(shared),
        None => Arc::new(lance::session::Session::default()),
    };

    let root = Self::prepare_namespace_root(
        &request.uri,
        &parsed.storage_options,
        Arc::clone(&session),
    )
    .await?;

    let properties = Self::build_manifest_enabled_namespace_client_properties(
        &root,
        &parsed.storage_options,
        request.namespace_client_properties.clone(),
    );

    let database = LanceNamespaceDatabase::connect_with_new_table_config(
        "dir",
        properties,
        parsed.storage_options,
        request.read_consistency_interval,
        Some(session),
        HashSet::new(),
        parsed.new_table_config,
    )
    .await?;

    Ok(database.with_uri(request.uri.clone()))
}
/// Connect to a listing database
///
/// The URI should be a path to a directory where the tables are stored.
@@ -821,12 +690,15 @@ impl ListingDatabase {
store_params.storage_options_accessor = Some(Arc::new(accessor));
}
write_params.data_storage_version = storage_version_override
.or(write_params.data_storage_version)
.or(self.new_table_config.data_storage_version);
write_params.data_storage_version = self
.new_table_config
.data_storage_version
.or(storage_version_override);
if let Some(enable_v2_manifest_paths) =
v2_manifest_override.or(self.new_table_config.enable_v2_manifest_paths)
if let Some(enable_v2_manifest_paths) = self
.new_table_config
.enable_v2_manifest_paths
.or(v2_manifest_override)
{
write_params.enable_v2_manifest_paths = enable_v2_manifest_paths;
}
@@ -1286,7 +1158,6 @@ mod tests {
client_config: Default::default(),
options: Default::default(),
namespace_client_properties: Default::default(),
manifest_enabled: false,
read_consistency_interval: None,
session: None,
};
@@ -1421,7 +1292,6 @@ mod tests {
client_config: Default::default(),
options: options.clone(),
namespace_client_properties: Default::default(),
manifest_enabled: false,
read_consistency_interval: None,
session: None,
};
@@ -1957,7 +1827,6 @@ mod tests {
client_config: Default::default(),
options,
namespace_client_properties: Default::default(),
manifest_enabled: false,
read_consistency_interval: None,
session: None,
};
@@ -2064,7 +1933,6 @@ mod tests {
client_config: Default::default(),
options,
namespace_client_properties: Default::default(),
manifest_enabled: false,
read_consistency_interval: None,
session: None,
};
@@ -2137,7 +2005,6 @@ mod tests {
client_config: Default::default(),
options,
namespace_client_properties: Default::default(),
manifest_enabled: false,
read_consistency_interval: None,
session: None,
};
@@ -2335,7 +2202,6 @@ mod tests {
client_config: Default::default(),
options: Default::default(),
namespace_client_properties,
manifest_enabled: false,
read_consistency_interval: None,
session: None,
};

View File

@@ -24,10 +24,6 @@ use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler;
use crate::connection::NamespaceClientPushdownOperation;
use crate::database::ReadConsistency;
use crate::database::listing::{
NewTableConfig, OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS, OPT_NEW_TABLE_STORAGE_VERSION,
OPT_NEW_TABLE_V2_MANIFEST_PATHS,
};
use crate::error::{Error, Result};
use crate::table::NativeTable;
use lance::dataset::WriteMode;
@@ -54,8 +50,6 @@ pub struct LanceNamespaceDatabase {
ns_impl: String,
// Namespace properties used to construct the namespace client
ns_properties: HashMap<String, String>,
// Options for tables created by this connection
new_table_config: NewTableConfig,
}
impl LanceNamespaceDatabase {
@@ -77,15 +71,9 @@ impl LanceNamespaceDatabase {
pushdown_operations: namespace_client_pushdown_operations,
ns_impl: namespace_client_impl,
ns_properties: namespace_client_properties,
new_table_config: NewTableConfig::default(),
}
}
/// Replace the URI stored on this database with `uri` and return the
/// updated database, builder style.
pub(crate) fn with_uri(mut self, uri: impl Into<String>) -> Self {
self.uri = uri.into();
self
}
pub async fn connect(
ns_impl: &str,
ns_properties: HashMap<String, String>,
@@ -93,27 +81,6 @@ impl LanceNamespaceDatabase {
read_consistency_interval: Option<std::time::Duration>,
session: Option<Arc<lance::session::Session>>,
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
) -> Result<Self> {
Self::connect_with_new_table_config(
ns_impl,
ns_properties,
storage_options,
read_consistency_interval,
session,
pushdown_operations,
NewTableConfig::default(),
)
.await
}
pub(crate) async fn connect_with_new_table_config(
ns_impl: &str,
ns_properties: HashMap<String, String>,
storage_options: HashMap<String, String>,
read_consistency_interval: Option<std::time::Duration>,
session: Option<Arc<lance::session::Session>>,
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
new_table_config: NewTableConfig,
) -> Result<Self> {
let mut builder = ConnectBuilder::new(ns_impl);
for (key, value) in ns_properties.clone() {
@@ -135,79 +102,8 @@ impl LanceNamespaceDatabase {
pushdown_operations,
ns_impl: ns_impl.to_string(),
ns_properties,
new_table_config,
})
}
/// Read the per-request new-table overrides from the request's storage
/// options.
///
/// Returns, in order, the optional storage version, the optional
/// V2-manifest-paths flag and the optional stable-row-ids flag. Each entry is
/// `None` when the request carries no storage options or the key is absent.
///
/// # Errors
///
/// Propagates the storage-version parse error, and returns
/// [`Error::InvalidInput`] when either boolean option is not a valid `bool`.
fn extract_storage_overrides(
    &self,
    request: &DbCreateTableRequest,
) -> Result<(
    Option<lance_encoding::version::LanceFileVersion>,
    Option<bool>,
    Option<bool>,
)> {
    let overrides = request
        .write_options
        .lance_write_params
        .as_ref()
        .and_then(|write_params| write_params.store_params.as_ref())
        .and_then(|store| store.storage_options());

    // Shared parser for the two boolean options; `invalid` is the error text.
    let parse_flag = |key: &str, invalid: &str| -> Result<Option<bool>> {
        match overrides.and_then(|opts| opts.get(key)) {
            None => Ok(None),
            Some(raw) => raw
                .parse::<bool>()
                .map(Some)
                .map_err(|_| Error::InvalidInput {
                    message: invalid.to_string(),
                }),
        }
    };

    // Evaluated in this order so the first failing option decides the error.
    let storage_version = match overrides.and_then(|opts| opts.get(OPT_NEW_TABLE_STORAGE_VERSION))
    {
        Some(raw) => Some(raw.parse::<lance_encoding::version::LanceFileVersion>()?),
        None => None,
    };
    let v2_manifest_paths = parse_flag(
        OPT_NEW_TABLE_V2_MANIFEST_PATHS,
        "enable_v2_manifest_paths must be a boolean",
    )?;
    let stable_row_ids = parse_flag(
        OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS,
        "enable_stable_row_ids must be a boolean",
    )?;

    Ok((storage_version, v2_manifest_paths, stable_row_ids))
}
/// Fold the new-table settings into `params` for a create-table request.
///
/// For the storage version the precedence is: per-request storage option,
/// then the value already present in `params`, then this connection's
/// `new_table_config`. For the two boolean settings the per-request storage
/// option wins over `new_table_config`; when neither is set, `params` is left
/// untouched.
///
/// # Errors
///
/// Propagates any error from [`Self::extract_storage_overrides`].
fn apply_new_table_config(
    &self,
    params: &mut lance::dataset::WriteParams,
    request: &DbCreateTableRequest,
) -> Result<()> {
    let (version, v2_manifest_paths, stable_row_ids) = self.extract_storage_overrides(request)?;
    let defaults = &self.new_table_config;

    let from_params = params.data_storage_version;
    params.data_storage_version = version
        .or(from_params)
        .or(defaults.data_storage_version);

    if let Some(use_v2_paths) = v2_manifest_paths.or(defaults.enable_v2_manifest_paths) {
        params.enable_v2_manifest_paths = use_v2_paths;
    }

    if let Some(use_stable_ids) = stable_row_ids.or(defaults.enable_stable_row_ids) {
        params.enable_stable_row_ids = use_stable_ids;
    }

    Ok(())
}
}
impl std::fmt::Debug for LanceNamespaceDatabase {
@@ -403,12 +299,7 @@ impl Database for LanceNamespaceDatabase {
};
// Build write params with storage options and commit handler
let mut params = request
.write_options
.lance_write_params
.clone()
.unwrap_or_default();
self.apply_new_table_config(&mut params, &request)?;
let mut params = request.write_options.lance_write_params.unwrap_or_default();
if matches!(request.mode, CreateTableMode::Overwrite) {
params.mode = WriteMode::Overwrite;

View File

@@ -43,7 +43,7 @@ pub struct RemoteInsertExec<S: HttpSend = Sender> {
client: RestfulLanceDbClient<S>,
input: Arc<dyn ExecutionPlan>,
overwrite: bool,
properties: PlanProperties,
properties: Arc<PlanProperties>,
add_result: Arc<Mutex<Option<AddResult>>>,
metrics: ExecutionPlanMetricsSet,
upload_id: Option<String>,
@@ -105,12 +105,12 @@ impl<S: HttpSend + 'static> RemoteInsertExec<S> {
1
};
let schema = COUNT_SCHEMA.clone();
let properties = PlanProperties::new(
let properties = Arc::new(PlanProperties::new(
EquivalenceProperties::new(schema),
datafusion_physical_plan::Partitioning::UnknownPartitioning(num_partitions),
datafusion_physical_plan::execution_plan::EmissionType::Final,
datafusion_physical_plan::execution_plan::Boundedness::Bounded,
);
));
Self {
table_name,
@@ -232,7 +232,7 @@ impl<S: HttpSend + 'static> ExecutionPlan for RemoteInsertExec<S> {
self
}
fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.properties
}

View File

@@ -39,21 +39,26 @@ use lance_index::scalar::FullTextSearchQuery;
struct MetadataEraserExec {
input: Arc<dyn ExecutionPlan>,
schema: Arc<ArrowSchema>,
properties: PlanProperties,
properties: Arc<PlanProperties>,
}
impl MetadataEraserExec {
fn compute_properties_from_input(
input: &Arc<dyn ExecutionPlan>,
schema: &Arc<ArrowSchema>,
) -> PlanProperties {
) -> Arc<PlanProperties> {
let input_properties = input.properties();
let eq_properties = input_properties
.eq_properties
.clone()
.with_new_schema(schema.clone())
.unwrap();
input_properties.clone().with_eq_properties(eq_properties)
Arc::new(
input_properties
.as_ref()
.clone()
.with_eq_properties(eq_properties),
)
}
fn new(input: Arc<dyn ExecutionPlan>) -> Self {
@@ -87,7 +92,7 @@ impl ExecutionPlan for MetadataEraserExec {
self
}
fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.properties
}

View File

@@ -81,7 +81,7 @@ pub struct InsertExec {
dataset: Arc<Dataset>,
input: Arc<dyn ExecutionPlan>,
write_params: WriteParams,
properties: PlanProperties,
properties: Arc<PlanProperties>,
partial_transactions: Arc<Mutex<Vec<Transaction>>>,
metrics: ExecutionPlanMetricsSet,
}
@@ -95,12 +95,12 @@ impl InsertExec {
) -> Self {
let schema = COUNT_SCHEMA.clone();
let num_partitions = input.output_partitioning().partition_count();
let properties = PlanProperties::new(
let properties = Arc::new(PlanProperties::new(
EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(num_partitions),
EmissionType::Final,
Boundedness::Bounded,
);
));
Self {
ds_wrapper,
@@ -136,7 +136,7 @@ impl ExecutionPlan for InsertExec {
self
}
fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.properties
}

View File

@@ -20,7 +20,7 @@ pub(crate) struct ScannableExec {
// We don't require Scannable to be Sync, so we wrap it in a Mutex to allow safe concurrent access.
source: Mutex<Box<dyn Scannable>>,
num_rows: Option<usize>,
properties: PlanProperties,
properties: Arc<PlanProperties>,
tracker: Option<Arc<WriteProgressTracker>>,
}
@@ -37,12 +37,12 @@ impl ScannableExec {
pub fn new(source: Box<dyn Scannable>, tracker: Option<Arc<WriteProgressTracker>>) -> Self {
let schema = source.schema();
let eq_properties = EquivalenceProperties::new(schema);
let properties = PlanProperties::new(
let properties = Arc::new(PlanProperties::new(
eq_properties,
Partitioning::UnknownPartitioning(1),
EmissionType::Incremental,
datafusion_physical_plan::execution_plan::Boundedness::Bounded,
);
));
let num_rows = source.num_rows();
let source = Mutex::new(source);
@@ -70,7 +70,7 @@ impl ExecutionPlan for ScannableExec {
self
}
fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.properties
}