From 39d3e0651d0d3fa1661f618827228e7f31af39c2 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Mon, 23 Jun 2025 15:11:20 +0800 Subject: [PATCH] feat: Support ListMetadataRequest to retrieve regions' metadata (#6348) * feat: support list metadata in region server Signed-off-by: evenyag * test: add test for list region metadata Signed-off-by: evenyag * feat: return null if region not exists Signed-off-by: evenyag * chore: update greptime-proto Signed-off-by: evenyag --------- Signed-off-by: evenyag --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/api/src/region.rs | 12 + .../src/ddl/test_util/datanode_handler.rs | 1 + src/datanode/src/error.rs | 9 + src/datanode/src/region_server.rs | 252 +++++++++++++++++- src/datanode/src/tests.rs | 33 ++- src/meta-srv/src/procedure/utils.rs | 1 + src/metric-engine/src/engine.rs | 4 + src/store-api/src/region_engine.rs | 1 + src/store-api/src/region_request.rs | 4 + 11 files changed, 306 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c41aa4d8e2..fbe70027a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5144,7 +5144,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=82fe5c6282f623c185b86f03e898ee8952e50cf9#82fe5c6282f623c185b86f03e898ee8952e50cf9" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=819a9495507c7e33186b70814dc356a14e1ffb45#819a9495507c7e33186b70814dc356a14e1ffb45" dependencies = [ "prost 0.13.5", "serde", diff --git a/Cargo.toml b/Cargo.toml index 5f17e7f722..4af4c93814 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -134,7 +134,7 @@ etcd-client = "0.14" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "82fe5c6282f623c185b86f03e898ee8952e50cf9" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "819a9495507c7e33186b70814dc356a14e1ffb45" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/api/src/region.rs b/src/api/src/region.rs index d752382534..e7000cd744 100644 --- a/src/api/src/region.rs +++ b/src/api/src/region.rs @@ -22,6 +22,7 @@ use greptime_proto::v1::region::RegionResponse as RegionResponseV1; pub struct RegionResponse { pub affected_rows: AffectedRows, pub extensions: HashMap>, + pub metadata: Vec, } impl RegionResponse { @@ -29,6 +30,7 @@ impl RegionResponse { Self { affected_rows: region_response.affected_rows as _, extensions: region_response.extensions, + metadata: region_response.metadata, } } @@ -37,6 +39,16 @@ impl RegionResponse { Self { affected_rows, extensions: Default::default(), + metadata: Vec::new(), + } + } + + /// Creates one response with metadata. + pub fn from_metadata(metadata: Vec) -> Self { + Self { + affected_rows: 0, + extensions: Default::default(), + metadata, } } } diff --git a/src/common/meta/src/ddl/test_util/datanode_handler.rs b/src/common/meta/src/ddl/test_util/datanode_handler.rs index 775fc644f7..fcab09f65f 100644 --- a/src/common/meta/src/ddl/test_util/datanode_handler.rs +++ b/src/common/meta/src/ddl/test_util/datanode_handler.rs @@ -32,6 +32,7 @@ impl MockDatanodeHandler for () { Ok(RegionResponse { affected_rows: 0, extensions: Default::default(), + metadata: Vec::new(), }) } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 5c81f6ab46..4914100b80 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -387,6 +387,14 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to serialize json"))] + SerializeJson { + #[snafu(source)] + error: serde_json::Error, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -457,6 +465,7 @@ impl ErrorExt for Error { StatusCode::RegionBusy } MissingCache { .. } => StatusCode::Internal, + SerializeJson { .. } => StatusCode::Internal, } } diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 2f3530721a..7e4d5397cc 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -20,12 +20,14 @@ use std::time::Duration; use api::region::RegionResponse; use api::v1::region::sync_request::ManifestInfo; -use api::v1::region::{region_request, RegionResponse as RegionResponseV1, SyncRequest}; +use api::v1::region::{ + region_request, ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest, +}; use api::v1::{ResponseHeader, Status}; use arrow_flight::{FlightData, Ticket}; use async_trait::async_trait; use bytes::Bytes; -use common_error::ext::BoxedError; +use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_query::request::QueryRequest; use common_query::OutputData; @@ -47,6 +49,7 @@ pub use query::dummy_catalog::{ DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef, }; use query::QueryEngineRef; +use serde_json; use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult}; use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream}; use servers::grpc::region_server::RegionServerHandler; @@ -71,10 +74,10 @@ use tonic::{Request, Response, Result as TonicResult}; use crate::error::{ self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu, ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu, - ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, HandleBatchDdlRequestSnafu, - HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu, NewPlanDecoderSnafu, - RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu, Result, - StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu, + ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu, + HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu, + NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu, + Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu, }; use crate::event_listener::RegionServerEventListenerRef; @@ -138,12 +141,12 @@ impl RegionServer { /// Finds the region's engine by its id. If the region is not ready, returns `None`. pub fn find_engine(&self, region_id: RegionId) -> Result> { - self.inner - .get_engine(region_id, &RegionChange::None) - .map(|x| match x { - CurrentEngine::Engine(engine) => Some(engine), - CurrentEngine::EarlyReturn(_) => None, - }) + match self.inner.get_engine(region_id, &RegionChange::None) { + Ok(CurrentEngine::Engine(engine)) => Ok(Some(engine)), + Ok(CurrentEngine::EarlyReturn(_)) => Ok(None), + Err(error::Error::RegionNotFound { .. }) => Ok(None), + Err(err) => Err(err), + } } #[tracing::instrument(skip_all)] @@ -412,6 +415,7 @@ impl RegionServer { Ok(RegionResponse { affected_rows, extensions, + metadata: Vec::new(), }) } @@ -441,6 +445,7 @@ impl RegionServer { Ok(RegionResponse { affected_rows, extensions, + metadata: Vec::new(), }) } @@ -473,6 +478,48 @@ impl RegionServer { .map(|_| RegionResponse::new(AffectedRows::default())) } + /// Handles the ListMetadata request and retrieves metadata for specified regions. + /// + /// Returns the results as a JSON-serialized list in the [RegionResponse]. It serializes + /// non-existing regions as `null`. + #[tracing::instrument(skip_all)] + async fn handle_list_metadata_request( + &self, + request: &ListMetadataRequest, + ) -> Result { + let mut region_metadatas = Vec::new(); + // Collect metadata for each region + for region_id in &request.region_ids { + let region_id = RegionId::from_u64(*region_id); + // Get the engine. + let Some(engine) = self.find_engine(region_id)? else { + region_metadatas.push(None); + continue; + }; + + match engine.get_metadata(region_id).await { + Ok(metadata) => region_metadatas.push(Some(metadata)), + Err(err) => { + if err.status_code() == StatusCode::RegionNotFound { + region_metadatas.push(None); + } else { + Err(err).with_context(|_| GetRegionMetadataSnafu { + engine: engine.name(), + region_id, + })?; + } + } + } + } + + // Serialize metadata to JSON + let json_result = serde_json::to_vec(®ion_metadatas).context(SerializeJsonSnafu)?; + + let response = RegionResponse::from_metadata(json_result); + + Ok(response) + } + /// Sync region manifest and registers new opened logical regions. pub async fn sync_region( &self, @@ -504,6 +551,10 @@ impl RegionServerHandler for RegionServer { region_request::Body::Sync(sync_request) => { self.handle_sync_region_request(sync_request).await } + region_request::Body::ListMetadata(list_metadata_request) => { + self.handle_list_metadata_request(list_metadata_request) + .await + } _ => self.handle_requests_in_serial(request).await, } .map_err(BoxedError::new) @@ -518,6 +569,7 @@ impl RegionServerHandler for RegionServer { }), affected_rows: response.affected_rows as _, extensions: response.extensions, + metadata: response.metadata, }) } } @@ -897,6 +949,7 @@ impl RegionServerInner { Ok(RegionResponse { affected_rows: result.affected_rows, extensions: result.extensions, + metadata: Vec::new(), }) } Err(err) => { @@ -967,6 +1020,7 @@ impl RegionServerInner { Ok(RegionResponse { affected_rows: result.affected_rows, extensions: result.extensions, + metadata: Vec::new(), }) } Err(err) => { @@ -1242,8 +1296,11 @@ mod tests { use std::assert_matches::assert_matches; + use api::v1::SemanticType; use common_error::ext::ErrorExt; + use datatypes::prelude::ConcreteDataType; use mito2::test_util::CreateRequestBuilder; + use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; use store_api::region_engine::RegionEngine; use store_api::region_request::{RegionDropRequest, RegionOpenRequest, RegionTruncateRequest}; use store_api::storage::RegionId; @@ -1605,4 +1662,175 @@ mod tests { let forth_query = p.acquire().await; assert!(forth_query.is_ok()); } + + fn mock_region_metadata(region_id: RegionId) -> RegionMetadata { + let mut metadata_builder = RegionMetadataBuilder::new(region_id); + metadata_builder.push_column_metadata(ColumnMetadata { + column_schema: datatypes::schema::ColumnSchema::new( + "timestamp", + ConcreteDataType::timestamp_nanosecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 0, + }); + metadata_builder.push_column_metadata(ColumnMetadata { + column_schema: datatypes::schema::ColumnSchema::new( + "file", + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: 1, + }); + metadata_builder.push_column_metadata(ColumnMetadata { + column_schema: datatypes::schema::ColumnSchema::new( + "message", + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Field, + column_id: 2, + }); + metadata_builder.primary_key(vec![1]); + metadata_builder.build().unwrap() + } + + #[tokio::test] + async fn test_handle_list_metadata_request() { + common_telemetry::init_default_ut_logging(); + + let mut mock_region_server = mock_region_server(); + let region_id_1 = RegionId::new(1, 0); + let region_id_2 = RegionId::new(2, 0); + + let metadata_1 = mock_region_metadata(region_id_1); + let metadata_2 = mock_region_metadata(region_id_2); + let metadatas = vec![Some(metadata_1.clone()), Some(metadata_2.clone())]; + + let metadata_1 = Arc::new(metadata_1); + let metadata_2 = Arc::new(metadata_2); + let (engine, _) = MockRegionEngine::with_metadata_mock_fn( + MITO_ENGINE_NAME, + Box::new(move |region_id| { + if region_id == region_id_1 { + Ok(metadata_1.clone()) + } else if region_id == region_id_2 { + Ok(metadata_2.clone()) + } else { + error::RegionNotFoundSnafu { region_id }.fail() + } + }), + ); + + mock_region_server.register_engine(engine.clone()); + mock_region_server + .inner + .region_map + .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone())); + mock_region_server + .inner + .region_map + .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone())); + + // All regions exist. + let list_metadata_request = ListMetadataRequest { + region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()], + }; + let response = mock_region_server + .handle_list_metadata_request(&list_metadata_request) + .await + .unwrap(); + let decoded_metadata: Vec> = + serde_json::from_slice(&response.metadata).unwrap(); + assert_eq!(metadatas, decoded_metadata); + } + + #[tokio::test] + async fn test_handle_list_metadata_not_found() { + common_telemetry::init_default_ut_logging(); + + let mut mock_region_server = mock_region_server(); + let region_id_1 = RegionId::new(1, 0); + let region_id_2 = RegionId::new(2, 0); + + let metadata_1 = mock_region_metadata(region_id_1); + let metadatas = vec![Some(metadata_1.clone()), None]; + + let metadata_1 = Arc::new(metadata_1); + let (engine, _) = MockRegionEngine::with_metadata_mock_fn( + MITO_ENGINE_NAME, + Box::new(move |region_id| { + if region_id == region_id_1 { + Ok(metadata_1.clone()) + } else { + error::RegionNotFoundSnafu { region_id }.fail() + } + }), + ); + + mock_region_server.register_engine(engine.clone()); + mock_region_server + .inner + .region_map + .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone())); + + // Not in region map. + let list_metadata_request = ListMetadataRequest { + region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()], + }; + let response = mock_region_server + .handle_list_metadata_request(&list_metadata_request) + .await + .unwrap(); + let decoded_metadata: Vec> = + serde_json::from_slice(&response.metadata).unwrap(); + assert_eq!(metadatas, decoded_metadata); + + // Not in region engine. + mock_region_server + .inner + .region_map + .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone())); + let response = mock_region_server + .handle_list_metadata_request(&list_metadata_request) + .await + .unwrap(); + let decoded_metadata: Vec> = + serde_json::from_slice(&response.metadata).unwrap(); + assert_eq!(metadatas, decoded_metadata); + } + + #[tokio::test] + async fn test_handle_list_metadata_failed() { + common_telemetry::init_default_ut_logging(); + + let mut mock_region_server = mock_region_server(); + let region_id_1 = RegionId::new(1, 0); + + let (engine, _) = MockRegionEngine::with_metadata_mock_fn( + MITO_ENGINE_NAME, + Box::new(move |region_id| { + error::UnexpectedSnafu { + violated: format!("Failed to get region {region_id}"), + } + .fail() + }), + ); + + mock_region_server.register_engine(engine.clone()); + mock_region_server + .inner + .region_map + .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone())); + + // Failed to get. + let list_metadata_request = ListMetadataRequest { + region_ids: vec![region_id_1.as_u64()], + }; + mock_region_server + .handle_list_metadata_request(&list_metadata_request) + .await + .unwrap_err(); + } } diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 4c0b95c2ef..b5c59a35a5 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -108,11 +108,15 @@ pub type MockRequestHandler = pub type MockSetReadonlyGracefullyHandler = Box Result + Send + Sync>; +pub type MockGetMetadataHandler = + Box Result + Send + Sync>; + pub struct MockRegionEngine { sender: Sender<(RegionId, RegionRequest)>, pub(crate) handle_request_delay: Option, pub(crate) handle_request_mock_fn: Option, pub(crate) handle_set_readonly_gracefully_mock_fn: Option, + pub(crate) handle_get_metadata_mock_fn: Option, pub(crate) mock_role: Option>, engine: String, } @@ -127,6 +131,7 @@ impl MockRegionEngine { sender: tx, handle_request_mock_fn: None, handle_set_readonly_gracefully_mock_fn: None, + handle_get_metadata_mock_fn: None, mock_role: None, engine: engine.to_string(), }), @@ -146,6 +151,27 @@ impl MockRegionEngine { sender: tx, handle_request_mock_fn: Some(mock_fn), handle_set_readonly_gracefully_mock_fn: None, + handle_get_metadata_mock_fn: None, + mock_role: None, + engine: engine.to_string(), + }), + rx, + ) + } + + pub fn with_metadata_mock_fn( + engine: &str, + mock_fn: MockGetMetadataHandler, + ) -> (Arc, Receiver<(RegionId, RegionRequest)>) { + let (tx, rx) = tokio::sync::mpsc::channel(8); + + ( + Arc::new(Self { + handle_request_delay: None, + sender: tx, + handle_request_mock_fn: None, + handle_set_readonly_gracefully_mock_fn: None, + handle_get_metadata_mock_fn: Some(mock_fn), mock_role: None, engine: engine.to_string(), }), @@ -166,6 +192,7 @@ impl MockRegionEngine { sender: tx, handle_request_mock_fn: None, handle_set_readonly_gracefully_mock_fn: None, + handle_get_metadata_mock_fn: None, mock_role: None, engine: engine.to_string(), }; @@ -208,7 +235,11 @@ impl RegionEngine for MockRegionEngine { unimplemented!() } - async fn get_metadata(&self, _region_id: RegionId) -> Result { + async fn get_metadata(&self, region_id: RegionId) -> Result { + if let Some(mock_fn) = &self.handle_get_metadata_mock_fn { + return mock_fn(region_id).map_err(BoxedError::new); + }; + unimplemented!() } diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index f2420522ae..45dc8f661d 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -103,6 +103,7 @@ pub mod mock { }), affected_rows: 0, extensions: Default::default(), + metadata: Vec::new(), }) } } diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 680f4064ca..827cb90992 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -158,6 +158,7 @@ impl RegionEngine for MetricEngine { Ok(RegionResponse { affected_rows: rows, extensions: extension_return_value, + metadata: Vec::new(), }) } BatchRegionDdlRequest::Alter(requests) => { @@ -171,6 +172,7 @@ impl RegionEngine for MetricEngine { Ok(RegionResponse { affected_rows: rows, extensions: extension_return_value, + metadata: Vec::new(), }) } BatchRegionDdlRequest::Drop(requests) => { @@ -243,6 +245,7 @@ impl RegionEngine for MetricEngine { result.map_err(BoxedError::new).map(|rows| RegionResponse { affected_rows: rows, extensions: extension_return_value, + metadata: Vec::new(), }) } @@ -439,6 +442,7 @@ impl MetricEngine { Ok(RegionResponse { affected_rows, extensions, + metadata: Vec::new(), }) } } diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index fc27c2ff10..a15f020a10 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -703,6 +703,7 @@ pub trait RegionEngine: Send + Sync { Ok(RegionResponse { affected_rows, extensions, + metadata: Vec::new(), }) } diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index f9b71c4bc6..8f4428be9e 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -154,6 +154,10 @@ impl RegionRequest { reason: "Sync request should be handled separately by RegionServer", } .fail(), + region_request::Body::ListMetadata(_) => UnexpectedSnafu { + reason: "ListMetadata request should be handled separately by RegionServer", + } + .fail(), } }