feat: Support ListMetadataRequest to retrieve regions' metadata (#6348)

* feat: support list metadata in region server

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: add test for list region metadata

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: return null if region not exists

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: update greptime-proto

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2025-06-23 15:11:20 +08:00
committed by WenyXu
parent c0f40ce8ed
commit cfd6c1c3e0
11 changed files with 306 additions and 15 deletions

2
Cargo.lock generated
View File

@@ -5158,7 +5158,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a5d256ba4abb7393e0859ffbf7fca1e38f3433dc#a5d256ba4abb7393e0859ffbf7fca1e38f3433dc"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=17f14bc2791a336196ac0f4d2df9ee2972233ab8"
dependencies = [
"prost 0.13.5",
"serde",

View File

@@ -134,7 +134,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a5d256ba4abb7393e0859ffbf7fca1e38f3433dc" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "17f14bc2791a336196ac0f4d2df9ee2972233ab8" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -22,6 +22,7 @@ use greptime_proto::v1::region::RegionResponse as RegionResponseV1;
pub struct RegionResponse {
pub affected_rows: AffectedRows,
pub extensions: HashMap<String, Vec<u8>>,
pub metadata: Vec<u8>,
}
impl RegionResponse {
@@ -29,6 +30,7 @@ impl RegionResponse {
Self {
affected_rows: region_response.affected_rows as _,
extensions: region_response.extensions,
metadata: region_response.metadata,
}
}
@@ -37,6 +39,16 @@ impl RegionResponse {
Self {
affected_rows,
extensions: Default::default(),
metadata: Vec::new(),
}
}
/// Creates one response with metadata.
pub fn from_metadata(metadata: Vec<u8>) -> Self {
Self {
affected_rows: 0,
extensions: Default::default(),
metadata,
}
}
}

View File

@@ -38,6 +38,7 @@ impl MockDatanodeHandler for () {
Ok(RegionResponse {
affected_rows: 0,
extensions: Default::default(),
metadata: Vec::new(),
})
}

View File

@@ -387,6 +387,14 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to serialize json"))]
SerializeJson {
#[snafu(source)]
error: serde_json::Error,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -457,6 +465,7 @@ impl ErrorExt for Error {
StatusCode::RegionBusy
}
MissingCache { .. } => StatusCode::Internal,
SerializeJson { .. } => StatusCode::Internal,
}
}

View File

