From 42801bdb977c780e303f32016a5138abb83ff594 Mon Sep 17 00:00:00 2001
From: Brendan Clement
Date: Mon, 1 Jun 2026 16:56:37 -0700
Subject: [PATCH] feat: add update_field_metadata for editing field metadata

---
 rust/lancedb/src/remote/table.rs           |  53 +++++++++-
 rust/lancedb/src/table.rs                  |  33 +++++-
 rust/lancedb/src/table/schema_evolution.rs | 116 +++++++++++++++++++++
 3 files changed, 200 insertions(+), 2 deletions(-)

diff --git a/rust/lancedb/src/remote/table.rs b/rust/lancedb/src/remote/table.rs
index dc16b61c6..6dab22590 100644
--- a/rust/lancedb/src/remote/table.rs
+++ b/rust/lancedb/src/remote/table.rs
@@ -18,13 +18,13 @@ use crate::index::waiter::wait_for_index;
 use crate::query::{QueryFilter, QueryRequest, Select, VectorQueryRequest};
 use crate::table::AddColumnsResult;
 use crate::table::AddResult;
-use crate::table::AlterColumnsResult;
 use crate::table::DeleteResult;
 use crate::table::DropColumnsResult;
 use crate::table::MergeResult;
 use crate::table::Tags;
 use crate::table::UpdateResult;
 use crate::table::query::create_multi_vector_plan;
+use crate::table::{AlterColumnsResult, FieldMetadataUpdate, UpdateFieldMetadataResult};
 use crate::table::{AnyQuery, Filter, Predicate, PreprocessingOutput, TableStatistics};
 use crate::utils::background_cache::BackgroundCache;
 use crate::utils::{
@@ -1968,6 +1968,35 @@ impl BaseTable for RemoteTable {
         Ok(result)
     }
 
+    async fn update_field_metadata(
+        &self,
+        updates: &[FieldMetadataUpdate],
+    ) -> Result<UpdateFieldMetadataResult> {
+        self.check_mutable().await?;
+        let body = serde_json::json!({ "updates": updates });
+        let request = self
+            .client
+            .post(&format!(
+                "/v1/table/{}/update_field_metadata/",
+                self.identifier
+            ))
+            .json(&body);
+        let (request_id, response) = self.send(request, true).await?;
+        let response = self.check_table_response(&request_id, response).await?;
+        let body = response.text().await.err_to_http(request_id.clone())?;
+
+        let result: UpdateFieldMetadataResult =
+            serde_json::from_str(&body).map_err(|e| Error::Http {
+                source: format!("Failed to parse update_field_metadata response: {}", e).into(),
+                request_id,
+                status_code: None,
+            })?;
+
+        self.invalidate_schema_cache();
+        self.track_write_version(result.version);
+        Ok(result)
+    }
+
     async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
         self.check_mutable().await?;
         let body = serde_json::json!({ "columns": columns });
@@ -2261,6 +2290,7 @@ mod tests {
 
     use crate::remote::client::{ClientConfig, RetryConfig};
     use crate::table::AddDataMode;
+    use crate::table::FieldMetadataUpdate;
 
     use arrow::{array::AsArray, compute::concat_batches, datatypes::Int32Type};
     use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, record_batch};
@@ -6460,4 +6490,25 @@ mod tests {
         assert!(!headers.contains_key("x-lancedb-min-version"));
         assert!(!headers.contains_key("x-lancedb-min-timestamp"));
     }
+
+    #[tokio::test]
+    async fn test_update_field_metadata() {
+        let table = Table::new_with_handler("my_table", |request| {
+            assert_eq!(request.method(), "POST");
+            assert_eq!(
+                request.url().path(),
+                "/v1/table/my_table/update_field_metadata/"
+            );
+            http::Response::builder()
+                .status(200)
+                .body(r#"{"version": 7, "fields": {"category": {"unit": "label"}}}"#)
+                .unwrap()
+        });
+
+        let result = table
+            .update_field_metadata(&[FieldMetadataUpdate::new("category").set("unit", "label")])
+            .await
+            .unwrap();
+        assert_eq!(result.version, 7);
+    }
 }
diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs
index ca34bbdf3..0eb6b578a 100644
--- a/rust/lancedb/src/table.rs
+++ b/rust/lancedb/src/table.rs
@@ -91,7 +91,10 @@ pub use lance::dataset::scanner::DatasetRecordBatchStream;
 use lance::dataset::statistics::DatasetStatisticsExt;
 pub use lance_index::optimize::OptimizeOptions;
 pub use optimize::{CompactionOptions, OptimizeAction, OptimizeStats};
-pub use schema_evolution::{AddColumnsResult, AlterColumnsResult, DropColumnsResult};
+pub use schema_evolution::{
+    AddColumnsResult, AlterColumnsResult, DropColumnsResult, FieldMetadataUpdate,
+    UpdateFieldMetadataResult,
+};
 use serde_with::skip_serializing_none;
 pub use update::{UpdateBuilder, UpdateResult};
 
@@ -660,6 +663,19 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
             message: "create_insert_exec not implemented".to_string(),
         })
     }
