From 695813463cc6e296e41c455f5c2537e50567b246 Mon Sep 17 00:00:00 2001 From: LuQQiu Date: Mon, 5 May 2025 16:47:19 -0700 Subject: [PATCH] chore: reduce unneeded API calls for return version for write operations and improve test (#2373) Reduce the duplicate code for remote write operation testing. Avoid double call to remote to get version info, just return 0 instead of suddenly adding extra API calls for end users when they are using old servers. ## Summary by CodeRabbit - **New Features** - Added version tracking to table operation results, allowing users to see the commit version associated with add, update, delete, merge, and column modification operations. - **Bug Fixes** - Improved compatibility with legacy servers by standardizing version information as zero when the server does not return a version. - **Documentation** - Clarified the meaning of the version field in operation results, especially for cases involving legacy server responses. - **Tests** - Enhanced test coverage to verify correct behavior with both legacy and modern server responses. --- rust/lancedb/src/remote/table.rs | 567 +++++++------------------------ rust/lancedb/src/table.rs | 21 ++ 2 files changed, 144 insertions(+), 444 deletions(-) diff --git a/rust/lancedb/src/remote/table.rs b/rust/lancedb/src/remote/table.rs index 11344964..81061fb4 100644 --- a/rust/lancedb/src/remote/table.rs +++ b/rust/lancedb/src/remote/table.rs @@ -761,8 +761,7 @@ impl BaseTable for RemoteTable { if body.trim().is_empty() || body == "{}" { // Backward compatible with old servers - let version = self.version().await?; - return Ok(AddResult { version }); + return Ok(AddResult { version: 0 }); } let add_response: AddResult = serde_json::from_str(&body).map_err(|e| Error::Http { @@ -925,10 +924,9 @@ impl BaseTable for RemoteTable { if body.trim().is_empty() || body == "{}" { // Backward compatible with old servers - let version = self.version().await?; return Ok(UpdateResult { rows_updated: 0, - version, + version: 0, }); } @@ -955,8 +953,7 @@ impl BaseTable for RemoteTable { if body == "{}" { // Backward compatible with old servers - let version = self.version().await?; - return Ok(DeleteResult { version }); + return Ok(DeleteResult { version: 0 }); } let delete_response: DeleteResult = @@ -1088,9 +1085,8 @@ impl BaseTable for RemoteTable { if body.trim().is_empty() || body == "{}" { // Backward compatible with old servers - let version = self.version().await?; return Ok(MergeResult { - version, + version: 0, num_deleted_rows: 0, num_inserted_rows: 0, num_updated_rows: 0, @@ -1151,8 +1147,7 @@ impl BaseTable for RemoteTable { if body.trim().is_empty() || body == "{}" { // Backward compatible with old servers - let version = self.version().await?; - return Ok(AddColumnsResult { version }); + return Ok(AddColumnsResult { version: 0 }); } let result: AddColumnsResult = @@ -1205,8 +1200,7 @@ impl BaseTable for RemoteTable { if body.trim().is_empty() || body == "{}" { // Backward compatible with old servers - let version = self.version().await?; - return Ok(AlterColumnsResult { version }); + return Ok(AlterColumnsResult { version: 0 }); } let result: AlterColumnsResult = serde_json::from_str(&body).map_err(|e| Error::Http { @@ -1231,8 +1225,7 @@ impl BaseTable for RemoteTable { if body.trim().is_empty() || body == "{}" { // Backward compatible with old servers - let version = self.version().await?; - return Ok(DropColumnsResult { version }); + return Ok(DropColumnsResult { version: 0 }); } let result: DropColumnsResult = serde_json::from_str(&body).map_err(|e| Error::Http { @@ -1609,8 +1602,11 @@ mod tests { body } + #[rstest] + #[case(true)] + #[case(false)] #[tokio::test] - async fn test_add_append_old_server() { + async fn test_add_append(#[case] old_server: bool) { let data = RecordBatch::try_new( Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])), vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], @@ -1636,14 +1632,14 @@ mod tests { std::mem::swap(request.body_mut().as_mut().unwrap(), &mut body_out); sender.send(body_out).unwrap(); - // Return empty JSON object for old server behavior - http::Response::builder().status(200).body("").unwrap() - } else if request.url().path() == "/v1/table/my_table/describe/" { - // Handle describe call for backward compatibility - http::Response::builder() - .status(200) - .body(r#"{"version": 42, "schema": { "fields": [] }}"#) - .unwrap() + if old_server { + http::Response::builder().status(200).body("").unwrap() + } else { + http::Response::builder() + .status(200) + .body(r#"{"version": 43}"#) + .unwrap() + } } else { panic!("Unexpected request path: {}", request.url().path()); } @@ -1655,7 +1651,7 @@ mod tests { .await .unwrap(); - assert_eq!(result.version, 42); + assert_eq!(result.version, if old_server { 0 } else { 43 }); let body = receiver.recv().unwrap(); let body = collect_body(body).await; @@ -1663,117 +1659,11 @@ mod tests { assert_eq!(&body, &expected_body); } + #[rstest] + #[case(true)] + #[case(false)] #[tokio::test] - async fn test_add_append() { - let data = RecordBatch::try_new( - Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])), - vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], - ) - .unwrap(); - - let (sender, receiver) = std::sync::mpsc::channel(); - let table = Table::new_with_handler("my_table", move |mut request| { - assert_eq!(request.method(), "POST"); - assert_eq!(request.url().path(), "/v1/table/my_table/insert/"); - // If mode is specified, it should be "append". Append is default - // so it's not required. - assert!(request - .url() - .query_pairs() - .filter(|(k, _)| k == "mode") - .all(|(_, v)| v == "append")); - - assert_eq!( - request.headers().get("Content-Type").unwrap(), - ARROW_STREAM_CONTENT_TYPE - ); - - let mut body_out = reqwest::Body::from(Vec::new()); - std::mem::swap(request.body_mut().as_mut().unwrap(), &mut body_out); - sender.send(body_out).unwrap(); - - http::Response::builder() - .status(200) - .body(r#"{"version": 43}"#) - .unwrap() - }); - - let result = table - .add(RecordBatchIterator::new([Ok(data.clone())], data.schema())) - .execute() - .await - .unwrap(); - - assert_eq!(result.version, 43); - - let body = receiver.recv().unwrap(); - let body = collect_body(body).await; - let expected_body = write_ipc_stream(&data); - assert_eq!(&body, &expected_body); - } - - #[tokio::test] - async fn test_add_overwrite_old_server() { - let data = RecordBatch::try_new( - Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])), - vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], - ) - .unwrap(); - - let (sender, receiver) = std::sync::mpsc::channel(); - let table = Table::new_with_handler("my_table", move |mut request| { - if request.url().path() == "/v1/table/my_table/insert/" { - assert_eq!(request.method(), "POST"); - assert_eq!( - request - .url() - .query_pairs() - .find(|(k, _)| k == "mode") - .map(|kv| kv.1) - .as_deref(), - Some("overwrite"), - "Expected mode=overwrite" - ); - - assert_eq!( - request.headers().get("Content-Type").unwrap(), - ARROW_STREAM_CONTENT_TYPE - ); - - let mut body_out = reqwest::Body::from(Vec::new()); - std::mem::swap(request.body_mut().as_mut().unwrap(), &mut body_out); - sender.send(body_out).unwrap(); - - // Return empty JSON object for old server behavior - http::Response::builder().status(200).body("{}").unwrap() - } else if request.url().path() == "/v1/table/my_table/describe/" { - // Handle describe call for backward compatibility - http::Response::builder() - .status(200) - .body(r#"{"version": 42, "schema": { "fields": [] }}"#) - .unwrap() - } else { - panic!("Unexpected request path: {}", request.url().path()); - } - }); - - let result = table - .add(RecordBatchIterator::new([Ok(data.clone())], data.schema())) - .mode(AddDataMode::Overwrite) - .execute() - .await - .unwrap(); - - assert_eq!(result.version, 42); - - let body = receiver.recv().unwrap(); - let body = collect_body(body).await; - let expected_body = write_ipc_stream(&data); - assert_eq!(&body, &expected_body); - } - - #[tokio::test] - async fn test_add_overwrite() { + async fn test_add_overwrite(#[case] old_server: bool) { let data = RecordBatch::try_new( Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])), vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], @@ -1804,10 +1694,14 @@ mod tests { std::mem::swap(request.body_mut().as_mut().unwrap(), &mut body_out); sender.send(body_out).unwrap(); - http::Response::builder() - .status(200) - .body(r#"{"version": 43}"#) - .unwrap() + if old_server { + http::Response::builder().status(200).body("").unwrap() + } else { + http::Response::builder() + .status(200) + .body(r#"{"version": 43}"#) + .unwrap() + } }); let result = table @@ -1817,7 +1711,7 @@ mod tests { .await .unwrap(); - assert_eq!(result.version, 43); + assert_eq!(result.version, if old_server { 0 } else { 43 }); let body = receiver.recv().unwrap(); let body = collect_body(body).await; @@ -1825,9 +1719,12 @@ mod tests { assert_eq!(&body, &expected_body); } + #[rstest] + #[case(true)] + #[case(false)] #[tokio::test] - async fn test_update_old_server() { - let table = Table::new_with_handler("my_table", |request| { + async fn test_update(#[case] old_server: bool) { + let table = Table::new_with_handler("my_table", move |request| { if request.url().path() == "/v1/table/my_table/update/" { assert_eq!(request.method(), "POST"); assert_eq!( @@ -1855,14 +1752,14 @@ mod tests { assert_eq!(only_if, "b > 10"); } - // Return empty JSON object (old server behavior) - http::Response::builder().status(200).body("{}").unwrap() - } else if request.url().path() == "/v1/table/my_table/describe/" { - // Handle the describe request for version lookup - http::Response::builder() - .status(200) - .body(r#"{"version": 42, "schema": { "fields": [] }}"#) - .unwrap() + if old_server { + http::Response::builder().status(200).body("").unwrap() + } else { + http::Response::builder() + .status(200) + .body(r#"{"rows_updated": 5, "version": 43}"#) + .unwrap() + } } else { panic!("Unexpected request path: {}", request.url().path()); } @@ -1877,64 +1774,16 @@ mod tests { .await .unwrap(); - assert_eq!(result.version, 42); - assert_eq!(result.rows_updated, 0); + assert_eq!(result.version, if old_server { 0 } else { 43 }); + assert_eq!(result.rows_updated, if old_server { 0 } else { 5 }); } + #[rstest] + #[case(true)] + #[case(false)] #[tokio::test] - async fn test_update() { - let table = Table::new_with_handler("my_table", |request| { - assert_eq!(request.method(), "POST"); - assert_eq!(request.url().path(), "/v1/table/my_table/update/"); - assert_eq!( - request.headers().get("Content-Type").unwrap(), - JSON_CONTENT_TYPE - ); - - if let Some(body) = request.body().unwrap().as_bytes() { - let body = std::str::from_utf8(body).unwrap(); - let value: serde_json::Value = serde_json::from_str(body).unwrap(); - let updates = value.get("updates").unwrap().as_array().unwrap(); - assert!(updates.len() == 2); - - let col_name = updates[0][0].as_str().unwrap(); - let expression = updates[0][1].as_str().unwrap(); - assert_eq!(col_name, "a"); - assert_eq!(expression, "a + 1"); - - let col_name = updates[1][0].as_str().unwrap(); - let expression = updates[1][1].as_str().unwrap(); - assert_eq!(col_name, "b"); - assert_eq!(expression, "b - 1"); - - let only_if = value.get("predicate").unwrap().as_str().unwrap(); - assert_eq!(only_if, "b > 10"); - } - - // Return structured response (new server behavior) - http::Response::builder() - .status(200) - .body(r#"{"rows_updated": 5, "version": 43}"#) - .unwrap() - }); - - let result = table - .update() - .column("a", "a + 1") - .column("b", "b - 1") - .only_if("b > 10") - .execute() - .await - .unwrap(); - - // Verify result for new behavior - assert_eq!(result.rows_updated, 5); // From structured response - assert_eq!(result.version, 43); // From structured response - } - - #[tokio::test] - async fn test_alter_columns_old_server() { - let table = Table::new_with_handler("my_table", |request| { + async fn test_alter_columns(#[case] old_server: bool) { + let table = Table::new_with_handler("my_table", move |request| { if request.url().path() == "/v1/table/my_table/alter_columns/" { assert_eq!(request.method(), "POST"); assert_eq!( @@ -1960,12 +1809,14 @@ mod tests { assert!(nullable); assert_eq!(rename, "y"); - http::Response::builder().status(200).body("{}").unwrap() - } else if request.url().path() == "/v1/table/my_table/describe/" { - http::Response::builder() - .status(200) - .body(r#"{"version": 42, "schema": { "fields": [] }}"#) - .unwrap() + if old_server { + http::Response::builder().status(200).body("{}").unwrap() + } else { + http::Response::builder() + .status(200) + .body(r#"{"version": 43}"#) + .unwrap() + } } else { panic!("Unexpected request path: {}", request.url().path()); } @@ -1981,58 +1832,14 @@ mod tests { .await .unwrap(); - assert_eq!(result.version, 42); + assert_eq!(result.version, if old_server { 0 } else { 43 }); } + #[rstest] + #[case(true)] + #[case(false)] #[tokio::test] - async fn test_alter_columns() { - let table = Table::new_with_handler("my_table", |request| { - assert_eq!(request.method(), "POST"); - assert_eq!(request.url().path(), "/v1/table/my_table/alter_columns/"); - assert_eq!( - request.headers().get("Content-Type").unwrap(), - JSON_CONTENT_TYPE - ); - - let body = request.body().unwrap().as_bytes().unwrap(); - let body = std::str::from_utf8(body).unwrap(); - let value: serde_json::Value = serde_json::from_str(body).unwrap(); - let alterations = value.get("alterations").unwrap().as_array().unwrap(); - assert!(alterations.len() == 2); - - let path = alterations[0]["path"].as_str().unwrap(); - let data_type = alterations[0]["data_type"]["type"].as_str().unwrap(); - assert_eq!(path, "b.c"); - assert_eq!(data_type, "int32"); - - let path = alterations[1]["path"].as_str().unwrap(); - let nullable = alterations[1]["nullable"].as_bool().unwrap(); - let rename = alterations[1]["rename"].as_str().unwrap(); - assert_eq!(path, "x"); - assert!(nullable); - assert_eq!(rename, "y"); - - http::Response::builder() - .status(200) - .body(r#"{"version": 43}"#) - .unwrap() - }); - - let result = table - .alter_columns(&[ - ColumnAlteration::new("b.c".into()).cast_to(DataType::Int32), - ColumnAlteration::new("x".into()) - .rename("y".into()) - .set_nullable(true), - ]) - .await - .unwrap(); - - assert_eq!(result.version, 43); - } - - #[tokio::test] - async fn test_merge_insert_old_server() { + async fn test_merge_insert(#[case] old_server: bool) { let batch = RecordBatch::try_new( Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])), vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], @@ -2043,8 +1850,7 @@ mod tests { batch.schema(), )); - // Default parameters - let table = Table::new_with_handler("my_table", |request| { + let table = Table::new_with_handler("my_table", move |request| { if request.url().path() == "/v1/table/my_table/merge_insert/" { assert_eq!(request.method(), "POST"); @@ -2056,12 +1862,14 @@ mod tests { assert!(!params.contains_key("when_matched_update_all_filt")); assert!(!params.contains_key("when_not_matched_by_source_delete_filt")); - http::Response::builder().status(200).body("{}").unwrap() - } else if request.url().path() == "/v1/table/my_table/describe/" { - http::Response::builder() - .status(200) - .body(r#"{"version": 42, "schema": { "fields": [] }}"#) - .unwrap() + if old_server { + http::Response::builder().status(200).body("{}").unwrap() + } else { + http::Response::builder() + .status(200) + .body(r#"{"version": 43, "num_deleted_rows": 0, "num_inserted_rows": 3, "num_updated_rows": 0}"#) + .unwrap() + } } else { panic!("Unexpected request path: {}", request.url().path()); } @@ -2073,53 +1881,12 @@ mod tests { .await .unwrap(); - assert_eq!(result.version, 42); - assert_eq!(result.num_deleted_rows, 0); - assert_eq!(result.num_inserted_rows, 0); - assert_eq!(result.num_updated_rows, 0); - } - - #[tokio::test] - async fn test_merge_insert() { - let batch = RecordBatch::try_new( - Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])), - vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], - ) - .unwrap(); - let data = Box::new(RecordBatchIterator::new( - [Ok(batch.clone())], - batch.schema(), - )); - - // Default parameters with new server behavior - let table = Table::new_with_handler("my_table", |request| { - assert_eq!(request.method(), "POST"); - assert_eq!(request.url().path(), "/v1/table/my_table/merge_insert/"); - - let params = request.url().query_pairs().collect::>(); - assert_eq!(params["on"], "some_col"); - assert_eq!(params["when_matched_update_all"], "false"); - assert_eq!(params["when_not_matched_insert_all"], "false"); - assert_eq!(params["when_not_matched_by_source_delete"], "false"); - assert!(!params.contains_key("when_matched_update_all_filt")); - assert!(!params.contains_key("when_not_matched_by_source_delete_filt")); - - http::Response::builder() - .status(200) - .body(r#"{"version": 43, "num_deleted_rows": 0, "num_inserted_rows": 3, "num_updated_rows": 0}"#) - .unwrap() - }); - - let result = table - .merge_insert(&["some_col"]) - .execute(data) - .await - .unwrap(); - - assert_eq!(result.version, 43); - assert_eq!(result.num_deleted_rows, 0); - assert_eq!(result.num_inserted_rows, 3); - assert_eq!(result.num_updated_rows, 0); + assert_eq!(result.version, if old_server { 0 } else { 43 }); + if !old_server { + assert_eq!(result.num_deleted_rows, 0); + assert_eq!(result.num_inserted_rows, 3); + assert_eq!(result.num_updated_rows, 0); + } } #[tokio::test] @@ -2158,9 +1925,12 @@ mod tests { assert!(e.to_string().contains("Hit retry limit")); } + #[rstest] + #[case(true)] + #[case(false)] #[tokio::test] - async fn test_delete_old_server() { - let table = Table::new_with_handler("my_table", |request| { + async fn test_delete(#[case] old_server: bool) { + let table = Table::new_with_handler("my_table", move |request| { if request.url().path() == "/v1/table/my_table/delete/" { assert_eq!(request.method(), "POST"); assert_eq!( @@ -2173,49 +1943,29 @@ mod tests { let predicate = body.get("predicate").unwrap().as_str().unwrap(); assert_eq!(predicate, "id in (1, 2, 3)"); - http::Response::builder().status(200).body("{}").unwrap() - } else if request.url().path() == "/v1/table/my_table/describe/" { - http::Response::builder() - .status(200) - .body(r#"{"version": 42, "schema": { "fields": [] }}"#) - .unwrap() + if old_server { + http::Response::builder().status(200).body("{}").unwrap() + } else { + http::Response::builder() + .status(200) + .body(r#"{"version": 43}"#) + .unwrap() + } } else { panic!("Unexpected request path: {}", request.url().path()); } }); let result = table.delete("id in (1, 2, 3)").await.unwrap(); - assert_eq!(result.version, 42); + assert_eq!(result.version, if old_server { 0 } else { 43 }); } + #[rstest] + #[case(true)] + #[case(false)] #[tokio::test] - async fn test_delete() { - let table = Table::new_with_handler("my_table", |request| { - assert_eq!(request.method(), "POST"); - assert_eq!(request.url().path(), "/v1/table/my_table/delete/"); - assert_eq!( - request.headers().get("Content-Type").unwrap(), - JSON_CONTENT_TYPE - ); - - let body = request.body().unwrap().as_bytes().unwrap(); - let body: serde_json::Value = serde_json::from_slice(body).unwrap(); - let predicate = body.get("predicate").unwrap().as_str().unwrap(); - assert_eq!(predicate, "id in (1, 2, 3)"); - - http::Response::builder() - .status(200) - .body(r#"{"version": 43}"#) - .unwrap() - }); - - let result = table.delete("id in (1, 2, 3)").await.unwrap(); - assert_eq!(result.version, 43); - } - - #[tokio::test] - async fn test_drop_columns_old_server() { - let table = Table::new_with_handler("my_table", |request| { + async fn test_drop_columns(#[case] old_server: bool) { + let table = Table::new_with_handler("my_table", move |request| { if request.url().path() == "/v1/table/my_table/drop_columns/" { assert_eq!(request.method(), "POST"); assert_eq!( @@ -2234,51 +1984,23 @@ mod tests { assert_eq!(col1, "a"); assert_eq!(col2, "b"); - http::Response::builder().status(200).body("{}").unwrap() - } else if request.url().path() == "/v1/table/my_table/describe/" { - http::Response::builder() - .status(200) - .body(r#"{"version": 42, "schema": { "fields": [] }}"#) - .unwrap() + if old_server { + http::Response::builder().status(200).body("{}").unwrap() + } else { + http::Response::builder() + .status(200) + .body(r#"{"version": 43}"#) + .unwrap() + } } else { panic!("Unexpected request path: {}", request.url().path()); } }); let result = table.drop_columns(&["a", "b"]).await.unwrap(); - assert_eq!(result.version, 42); + assert_eq!(result.version, if old_server { 0 } else { 43 }); } - #[tokio::test] - async fn test_drop_columns() { - let table = Table::new_with_handler("my_table", |request| { - assert_eq!(request.method(), "POST"); - assert_eq!(request.url().path(), "/v1/table/my_table/drop_columns/"); - assert_eq!( - request.headers().get("Content-Type").unwrap(), - JSON_CONTENT_TYPE - ); - - let body = request.body().unwrap().as_bytes().unwrap(); - let body = std::str::from_utf8(body).unwrap(); - let value: serde_json::Value = serde_json::from_str(body).unwrap(); - let columns = value.get("columns").unwrap().as_array().unwrap(); - assert!(columns.len() == 2); - - let col1 = columns[0].as_str().unwrap(); - let col2 = columns[1].as_str().unwrap(); - assert_eq!(col1, "a"); - assert_eq!(col2, "b"); - - http::Response::builder() - .status(200) - .body(r#"{"version": 43}"#) - .unwrap() - }); - - let result = table.drop_columns(&["a", "b"]).await.unwrap(); - assert_eq!(result.version, 43); - } #[tokio::test] async fn test_query_plain() { let expected_data = RecordBatch::try_new( @@ -3093,9 +2815,12 @@ mod tests { assert!(matches!(res, Err(Error::NotSupported { .. }))); } + #[rstest] + #[case(true)] + #[case(false)] #[tokio::test] - async fn test_add_columns_old_server() { - let table = Table::new_with_handler("my_table", |request| { + async fn test_add_columns(#[case] old_server: bool) { + let table = Table::new_with_handler("my_table", move |request| { if request.url().path() == "/v1/table/my_table/add_columns/" { assert_eq!(request.method(), "POST"); assert_eq!( @@ -3119,14 +2844,14 @@ mod tests { assert_eq!(col_name, "x"); assert_eq!(expression, "cast(NULL as int32)"); - // Return empty JSON object for old server behavior - http::Response::builder().status(200).body("{}").unwrap() - } else if request.url().path() == "/v1/table/my_table/describe/" { - // Handle describe call for backward compatibility - http::Response::builder() - .status(200) - .body(r#"{"version": 42, "schema": { "fields": [] }}"#) - .unwrap() + if old_server { + http::Response::builder().status(200).body("{}").unwrap() + } else { + http::Response::builder() + .status(200) + .body(r#"{"version": 43}"#) + .unwrap() + } } else { panic!("Unexpected request path: {}", request.url().path()); } @@ -3143,53 +2868,7 @@ mod tests { .await .unwrap(); - assert_eq!(result.version, 42); - } - - #[tokio::test] - async fn test_add_columns() { - let table = Table::new_with_handler("my_table", |request| { - assert_eq!(request.method(), "POST"); - assert_eq!(request.url().path(), "/v1/table/my_table/add_columns/"); - assert_eq!( - request.headers().get("Content-Type").unwrap(), - JSON_CONTENT_TYPE - ); - - let body = request.body().unwrap().as_bytes().unwrap(); - let body = std::str::from_utf8(body).unwrap(); - let value: serde_json::Value = serde_json::from_str(body).unwrap(); - let new_columns = value.get("new_columns").unwrap().as_array().unwrap(); - assert!(new_columns.len() == 2); - - let col_name = new_columns[0]["name"].as_str().unwrap(); - let expression = new_columns[0]["expression"].as_str().unwrap(); - assert_eq!(col_name, "b"); - assert_eq!(expression, "a + 1"); - - let col_name = new_columns[1]["name"].as_str().unwrap(); - let expression = new_columns[1]["expression"].as_str().unwrap(); - assert_eq!(col_name, "x"); - assert_eq!(expression, "cast(NULL as int32)"); - - http::Response::builder() - .status(200) - .body(r#"{"version": 43}"#) - .unwrap() - }); - - let result = table - .add_columns( - NewColumnTransform::SqlExpressions(vec![ - ("b".into(), "a + 1".into()), - ("x".into(), "cast(NULL as int32)".into()), - ]), - None, - ) - .await - .unwrap(); - - assert_eq!(result.version, 43); + assert_eq!(result.version, if old_server { 0 } else { 43 }); } #[tokio::test] diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index 704c48f1..2ce192c0 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -426,21 +426,33 @@ pub trait Tags: Send + Sync { #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct UpdateResult { pub rows_updated: u64, + // The commit version associated with the operation. + // A version of `0` indicates compatibility with legacy servers that do not return + /// a commit version. pub version: u64, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct AddResult { + // The commit version associated with the operation. + // A version of `0` indicates compatibility with legacy servers that do not return + /// a commit version. pub version: u64, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct DeleteResult { + // The commit version associated with the operation. + // A version of `0` indicates compatibility with legacy servers that do not return + /// a commit version. pub version: u64, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct MergeResult { + // The commit version associated with the operation. + // A version of `0` indicates compatibility with legacy servers that do not return + /// a commit version. pub version: u64, /// Number of inserted rows (for user statistics) pub num_inserted_rows: u64, @@ -454,16 +466,25 @@ pub struct MergeResult { #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct AddColumnsResult { + // The commit version associated with the operation. + // A version of `0` indicates compatibility with legacy servers that do not return + /// a commit version. pub version: u64, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct AlterColumnsResult { + // The commit version associated with the operation. + // A version of `0` indicates compatibility with legacy servers that do not return + /// a commit version. pub version: u64, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct DropColumnsResult { + // The commit version associated with the operation. + // A version of `0` indicates compatibility with legacy servers that do not return + /// a commit version. pub version: u64, }