chore: reduce unneeded API calls for return version for write operations and improve test (#2373)

Reduce the duplicate code for remote write operation testing.
Avoid double call to remote to get version info, just return 0 instead
of suddenly adding extra API calls for end users when they are using old
servers.

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **New Features**
- Added version tracking to table operation results, allowing users to
see the commit version associated with add, update, delete, merge, and
column modification operations.
- **Bug Fixes**
- Improved compatibility with legacy servers by standardizing version
information as zero when the server does not return a version.
- **Documentation**
- Clarified the meaning of the version field in operation results,
especially for cases involving legacy server responses.
- **Tests**
- Enhanced test coverage to verify correct behavior with both legacy and
modern server responses.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
LuQQiu
2025-05-05 16:47:19 -07:00
committed by GitHub
parent ed594b0f76
commit 695813463c
2 changed files with 144 additions and 444 deletions

View File

@@ -761,8 +761,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
if body.trim().is_empty() || body == "{}" {
// Backward compatible with old servers
let version = self.version().await?;
return Ok(AddResult { version });
return Ok(AddResult { version: 0 });
}
let add_response: AddResult = serde_json::from_str(&body).map_err(|e| Error::Http {
@@ -925,10 +924,9 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
if body.trim().is_empty() || body == "{}" {
// Backward compatible with old servers
let version = self.version().await?;
return Ok(UpdateResult {
rows_updated: 0,
version,
version: 0,
});
}
@@ -955,8 +953,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
if body == "{}" {
// Backward compatible with old servers
let version = self.version().await?;
return Ok(DeleteResult { version });
return Ok(DeleteResult { version: 0 });
}
let delete_response: DeleteResult =
@@ -1088,9 +1085,8 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
if body.trim().is_empty() || body == "{}" {
// Backward compatible with old servers
let version = self.version().await?;
return Ok(MergeResult {
version,
version: 0,
num_deleted_rows: 0,
num_inserted_rows: 0,
num_updated_rows: 0,
@@ -1151,8 +1147,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
if body.trim().is_empty() || body == "{}" {
// Backward compatible with old servers
let version = self.version().await?;
return Ok(AddColumnsResult { version });
return Ok(AddColumnsResult { version: 0 });
}
let result: AddColumnsResult =
@@ -1205,8 +1200,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
if body.trim().is_empty() || body == "{}" {
// Backward compatible with old servers
let version = self.version().await?;
return Ok(AlterColumnsResult { version });
return Ok(AlterColumnsResult { version: 0 });
}
let result: AlterColumnsResult = serde_json::from_str(&body).map_err(|e| Error::Http {
@@ -1231,8 +1225,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
if body.trim().is_empty() || body == "{}" {
// Backward compatible with old servers
let version = self.version().await?;
return Ok(DropColumnsResult { version });
return Ok(DropColumnsResult { version: 0 });
}
let result: DropColumnsResult = serde_json::from_str(&body).map_err(|e| Error::Http {
@@ -1609,8 +1602,11 @@ mod tests {
body
}
#[rstest]
#[case(true)]
#[case(false)]
#[tokio::test]
async fn test_add_append_old_server() {
async fn test_add_append(#[case] old_server: bool) {
let data = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
@@ -1636,14 +1632,14 @@ mod tests {
std::mem::swap(request.body_mut().as_mut().unwrap(), &mut body_out);
sender.send(body_out).unwrap();
// Return empty JSON object for old server behavior
http::Response::builder().status(200).body("").unwrap()
} else if request.url().path() == "/v1/table/my_table/describe/" {
// Handle describe call for backward compatibility
http::Response::builder()
.status(200)
.body(r#"{"version": 42, "schema": { "fields": [] }}"#)
.unwrap()
if old_server {
http::Response::builder().status(200).body("").unwrap()
} else {
http::Response::builder()
.status(200)
.body(r#"{"version": 43}"#)
.unwrap()
}
} else {
panic!("Unexpected request path: {}", request.url().path());
}
@@ -1655,7 +1651,7 @@ mod tests {
.await
.unwrap();
assert_eq!(result.version, 42);
assert_eq!(result.version, if old_server { 0 } else { 43 });
let body = receiver.recv().unwrap();
let body = collect_body(body).await;
@@ -1663,117 +1659,11 @@ mod tests {
assert_eq!(&body, &expected_body);
}
#[rstest]
#[case(true)]
#[case(false)]
#[tokio::test]
async fn test_add_append() {
let data = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
let (sender, receiver) = std::sync::mpsc::channel();
let table = Table::new_with_handler("my_table", move |mut request| {
assert_eq!(request.method(), "POST");
assert_eq!(request.url().path(), "/v1/table/my_table/insert/");
// If mode is specified, it should be "append". Append is default
// so it's not required.
assert!(request
.url()
.query_pairs()
.filter(|(k, _)| k == "mode")
.all(|(_, v)| v == "append"));
assert_eq!(
request.headers().get("Content-Type").unwrap(),
ARROW_STREAM_CONTENT_TYPE
);
let mut body_out = reqwest::Body::from(Vec::new());
std::mem::swap(request.body_mut().as_mut().unwrap(), &mut body_out);
sender.send(body_out).unwrap();
http::Response::builder()
.status(200)
.body(r#"{"version": 43}"#)
.unwrap()
});
let result = table
.add(RecordBatchIterator::new([Ok(data.clone())], data.schema()))
.execute()
.await
.unwrap();
assert_eq!(result.version, 43);
let body = receiver.recv().unwrap();
let body = collect_body(body).await;
let expected_body = write_ipc_stream(&data);
assert_eq!(&body, &expected_body);
}
#[tokio::test]
async fn test_add_overwrite_old_server() {
let data = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
let (sender, receiver) = std::sync::mpsc::channel();
let table = Table::new_with_handler("my_table", move |mut request| {
if request.url().path() == "/v1/table/my_table/insert/" {
assert_eq!(request.method(), "POST");
assert_eq!(
request
.url()
.query_pairs()
.find(|(k, _)| k == "mode")
.map(|kv| kv.1)
.as_deref(),
Some("overwrite"),
"Expected mode=overwrite"
);
assert_eq!(
request.headers().get("Content-Type").unwrap(),
ARROW_STREAM_CONTENT_TYPE
);
let mut body_out = reqwest::Body::from(Vec::new());
std::mem::swap(request.body_mut().as_mut().unwrap(), &mut body_out);
sender.send(body_out).unwrap();
// Return empty JSON object for old server behavior
http::Response::builder().status(200).body("{}").unwrap()
} else if request.url().path() == "/v1/table/my_table/describe/" {
// Handle describe call for backward compatibility
http::Response::builder()
.status(200)
.body(r#"{"version": 42, "schema": { "fields": [] }}"#)
.unwrap()
} else {
panic!("Unexpected request path: {}", request.url().path());
}
});
let result = table
.add(RecordBatchIterator::new([Ok(data.clone())], data.schema()))
.mode(AddDataMode::Overwrite)
.execute()
.await
.unwrap();
assert_eq!(result.version, 42);
let body = receiver.recv().unwrap();
let body = collect_body(body).await;
let expected_body = write_ipc_stream(&data);
assert_eq!(&body, &expected_body);
}
#[tokio::test]
async fn test_add_overwrite() {
async fn test_add_overwrite(#[case] old_server: bool) {
let data = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
@@ -1804,10 +1694,14 @@ mod tests {
std::mem::swap(request.body_mut().as_mut().unwrap(), &mut body_out);
sender.send(body_out).unwrap();
http::Response::builder()
.status(200)
.body(r#"{"version": 43}"#)
.unwrap()
if old_server {
http::Response::builder().status(200).body("").unwrap()
} else {
http::Response::builder()
.status(200)
.body(r#"{"version": 43}"#)
.unwrap()
}
});
let result = table
@@ -1817,7 +1711,7 @@ mod tests {
.await
.unwrap();
assert_eq!(result.version, 43);
assert_eq!(result.version, if old_server { 0 } else { 43 });
let body = receiver.recv().unwrap();
let body = collect_body(body).await;
@@ -1825,9 +1719,12 @@ mod tests {
assert_eq!(&body, &expected_body);
}
#[rstest]
#[case(true)]
#[case(false)]
#[tokio::test]
async fn test_update_old_server() {
let table = Table::new_with_handler("my_table", |request| {
async fn test_update(#[case] old_server: bool) {
let table = Table::new_with_handler("my_table", move |request| {
if request.url().path() == "/v1/table/my_table/update/" {
assert_eq!(request.method(), "POST");
assert_eq!(
@@ -1855,14 +1752,14 @@ mod tests {
assert_eq!(only_if, "b > 10");
}
// Return empty JSON object (old server behavior)
http::Response::builder().status(200).body("{}").unwrap()
} else if request.url().path() == "/v1/table/my_table/describe/" {
// Handle the describe request for version lookup
http::Response::builder()
.status(200)
.body(r#"{"version": 42, "schema": { "fields": [] }}"#)
.unwrap()
if old_server {
http::Response::builder().status(200).body("").unwrap()
} else {
http::Response::builder()
.status(200)
.body(r#"{"rows_updated": 5, "version": 43}"#)
.unwrap()
}
} else {
panic!("Unexpected request path: {}", request.url().path());
}
@@ -1877,64 +1774,16 @@ mod tests {
.await
.unwrap();
assert_eq!(result.version, 42);
assert_eq!(result.rows_updated, 0);
assert_eq!(result.version, if old_server { 0 } else { 43 });
assert_eq!(result.rows_updated, if old_server { 0 } else { 5 });
}
#[rstest]
#[case(true)]
#[case(false)]
#[tokio::test]
async fn test_update() {
let table = Table::new_with_handler("my_table", |request| {
assert_eq!(request.method(), "POST");
assert_eq!(request.url().path(), "/v1/table/my_table/update/");
assert_eq!(
request.headers().get("Content-Type").unwrap(),
JSON_CONTENT_TYPE
);
if let Some(body) = request.body().unwrap().as_bytes() {
let body = std::str::from_utf8(body).unwrap();
let value: serde_json::Value = serde_json::from_str(body).unwrap();
let updates = value.get("updates").unwrap().as_array().unwrap();
assert!(updates.len() == 2);
let col_name = updates[0][0].as_str().unwrap();
let expression = updates[0][1].as_str().unwrap();
assert_eq!(col_name, "a");
assert_eq!(expression, "a + 1");
let col_name = updates[1][0].as_str().unwrap();
let expression = updates[1][1].as_str().unwrap();
assert_eq!(col_name, "b");
assert_eq!(expression, "b - 1");
let only_if = value.get("predicate").unwrap().as_str().unwrap();
assert_eq!(only_if, "b > 10");
}
// Return structured response (new server behavior)
http::Response::builder()
.status(200)
.body(r#"{"rows_updated": 5, "version": 43}"#)
.unwrap()
});
let result = table
.update()
.column("a", "a + 1")
.column("b", "b - 1")
.only_if("b > 10")
.execute()
.await
.unwrap();
// Verify result for new behavior
assert_eq!(result.rows_updated, 5); // From structured response
assert_eq!(result.version, 43); // From structured response
}
#[tokio::test]
async fn test_alter_columns_old_server() {
let table = Table::new_with_handler("my_table", |request| {
async fn test_alter_columns(#[case] old_server: bool) {
let table = Table::new_with_handler("my_table", move |request| {
if request.url().path() == "/v1/table/my_table/alter_columns/" {
assert_eq!(request.method(), "POST");
assert_eq!(
@@ -1960,12 +1809,14 @@ mod tests {
assert!(nullable);
assert_eq!(rename, "y");
http::Response::builder().status(200).body("{}").unwrap()
} else if request.url().path() == "/v1/table/my_table/describe/" {
http::Response::builder()
.status(200)
.body(r#"{"version": 42, "schema": { "fields": [] }}"#)
.unwrap()
if old_server {
http::Response::builder().status(200).body("{}").unwrap()
} else {
http::Response::builder()
.status(200)
.body(r#"{"version": 43}"#)
.unwrap()
}
} else {
panic!("Unexpected request path: {}", request.url().path());
}
@@ -1981,58 +1832,14 @@ mod tests {
.await
.unwrap();
assert_eq!(result.version, 42);
assert_eq!(result.version, if old_server { 0 } else { 43 });
}
#[rstest]
#[case(true)]
#[case(false)]
#[tokio::test]
async fn test_alter_columns() {
let table = Table::new_with_handler("my_table", |request| {
assert_eq!(request.method(), "POST");
assert_eq!(request.url().path(), "/v1/table/my_table/alter_columns/");
assert_eq!(
request.headers().get("Content-Type").unwrap(),
JSON_CONTENT_TYPE
);
let body = request.body().unwrap().as_bytes().unwrap();
let body = std::str::from_utf8(body).unwrap();
let value: serde_json::Value = serde_json::from_str(body).unwrap();
let alterations = value.get("alterations").unwrap().as_array().unwrap();
assert!(alterations.len() == 2);
let path = alterations[0]["path"].as_str().unwrap();
let data_type = alterations[0]["data_type"]["type"].as_str().unwrap();
assert_eq!(path, "b.c");
assert_eq!(data_type, "int32");
let path = alterations[1]["path"].as_str().unwrap();
let nullable = alterations[1]["nullable"].as_bool().unwrap();
let rename = alterations[1]["rename"].as_str().unwrap();
assert_eq!(path, "x");
assert!(nullable);
assert_eq!(rename, "y");
http::Response::builder()
.status(200)
.body(r#"{"version": 43}"#)
.unwrap()
});
let result = table
.alter_columns(&[
ColumnAlteration::new("b.c".into()).cast_to(DataType::Int32),
ColumnAlteration::new("x".into())
.rename("y".into())
.set_nullable(true),
])
.await
.unwrap();
assert_eq!(result.version, 43);
}
#[tokio::test]
async fn test_merge_insert_old_server() {
async fn test_merge_insert(#[case] old_server: bool) {
let batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
@@ -2043,8 +1850,7 @@ mod tests {
batch.schema(),
));
// Default parameters
let table = Table::new_with_handler("my_table", |request| {
let table = Table::new_with_handler("my_table", move |request| {
if request.url().path() == "/v1/table/my_table/merge_insert/" {
assert_eq!(request.method(), "POST");
@@ -2056,12 +1862,14 @@ mod tests {
assert!(!params.contains_key("when_matched_update_all_filt"));
assert!(!params.contains_key("when_not_matched_by_source_delete_filt"));
http::Response::builder().status(200).body("{}").unwrap()
} else if request.url().path() == "/v1/table/my_table/describe/" {
http::Response::builder()
.status(200)
.body(r#"{"version": 42, "schema": { "fields": [] }}"#)
.unwrap()
if old_server {
http::Response::builder().status(200).body("{}").unwrap()
} else {
http::Response::builder()
.status(200)
.body(r#"{"version": 43, "num_deleted_rows": 0, "num_inserted_rows": 3, "num_updated_rows": 0}"#)
.unwrap()
}
} else {
panic!("Unexpected request path: {}", request.url().path());
}
@@ -2073,53 +1881,12 @@ mod tests {
.await
.unwrap();
assert_eq!(result.version, 42);
assert_eq!(result.num_deleted_rows, 0);
assert_eq!(result.num_inserted_rows, 0);
assert_eq!(result.num_updated_rows, 0);
}
#[tokio::test]
async fn test_merge_insert() {
let batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
let data = Box::new(RecordBatchIterator::new(
[Ok(batch.clone())],
batch.schema(),
));
// Default parameters with new server behavior
let table = Table::new_with_handler("my_table", |request| {
assert_eq!(request.method(), "POST");
assert_eq!(request.url().path(), "/v1/table/my_table/merge_insert/");
let params = request.url().query_pairs().collect::<HashMap<_, _>>();
assert_eq!(params["on"], "some_col");
assert_eq!(params["when_matched_update_all"], "false");
assert_eq!(params["when_not_matched_insert_all"], "false");
assert_eq!(params["when_not_matched_by_source_delete"], "false");
assert!(!params.contains_key("when_matched_update_all_filt"));
assert!(!params.contains_key("when_not_matched_by_source_delete_filt"));
http::Response::builder()
.status(200)
.body(r#"{"version": 43, "num_deleted_rows": 0, "num_inserted_rows": 3, "num_updated_rows": 0}"#)
.unwrap()
});
let result = table
.merge_insert(&["some_col"])
.execute(data)
.await
.unwrap();
assert_eq!(result.version, 43);
assert_eq!(result.num_deleted_rows, 0);
assert_eq!(result.num_inserted_rows, 3);
assert_eq!(result.num_updated_rows, 0);
assert_eq!(result.version, if old_server { 0 } else { 43 });
if !old_server {
assert_eq!(result.num_deleted_rows, 0);
assert_eq!(result.num_inserted_rows, 3);
assert_eq!(result.num_updated_rows, 0);
}
}
#[tokio::test]
@@ -2158,9 +1925,12 @@ mod tests {
assert!(e.to_string().contains("Hit retry limit"));
}
#[rstest]
#[case(true)]
#[case(false)]
#[tokio::test]
async fn test_delete_old_server() {
let table = Table::new_with_handler("my_table", |request| {
async fn test_delete(#[case] old_server: bool) {
let table = Table::new_with_handler("my_table", move |request| {
if request.url().path() == "/v1/table/my_table/delete/" {
assert_eq!(request.method(), "POST");
assert_eq!(
@@ -2173,49 +1943,29 @@ mod tests {
let predicate = body.get("predicate").unwrap().as_str().unwrap();
assert_eq!(predicate, "id in (1, 2, 3)");
http::Response::builder().status(200).body("{}").unwrap()
} else if request.url().path() == "/v1/table/my_table/describe/" {
http::Response::builder()
.status(200)
.body(r#"{"version": 42, "schema": { "fields": [] }}"#)
.unwrap()
if old_server {
http::Response::builder().status(200).body("{}").unwrap()
} else {
http::Response::builder()
.status(200)
.body(r#"{"version": 43}"#)
.unwrap()
}
} else {
panic!("Unexpected request path: {}", request.url().path());
}
});
let result = table.delete("id in (1, 2, 3)").await.unwrap();
assert_eq!(result.version, 42);
assert_eq!(result.version, if old_server { 0 } else { 43 });
}
#[rstest]
#[case(true)]
#[case(false)]
#[tokio::test]
async fn test_delete() {
let table = Table::new_with_handler("my_table", |request| {
assert_eq!(request.method(), "POST");
assert_eq!(request.url().path(), "/v1/table/my_table/delete/");
assert_eq!(
request.headers().get("Content-Type").unwrap(),
JSON_CONTENT_TYPE
);
let body = request.body().unwrap().as_bytes().unwrap();
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
let predicate = body.get("predicate").unwrap().as_str().unwrap();
assert_eq!(predicate, "id in (1, 2, 3)");
http::Response::builder()
.status(200)
.body(r#"{"version": 43}"#)
.unwrap()
});
let result = table.delete("id in (1, 2, 3)").await.unwrap();
assert_eq!(result.version, 43);
}
#[tokio::test]
async fn test_drop_columns_old_server() {
let table = Table::new_with_handler("my_table", |request| {
async fn test_drop_columns(#[case] old_server: bool) {
let table = Table::new_with_handler("my_table", move |request| {
if request.url().path() == "/v1/table/my_table/drop_columns/" {
assert_eq!(request.method(), "POST");
assert_eq!(
@@ -2234,51 +1984,23 @@ mod tests {
assert_eq!(col1, "a");
assert_eq!(col2, "b");
http::Response::builder().status(200).body("{}").unwrap()
} else if request.url().path() == "/v1/table/my_table/describe/" {
http::Response::builder()
.status(200)
.body(r#"{"version": 42, "schema": { "fields": [] }}"#)
.unwrap()
if old_server {
http::Response::builder().status(200).body("{}").unwrap()
} else {
http::Response::builder()
.status(200)
.body(r#"{"version": 43}"#)
.unwrap()
}
} else {
panic!("Unexpected request path: {}", request.url().path());
}
});
let result = table.drop_columns(&["a", "b"]).await.unwrap();
assert_eq!(result.version, 42);
assert_eq!(result.version, if old_server { 0 } else { 43 });
}
#[tokio::test]
async fn test_drop_columns() {
let table = Table::new_with_handler("my_table", |request| {
assert_eq!(request.method(), "POST");
assert_eq!(request.url().path(), "/v1/table/my_table/drop_columns/");
assert_eq!(
request.headers().get("Content-Type").unwrap(),
JSON_CONTENT_TYPE
);
let body = request.body().unwrap().as_bytes().unwrap();
let body = std::str::from_utf8(body).unwrap();
let value: serde_json::Value = serde_json::from_str(body).unwrap();
let columns = value.get("columns").unwrap().as_array().unwrap();
assert!(columns.len() == 2);
let col1 = columns[0].as_str().unwrap();
let col2 = columns[1].as_str().unwrap();
assert_eq!(col1, "a");
assert_eq!(col2, "b");
http::Response::builder()
.status(200)
.body(r#"{"version": 43}"#)
.unwrap()
});
let result = table.drop_columns(&["a", "b"]).await.unwrap();
assert_eq!(result.version, 43);
}
#[tokio::test]
async fn test_query_plain() {
let expected_data = RecordBatch::try_new(
@@ -3093,9 +2815,12 @@ mod tests {
assert!(matches!(res, Err(Error::NotSupported { .. })));
}
#[rstest]
#[case(true)]
#[case(false)]
#[tokio::test]
async fn test_add_columns_old_server() {
let table = Table::new_with_handler("my_table", |request| {
async fn test_add_columns(#[case] old_server: bool) {
let table = Table::new_with_handler("my_table", move |request| {
if request.url().path() == "/v1/table/my_table/add_columns/" {
assert_eq!(request.method(), "POST");
assert_eq!(
@@ -3119,14 +2844,14 @@ mod tests {
assert_eq!(col_name, "x");
assert_eq!(expression, "cast(NULL as int32)");
// Return empty JSON object for old server behavior
http::Response::builder().status(200).body("{}").unwrap()
} else if request.url().path() == "/v1/table/my_table/describe/" {
// Handle describe call for backward compatibility
http::Response::builder()
.status(200)
.body(r#"{"version": 42, "schema": { "fields": [] }}"#)
.unwrap()
if old_server {
http::Response::builder().status(200).body("{}").unwrap()
} else {
http::Response::builder()
.status(200)
.body(r#"{"version": 43}"#)
.unwrap()
}
} else {
panic!("Unexpected request path: {}", request.url().path());
}
@@ -3143,53 +2868,7 @@ mod tests {
.await
.unwrap();
assert_eq!(result.version, 42);
}
#[tokio::test]
async fn test_add_columns() {
let table = Table::new_with_handler("my_table", |request| {
assert_eq!(request.method(), "POST");
assert_eq!(request.url().path(), "/v1/table/my_table/add_columns/");
assert_eq!(
request.headers().get("Content-Type").unwrap(),
JSON_CONTENT_TYPE
);
let body = request.body().unwrap().as_bytes().unwrap();
let body = std::str::from_utf8(body).unwrap();
let value: serde_json::Value = serde_json::from_str(body).unwrap();
let new_columns = value.get("new_columns").unwrap().as_array().unwrap();
assert!(new_columns.len() == 2);
let col_name = new_columns[0]["name"].as_str().unwrap();
let expression = new_columns[0]["expression"].as_str().unwrap();
assert_eq!(col_name, "b");
assert_eq!(expression, "a + 1");
let col_name = new_columns[1]["name"].as_str().unwrap();
let expression = new_columns[1]["expression"].as_str().unwrap();
assert_eq!(col_name, "x");
assert_eq!(expression, "cast(NULL as int32)");
http::Response::builder()
.status(200)
.body(r#"{"version": 43}"#)
.unwrap()
});
let result = table
.add_columns(
NewColumnTransform::SqlExpressions(vec![
("b".into(), "a + 1".into()),
("x".into(), "cast(NULL as int32)".into()),
]),
None,
)
.await
.unwrap();
assert_eq!(result.version, 43);
assert_eq!(result.version, if old_server { 0 } else { 43 });
}
#[tokio::test]

View File

@@ -426,21 +426,33 @@ pub trait Tags: Send + Sync {
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct UpdateResult {
pub rows_updated: u64,
// The commit version associated with the operation.
// A version of `0` indicates compatibility with legacy servers that do not return
/// a commit version.
pub version: u64,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AddResult {
// The commit version associated with the operation.
// A version of `0` indicates compatibility with legacy servers that do not return
/// a commit version.
pub version: u64,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DeleteResult {
// The commit version associated with the operation.
// A version of `0` indicates compatibility with legacy servers that do not return
/// a commit version.
pub version: u64,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct MergeResult {
// The commit version associated with the operation.
// A version of `0` indicates compatibility with legacy servers that do not return
/// a commit version.
pub version: u64,
/// Number of inserted rows (for user statistics)
pub num_inserted_rows: u64,
@@ -454,16 +466,25 @@ pub struct MergeResult {
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AddColumnsResult {
// The commit version associated with the operation.
// A version of `0` indicates compatibility with legacy servers that do not return
/// a commit version.
pub version: u64,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AlterColumnsResult {
// The commit version associated with the operation.
// A version of `0` indicates compatibility with legacy servers that do not return
/// a commit version.
pub version: u64,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DropColumnsResult {
// The commit version associated with the operation.
// A version of `0` indicates compatibility with legacy servers that do not return
/// a commit version.
pub version: u64,
}