@@ -20,12 +20,14 @@ use std::time::Duration;
use api::region::RegionResponse;
use api::v1::region::sync_request::ManifestInfo;
use api::v1::region::{region_request, RegionResponse as RegionResponseV1, SyncRequest};
use api::v1::region::{
region_request, ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest,
};
use api::v1::{ResponseHeader, Status};
use arrow_flight::{FlightData, Ticket};
use async_trait::async_trait;
use bytes::Bytes;
use common_error::ext::BoxedError;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_query::request::QueryRequest;
use common_query::OutputData;
@@ -47,6 +49,7 @@ pub use query::dummy_catalog::{
DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
};
use query::QueryEngineRef;
use serde_json;
use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
use servers::grpc::region_server::RegionServerHandler;
@@ -71,10 +74,10 @@ use tonic::{Request, Response, Result as TonicResult};
use crate::error::{
self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, HandleBatchDdlRequestSnafu,
HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu, NewPlanDecoderSnafu,
RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu, Result,
StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu,
HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu,
Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
};
use crate::event_listener::RegionServerEventListenerRef;
@@ -138,12 +141,12 @@ impl RegionServer {
/// Finds the region's engine by its id. If the region is not ready, returns `None`.
pub fn find_engine(&self, region_id: RegionId) -> Result<Option<RegionEngineRef>> {
self.inner
.get_engine(region_id, &RegionChange::None)
.map(|x| match x {
CurrentEngine::Engine(engine) => Some(engine),
CurrentEngine::EarlyReturn(_) => None,
})
match self.inner.get_engine(region_id, &RegionChange::None) {
Ok(CurrentEngine::Engine(engine)) => Ok(Some(engine)),
Ok(CurrentEngine::EarlyReturn(_)) => Ok(None),
Err(error::Error::RegionNotFound { .. }) => Ok(None),
Err(err) => Err(err),
}
}
#[tracing::instrument(skip_all)]
@@ -411,6 +414,7 @@ impl RegionServer {
Ok(RegionResponse {
affected_rows,
extensions,
metadata: Vec::new(),
})
}
@@ -440,6 +444,7 @@ impl RegionServer {
Ok(RegionResponse {
affected_rows,
extensions,
metadata: Vec::new(),
})
}
@@ -472,6 +477,48 @@ impl RegionServer {
.map(|_| RegionResponse::new(AffectedRows::default()))
}
/// Handles the ListMetadata request and retrieves metadata for specified regions.
///
/// Returns the results as a JSON-serialized list in the [RegionResponse]. It serializes
/// non-existing regions as `null`.
#[tracing::instrument(skip_all)]
async fn handle_list_metadata_request(
&self,
request: &ListMetadataRequest,
) -> Result<RegionResponse> {
let mut region_metadatas = Vec::new();
// Collect metadata for each region
for region_id in &request.region_ids {
let region_id = RegionId::from_u64(*region_id);
// Get the engine.
let Some(engine) = self.find_engine(region_id)? else {
region_metadatas.push(None);
continue;
};
match engine.get_metadata(region_id).await {
Ok(metadata) => region_metadatas.push(Some(metadata)),
Err(err) => {
if err.status_code() == StatusCode::RegionNotFound {
region_metadatas.push(None);
} else {
Err(err).with_context(|_| GetRegionMetadataSnafu {
engine: engine.name(),
region_id,
})?;
}
}
}
}
// Serialize metadata to JSON
let json_result = serde_json::to_vec(&region_metadatas).context(SerializeJsonSnafu)?;
let response = RegionResponse::from_metadata(json_result);
Ok(response)
}
/// Sync region manifest and registers new opened logical regions.
pub async fn sync_region(
&self,
@@ -503,6 +550,10 @@ impl RegionServerHandler for RegionServer {
region_request::Body::Sync(sync_request) => {
self.handle_sync_region_request(sync_request).await
}
region_request::Body::ListMetadata(list_metadata_request) => {
self.handle_list_metadata_request(list_metadata_request)
.await
}
_ => self.handle_requests_in_serial(request).await,
}
.map_err(BoxedError::new)
@@ -517,6 +568,7 @@ impl RegionServerHandler for RegionServer {
}),
affected_rows: response.affected_rows as _,
extensions: response.extensions,
metadata: response.metadata,
})
}
}
@@ -902,6 +954,7 @@ impl RegionServerInner {
Ok(RegionResponse {
affected_rows: result.affected_rows,
extensions: result.extensions,
metadata: Vec::new(),
})
}
Err(err) => {
@@ -971,6 +1024,7 @@ impl RegionServerInner {
Ok(RegionResponse {
affected_rows: result.affected_rows,
extensions: result.extensions,
metadata: Vec::new(),
})
}
Err(err) => {
@@ -1243,8 +1297,11 @@ mod tests {
use std::assert_matches::assert_matches;
use api::v1::SemanticType;
use common_error::ext::ErrorExt;
use datatypes::prelude::ConcreteDataType;
use mito2::test_util::CreateRequestBuilder;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionDropRequest, RegionOpenRequest, RegionTruncateRequest};
use store_api::storage::RegionId;
@@ -1606,4 +1663,175 @@ mod tests {
let forth_query = p.acquire().await;
assert!(forth_query.is_ok());
}
fn mock_region_metadata(region_id: RegionId) -> RegionMetadata {
let mut metadata_builder = RegionMetadataBuilder::new(region_id);
metadata_builder.push_column_metadata(ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema::new(
"timestamp",
ConcreteDataType::timestamp_nanosecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 0,
});
metadata_builder.push_column_metadata(ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema::new(
"file",
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 1,
});
metadata_builder.push_column_metadata(ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema::new(
"message",
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: 2,
});
metadata_builder.primary_key(vec![1]);
metadata_builder.build().unwrap()
}
#[tokio::test]
async fn test_handle_list_metadata_request() {
common_telemetry::init_default_ut_logging();
let mut mock_region_server = mock_region_server();
let region_id_1 = RegionId::new(1, 0);
let region_id_2 = RegionId::new(2, 0);
let metadata_1 = mock_region_metadata(region_id_1);
let metadata_2 = mock_region_metadata(region_id_2);
let metadatas = vec![Some(metadata_1.clone()), Some(metadata_2.clone())];
let metadata_1 = Arc::new(metadata_1);
let metadata_2 = Arc::new(metadata_2);
let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
MITO_ENGINE_NAME,
Box::new(move |region_id| {
if region_id == region_id_1 {
Ok(metadata_1.clone())
} else if region_id == region_id_2 {
Ok(metadata_2.clone())
} else {
error::RegionNotFoundSnafu { region_id }.fail()
}
}),
);
mock_region_server.register_engine(engine.clone());
mock_region_server
.inner
.region_map
.insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
mock_region_server
.inner
.region_map
.insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
// All regions exist.
let list_metadata_request = ListMetadataRequest {
region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
};
let response = mock_region_server
.handle_list_metadata_request(&list_metadata_request)
.await
.unwrap();
let decoded_metadata: Vec<Option<RegionMetadata>> =
serde_json::from_slice(&response.metadata).unwrap();
assert_eq!(metadatas, decoded_metadata);
}
#[tokio::test]
async fn test_handle_list_metadata_not_found() {
common_telemetry::init_default_ut_logging();
let mut mock_region_server = mock_region_server();
let region_id_1 = RegionId::new(1, 0);
let region_id_2 = RegionId::new(2, 0);
let metadata_1 = mock_region_metadata(region_id_1);
let metadatas = vec![Some(metadata_1.clone()), None];
let metadata_1 = Arc::new(metadata_1);
let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
MITO_ENGINE_NAME,
Box::new(move |region_id| {
if region_id == region_id_1 {
Ok(metadata_1.clone())
} else {
error::RegionNotFoundSnafu { region_id }.fail()
}
}),
);
mock_region_server.register_engine(engine.clone());
mock_region_server
.inner
.region_map
.insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
// Not in region map.
let list_metadata_request = ListMetadataRequest {
region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
};
let response = mock_region_server
.handle_list_metadata_request(&list_metadata_request)
.await
.unwrap();
let decoded_metadata: Vec<Option<RegionMetadata>> =
serde_json::from_slice(&response.metadata).unwrap();
assert_eq!(metadatas, decoded_metadata);
// Not in region engine.
mock_region_server
.inner
.region_map
.insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
let response = mock_region_server
.handle_list_metadata_request(&list_metadata_request)
.await
.unwrap();
let decoded_metadata: Vec<Option<RegionMetadata>> =
serde_json::from_slice(&response.metadata).unwrap();
assert_eq!(metadatas, decoded_metadata);
}
#[tokio::test]
async fn test_handle_list_metadata_failed() {
common_telemetry::init_default_ut_logging();
let mut mock_region_server = mock_region_server();
let region_id_1 = RegionId::new(1, 0);
let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
MITO_ENGINE_NAME,
Box::new(move |region_id| {
error::UnexpectedSnafu {
violated: format!("Failed to get region {region_id}"),
}
.fail()
}),
);
mock_region_server.register_engine(engine.clone());
mock_region_server
.inner
.region_map
.insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
// Failed to get.
let list_metadata_request = ListMetadataRequest {
region_ids: vec![region_id_1.as_u64()],
};
mock_region_server
.handle_list_metadata_request(&list_metadata_request)
.await
.unwrap_err();
}
}

