feat: schema evolution APIs in all SDKs (#1851)

* Support `add_columns`, `alter_columns`, `drop_columns` in Remote SDK
and async Python
* Add `data_type` parameter to node
* Docs updates
This commit is contained in:
Will Jones
2024-12-04 14:47:50 -08:00
committed by GitHub
parent bd82e1f66d
commit 79eaa52184
10 changed files with 535 additions and 44 deletions

View File

@@ -9,7 +9,7 @@ use crate::utils::{supported_btree_data_type, supported_vector_data_type};
use crate::{Error, Table};
use arrow_array::RecordBatchReader;
use arrow_ipc::reader::FileReader;
use arrow_schema::{DataType, SchemaRef};
use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef};
use async_trait::async_trait;
use datafusion_common::DataFusionError;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
@@ -643,25 +643,85 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
}
/// Adds new columns to the remote table by POSTing the given SQL expressions to
/// `/v1/table/{name}/add_columns/`.
///
/// Only `NewColumnTransform::SqlExpressions` is handled; any other variant yields
/// `Error::NotSupported`. `_read_columns` is not used by this implementation.
// NOTE(review): this listing looks like a diff with the +/- markers stripped, so the
// old and new versions of some lines appear back to back (`_transforms` followed by
// `transforms`, and the old `NotSupported` stub right before the new `match`).
// Confirm against the real file before relying on it.
async fn add_columns(
&self,
_transforms: NewColumnTransform,
transforms: NewColumnTransform,
_read_columns: Option<Vec<String>>,
) -> Result<()> {
// Runs before any request is built; presumably rejects tables that cannot be
// modified — verify against `check_mutable`.
self.check_mutable().await?;
Err(Error::NotSupported {
message: "add_columns is not yet supported.".into(),
})
match transforms {
NewColumnTransform::SqlExpressions(expressions) => {
// Each (name, expression) pair becomes one `{"name": ..., "expression": ...}` object.
let body = expressions
.into_iter()
.map(|(name, expression)| {
serde_json::json!({
"name": name,
"expression": expression,
})
})
.collect::<Vec<_>>();
// The request payload wraps the list under the "new_columns" key.
let body = serde_json::json!({ "new_columns": body });
let request = self
.client
.post(&format!("/v1/table/{}/add_columns/", self.name))
.json(&body);
// NOTE(review): the meaning of the `false` flag passed to `send` is not visible here — confirm.
let (request_id, response) = self.client.send(request, false).await?;
// The checked response is discarded; only success or failure is reported.
self.check_table_response(&request_id, response).await?;
Ok(())
}
// Every other transform variant is rejected rather than sent to the server.
_ => {
return Err(Error::NotSupported {
message: "Only SQL expressions are supported for adding columns".into(),
});
}
}
}
/// Alters existing columns on the remote table by POSTing the requested changes to
/// `/v1/table/{name}/alter_columns/`.
///
/// Each alteration is sent as a JSON object that always carries `path` and carries
/// `rename`, `data_type` and `nullable` only when the corresponding option is set.
// NOTE(review): this listing looks like a diff with the +/- markers stripped, so the
// old signature (`_alterations`) and the old `NotSupported` stub appear next to the
// new code. Confirm against the real file before relying on it.
async fn alter_columns(&self, _alterations: &[ColumnAlteration]) -> Result<()> {
async fn alter_columns(&self, alterations: &[ColumnAlteration]) -> Result<()> {
// Runs before any request is built; presumably rejects tables that cannot be
// modified — verify against `check_mutable`.
self.check_mutable().await?;
Err(Error::NotSupported {
message: "alter_columns is not yet supported.".into(),
})
let body = alterations
.iter()
.map(|alteration| {
// Start with the mandatory column path; optional keys are added below.
let mut value = serde_json::json!({
"path": alteration.path,
});
if let Some(rename) = &alteration.rename {
value["rename"] = serde_json::Value::String(rename.clone());
}
if let Some(data_type) = &alteration.data_type {
// TODO: we can later simplify this substantially, after getting:
// https://github.com/lancedb/lance/pull/3161
// The data type is wrapped in a one-field schema, converted to a JSON schema,
// serialized to a string and parsed back, and the field's "type" entry is
// extracted from the result.
// NOTE(review): each `unwrap()` below panics instead of returning an error if
// its step fails; whether any data type can make the conversion fail is not
// visible here — confirm.
let dummy_schema =
ArrowSchema::new(vec![ArrowField::new("dummy", data_type.clone(), false)]);
let json_schema = JsonSchema::try_from(&dummy_schema).unwrap();
let json_string = serde_json::to_string(&json_schema).unwrap();
let json_value: serde_json::Value = serde_json::from_str(&json_string).unwrap();
value["data_type"] = json_value["fields"][0]["type"].clone();
}
if let Some(nullable) = &alteration.nullable {
value["nullable"] = serde_json::Value::Bool(*nullable);
}
value
})
.collect::<Vec<_>>();
// The request payload wraps the list under the "alterations" key.
let body = serde_json::json!({ "alterations": body });
let request = self
.client
.post(&format!("/v1/table/{}/alter_columns/", self.name))
.json(&body);
// NOTE(review): the meaning of the `false` flag passed to `send` is not visible here — confirm.
let (request_id, response) = self.client.send(request, false).await?;
// The checked response is discarded; only success or failure is reported.
self.check_table_response(&request_id, response).await?;
Ok(())
}
/// Drops the named columns from the remote table by POSTing them to
/// `/v1/table/{name}/drop_columns/`.
// NOTE(review): this listing looks like a diff with the +/- markers stripped, so the
// old signature (`_columns`) and the old `NotSupported` stub appear next to the new
// code. Confirm against the real file before relying on it.
async fn drop_columns(&self, _columns: &[&str]) -> Result<()> {
async fn drop_columns(&self, columns: &[&str]) -> Result<()> {
// Runs before any request is built; presumably rejects tables that cannot be
// modified — verify against `check_mutable`.
self.check_mutable().await?;
Err(Error::NotSupported {
message: "drop_columns is not yet supported.".into(),
})
// The column names are sent as a JSON array under the "columns" key.
let body = serde_json::json!({ "columns": columns });
let request = self
.client
.post(&format!("/v1/table/{}/drop_columns/", self.name))
.json(&body);
// NOTE(review): the meaning of the `false` flag passed to `send` is not visible here — confirm.
let (request_id, response) = self.client.send(request, false).await?;
// The checked response is discarded; only success or failure is reported.
self.check_table_response(&request_id, response).await?;
Ok(())
}
async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
@@ -844,7 +904,17 @@ mod tests {
Box::pin(table.update().column("a", "a + 1").execute().map_ok(|_| ())),
Box::pin(table.add(example_data()).execute().map_ok(|_| ())),
Box::pin(table.merge_insert(&["test"]).execute(example_data())),
Box::pin(table.delete("false")), // TODO: other endpoints.
Box::pin(table.delete("false")),
Box::pin(table.add_columns(
NewColumnTransform::SqlExpressions(vec![("x".into(), "y".into())]),
None,
)),
Box::pin(async {
let alterations = vec![ColumnAlteration::new("x".into()).rename("y".into())];
table.alter_columns(&alterations).await
}),
Box::pin(table.drop_columns(&["a"])),
// TODO: other endpoints.
];
for result in results {
@@ -1799,4 +1869,114 @@ mod tests {
.await;
assert!(matches!(res, Err(Error::NotSupported { .. })));
}
/// Checks that `add_columns` issues a JSON POST to the add_columns endpoint and that
/// the payload lists every (name, expression) pair, in order, under `new_columns`.
#[tokio::test]
async fn test_add_columns() {
    let table = Table::new_with_handler("my_table", |request| {
        // Request line and content type.
        assert_eq!(request.method(), "POST");
        assert_eq!(request.url().path(), "/v1/table/my_table/add_columns/");
        assert_eq!(
            request.headers().get("Content-Type").unwrap(),
            JSON_CONTENT_TYPE
        );

        // Decode the JSON payload.
        let raw = request.body().unwrap().as_bytes().unwrap();
        let text = std::str::from_utf8(raw).unwrap();
        let payload: serde_json::Value = serde_json::from_str(text).unwrap();
        let sent = payload.get("new_columns").unwrap().as_array().unwrap();
        assert!(sent.len() == 2);

        // Entries must arrive in the order they were requested.
        let expected = [("b", "a + 1"), ("x", "cast(NULL as int32)")];
        for (entry, (want_name, want_expression)) in sent.iter().zip(expected.iter()) {
            let col_name = entry["name"].as_str().unwrap();
            let expression = entry["expression"].as_str().unwrap();
            assert_eq!(col_name, *want_name);
            assert_eq!(expression, *want_expression);
        }

        http::Response::builder().status(200).body("{}").unwrap()
    });

    let transforms = NewColumnTransform::SqlExpressions(vec![
        ("b".into(), "a + 1".into()),
        ("x".into(), "cast(NULL as int32)".into()),
    ]);
    table.add_columns(transforms, None).await.unwrap();
}
/// Checks that `alter_columns` issues a JSON POST to the alter_columns endpoint and
/// that each alteration carries the keys for the options that were set on it.
#[tokio::test]
async fn test_alter_columns() {
    let table = Table::new_with_handler("my_table", |request| {
        // Request line and content type.
        assert_eq!(request.method(), "POST");
        assert_eq!(request.url().path(), "/v1/table/my_table/alter_columns/");
        assert_eq!(
            request.headers().get("Content-Type").unwrap(),
            JSON_CONTENT_TYPE
        );

        // Decode the JSON payload.
        let raw = request.body().unwrap().as_bytes().unwrap();
        let text = std::str::from_utf8(raw).unwrap();
        let payload: serde_json::Value = serde_json::from_str(text).unwrap();
        let sent = payload.get("alterations").unwrap().as_array().unwrap();
        assert!(sent.len() == 2);

        // First alteration: a cast on a nested path.
        let cast = &sent[0];
        let path = cast["path"].as_str().unwrap();
        let data_type = cast["data_type"]["type"].as_str().unwrap();
        assert_eq!(path, "b.c");
        assert_eq!(data_type, "int32");

        // Second alteration: rename plus nullability change.
        let renamed = &sent[1];
        let path = renamed["path"].as_str().unwrap();
        let nullable = renamed["nullable"].as_bool().unwrap();
        let rename = renamed["rename"].as_str().unwrap();
        assert_eq!(path, "x");
        assert!(nullable);
        assert_eq!(rename, "y");

        http::Response::builder().status(200).body("{}").unwrap()
    });

    let requested = [
        ColumnAlteration::new("b.c".into()).cast_to(DataType::Int32),
        ColumnAlteration::new("x".into())
            .rename("y".into())
            .set_nullable(true),
    ];
    table.alter_columns(&requested).await.unwrap();
}
/// Checks that `drop_columns` issues a JSON POST to the drop_columns endpoint and that
/// the payload lists the column names, in order, under `columns`.
#[tokio::test]
async fn test_drop_columns() {
    let table = Table::new_with_handler("my_table", |request| {
        // Request line and content type.
        assert_eq!(request.method(), "POST");
        assert_eq!(request.url().path(), "/v1/table/my_table/drop_columns/");
        assert_eq!(
            request.headers().get("Content-Type").unwrap(),
            JSON_CONTENT_TYPE
        );

        // Decode the JSON payload.
        let raw = request.body().unwrap().as_bytes().unwrap();
        let text = std::str::from_utf8(raw).unwrap();
        let payload: serde_json::Value = serde_json::from_str(text).unwrap();
        let sent = payload.get("columns").unwrap().as_array().unwrap();
        assert!(sent.len() == 2);

        // Names must arrive in the order they were requested.
        let first = sent[0].as_str().unwrap();
        let second = sent[1].as_str().unwrap();
        assert_eq!(first, "a");
        assert_eq!(second, "b");

        http::Response::builder().status(200).body("{}").unwrap()
    });

    let doomed = ["a", "b"];
    table.drop_columns(&doomed).await.unwrap();
}
}

View File

@@ -15,6 +15,7 @@
use std::sync::Arc;
use arrow_schema::{DataType, Schema};
use lance::arrow::json::JsonSchema;
use lance::dataset::{ReadParams, WriteParams};
use lance::io::{ObjectStoreParams, WrappingObjectStore};
use lazy_static::lazy_static;
@@ -175,6 +176,25 @@ pub fn supported_vector_data_type(dtype: &DataType) -> bool {
}
}
/// Parses a Lance JSON type name (for example `"int32"`) into an Arrow [`DataType`].
///
/// The name is placed in a one-field dummy JSON schema, which is then converted
/// through [`JsonSchema`] into an Arrow [`Schema`]; the data type of that single
/// field is returned.
///
/// Returns `None` if the dummy schema cannot be deserialized or converted, which is
/// how an unrecognized type name is reported.
///
/// Note: this is temporary until we get a proper datatype conversion in Lance.
pub fn string_to_datatype(s: &str) -> Option<DataType> {
    // TODO: we can later simplify this substantially, after getting:
    // https://github.com/lancedb/lance/pull/3161
    //
    // Build the document as a JSON value instead of with `format!`, so that `s` is
    // always emitted as one properly escaped JSON string. With string formatting, a
    // quote or backslash in `s` could break the document or inject extra keys.
    let dummy_schema = serde_json::json!({
        "fields": [{
            "name": "n",
            "nullable": true,
            "type": { "type": s }
        }]
    });
    let json_schema: JsonSchema = serde_json::from_value(dummy_schema).ok()?;
    let schema = Schema::try_from(json_schema).ok()?;
    // The dummy schema has exactly one field, so index 0 is always present.
    Some(schema.field(0).data_type().clone())
}
#[cfg(test)]
mod tests {
use super::*;