From 25dfe2cfd408f26862b1fd723a1c954e86d82bc3 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Wed, 29 Apr 2026 09:22:06 -0700 Subject: [PATCH] feat: add manifest-enabled directory namespace mode (#3332) Adds manifest_enabled for local/native connections so directory namespace manifests can be the source of truth, including migration from directory listing and Azure credential vending feature wiring. Also exposes the option through Rust, Python, and Node bindings with focused validation. --- Cargo.lock | 4 + docs/src/js/interfaces/ConnectionOptions.md | 23 +++ nodejs/src/connection.rs | 6 + nodejs/src/lib.rs | 7 + python/python/lancedb/__init__.py | 34 +++- python/python/lancedb/_lancedb.pyi | 2 + python/python/lancedb/db.py | 10 +- python/src/connection.rs | 10 +- rust/lancedb/Cargo.toml | 7 +- rust/lancedb/src/connection.rs | 175 ++++++++++++++++++++ rust/lancedb/src/database/listing.rs | 152 ++++++++++++++++- rust/lancedb/src/database/namespace.rs | 111 ++++++++++++- 12 files changed, 524 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index db63697ab..315556ed8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4476,9 +4476,11 @@ dependencies = [ "arrow-schema", "async-trait", "axum", + "base64 0.22.1", "bytes", "chrono", "futures", + "hmac", "lance", "lance-core", "lance-index", @@ -4488,10 +4490,12 @@ dependencies = [ "lance-table", "log", "object_store", + "quick-xml 0.38.4", "rand 0.9.4", "reqwest", "serde", "serde_json", + "sha2", "snafu 0.9.0", "tokio", "tower", diff --git a/docs/src/js/interfaces/ConnectionOptions.md b/docs/src/js/interfaces/ConnectionOptions.md index d617e8a19..1ad0e127a 100644 --- a/docs/src/js/interfaces/ConnectionOptions.md +++ b/docs/src/js/interfaces/ConnectionOptions.md @@ -41,6 +41,29 @@ for testing purposes. *** +### manifestEnabled? + +```ts +optional manifestEnabled: boolean; +``` + +(For LanceDB OSS only): use directory namespace manifests as the source +of truth for table metadata. Existing directory-listed root tables are +migrated into the manifest on access. + +*** + +### namespaceClientProperties? + +```ts +optional namespaceClientProperties: Record; +``` + +(For LanceDB OSS only): extra properties for the backing namespace +client used by manifest-enabled native connections. + +*** + ### readConsistencyInterval? ```ts diff --git a/nodejs/src/connection.rs b/nodejs/src/connection.rs index 19b2a5440..09be9465f 100644 --- a/nodejs/src/connection.rs +++ b/nodejs/src/connection.rs @@ -67,6 +67,12 @@ impl Connection { builder = builder.storage_option(key, value); } } + if let Some(manifest_enabled) = options.manifest_enabled { + builder = builder.manifest_enabled(manifest_enabled); + } + if let Some(namespace_client_properties) = options.namespace_client_properties { + builder = builder.namespace_client_properties(namespace_client_properties); + } // Create client config, optionally with header provider let client_config = options.client_config.unwrap_or_default(); diff --git a/nodejs/src/lib.rs b/nodejs/src/lib.rs index 055a6a3d3..87bc97ce7 100644 --- a/nodejs/src/lib.rs +++ b/nodejs/src/lib.rs @@ -37,6 +37,13 @@ pub struct ConnectionOptions { /// /// The available options are described at https://docs.lancedb.com/storage/ pub storage_options: Option>, + /// (For LanceDB OSS only): use directory namespace manifests as the source + /// of truth for table metadata. Existing directory-listed root tables are + /// migrated into the manifest on access. + pub manifest_enabled: Option, + /// (For LanceDB OSS only): extra properties for the backing namespace + /// client used by manifest-enabled native connections. + pub namespace_client_properties: Option>, /// (For LanceDB OSS only): the session to use for this connection. Holds /// shared caches and other session-specific state. pub session: Option, diff --git a/python/python/lancedb/__init__.py b/python/python/lancedb/__init__.py index ebf292b05..9e8ee0dd8 100644 --- a/python/python/lancedb/__init__.py +++ b/python/python/lancedb/__init__.py @@ -73,6 +73,7 @@ def connect( client_config: Union[ClientConfig, Dict[str, Any], None] = None, storage_options: Optional[Dict[str, str]] = None, session: Optional[Session] = None, + manifest_enabled: bool = False, namespace_client_impl: Optional[str] = None, namespace_client_properties: Optional[Dict[str, str]] = None, namespace_client_pushdown_operations: Optional[List[str]] = None, @@ -111,6 +112,10 @@ def connect( storage_options: dict, optional Additional options for the storage backend. See available options at + manifest_enabled : bool, default False + When true for local/native connections, use directory namespace + manifests as the source of truth for table metadata. Existing + directory-listed root tables are migrated into the manifest on access. session: Session, optional (For LanceDB OSS only) A session to use for this connection. Sessions allow you to configure @@ -158,11 +163,11 @@ def connect( conn : DBConnection A connection to a LanceDB database. """ - if namespace_client_impl is not None or namespace_client_properties is not None: - if namespace_client_impl is None or namespace_client_properties is None: + if namespace_client_impl is not None: + if namespace_client_properties is None: raise ValueError( - "Both namespace_client_impl and " - "namespace_client_properties must be provided" + "namespace_client_properties must be provided when " + "namespace_client_impl is set" ) if kwargs: raise ValueError(f"Unknown keyword arguments: {kwargs}") @@ -175,6 +180,12 @@ def connect( namespace_client_pushdown_operations=namespace_client_pushdown_operations, ) + if namespace_client_properties is not None and not manifest_enabled: + raise ValueError( + "namespace_client_impl must be provided when using " + "namespace_client_properties unless manifest_enabled=True" + ) + if namespace_client_pushdown_operations is not None: raise ValueError( "namespace_client_pushdown_operations is only valid when " @@ -212,6 +223,8 @@ def connect( read_consistency_interval=read_consistency_interval, storage_options=storage_options, session=session, + manifest_enabled=manifest_enabled, + namespace_client_properties=namespace_client_properties, ) @@ -289,6 +302,8 @@ def deserialize_conn( parsed["uri"], read_consistency_interval=rci, storage_options=storage_options, + manifest_enabled=parsed.get("manifest_enabled", False), + namespace_client_properties=parsed.get("namespace_client_properties"), ) else: raise ValueError(f"Unknown connection_type: {connection_type}") @@ -304,6 +319,8 @@ async def connect_async( client_config: Optional[Union[ClientConfig, Dict[str, Any]]] = None, storage_options: Optional[Dict[str, str]] = None, session: Optional[Session] = None, + manifest_enabled: bool = False, + namespace_client_properties: Optional[Dict[str, str]] = None, ) -> AsyncConnection: """Connect to a LanceDB database. @@ -343,6 +360,13 @@ async def connect_async( cache sizes for index and metadata caches, which can significantly impact memory use and performance. They can also be re-used across multiple connections to share the same cache state. + manifest_enabled : bool, default False + When true for local/native connections, use directory namespace + manifests as the source of truth for table metadata. Existing + directory-listed root tables are migrated into the manifest on access. + namespace_client_properties : dict, optional + Additional directory namespace client properties to use with + ``manifest_enabled=True``. Examples -------- @@ -385,6 +409,8 @@ async def connect_async( client_config, storage_options, session, + manifest_enabled, + namespace_client_properties, ) ) diff --git a/python/python/lancedb/_lancedb.pyi b/python/python/lancedb/_lancedb.pyi index 76c08041b..2298a9473 100644 --- a/python/python/lancedb/_lancedb.pyi +++ b/python/python/lancedb/_lancedb.pyi @@ -242,6 +242,8 @@ async def connect( client_config: Optional[Union[ClientConfig, Dict[str, Any]]], storage_options: Optional[Dict[str, str]], session: Optional[Session], + manifest_enabled: bool = False, + namespace_client_properties: Optional[Dict[str, str]] = None, ) -> Connection: ... class RecordBatchStream: diff --git a/python/python/lancedb/db.py b/python/python/lancedb/db.py index b07d409eb..276116db7 100644 --- a/python/python/lancedb/db.py +++ b/python/python/lancedb/db.py @@ -590,8 +590,13 @@ class LanceDBConnection(DBConnection): read_consistency_interval: Optional[timedelta] = None, storage_options: Optional[Dict[str, str]] = None, session: Optional[Session] = None, + manifest_enabled: bool = False, + namespace_client_properties: Optional[Dict[str, str]] = None, _inner: Optional[LanceDbConnection] = None, ): + self.storage_options = storage_options + self._manifest_enabled = manifest_enabled + self._namespace_client_properties = namespace_client_properties if _inner is not None: self._conn = _inner self._cached_namespace_client = None @@ -633,6 +638,8 @@ class LanceDBConnection(DBConnection): None, storage_options, session, + manifest_enabled, + namespace_client_properties, ) # TODO: It would be nice if we didn't store self.storage_options but it is @@ -640,7 +647,6 @@ class LanceDBConnection(DBConnection): # work because some paths like LanceDBConnection.from_inner will lose the # storage_options. Also, this class really shouldn't be holding any state # beyond _conn. - self.storage_options = storage_options self._conn = AsyncConnection(LOOP.run(do_connect())) self._cached_namespace_client: Optional[LanceNamespace] = None @@ -677,6 +683,8 @@ class LanceDBConnection(DBConnection): "connection_type": "local", "uri": self.uri, "storage_options": self.storage_options, + "manifest_enabled": self._manifest_enabled, + "namespace_client_properties": self._namespace_client_properties, "read_consistency_interval_seconds": ( rci.total_seconds() if rci else None ), diff --git a/python/src/connection.rs b/python/src/connection.rs index 9c67f38c7..1b12c33ab 100644 --- a/python/src/connection.rs +++ b/python/src/connection.rs @@ -525,7 +525,7 @@ impl Connection { } #[pyfunction] -#[pyo3(signature = (uri, api_key=None, region=None, host_override=None, read_consistency_interval=None, client_config=None, storage_options=None, session=None))] +#[pyo3(signature = (uri, api_key=None, region=None, host_override=None, read_consistency_interval=None, client_config=None, storage_options=None, session=None, manifest_enabled=false, namespace_client_properties=None))] #[allow(clippy::too_many_arguments)] pub fn connect( py: Python<'_>, @@ -537,6 +537,8 @@ pub fn connect( client_config: Option, storage_options: Option>, session: Option, + manifest_enabled: bool, + namespace_client_properties: Option>, ) -> PyResult> { future_into_py(py, async move { let mut builder = lancedb::connect(&uri); @@ -556,6 +558,12 @@ pub fn connect( if let Some(storage_options) = storage_options { builder = builder.storage_options(storage_options); } + if manifest_enabled { + builder = builder.manifest_enabled(true); + } + if let Some(namespace_client_properties) = namespace_client_properties { + builder = builder.namespace_client_properties(namespace_client_properties); + } #[cfg(feature = "remote")] if let Some(client_config) = client_config { builder = builder.client_config(client_config.into()); diff --git a/rust/lancedb/Cargo.toml b/rust/lancedb/Cargo.toml index cf6f8c44d..b652ed1a5 100644 --- a/rust/lancedb/Cargo.toml +++ b/rust/lancedb/Cargo.toml @@ -111,7 +111,12 @@ default = [] aws = ["lance/aws", "lance-io/aws", "lance-namespace-impls/dir-aws"] oss = ["lance/oss", "lance-io/oss", "lance-namespace-impls/dir-oss"] gcs = ["lance/gcp", "lance-io/gcp", "lance-namespace-impls/dir-gcp"] -azure = ["lance/azure", "lance-io/azure", "lance-namespace-impls/dir-azure"] +azure = [ + "lance/azure", + "lance-io/azure", + "lance-namespace-impls/dir-azure", + "lance-namespace-impls/credential-vendor-azure", +] huggingface = [ "lance/huggingface", "lance-io/huggingface", diff --git a/rust/lancedb/src/connection.rs b/rust/lancedb/src/connection.rs index 9e0d3ea3f..8034c2a53 100644 --- a/rust/lancedb/src/connection.rs +++ b/rust/lancedb/src/connection.rs @@ -590,6 +590,15 @@ pub struct ConnectRequest { /// storage options. pub namespace_client_properties: HashMap, + /// Use directory namespace manifests as the source of truth for native + /// LanceDB table metadata. + /// + /// When enabled for a local/native connection, LanceDB returns a + /// namespace-backed database directly. Directory listing fallback remains + /// enabled for migration, and directory-listing-to-manifest migration is + /// forced on. + pub manifest_enabled: bool, + /// The interval at which to check for updates from other processes. /// /// If None, then consistency is not checked. For performance @@ -630,6 +639,7 @@ impl ConnectBuilder { read_consistency_interval: None, options: HashMap::new(), namespace_client_properties: HashMap::new(), + manifest_enabled: false, session: None, }, embedding_registry: None, @@ -791,6 +801,17 @@ impl ConnectBuilder { self } + /// Enable or disable manifest-backed directory namespace mode for local + /// native connections. + /// + /// When enabled, the connection uses the directory namespace database + /// directly for all table operations and forces + /// `dir_listing_to_manifest_migration_enabled=true`. + pub fn manifest_enabled(mut self, enabled: bool) -> Self { + self.request.manifest_enabled = enabled; + self + } + /// The interval at which to check for updates from other processes. This /// only affects LanceDB OSS. /// @@ -886,6 +907,16 @@ impl ConnectBuilder { pub async fn execute(self) -> Result { if self.request.uri.starts_with("db") { self.execute_remote() + } else if self.request.manifest_enabled { + let internal = Arc::new( + ListingDatabase::connect_manifest_enabled_namespace_database(&self.request).await?, + ); + Ok(Connection { + internal, + embedding_registry: self + .embedding_registry + .unwrap_or_else(|| Arc::new(MemoryRegistry::new())), + }) } else { let internal = Arc::new(ListingDatabase::connect_with_options(&self.request).await?); Ok(Connection { @@ -1132,6 +1163,9 @@ mod tests { use lance_testing::datagen::{BatchGenerator, IncrementingInt32}; use tempfile::tempdir; + use crate::database::listing::{ListingDatabaseOptions, OPT_NEW_TABLE_V2_MANIFEST_PATHS}; + use crate::database::namespace::LanceNamespaceDatabase; + use crate::table::NativeTable; use crate::test_utils::connection::new_test_connection; use super::*; @@ -1204,6 +1238,147 @@ mod tests { ); } + #[tokio::test] + async fn test_connect_with_manifest_enabled_uses_directory_namespace() { + let tmp_dir = tempdir().unwrap(); + let uri = tmp_dir.path().to_str().unwrap(); + + let db = connect(uri) + .manifest_enabled(true) + .storage_option("timeout", "30s") + .namespace_client_property("manifest_enabled", "false") + .namespace_client_property("dir_listing_to_manifest_migration_enabled", "false") + .execute() + .await + .unwrap(); + + assert!( + db.database() + .as_any() + .downcast_ref::() + .is_some() + ); + assert_eq!(db.uri(), uri); + + let (ns_impl, properties) = db.namespace_client_config().await.unwrap(); + assert_eq!(ns_impl, "dir"); + assert_eq!(properties.get("root"), Some(&uri.to_string())); + assert_eq!( + properties.get("manifest_enabled"), + Some(&"true".to_string()) + ); + assert_eq!( + properties.get("dir_listing_to_manifest_migration_enabled"), + Some(&"true".to_string()) + ); + assert_eq!(properties.get("storage.timeout"), Some(&"30s".to_string())); + } + + #[tokio::test] + async fn test_manifest_enabled_rejects_commit_engine_uri() { + let Err(err) = connect("s3+ddb://bucket/db?ddbTableName=manifest") + .manifest_enabled(true) + .execute() + .await + else { + panic!("expected manifest-enabled s3+ddb connection to fail"); + }; + assert!( + matches!(err, Error::NotSupported { message } if message.contains("commit engine URI schemes")) + ); + + let Err(err) = connect("s3://bucket/db?engine=ddb&ddbTableName=manifest") + .manifest_enabled(true) + .execute() + .await + else { + panic!("expected manifest-enabled engine query connection to fail"); + }; + assert!( + matches!(err, Error::NotSupported { message } if message.contains("commit engine")) + ); + } + + #[tokio::test] + async fn test_manifest_enabled_connection_migrates_root_listing_table() { + let tmp_dir = tempdir().unwrap(); + let uri = tmp_dir.path().to_str().unwrap(); + let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)])); + + connect(uri) + .execute() + .await + .unwrap() + .create_empty_table("legacy", schema) + .execute() + .await + .unwrap(); + + let db = connect(uri).manifest_enabled(true).execute().await.unwrap(); + let tables = db.table_names().execute().await.unwrap(); + assert_eq!(tables, vec!["legacy".to_string()]); + db.open_table("legacy").execute().await.unwrap(); + } + + #[tokio::test] + async fn test_manifest_enabled_preserves_new_table_options() { + let tmp_dir = tempdir().unwrap(); + let uri = tmp_dir.path().to_str().unwrap(); + let options = ListingDatabaseOptions::builder() + .enable_v2_manifest_paths(true) + .build(); + let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)])); + + let table = connect(uri) + .manifest_enabled(true) + .database_options(&options) + .execute() + .await + .unwrap() + .create_empty_table("v1_manifest", schema) + .storage_option(OPT_NEW_TABLE_V2_MANIFEST_PATHS, "false") + .execute() + .await + .unwrap(); + + let native_table = table + .base_table() + .as_any() + .downcast_ref::() + .unwrap(); + assert!(!native_table.uses_v2_manifest_paths().await.unwrap()); + } + + #[tokio::test] + async fn test_manifest_enabled_vend_input_storage_options() { + let tmp_dir = tempdir().unwrap(); + let uri = tmp_dir.path().to_str().unwrap(); + let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)])); + + let table = connect(uri) + .manifest_enabled(true) + .storage_option("test_storage_option", "test_value") + .namespace_client_property("vend_input_storage_options", "true") + .namespace_client_property( + "vend_input_storage_options_refresh_interval_millis", + "60000", + ) + .execute() + .await + .unwrap() + .create_empty_table("vended", schema) + .execute() + .await + .unwrap(); + + let storage_options = table.latest_storage_options().await.unwrap().unwrap(); + assert_eq!( + storage_options.get("test_storage_option"), + Some(&"test_value".to_string()) + ); + assert!(storage_options.contains_key("expires_at_millis")); + } + #[tokio::test] async fn test_table_names() { let tc = new_test_connection().await.unwrap(); diff --git a/rust/lancedb/src/database/listing.rs b/rust/lancedb/src/database/listing.rs index 02884bb63..73fad6eb9 100644 --- a/rust/lancedb/src/database/listing.rs +++ b/rust/lancedb/src/database/listing.rs @@ -285,7 +285,7 @@ const MIRRORED_STORE: &str = "mirroredStore"; /// A connection to LanceDB impl ListingDatabase { - fn build_namespace_client_properties( + pub(crate) fn build_namespace_client_properties( uri: &str, storage_options: &HashMap, namespace_client_properties: HashMap, @@ -298,6 +298,24 @@ impl ListingDatabase { properties } + pub(crate) fn build_manifest_enabled_namespace_client_properties( + uri: &str, + storage_options: &HashMap, + namespace_client_properties: HashMap, + ) -> HashMap { + let mut properties = Self::build_namespace_client_properties( + uri, + storage_options, + namespace_client_properties, + ); + properties.insert("manifest_enabled".to_string(), "true".to_string()); + properties.insert( + "dir_listing_to_manifest_migration_enabled".to_string(), + "true".to_string(), + ); + properties + } + async fn connect_namespace_database( uri: &str, storage_options: HashMap, @@ -323,6 +341,119 @@ impl ListingDatabase { )) } + async fn prepare_namespace_root( + uri: &str, + storage_options: &HashMap, + session: Arc, + ) -> Result { + match url::Url::parse(uri) { + Ok(url) if url.scheme().len() == 1 && cfg!(windows) => { + let (object_store, _) = ObjectStore::from_uri_and_params( + session.store_registry(), + uri, + &ObjectStoreParams::default(), + ) + .await?; + if object_store.is_local() { + Self::try_create_dir(uri).context(CreateDirSnafu { path: uri })?; + } + Ok(uri.to_string()) + } + Ok(mut url) => { + if url.scheme().contains('+') { + return Err(Error::NotSupported { + message: "commit engine URI schemes are not supported for manifest-enabled namespace connections".to_string(), + }); + } + + for (key, value) in url.query_pairs() { + if key == ENGINE { + return Err(Error::NotSupported { + message: format!( + "commit engine '{}' is not supported for manifest-enabled namespace connections", + value + ), + }); + } else if key == MIRRORED_STORE { + return Err(Error::NotSupported { + message: "mirrored store is not supported for manifest-enabled namespace connections" + .to_string(), + }); + } + } + + url.set_query(None); + let plain_uri = url.to_string(); + + let os_params = ObjectStoreParams { + storage_options_accessor: if storage_options.is_empty() { + None + } else { + Some(Arc::new(StorageOptionsAccessor::with_static_options( + storage_options.clone(), + ))) + }, + ..Default::default() + }; + let (object_store, _) = ObjectStore::from_uri_and_params( + session.store_registry(), + &plain_uri, + &os_params, + ) + .await?; + if object_store.is_local() { + Self::try_create_dir(&plain_uri).context(CreateDirSnafu { + path: plain_uri.clone(), + })?; + } + + Ok(plain_uri) + } + Err(_) => { + let (object_store, _) = ObjectStore::from_uri_and_params( + session.store_registry(), + uri, + &ObjectStoreParams::default(), + ) + .await?; + if object_store.is_local() { + Self::try_create_dir(uri).context(CreateDirSnafu { path: uri })?; + } + Ok(uri.to_string()) + } + } + } + + pub(crate) async fn connect_manifest_enabled_namespace_database( + request: &ConnectRequest, + ) -> Result { + let options = ListingDatabaseOptions::parse_from_map(&request.options)?; + let session = request + .session + .clone() + .unwrap_or_else(|| Arc::new(lance::session::Session::default())); + let namespace_root = + Self::prepare_namespace_root(&request.uri, &options.storage_options, session.clone()) + .await?; + let ns_properties = Self::build_manifest_enabled_namespace_client_properties( + &namespace_root, + &options.storage_options, + request.namespace_client_properties.clone(), + ); + + LanceNamespaceDatabase::connect_with_new_table_config( + "dir", + ns_properties, + options.storage_options, + request.read_consistency_interval, + Some(session), + HashSet::new(), + options.new_table_config, + ) + .await + .map(|db| db.with_uri(request.uri.clone())) + } + /// Connect to a listing database /// /// The URI should be a path to a directory where the tables are stored. @@ -690,15 +821,12 @@ impl ListingDatabase { store_params.storage_options_accessor = Some(Arc::new(accessor)); } - write_params.data_storage_version = self - .new_table_config - .data_storage_version - .or(storage_version_override); + write_params.data_storage_version = storage_version_override + .or(write_params.data_storage_version) + .or(self.new_table_config.data_storage_version); - if let Some(enable_v2_manifest_paths) = self - .new_table_config - .enable_v2_manifest_paths - .or(v2_manifest_override) + if let Some(enable_v2_manifest_paths) = + v2_manifest_override.or(self.new_table_config.enable_v2_manifest_paths) { write_params.enable_v2_manifest_paths = enable_v2_manifest_paths; } @@ -1158,6 +1286,7 @@ mod tests { client_config: Default::default(), options: Default::default(), namespace_client_properties: Default::default(), + manifest_enabled: false, read_consistency_interval: None, session: None, }; @@ -1292,6 +1421,7 @@ mod tests { client_config: Default::default(), options: options.clone(), namespace_client_properties: Default::default(), + manifest_enabled: false, read_consistency_interval: None, session: None, }; @@ -1827,6 +1957,7 @@ mod tests { client_config: Default::default(), options, namespace_client_properties: Default::default(), + manifest_enabled: false, read_consistency_interval: None, session: None, }; @@ -1933,6 +2064,7 @@ mod tests { client_config: Default::default(), options, namespace_client_properties: Default::default(), + manifest_enabled: false, read_consistency_interval: None, session: None, }; @@ -2005,6 +2137,7 @@ mod tests { client_config: Default::default(), options, namespace_client_properties: Default::default(), + manifest_enabled: false, read_consistency_interval: None, session: None, }; @@ -2202,6 +2335,7 @@ mod tests { client_config: Default::default(), options: Default::default(), namespace_client_properties, + manifest_enabled: false, read_consistency_interval: None, session: None, }; diff --git a/rust/lancedb/src/database/namespace.rs b/rust/lancedb/src/database/namespace.rs index 19dc1f174..de18f8db8 100644 --- a/rust/lancedb/src/database/namespace.rs +++ b/rust/lancedb/src/database/namespace.rs @@ -24,6 +24,10 @@ use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler; use crate::connection::NamespaceClientPushdownOperation; use crate::database::ReadConsistency; +use crate::database::listing::{ + NewTableConfig, OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS, OPT_NEW_TABLE_STORAGE_VERSION, + OPT_NEW_TABLE_V2_MANIFEST_PATHS, +}; use crate::error::{Error, Result}; use crate::table::NativeTable; use lance::dataset::WriteMode; @@ -50,6 +54,8 @@ pub struct LanceNamespaceDatabase { ns_impl: String, // Namespace properties used to construct the namespace client ns_properties: HashMap, + // Options for tables created by this connection + new_table_config: NewTableConfig, } impl LanceNamespaceDatabase { @@ -71,9 +77,15 @@ impl LanceNamespaceDatabase { pushdown_operations: namespace_client_pushdown_operations, ns_impl: namespace_client_impl, ns_properties: namespace_client_properties, + new_table_config: NewTableConfig::default(), } } + pub(crate) fn with_uri(mut self, uri: impl Into) -> Self { + self.uri = uri.into(); + self + } + pub async fn connect( ns_impl: &str, ns_properties: HashMap, @@ -81,6 +93,27 @@ impl LanceNamespaceDatabase { read_consistency_interval: Option, session: Option>, pushdown_operations: HashSet, + ) -> Result { + Self::connect_with_new_table_config( + ns_impl, + ns_properties, + storage_options, + read_consistency_interval, + session, + pushdown_operations, + NewTableConfig::default(), + ) + .await + } + + pub(crate) async fn connect_with_new_table_config( + ns_impl: &str, + ns_properties: HashMap, + storage_options: HashMap, + read_consistency_interval: Option, + session: Option>, + pushdown_operations: HashSet, + new_table_config: NewTableConfig, ) -> Result { let mut builder = ConnectBuilder::new(ns_impl); for (key, value) in ns_properties.clone() { @@ -102,8 +135,79 @@ impl LanceNamespaceDatabase { pushdown_operations, ns_impl: ns_impl.to_string(), ns_properties, + new_table_config, }) } + + fn extract_storage_overrides( + &self, + request: &DbCreateTableRequest, + ) -> Result<( + Option, + Option, + Option, + )> { + let storage_options = request + .write_options + .lance_write_params + .as_ref() + .and_then(|p| p.store_params.as_ref()) + .and_then(|sp| sp.storage_options()); + + let storage_version_override = storage_options + .and_then(|opts| opts.get(OPT_NEW_TABLE_STORAGE_VERSION)) + .map(|s| s.parse::()) + .transpose()?; + + let v2_manifest_override = storage_options + .and_then(|opts| opts.get(OPT_NEW_TABLE_V2_MANIFEST_PATHS)) + .map(|s| s.parse::()) + .transpose() + .map_err(|_| Error::InvalidInput { + message: "enable_v2_manifest_paths must be a boolean".to_string(), + })?; + + let stable_row_ids_override = storage_options + .and_then(|opts| opts.get(OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS)) + .map(|s| s.parse::()) + .transpose() + .map_err(|_| Error::InvalidInput { + message: "enable_stable_row_ids must be a boolean".to_string(), + })?; + + Ok(( + storage_version_override, + v2_manifest_override, + stable_row_ids_override, + )) + } + + fn apply_new_table_config( + &self, + params: &mut lance::dataset::WriteParams, + request: &DbCreateTableRequest, + ) -> Result<()> { + let (storage_version_override, v2_manifest_override, stable_row_ids_override) = + self.extract_storage_overrides(request)?; + + params.data_storage_version = storage_version_override + .or(params.data_storage_version) + .or(self.new_table_config.data_storage_version); + + if let Some(enable_v2_manifest_paths) = + v2_manifest_override.or(self.new_table_config.enable_v2_manifest_paths) + { + params.enable_v2_manifest_paths = enable_v2_manifest_paths; + } + + if let Some(enable_stable_row_ids) = + stable_row_ids_override.or(self.new_table_config.enable_stable_row_ids) + { + params.enable_stable_row_ids = enable_stable_row_ids; + } + + Ok(()) + } } impl std::fmt::Debug for LanceNamespaceDatabase { @@ -299,7 +403,12 @@ impl Database for LanceNamespaceDatabase { }; // Build write params with storage options and commit handler - let mut params = request.write_options.lance_write_params.unwrap_or_default(); + let mut params = request + .write_options + .lance_write_params + .clone() + .unwrap_or_default(); + self.apply_new_table_config(&mut params, &request)?; if matches!(request.mode, CreateTableMode::Overwrite) { params.mode = WriteMode::Overwrite;