+    /// Update per-field metadata. Merges into existing metadata by default;
+    /// [`FieldMetadataUpdate::remove`] deletes a key and
+    /// [`FieldMetadataUpdate::replace`] swaps the field's whole map.
+    ///
+    /// The default returns `NotSupported`; Lance-backed and remote tables override it.
+    async fn update_field_metadata(
+        &self,
+        _updates: &[FieldMetadataUpdate],
+    ) -> Result<UpdateFieldMetadataResult> {
+        Err(Error::NotSupported {
+            message: "update_field_metadata is not supported on this table type".into(),
+        })
+    }
 }
 
 /// A Table is a collection of strong typed Rows.
@@ -1340,6 +1356,14 @@ impl Table {
         self.inner.alter_columns(alterations).await
     }
 
+    /// Update per-field metadata (merges by default).
+    pub async fn update_field_metadata(
+        &self,
+        updates: &[FieldMetadataUpdate],
+    ) -> Result<UpdateFieldMetadataResult> {
+        self.inner.update_field_metadata(updates).await
+    }
+
     /// Remove columns from the table.
     pub async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
         self.inner.drop_columns(columns).await
@@ -2886,6 +2910,13 @@ impl BaseTable for NativeTable {
         schema_evolution::execute_alter_columns(self, alterations).await
     }
 
+    async fn update_field_metadata(
+        &self,
+        updates: &[FieldMetadataUpdate],
+    ) -> Result<UpdateFieldMetadataResult> {
+        schema_evolution::execute_update_field_metadata(self, updates).await
+    }
+
     async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
         schema_evolution::execute_drop_columns(self, columns).await
     }
diff --git a/rust/lancedb/src/table/schema_evolution.rs b/rust/lancedb/src/table/schema_evolution.rs
index c9bf9d7a8..52c8c191f 100644
--- a/rust/lancedb/src/table/schema_evolution.rs
+++ b/rust/lancedb/src/table/schema_evolution.rs
@@ -10,6 +10,7 @@
 
 use lance::dataset::{ColumnAlteration, NewColumnTransform};
 use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
 
 use super::NativeTable;
 use crate::Result;
@@ -44,6 +45,52 @@ pub struct DropColumnsResult {
     pub version: u64,
 }
 