View File

@@ -108,11 +108,15 @@ pub type MockRequestHandler =
pub type MockSetReadonlyGracefullyHandler =
Box<dyn Fn(RegionId) -> Result<SetRegionRoleStateResponse, Error> + Send + Sync>;
pub type MockGetMetadataHandler =
Box<dyn Fn(RegionId) -> Result<RegionMetadataRef, Error> + Send + Sync>;
pub struct MockRegionEngine {
sender: Sender<(RegionId, RegionRequest)>,
pub(crate) handle_request_delay: Option<Duration>,
pub(crate) handle_request_mock_fn: Option<MockRequestHandler>,
pub(crate) handle_set_readonly_gracefully_mock_fn: Option<MockSetReadonlyGracefullyHandler>,
pub(crate) handle_get_metadata_mock_fn: Option<MockGetMetadataHandler>,
pub(crate) mock_role: Option<Option<RegionRole>>,
engine: String,
}
@@ -127,6 +131,7 @@ impl MockRegionEngine {
sender: tx,
handle_request_mock_fn: None,
handle_set_readonly_gracefully_mock_fn: None,
handle_get_metadata_mock_fn: None,
mock_role: None,
engine: engine.to_string(),
}),
@@ -146,6 +151,27 @@ impl MockRegionEngine {
sender: tx,
handle_request_mock_fn: Some(mock_fn),
handle_set_readonly_gracefully_mock_fn: None,
handle_get_metadata_mock_fn: None,
mock_role: None,
engine: engine.to_string(),
}),
rx,
)
}
pub fn with_metadata_mock_fn(
engine: &str,
mock_fn: MockGetMetadataHandler,
) -> (Arc<Self>, Receiver<(RegionId, RegionRequest)>) {
let (tx, rx) = tokio::sync::mpsc::channel(8);
(
Arc::new(Self {
handle_request_delay: None,
sender: tx,
handle_request_mock_fn: None,
handle_set_readonly_gracefully_mock_fn: None,
handle_get_metadata_mock_fn: Some(mock_fn),
mock_role: None,
engine: engine.to_string(),
}),
@@ -166,6 +192,7 @@ impl MockRegionEngine {
sender: tx,
handle_request_mock_fn: None,
handle_set_readonly_gracefully_mock_fn: None,
handle_get_metadata_mock_fn: None,
mock_role: None,
engine: engine.to_string(),
};
@@ -208,7 +235,11 @@ impl RegionEngine for MockRegionEngine {
unimplemented!()
}
async fn get_metadata(&self, _region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
if let Some(mock_fn) = &self.handle_get_metadata_mock_fn {
return mock_fn(region_id).map_err(BoxedError::new);
};
unimplemented!()
}

View File

@@ -103,6 +103,7 @@ pub mod mock {
}),
affected_rows: 0,
extensions: Default::default(),
metadata: Vec::new(),
})
}
}

