Compare commits

..

6 Commits

Author SHA1 Message Date
Daniel Rammer
6abef26cdd Merge branch 'main' into feature/wal 2026-04-08 15:24:43 -05:00
Daniel Rammer
4446a2972c refactor: replace WAL host routing with x-use-wal header
Route all traffic through a single host and use an x-use-wal: true
header to signal WAL routing, letting the Pingora router handle the
split instead of maintaining a separate WAL host endpoint.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 15:52:16 -05:00
Daniel Rammer
b0d3fadfc0 Merge branch 'main' into feature/wal 2026-04-03 09:54:15 -05:00
Daniel Rammer
fd9dd390fc update to lance 5.0.0-beta.2 2026-04-01 23:51:14 -05:00
Daniel Rammer
931f19b737 Merge branch 'main' into feature/wal 2026-03-31 23:10:49 -05:00
Daniel Rammer
cde0814bbc feat(rust): add WAL host routing for merge_insert operations
Add support for routing merge_insert requests to a separate WAL/ingest
host. The WAL host is auto-derived as {db}.{region}.wal.lancedb.com or
can be explicitly overridden via ConnectBuilder::wal_host_override().
Callers opt in per-operation with MergeInsertBuilder::use_wal(true).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 23:09:15 -05:00
17 changed files with 149 additions and 154 deletions

View File

