mirror of
https://github.com/lancedb/lancedb.git
synced 2026-06-05 05:10:41 +00:00
Compare commits
1 Commits
python-v0.
...
wal-table-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c6f9607820 |
@@ -1826,16 +1826,57 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
})
|
||||
}
|
||||
|
||||
async fn set_lsm_write_spec(&self, _spec: crate::table::LsmWriteSpec) -> Result<()> {
|
||||
Err(Error::NotSupported {
|
||||
message: "set_lsm_write_spec is not supported on LanceDB cloud.".into(),
|
||||
})
|
||||
async fn set_lsm_write_spec(&self, spec: crate::table::LsmWriteSpec) -> Result<()> {
|
||||
use crate::table::LsmWriteSpec;
|
||||
self.check_mutable().await?;
|
||||
|
||||
// Map the spec onto the server's request DTO. `sharding` is internally
|
||||
// tagged on `mode` to mirror sophon's `Sharding` enum; `maintained_indexes`
|
||||
// and `writer_config_defaults` are sent verbatim (an empty list means "no
|
||||
// maintained indexes", not "default to all").
|
||||
let sharding = match &spec {
|
||||
LsmWriteSpec::Bucket {
|
||||
column,
|
||||
num_buckets,
|
||||
..
|
||||
} => serde_json::json!({
|
||||
"mode": "bucket",
|
||||
"column": column,
|
||||
"num_buckets": num_buckets,
|
||||
}),
|
||||
LsmWriteSpec::Identity { column, .. } => serde_json::json!({
|
||||
"mode": "identity",
|
||||
"column": column,
|
||||
}),
|
||||
LsmWriteSpec::Unsharded { .. } => serde_json::json!({ "mode": "unsharded" }),
|
||||
};
|
||||
let body = serde_json::json!({
|
||||
"sharding": sharding,
|
||||
"maintained_indexes": spec.maintained_indexes(),
|
||||
"writer_config_defaults": spec.writer_config_defaults(),
|
||||
});
|
||||
|
||||
let request = self
|
||||
.client
|
||||
.post(&format!(
|
||||
"/v1/table/{}/set_lsm_write_spec/",
|
||||
self.identifier
|
||||
))
|
||||
.json(&body);
|
||||
let (request_id, response) = self.send(request, true).await?;
|
||||
self.check_table_response(&request_id, response).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn unset_lsm_write_spec(&self) -> Result<()> {
|
||||
Err(Error::NotSupported {
|
||||
message: "unset_lsm_write_spec is not supported on LanceDB cloud.".into(),
|
||||
})
|
||||
self.check_mutable().await?;
|
||||
let request = self.client.post(&format!(
|
||||
"/v1/table/{}/unset_lsm_write_spec/",
|
||||
self.identifier
|
||||
));
|
||||
let (request_id, response) = self.send(request, true).await?;
|
||||
self.check_table_response(&request_id, response).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
|
||||
@@ -4376,6 +4417,91 @@ mod tests {
|
||||
assert!(matches!(e, Error::IndexNotFound { .. }));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_set_lsm_write_spec_unsharded() {
|
||||
let table = Table::new_with_handler("my_table", |request| {
|
||||
assert_eq!(request.method(), "POST");
|
||||
assert_eq!(
|
||||
request.url().path(),
|
||||
"/v1/table/my_table/set_lsm_write_spec/"
|
||||
);
|
||||
let body = request.body().unwrap().as_bytes().unwrap();
|
||||
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
|
||||
assert_eq!(body["sharding"], serde_json::json!({ "mode": "unsharded" }));
|
||||
assert_eq!(body["maintained_indexes"], serde_json::json!(["id_idx"]));
|
||||
assert_eq!(
|
||||
body["writer_config_defaults"],
|
||||
serde_json::json!({ "max_memtable_rows": "1000" })
|
||||
);
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body(r#"{"maintained_indexes":["id_idx"]}"#)
|
||||
.unwrap()
|
||||
});
|
||||
let spec = crate::table::LsmWriteSpec::unsharded()
|
||||
.with_maintained_indexes(["id_idx"])
|
||||
.with_writer_config_defaults([("max_memtable_rows", "1000")]);
|
||||
table.set_lsm_write_spec(spec).await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_set_lsm_write_spec_bucket() {
|
||||
let table = Table::new_with_handler("my_table", |request| {
|
||||
assert_eq!(request.method(), "POST");
|
||||
assert_eq!(
|
||||
request.url().path(),
|
||||
"/v1/table/my_table/set_lsm_write_spec/"
|
||||
);
|
||||
let body = request.body().unwrap().as_bytes().unwrap();
|
||||
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
|
||||
assert_eq!(
|
||||
body["sharding"],
|
||||
serde_json::json!({ "mode": "bucket", "column": "id", "num_buckets": 16 })
|
||||
);
|
||||
assert_eq!(body["maintained_indexes"], serde_json::json!([]));
|
||||
http::Response::builder().status(200).body("{}").unwrap()
|
||||
});
|
||||
table
|
||||
.set_lsm_write_spec(crate::table::LsmWriteSpec::bucket("id", 16))
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_set_lsm_write_spec_identity() {
|
||||
let table = Table::new_with_handler("my_table", |request| {
|
||||
assert_eq!(request.method(), "POST");
|
||||
assert_eq!(
|
||||
request.url().path(),
|
||||
"/v1/table/my_table/set_lsm_write_spec/"
|
||||
);
|
||||
let body = request.body().unwrap().as_bytes().unwrap();
|
||||
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
|
||||
assert_eq!(
|
||||
body["sharding"],
|
||||
serde_json::json!({ "mode": "identity", "column": "tenant" })
|
||||
);
|
||||
http::Response::builder().status(200).body("{}").unwrap()
|
||||
});
|
||||
table
|
||||
.set_lsm_write_spec(crate::table::LsmWriteSpec::identity("tenant"))
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_unset_lsm_write_spec() {
|
||||
let table = Table::new_with_handler("my_table", |request| {
|
||||
assert_eq!(request.method(), "POST");
|
||||
assert_eq!(
|
||||
request.url().path(),
|
||||
"/v1/table/my_table/unset_lsm_write_spec/"
|
||||
);
|
||||
http::Response::builder().status(200).body("{}").unwrap()
|
||||
});
|
||||
table.unset_lsm_write_spec().await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_wait_for_index() {
|
||||
let table = _make_table_with_indices(0);
|
||||
|
||||
Reference in New Issue
Block a user