Compare commits

...

5 Commits

Author SHA1 Message Date
Daniel Rammer
4446a2972c refactor: replace WAL host routing with x-use-wal header
Route all traffic through a single host and use an x-use-wal: true
header to signal WAL routing, letting the Pingora router handle the
split instead of maintaining a separate WAL host endpoint.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 15:52:16 -05:00
Daniel Rammer
b0d3fadfc0 Merge branch 'main' into feature/wal 2026-04-03 09:54:15 -05:00
Daniel Rammer
fd9dd390fc update to lance 5.0.0-beta.2 2026-04-01 23:51:14 -05:00
Daniel Rammer
931f19b737 Merge branch 'main' into feature/wal 2026-03-31 23:10:49 -05:00
Daniel Rammer
cde0814bbc feat(rust): add WAL host routing for merge_insert operations
Add support for routing merge_insert requests to a separate WAL/ingest
host. The WAL host is auto-derived as {db}.{region}.wal.lancedb.com or
can be explicitly overridden via ConnectBuilder::wal_host_override().
Callers opt in per-operation with MergeInsertBuilder::use_wal(true).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 23:09:15 -05:00
5 changed files with 75 additions and 5 deletions

View File

@@ -666,6 +666,11 @@ impl ConnectBuilder {
self
}
/// Set the WAL host override for routing merge_insert requests
/// to a separate WAL/ingest service.
///
/// This option is only used when connecting to LanceDB Cloud (db:// URIs)
/// and will be ignored for other URIs.
/// Set the database specific options
///
/// See [crate::database::listing::ListingDatabaseOptions] for the options available for

View File

@@ -479,6 +479,12 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
self.add_id_delimiter_query_param(builder)
}
pub fn post_wal(&self, uri: &str) -> RequestBuilder {
let full_uri = format!("{}{}", self.host, uri);
let builder = self.client.post(full_uri).header("x-use-wal", "true");
self.add_id_delimiter_query_param(builder)
}
fn add_id_delimiter_query_param(&self, req: RequestBuilder) -> RequestBuilder {
if self.id_delimiter != "$" {
req.query(&[("delimiter", self.id_delimiter.clone())])
@@ -982,6 +988,7 @@ mod tests {
let client = RestfulLanceDbClient {
client: reqwest::Client::new(),
host: "https://example.com".to_string(),
retry_config: RetryConfig::default().try_into().unwrap(),
sender: Sender,
id_delimiter: "+".to_string(),
@@ -1017,6 +1024,7 @@ mod tests {
let client = RestfulLanceDbClient {
client: reqwest::Client::new(),
host: "https://example.com".to_string(),
retry_config: RetryConfig::default().try_into().unwrap(),
sender: Sender,
id_delimiter: "+".to_string(),
@@ -1054,6 +1062,7 @@ mod tests {
let client = RestfulLanceDbClient {
client: reqwest::Client::new(),
host: "https://example.com".to_string(),
retry_config: RetryConfig::default().try_into().unwrap(),
sender: Sender,
id_delimiter: "+".to_string(),

View File

@@ -185,6 +185,7 @@ impl RemoteDatabaseOptionsBuilder {
self.options.host_override = Some(host_override);
self
}
}
#[derive(Debug)]

View File

@@ -1610,13 +1610,17 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
self.check_mutable().await?;
let timeout = params.timeout;
let use_wal = params.use_wal;
let query = MergeInsertRequest::try_from(params)?;
let mut request = self
.client
.post(&format!("/v1/table/{}/merge_insert/", self.identifier))
.query(&query)
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE);
let path = format!("/v1/table/{}/merge_insert/", self.identifier);
let mut request = if use_wal {
self.client.post_wal(&path)
} else {
self.client.post(&path)
}
.query(&query)
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE);
if let Some(timeout) = timeout {
// (If it doesn't fit into u64, it's not worth sending anyways.)
@@ -2705,6 +2709,43 @@ mod tests {
}
}
#[tokio::test]
async fn test_merge_insert_use_wal() {
let batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
let data: Box<dyn RecordBatchReader + Send> = Box::new(RecordBatchIterator::new(
[Ok(batch.clone())],
batch.schema(),
));
let table = Table::new_with_handler("my_table", move |request| {
if request.url().path() == "/v1/table/my_table/merge_insert/" {
// Verify the x-use-wal header is set for router-based WAL routing
assert_eq!(
request.headers().get("x-use-wal").unwrap(),
"true",
"merge_insert with use_wal should set x-use-wal header"
);
http::Response::builder()
.status(200)
.body(r#"{"version": 1, "num_deleted_rows": 0, "num_inserted_rows": 3, "num_updated_rows": 0}"#)
.unwrap()
} else {
panic!("Unexpected request path: {}", request.url().path());
}
});
let mut builder = table.merge_insert(&["some_col"]);
builder.use_wal(true);
let result = builder.execute(data).await.unwrap();
assert_eq!(result.num_inserted_rows, 3);
}
#[tokio::test]
async fn test_merge_insert_retries_on_409() {
let batch = RecordBatch::try_new(

View File

@@ -55,6 +55,7 @@ pub struct MergeInsertBuilder {
pub(crate) when_not_matched_by_source_delete_filt: Option<String>,
pub(crate) timeout: Option<Duration>,
pub(crate) use_index: bool,
pub(crate) use_wal: bool,
}
impl MergeInsertBuilder {
@@ -69,6 +70,7 @@ impl MergeInsertBuilder {
when_not_matched_by_source_delete_filt: None,
timeout: None,
use_index: true,
use_wal: false,
}
}
@@ -148,6 +150,18 @@ impl MergeInsertBuilder {
self
}
/// Controls whether to route the merge insert operation through the WAL.
///
/// When set to `true`, the request includes an `x-use-wal: true` header,
/// which the router uses to forward the operation to wal-writer instances
/// instead of Phalanx.
///
/// Defaults to `false`.
pub fn use_wal(&mut self, use_wal: bool) -> &mut Self {
self.use_wal = use_wal;
self
}
/// Executes the merge insert operation
///
/// Returns version and statistics about the merge operation including the number of rows