View File

@@ -158,6 +158,7 @@ impl RegionEngine for MetricEngine {
Ok(RegionResponse {
affected_rows: rows,
extensions: extension_return_value,
metadata: Vec::new(),
})
}
BatchRegionDdlRequest::Alter(requests) => {
@@ -171,6 +172,7 @@ impl RegionEngine for MetricEngine {
Ok(RegionResponse {
affected_rows: rows,
extensions: extension_return_value,
metadata: Vec::new(),
})
}
BatchRegionDdlRequest::Drop(requests) => {
@@ -243,6 +245,7 @@ impl RegionEngine for MetricEngine {
result.map_err(BoxedError::new).map(|rows| RegionResponse {
affected_rows: rows,
extensions: extension_return_value,
metadata: Vec::new(),
})
}
@@ -439,6 +442,7 @@ impl MetricEngine {
Ok(RegionResponse {
affected_rows,
extensions,
metadata: Vec::new(),
})
}
}

View File

@@ -703,6 +703,7 @@ pub trait RegionEngine: Send + Sync {
Ok(RegionResponse {
affected_rows,
extensions,
metadata: Vec::new(),
})
}

View File

@@ -154,6 +154,10 @@ impl RegionRequest {
reason: "Sync request should be handled separately by RegionServer",
}
.fail(),
region_request::Body::ListMetadata(_) => UnexpectedSnafu {
reason: "ListMetadata request should be handled separately by RegionServer",
}
.fail(),
}
}