diff --git a/Cargo.lock b/Cargo.lock index db63697ab..315556ed8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4476,9 +4476,11 @@ dependencies = [ "arrow-schema", "async-trait", "axum", + "base64 0.22.1", "bytes", "chrono", "futures", + "hmac", "lance", "lance-core", "lance-index", @@ -4488,10 +4490,12 @@ dependencies = [ "lance-table", "log", "object_store", + "quick-xml 0.38.4", "rand 0.9.4", "reqwest", "serde", "serde_json", + "sha2", "snafu 0.9.0", "tokio", "tower", diff --git a/docs/src/js/interfaces/ConnectionOptions.md b/docs/src/js/interfaces/ConnectionOptions.md index d617e8a19..1ad0e127a 100644 --- a/docs/src/js/interfaces/ConnectionOptions.md +++ b/docs/src/js/interfaces/ConnectionOptions.md @@ -41,6 +41,29 @@ for testing purposes. *** +### manifestEnabled? + +```ts +optional manifestEnabled: boolean; +``` + +(For LanceDB OSS only): use directory namespace manifests as the source +of truth for table metadata. Existing directory-listed root tables are +migrated into the manifest on access. + +*** + +### namespaceClientProperties? + +```ts +optional namespaceClientProperties: Record; +``` + +(For LanceDB OSS only): extra properties for the backing namespace +client used by manifest-enabled native connections. + +*** + ### readConsistencyInterval? ```ts diff --git a/nodejs/src/connection.rs b/nodejs/src/connection.rs index 19b2a5440..09be9465f 100644 --- a/nodejs/src/connection.rs +++ b/nodejs/src/connection.rs @@ -67,6 +67,12 @@ impl Connection { builder = builder.storage_option(key, value); } } + if let Some(manifest_enabled) = options.manifest_enabled { + builder = builder.manifest_enabled(manifest_enabled); + } + if let Some(namespace_client_properties) = options.namespace_client_properties { + builder = builder.namespace_client_properties(namespace_client_properties); + } // Create client config, optionally with header provider let client_config = options.client_config.unwrap_or_default(); diff --git a/nodejs/src/lib.rs b/nodejs/src/lib.rs index 055a6a3d3..87bc97ce7 100644 --- a/nodejs/src/lib.rs +++ b/nodejs/src/lib.rs @@ -37,6 +37,13 @@ pub struct ConnectionOptions { /// /// The available options are described at https://docs.lancedb.com/storage/ pub storage_options: Option>, + /// (For LanceDB OSS only): use directory namespace manifests as the source + /// of truth for table metadata. Existing directory-listed root tables are + /// migrated into the manifest on access. + pub manifest_enabled: Option, + /// (For LanceDB OSS only): extra properties for the backing namespace + /// client used by manifest-enabled native connections. + pub namespace_client_properties: Option>, /// (For LanceDB OSS only): the session to use for this connection. Holds /// shared caches and other session-specific state. pub session: Option, diff --git a/python/python/lancedb/__init__.py b/python/python/lancedb/__init__.py index ebf292b05..9e8ee0dd8 100644 --- a/python/python/lancedb/__init__.py +++ b/python/python/lancedb/__init__.py @@ -73,6 +73,7 @@ def connect( client_config: Union[ClientConfig, Dict[str, Any], None] = None, storage_options: Optional[Dict[str, str]] = None, session: Optional[Session] = None, + manifest_enabled: bool = False, namespace_client_impl: Optional[str] = None, namespace_client_properties: Optional[Dict[str, str]] = None, namespace_client_pushdown_operations: Optional[List[str]] = None, @@ -111,6 +112,10 @@ def connect( storage_options: dict, optional Additional options for the storage backend. See available options at + manifest_enabled : bool, default False + When true for local/native connections, use directory namespace + manifests as the source of truth for table metadata. Existing + directory-listed root tables are migrated into the manifest on access. session: Session, optional (For LanceDB OSS only) A session to use for this connection. Sessions allow you to configure @@ -158,11 +163,11 @@ def connect( conn : DBConnection A connection to a LanceDB database. """ - if namespace_client_impl is not None or namespace_client_properties is not None: - if namespace_client_impl is None or namespace_client_properties is None: + if namespace_client_impl is not None: + if namespace_client_properties is None: raise ValueError( - "Both namespace_client_impl and " - "namespace_client_properties must be provided" + "namespace_client_properties must be provided when " + "namespace_client_impl is set" ) if kwargs: raise ValueError(f"Unknown keyword arguments: {kwargs}") @@ -175,6 +180,12 @@ def connect( namespace_client_pushdown_operations=namespace_client_pushdown_operations, ) + if namespace_client_properties is not None and not manifest_enabled: + raise ValueError( + "namespace_client_impl must be provided when using " + "namespace_client_properties unless manifest_enabled=True" + ) + if namespace_client_pushdown_operations is not None: raise ValueError( "namespace_client_pushdown_operations is only valid when " @@ -212,6 +223,8 @@ def connect( read_consistency_interval=read_consistency_interval, storage_options=storage_options, session=session, + manifest_enabled=manifest_enabled, + namespace_client_properties=namespace_client_properties, ) @@ -289,6 +302,8 @@ def deserialize_conn( parsed["uri"], read_consistency_interval=rci, storage_options=storage_options, + manifest_enabled=parsed.get("manifest_enabled", False), + namespace_client_properties=parsed.get("namespace_client_properties"), ) else: raise ValueError(f"Unknown connection_type: {connection_type}") @@ -304,6 +319,8 @@ async def connect_async( client_config: Optional[Union[ClientConfig, Dict[str, Any]]] = None, storage_options: Optional[Dict[str, str]] = None, session: Optional[Session] = None, + manifest_enabled: bool = False, + namespace_client_properties: Optional[Dict[str, str]] = None, ) -> AsyncConnection: """Connect to a LanceDB database. @@ -343,6 +360,13 @@ async def connect_async( cache sizes for index and metadata caches, which can significantly impact memory use and performance. They can also be re-used across multiple connections to share the same cache state. + manifest_enabled : bool, default False + When true for local/native connections, use directory namespace + manifests as the source of truth for table metadata. Existing + directory-listed root tables are migrated into the manifest on access. + namespace_client_properties : dict, optional + Additional directory namespace client properties to use with + ``manifest_enabled=True``. Examples -------- @@ -385,6 +409,8 @@ async def connect_async( client_config, storage_options, session, + manifest_enabled, + namespace_client_properties, ) ) diff --git a/python/python/lancedb/_lancedb.pyi b/python/python/lancedb/_lancedb.pyi index 76c08041b..2298a9473 100644 --- a/python/python/lancedb/_lancedb.pyi +++ b/python/python/lancedb/_lancedb.pyi @@ -242,6 +242,8 @@ async def connect( client_config: Optional[Union[ClientConfig, Dict[str, Any]]], storage_options: Optional[Dict[str, str]], session: Optional[Session], + manifest_enabled: bool = False, + namespace_client_properties: Optional[Dict[str, str]] = None, ) -> Connection: ... class RecordBatchStream: diff --git a/python/python/lancedb/db.py b/python/python/lancedb/db.py index b07d409eb..276116db7 100644 --- a/python/python/lancedb/db.py +++ b/python/python/lancedb/db.py @@ -590,8 +590,13 @@ class LanceDBConnection(DBConnection): read_consistency_interval: Optional[timedelta] = None, storage_options: Optional[Dict[str, str]] = None, session: Optional[Session] = None, + manifest_enabled: bool = False, + namespace_client_properties: Optional[Dict[str, str]] = None, _inner: Optional[LanceDbConnection] = None, ): + self.storage_options = storage_options + self._manifest_enabled = manifest_enabled + self._namespace_client_properties = namespace_client_properties if _inner is not None: self._conn = _inner self._cached_namespace_client = None @@ -633,6 +638,8 @@ class LanceDBConnection(DBConnection): None, storage_options, session, + manifest_enabled, + namespace_client_properties, ) # TODO: It would be nice if we didn't store self.storage_options but it is @@ -640,7 +647,6 @@ class LanceDBConnection(DBConnection): # work because some paths like LanceDBConnection.from_inner will lose the # storage_options. Also, this class really shouldn't be holding any state # beyond _conn. - self.storage_options = storage_options self._conn = AsyncConnection(LOOP.run(do_connect())) self._cached_namespace_client: Optional[LanceNamespace] = None @@ -677,6 +683,8 @@ class LanceDBConnection(DBConnection): "connection_type": "local", "uri": self.uri, "storage_options": self.storage_options, + "manifest_enabled": self._manifest_enabled, + "namespace_client_properties": self._namespace_client_properties, "read_consistency_interval_seconds": ( rci.total_seconds() if rci else None ), diff --git a/python/src/connection.rs b/python/src/connection.rs index 9c67f38c7..1b12c33ab 100644 --- a/python/src/connection.rs +++ b/python/src/connection.rs @@ -525,7 +525,7 @@ impl Connection { } #[pyfunction] -#[pyo3(signature = (uri, api_key=None, region=None, host_override=None, read_consistency_interval=None, client_config=None, storage_options=None, session=None))] +#[pyo3(signature = (uri, api_key=None, region=None, host_override=None, read_consistency_interval=None, client_config=None, storage_options=None, session=None, manifest_enabled=false, namespace_client_properties=None))] #[allow(clippy::too_many_arguments)] pub fn connect( py: Python<'_>, @@ -537,6 +537,8 @@ pub fn connect( client_config: Option, storage_options: Option>, session: Option, + manifest_enabled: bool, + namespace_client_properties: Option>, ) -> PyResult> { future_into_py(py, async move { let mut builder = lancedb::connect(&uri); @@ -556,6 +558,12 @@ pub fn connect( if let Some(storage_options) = storage_options { builder = builder.storage_options(storage_options); } + if manifest_enabled { + builder = builder.manifest_enabled(true); + } + if let Some(namespace_client_properties) = namespace_client_properties { + builder = builder.namespace_client_properties(namespace_client_properties); + } #[cfg(feature = "remote")] if let Some(client_config) = client_config { builder = builder.client_config(client_config.into()); diff --git a/rust/lancedb/Cargo.toml b/rust/lancedb/Cargo.toml index cf6f8c44d..b652ed1a5 100644 --- a/rust/lancedb/Cargo.toml +++ b/rust/lancedb/Cargo.toml @@ -111,7 +111,12 @@ default = [] aws = ["lance/aws", "lance-io/aws", "lance-namespace-impls/dir-aws"] oss = ["lance/oss", "lance-io/oss", "lance-namespace-impls/dir-oss"] gcs = ["lance/gcp", "lance-io/gcp", "lance-namespace-impls/dir-gcp"] -azure = ["lance/azure", "lance-io/azure", "lance-namespace-impls/dir-azure"] +azure = [ + "lance/azure", + "lance-io/azure", + "lance-namespace-impls/dir-azure", + "lance-namespace-impls/credential-vendor-azure", +] huggingface = [ "lance/huggingface", "lance-io/huggingface", diff --git a/rust/lancedb/src/connection.rs b/rust/lancedb/src/connection.rs index 9e0d3ea3f..8034c2a53 100644 --- a/rust/lancedb/src/connection.rs +++ b/rust/lancedb/src/connection.rs @@ -590,6 +590,15 @@ pub struct ConnectRequest { /// storage options. pub namespace_client_properties: HashMap, + /// Use directory namespace manifests as the source of truth for native + /// LanceDB table metadata. + /// + /// When enabled for a local/native connection, LanceDB returns a + /// namespace-backed database directly. Directory listing fallback remains + /// enabled for migration, and directory-listing-to-manifest migration is + /// forced on. + pub manifest_enabled: bool, + /// The interval at which to check for updates from other processes. /// /// If None, then consistency is not checked. For performance @@ -630,6 +639,7 @@ impl ConnectBuilder { read_consistency_interval: None, options: HashMap::new(), namespace_client_properties: HashMap::new(), + manifest_enabled: false, session: None, }, embedding_registry: None, @@ -791,6 +801,17 @@ impl ConnectBuilder { self } + /// Enable or disable manifest-backed directory namespace mode for local + /// native connections. + /// + /// When enabled, the connection uses the directory namespace database + /// directly for all table operations and forces + /// `dir_listing_to_manifest_migration_enabled=true`. + pub fn manifest_enabled(mut self, enabled: bool) -> Self { + self.request.manifest_enabled = enabled; + self + } + /// The interval at which to check for updates from other processes. This /// only affects LanceDB OSS. /// @@ -886,6 +907,16 @@ impl ConnectBuilder { pub async fn execute(self) -> Result { if self.request.uri.starts_with("db") { self.execute_remote() + } else if self.request.manifest_enabled { + let internal = Arc::new( + ListingDatabase::connect_manifest_enabled_namespace_database(&self.request).await?, + ); + Ok(Connection { + internal, + embedding_registry: self + .embedding_registry + .unwrap_or_else(|| Arc::new(MemoryRegistry::new())), + }) } else { let internal = Arc::new(ListingDatabase::connect_with_options(&self.request).await?); Ok(Connection { @@ -1132,6 +1163,9 @@ mod tests { use lance_testing::datagen::{BatchGenerator, IncrementingInt32}; use tempfile::tempdir; + use crate::database::listing::{ListingDatabaseOptions, OPT_NEW_TABLE_V2_MANIFEST_PATHS}; + use crate::database::namespace::LanceNamespaceDatabase; + use crate::table::NativeTable; use crate::test_utils::connection::new_test_connection; use super::*; @@ -1204,6 +1238,147 @@ mod tests { ); } + #[tokio::test] + async fn test_connect_with_manifest_enabled_uses_directory_namespace() { + let tmp_dir = tempdir().unwrap(); + let uri = tmp_dir.path().to_str().unwrap(); + + let db = connect(uri) + .manifest_enabled(true) + .storage_option("timeout", "30s") + .namespace_client_property("manifest_enabled", "false") + .namespace_client_property("dir_listing_to_manifest_migration_enabled", "false") + .execute() + .await + .unwrap(); + + assert!( + db.database() + .as_any() + .downcast_ref::() + .is_some() + ); + assert_eq!(db.uri(), uri); + + let (ns_impl, properties) = db.namespace_client_config().await.unwrap(); + assert_eq!(ns_impl, "dir"); + assert_eq!(properties.get("root"), Some(&uri.to_string())); + assert_eq!( + properties.get("manifest_enabled"), + Some(&"true".to_string()) + ); + assert_eq!( + properties.get("dir_listing_to_manifest_migration_enabled"), + Some(&"true".to_string()) + ); + assert_eq!(properties.get("storage.timeout"), Some(&"30s".to_string())); + } + + #[tokio::test] + async fn test_manifest_enabled_rejects_commit_engine_uri() { + let Err(err) = connect("s3+ddb://bucket/db?ddbTableName=manifest") + .manifest_enabled(true) + .execute() + .await + else { + panic!("expected manifest-enabled s3+ddb connection to fail"); + }; + assert!( + matches!(err, Error::NotSupported { message } if message.contains("commit engine URI schemes")) + ); + + let Err(err) = connect("s3://bucket/db?engine=ddb&ddbTableName=manifest") + .manifest_enabled(true) + .execute() + .await + else { + panic!("expected manifest-enabled engine query connection to fail"); + }; + assert!( + matches!(err, Error::NotSupported { message } if message.contains("commit engine")) + ); + } + + #[tokio::test] + async fn test_manifest_enabled_connection_migrates_root_listing_table() { + let tmp_dir = tempdir().unwrap(); + let uri = tmp_dir.path().to_str().unwrap(); + let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)])); + + connect(uri) + .execute() + .await + .unwrap() + .create_empty_table("legacy", schema) + .execute() + .await + .unwrap(); + + let db = connect(uri).manifest_enabled(true).execute().await.unwrap(); + let tables = db.table_names().execute().await.unwrap(); + assert_eq!(tables, vec!["legacy".to_string()]); + db.open_table("legacy").execute().await.unwrap(); + } + + #[tokio::test] + async fn test_manifest_enabled_preserves_new_table_options() { + let tmp_dir = tempdir().unwrap(); + let uri = tmp_dir.path().to_str().unwrap(); + let options = ListingDatabaseOptions::builder() + .enable_v2_manifest_paths(true) + .build(); + let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)])); + + let table = connect(uri) + .manifest_enabled(true) + .database_options(&options) + .execute() + .await + .unwrap() + .create_empty_table("v1_manifest", schema) + .storage_option(OPT_NEW_TABLE_V2_MANIFEST_PATHS, "false") + .execute() + .await + .unwrap(); + + let native_table = table + .base_table() + .as_any() + .downcast_ref::() + .unwrap(); + assert!(!native_table.uses_v2_manifest_paths().await.unwrap()); + } + + #[tokio::test] + async fn test_manifest_enabled_vend_input_storage_options() { + let tmp_dir = tempdir().unwrap(); + let uri = tmp_dir.path().to_str().unwrap(); + let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)])); + + let table = connect(uri) + .manifest_enabled(true) + .storage_option("test_storage_option", "test_value") + .namespace_client_property("vend_input_storage_options", "true") + .namespace_client_property( + "vend_input_storage_options_refresh_interval_millis", + "60000", + ) + .execute() + .await + .unwrap() + .create_empty_table("vended", schema) + .execute() + .await + .unwrap(); + + let storage_options = table.latest_storage_options().await.unwrap().unwrap(); + assert_eq!( + storage_options.get("test_storage_option"), + Some(&"test_value".to_string()) + ); + assert!(storage_options.contains_key("expires_at_millis")); + } + #[tokio::test] async fn test_table_names() { let tc = new_test_connection().await.unwrap(); diff --git a/rust/lancedb/src/database/listing.rs b/rust/lancedb/src/database/listing.rs index 02884bb63..73fad6eb9 100644 --- a/rust/lancedb/src/database/listing.rs +++ b/rust/lancedb/src/database/listing.rs @@ -285,7 +285,7 @@ const MIRRORED_STORE: &str = "mirroredStore"; /// A connection to LanceDB impl ListingDatabase { - fn build_namespace_client_properties( + pub(crate) fn build_namespace_client_properties( uri: &str, storage_options: &HashMap, namespace_client_properties: HashMap, @@ -298,6 +298,24 @@ impl ListingDatabase { properties } + pub(crate) fn build_manifest_enabled_namespace_client_properties( + uri: &str, + storage_options: &HashMap, + namespace_client_properties: HashMap, + ) -> HashMap { + let mut properties = Self::build_namespace_client_properties( + uri, + storage_options, + namespace_client_properties, + ); + properties.insert("manifest_enabled".to_string(), "true".to_string()); + properties.insert( + "dir_listing_to_manifest_migration_enabled".to_string(), + "true".to_string(), + ); + properties + } + async fn connect_namespace_database( uri: &str, storage_options: HashMap, @@ -323,6 +341,119 @@ impl ListingDatabase { )) } + async fn prepare_namespace_root( + uri: &str, + storage_options: &HashMap, + session: Arc, + ) -> Result { + match url::Url::parse(uri) { + Ok(url) if url.scheme().len() == 1 && cfg!(windows) => { + let (object_store, _) = ObjectStore::from_uri_and_params( + session.store_registry(), + uri, + &ObjectStoreParams::default(), + ) + .await?; + if object_store.is_local() { + Self::try_create_dir(uri).context(CreateDirSnafu { path: uri })?; + } + Ok(uri.to_string()) + } + Ok(mut url) => { + if url.scheme().contains('+') { + return Err(Error::NotSupported { + message: "commit engine URI schemes are not supported for manifest-enabled namespace connections".to_string(), + }); + } + + for (key, value) in url.query_pairs() { + if key == ENGINE { + return Err(Error::NotSupported { + message: format!( + "commit engine '{}' is not supported for manifest-enabled namespace connections", + value + ), + }); + } else if key == MIRRORED_STORE { + return Err(Error::NotSupported { + message: "mirrored store is not supported for manifest-enabled namespace connections" + .to_string(), + }); + } + } + + url.set_query(None); + let plain_uri = url.to_string(); + + let os_params = ObjectStoreParams { + storage_options_accessor: if storage_options.is_empty() { + None + } else { + Some(Arc::new(StorageOptionsAccessor::with_static_options( + storage_options.clone(), + ))) + }, + ..Default::default() + }; + let (object_store, _) = ObjectStore::from_uri_and_params( + session.store_registry(), + &plain_uri, + &os_params, + ) + .await?; + if object_store.is_local() { + Self::try_create_dir(&plain_uri).context(CreateDirSnafu { + path: plain_uri.clone(), + })?; + } + + Ok(plain_uri) + } + Err(_) => { + let (object_store, _) = ObjectStore::from_uri_and_params( + session.store_registry(), + uri, + &ObjectStoreParams::default(), + ) + .await?; + if object_store.is_local() { + Self::try_create_dir(uri).context(CreateDirSnafu { path: uri })?; + } + Ok(uri.to_string()) + } + } + } + + pub(crate) async fn connect_manifest_enabled_namespace_database( + request: &ConnectRequest, + ) -> Result { + let options = ListingDatabaseOptions::parse_from_map(&request.options)?; + let session = request + .session + .clone() + .unwrap_or_else(|| Arc::new(lance::session::Session::default())); + let namespace_root = + Self::prepare_namespace_root(&request.uri, &options.storage_options, session.clone()) + .await?; + let ns_properties = Self::build_manifest_enabled_namespace_client_properties( + &namespace_root, + &options.storage_options, + request.namespace_client_properties.clone(), + ); + + LanceNamespaceDatabase::connect_with_new_table_config( + "dir", + ns_properties, + options.storage_options, + request.read_consistency_interval, + Some(session), + HashSet::new(), + options.new_table_config, + ) + .await + .map(|db| db.with_uri(request.uri.clone())) + } + /// Connect to a listing database /// /// The URI should be a path to a directory where the tables are stored. @@ -690,15 +821,12 @@ impl ListingDatabase { store_params.storage_options_accessor = Some(Arc::new(accessor)); } - write_params.data_storage_version = self - .new_table_config - .data_storage_version - .or(storage_version_override); + write_params.data_storage_version = storage_version_override + .or(write_params.data_storage_version) + .or(self.new_table_config.data_storage_version); - if let Some(enable_v2_manifest_paths) = self - .new_table_config - .enable_v2_manifest_paths - .or(v2_manifest_override) + if let Some(enable_v2_manifest_paths) = + v2_manifest_override.or(self.new_table_config.enable_v2_manifest_paths) { write_params.enable_v2_manifest_paths = enable_v2_manifest_paths; } @@ -1158,6 +1286,7 @@ mod tests { client_config: Default::default(), options: Default::default(), namespace_client_properties: Default::default(), + manifest_enabled: false, read_consistency_interval: None, session: None, }; @@ -1292,6 +1421,7 @@ mod tests { client_config: Default::default(), options: options.clone(), namespace_client_properties: Default::default(), + manifest_enabled: false, read_consistency_interval: None, session: None, }; @@ -1827,6 +1957,7 @@ mod tests { client_config: Default::default(), options, namespace_client_properties: Default::default(), + manifest_enabled: false, read_consistency_interval: None, session: None, }; @@ -1933,6 +2064,7 @@ mod tests { client_config: Default::default(), options, namespace_client_properties: Default::default(), + manifest_enabled: false, read_consistency_interval: None, session: None, }; @@ -2005,6 +2137,7 @@ mod tests { client_config: Default::default(), options, namespace_client_properties: Default::default(), + manifest_enabled: false, read_consistency_interval: None, session: None, }; @@ -2202,6 +2335,7 @@ mod tests { client_config: Default::default(), options: Default::default(), namespace_client_properties, + manifest_enabled: false, read_consistency_interval: None, session: None, }; diff --git a/rust/lancedb/src/database/namespace.rs b/rust/lancedb/src/database/namespace.rs index 19dc1f174..de18f8db8 100644 --- a/rust/lancedb/src/database/namespace.rs +++ b/rust/lancedb/src/database/namespace.rs @@ -24,6 +24,10 @@ use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler; use crate::connection::NamespaceClientPushdownOperation; use crate::database::ReadConsistency; +use crate::database::listing::{ + NewTableConfig, OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS, OPT_NEW_TABLE_STORAGE_VERSION, + OPT_NEW_TABLE_V2_MANIFEST_PATHS, +}; use crate::error::{Error, Result}; use crate::table::NativeTable; use lance::dataset::WriteMode; @@ -50,6 +54,8 @@ pub struct LanceNamespaceDatabase { ns_impl: String, // Namespace properties used to construct the namespace client ns_properties: HashMap, + // Options for tables created by this connection + new_table_config: NewTableConfig, } impl LanceNamespaceDatabase { @@ -71,9 +77,15 @@ impl LanceNamespaceDatabase { pushdown_operations: namespace_client_pushdown_operations, ns_impl: namespace_client_impl, ns_properties: namespace_client_properties, + new_table_config: NewTableConfig::default(), } } + pub(crate) fn with_uri(mut self, uri: impl Into) -> Self { + self.uri = uri.into(); + self + } + pub async fn connect( ns_impl: &str, ns_properties: HashMap, @@ -81,6 +93,27 @@ impl LanceNamespaceDatabase { read_consistency_interval: Option, session: Option>, pushdown_operations: HashSet, + ) -> Result { + Self::connect_with_new_table_config( + ns_impl, + ns_properties, + storage_options, + read_consistency_interval, + session, + pushdown_operations, + NewTableConfig::default(), + ) + .await + } + + pub(crate) async fn connect_with_new_table_config( + ns_impl: &str, + ns_properties: HashMap, + storage_options: HashMap, + read_consistency_interval: Option, + session: Option>, + pushdown_operations: HashSet, + new_table_config: NewTableConfig, ) -> Result { let mut builder = ConnectBuilder::new(ns_impl); for (key, value) in ns_properties.clone() { @@ -102,8 +135,79 @@ impl LanceNamespaceDatabase { pushdown_operations, ns_impl: ns_impl.to_string(), ns_properties, + new_table_config, }) } + + fn extract_storage_overrides( + &self, + request: &DbCreateTableRequest, + ) -> Result<( + Option, + Option, + Option, + )> { + let storage_options = request + .write_options + .lance_write_params + .as_ref() + .and_then(|p| p.store_params.as_ref()) + .and_then(|sp| sp.storage_options()); + + let storage_version_override = storage_options + .and_then(|opts| opts.get(OPT_NEW_TABLE_STORAGE_VERSION)) + .map(|s| s.parse::()) + .transpose()?; + + let v2_manifest_override = storage_options + .and_then(|opts| opts.get(OPT_NEW_TABLE_V2_MANIFEST_PATHS)) + .map(|s| s.parse::()) + .transpose() + .map_err(|_| Error::InvalidInput { + message: "enable_v2_manifest_paths must be a boolean".to_string(), + })?; + + let stable_row_ids_override = storage_options + .and_then(|opts| opts.get(OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS)) + .map(|s| s.parse::()) + .transpose() + .map_err(|_| Error::InvalidInput { + message: "enable_stable_row_ids must be a boolean".to_string(), + })?; + + Ok(( + storage_version_override, + v2_manifest_override, + stable_row_ids_override, + )) + } + + fn apply_new_table_config( + &self, + params: &mut lance::dataset::WriteParams, + request: &DbCreateTableRequest, + ) -> Result<()> { + let (storage_version_override, v2_manifest_override, stable_row_ids_override) = + self.extract_storage_overrides(request)?; + + params.data_storage_version = storage_version_override + .or(params.data_storage_version) + .or(self.new_table_config.data_storage_version); + + if let Some(enable_v2_manifest_paths) = + v2_manifest_override.or(self.new_table_config.enable_v2_manifest_paths) + { + params.enable_v2_manifest_paths = enable_v2_manifest_paths; + } + + if let Some(enable_stable_row_ids) = + stable_row_ids_override.or(self.new_table_config.enable_stable_row_ids) + { + params.enable_stable_row_ids = enable_stable_row_ids; + } + + Ok(()) + } } impl std::fmt::Debug for LanceNamespaceDatabase { @@ -299,7 +403,12 @@ impl Database for LanceNamespaceDatabase { }; // Build write params with storage options and commit handler - let mut params = request.write_options.lance_write_params.unwrap_or_default(); + let mut params = request + .write_options + .lance_write_params + .clone() + .unwrap_or_default(); + self.apply_new_table_config(&mut params, &request)?; if matches!(request.mode, CreateTableMode::Overwrite) { params.mode = WriteMode::Overwrite;