mirror of
https://github.com/lancedb/lancedb.git
synced 2026-05-14 02:20:40 +00:00
Compare commits
1 Commits
ticket/324
...
codex/upda
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2974b7e5c6 |
1277
Cargo.lock
generated
1277
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
62
Cargo.toml
62
Cargo.toml
@@ -13,40 +13,40 @@ categories = ["database-implementations"]
|
||||
rust-version = "1.91.0"
|
||||
|
||||
[workspace.dependencies]
|
||||
lance = { "version" = "=6.0.0-beta.4", default-features = false, "tag" = "v6.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-core = { "version" = "=6.0.0-beta.4", "tag" = "v6.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datagen = { "version" = "=6.0.0-beta.4", "tag" = "v6.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-file = { "version" = "=6.0.0-beta.4", "tag" = "v6.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-io = { "version" = "=6.0.0-beta.4", default-features = false, "tag" = "v6.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-index = { "version" = "=6.0.0-beta.4", "tag" = "v6.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-linalg = { "version" = "=6.0.0-beta.4", "tag" = "v6.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace = { "version" = "=6.0.0-beta.4", "tag" = "v6.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace-impls = { "version" = "=6.0.0-beta.4", default-features = false, "tag" = "v6.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-table = { "version" = "=6.0.0-beta.4", "tag" = "v6.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-testing = { "version" = "=6.0.0-beta.4", "tag" = "v6.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datafusion = { "version" = "=6.0.0-beta.4", "tag" = "v6.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-encoding = { "version" = "=6.0.0-beta.4", "tag" = "v6.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-arrow = { "version" = "=6.0.0-beta.4", "tag" = "v6.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance = { "version" = "=6.0.0-beta.6", default-features = false, "tag" = "v6.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-core = { "version" = "=6.0.0-beta.6", "tag" = "v6.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datagen = { "version" = "=6.0.0-beta.6", "tag" = "v6.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-file = { "version" = "=6.0.0-beta.6", "tag" = "v6.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-io = { "version" = "=6.0.0-beta.6", default-features = false, "tag" = "v6.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-index = { "version" = "=6.0.0-beta.6", "tag" = "v6.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-linalg = { "version" = "=6.0.0-beta.6", "tag" = "v6.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace = { "version" = "=6.0.0-beta.6", "tag" = "v6.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace-impls = { "version" = "=6.0.0-beta.6", default-features = false, "tag" = "v6.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-table = { "version" = "=6.0.0-beta.6", "tag" = "v6.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-testing = { "version" = "=6.0.0-beta.6", "tag" = "v6.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datafusion = { "version" = "=6.0.0-beta.6", "tag" = "v6.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-encoding = { "version" = "=6.0.0-beta.6", "tag" = "v6.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-arrow = { "version" = "=6.0.0-beta.6", "tag" = "v6.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
|
||||
ahash = "0.8"
|
||||
# Note that this one does not include pyarrow
|
||||
arrow = { version = "57.2", optional = false }
|
||||
arrow-array = "57.2"
|
||||
arrow-data = "57.2"
|
||||
arrow-ipc = "57.2"
|
||||
arrow-ord = "57.2"
|
||||
arrow-schema = "57.2"
|
||||
arrow-select = "57.2"
|
||||
arrow-cast = "57.2"
|
||||
arrow = { version = "58.1", optional = false }
|
||||
arrow-array = "58.1"
|
||||
arrow-data = "58.1"
|
||||
arrow-ipc = "58.1"
|
||||
arrow-ord = "58.1"
|
||||
arrow-schema = "58.1"
|
||||
arrow-select = "58.1"
|
||||
arrow-cast = "58.1"
|
||||
async-trait = "0"
|
||||
datafusion = { version = "52.1", default-features = false }
|
||||
datafusion-catalog = "52.1"
|
||||
datafusion-common = { version = "52.1", default-features = false }
|
||||
datafusion-execution = "52.1"
|
||||
datafusion-expr = "52.1"
|
||||
datafusion-functions = "52.1"
|
||||
datafusion-physical-plan = "52.1"
|
||||
datafusion-physical-expr = "52.1"
|
||||
datafusion-sql = "52.1"
|
||||
datafusion = { version = "53.1", default-features = false }
|
||||
datafusion-catalog = "53.1"
|
||||
datafusion-common = { version = "53.1", default-features = false }
|
||||
datafusion-execution = "53.1"
|
||||
datafusion-expr = "53.1"
|
||||
datafusion-functions = "53.1"
|
||||
datafusion-physical-plan = "53.1"
|
||||
datafusion-physical-expr = "53.1"
|
||||
datafusion-sql = "53.1"
|
||||
env_logger = "0.11"
|
||||
half = { "version" = "2.7.1", default-features = false, features = [
|
||||
"num-traits",
|
||||
|
||||
@@ -41,29 +41,6 @@ for testing purposes.
|
||||
|
||||
***
|
||||
|
||||
### manifestEnabled?
|
||||
|
||||
```ts
|
||||
optional manifestEnabled: boolean;
|
||||
```
|
||||
|
||||
(For LanceDB OSS only): use directory namespace manifests as the source
|
||||
of truth for table metadata. Existing directory-listed root tables are
|
||||
migrated into the manifest on access.
|
||||
|
||||
***
|
||||
|
||||
### namespaceClientProperties?
|
||||
|
||||
```ts
|
||||
optional namespaceClientProperties: Record<string, string>;
|
||||
```
|
||||
|
||||
(For LanceDB OSS only): extra properties for the backing namespace
|
||||
client used by manifest-enabled native connections.
|
||||
|
||||
***
|
||||
|
||||
### readConsistencyInterval?
|
||||
|
||||
```ts
|
||||
|
||||
@@ -28,7 +28,7 @@
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<arrow.version>15.0.0</arrow.version>
|
||||
<lance-core.version>6.0.0-beta.4</lance-core.version>
|
||||
<lance-core.version>6.0.0-beta.6</lance-core.version>
|
||||
<spotless.skip>false</spotless.skip>
|
||||
<spotless.version>2.30.0</spotless.version>
|
||||
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>
|
||||
|
||||
@@ -16,7 +16,7 @@ crate-type = ["cdylib"]
|
||||
async-trait.workspace = true
|
||||
arrow-ipc.workspace = true
|
||||
arrow-array.workspace = true
|
||||
arrow-buffer = "57.2"
|
||||
arrow-buffer = "58.1"
|
||||
half.workspace = true
|
||||
arrow-schema.workspace = true
|
||||
env_logger.workspace = true
|
||||
|
||||
@@ -67,12 +67,6 @@ impl Connection {
|
||||
builder = builder.storage_option(key, value);
|
||||
}
|
||||
}
|
||||
if let Some(manifest_enabled) = options.manifest_enabled {
|
||||
builder = builder.manifest_enabled(manifest_enabled);
|
||||
}
|
||||
if let Some(namespace_client_properties) = options.namespace_client_properties {
|
||||
builder = builder.namespace_client_properties(namespace_client_properties);
|
||||
}
|
||||
|
||||
// Create client config, optionally with header provider
|
||||
let client_config = options.client_config.unwrap_or_default();
|
||||
|
||||
@@ -37,13 +37,6 @@ pub struct ConnectionOptions {
|
||||
///
|
||||
/// The available options are described at https://docs.lancedb.com/storage/
|
||||
pub storage_options: Option<HashMap<String, String>>,
|
||||
/// (For LanceDB OSS only): use directory namespace manifests as the source
|
||||
/// of truth for table metadata. Existing directory-listed root tables are
|
||||
/// migrated into the manifest on access.
|
||||
pub manifest_enabled: Option<bool>,
|
||||
/// (For LanceDB OSS only): extra properties for the backing namespace
|
||||
/// client used by manifest-enabled native connections.
|
||||
pub namespace_client_properties: Option<HashMap<String, String>>,
|
||||
/// (For LanceDB OSS only): the session to use for this connection. Holds
|
||||
/// shared caches and other session-specific state.
|
||||
pub session: Option<session::Session>,
|
||||
|
||||
@@ -15,7 +15,7 @@ name = "_lancedb"
|
||||
crate-type = ["cdylib"]
|
||||
|
||||
[dependencies]
|
||||
arrow = { version = "57.2", features = ["pyarrow"] }
|
||||
arrow = { workspace = true, features = ["pyarrow"] }
|
||||
async-trait = "0.1"
|
||||
bytes = "1"
|
||||
lancedb = { path = "../rust/lancedb", default-features = false }
|
||||
@@ -25,8 +25,8 @@ lance-namespace-impls.workspace = true
|
||||
lance-io.workspace = true
|
||||
env_logger.workspace = true
|
||||
log.workspace = true
|
||||
pyo3 = { version = "0.26", features = ["extension-module", "abi3-py39"] }
|
||||
pyo3-async-runtimes = { version = "0.26", features = [
|
||||
pyo3 = { version = "0.28", features = ["extension-module", "abi3-py39"] }
|
||||
pyo3-async-runtimes = { version = "0.28", features = [
|
||||
"attributes",
|
||||
"tokio-runtime",
|
||||
] }
|
||||
@@ -38,7 +38,7 @@ snafu.workspace = true
|
||||
tokio = { version = "1.40", features = ["sync"] }
|
||||
|
||||
[build-dependencies]
|
||||
pyo3-build-config = { version = "0.26", features = [
|
||||
pyo3-build-config = { version = "0.28", features = [
|
||||
"extension-module",
|
||||
"abi3-py39",
|
||||
] }
|
||||
|
||||
@@ -73,7 +73,6 @@ def connect(
|
||||
client_config: Union[ClientConfig, Dict[str, Any], None] = None,
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
session: Optional[Session] = None,
|
||||
manifest_enabled: bool = False,
|
||||
namespace_client_impl: Optional[str] = None,
|
||||
namespace_client_properties: Optional[Dict[str, str]] = None,
|
||||
namespace_client_pushdown_operations: Optional[List[str]] = None,
|
||||
@@ -112,10 +111,6 @@ def connect(
|
||||
storage_options: dict, optional
|
||||
Additional options for the storage backend. See available options at
|
||||
<https://docs.lancedb.com/storage/>
|
||||
manifest_enabled : bool, default False
|
||||
When true for local/native connections, use directory namespace
|
||||
manifests as the source of truth for table metadata. Existing
|
||||
directory-listed root tables are migrated into the manifest on access.
|
||||
session: Session, optional
|
||||
(For LanceDB OSS only)
|
||||
A session to use for this connection. Sessions allow you to configure
|
||||
@@ -163,11 +158,11 @@ def connect(
|
||||
conn : DBConnection
|
||||
A connection to a LanceDB database.
|
||||
"""
|
||||
if namespace_client_impl is not None:
|
||||
if namespace_client_properties is None:
|
||||
if namespace_client_impl is not None or namespace_client_properties is not None:
|
||||
if namespace_client_impl is None or namespace_client_properties is None:
|
||||
raise ValueError(
|
||||
"namespace_client_properties must be provided when "
|
||||
"namespace_client_impl is set"
|
||||
"Both namespace_client_impl and "
|
||||
"namespace_client_properties must be provided"
|
||||
)
|
||||
if kwargs:
|
||||
raise ValueError(f"Unknown keyword arguments: {kwargs}")
|
||||
@@ -180,12 +175,6 @@ def connect(
|
||||
namespace_client_pushdown_operations=namespace_client_pushdown_operations,
|
||||
)
|
||||
|
||||
if namespace_client_properties is not None and not manifest_enabled:
|
||||
raise ValueError(
|
||||
"namespace_client_impl must be provided when using "
|
||||
"namespace_client_properties unless manifest_enabled=True"
|
||||
)
|
||||
|
||||
if namespace_client_pushdown_operations is not None:
|
||||
raise ValueError(
|
||||
"namespace_client_pushdown_operations is only valid when "
|
||||
@@ -223,8 +212,6 @@ def connect(
|
||||
read_consistency_interval=read_consistency_interval,
|
||||
storage_options=storage_options,
|
||||
session=session,
|
||||
manifest_enabled=manifest_enabled,
|
||||
namespace_client_properties=namespace_client_properties,
|
||||
)
|
||||
|
||||
|
||||
@@ -302,8 +289,6 @@ def deserialize_conn(
|
||||
parsed["uri"],
|
||||
read_consistency_interval=rci,
|
||||
storage_options=storage_options,
|
||||
manifest_enabled=parsed.get("manifest_enabled", False),
|
||||
namespace_client_properties=parsed.get("namespace_client_properties"),
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"Unknown connection_type: {connection_type}")
|
||||
@@ -319,8 +304,6 @@ async def connect_async(
|
||||
client_config: Optional[Union[ClientConfig, Dict[str, Any]]] = None,
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
session: Optional[Session] = None,
|
||||
manifest_enabled: bool = False,
|
||||
namespace_client_properties: Optional[Dict[str, str]] = None,
|
||||
) -> AsyncConnection:
|
||||
"""Connect to a LanceDB database.
|
||||
|
||||
@@ -360,13 +343,6 @@ async def connect_async(
|
||||
cache sizes for index and metadata caches, which can significantly
|
||||
impact memory use and performance. They can also be re-used across
|
||||
multiple connections to share the same cache state.
|
||||
manifest_enabled : bool, default False
|
||||
When true for local/native connections, use directory namespace
|
||||
manifests as the source of truth for table metadata. Existing
|
||||
directory-listed root tables are migrated into the manifest on access.
|
||||
namespace_client_properties : dict, optional
|
||||
Additional directory namespace client properties to use with
|
||||
``manifest_enabled=True``.
|
||||
|
||||
Examples
|
||||
--------
|
||||
@@ -409,8 +385,6 @@ async def connect_async(
|
||||
client_config,
|
||||
storage_options,
|
||||
session,
|
||||
manifest_enabled,
|
||||
namespace_client_properties,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@@ -242,8 +242,6 @@ async def connect(
|
||||
client_config: Optional[Union[ClientConfig, Dict[str, Any]]],
|
||||
storage_options: Optional[Dict[str, str]],
|
||||
session: Optional[Session],
|
||||
manifest_enabled: bool = False,
|
||||
namespace_client_properties: Optional[Dict[str, str]] = None,
|
||||
) -> Connection: ...
|
||||
|
||||
class RecordBatchStream:
|
||||
|
||||
@@ -590,13 +590,8 @@ class LanceDBConnection(DBConnection):
|
||||
read_consistency_interval: Optional[timedelta] = None,
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
session: Optional[Session] = None,
|
||||
manifest_enabled: bool = False,
|
||||
namespace_client_properties: Optional[Dict[str, str]] = None,
|
||||
_inner: Optional[LanceDbConnection] = None,
|
||||
):
|
||||
self.storage_options = storage_options
|
||||
self._manifest_enabled = manifest_enabled
|
||||
self._namespace_client_properties = namespace_client_properties
|
||||
if _inner is not None:
|
||||
self._conn = _inner
|
||||
self._cached_namespace_client = None
|
||||
@@ -638,8 +633,6 @@ class LanceDBConnection(DBConnection):
|
||||
None,
|
||||
storage_options,
|
||||
session,
|
||||
manifest_enabled,
|
||||
namespace_client_properties,
|
||||
)
|
||||
|
||||
# TODO: It would be nice if we didn't store self.storage_options but it is
|
||||
@@ -647,6 +640,7 @@ class LanceDBConnection(DBConnection):
|
||||
# work because some paths like LanceDBConnection.from_inner will lose the
|
||||
# storage_options. Also, this class really shouldn't be holding any state
|
||||
# beyond _conn.
|
||||
self.storage_options = storage_options
|
||||
self._conn = AsyncConnection(LOOP.run(do_connect()))
|
||||
self._cached_namespace_client: Optional[LanceNamespace] = None
|
||||
|
||||
@@ -683,8 +677,6 @@ class LanceDBConnection(DBConnection):
|
||||
"connection_type": "local",
|
||||
"uri": self.uri,
|
||||
"storage_options": self.storage_options,
|
||||
"manifest_enabled": self._manifest_enabled,
|
||||
"namespace_client_properties": self._namespace_client_properties,
|
||||
"read_consistency_interval_seconds": (
|
||||
rci.total_seconds() if rci else None
|
||||
),
|
||||
|
||||
@@ -425,35 +425,6 @@ class Permutation:
|
||||
"""
|
||||
return Permutation.from_tables(table, None, None)
|
||||
|
||||
@classmethod
|
||||
def from_table(cls, table: LanceTable) -> "_PermutationFromTable":
|
||||
"""
|
||||
Create a permutation directly from a base table, with HuggingFace /
|
||||
PyTorch-style chaining for ``shuffle``, ``filter``, and ``split_*``.
|
||||
|
||||
This is a convenience wrapper that hides the two-step
|
||||
``permutation_builder(table).shuffle().execute()`` /
|
||||
``Permutation.from_tables(table, perm_tbl)`` dance. The returned object
|
||||
accumulates builder operations and only materializes the underlying
|
||||
permutation table on first read (any access of an attribute that is
|
||||
not a builder operation), so chained calls do not pay an extra
|
||||
``execute()`` for each step.
|
||||
|
||||
After the first read all ``Permutation`` methods (``select_columns``,
|
||||
``with_format``, ``map``, ``__iter__``, ``fetch``, ``num_rows``, ...)
|
||||
are forwarded transparently.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> import lancedb
|
||||
>>> db = lancedb.connect("memory:///")
|
||||
>>> tbl = db.create_table("tbl", data=[{"x": x} for x in range(100)])
|
||||
>>> perm = Permutation.from_table(tbl).shuffle(seed=42)
|
||||
>>> perm.num_rows
|
||||
100
|
||||
"""
|
||||
return _PermutationFromTable(table)
|
||||
|
||||
@classmethod
|
||||
def from_tables(
|
||||
cls,
|
||||
@@ -871,85 +842,3 @@ class Permutation:
|
||||
Repeat the permutation `times` times
|
||||
"""
|
||||
raise Exception("with_repeat is not yet implemented")
|
||||
|
||||
|
||||
class _PermutationFromTable:
|
||||
"""
|
||||
Result of [Permutation.from_table](#from_table).
|
||||
|
||||
Records pending builder operations (``shuffle``, ``filter``, ``split_*``)
|
||||
and lazily executes them on first read. After materialization all
|
||||
Permutation reads / transforms (``select_columns``, ``with_format``,
|
||||
``map``, ``__iter__``, ``fetch``, ``num_rows``, ...) are forwarded to the
|
||||
underlying [Permutation].
|
||||
"""
|
||||
|
||||
__slots__ = ("_base_table", "_pending_ops", "_materialized")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
base_table: LanceTable,
|
||||
_pending_ops: Optional[list[tuple[str, tuple, dict]]] = None,
|
||||
):
|
||||
self._base_table = base_table
|
||||
self._pending_ops: list[tuple[str, tuple, dict]] = (
|
||||
list(_pending_ops) if _pending_ops is not None else []
|
||||
)
|
||||
self._materialized: Optional[Permutation] = None
|
||||
|
||||
def _with_op(
|
||||
self, name: str, args: tuple, kwargs: dict
|
||||
) -> "_PermutationFromTable":
|
||||
return _PermutationFromTable(
|
||||
self._base_table, _pending_ops=self._pending_ops + [(name, args, kwargs)]
|
||||
)
|
||||
|
||||
def shuffle(self, *args, **kwargs) -> "_PermutationFromTable":
|
||||
return self._with_op("shuffle", args, kwargs)
|
||||
|
||||
def filter(self, *args, **kwargs) -> "_PermutationFromTable":
|
||||
return self._with_op("filter", args, kwargs)
|
||||
|
||||
def split_random(self, *args, **kwargs) -> "_PermutationFromTable":
|
||||
return self._with_op("split_random", args, kwargs)
|
||||
|
||||
def split_sequential(self, *args, **kwargs) -> "_PermutationFromTable":
|
||||
return self._with_op("split_sequential", args, kwargs)
|
||||
|
||||
def split_hash(self, *args, **kwargs) -> "_PermutationFromTable":
|
||||
return self._with_op("split_hash", args, kwargs)
|
||||
|
||||
def split_calculated(self, *args, **kwargs) -> "_PermutationFromTable":
|
||||
return self._with_op("split_calculated", args, kwargs)
|
||||
|
||||
def _materialize(self) -> Permutation:
|
||||
if self._materialized is None:
|
||||
if self._pending_ops:
|
||||
builder = permutation_builder(self._base_table)
|
||||
for name, args, kwargs in self._pending_ops:
|
||||
builder = getattr(builder, name)(*args, **kwargs)
|
||||
perm_tbl = builder.execute()
|
||||
self._materialized = Permutation.from_tables(
|
||||
self._base_table, perm_tbl
|
||||
)
|
||||
else:
|
||||
self._materialized = Permutation.identity(self._base_table)
|
||||
return self._materialized
|
||||
|
||||
def __getattr__(self, name: str) -> Any:
|
||||
# Avoid recursion on dunder/private state.
|
||||
if name.startswith("_"):
|
||||
raise AttributeError(name)
|
||||
return getattr(self._materialize(), name)
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self._materialize())
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self._materialize())
|
||||
|
||||
def __getitem__(self, index):
|
||||
return self._materialize()[index]
|
||||
|
||||
def __getitems__(self, indices):
|
||||
return self._materialize().__getitems__(indices)
|
||||
|
||||
@@ -1095,60 +1095,3 @@ def test_getitems_invalid_offset(some_permutation: Permutation):
|
||||
"""Test __getitems__ with an out-of-range offset raises an error."""
|
||||
with pytest.raises(Exception):
|
||||
some_permutation.__getitems__([999999])
|
||||
|
||||
|
||||
def test_from_table_identity(mem_db):
|
||||
"""Permutation.from_table without ops behaves like identity."""
|
||||
tbl = mem_db.create_table("tbl", pa.table({"x": range(10)}))
|
||||
perm = Permutation.from_table(tbl)
|
||||
assert perm.num_rows == 10
|
||||
assert perm.column_names == ["x"]
|
||||
|
||||
|
||||
def test_from_table_shuffle_seeded(mem_db):
|
||||
"""from_table().shuffle(seed=...) is reproducible and reorders rows."""
|
||||
tbl = mem_db.create_table("tbl", pa.table({"x": range(100)}))
|
||||
perm = Permutation.from_table(tbl).shuffle(seed=42)
|
||||
rows = [r["x"] for r in perm.__getitems__(list(range(100)))]
|
||||
assert sorted(rows) == list(range(100))
|
||||
assert rows != list(range(100))
|
||||
|
||||
# Same seed → same order
|
||||
rows2 = [
|
||||
r["x"]
|
||||
for r in Permutation.from_table(tbl)
|
||||
.shuffle(seed=42)
|
||||
.__getitems__(list(range(100)))
|
||||
]
|
||||
assert rows == rows2
|
||||
|
||||
|
||||
def test_from_table_filter(mem_db):
|
||||
"""from_table().filter(...) limits the rows."""
|
||||
tbl = mem_db.create_table("tbl", pa.table({"x": range(100)}))
|
||||
perm = Permutation.from_table(tbl).filter("x < 25")
|
||||
assert perm.num_rows == 25
|
||||
|
||||
|
||||
def test_from_table_chained_ops(mem_db):
|
||||
"""Chained shuffle + filter materializes once."""
|
||||
tbl = mem_db.create_table("tbl", pa.table({"x": range(100)}))
|
||||
perm = Permutation.from_table(tbl).filter("x >= 50").shuffle(seed=7)
|
||||
assert perm.num_rows == 50
|
||||
rows = [r["x"] for r in perm.__getitems__(list(range(50)))]
|
||||
assert sorted(rows) == list(range(50, 100))
|
||||
|
||||
|
||||
def test_from_table_forwards_read_methods(mem_db):
|
||||
"""from_table() result transparently forwards Permutation read methods."""
|
||||
tbl = mem_db.create_table("tbl", pa.table({"x": range(10), "y": range(10)}))
|
||||
perm = Permutation.from_table(tbl).select_columns(["x"])
|
||||
assert perm.column_names == ["x"]
|
||||
|
||||
|
||||
def test_from_table_split_random(mem_db):
|
||||
"""from_table().split_random(...) returns rows from the first split."""
|
||||
tbl = mem_db.create_table("tbl", pa.table({"x": range(100)}))
|
||||
perm = Permutation.from_table(tbl).split_random(ratios=[0.3, 0.7], seed=1)
|
||||
# Default split is 0 — ratio 0.3 → ~30 rows
|
||||
assert 25 <= perm.num_rows <= 35
|
||||
|
||||
@@ -525,7 +525,7 @@ impl Connection {
|
||||
}
|
||||
|
||||
#[pyfunction]
|
||||
#[pyo3(signature = (uri, api_key=None, region=None, host_override=None, read_consistency_interval=None, client_config=None, storage_options=None, session=None, manifest_enabled=false, namespace_client_properties=None))]
|
||||
#[pyo3(signature = (uri, api_key=None, region=None, host_override=None, read_consistency_interval=None, client_config=None, storage_options=None, session=None))]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn connect(
|
||||
py: Python<'_>,
|
||||
@@ -537,8 +537,6 @@ pub fn connect(
|
||||
client_config: Option<PyClientConfig>,
|
||||
storage_options: Option<HashMap<String, String>>,
|
||||
session: Option<crate::session::Session>,
|
||||
manifest_enabled: bool,
|
||||
namespace_client_properties: Option<HashMap<String, String>>,
|
||||
) -> PyResult<Bound<'_, PyAny>> {
|
||||
future_into_py(py, async move {
|
||||
let mut builder = lancedb::connect(&uri);
|
||||
@@ -558,12 +556,6 @@ pub fn connect(
|
||||
if let Some(storage_options) = storage_options {
|
||||
builder = builder.storage_options(storage_options);
|
||||
}
|
||||
if manifest_enabled {
|
||||
builder = builder.manifest_enabled(true);
|
||||
}
|
||||
if let Some(namespace_client_properties) = namespace_client_properties {
|
||||
builder = builder.namespace_client_properties(namespace_client_properties);
|
||||
}
|
||||
#[cfg(feature = "remote")]
|
||||
if let Some(client_config) = client_config {
|
||||
builder = builder.client_config(client_config.into());
|
||||
|
||||
@@ -17,7 +17,7 @@ use pyo3::{Bound, PyAny, PyResult, exceptions::PyValueError, prelude::*, pyfunct
|
||||
/// [`expr_lit`] and combined with the methods on this struct. On the Python
|
||||
/// side a thin wrapper class (`lancedb.expr.Expr`) delegates to these methods
|
||||
/// and adds Python operator overloads.
|
||||
#[pyclass(name = "PyExpr")]
|
||||
#[pyclass(name = "PyExpr", from_py_object)]
|
||||
#[derive(Clone)]
|
||||
pub struct PyExpr(pub DfExpr);
|
||||
|
||||
|
||||
@@ -33,7 +33,7 @@ impl PyHeaderProvider {
|
||||
Ok(headers_py) => {
|
||||
// Convert Python dict to Rust HashMap
|
||||
let bound_headers = headers_py.bind(py);
|
||||
let dict: &Bound<PyDict> = bound_headers.downcast().map_err(|e| {
|
||||
let dict: &Bound<PyDict> = bound_headers.cast().map_err(|e| {
|
||||
format!("HeaderProvider.get_headers must return a dict: {}", e)
|
||||
})?;
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ pub fn class_name(ob: &'_ Bound<'_, PyAny>) -> PyResult<String> {
|
||||
let full_name = ob
|
||||
.getattr(intern!(ob.py(), "__class__"))?
|
||||
.getattr(intern!(ob.py(), "__name__"))?;
|
||||
let full_name = full_name.downcast()?.to_string_lossy();
|
||||
let full_name = full_name.cast()?.to_string_lossy();
|
||||
|
||||
match full_name.rsplit_once('.') {
|
||||
Some((_, name)) => Ok(name.to_string()),
|
||||
|
||||
@@ -183,7 +183,7 @@ async fn call_py_method_primitive<Req, Resp>(
|
||||
) -> lance_core::Result<Resp>
|
||||
where
|
||||
Req: serde::Serialize + Send + 'static,
|
||||
Resp: for<'py> pyo3::FromPyObject<'py> + Send + 'static,
|
||||
Resp: for<'a, 'py> pyo3::FromPyObject<'a, 'py> + Send + 'static,
|
||||
{
|
||||
let request_json = serde_json::to_string(&request).map_err(|e| {
|
||||
lance_core::Error::io(format!(
|
||||
@@ -203,7 +203,7 @@ where
|
||||
|
||||
// Call the Python method
|
||||
let result = py_namespace.call_method1(py, method_name, (request_arg,))?;
|
||||
let value: Resp = result.extract(py)?;
|
||||
let value: Resp = result.extract(py).map_err(Into::into)?;
|
||||
Ok::<_, PyErr>(value)
|
||||
})
|
||||
})
|
||||
|
||||
@@ -25,12 +25,12 @@ use pyo3_async_runtimes::tokio::future_into_py;
|
||||
|
||||
fn table_from_py<'a>(table: Bound<'a, PyAny>) -> PyResult<Bound<'a, Table>> {
|
||||
if table.hasattr("_inner")? {
|
||||
Ok(table.getattr("_inner")?.downcast_into::<Table>()?)
|
||||
Ok(table.getattr("_inner")?.cast_into::<Table>()?)
|
||||
} else if table.hasattr("_table")? {
|
||||
Ok(table
|
||||
.getattr("_table")?
|
||||
.getattr("_inner")?
|
||||
.downcast_into::<Table>()?)
|
||||
.cast_into::<Table>()?)
|
||||
} else {
|
||||
Err(PyRuntimeError::new_err(
|
||||
"Provided table does not appear to be a Table or RemoteTable instance",
|
||||
@@ -90,9 +90,9 @@ impl PyAsyncPermutationBuilder {
|
||||
database
|
||||
.getattr("_conn")?
|
||||
.getattr("_inner")?
|
||||
.downcast_into::<Connection>()?
|
||||
.cast_into::<Connection>()?
|
||||
} else {
|
||||
database.getattr("_inner")?.downcast_into::<Connection>()?
|
||||
database.getattr("_inner")?.cast_into::<Connection>()?
|
||||
};
|
||||
let database = conn.borrow().database()?;
|
||||
slf.modify(|builder| builder.persist(database, table_name))
|
||||
@@ -243,7 +243,7 @@ impl PyPermutationReader {
|
||||
let Some(selection) = selection else {
|
||||
return Ok(Select::All);
|
||||
};
|
||||
let selection = selection.downcast_into::<PyDict>()?;
|
||||
let selection = selection.cast_into::<PyDict>()?;
|
||||
let selection = selection
|
||||
.iter()
|
||||
.map(|(key, value)| {
|
||||
|
||||
@@ -22,6 +22,7 @@ use lancedb::query::{
|
||||
VectorQuery as LanceDbVectorQuery,
|
||||
};
|
||||
use lancedb::table::AnyQuery;
|
||||
use pyo3::Borrowed;
|
||||
use pyo3::Bound;
|
||||
use pyo3::IntoPyObject;
|
||||
use pyo3::PyAny;
|
||||
@@ -43,9 +44,11 @@ use crate::util::parse_distance_type;
|
||||
use crate::{arrow::RecordBatchStream, util::PyLanceDB};
|
||||
use crate::{error::PythonErrorExt, index::class_name};
|
||||
|
||||
impl FromPyObject<'_> for PyLanceDB<FtsQuery> {
|
||||
fn extract_bound(ob: &Bound<'_, PyAny>) -> PyResult<Self> {
|
||||
match class_name(ob)?.as_str() {
|
||||
impl FromPyObject<'_, '_> for PyLanceDB<FtsQuery> {
|
||||
type Error = PyErr;
|
||||
|
||||
fn extract(ob: Borrowed<'_, '_, PyAny>) -> PyResult<Self> {
|
||||
match class_name(&ob)?.as_str() {
|
||||
"MatchQuery" => {
|
||||
let query = ob.getattr("query")?.extract()?;
|
||||
let column = ob.getattr("column")?.extract()?;
|
||||
@@ -424,7 +427,7 @@ impl Query {
|
||||
"Query text is required for nearest_to_text",
|
||||
))?;
|
||||
|
||||
let query = if let Ok(query_text) = fts_query.downcast::<PyString>() {
|
||||
let query = if let Ok(query_text) = fts_query.cast::<PyString>() {
|
||||
let mut query_text = query_text.to_string();
|
||||
let columns = query
|
||||
.get_item("columns")?
|
||||
@@ -606,7 +609,7 @@ impl TakeQuery {
|
||||
}
|
||||
}
|
||||
|
||||
#[pyclass]
|
||||
#[pyclass(from_py_object)]
|
||||
#[derive(Clone)]
|
||||
pub struct FTSQuery {
|
||||
inner: LanceDbQuery,
|
||||
@@ -735,7 +738,7 @@ impl FTSQuery {
|
||||
}
|
||||
}
|
||||
|
||||
#[pyclass]
|
||||
#[pyclass(from_py_object)]
|
||||
#[derive(Clone)]
|
||||
pub struct VectorQuery {
|
||||
inner: LanceDbVectorQuery,
|
||||
|
||||
@@ -11,7 +11,7 @@ use pyo3::{PyResult, pyclass, pymethods};
|
||||
/// Sessions allow you to configure cache sizes for index and metadata caches,
|
||||
/// which can significantly impact memory use and performance. They can
|
||||
/// also be re-used across multiple connections to share the same cache state.
|
||||
#[pyclass]
|
||||
#[pyclass(from_py_object)]
|
||||
#[derive(Clone)]
|
||||
pub struct Session {
|
||||
pub(crate) inner: Arc<LanceSession>,
|
||||
|
||||
@@ -29,7 +29,7 @@ use pyo3_async_runtimes::tokio::future_into_py;
|
||||
mod scannable;
|
||||
|
||||
/// Statistics about a compaction operation.
|
||||
#[pyclass(get_all)]
|
||||
#[pyclass(get_all, from_py_object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct CompactionStats {
|
||||
/// The number of fragments removed
|
||||
@@ -43,7 +43,7 @@ pub struct CompactionStats {
|
||||
}
|
||||
|
||||
/// Statistics about a cleanup operation
|
||||
#[pyclass(get_all)]
|
||||
#[pyclass(get_all, from_py_object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct RemovalStats {
|
||||
/// The number of bytes removed
|
||||
@@ -53,7 +53,7 @@ pub struct RemovalStats {
|
||||
}
|
||||
|
||||
/// Statistics about an optimize operation
|
||||
#[pyclass(get_all)]
|
||||
#[pyclass(get_all, from_py_object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct OptimizeStats {
|
||||
/// Statistics about the compaction operation
|
||||
@@ -62,7 +62,7 @@ pub struct OptimizeStats {
|
||||
pub prune: RemovalStats,
|
||||
}
|
||||
|
||||
#[pyclass(get_all)]
|
||||
#[pyclass(get_all, from_py_object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct UpdateResult {
|
||||
pub rows_updated: u64,
|
||||
@@ -88,7 +88,7 @@ impl From<lancedb::table::UpdateResult> for UpdateResult {
|
||||
}
|
||||
}
|
||||
|
||||
#[pyclass(get_all)]
|
||||
#[pyclass(get_all, from_py_object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct AddResult {
|
||||
pub version: u64,
|
||||
@@ -109,7 +109,7 @@ impl From<lancedb::table::AddResult> for AddResult {
|
||||
}
|
||||
}
|
||||
|
||||
#[pyclass(get_all)]
|
||||
#[pyclass(get_all, from_py_object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct DeleteResult {
|
||||
pub num_deleted_rows: u64,
|
||||
@@ -135,7 +135,7 @@ impl From<lancedb::table::DeleteResult> for DeleteResult {
|
||||
}
|
||||
}
|
||||
|
||||
#[pyclass(get_all)]
|
||||
#[pyclass(get_all, from_py_object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct MergeResult {
|
||||
pub version: u64,
|
||||
@@ -171,7 +171,7 @@ impl From<lancedb::table::MergeResult> for MergeResult {
|
||||
}
|
||||
}
|
||||
|
||||
#[pyclass(get_all)]
|
||||
#[pyclass(get_all, from_py_object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct AddColumnsResult {
|
||||
pub version: u64,
|
||||
@@ -192,7 +192,7 @@ impl From<lancedb::table::AddColumnsResult> for AddColumnsResult {
|
||||
}
|
||||
}
|
||||
|
||||
#[pyclass(get_all)]
|
||||
#[pyclass(get_all, from_py_object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct AlterColumnsResult {
|
||||
pub version: u64,
|
||||
@@ -213,7 +213,7 @@ impl From<lancedb::table::AlterColumnsResult> for AlterColumnsResult {
|
||||
}
|
||||
}
|
||||
|
||||
#[pyclass(get_all)]
|
||||
#[pyclass(get_all, from_py_object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct DropColumnsResult {
|
||||
pub version: u64,
|
||||
|
||||
@@ -14,7 +14,7 @@ use lancedb::{
|
||||
arrow::{SendableRecordBatchStream, SimpleRecordBatchStream},
|
||||
data::scannable::Scannable,
|
||||
};
|
||||
use pyo3::{FromPyObject, Py, PyAny, Python, types::PyAnyMethods};
|
||||
use pyo3::{Borrowed, FromPyObject, Py, PyAny, Python, types::PyAnyMethods};
|
||||
|
||||
/// Adapter that implements Scannable for a Python reader factory callable.
|
||||
///
|
||||
@@ -126,8 +126,10 @@ impl Scannable for PyScannable {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'py> FromPyObject<'py> for PyScannable {
|
||||
fn extract_bound(ob: &pyo3::Bound<'py, PyAny>) -> pyo3::PyResult<Self> {
|
||||
impl FromPyObject<'_, '_> for PyScannable {
|
||||
type Error = pyo3::PyErr;
|
||||
|
||||
fn extract(ob: Borrowed<'_, '_, PyAny>) -> pyo3::PyResult<Self> {
|
||||
// Convert from Scannable dataclass.
|
||||
let schema: PyArrowType<Schema> = ob.getattr("schema")?.extract()?;
|
||||
let schema = Arc::new(schema.0);
|
||||
|
||||
@@ -111,12 +111,7 @@ default = []
|
||||
aws = ["lance/aws", "lance-io/aws", "lance-namespace-impls/dir-aws"]
|
||||
oss = ["lance/oss", "lance-io/oss", "lance-namespace-impls/dir-oss"]
|
||||
gcs = ["lance/gcp", "lance-io/gcp", "lance-namespace-impls/dir-gcp"]
|
||||
azure = [
|
||||
"lance/azure",
|
||||
"lance-io/azure",
|
||||
"lance-namespace-impls/dir-azure",
|
||||
"lance-namespace-impls/credential-vendor-azure",
|
||||
]
|
||||
azure = ["lance/azure", "lance-io/azure", "lance-namespace-impls/dir-azure"]
|
||||
huggingface = [
|
||||
"lance/huggingface",
|
||||
"lance-io/huggingface",
|
||||
|
||||
@@ -590,15 +590,6 @@ pub struct ConnectRequest {
|
||||
/// storage options.
|
||||
pub namespace_client_properties: HashMap<String, String>,
|
||||
|
||||
/// Use directory namespace manifests as the source of truth for native
|
||||
/// LanceDB table metadata.
|
||||
///
|
||||
/// When enabled for a local/native connection, LanceDB returns a
|
||||
/// namespace-backed database directly. Directory listing fallback remains
|
||||
/// enabled for migration, and directory-listing-to-manifest migration is
|
||||
/// forced on.
|
||||
pub manifest_enabled: bool,
|
||||
|
||||
/// The interval at which to check for updates from other processes.
|
||||
///
|
||||
/// If None, then consistency is not checked. For performance
|
||||
@@ -639,7 +630,6 @@ impl ConnectBuilder {
|
||||
read_consistency_interval: None,
|
||||
options: HashMap::new(),
|
||||
namespace_client_properties: HashMap::new(),
|
||||
manifest_enabled: false,
|
||||
session: None,
|
||||
},
|
||||
embedding_registry: None,
|
||||
@@ -801,17 +791,6 @@ impl ConnectBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// Enable or disable manifest-backed directory namespace mode for local
|
||||
/// native connections.
|
||||
///
|
||||
/// When enabled, the connection uses the directory namespace database
|
||||
/// directly for all table operations and forces
|
||||
/// `dir_listing_to_manifest_migration_enabled=true`.
|
||||
pub fn manifest_enabled(mut self, enabled: bool) -> Self {
|
||||
self.request.manifest_enabled = enabled;
|
||||
self
|
||||
}
|
||||
|
||||
/// The interval at which to check for updates from other processes. This
|
||||
/// only affects LanceDB OSS.
|
||||
///
|
||||
@@ -907,16 +886,6 @@ impl ConnectBuilder {
|
||||
pub async fn execute(self) -> Result<Connection> {
|
||||
if self.request.uri.starts_with("db") {
|
||||
self.execute_remote()
|
||||
} else if self.request.manifest_enabled {
|
||||
let internal = Arc::new(
|
||||
ListingDatabase::connect_manifest_enabled_namespace_database(&self.request).await?,
|
||||
);
|
||||
Ok(Connection {
|
||||
internal,
|
||||
embedding_registry: self
|
||||
.embedding_registry
|
||||
.unwrap_or_else(|| Arc::new(MemoryRegistry::new())),
|
||||
})
|
||||
} else {
|
||||
let internal = Arc::new(ListingDatabase::connect_with_options(&self.request).await?);
|
||||
Ok(Connection {
|
||||
@@ -1163,9 +1132,6 @@ mod tests {
|
||||
use lance_testing::datagen::{BatchGenerator, IncrementingInt32};
|
||||
use tempfile::tempdir;
|
||||
|
||||
use crate::database::listing::{ListingDatabaseOptions, OPT_NEW_TABLE_V2_MANIFEST_PATHS};
|
||||
use crate::database::namespace::LanceNamespaceDatabase;
|
||||
use crate::table::NativeTable;
|
||||
use crate::test_utils::connection::new_test_connection;
|
||||
|
||||
use super::*;
|
||||
@@ -1238,147 +1204,6 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_connect_with_manifest_enabled_uses_directory_namespace() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
let db = connect(uri)
|
||||
.manifest_enabled(true)
|
||||
.storage_option("timeout", "30s")
|
||||
.namespace_client_property("manifest_enabled", "false")
|
||||
.namespace_client_property("dir_listing_to_manifest_migration_enabled", "false")
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(
|
||||
db.database()
|
||||
.as_any()
|
||||
.downcast_ref::<LanceNamespaceDatabase>()
|
||||
.is_some()
|
||||
);
|
||||
assert_eq!(db.uri(), uri);
|
||||
|
||||
let (ns_impl, properties) = db.namespace_client_config().await.unwrap();
|
||||
assert_eq!(ns_impl, "dir");
|
||||
assert_eq!(properties.get("root"), Some(&uri.to_string()));
|
||||
assert_eq!(
|
||||
properties.get("manifest_enabled"),
|
||||
Some(&"true".to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
properties.get("dir_listing_to_manifest_migration_enabled"),
|
||||
Some(&"true".to_string())
|
||||
);
|
||||
assert_eq!(properties.get("storage.timeout"), Some(&"30s".to_string()));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_manifest_enabled_rejects_commit_engine_uri() {
|
||||
let Err(err) = connect("s3+ddb://bucket/db?ddbTableName=manifest")
|
||||
.manifest_enabled(true)
|
||||
.execute()
|
||||
.await
|
||||
else {
|
||||
panic!("expected manifest-enabled s3+ddb connection to fail");
|
||||
};
|
||||
assert!(
|
||||
matches!(err, Error::NotSupported { message } if message.contains("commit engine URI schemes"))
|
||||
);
|
||||
|
||||
let Err(err) = connect("s3://bucket/db?engine=ddb&ddbTableName=manifest")
|
||||
.manifest_enabled(true)
|
||||
.execute()
|
||||
.await
|
||||
else {
|
||||
panic!("expected manifest-enabled engine query connection to fail");
|
||||
};
|
||||
assert!(
|
||||
matches!(err, Error::NotSupported { message } if message.contains("commit engine"))
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_manifest_enabled_connection_migrates_root_listing_table() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
|
||||
|
||||
connect(uri)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap()
|
||||
.create_empty_table("legacy", schema)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let db = connect(uri).manifest_enabled(true).execute().await.unwrap();
|
||||
let tables = db.table_names().execute().await.unwrap();
|
||||
assert_eq!(tables, vec!["legacy".to_string()]);
|
||||
db.open_table("legacy").execute().await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_manifest_enabled_preserves_new_table_options() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
let options = ListingDatabaseOptions::builder()
|
||||
.enable_v2_manifest_paths(true)
|
||||
.build();
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
|
||||
|
||||
let table = connect(uri)
|
||||
.manifest_enabled(true)
|
||||
.database_options(&options)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap()
|
||||
.create_empty_table("v1_manifest", schema)
|
||||
.storage_option(OPT_NEW_TABLE_V2_MANIFEST_PATHS, "false")
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let native_table = table
|
||||
.base_table()
|
||||
.as_any()
|
||||
.downcast_ref::<NativeTable>()
|
||||
.unwrap();
|
||||
assert!(!native_table.uses_v2_manifest_paths().await.unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_manifest_enabled_vend_input_storage_options() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
|
||||
|
||||
let table = connect(uri)
|
||||
.manifest_enabled(true)
|
||||
.storage_option("test_storage_option", "test_value")
|
||||
.namespace_client_property("vend_input_storage_options", "true")
|
||||
.namespace_client_property(
|
||||
"vend_input_storage_options_refresh_interval_millis",
|
||||
"60000",
|
||||
)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap()
|
||||
.create_empty_table("vended", schema)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let storage_options = table.latest_storage_options().await.unwrap().unwrap();
|
||||
assert_eq!(
|
||||
storage_options.get("test_storage_option"),
|
||||
Some(&"test_value".to_string())
|
||||
);
|
||||
assert!(storage_options.contains_key("expires_at_millis"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_table_names() {
|
||||
let tc = new_test_connection().await.unwrap();
|
||||
|
||||
@@ -285,7 +285,7 @@ const MIRRORED_STORE: &str = "mirroredStore";
|
||||
|
||||
/// A connection to LanceDB
|
||||
impl ListingDatabase {
|
||||
pub(crate) fn build_namespace_client_properties(
|
||||
fn build_namespace_client_properties(
|
||||
uri: &str,
|
||||
storage_options: &HashMap<String, String>,
|
||||
namespace_client_properties: HashMap<String, String>,
|
||||
@@ -298,24 +298,6 @@ impl ListingDatabase {
|
||||
properties
|
||||
}
|
||||
|
||||
pub(crate) fn build_manifest_enabled_namespace_client_properties(
|
||||
uri: &str,
|
||||
storage_options: &HashMap<String, String>,
|
||||
namespace_client_properties: HashMap<String, String>,
|
||||
) -> HashMap<String, String> {
|
||||
let mut properties = Self::build_namespace_client_properties(
|
||||
uri,
|
||||
storage_options,
|
||||
namespace_client_properties,
|
||||
);
|
||||
properties.insert("manifest_enabled".to_string(), "true".to_string());
|
||||
properties.insert(
|
||||
"dir_listing_to_manifest_migration_enabled".to_string(),
|
||||
"true".to_string(),
|
||||
);
|
||||
properties
|
||||
}
|
||||
|
||||
async fn connect_namespace_database(
|
||||
uri: &str,
|
||||
storage_options: HashMap<String, String>,
|
||||
@@ -341,119 +323,6 @@ impl ListingDatabase {
|
||||
))
|
||||
}
|
||||
|
||||
async fn prepare_namespace_root(
|
||||
uri: &str,
|
||||
storage_options: &HashMap<String, String>,
|
||||
session: Arc<lance::session::Session>,
|
||||
) -> Result<String> {
|
||||
match url::Url::parse(uri) {
|
||||
Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
|
||||
let (object_store, _) = ObjectStore::from_uri_and_params(
|
||||
session.store_registry(),
|
||||
uri,
|
||||
&ObjectStoreParams::default(),
|
||||
)
|
||||
.await?;
|
||||
if object_store.is_local() {
|
||||
Self::try_create_dir(uri).context(CreateDirSnafu { path: uri })?;
|
||||
}
|
||||
Ok(uri.to_string())
|
||||
}
|
||||
Ok(mut url) => {
|
||||
if url.scheme().contains('+') {
|
||||
return Err(Error::NotSupported {
|
||||
message: "commit engine URI schemes are not supported for manifest-enabled namespace connections".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
for (key, value) in url.query_pairs() {
|
||||
if key == ENGINE {
|
||||
return Err(Error::NotSupported {
|
||||
message: format!(
|
||||
"commit engine '{}' is not supported for manifest-enabled namespace connections",
|
||||
value
|
||||
),
|
||||
});
|
||||
} else if key == MIRRORED_STORE {
|
||||
return Err(Error::NotSupported {
|
||||
message: "mirrored store is not supported for manifest-enabled namespace connections"
|
||||
.to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
url.set_query(None);
|
||||
let plain_uri = url.to_string();
|
||||
|
||||
let os_params = ObjectStoreParams {
|
||||
storage_options_accessor: if storage_options.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(Arc::new(StorageOptionsAccessor::with_static_options(
|
||||
storage_options.clone(),
|
||||
)))
|
||||
},
|
||||
..Default::default()
|
||||
};
|
||||
let (object_store, _) = ObjectStore::from_uri_and_params(
|
||||
session.store_registry(),
|
||||
&plain_uri,
|
||||
&os_params,
|
||||
)
|
||||
.await?;
|
||||
if object_store.is_local() {
|
||||
Self::try_create_dir(&plain_uri).context(CreateDirSnafu {
|
||||
path: plain_uri.clone(),
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(plain_uri)
|
||||
}
|
||||
Err(_) => {
|
||||
let (object_store, _) = ObjectStore::from_uri_and_params(
|
||||
session.store_registry(),
|
||||
uri,
|
||||
&ObjectStoreParams::default(),
|
||||
)
|
||||
.await?;
|
||||
if object_store.is_local() {
|
||||
Self::try_create_dir(uri).context(CreateDirSnafu { path: uri })?;
|
||||
}
|
||||
Ok(uri.to_string())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn connect_manifest_enabled_namespace_database(
|
||||
request: &ConnectRequest,
|
||||
) -> Result<LanceNamespaceDatabase> {
|
||||
let options = ListingDatabaseOptions::parse_from_map(&request.options)?;
|
||||
let session = request
|
||||
.session
|
||||
.clone()
|
||||
.unwrap_or_else(|| Arc::new(lance::session::Session::default()));
|
||||
let namespace_root =
|
||||
Self::prepare_namespace_root(&request.uri, &options.storage_options, session.clone())
|
||||
.await?;
|
||||
let ns_properties = Self::build_manifest_enabled_namespace_client_properties(
|
||||
&namespace_root,
|
||||
&options.storage_options,
|
||||
request.namespace_client_properties.clone(),
|
||||
);
|
||||
|
||||
LanceNamespaceDatabase::connect_with_new_table_config(
|
||||
"dir",
|
||||
ns_properties,
|
||||
options.storage_options,
|
||||
request.read_consistency_interval,
|
||||
Some(session),
|
||||
HashSet::new(),
|
||||
options.new_table_config,
|
||||
)
|
||||
.await
|
||||
.map(|db| db.with_uri(request.uri.clone()))
|
||||
}
|
||||
|
||||
/// Connect to a listing database
|
||||
///
|
||||
/// The URI should be a path to a directory where the tables are stored.
|
||||
@@ -821,12 +690,15 @@ impl ListingDatabase {
|
||||
store_params.storage_options_accessor = Some(Arc::new(accessor));
|
||||
}
|
||||
|
||||
write_params.data_storage_version = storage_version_override
|
||||
.or(write_params.data_storage_version)
|
||||
.or(self.new_table_config.data_storage_version);
|
||||
write_params.data_storage_version = self
|
||||
.new_table_config
|
||||
.data_storage_version
|
||||
.or(storage_version_override);
|
||||
|
||||
if let Some(enable_v2_manifest_paths) =
|
||||
v2_manifest_override.or(self.new_table_config.enable_v2_manifest_paths)
|
||||
if let Some(enable_v2_manifest_paths) = self
|
||||
.new_table_config
|
||||
.enable_v2_manifest_paths
|
||||
.or(v2_manifest_override)
|
||||
{
|
||||
write_params.enable_v2_manifest_paths = enable_v2_manifest_paths;
|
||||
}
|
||||
@@ -1286,7 +1158,6 @@ mod tests {
|
||||
client_config: Default::default(),
|
||||
options: Default::default(),
|
||||
namespace_client_properties: Default::default(),
|
||||
manifest_enabled: false,
|
||||
read_consistency_interval: None,
|
||||
session: None,
|
||||
};
|
||||
@@ -1421,7 +1292,6 @@ mod tests {
|
||||
client_config: Default::default(),
|
||||
options: options.clone(),
|
||||
namespace_client_properties: Default::default(),
|
||||
manifest_enabled: false,
|
||||
read_consistency_interval: None,
|
||||
session: None,
|
||||
};
|
||||
@@ -1957,7 +1827,6 @@ mod tests {
|
||||
client_config: Default::default(),
|
||||
options,
|
||||
namespace_client_properties: Default::default(),
|
||||
manifest_enabled: false,
|
||||
read_consistency_interval: None,
|
||||
session: None,
|
||||
};
|
||||
@@ -2064,7 +1933,6 @@ mod tests {
|
||||
client_config: Default::default(),
|
||||
options,
|
||||
namespace_client_properties: Default::default(),
|
||||
manifest_enabled: false,
|
||||
read_consistency_interval: None,
|
||||
session: None,
|
||||
};
|
||||
@@ -2137,7 +2005,6 @@ mod tests {
|
||||
client_config: Default::default(),
|
||||
options,
|
||||
namespace_client_properties: Default::default(),
|
||||
manifest_enabled: false,
|
||||
read_consistency_interval: None,
|
||||
session: None,
|
||||
};
|
||||
@@ -2335,7 +2202,6 @@ mod tests {
|
||||
client_config: Default::default(),
|
||||
options: Default::default(),
|
||||
namespace_client_properties,
|
||||
manifest_enabled: false,
|
||||
read_consistency_interval: None,
|
||||
session: None,
|
||||
};
|
||||
|
||||
@@ -24,10 +24,6 @@ use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler;
|
||||
|
||||
use crate::connection::NamespaceClientPushdownOperation;
|
||||
use crate::database::ReadConsistency;
|
||||
use crate::database::listing::{
|
||||
NewTableConfig, OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS, OPT_NEW_TABLE_STORAGE_VERSION,
|
||||
OPT_NEW_TABLE_V2_MANIFEST_PATHS,
|
||||
};
|
||||
use crate::error::{Error, Result};
|
||||
use crate::table::NativeTable;
|
||||
use lance::dataset::WriteMode;
|
||||
@@ -54,8 +50,6 @@ pub struct LanceNamespaceDatabase {
|
||||
ns_impl: String,
|
||||
// Namespace properties used to construct the namespace client
|
||||
ns_properties: HashMap<String, String>,
|
||||
// Options for tables created by this connection
|
||||
new_table_config: NewTableConfig,
|
||||
}
|
||||
|
||||
impl LanceNamespaceDatabase {
|
||||
@@ -77,15 +71,9 @@ impl LanceNamespaceDatabase {
|
||||
pushdown_operations: namespace_client_pushdown_operations,
|
||||
ns_impl: namespace_client_impl,
|
||||
ns_properties: namespace_client_properties,
|
||||
new_table_config: NewTableConfig::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn with_uri(mut self, uri: impl Into<String>) -> Self {
|
||||
self.uri = uri.into();
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn connect(
|
||||
ns_impl: &str,
|
||||
ns_properties: HashMap<String, String>,
|
||||
@@ -93,27 +81,6 @@ impl LanceNamespaceDatabase {
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
session: Option<Arc<lance::session::Session>>,
|
||||
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
|
||||
) -> Result<Self> {
|
||||
Self::connect_with_new_table_config(
|
||||
ns_impl,
|
||||
ns_properties,
|
||||
storage_options,
|
||||
read_consistency_interval,
|
||||
session,
|
||||
pushdown_operations,
|
||||
NewTableConfig::default(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn connect_with_new_table_config(
|
||||
ns_impl: &str,
|
||||
ns_properties: HashMap<String, String>,
|
||||
storage_options: HashMap<String, String>,
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
session: Option<Arc<lance::session::Session>>,
|
||||
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
|
||||
new_table_config: NewTableConfig,
|
||||
) -> Result<Self> {
|
||||
let mut builder = ConnectBuilder::new(ns_impl);
|
||||
for (key, value) in ns_properties.clone() {
|
||||
@@ -135,79 +102,8 @@ impl LanceNamespaceDatabase {
|
||||
pushdown_operations,
|
||||
ns_impl: ns_impl.to_string(),
|
||||
ns_properties,
|
||||
new_table_config,
|
||||
})
|
||||
}
|
||||
|
||||
fn extract_storage_overrides(
|
||||
&self,
|
||||
request: &DbCreateTableRequest,
|
||||
) -> Result<(
|
||||
Option<lance_encoding::version::LanceFileVersion>,
|
||||
Option<bool>,
|
||||
Option<bool>,
|
||||
)> {
|
||||
let storage_options = request
|
||||
.write_options
|
||||
.lance_write_params
|
||||
.as_ref()
|
||||
.and_then(|p| p.store_params.as_ref())
|
||||
.and_then(|sp| sp.storage_options());
|
||||
|
||||
let storage_version_override = storage_options
|
||||
.and_then(|opts| opts.get(OPT_NEW_TABLE_STORAGE_VERSION))
|
||||
.map(|s| s.parse::<lance_encoding::version::LanceFileVersion>())
|
||||
.transpose()?;
|
||||
|
||||
let v2_manifest_override = storage_options
|
||||
.and_then(|opts| opts.get(OPT_NEW_TABLE_V2_MANIFEST_PATHS))
|
||||
.map(|s| s.parse::<bool>())
|
||||
.transpose()
|
||||
.map_err(|_| Error::InvalidInput {
|
||||
message: "enable_v2_manifest_paths must be a boolean".to_string(),
|
||||
})?;
|
||||
|
||||
let stable_row_ids_override = storage_options
|
||||
.and_then(|opts| opts.get(OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS))
|
||||
.map(|s| s.parse::<bool>())
|
||||
.transpose()
|
||||
.map_err(|_| Error::InvalidInput {
|
||||
message: "enable_stable_row_ids must be a boolean".to_string(),
|
||||
})?;
|
||||
|
||||
Ok((
|
||||
storage_version_override,
|
||||
v2_manifest_override,
|
||||
stable_row_ids_override,
|
||||
))
|
||||
}
|
||||
|
||||
fn apply_new_table_config(
|
||||
&self,
|
||||
params: &mut lance::dataset::WriteParams,
|
||||
request: &DbCreateTableRequest,
|
||||
) -> Result<()> {
|
||||
let (storage_version_override, v2_manifest_override, stable_row_ids_override) =
|
||||
self.extract_storage_overrides(request)?;
|
||||
|
||||
params.data_storage_version = storage_version_override
|
||||
.or(params.data_storage_version)
|
||||
.or(self.new_table_config.data_storage_version);
|
||||
|
||||
if let Some(enable_v2_manifest_paths) =
|
||||
v2_manifest_override.or(self.new_table_config.enable_v2_manifest_paths)
|
||||
{
|
||||
params.enable_v2_manifest_paths = enable_v2_manifest_paths;
|
||||
}
|
||||
|
||||
if let Some(enable_stable_row_ids) =
|
||||
stable_row_ids_override.or(self.new_table_config.enable_stable_row_ids)
|
||||
{
|
||||
params.enable_stable_row_ids = enable_stable_row_ids;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for LanceNamespaceDatabase {
|
||||
@@ -403,12 +299,7 @@ impl Database for LanceNamespaceDatabase {
|
||||
};
|
||||
|
||||
// Build write params with storage options and commit handler
|
||||
let mut params = request
|
||||
.write_options
|
||||
.lance_write_params
|
||||
.clone()
|
||||
.unwrap_or_default();
|
||||
self.apply_new_table_config(&mut params, &request)?;
|
||||
let mut params = request.write_options.lance_write_params.unwrap_or_default();
|
||||
|
||||
if matches!(request.mode, CreateTableMode::Overwrite) {
|
||||
params.mode = WriteMode::Overwrite;
|
||||
|
||||
@@ -43,7 +43,7 @@ pub struct RemoteInsertExec<S: HttpSend = Sender> {
|
||||
client: RestfulLanceDbClient<S>,
|
||||
input: Arc<dyn ExecutionPlan>,
|
||||
overwrite: bool,
|
||||
properties: PlanProperties,
|
||||
properties: Arc<PlanProperties>,
|
||||
add_result: Arc<Mutex<Option<AddResult>>>,
|
||||
metrics: ExecutionPlanMetricsSet,
|
||||
upload_id: Option<String>,
|
||||
@@ -105,12 +105,12 @@ impl<S: HttpSend + 'static> RemoteInsertExec<S> {
|
||||
1
|
||||
};
|
||||
let schema = COUNT_SCHEMA.clone();
|
||||
let properties = PlanProperties::new(
|
||||
let properties = Arc::new(PlanProperties::new(
|
||||
EquivalenceProperties::new(schema),
|
||||
datafusion_physical_plan::Partitioning::UnknownPartitioning(num_partitions),
|
||||
datafusion_physical_plan::execution_plan::EmissionType::Final,
|
||||
datafusion_physical_plan::execution_plan::Boundedness::Bounded,
|
||||
);
|
||||
));
|
||||
|
||||
Self {
|
||||
table_name,
|
||||
@@ -232,7 +232,7 @@ impl<S: HttpSend + 'static> ExecutionPlan for RemoteInsertExec<S> {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.properties
|
||||
}
|
||||
|
||||
|
||||
@@ -39,21 +39,26 @@ use lance_index::scalar::FullTextSearchQuery;
|
||||
struct MetadataEraserExec {
|
||||
input: Arc<dyn ExecutionPlan>,
|
||||
schema: Arc<ArrowSchema>,
|
||||
properties: PlanProperties,
|
||||
properties: Arc<PlanProperties>,
|
||||
}
|
||||
|
||||
impl MetadataEraserExec {
|
||||
fn compute_properties_from_input(
|
||||
input: &Arc<dyn ExecutionPlan>,
|
||||
schema: &Arc<ArrowSchema>,
|
||||
) -> PlanProperties {
|
||||
) -> Arc<PlanProperties> {
|
||||
let input_properties = input.properties();
|
||||
let eq_properties = input_properties
|
||||
.eq_properties
|
||||
.clone()
|
||||
.with_new_schema(schema.clone())
|
||||
.unwrap();
|
||||
input_properties.clone().with_eq_properties(eq_properties)
|
||||
Arc::new(
|
||||
input_properties
|
||||
.as_ref()
|
||||
.clone()
|
||||
.with_eq_properties(eq_properties),
|
||||
)
|
||||
}
|
||||
|
||||
fn new(input: Arc<dyn ExecutionPlan>) -> Self {
|
||||
@@ -87,7 +92,7 @@ impl ExecutionPlan for MetadataEraserExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.properties
|
||||
}
|
||||
|
||||
|
||||
@@ -81,7 +81,7 @@ pub struct InsertExec {
|
||||
dataset: Arc<Dataset>,
|
||||
input: Arc<dyn ExecutionPlan>,
|
||||
write_params: WriteParams,
|
||||
properties: PlanProperties,
|
||||
properties: Arc<PlanProperties>,
|
||||
partial_transactions: Arc<Mutex<Vec<Transaction>>>,
|
||||
metrics: ExecutionPlanMetricsSet,
|
||||
}
|
||||
@@ -95,12 +95,12 @@ impl InsertExec {
|
||||
) -> Self {
|
||||
let schema = COUNT_SCHEMA.clone();
|
||||
let num_partitions = input.output_partitioning().partition_count();
|
||||
let properties = PlanProperties::new(
|
||||
let properties = Arc::new(PlanProperties::new(
|
||||
EquivalenceProperties::new(schema),
|
||||
Partitioning::UnknownPartitioning(num_partitions),
|
||||
EmissionType::Final,
|
||||
Boundedness::Bounded,
|
||||
);
|
||||
));
|
||||
|
||||
Self {
|
||||
ds_wrapper,
|
||||
@@ -136,7 +136,7 @@ impl ExecutionPlan for InsertExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.properties
|
||||
}
|
||||
|
||||
|
||||
@@ -20,7 +20,7 @@ pub(crate) struct ScannableExec {
|
||||
// We don't require Scannable to be Sync, so we wrap it in a Mutex to allow safe concurrent access.
|
||||
source: Mutex<Box<dyn Scannable>>,
|
||||
num_rows: Option<usize>,
|
||||
properties: PlanProperties,
|
||||
properties: Arc<PlanProperties>,
|
||||
tracker: Option<Arc<WriteProgressTracker>>,
|
||||
}
|
||||
|
||||
@@ -37,12 +37,12 @@ impl ScannableExec {
|
||||
pub fn new(source: Box<dyn Scannable>, tracker: Option<Arc<WriteProgressTracker>>) -> Self {
|
||||
let schema = source.schema();
|
||||
let eq_properties = EquivalenceProperties::new(schema);
|
||||
let properties = PlanProperties::new(
|
||||
let properties = Arc::new(PlanProperties::new(
|
||||
eq_properties,
|
||||
Partitioning::UnknownPartitioning(1),
|
||||
EmissionType::Incremental,
|
||||
datafusion_physical_plan::execution_plan::Boundedness::Bounded,
|
||||
);
|
||||
));
|
||||
|
||||
let num_rows = source.num_rows();
|
||||
let source = Mutex::new(source);
|
||||
@@ -70,7 +70,7 @@ impl ExecutionPlan for ScannableExec {
|
||||
self
|
||||
}
|
||||
|
||||
fn properties(&self) -> &PlanProperties {
|
||||
fn properties(&self) -> &Arc<PlanProperties> {
|
||||
&self.properties
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user