feat(remote): implement set/unset_lsm_write_spec REST variant

Wire RemoteTable::set_lsm_write_spec and unset_lsm_write_spec to the
sophon REST endpoints (lancedb/sophon#6181) instead of returning
NotSupported. set_lsm_write_spec maps the LsmWriteSpec onto sophon's
request DTO (mode-tagged sharding, maintained_indexes,
writer_config_defaults) and POSTs to /set_lsm_write_spec; unset_lsm_write_spec
POSTs to /unset_lsm_write_spec.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Daniel Rammer
2026-06-04 11:06:16 -05:00
parent 39a9f3e1e9
commit 6d90febea3

View File

@@ -1827,16 +1827,57 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
})
}
async fn set_lsm_write_spec(&self, _spec: crate::table::LsmWriteSpec) -> Result<()> {
Err(Error::NotSupported {
message: "set_lsm_write_spec is not supported on LanceDB cloud.".into(),
})
async fn set_lsm_write_spec(&self, spec: crate::table::LsmWriteSpec) -> Result<()> {
use crate::table::LsmWriteSpec;
self.check_mutable().await?;
// Map the spec onto the server's request DTO. `sharding` is internally
// tagged on `mode` to mirror sophon's `Sharding` enum; `maintained_indexes`
// and `writer_config_defaults` are sent verbatim (an empty list means "no
// maintained indexes", not "default to all").
let sharding = match &spec {
LsmWriteSpec::Bucket {
column,
num_buckets,
..
} => serde_json::json!({
"mode": "bucket",
"column": column,
"num_buckets": num_buckets,
}),
LsmWriteSpec::Identity { column, .. } => serde_json::json!({
"mode": "identity",
"column": column,
}),
LsmWriteSpec::Unsharded { .. } => serde_json::json!({ "mode": "unsharded" }),
};
let body = serde_json::json!({
"sharding": sharding,
"maintained_indexes": spec.maintained_indexes(),
"writer_config_defaults": spec.writer_config_defaults(),
});
let request = self
.client
.post(&format!(
"/v1/table/{}/set_lsm_write_spec/",
self.identifier
))
.json(&body);
let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?;
Ok(())
}
async fn unset_lsm_write_spec(&self) -> Result<()> {
Err(Error::NotSupported {
message: "unset_lsm_write_spec is not supported on LanceDB cloud.".into(),
})
self.check_mutable().await?;
let request = self.client.post(&format!(
"/v1/table/{}/unset_lsm_write_spec/",
self.identifier
));
let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?;
Ok(())
}
async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
@@ -4428,6 +4469,91 @@ mod tests {
assert!(matches!(e, Error::IndexNotFound { .. }));
}
#[tokio::test]
async fn test_set_lsm_write_spec_unsharded() {
let table = Table::new_with_handler("my_table", |request| {
assert_eq!(request.method(), "POST");
assert_eq!(
request.url().path(),
"/v1/table/my_table/set_lsm_write_spec/"
);
let body = request.body().unwrap().as_bytes().unwrap();
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
assert_eq!(body["sharding"], serde_json::json!({ "mode": "unsharded" }));
assert_eq!(body["maintained_indexes"], serde_json::json!(["id_idx"]));
assert_eq!(
body["writer_config_defaults"],
serde_json::json!({ "max_memtable_rows": "1000" })
);
http::Response::builder()
.status(200)
.body(r#"{"maintained_indexes":["id_idx"]}"#)
.unwrap()
});
let spec = crate::table::LsmWriteSpec::unsharded()
.with_maintained_indexes(["id_idx"])
.with_writer_config_defaults([("max_memtable_rows", "1000")]);
table.set_lsm_write_spec(spec).await.unwrap();
}
#[tokio::test]
async fn test_set_lsm_write_spec_bucket() {
let table = Table::new_with_handler("my_table", |request| {
assert_eq!(request.method(), "POST");
assert_eq!(
request.url().path(),
"/v1/table/my_table/set_lsm_write_spec/"
);
let body = request.body().unwrap().as_bytes().unwrap();
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
assert_eq!(
body["sharding"],
serde_json::json!({ "mode": "bucket", "column": "id", "num_buckets": 16 })
);
assert_eq!(body["maintained_indexes"], serde_json::json!([]));
http::Response::builder().status(200).body("{}").unwrap()
});
table
.set_lsm_write_spec(crate::table::LsmWriteSpec::bucket("id", 16))
.await
.unwrap();
}
#[tokio::test]
async fn test_set_lsm_write_spec_identity() {
let table = Table::new_with_handler("my_table", |request| {
assert_eq!(request.method(), "POST");
assert_eq!(
request.url().path(),
"/v1/table/my_table/set_lsm_write_spec/"
);
let body = request.body().unwrap().as_bytes().unwrap();
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
assert_eq!(
body["sharding"],
serde_json::json!({ "mode": "identity", "column": "tenant" })
);
http::Response::builder().status(200).body("{}").unwrap()
});
table
.set_lsm_write_spec(crate::table::LsmWriteSpec::identity("tenant"))
.await
.unwrap();
}
#[tokio::test]
async fn test_unset_lsm_write_spec() {
let table = Table::new_with_handler("my_table", |request| {
assert_eq!(request.method(), "POST");
assert_eq!(
request.url().path(),
"/v1/table/my_table/unset_lsm_write_spec/"
);
http::Response::builder().status(200).body("{}").unwrap()
});
table.unset_lsm_write_spec().await.unwrap();
}
#[tokio::test]
async fn test_wait_for_index() {
let table = _make_table_with_indices(0);