mirror of
https://github.com/lancedb/lancedb.git
synced 2026-05-13 01:50:42 +00:00
Compare commits
1 Commits
ticket/324
...
v0.28.0-be
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b7efc9c9c0 |
10
Cargo.lock
generated
10
Cargo.lock
generated
@@ -4476,11 +4476,9 @@ dependencies = [
|
||||
"arrow-schema",
|
||||
"async-trait",
|
||||
"axum",
|
||||
"base64 0.22.1",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"futures",
|
||||
"hmac",
|
||||
"lance",
|
||||
"lance-core",
|
||||
"lance-index",
|
||||
@@ -4490,12 +4488,10 @@ dependencies = [
|
||||
"lance-table",
|
||||
"log",
|
||||
"object_store",
|
||||
"quick-xml 0.38.4",
|
||||
"rand 0.9.4",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha2",
|
||||
"snafu 0.9.0",
|
||||
"tokio",
|
||||
"tower",
|
||||
@@ -4580,7 +4576,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lancedb"
|
||||
version = "0.28.0-beta.10"
|
||||
version = "0.28.0-beta.9"
|
||||
dependencies = [
|
||||
"ahash",
|
||||
"anyhow",
|
||||
@@ -4662,7 +4658,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lancedb-nodejs"
|
||||
version = "0.28.0-beta.10"
|
||||
version = "0.28.0-beta.9"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -4684,7 +4680,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lancedb-python"
|
||||
version = "0.31.0-beta.10"
|
||||
version = "0.31.0-beta.9"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"async-trait",
|
||||
|
||||
@@ -41,29 +41,6 @@ for testing purposes.
|
||||
|
||||
***
|
||||
|
||||
### manifestEnabled?
|
||||
|
||||
```ts
|
||||
optional manifestEnabled: boolean;
|
||||
```
|
||||
|
||||
(For LanceDB OSS only): use directory namespace manifests as the source
|
||||
of truth for table metadata. Existing directory-listed root tables are
|
||||
migrated into the manifest on access.
|
||||
|
||||
***
|
||||
|
||||
### namespaceClientProperties?
|
||||
|
||||
```ts
|
||||
optional namespaceClientProperties: Record<string, string>;
|
||||
```
|
||||
|
||||
(For LanceDB OSS only): extra properties for the backing namespace
|
||||
client used by manifest-enabled native connections.
|
||||
|
||||
***
|
||||
|
||||
### readConsistencyInterval?
|
||||
|
||||
```ts
|
||||
|
||||
4
nodejs/package-lock.json
generated
4
nodejs/package-lock.json
generated
@@ -1,12 +1,12 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb",
|
||||
"version": "0.28.0-beta.10",
|
||||
"version": "0.28.0-beta.9",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "@lancedb/lancedb",
|
||||
"version": "0.28.0-beta.10",
|
||||
"version": "0.28.0-beta.9",
|
||||
"cpu": [
|
||||
"x64",
|
||||
"arm64"
|
||||
|
||||
@@ -67,12 +67,6 @@ impl Connection {
|
||||
builder = builder.storage_option(key, value);
|
||||
}
|
||||
}
|
||||
if let Some(manifest_enabled) = options.manifest_enabled {
|
||||
builder = builder.manifest_enabled(manifest_enabled);
|
||||
}
|
||||
if let Some(namespace_client_properties) = options.namespace_client_properties {
|
||||
builder = builder.namespace_client_properties(namespace_client_properties);
|
||||
}
|
||||
|
||||
// Create client config, optionally with header provider
|
||||
let client_config = options.client_config.unwrap_or_default();
|
||||
|
||||
@@ -37,13 +37,6 @@ pub struct ConnectionOptions {
|
||||
///
|
||||
/// The available options are described at https://docs.lancedb.com/storage/
|
||||
pub storage_options: Option<HashMap<String, String>>,
|
||||
/// (For LanceDB OSS only): use directory namespace manifests as the source
|
||||
/// of truth for table metadata. Existing directory-listed root tables are
|
||||
/// migrated into the manifest on access.
|
||||
pub manifest_enabled: Option<bool>,
|
||||
/// (For LanceDB OSS only): extra properties for the backing namespace
|
||||
/// client used by manifest-enabled native connections.
|
||||
pub namespace_client_properties: Option<HashMap<String, String>>,
|
||||
/// (For LanceDB OSS only): the session to use for this connection. Holds
|
||||
/// shared caches and other session-specific state.
|
||||
pub session: Option<session::Session>,
|
||||
|
||||
@@ -73,7 +73,6 @@ def connect(
|
||||
client_config: Union[ClientConfig, Dict[str, Any], None] = None,
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
session: Optional[Session] = None,
|
||||
manifest_enabled: bool = False,
|
||||
namespace_client_impl: Optional[str] = None,
|
||||
namespace_client_properties: Optional[Dict[str, str]] = None,
|
||||
namespace_client_pushdown_operations: Optional[List[str]] = None,
|
||||
@@ -112,10 +111,6 @@ def connect(
|
||||
storage_options: dict, optional
|
||||
Additional options for the storage backend. See available options at
|
||||
<https://docs.lancedb.com/storage/>
|
||||
manifest_enabled : bool, default False
|
||||
When true for local/native connections, use directory namespace
|
||||
manifests as the source of truth for table metadata. Existing
|
||||
directory-listed root tables are migrated into the manifest on access.
|
||||
session: Session, optional
|
||||
(For LanceDB OSS only)
|
||||
A session to use for this connection. Sessions allow you to configure
|
||||
@@ -163,11 +158,11 @@ def connect(
|
||||
conn : DBConnection
|
||||
A connection to a LanceDB database.
|
||||
"""
|
||||
if namespace_client_impl is not None:
|
||||
if namespace_client_properties is None:
|
||||
if namespace_client_impl is not None or namespace_client_properties is not None:
|
||||
if namespace_client_impl is None or namespace_client_properties is None:
|
||||
raise ValueError(
|
||||
"namespace_client_properties must be provided when "
|
||||
"namespace_client_impl is set"
|
||||
"Both namespace_client_impl and "
|
||||
"namespace_client_properties must be provided"
|
||||
)
|
||||
if kwargs:
|
||||
raise ValueError(f"Unknown keyword arguments: {kwargs}")
|
||||
@@ -180,12 +175,6 @@ def connect(
|
||||
namespace_client_pushdown_operations=namespace_client_pushdown_operations,
|
||||
)
|
||||
|
||||
if namespace_client_properties is not None and not manifest_enabled:
|
||||
raise ValueError(
|
||||
"namespace_client_impl must be provided when using "
|
||||
"namespace_client_properties unless manifest_enabled=True"
|
||||
)
|
||||
|
||||
if namespace_client_pushdown_operations is not None:
|
||||
raise ValueError(
|
||||
"namespace_client_pushdown_operations is only valid when "
|
||||
@@ -223,8 +212,6 @@ def connect(
|
||||
read_consistency_interval=read_consistency_interval,
|
||||
storage_options=storage_options,
|
||||
session=session,
|
||||
manifest_enabled=manifest_enabled,
|
||||
namespace_client_properties=namespace_client_properties,
|
||||
)
|
||||
|
||||
|
||||
@@ -302,8 +289,6 @@ def deserialize_conn(
|
||||
parsed["uri"],
|
||||
read_consistency_interval=rci,
|
||||
storage_options=storage_options,
|
||||
manifest_enabled=parsed.get("manifest_enabled", False),
|
||||
namespace_client_properties=parsed.get("namespace_client_properties"),
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"Unknown connection_type: {connection_type}")
|
||||
@@ -319,8 +304,6 @@ async def connect_async(
|
||||
client_config: Optional[Union[ClientConfig, Dict[str, Any]]] = None,
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
session: Optional[Session] = None,
|
||||
manifest_enabled: bool = False,
|
||||
namespace_client_properties: Optional[Dict[str, str]] = None,
|
||||
) -> AsyncConnection:
|
||||
"""Connect to a LanceDB database.
|
||||
|
||||
@@ -360,13 +343,6 @@ async def connect_async(
|
||||
cache sizes for index and metadata caches, which can significantly
|
||||
impact memory use and performance. They can also be re-used across
|
||||
multiple connections to share the same cache state.
|
||||
manifest_enabled : bool, default False
|
||||
When true for local/native connections, use directory namespace
|
||||
manifests as the source of truth for table metadata. Existing
|
||||
directory-listed root tables are migrated into the manifest on access.
|
||||
namespace_client_properties : dict, optional
|
||||
Additional directory namespace client properties to use with
|
||||
``manifest_enabled=True``.
|
||||
|
||||
Examples
|
||||
--------
|
||||
@@ -409,8 +385,6 @@ async def connect_async(
|
||||
client_config,
|
||||
storage_options,
|
||||
session,
|
||||
manifest_enabled,
|
||||
namespace_client_properties,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@@ -242,8 +242,6 @@ async def connect(
|
||||
client_config: Optional[Union[ClientConfig, Dict[str, Any]]],
|
||||
storage_options: Optional[Dict[str, str]],
|
||||
session: Optional[Session],
|
||||
manifest_enabled: bool = False,
|
||||
namespace_client_properties: Optional[Dict[str, str]] = None,
|
||||
) -> Connection: ...
|
||||
|
||||
class RecordBatchStream:
|
||||
|
||||
@@ -590,13 +590,8 @@ class LanceDBConnection(DBConnection):
|
||||
read_consistency_interval: Optional[timedelta] = None,
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
session: Optional[Session] = None,
|
||||
manifest_enabled: bool = False,
|
||||
namespace_client_properties: Optional[Dict[str, str]] = None,
|
||||
_inner: Optional[LanceDbConnection] = None,
|
||||
):
|
||||
self.storage_options = storage_options
|
||||
self._manifest_enabled = manifest_enabled
|
||||
self._namespace_client_properties = namespace_client_properties
|
||||
if _inner is not None:
|
||||
self._conn = _inner
|
||||
self._cached_namespace_client = None
|
||||
@@ -638,8 +633,6 @@ class LanceDBConnection(DBConnection):
|
||||
None,
|
||||
storage_options,
|
||||
session,
|
||||
manifest_enabled,
|
||||
namespace_client_properties,
|
||||
)
|
||||
|
||||
# TODO: It would be nice if we didn't store self.storage_options but it is
|
||||
@@ -647,6 +640,7 @@ class LanceDBConnection(DBConnection):
|
||||
# work because some paths like LanceDBConnection.from_inner will lose the
|
||||
# storage_options. Also, this class really shouldn't be holding any state
|
||||
# beyond _conn.
|
||||
self.storage_options = storage_options
|
||||
self._conn = AsyncConnection(LOOP.run(do_connect()))
|
||||
self._cached_namespace_client: Optional[LanceNamespace] = None
|
||||
|
||||
@@ -683,8 +677,6 @@ class LanceDBConnection(DBConnection):
|
||||
"connection_type": "local",
|
||||
"uri": self.uri,
|
||||
"storage_options": self.storage_options,
|
||||
"manifest_enabled": self._manifest_enabled,
|
||||
"namespace_client_properties": self._namespace_client_properties,
|
||||
"read_consistency_interval_seconds": (
|
||||
rci.total_seconds() if rci else None
|
||||
),
|
||||
|
||||
@@ -779,24 +779,6 @@ class Permutation:
|
||||
batch = LOOP.run(do_getitems())
|
||||
return self.transform_fn(batch)
|
||||
|
||||
def fetch(self, indices: list[int]) -> Any:
|
||||
"""
|
||||
Fetch rows from the permutation
|
||||
|
||||
returns the rows for the given permutation in the same shape as configured by
|
||||
with_format / with_transform.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> import lancedb
|
||||
>>> db = lancedb.connect("memory:///")
|
||||
>>> tbl = db.create_table("tbl", data=[{"x": x} for x in range(10)])
|
||||
>>> perm = Permutation.identity(tbl)
|
||||
>>> perm.fetch([0, 5, 9])
|
||||
[{'x': 0}, {'x': 5}, {'x': 9}]
|
||||
"""
|
||||
return self.__getitems__(indices)
|
||||
|
||||
@deprecated(details="Use with_skip instead")
|
||||
def skip(self, skip: int) -> "Permutation":
|
||||
"""
|
||||
|
||||
@@ -1095,8 +1095,3 @@ def test_getitems_invalid_offset(some_permutation: Permutation):
|
||||
"""Test __getitems__ with an out-of-range offset raises an error."""
|
||||
with pytest.raises(Exception):
|
||||
some_permutation.__getitems__([999999])
|
||||
|
||||
|
||||
def test_fetch_matches_getitems(some_permutation: Permutation):
|
||||
indices = [0, 1, 2, 10, 100]
|
||||
assert some_permutation.fetch(indices) == some_permutation.__getitems__(indices)
|
||||
|
||||
@@ -525,7 +525,7 @@ impl Connection {
|
||||
}
|
||||
|
||||
#[pyfunction]
|
||||
#[pyo3(signature = (uri, api_key=None, region=None, host_override=None, read_consistency_interval=None, client_config=None, storage_options=None, session=None, manifest_enabled=false, namespace_client_properties=None))]
|
||||
#[pyo3(signature = (uri, api_key=None, region=None, host_override=None, read_consistency_interval=None, client_config=None, storage_options=None, session=None))]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn connect(
|
||||
py: Python<'_>,
|
||||
@@ -537,8 +537,6 @@ pub fn connect(
|
||||
client_config: Option<PyClientConfig>,
|
||||
storage_options: Option<HashMap<String, String>>,
|
||||
session: Option<crate::session::Session>,
|
||||
manifest_enabled: bool,
|
||||
namespace_client_properties: Option<HashMap<String, String>>,
|
||||
) -> PyResult<Bound<'_, PyAny>> {
|
||||
future_into_py(py, async move {
|
||||
let mut builder = lancedb::connect(&uri);
|
||||
@@ -558,12 +556,6 @@ pub fn connect(
|
||||
if let Some(storage_options) = storage_options {
|
||||
builder = builder.storage_options(storage_options);
|
||||
}
|
||||
if manifest_enabled {
|
||||
builder = builder.manifest_enabled(true);
|
||||
}
|
||||
if let Some(namespace_client_properties) = namespace_client_properties {
|
||||
builder = builder.namespace_client_properties(namespace_client_properties);
|
||||
}
|
||||
#[cfg(feature = "remote")]
|
||||
if let Some(client_config) = client_config {
|
||||
builder = builder.client_config(client_config.into());
|
||||
|
||||
@@ -111,12 +111,7 @@ default = []
|
||||
aws = ["lance/aws", "lance-io/aws", "lance-namespace-impls/dir-aws"]
|
||||
oss = ["lance/oss", "lance-io/oss", "lance-namespace-impls/dir-oss"]
|
||||
gcs = ["lance/gcp", "lance-io/gcp", "lance-namespace-impls/dir-gcp"]
|
||||
azure = [
|
||||
"lance/azure",
|
||||
"lance-io/azure",
|
||||
"lance-namespace-impls/dir-azure",
|
||||
"lance-namespace-impls/credential-vendor-azure",
|
||||
]
|
||||
azure = ["lance/azure", "lance-io/azure", "lance-namespace-impls/dir-azure"]
|
||||
huggingface = [
|
||||
"lance/huggingface",
|
||||
"lance-io/huggingface",
|
||||
|
||||
@@ -590,15 +590,6 @@ pub struct ConnectRequest {
|
||||
/// storage options.
|
||||
pub namespace_client_properties: HashMap<String, String>,
|
||||
|
||||
/// Use directory namespace manifests as the source of truth for native
|
||||
/// LanceDB table metadata.
|
||||
///
|
||||
/// When enabled for a local/native connection, LanceDB returns a
|
||||
/// namespace-backed database directly. Directory listing fallback remains
|
||||
/// enabled for migration, and directory-listing-to-manifest migration is
|
||||
/// forced on.
|
||||
pub manifest_enabled: bool,
|
||||
|
||||
/// The interval at which to check for updates from other processes.
|
||||
///
|
||||
/// If None, then consistency is not checked. For performance
|
||||
@@ -639,7 +630,6 @@ impl ConnectBuilder {
|
||||
read_consistency_interval: None,
|
||||
options: HashMap::new(),
|
||||
namespace_client_properties: HashMap::new(),
|
||||
manifest_enabled: false,
|
||||
session: None,
|
||||
},
|
||||
embedding_registry: None,
|
||||
@@ -801,17 +791,6 @@ impl ConnectBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// Enable or disable manifest-backed directory namespace mode for local
|
||||
/// native connections.
|
||||
///
|
||||
/// When enabled, the connection uses the directory namespace database
|
||||
/// directly for all table operations and forces
|
||||
/// `dir_listing_to_manifest_migration_enabled=true`.
|
||||
pub fn manifest_enabled(mut self, enabled: bool) -> Self {
|
||||
self.request.manifest_enabled = enabled;
|
||||
self
|
||||
}
|
||||
|
||||
/// The interval at which to check for updates from other processes. This
|
||||
/// only affects LanceDB OSS.
|
||||
///
|
||||
@@ -907,16 +886,6 @@ impl ConnectBuilder {
|
||||
pub async fn execute(self) -> Result<Connection> {
|
||||
if self.request.uri.starts_with("db") {
|
||||
self.execute_remote()
|
||||
} else if self.request.manifest_enabled {
|
||||
let internal = Arc::new(
|
||||
ListingDatabase::connect_manifest_enabled_namespace_database(&self.request).await?,
|
||||
);
|
||||
Ok(Connection {
|
||||
internal,
|
||||
embedding_registry: self
|
||||
.embedding_registry
|
||||
.unwrap_or_else(|| Arc::new(MemoryRegistry::new())),
|
||||
})
|
||||
} else {
|
||||
let internal = Arc::new(ListingDatabase::connect_with_options(&self.request).await?);
|
||||
Ok(Connection {
|
||||
@@ -1163,9 +1132,6 @@ mod tests {
|
||||
use lance_testing::datagen::{BatchGenerator, IncrementingInt32};
|
||||
use tempfile::tempdir;
|
||||
|
||||
use crate::database::listing::{ListingDatabaseOptions, OPT_NEW_TABLE_V2_MANIFEST_PATHS};
|
||||
use crate::database::namespace::LanceNamespaceDatabase;
|
||||
use crate::table::NativeTable;
|
||||
use crate::test_utils::connection::new_test_connection;
|
||||
|
||||
use super::*;
|
||||
@@ -1238,147 +1204,6 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_connect_with_manifest_enabled_uses_directory_namespace() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
let db = connect(uri)
|
||||
.manifest_enabled(true)
|
||||
.storage_option("timeout", "30s")
|
||||
.namespace_client_property("manifest_enabled", "false")
|
||||
.namespace_client_property("dir_listing_to_manifest_migration_enabled", "false")
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(
|
||||
db.database()
|
||||
.as_any()
|
||||
.downcast_ref::<LanceNamespaceDatabase>()
|
||||
.is_some()
|
||||
);
|
||||
assert_eq!(db.uri(), uri);
|
||||
|
||||
let (ns_impl, properties) = db.namespace_client_config().await.unwrap();
|
||||
assert_eq!(ns_impl, "dir");
|
||||
assert_eq!(properties.get("root"), Some(&uri.to_string()));
|
||||
assert_eq!(
|
||||
properties.get("manifest_enabled"),
|
||||
Some(&"true".to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
properties.get("dir_listing_to_manifest_migration_enabled"),
|
||||
Some(&"true".to_string())
|
||||
);
|
||||
assert_eq!(properties.get("storage.timeout"), Some(&"30s".to_string()));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_manifest_enabled_rejects_commit_engine_uri() {
|
||||
let Err(err) = connect("s3+ddb://bucket/db?ddbTableName=manifest")
|
||||
.manifest_enabled(true)
|
||||
.execute()
|
||||
.await
|
||||
else {
|
||||
panic!("expected manifest-enabled s3+ddb connection to fail");
|
||||
};
|
||||
assert!(
|
||||
matches!(err, Error::NotSupported { message } if message.contains("commit engine URI schemes"))
|
||||
);
|
||||
|
||||
let Err(err) = connect("s3://bucket/db?engine=ddb&ddbTableName=manifest")
|
||||
.manifest_enabled(true)
|
||||
.execute()
|
||||
.await
|
||||
else {
|
||||
panic!("expected manifest-enabled engine query connection to fail");
|
||||
};
|
||||
assert!(
|
||||
matches!(err, Error::NotSupported { message } if message.contains("commit engine"))
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_manifest_enabled_connection_migrates_root_listing_table() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
|
||||
|
||||
connect(uri)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap()
|
||||
.create_empty_table("legacy", schema)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let db = connect(uri).manifest_enabled(true).execute().await.unwrap();
|
||||
let tables = db.table_names().execute().await.unwrap();
|
||||
assert_eq!(tables, vec!["legacy".to_string()]);
|
||||
db.open_table("legacy").execute().await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_manifest_enabled_preserves_new_table_options() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
let options = ListingDatabaseOptions::builder()
|
||||
.enable_v2_manifest_paths(true)
|
||||
.build();
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
|
||||
|
||||
let table = connect(uri)
|
||||
.manifest_enabled(true)
|
||||
.database_options(&options)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap()
|
||||
.create_empty_table("v1_manifest", schema)
|
||||
.storage_option(OPT_NEW_TABLE_V2_MANIFEST_PATHS, "false")
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let native_table = table
|
||||
.base_table()
|
||||
.as_any()
|
||||
.downcast_ref::<NativeTable>()
|
||||
.unwrap();
|
||||
assert!(!native_table.uses_v2_manifest_paths().await.unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_manifest_enabled_vend_input_storage_options() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
|
||||
|
||||
let table = connect(uri)
|
||||
.manifest_enabled(true)
|
||||
.storage_option("test_storage_option", "test_value")
|
||||
.namespace_client_property("vend_input_storage_options", "true")
|
||||
.namespace_client_property(
|
||||
"vend_input_storage_options_refresh_interval_millis",
|
||||
"60000",
|
||||
)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap()
|
||||
.create_empty_table("vended", schema)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let storage_options = table.latest_storage_options().await.unwrap().unwrap();
|
||||
assert_eq!(
|
||||
storage_options.get("test_storage_option"),
|
||||
Some(&"test_value".to_string())
|
||||
);
|
||||
assert!(storage_options.contains_key("expires_at_millis"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_table_names() {
|
||||
let tc = new_test_connection().await.unwrap();
|
||||
|
||||
@@ -285,7 +285,7 @@ const MIRRORED_STORE: &str = "mirroredStore";
|
||||
|
||||
/// A connection to LanceDB
|
||||
impl ListingDatabase {
|
||||
pub(crate) fn build_namespace_client_properties(
|
||||
fn build_namespace_client_properties(
|
||||
uri: &str,
|
||||
storage_options: &HashMap<String, String>,
|
||||
namespace_client_properties: HashMap<String, String>,
|
||||
@@ -298,24 +298,6 @@ impl ListingDatabase {
|
||||
properties
|
||||
}
|
||||
|
||||
pub(crate) fn build_manifest_enabled_namespace_client_properties(
|
||||
uri: &str,
|
||||
storage_options: &HashMap<String, String>,
|
||||
namespace_client_properties: HashMap<String, String>,
|
||||
) -> HashMap<String, String> {
|
||||
let mut properties = Self::build_namespace_client_properties(
|
||||
uri,
|
||||
storage_options,
|
||||
namespace_client_properties,
|
||||
);
|
||||
properties.insert("manifest_enabled".to_string(), "true".to_string());
|
||||
properties.insert(
|
||||
"dir_listing_to_manifest_migration_enabled".to_string(),
|
||||
"true".to_string(),
|
||||
);
|
||||
properties
|
||||
}
|
||||
|
||||
async fn connect_namespace_database(
|
||||
uri: &str,
|
||||
storage_options: HashMap<String, String>,
|
||||
@@ -341,119 +323,6 @@ impl ListingDatabase {
|
||||
))
|
||||
}
|
||||
|
||||
async fn prepare_namespace_root(
|
||||
uri: &str,
|
||||
storage_options: &HashMap<String, String>,
|
||||
session: Arc<lance::session::Session>,
|
||||
) -> Result<String> {
|
||||
match url::Url::parse(uri) {
|
||||
Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
|
||||
let (object_store, _) = ObjectStore::from_uri_and_params(
|
||||
session.store_registry(),
|
||||
uri,
|
||||
&ObjectStoreParams::default(),
|
||||
)
|
||||
.await?;
|
||||
if object_store.is_local() {
|
||||
Self::try_create_dir(uri).context(CreateDirSnafu { path: uri })?;
|
||||
}
|
||||
Ok(uri.to_string())
|
||||
}
|
||||
Ok(mut url) => {
|
||||
if url.scheme().contains('+') {
|
||||
return Err(Error::NotSupported {
|
||||
message: "commit engine URI schemes are not supported for manifest-enabled namespace connections".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
for (key, value) in url.query_pairs() {
|
||||
if key == ENGINE {
|
||||
return Err(Error::NotSupported {
|
||||
message: format!(
|
||||
"commit engine '{}' is not supported for manifest-enabled namespace connections",
|
||||
value
|
||||
),
|
||||
});
|
||||
} else if key == MIRRORED_STORE {
|
||||
return Err(Error::NotSupported {
|
||||
message: "mirrored store is not supported for manifest-enabled namespace connections"
|
||||
.to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
url.set_query(None);
|
||||
let plain_uri = url.to_string();
|
||||
|
||||
let os_params = ObjectStoreParams {
|
||||
storage_options_accessor: if storage_options.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(Arc::new(StorageOptionsAccessor::with_static_options(
|
||||
storage_options.clone(),
|
||||
)))
|
||||
},
|
||||
..Default::default()
|
||||
};
|
||||
let (object_store, _) = ObjectStore::from_uri_and_params(
|
||||
session.store_registry(),
|
||||
&plain_uri,
|
||||
&os_params,
|
||||
)
|
||||
.await?;
|
||||
if object_store.is_local() {
|
||||
Self::try_create_dir(&plain_uri).context(CreateDirSnafu {
|
||||
path: plain_uri.clone(),
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(plain_uri)
|
||||
}
|
||||
Err(_) => {
|
||||
let (object_store, _) = ObjectStore::from_uri_and_params(
|
||||
session.store_registry(),
|
||||
uri,
|
||||
&ObjectStoreParams::default(),
|
||||
)
|
||||
.await?;
|
||||
if object_store.is_local() {
|
||||
Self::try_create_dir(uri).context(CreateDirSnafu { path: uri })?;
|
||||
}
|
||||
Ok(uri.to_string())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn connect_manifest_enabled_namespace_database(
|
||||
request: &ConnectRequest,
|
||||
) -> Result<LanceNamespaceDatabase> {
|
||||
let options = ListingDatabaseOptions::parse_from_map(&request.options)?;
|
||||
let session = request
|
||||
.session
|
||||
.clone()
|
||||
.unwrap_or_else(|| Arc::new(lance::session::Session::default()));
|
||||
let namespace_root =
|
||||
Self::prepare_namespace_root(&request.uri, &options.storage_options, session.clone())
|
||||
.await?;
|
||||
let ns_properties = Self::build_manifest_enabled_namespace_client_properties(
|
||||
&namespace_root,
|
||||
&options.storage_options,
|
||||
request.namespace_client_properties.clone(),
|
||||
);
|
||||
|
||||
LanceNamespaceDatabase::connect_with_new_table_config(
|
||||
"dir",
|
||||
ns_properties,
|
||||
options.storage_options,
|
||||
request.read_consistency_interval,
|
||||
Some(session),
|
||||
HashSet::new(),
|
||||
options.new_table_config,
|
||||
)
|
||||
.await
|
||||
.map(|db| db.with_uri(request.uri.clone()))
|
||||
}
|
||||
|
||||
/// Connect to a listing database
|
||||
///
|
||||
/// The URI should be a path to a directory where the tables are stored.
|
||||
@@ -821,12 +690,15 @@ impl ListingDatabase {
|
||||
store_params.storage_options_accessor = Some(Arc::new(accessor));
|
||||
}
|
||||
|
||||
write_params.data_storage_version = storage_version_override
|
||||
.or(write_params.data_storage_version)
|
||||
.or(self.new_table_config.data_storage_version);
|
||||
write_params.data_storage_version = self
|
||||
.new_table_config
|
||||
.data_storage_version
|
||||
.or(storage_version_override);
|
||||
|
||||
if let Some(enable_v2_manifest_paths) =
|
||||
v2_manifest_override.or(self.new_table_config.enable_v2_manifest_paths)
|
||||
if let Some(enable_v2_manifest_paths) = self
|
||||
.new_table_config
|
||||
.enable_v2_manifest_paths
|
||||
.or(v2_manifest_override)
|
||||
{
|
||||
write_params.enable_v2_manifest_paths = enable_v2_manifest_paths;
|
||||
}
|
||||
@@ -1286,7 +1158,6 @@ mod tests {
|
||||
client_config: Default::default(),
|
||||
options: Default::default(),
|
||||
namespace_client_properties: Default::default(),
|
||||
manifest_enabled: false,
|
||||
read_consistency_interval: None,
|
||||
session: None,
|
||||
};
|
||||
@@ -1421,7 +1292,6 @@ mod tests {
|
||||
client_config: Default::default(),
|
||||
options: options.clone(),
|
||||
namespace_client_properties: Default::default(),
|
||||
manifest_enabled: false,
|
||||
read_consistency_interval: None,
|
||||
session: None,
|
||||
};
|
||||
@@ -1957,7 +1827,6 @@ mod tests {
|
||||
client_config: Default::default(),
|
||||
options,
|
||||
namespace_client_properties: Default::default(),
|
||||
manifest_enabled: false,
|
||||
read_consistency_interval: None,
|
||||
session: None,
|
||||
};
|
||||
@@ -2064,7 +1933,6 @@ mod tests {
|
||||
client_config: Default::default(),
|
||||
options,
|
||||
namespace_client_properties: Default::default(),
|
||||
manifest_enabled: false,
|
||||
read_consistency_interval: None,
|
||||
session: None,
|
||||
};
|
||||
@@ -2137,7 +2005,6 @@ mod tests {
|
||||
client_config: Default::default(),
|
||||
options,
|
||||
namespace_client_properties: Default::default(),
|
||||
manifest_enabled: false,
|
||||
read_consistency_interval: None,
|
||||
session: None,
|
||||
};
|
||||
@@ -2335,7 +2202,6 @@ mod tests {
|
||||
client_config: Default::default(),
|
||||
options: Default::default(),
|
||||
namespace_client_properties,
|
||||
manifest_enabled: false,
|
||||
read_consistency_interval: None,
|
||||
session: None,
|
||||
};
|
||||
|
||||
@@ -24,10 +24,6 @@ use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler;
|
||||
|
||||
use crate::connection::NamespaceClientPushdownOperation;
|
||||
use crate::database::ReadConsistency;
|
||||
use crate::database::listing::{
|
||||
NewTableConfig, OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS, OPT_NEW_TABLE_STORAGE_VERSION,
|
||||
OPT_NEW_TABLE_V2_MANIFEST_PATHS,
|
||||
};
|
||||
use crate::error::{Error, Result};
|
||||
use crate::table::NativeTable;
|
||||
use lance::dataset::WriteMode;
|
||||
@@ -54,8 +50,6 @@ pub struct LanceNamespaceDatabase {
|
||||
ns_impl: String,
|
||||
// Namespace properties used to construct the namespace client
|
||||
ns_properties: HashMap<String, String>,
|
||||
// Options for tables created by this connection
|
||||
new_table_config: NewTableConfig,
|
||||
}
|
||||
|
||||
impl LanceNamespaceDatabase {
|
||||
@@ -77,15 +71,9 @@ impl LanceNamespaceDatabase {
|
||||
pushdown_operations: namespace_client_pushdown_operations,
|
||||
ns_impl: namespace_client_impl,
|
||||
ns_properties: namespace_client_properties,
|
||||
new_table_config: NewTableConfig::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn with_uri(mut self, uri: impl Into<String>) -> Self {
|
||||
self.uri = uri.into();
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn connect(
|
||||
ns_impl: &str,
|
||||
ns_properties: HashMap<String, String>,
|
||||
@@ -93,27 +81,6 @@ impl LanceNamespaceDatabase {
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
session: Option<Arc<lance::session::Session>>,
|
||||
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
|
||||
) -> Result<Self> {
|
||||
Self::connect_with_new_table_config(
|
||||
ns_impl,
|
||||
ns_properties,
|
||||
storage_options,
|
||||
read_consistency_interval,
|
||||
session,
|
||||
pushdown_operations,
|
||||
NewTableConfig::default(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn connect_with_new_table_config(
|
||||
ns_impl: &str,
|
||||
ns_properties: HashMap<String, String>,
|
||||
storage_options: HashMap<String, String>,
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
session: Option<Arc<lance::session::Session>>,
|
||||
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
|
||||
new_table_config: NewTableConfig,
|
||||
) -> Result<Self> {
|
||||
let mut builder = ConnectBuilder::new(ns_impl);
|
||||
for (key, value) in ns_properties.clone() {
|
||||
@@ -135,79 +102,8 @@ impl LanceNamespaceDatabase {
|
||||
pushdown_operations,
|
||||
ns_impl: ns_impl.to_string(),
|
||||
ns_properties,
|
||||
new_table_config,
|
||||
})
|
||||
}
|
||||
|
||||
fn extract_storage_overrides(
|
||||
&self,
|
||||
request: &DbCreateTableRequest,
|
||||
) -> Result<(
|
||||
Option<lance_encoding::version::LanceFileVersion>,
|
||||
Option<bool>,
|
||||
Option<bool>,
|
||||
)> {
|
||||
let storage_options = request
|
||||
.write_options
|
||||
.lance_write_params
|
||||
.as_ref()
|
||||
.and_then(|p| p.store_params.as_ref())
|
||||
.and_then(|sp| sp.storage_options());
|
||||
|
||||
let storage_version_override = storage_options
|
||||
.and_then(|opts| opts.get(OPT_NEW_TABLE_STORAGE_VERSION))
|
||||
.map(|s| s.parse::<lance_encoding::version::LanceFileVersion>())
|
||||
.transpose()?;
|
||||
|
||||
let v2_manifest_override = storage_options
|
||||
.and_then(|opts| opts.get(OPT_NEW_TABLE_V2_MANIFEST_PATHS))
|
||||
.map(|s| s.parse::<bool>())
|
||||
.transpose()
|
||||
.map_err(|_| Error::InvalidInput {
|
||||
message: "enable_v2_manifest_paths must be a boolean".to_string(),
|
||||
})?;
|
||||
|
||||
let stable_row_ids_override = storage_options
|
||||
.and_then(|opts| opts.get(OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS))
|
||||
.map(|s| s.parse::<bool>())
|
||||
.transpose()
|
||||
.map_err(|_| Error::InvalidInput {
|
||||
message: "enable_stable_row_ids must be a boolean".to_string(),
|
||||
})?;
|
||||
|
||||
Ok((
|
||||
storage_version_override,
|
||||
v2_manifest_override,
|
||||
stable_row_ids_override,
|
||||
))
|
||||
}
|
||||
|
||||
fn apply_new_table_config(
|
||||
&self,
|
||||
params: &mut lance::dataset::WriteParams,
|
||||
request: &DbCreateTableRequest,
|
||||
) -> Result<()> {
|
||||
let (storage_version_override, v2_manifest_override, stable_row_ids_override) =
|
||||
self.extract_storage_overrides(request)?;
|
||||
|
||||
params.data_storage_version = storage_version_override
|
||||
.or(params.data_storage_version)
|
||||
.or(self.new_table_config.data_storage_version);
|
||||
|
||||
if let Some(enable_v2_manifest_paths) =
|
||||
v2_manifest_override.or(self.new_table_config.enable_v2_manifest_paths)
|
||||
{
|
||||
params.enable_v2_manifest_paths = enable_v2_manifest_paths;
|
||||
}
|
||||
|
||||
if let Some(enable_stable_row_ids) =
|
||||
stable_row_ids_override.or(self.new_table_config.enable_stable_row_ids)
|
||||
{
|
||||
params.enable_stable_row_ids = enable_stable_row_ids;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for LanceNamespaceDatabase {
|
||||
@@ -403,12 +299,7 @@ impl Database for LanceNamespaceDatabase {
|
||||
};
|
||||
|
||||
// Build write params with storage options and commit handler
|
||||
let mut params = request
|
||||
.write_options
|
||||
.lance_write_params
|
||||
.clone()
|
||||
.unwrap_or_default();
|
||||
self.apply_new_table_config(&mut params, &request)?;
|
||||
let mut params = request.write_options.lance_write_params.unwrap_or_default();
|
||||
|
||||
if matches!(request.mode, CreateTableMode::Overwrite) {
|
||||
params.mode = WriteMode::Overwrite;
|
||||
|
||||
Reference in New Issue
Block a user