From cde0814bbc9c9e962c13697e2047eed91be41366 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Tue, 31 Mar 2026 23:09:15 -0500 Subject: [PATCH] feat(rust): add WAL host routing for merge_insert operations Add support for routing merge_insert requests to a separate WAL/ingest host. The WAL host is auto-derived as {db}.{region}.wal.lancedb.com or can be explicitly overridden via ConnectBuilder::wal_host_override(). Callers opt in per-operation with MergeInsertBuilder::use_wal(true). Co-Authored-By: Claude Opus 4.6 (1M context) --- rust/lancedb/src/connection.rs | 24 ++++++++++++++- rust/lancedb/src/remote/client.rs | 21 ++++++++++++- rust/lancedb/src/remote/db.rs | 29 ++++++++++++++++++ rust/lancedb/src/remote/table.rs | 51 ++++++++++++++++++++++++++++--- rust/lancedb/src/table/merge.rs | 14 +++++++++ 5 files changed, 132 insertions(+), 7 deletions(-) diff --git a/rust/lancedb/src/connection.rs b/rust/lancedb/src/connection.rs index 95d985aac..dc0d00919 100644 --- a/rust/lancedb/src/connection.rs +++ b/rust/lancedb/src/connection.rs @@ -30,7 +30,10 @@ use crate::error::{Error, Result}; #[cfg(feature = "remote")] use crate::remote::{ client::ClientConfig, - db::{OPT_REMOTE_API_KEY, OPT_REMOTE_HOST_OVERRIDE, OPT_REMOTE_REGION}, + db::{ + OPT_REMOTE_API_KEY, OPT_REMOTE_HOST_OVERRIDE, OPT_REMOTE_REGION, + OPT_REMOTE_WAL_HOST_OVERRIDE, + }, }; use lance::io::ObjectStoreParams; pub use lance_encoding::version::LanceFileVersion; @@ -667,6 +670,24 @@ impl ConnectBuilder { self } + /// Set the WAL host override for routing merge_insert requests + /// to a separate WAL/ingest service. + /// + /// This option is only used when connecting to LanceDB Cloud (db:// URIs) + /// and will be ignored for other URIs. + /// + /// # Arguments + /// + /// * `wal_host_override` - The WAL host override to use for the connection + #[cfg(feature = "remote")] + pub fn wal_host_override(mut self, wal_host_override: &str) -> Self { + self.request.options.insert( + OPT_REMOTE_WAL_HOST_OVERRIDE.to_string(), + wal_host_override.to_string(), + ); + self + } + /// Set the database specific options /// /// See [crate::database::listing::ListingDatabaseOptions] for the options available for @@ -820,6 +841,7 @@ impl ConnectBuilder { &api_key, ®ion, options.host_override, + options.wal_host_override, self.request.client_config, storage_options.into(), )?); diff --git a/rust/lancedb/src/remote/client.rs b/rust/lancedb/src/remote/client.rs index 41a5949ca..11a73c72f 100644 --- a/rust/lancedb/src/remote/client.rs +++ b/rust/lancedb/src/remote/client.rs @@ -190,6 +190,7 @@ pub struct RetryConfig { pub struct RestfulLanceDbClient { client: reqwest::Client, host: String, + wal_host: String, pub(crate) retry_config: ResolvedRetryConfig, pub(crate) sender: S, pub(crate) id_delimiter: String, @@ -200,6 +201,7 @@ impl std::fmt::Debug for RestfulLanceDbClient { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("RestfulLanceDbClient") .field("host", &self.host) + .field("wal_host", &self.wal_host) .field("retry_config", &self.retry_config) .field("sender", &self.sender) .field("id_delimiter", &self.id_delimiter) @@ -285,6 +287,7 @@ impl RestfulLanceDbClient { parsed_url: &ParsedDbUrl, region: &str, host_override: Option, + wal_host_override: Option, default_headers: HeaderMap, client_config: ClientConfig, ) -> Result { @@ -372,11 +375,16 @@ impl RestfulLanceDbClient { Some(host_override) => host_override, None => format!("https://{}.{}.api.lancedb.com", parsed_url.db_name, region), }; - debug!("Created client for host: {}", host); + let wal_host = match wal_host_override { + Some(wal_host_override) => wal_host_override, + None => format!("https://{}.{}.wal.lancedb.com", parsed_url.db_name, region), + }; + debug!("Created client for host: {}, wal_host: {}", host, wal_host); let retry_config = client_config.retry_config.clone().try_into()?; Ok(Self { client, host, + wal_host, retry_config, sender: Sender, id_delimiter: client_config @@ -489,6 +497,12 @@ impl RestfulLanceDbClient { self.add_id_delimiter_query_param(builder) } + pub fn post_wal(&self, uri: &str) -> RequestBuilder { + let full_uri = format!("{}{}", self.wal_host, uri); + let builder = self.client.post(full_uri); + self.add_id_delimiter_query_param(builder) + } + fn add_id_delimiter_query_param(&self, req: RequestBuilder) -> RequestBuilder { if self.id_delimiter != "$" { req.query(&[("delimiter", self.id_delimiter.clone())]) @@ -801,6 +815,7 @@ pub mod test_utils { RestfulLanceDbClient { client: reqwest::Client::new(), host: "http://localhost".to_string(), + wal_host: "http://localhost-wal".to_string(), retry_config: RetryConfig::default().try_into().unwrap(), sender: MockSender { f: Arc::new(wrapper), @@ -825,6 +840,7 @@ pub mod test_utils { RestfulLanceDbClient { client: reqwest::Client::new(), host: "http://localhost".to_string(), + wal_host: "http://localhost-wal".to_string(), retry_config: config.retry_config.try_into().unwrap(), sender: MockSender { f: Arc::new(wrapper), @@ -992,6 +1008,7 @@ mod tests { let client = RestfulLanceDbClient { client: reqwest::Client::new(), host: "https://example.com".to_string(), + wal_host: "https://example.com".to_string(), retry_config: RetryConfig::default().try_into().unwrap(), sender: Sender, id_delimiter: "+".to_string(), @@ -1027,6 +1044,7 @@ mod tests { let client = RestfulLanceDbClient { client: reqwest::Client::new(), host: "https://example.com".to_string(), + wal_host: "https://example.com".to_string(), retry_config: RetryConfig::default().try_into().unwrap(), sender: Sender, id_delimiter: "+".to_string(), @@ -1064,6 +1082,7 @@ mod tests { let client = RestfulLanceDbClient { client: reqwest::Client::new(), host: "https://example.com".to_string(), + wal_host: "https://example.com".to_string(), retry_config: RetryConfig::default().try_into().unwrap(), sender: Sender, id_delimiter: "+".to_string(), diff --git a/rust/lancedb/src/remote/db.rs b/rust/lancedb/src/remote/db.rs index 909de4312..7f88e56ab 100644 --- a/rust/lancedb/src/remote/db.rs +++ b/rust/lancedb/src/remote/db.rs @@ -78,6 +78,7 @@ pub const OPT_REMOTE_PREFIX: &str = "remote_database_"; pub const OPT_REMOTE_API_KEY: &str = "remote_database_api_key"; pub const OPT_REMOTE_REGION: &str = "remote_database_region"; pub const OPT_REMOTE_HOST_OVERRIDE: &str = "remote_database_host_override"; +pub const OPT_REMOTE_WAL_HOST_OVERRIDE: &str = "remote_database_wal_host_override"; // TODO: add support for configuring client config via key/value options #[derive(Clone, Debug, Default)] @@ -91,6 +92,11 @@ pub struct RemoteDatabaseOptions { /// This is required when connecting to LanceDB Enterprise and should be /// provided if using an on-premises LanceDB Enterprise instance. pub host_override: Option, + /// The WAL host override + /// + /// When set, merge_insert operations using WAL routing will be sent to + /// this host instead of the auto-derived WAL host. + pub wal_host_override: Option, /// Storage options configure the storage layer (e.g. S3, GCS, Azure, etc.) /// /// See available options at @@ -109,6 +115,7 @@ impl RemoteDatabaseOptions { let api_key = map.get(OPT_REMOTE_API_KEY).cloned(); let region = map.get(OPT_REMOTE_REGION).cloned(); let host_override = map.get(OPT_REMOTE_HOST_OVERRIDE).cloned(); + let wal_host_override = map.get(OPT_REMOTE_WAL_HOST_OVERRIDE).cloned(); let storage_options = map .iter() .filter(|(key, _)| !key.starts_with(OPT_REMOTE_PREFIX)) @@ -118,6 +125,7 @@ impl RemoteDatabaseOptions { api_key, region, host_override, + wal_host_override, storage_options, }) } @@ -137,6 +145,12 @@ impl DatabaseOptions for RemoteDatabaseOptions { if let Some(host_override) = &self.host_override { map.insert(OPT_REMOTE_HOST_OVERRIDE.to_string(), host_override.clone()); } + if let Some(wal_host_override) = &self.wal_host_override { + map.insert( + OPT_REMOTE_WAL_HOST_OVERRIDE.to_string(), + wal_host_override.clone(), + ); + } } } @@ -181,6 +195,19 @@ impl RemoteDatabaseOptionsBuilder { self.options.host_override = Some(host_override); self } + + /// Set the WAL host override + /// + /// When set, merge_insert operations using WAL routing will be sent to + /// this host instead of the auto-derived WAL host. + /// + /// # Arguments + /// + /// * `wal_host_override` - The WAL host override + pub fn wal_host_override(mut self, wal_host_override: String) -> Self { + self.options.wal_host_override = Some(wal_host_override); + self + } } #[derive(Debug)] @@ -200,6 +227,7 @@ impl RemoteDatabase { api_key: &str, region: &str, host_override: Option, + wal_host_override: Option, client_config: ClientConfig, options: RemoteOptions, ) -> Result { @@ -227,6 +255,7 @@ impl RemoteDatabase { &parsed, region, host_override, + wal_host_override, header_map, client_config.clone(), )?; diff --git a/rust/lancedb/src/remote/table.rs b/rust/lancedb/src/remote/table.rs index 13edae94f..6d094d71f 100644 --- a/rust/lancedb/src/remote/table.rs +++ b/rust/lancedb/src/remote/table.rs @@ -1369,13 +1369,17 @@ impl BaseTable for RemoteTable { self.check_mutable().await?; let timeout = params.timeout; + let use_wal = params.use_wal; let query = MergeInsertRequest::try_from(params)?; - let mut request = self - .client - .post(&format!("/v1/table/{}/merge_insert/", self.identifier)) - .query(&query) - .header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE); + let path = format!("/v1/table/{}/merge_insert/", self.identifier); + let mut request = if use_wal { + self.client.post_wal(&path) + } else { + self.client.post(&path) + } + .query(&query) + .header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE); if let Some(timeout) = timeout { // (If it doesn't fit into u64, it's not worth sending anyways.) @@ -2462,6 +2466,43 @@ mod tests { } } + #[tokio::test] + async fn test_merge_insert_use_wal() { + let batch = RecordBatch::try_new( + Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])), + vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], + ) + .unwrap(); + let data: Box = Box::new(RecordBatchIterator::new( + [Ok(batch.clone())], + batch.schema(), + )); + + let table = Table::new_with_handler("my_table", move |request| { + if request.url().path() == "/v1/table/my_table/merge_insert/" { + // Verify the request was sent to the WAL host + assert_eq!( + request.url().host_str().unwrap(), + "localhost-wal", + "merge_insert with use_wal should route to WAL host" + ); + + http::Response::builder() + .status(200) + .body(r#"{"version": 1, "num_deleted_rows": 0, "num_inserted_rows": 3, "num_updated_rows": 0}"#) + .unwrap() + } else { + panic!("Unexpected request path: {}", request.url().path()); + } + }); + + let mut builder = table.merge_insert(&["some_col"]); + builder.use_wal(true); + let result = builder.execute(data).await.unwrap(); + + assert_eq!(result.num_inserted_rows, 3); + } + #[tokio::test] async fn test_merge_insert_retries_on_409() { let batch = RecordBatch::try_new( diff --git a/rust/lancedb/src/table/merge.rs b/rust/lancedb/src/table/merge.rs index d8805acb8..293c64fed 100644 --- a/rust/lancedb/src/table/merge.rs +++ b/rust/lancedb/src/table/merge.rs @@ -55,6 +55,7 @@ pub struct MergeInsertBuilder { pub(crate) when_not_matched_by_source_delete_filt: Option, pub(crate) timeout: Option, pub(crate) use_index: bool, + pub(crate) use_wal: bool, } impl MergeInsertBuilder { @@ -69,6 +70,7 @@ impl MergeInsertBuilder { when_not_matched_by_source_delete_filt: None, timeout: None, use_index: true, + use_wal: false, } } @@ -148,6 +150,18 @@ impl MergeInsertBuilder { self } + /// Controls whether to route the merge insert operation through the WAL host. + /// + /// When set to `true`, the operation will be sent to the WAL host instead of + /// the main API host. The WAL host is auto-derived from the database connection + /// or can be explicitly set via [`crate::connection::ConnectBuilder::wal_host_override`]. + /// + /// Defaults to `false`. + pub fn use_wal(&mut self, use_wal: bool) -> &mut Self { + self.use_wal = use_wal; + self + } + /// Executes the merge insert operation /// /// Returns version and statistics about the merge operation including the number of rows