feat: expose dataset config (#2004)

Expose methods on NativeTable for updating schema metadata and dataset
config & getting the dataset config via the manifest.
This commit is contained in:
Bert
2025-01-08 21:13:18 -05:00
committed by GitHub
parent f4afe456e8
commit d3ea75cc2b

View File

@@ -14,6 +14,7 @@
//! LanceDB Table APIs
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
@@ -48,6 +49,7 @@ use lance_index::vector::pq::PQBuildParams;
use lance_index::vector::sq::builder::SQBuildParams;
use lance_index::DatasetIndexExt;
use lance_index::IndexType;
use lance_table::format::Manifest;
use lance_table::io::commit::ManifestNamingScheme;
use log::info;
use serde::{Deserialize, Serialize};
@@ -1697,6 +1699,49 @@ impl NativeTable {
dataset.migrate_manifest_paths_v2().await?;
Ok(())
}
/// Get the table manifest
pub async fn manifest(&self) -> Result<Manifest> {
let dataset = self.dataset.get().await?;
Ok(dataset.manifest().clone())
}
/// Update key-value pairs in config.
pub async fn update_config(
&self,
upsert_values: impl IntoIterator<Item = (String, String)>,
) -> Result<()> {
let mut dataset = self.dataset.get_mut().await?;
dataset.update_config(upsert_values).await?;
Ok(())
}
/// Delete keys from the config
pub async fn delete_config_keys(&self, delete_keys: &[&str]) -> Result<()> {
let mut dataset = self.dataset.get_mut().await?;
dataset.delete_config_keys(delete_keys).await?;
Ok(())
}
/// Update schema metadata
pub async fn replace_schema_metadata(
&self,
upsert_values: impl IntoIterator<Item = (String, String)>,
) -> Result<()> {
let mut dataset = self.dataset.get_mut().await?;
dataset.replace_schema_metadata(upsert_values).await?;
Ok(())
}
/// Update field metadata
pub async fn replace_field_metadata(
&self,
new_values: impl IntoIterator<Item = (u32, HashMap<String, String>)>,
) -> Result<()> {
let mut dataset = self.dataset.get_mut().await?;
dataset.replace_field_metadata(new_values).await?;
Ok(())
}
}
#[async_trait::async_trait]
@@ -3308,4 +3353,197 @@ mod tests {
table.checkout(version).await.unwrap();
assert!(table.add(some_sample_data()).execute().await.is_err())
}
#[tokio::test]
async fn test_update_dataset_config() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let conn = ConnectBuilder::new(uri)
.read_consistency_interval(Duration::from_secs(0))
.execute()
.await
.unwrap();
let table = conn
.create_table("my_table", some_sample_data())
.execute()
.await
.unwrap();
let native_tbl = table.as_native().unwrap();
let manifest = native_tbl.manifest().await.unwrap();
assert_eq!(manifest.config.len(), 0);
native_tbl
.update_config(vec![("test_key1".to_string(), "test_val1".to_string())])
.await
.unwrap();
let manifest = native_tbl.manifest().await.unwrap();
assert_eq!(manifest.config.len(), 1);
assert_eq!(
manifest.config.get("test_key1"),
Some(&"test_val1".to_string())
);
native_tbl
.update_config(vec![("test_key2".to_string(), "test_val2".to_string())])
.await
.unwrap();
let manifest = native_tbl.manifest().await.unwrap();
assert_eq!(manifest.config.len(), 2);
assert_eq!(
manifest.config.get("test_key1"),
Some(&"test_val1".to_string())
);
assert_eq!(
manifest.config.get("test_key2"),
Some(&"test_val2".to_string())
);
native_tbl
.update_config(vec![(
"test_key2".to_string(),
"test_val2_update".to_string(),
)])
.await
.unwrap();
let manifest = native_tbl.manifest().await.unwrap();
assert_eq!(manifest.config.len(), 2);
assert_eq!(
manifest.config.get("test_key1"),
Some(&"test_val1".to_string())
);
assert_eq!(
manifest.config.get("test_key2"),
Some(&"test_val2_update".to_string())
);
native_tbl.delete_config_keys(&["test_key1"]).await.unwrap();
let manifest = native_tbl.manifest().await.unwrap();
assert_eq!(manifest.config.len(), 1);
assert_eq!(
manifest.config.get("test_key2"),
Some(&"test_val2_update".to_string())
);
}
#[tokio::test]
async fn test_schema_metadata_config() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let conn = ConnectBuilder::new(uri)
.read_consistency_interval(Duration::from_secs(0))
.execute()
.await
.unwrap();
let table = conn
.create_table("my_table", some_sample_data())
.execute()
.await
.unwrap();
let native_tbl = table.as_native().unwrap();
let schema = native_tbl.schema().await.unwrap();
let metadata = schema.metadata();
assert_eq!(metadata.len(), 0);
native_tbl
.replace_schema_metadata(vec![("test_key1".to_string(), "test_val1".to_string())])
.await
.unwrap();
let schema = native_tbl.schema().await.unwrap();
let metadata = schema.metadata();
assert_eq!(metadata.len(), 1);
assert_eq!(metadata.get("test_key1"), Some(&"test_val1".to_string()));
native_tbl
.replace_schema_metadata(vec![
("test_key1".to_string(), "test_val1_update".to_string()),
("test_key2".to_string(), "test_val2".to_string()),
])
.await
.unwrap();
let schema = native_tbl.schema().await.unwrap();
let metadata = schema.metadata();
assert_eq!(metadata.len(), 2);
assert_eq!(
metadata.get("test_key1"),
Some(&"test_val1_update".to_string())
);
assert_eq!(metadata.get("test_key2"), Some(&"test_val2".to_string()));
native_tbl
.replace_schema_metadata(vec![(
"test_key2".to_string(),
"test_val2_update".to_string(),
)])
.await
.unwrap();
let schema = native_tbl.schema().await.unwrap();
let metadata = schema.metadata();
assert_eq!(
metadata.get("test_key2"),
Some(&"test_val2_update".to_string())
);
}
#[tokio::test]
pub async fn test_field_metadata_update() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let conn = ConnectBuilder::new(uri)
.read_consistency_interval(Duration::from_secs(0))
.execute()
.await
.unwrap();
let table = conn
.create_table("my_table", some_sample_data())
.execute()
.await
.unwrap();
let native_tbl = table.as_native().unwrap();
let schema = native_tbl.schema().await.unwrap();
let (field_idx, field) = schema.column_with_name("i").unwrap();
let field_metadata = field.metadata();
assert_eq!(field_metadata.len(), 0);
native_tbl
.replace_schema_metadata(vec![(
"test_key2".to_string(),
"test_val2_update".to_string(),
)])
.await
.unwrap();
let schema = native_tbl.schema().await.unwrap();
let metadata = schema.metadata();
assert_eq!(metadata.len(), 1);
assert_eq!(
metadata.get("test_key2"),
Some(&"test_val2_update".to_string())
);
let mut new_field_metadata = HashMap::<String, String>::new();
new_field_metadata.insert("test_field_key1".into(), "test_field_val1".into());
native_tbl
.replace_field_metadata(vec![(field_idx as u32, new_field_metadata)])
.await
.unwrap();
let schema = native_tbl.schema().await.unwrap();
let (_field_idx, field) = schema.column_with_name("i").unwrap();
let field_metadata = field.metadata();
assert_eq!(field_metadata.len(), 1);
assert_eq!(
field_metadata.get("test_field_key1"),
Some(&"test_field_val1".to_string())
);
}
}