@@ -8,7 +8,6 @@ on:
paths:
- Cargo.toml
- Cargo.lock
- rust-toolchain.toml
- nodejs/**
- rust/**
- docs/src/js/**

View File

@@ -8,7 +8,6 @@ on:
paths:
- Cargo.toml
- Cargo.lock
- rust-toolchain.toml
- python/**
- rust/**
- .github/workflows/python.yml

View File

@@ -8,7 +8,6 @@ on:
paths:
- Cargo.toml
- Cargo.lock
- rust-toolchain.toml
- rust/**
- .github/workflows/rust.yml

65
Cargo.lock generated
View File

@@ -3072,8 +3072,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "fsst"
version = "5.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v5.1.0-beta.1#103e947aef451e4b88da03fe47512558d333c29c"
version = "5.0.0-beta.5"
source = "git+https://github.com/lance-format/lance.git?tag=v5.0.0-beta.5#d630106da5a238b3adfb8c5dea3b3921f3519945"
dependencies = [
"arrow-array",
"rand 0.9.2",
@@ -4134,8 +4134,8 @@ dependencies = [
[[package]]
name = "lance"
version = "5.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v5.1.0-beta.1#103e947aef451e4b88da03fe47512558d333c29c"
version = "5.0.0-beta.5"
source = "git+https://github.com/lance-format/lance.git?tag=v5.0.0-beta.5#d630106da5a238b3adfb8c5dea3b3921f3519945"
dependencies = [
"arrow",
"arrow-arith",
@@ -4201,14 +4201,13 @@ dependencies = [
[[package]]
name = "lance-arrow"
version = "5.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v5.1.0-beta.1#103e947aef451e4b88da03fe47512558d333c29c"
version = "5.0.0-beta.5"
source = "git+https://github.com/lance-format/lance.git?tag=v5.0.0-beta.5#d630106da5a238b3adfb8c5dea3b3921f3519945"
dependencies = [
"arrow-array",
"arrow-buffer",
"arrow-cast",
"arrow-data",
"arrow-ipc",
"arrow-ord",
"arrow-schema",
"arrow-select",
@@ -4223,8 +4222,8 @@ dependencies = [
[[package]]
name = "lance-bitpacking"
version = "5.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v5.1.0-beta.1#103e947aef451e4b88da03fe47512558d333c29c"
version = "5.0.0-beta.5"
source = "git+https://github.com/lance-format/lance.git?tag=v5.0.0-beta.5#d630106da5a238b3adfb8c5dea3b3921f3519945"
dependencies = [
"arrayref",
"paste",
@@ -4233,8 +4232,8 @@ dependencies = [
[[package]]
name = "lance-core"
version = "5.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v5.1.0-beta.1#103e947aef451e4b88da03fe47512558d333c29c"
version = "5.0.0-beta.5"
source = "git+https://github.com/lance-format/lance.git?tag=v5.0.0-beta.5#d630106da5a238b3adfb8c5dea3b3921f3519945"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4271,8 +4270,8 @@ dependencies = [
[[package]]
name = "lance-datafusion"
version = "5.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v5.1.0-beta.1#103e947aef451e4b88da03fe47512558d333c29c"
version = "5.0.0-beta.5"
source = "git+https://github.com/lance-format/lance.git?tag=v5.0.0-beta.5#d630106da5a238b3adfb8c5dea3b3921f3519945"
dependencies = [
"arrow",
"arrow-array",
@@ -4302,8 +4301,8 @@ dependencies = [
[[package]]
name = "lance-datagen"
version = "5.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v5.1.0-beta.1#103e947aef451e4b88da03fe47512558d333c29c"
version = "5.0.0-beta.5"
source = "git+https://github.com/lance-format/lance.git?tag=v5.0.0-beta.5#d630106da5a238b3adfb8c5dea3b3921f3519945"
dependencies = [
"arrow",
"arrow-array",
@@ -4321,8 +4320,8 @@ dependencies = [
[[package]]
name = "lance-encoding"
version = "5.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v5.1.0-beta.1#103e947aef451e4b88da03fe47512558d333c29c"
version = "5.0.0-beta.5"
source = "git+https://github.com/lance-format/lance.git?tag=v5.0.0-beta.5#d630106da5a238b3adfb8c5dea3b3921f3519945"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -4359,8 +4358,8 @@ dependencies = [
[[package]]
name = "lance-file"
version = "5.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v5.1.0-beta.1#103e947aef451e4b88da03fe47512558d333c29c"
version = "5.0.0-beta.5"
source = "git+https://github.com/lance-format/lance.git?tag=v5.0.0-beta.5#d630106da5a238b3adfb8c5dea3b3921f3519945"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -4392,8 +4391,8 @@ dependencies = [
[[package]]
name = "lance-index"
version = "5.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v5.1.0-beta.1#103e947aef451e4b88da03fe47512558d333c29c"
version = "5.0.0-beta.5"
source = "git+https://github.com/lance-format/lance.git?tag=v5.0.0-beta.5#d630106da5a238b3adfb8c5dea3b3921f3519945"
dependencies = [
"arrow",
"arrow-arith",
@@ -4457,8 +4456,8 @@ dependencies = [
[[package]]
name = "lance-io"
version = "5.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v5.1.0-beta.1#103e947aef451e4b88da03fe47512558d333c29c"
version = "5.0.0-beta.5"
source = "git+https://github.com/lance-format/lance.git?tag=v5.0.0-beta.5#d630106da5a238b3adfb8c5dea3b3921f3519945"
dependencies = [
"arrow",
"arrow-arith",
@@ -4502,8 +4501,8 @@ dependencies = [
[[package]]
name = "lance-linalg"
version = "5.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v5.1.0-beta.1#103e947aef451e4b88da03fe47512558d333c29c"
version = "5.0.0-beta.5"
source = "git+https://github.com/lance-format/lance.git?tag=v5.0.0-beta.5#d630106da5a238b3adfb8c5dea3b3921f3519945"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4519,8 +4518,8 @@ dependencies = [
[[package]]
name = "lance-namespace"
version = "5.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v5.1.0-beta.1#103e947aef451e4b88da03fe47512558d333c29c"
version = "5.0.0-beta.5"
source = "git+https://github.com/lance-format/lance.git?tag=v5.0.0-beta.5#d630106da5a238b3adfb8c5dea3b3921f3519945"
dependencies = [
"arrow",
"async-trait",
@@ -4533,8 +4532,8 @@ dependencies = [
[[package]]
name = "lance-namespace-impls"
version = "5.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v5.1.0-beta.1#103e947aef451e4b88da03fe47512558d333c29c"
version = "5.0.0-beta.5"
source = "git+https://github.com/lance-format/lance.git?tag=v5.0.0-beta.5#d630106da5a238b3adfb8c5dea3b3921f3519945"
dependencies = [
"arrow",
"arrow-ipc",
@@ -4579,8 +4578,8 @@ dependencies = [
[[package]]
name = "lance-table"
version = "5.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v5.1.0-beta.1#103e947aef451e4b88da03fe47512558d333c29c"
version = "5.0.0-beta.5"
source = "git+https://github.com/lance-format/lance.git?tag=v5.0.0-beta.5#d630106da5a238b3adfb8c5dea3b3921f3519945"
dependencies = [
"arrow",
"arrow-array",
@@ -4619,8 +4618,8 @@ dependencies = [
[[package]]
name = "lance-testing"
version = "5.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v5.1.0-beta.1#103e947aef451e4b88da03fe47512558d333c29c"
version = "5.0.0-beta.5"
source = "git+https://github.com/lance-format/lance.git?tag=v5.0.0-beta.5#d630106da5a238b3adfb8c5dea3b3921f3519945"
dependencies = [
"arrow-array",
"arrow-schema",

View File

@@ -15,20 +15,20 @@ categories = ["database-implementations"]
rust-version = "1.91.0"
[workspace.dependencies]
lance = { "version" = "=5.1.0-beta.1", default-features = false, "tag" = "v5.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=5.1.0-beta.1", "tag" = "v5.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=5.1.0-beta.1", "tag" = "v5.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=5.1.0-beta.1", "tag" = "v5.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=5.1.0-beta.1", default-features = false, "tag" = "v5.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=5.1.0-beta.1", "tag" = "v5.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=5.1.0-beta.1", "tag" = "v5.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=5.1.0-beta.1", "tag" = "v5.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=5.1.0-beta.1", default-features = false, "tag" = "v5.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=5.1.0-beta.1", "tag" = "v5.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=5.1.0-beta.1", "tag" = "v5.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=5.1.0-beta.1", "tag" = "v5.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=5.1.0-beta.1", "tag" = "v5.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=5.1.0-beta.1", "tag" = "v5.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance = { "version" = "=5.0.0-beta.5", default-features = false, "tag" = "v5.0.0-beta.5", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=5.0.0-beta.5", "tag" = "v5.0.0-beta.5", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=5.0.0-beta.5", "tag" = "v5.0.0-beta.5", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=5.0.0-beta.5", "tag" = "v5.0.0-beta.5", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=5.0.0-beta.5", default-features = false, "tag" = "v5.0.0-beta.5", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=5.0.0-beta.5", "tag" = "v5.0.0-beta.5", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=5.0.0-beta.5", "tag" = "v5.0.0-beta.5", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=5.0.0-beta.5", "tag" = "v5.0.0-beta.5", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=5.0.0-beta.5", default-features = false, "tag" = "v5.0.0-beta.5", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=5.0.0-beta.5", "tag" = "v5.0.0-beta.5", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=5.0.0-beta.5", "tag" = "v5.0.0-beta.5", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=5.0.0-beta.5", "tag" = "v5.0.0-beta.5", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=5.0.0-beta.5", "tag" = "v5.0.0-beta.5", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=5.0.0-beta.5", "tag" = "v5.0.0-beta.5", "git" = "https://github.com/lance-format/lance.git" }
ahash = "0.8"
# Note that this one does not include pyarrow
arrow = { version = "57.2", optional = false }

View File

@@ -28,7 +28,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<arrow.version>15.0.0</arrow.version>
<lance-core.version>5.1.0-beta.1</lance-core.version>
<lance-core.version>5.0.0-beta.5</lance-core.version>
<spotless.skip>false</spotless.skip>
<spotless.version>2.30.0</spotless.version>
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>

View File

@@ -83,7 +83,7 @@ embeddings = [
"colpali-engine>=0.3.10",
"huggingface_hub>=0.19.0",
"InstructorEmbedding>=1.0.1",
"google-genai>=1.0.0",
"google.generativeai>=0.3.0",
"boto3>=1.28.57",
"awscli>=1.44.38",
"botocore>=1.31.57",

View File

@@ -19,10 +19,10 @@ from .utils import TEXT, api_key_not_found_help
@register("gemini-text")
class GeminiText(TextEmbeddingFunction):
"""
An embedding function that uses Google's Gemini API. Requires GOOGLE_API_KEY to
An embedding function that uses the Google's Gemini API. Requires GOOGLE_API_KEY to
be set.
https://ai.google.dev/gemini-api/docs/embeddings
https://ai.google.dev/docs/embeddings_guide
Supports various tasks types:
| Task Type | Description |
@@ -46,12 +46,9 @@ class GeminiText(TextEmbeddingFunction):
Parameters
----------
name: str, default "gemini-embedding-001"
The name of the model to use. Supported models include:
- "gemini-embedding-001" (768 dimensions)
Note: The legacy "models/embedding-001" format is also supported but
"gemini-embedding-001" is recommended.
name: str, default "models/embedding-001"
The name of the model to use. See the Gemini documentation for a list of
available models.
query_task_type: str, default "retrieval_query"
Sets the task type for the queries.
@@ -80,7 +77,7 @@ class GeminiText(TextEmbeddingFunction):
"""
name: str = "gemini-embedding-001"
name: str = "models/embedding-001"
query_task_type: str = "retrieval_query"
source_task_type: str = "retrieval_document"
@@ -117,48 +114,23 @@ class GeminiText(TextEmbeddingFunction):
texts: list[str] or np.ndarray (of str)
The texts to embed
"""
from google.genai import types
if (
kwargs.get("task_type") == "retrieval_document"
): # Provide a title to use existing API design
title = "Embedding of a document"
kwargs["title"] = title
task_type = kwargs.get("task_type")
# Build content objects for embed_content
contents = []
for text in texts:
if task_type == "retrieval_document":
# Provide a title for retrieval_document task
contents.append(
{"parts": [{"text": "Embedding of a document"}, {"text": text}]}
)
else:
contents.append({"parts": [{"text": text}]})
# Build config
config_kwargs = {}
if task_type:
config_kwargs["task_type"] = task_type.upper() # API expects uppercase
# Call embed_content for each content
embeddings = []
for content in contents:
config = (
types.EmbedContentConfig(**config_kwargs) if config_kwargs else None
)
response = self.client.models.embed_content(
model=self.name,
contents=content,
config=config,
)
embeddings.append(response.embeddings[0].values)
return embeddings
return [
self.client.embed_content(model=self.name, content=text, **kwargs)[
"embedding"
]
for text in texts
]
@cached_property
def client(self):
attempt_import_or_raise("google.genai", "google-genai")
genai = attempt_import_or_raise("google.generativeai", "google.generativeai")
if not os.environ.get("GOOGLE_API_KEY"):
api_key_not_found_help("google")
from google import genai as genai_module
return genai_module.Client(api_key=os.environ.get("GOOGLE_API_KEY"))
return genai

View File

@@ -284,8 +284,9 @@ class Permutations:
self.permutation_table = permutation_table
if permutation_table.schema.metadata is not None:
raw = permutation_table.schema.metadata.get(b"split_names")
split_names = raw.decode("utf-8") if raw is not None else None
split_names = permutation_table.schema.metadata.get(
b"split_names", None
).decode("utf-8")
if split_names is not None:
self.split_names = json.loads(split_names)
self.split_dict = {
@@ -459,8 +460,9 @@ class Permutation:
f"Cannot create a permutation on split `{split}`"
" because no split names are defined in the permutation table"
)
raw = permutation_table.schema.metadata.get(b"split_names")
split_names = raw.decode("utf-8") if raw is not None else None
split_names = permutation_table.schema.metadata.get(
b"split_names", None
).decode("utf-8")
if split_names is None:
raise ValueError(
f"Cannot create a permutation on split `{split}`"

View File

@@ -522,50 +522,6 @@ def test_no_split_names(some_table: Table):
assert permutations[1].num_rows == 500
def test_permutations_metadata_without_split_names_key(mem_db: DBConnection):
"""Regression: schema metadata present but missing split_names key must not crash.
Previously, `.get(b"split_names", None).decode()` was called unconditionally,
so any permutation table whose metadata dict had other keys but no split_names
raised AttributeError: 'NoneType' has no attribute 'decode'.
"""
base = mem_db.create_table("base_nosplit", pa.table({"x": range(10)}))
# Build a permutation-like table that carries some metadata but NOT split_names.
raw = pa.table(
{
"row_id": pa.array(range(10), type=pa.uint64()),
"split_id": pa.array([0] * 10, type=pa.uint32()),
}
).replace_schema_metadata({b"other_key": b"other_value"})
perm_tbl = mem_db.create_table("perm_nosplit", raw)
permutations = Permutations(base, perm_tbl)
assert permutations.split_names == []
assert permutations.split_dict == {}
def test_from_tables_string_split_missing_names_key(mem_db: DBConnection):
"""Regression: from_tables() with a string split must raise ValueError, not
AttributeError.
Previously, `.get(b"split_names", None).decode()` crashed with AttributeError
when the metadata dict existed but had no split_names key.
"""
base = mem_db.create_table("base_strsplit", pa.table({"x": range(10)}))
raw = pa.table(
{
"row_id": pa.array(range(10), type=pa.uint64()),
"split_id": pa.array([0] * 10, type=pa.uint32()),
}
).replace_schema_metadata({b"other_key": b"other_value"})
perm_tbl = mem_db.create_table("perm_strsplit", raw)
with pytest.raises(ValueError, match="no split names are defined"):
Permutation.from_tables(base, perm_tbl, split="train")
@pytest.fixture
def some_perm_table(some_table: Table) -> Table:
return (

View File

@@ -1,2 +1,2 @@
[toolchain]
channel = "1.94.0"
channel = "1.91.0"

View File

@@ -676,6 +676,11 @@ impl ConnectBuilder {
self
}
/// Set the WAL host override for routing merge_insert requests
/// to a separate WAL/ingest service.
///
/// This option is only used when connecting to LanceDB Cloud (db:// URIs)
/// and will be ignored for other URIs.
/// Set the database specific options
///
/// See [crate::database::listing::ListingDatabaseOptions] for the options available for

View File

@@ -177,7 +177,6 @@ impl BedrockEmbeddingFunction {
))
.send()
.await
.map_err(Box::new)
})
})
.unwrap();

View File

@@ -527,6 +527,12 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
self.add_id_delimiter_query_param(builder)
}
pub fn post_wal(&self, uri: &str) -> RequestBuilder {
let full_uri = format!("{}{}", self.host, uri);
let builder = self.client.post(full_uri).header("x-use-wal", "true");
self.add_id_delimiter_query_param(builder)
}
fn add_id_delimiter_query_param(&self, req: RequestBuilder) -> RequestBuilder {
if self.id_delimiter != "$" {
req.query(&[("delimiter", self.id_delimiter.clone())])
@@ -1030,6 +1036,7 @@ mod tests {
let client = RestfulLanceDbClient {
client: reqwest::Client::new(),
host: "https://example.com".to_string(),
retry_config: RetryConfig::default().try_into().unwrap(),
sender: Sender,
id_delimiter: "+".to_string(),
@@ -1065,6 +1072,7 @@ mod tests {
let client = RestfulLanceDbClient {
client: reqwest::Client::new(),
host: "https://example.com".to_string(),
retry_config: RetryConfig::default().try_into().unwrap(),
sender: Sender,
id_delimiter: "+".to_string(),
@@ -1102,6 +1110,7 @@ mod tests {
let client = RestfulLanceDbClient {
client: reqwest::Client::new(),
host: "https://example.com".to_string(),
retry_config: RetryConfig::default().try_into().unwrap(),
sender: Sender,
id_delimiter: "+".to_string(),

View File

@@ -185,6 +185,7 @@ impl RemoteDatabaseOptionsBuilder {
self.options.host_override = Some(host_override);
self
}
}
#[derive(Debug)]

View File

@@ -1610,13 +1610,17 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
self.check_mutable().await?;
let timeout = params.timeout;
let use_wal = params.use_wal;
let query = MergeInsertRequest::try_from(params)?;
let mut request = self
.client
.post(&format!("/v1/table/{}/merge_insert/", self.identifier))
.query(&query)
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE);
let path = format!("/v1/table/{}/merge_insert/", self.identifier);
let mut request = if use_wal {
self.client.post_wal(&path)
} else {
self.client.post(&path)
}
.query(&query)
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE);
if let Some(timeout) = timeout {
// (If it doesn't fit into u64, it's not worth sending anyways.)
@@ -2705,6 +2709,43 @@ mod tests {
}
}
#[tokio::test]
async fn test_merge_insert_use_wal() {
let batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
let data: Box<dyn RecordBatchReader + Send> = Box::new(RecordBatchIterator::new(
[Ok(batch.clone())],
batch.schema(),
));
let table = Table::new_with_handler("my_table", move |request| {
if request.url().path() == "/v1/table/my_table/merge_insert/" {
// Verify the x-use-wal header is set for router-based WAL routing
assert_eq!(
request.headers().get("x-use-wal").unwrap(),
"true",
"merge_insert with use_wal should set x-use-wal header"
);
http::Response::builder()
.status(200)
.body(r#"{"version": 1, "num_deleted_rows": 0, "num_inserted_rows": 3, "num_updated_rows": 0}"#)
.unwrap()
} else {
panic!("Unexpected request path: {}", request.url().path());
}
});
let mut builder = table.merge_insert(&["some_col"]);
builder.use_wal(true);
let result = builder.execute(data).await.unwrap();
assert_eq!(result.num_inserted_rows, 3);
}
#[tokio::test]
async fn test_merge_insert_retries_on_409() {
let batch = RecordBatch::try_new(

View File

@@ -55,6 +55,7 @@ pub struct MergeInsertBuilder {
pub(crate) when_not_matched_by_source_delete_filt: Option<String>,
pub(crate) timeout: Option<Duration>,
pub(crate) use_index: bool,
pub(crate) use_wal: bool,
}
impl MergeInsertBuilder {
@@ -69,6 +70,7 @@ impl MergeInsertBuilder {
when_not_matched_by_source_delete_filt: None,
timeout: None,
use_index: true,
use_wal: false,
}
}
@@ -148,6 +150,18 @@ impl MergeInsertBuilder {
self
}
/// Controls whether to route the merge insert operation through the WAL.
///
/// When set to `true`, the request includes an `x-use-wal: true` header,
/// which the router uses to forward the operation to wal-writer instances
/// instead of Phalanx.
///
/// Defaults to `false`.
pub fn use_wal(&mut self, use_wal: bool) -> &mut Self {
self.use_wal = use_wal;
self
}
/// Executes the merge insert operation
///
/// Returns version and statistics about the merge operation including the number of rows