+/// A single field's metadata update, addressed by dot-path.
+///
+/// Merges into the field's existing metadata by default. Use [`Self::remove`] to
+/// delete a key, or [`Self::replace`] to swap the field's entire metadata map.
+#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
+pub struct FieldMetadataUpdate {
+    /// Dot-separated path to the field (e.g. `"embedding"` or `"address.zip"`).
+    pub path: String,
+    /// Keys to set (`Some`) or delete (`None`).
+    pub metadata: HashMap<String, Option<String>>,
+    /// If `true`, replace the field's entire metadata map instead of merging.
+    pub replace: bool,
+}
+
+impl FieldMetadataUpdate {
+    pub fn new(path: impl Into<String>) -> Self {
+        Self {
+            path: path.into(),
+            metadata: HashMap::new(),
+            replace: false,
+        }
+    }
+
+    pub fn set(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
+        self.metadata.insert(key.into(), Some(value.into()));
+        self
+    }
+
+    pub fn remove(mut self, key: impl Into<String>) -> Self {
+        self.metadata.insert(key.into(), None);
+        self
+    }
+
+    pub fn replace(mut self) -> Self {
+        self.replace = true;
+        self
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
+pub struct UpdateFieldMetadataResult {
+    /// The commit version associated with the operation.
+    #[serde(default)]
+    pub version: u64,
+}
+
 /// Internal implementation of the add columns logic.
 ///
 /// Adds new columns to the table using the provided transforms.
@@ -90,6 +137,32 @@ pub(crate) async fn execute_drop_columns(
     Ok(DropColumnsResult { version })
 }
 
+/// Internal implementation of the update field metadata logic.
+///
+/// Merges or replaces per-field metadata, addressing fields by dot-path.
+pub(crate) async fn execute_update_field_metadata(
+    table: &NativeTable,
+    updates: &[FieldMetadataUpdate],
+) -> Result<UpdateFieldMetadataResult> {
+    table.dataset.ensure_mutable()?;
+    let mut dataset = (*table.dataset.get().await?).clone();
+
+    let mut builder = dataset.update_field_metadata();
+    for update in updates {
+        let entries = update.metadata.iter().map(|(k, v)| (k.clone(), v.clone()));
+        builder = if update.replace {
+            builder.replace(&update.path, entries)?
+        } else {
+            builder.update(&update.path, entries)?
+        };
+    }
+    builder.await?;
+
+    let version = dataset.version().version;
+    table.dataset.update(dataset);
+    Ok(UpdateFieldMetadataResult { version })
+}
+
 #[cfg(test)]
 mod tests {
     use arrow_array::{Int32Array, StringArray, record_batch};
@@ -97,6 +170,7 @@ mod tests {
     use futures::TryStreamExt;
     use lance::dataset::ColumnAlteration;
 
+    use super::FieldMetadataUpdate;
     use crate::connect;
     use crate::query::{ExecutableQuery, QueryBase, Select};
     use crate::table::NewColumnTransform;
@@ -610,4 +684,46 @@ mod tests {
         let v4 = table.version().await.unwrap();
         assert_eq!(drop_result.version, v4);
     }
+
+    #[tokio::test]
+    async fn test_update_field_metadata() {
+        let conn = connect("memory://").execute().await.unwrap();
+        let batch = record_batch!(
+            ("id", Int32, [1, 2, 3]),
+            ("category", Utf8, ["A", "B", "C"])
+        )
+        .unwrap();
+        let table = conn
+            .create_table("test_update_field_metadata", batch)
+            .execute()
+            .await
+            .unwrap();
+
+        // Set metadata on a field.
+        table
+            .update_field_metadata(&[FieldMetadataUpdate::new("category")
+                .set("unit", "label")
+                .set("pii", "false")])
+            .await
+            .unwrap();
+        let schema = table.schema().await.unwrap();
+        let field = schema.field_with_name("category").unwrap();
+        assert_eq!(
+            field.metadata().get("unit").map(String::as_str),
+            Some("label")
+        );
+
+        // Merge: add a key, delete one, keep the rest.
+        table
+            .update_field_metadata(&[FieldMetadataUpdate::new("category")
+                .set("source", "import")
+                .remove("pii")])
+            .await
+            .unwrap();
+        let schema = table.schema().await.unwrap();
+        let md = schema.field_with_name("category").unwrap().metadata();
+        assert_eq!(md.get("unit").map(String::as_str), Some("label")); // preserved
+        assert_eq!(md.get("source").map(String::as_str), Some("import")); // added
+        assert!(!md.contains_key("pii")); // deleted
+    }
 }