feat: add column metadata to response extensions (#6451)

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-07-07 11:38:13 +08:00
committed by GitHub
parent 3508fddd74
commit 30f7955d2b
8 changed files with 372 additions and 63 deletions

1
Cargo.lock generated
View File

@@ -7243,6 +7243,7 @@ dependencies = [
"common-base",
"common-error",
"common-macro",
"common-meta",
"common-query",
"common-recordbatch",
"common-runtime",

View File

@@ -40,5 +40,6 @@ store-api.workspace = true
tokio.workspace = true
[dev-dependencies]
common-meta = { workspace = true, features = ["testing"] }
common-test-util.workspace = true
mito2 = { workspace = true, features = ["test"] }

View File

@@ -66,7 +66,7 @@ impl MetricEngineInner {
let mut manifest_infos = Vec::with_capacity(1);
self.alter_logical_regions(physical_region_id, requests, extension_return_value)
.await?;
append_manifest_info(&self.mito, region_id, &mut manifest_infos);
append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
} else {
let grouped_requests =
@@ -222,13 +222,17 @@ mod test {
use std::time::Duration;
use api::v1::SemanticType;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::{AddColumn, SetRegionOption};
use common_meta::ddl::test_util::assert_column_name_and_id;
use common_meta::ddl::utils::{parse_column_metadatas, parse_manifest_infos_from_extensions};
use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
AlterKind, BatchRegionDdlRequest, RegionAlterRequest, SetRegionOption,
};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::RegionId;
use super::*;
use crate::test_util::TestEnv;
use crate::test_util::{alter_logical_region_request, create_logical_region_request, TestEnv};
#[tokio::test]
async fn test_alter_region() {
@@ -239,22 +243,7 @@ mod test {
// alter physical region
let physical_region_id = env.default_physical_region_id();
let request = RegionAlterRequest {
kind: AlterKind::AddColumns {
columns: vec![AddColumn {
column_metadata: ColumnMetadata {
column_id: 0,
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
"tag1",
ConcreteDataType::string_datatype(),
false,
),
},
location: None,
}],
},
};
let request = alter_logical_region_request(&["tag1"]);
let result = engine_inner
.alter_physical_region(physical_region_id, request.clone())
@@ -287,14 +276,18 @@ mod test {
assert!(!is_column_exist);
let region_id = env.default_logical_region_id();
engine_inner
.alter_logical_regions(
physical_region_id,
vec![(region_id, request)],
&mut HashMap::new(),
)
let response = env
.metric()
.handle_batch_ddl_requests(BatchRegionDdlRequest::Alter(vec![(
region_id,
request.clone(),
)]))
.await
.unwrap();
let manifest_infos = parse_manifest_infos_from_extensions(&response.extensions).unwrap();
assert_eq!(manifest_infos[0].0, physical_region_id);
assert!(manifest_infos[0].1.is_metric());
let semantic_type = metadata_region
.column_semantic_type(physical_region_id, logical_region_id, "tag1")
.await
@@ -307,5 +300,77 @@ mod test {
.unwrap()
.unwrap();
assert_eq!(timestamp_index, SemanticType::Timestamp);
let column_metadatas =
parse_column_metadatas(&response.extensions, ALTER_PHYSICAL_EXTENSION_KEY).unwrap();
assert_column_name_and_id(
&column_metadatas,
&[
("greptime_timestamp", 0),
("greptime_value", 1),
("__table_id", ReservedColumnId::table_id()),
("__tsid", ReservedColumnId::tsid()),
("job", 2),
("tag1", 3),
],
);
}
#[tokio::test]
async fn test_alter_logical_regions() {
let env = TestEnv::new().await;
let engine = env.metric();
let physical_region_id1 = RegionId::new(1024, 0);
let physical_region_id2 = RegionId::new(1024, 1);
let logical_region_id1 = RegionId::new(1025, 0);
let logical_region_id2 = RegionId::new(1025, 1);
env.create_physical_region(physical_region_id1, "/test_dir1")
.await;
env.create_physical_region(physical_region_id2, "/test_dir2")
.await;
let region_create_request1 = crate::test_util::create_logical_region_request(
&["job"],
physical_region_id1,
"logical1",
);
let region_create_request2 =
create_logical_region_request(&["job"], physical_region_id2, "logical2");
engine
.handle_batch_ddl_requests(BatchRegionDdlRequest::Create(vec![
(logical_region_id1, region_create_request1),
(logical_region_id2, region_create_request2),
]))
.await
.unwrap();
let region_alter_request1 = alter_logical_region_request(&["tag1"]);
let region_alter_request2 = alter_logical_region_request(&["tag1"]);
let response = engine
.handle_batch_ddl_requests(BatchRegionDdlRequest::Alter(vec![
(logical_region_id1, region_alter_request1),
(logical_region_id2, region_alter_request2),
]))
.await
.unwrap();
let manifest_infos = parse_manifest_infos_from_extensions(&response.extensions).unwrap();
assert_eq!(manifest_infos.len(), 2);
let region_ids = manifest_infos.into_iter().map(|i| i.0).collect::<Vec<_>>();
assert!(region_ids.contains(&physical_region_id1));
assert!(region_ids.contains(&physical_region_id2));
let column_metadatas =
parse_column_metadatas(&response.extensions, ALTER_PHYSICAL_EXTENSION_KEY).unwrap();
assert_column_name_and_id(
&column_metadatas,
&[
("greptime_timestamp", 0),
("greptime_value", 1),
("__table_id", ReservedColumnId::table_id()),
("__tsid", ReservedColumnId::tsid()),
("job", 2),
("tag1", 3),
],
);
}
}

View File

@@ -80,7 +80,8 @@ impl MetricEngineInner {
}
);
let (region_id, request) = requests.pop().unwrap();
self.create_physical_region(region_id, request).await?;
self.create_physical_region(region_id, request, extension_return_value)
.await?;
return Ok(0);
} else if first_request
@@ -122,6 +123,7 @@ impl MetricEngineInner {
&self,
region_id: RegionId,
request: RegionCreateRequest,
extension_return_value: &mut HashMap<String, Vec<u8>>,
) -> Result<()> {
let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
let (data_region_id, metadata_region_id) = Self::transform_region_id(region_id);
@@ -162,7 +164,8 @@ impl MetricEngineInner {
.context(UnexpectedRequestSnafu {
reason: "No time index column found",
})?;
self.mito
let response = self
.mito
.handle_request(
data_region_id,
RegionRequest::Create(create_data_region_request),
@@ -176,6 +179,7 @@ impl MetricEngineInner {
region_id: data_region_id,
},
)?;
extension_return_value.extend(response.extensions);
info!("Created physical metric region {region_id}, primary key encoding={primary_key_encoding}, physical_region_options={physical_region_options:?}");
PHYSICAL_REGION_COUNT.inc();
@@ -613,12 +617,15 @@ pub(crate) fn region_options_for_metadata_region(
#[cfg(test)]
mod test {
use common_meta::ddl::test_util::assert_column_name_and_id;
use common_meta::ddl::utils::{parse_column_metadatas, parse_manifest_infos_from_extensions};
use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
use store_api::region_request::BatchRegionDdlRequest;
use super::*;
use crate::config::EngineConfig;
use crate::engine::MetricEngine;
use crate::test_util::TestEnv;
use crate::test_util::{create_logical_region_request, TestEnv};
#[test]
fn test_verify_region_create_request() {
@@ -807,4 +814,50 @@ mod test {
);
assert!(!metadata_region_request.options.contains_key("skip_wal"));
}
#[tokio::test]
async fn test_create_logical_regions() {
let env = TestEnv::new().await;
let engine = env.metric();
let physical_region_id1 = RegionId::new(1024, 0);
let physical_region_id2 = RegionId::new(1024, 1);
let logical_region_id1 = RegionId::new(1025, 0);
let logical_region_id2 = RegionId::new(1025, 1);
env.create_physical_region(physical_region_id1, "/test_dir1")
.await;
env.create_physical_region(physical_region_id2, "/test_dir2")
.await;
let region_create_request1 =
create_logical_region_request(&["job"], physical_region_id1, "logical1");
let region_create_request2 =
create_logical_region_request(&["job"], physical_region_id2, "logical2");
let response = engine
.handle_batch_ddl_requests(BatchRegionDdlRequest::Create(vec![
(logical_region_id1, region_create_request1),
(logical_region_id2, region_create_request2),
]))
.await
.unwrap();
let manifest_infos = parse_manifest_infos_from_extensions(&response.extensions).unwrap();
assert_eq!(manifest_infos.len(), 2);
let region_ids = manifest_infos.into_iter().map(|i| i.0).collect::<Vec<_>>();
assert!(region_ids.contains(&physical_region_id1));
assert!(region_ids.contains(&physical_region_id2));
let column_metadatas =
parse_column_metadatas(&response.extensions, ALTER_PHYSICAL_EXTENSION_KEY).unwrap();
assert_column_name_and_id(
&column_metadatas,
&[
("greptime_timestamp", 0),
("greptime_value", 1),
("__table_id", ReservedColumnId::table_id()),
("__tsid", ReservedColumnId::tsid()),
("job", 2),
],
);
}
}

View File

@@ -16,6 +16,7 @@
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema as PbColumnSchema, Row, SemanticType, Value};
use common_meta::ddl::utils::parse_column_metadatas;
use common_telemetry::debug;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
@@ -26,12 +27,14 @@ use object_store::util::join_dir;
use object_store::ObjectStore;
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::{
LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
ALTER_PHYSICAL_EXTENSION_KEY, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
PHYSICAL_TABLE_METADATA_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
AddColumn, AlterKind, RegionAlterRequest, RegionCreateRequest, RegionOpenRequest, RegionRequest,
};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::{ColumnId, RegionId};
use crate::config::EngineConfig;
@@ -116,13 +119,8 @@ impl TestEnv {
(mito, metric)
}
/// Create regions in [MetricEngine] under [`default_region_id`]
/// and region dir `"test_metric_region"`.
///
/// This method will create one logical region with three columns `(ts, val, job)`
/// under [`default_logical_region_id`].
pub async fn init_metric_region(&self) {
let region_id = self.default_physical_region_id();
/// Create regions in [MetricEngine] with specific `physical_region_id`.
pub async fn create_physical_region(&self, physical_region_id: RegionId, region_dir: &str) {
let region_create_request = RegionCreateRequest {
engine: METRIC_ENGINE_NAME.to_string(),
column_metadatas: vec![
@@ -149,26 +147,88 @@ impl TestEnv {
options: [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
.into_iter()
.collect(),
region_dir: self.default_region_dir(),
region_dir: region_dir.to_string(),
};
// create physical region
self.metric()
.handle_request(region_id, RegionRequest::Create(region_create_request))
let response = self
.metric()
.handle_request(
physical_region_id,
RegionRequest::Create(region_create_request),
)
.await
.unwrap();
let column_metadatas =
parse_column_metadatas(&response.extensions, TABLE_COLUMN_METADATA_EXTENSION_KEY)
.unwrap();
assert_eq!(column_metadatas.len(), 4);
}
// create logical region
let region_id = self.default_logical_region_id();
/// Create logical region in [MetricEngine] with specific `physical_region_id` and `logical_region_id`.
pub async fn create_logical_region(
&self,
physical_region_id: RegionId,
logical_region_id: RegionId,
) {
let region_create_request = create_logical_region_request(
&["job"],
self.default_physical_region_id(),
physical_region_id,
"test_metric_logical_region",
);
self.metric()
.handle_request(region_id, RegionRequest::Create(region_create_request))
let response = self
.metric()
.handle_request(
logical_region_id,
RegionRequest::Create(region_create_request),
)
.await
.unwrap();
let column_metadatas =
parse_column_metadatas(&response.extensions, ALTER_PHYSICAL_EXTENSION_KEY).unwrap();
assert_eq!(column_metadatas.len(), 5);
let column_names = column_metadatas
.iter()
.map(|c| c.column_schema.name.as_str())
.collect::<Vec<_>>();
let column_ids = column_metadatas
.iter()
.map(|c| c.column_id)
.collect::<Vec<_>>();
assert_eq!(
column_names,
vec![
"greptime_timestamp",
"greptime_value",
"__table_id",
"__tsid",
"job",
]
);
assert_eq!(
column_ids,
vec![
0,
1,
ReservedColumnId::table_id(),
ReservedColumnId::tsid(),
2,
]
);
}
/// Create regions in [MetricEngine] under [`default_region_id`]
/// and region dir `"test_metric_region"`.
///
/// This method will create one logical region with three columns `(ts, val, job)`
/// under [`default_logical_region_id`].
pub async fn init_metric_region(&self) {
let physical_region_id = self.default_physical_region_id();
self.create_physical_region(physical_region_id, &self.default_region_dir())
.await;
let logical_region_id = self.default_logical_region_id();
self.create_logical_region(physical_region_id, logical_region_id)
.await;
}
pub fn metadata_region(&self) -> MetadataRegion {
@@ -274,6 +334,30 @@ pub fn create_logical_region_request(
}
}
/// Generate a [RegionAlterRequest] for logical region.
/// Only need to specify tag column's name
pub fn alter_logical_region_request(tags: &[&str]) -> RegionAlterRequest {
RegionAlterRequest {
kind: AlterKind::AddColumns {
columns: tags
.iter()
.map(|tag| AddColumn {
column_metadata: ColumnMetadata {
column_id: 0,
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
tag.to_string(),
ConcreteDataType::string_datatype(),
false,
),
},
location: None,
})
.collect::<Vec<_>>(),
},
}
}
/// Generate a row schema with given tag columns.
///
/// The result will also contains default timestamp and value column at beginning.

View File

@@ -80,8 +80,10 @@ use snafu::{ensure, OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::MANIFEST_INFO_EXTENSION_KEY;
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::metric_engine_consts::{
MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
};
use store_api::region_engine::{
BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
@@ -95,7 +97,7 @@ use crate::cache::CacheStrategy;
use crate::config::MitoConfig;
use crate::error::{
InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
SerdeJsonSnafu,
SerdeJsonSnafu, SerializeColumnMetadataSnafu,
};
#[cfg(feature = "enterprise")]
use crate::extension::BoxedExtensionRangeProviderFactory;
@@ -335,6 +337,22 @@ impl MitoEngine {
Ok(())
}
fn encode_column_metadatas_to_extensions(
region_id: &RegionId,
column_metadatas: Vec<ColumnMetadata>,
extensions: &mut HashMap<String, Vec<u8>>,
) -> Result<()> {
extensions.insert(
TABLE_COLUMN_METADATA_EXTENSION_KEY.to_string(),
ColumnMetadata::encode_list(&column_metadatas).context(SerializeColumnMetadataSnafu)?,
);
info!(
"Added column metadatas: {:?} to extensions, region_id: {:?}",
column_metadatas, region_id
);
Ok(())
}
/// Find the current version's memtables and SSTs stats by region_id.
/// The stats must be collected in one place one time to ensure data consistency.
pub fn find_memtable_and_sst_stats(
@@ -695,6 +713,7 @@ impl RegionEngine for MitoEngine {
.start_timer();
let is_alter = matches!(request, RegionRequest::Alter(_));
let is_create = matches!(request, RegionRequest::Create(_));
let mut response = self
.inner
.handle_request(region_id, request)
@@ -703,14 +722,11 @@ impl RegionEngine for MitoEngine {
.map_err(BoxedError::new)?;
if is_alter {
if let Some(statistic) = self.region_statistic(region_id) {
Self::encode_manifest_info_to_extensions(
&region_id,
statistic.manifest,
&mut response.extensions,
)
self.handle_alter_response(region_id, &mut response)
.map_err(BoxedError::new)?;
} else if is_create {
self.handle_create_response(region_id, &mut response)
.map_err(BoxedError::new)?;
}
}
Ok(response)
@@ -803,6 +819,55 @@ impl RegionEngine for MitoEngine {
}
}
impl MitoEngine {
fn handle_alter_response(
&self,
region_id: RegionId,
response: &mut RegionResponse,
) -> Result<()> {
if let Some(statistic) = self.region_statistic(region_id) {
Self::encode_manifest_info_to_extensions(
&region_id,
statistic.manifest,
&mut response.extensions,
)?;
}
let column_metadatas = self
.inner
.find_region(region_id)
.ok()
.map(|r| r.metadata().column_metadatas.clone());
if let Some(column_metadatas) = column_metadatas {
Self::encode_column_metadatas_to_extensions(
&region_id,
column_metadatas,
&mut response.extensions,
)?;
}
Ok(())
}
fn handle_create_response(
&self,
region_id: RegionId,
response: &mut RegionResponse,
) -> Result<()> {
let column_metadatas = self
.inner
.find_region(region_id)
.ok()
.map(|r| r.metadata().column_metadatas.clone());
if let Some(column_metadatas) = column_metadatas {
Self::encode_column_metadatas_to_extensions(
&region_id,
column_metadatas,
&mut response.extensions,
)?;
}
Ok(())
}
}
// Tests methods.
#[cfg(any(test, feature = "test"))]
#[allow(clippy::too_many_arguments)]

View File

@@ -20,16 +20,18 @@ use std::time::Duration;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, Row, Rows, SemanticType};
use common_error::ext::ErrorExt;
use common_meta::ddl::utils::{parse_column_metadatas, parse_manifest_infos_from_extensions};
use common_recordbatch::RecordBatches;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions};
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
use store_api::region_engine::{RegionEngine, RegionManifestInfo, RegionRole};
use store_api::region_request::{
AddColumn, AddColumnLocation, AlterKind, ApiSetIndexOptions, RegionAlterRequest,
RegionOpenRequest, RegionRequest, SetRegionOption,
};
use store_api::storage::{RegionId, ScanRequest};
use store_api::storage::{ColumnId, RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::engine::listener::{AlterFlushListener, NotifyRegionChangeResultListener};
@@ -113,6 +115,17 @@ fn check_region_version(
assert_eq!(flushed_sequence, version_data.version.flushed_sequence);
}
fn assert_column_metadatas(column_name: &[(&str, ColumnId)], column_metadatas: &[ColumnMetadata]) {
assert_eq!(column_name.len(), column_metadatas.len());
for (name, id) in column_name {
let column_metadata = column_metadatas
.iter()
.find(|c| c.column_id == *id)
.unwrap();
assert_eq!(column_metadata.column_schema.name, *name);
}
}
#[tokio::test]
async fn test_alter_region() {
common_telemetry::init_default_ut_logging();
@@ -136,10 +149,16 @@ async fn test_alter_region() {
let column_schemas = rows_schema(&request);
let region_dir = request.region_dir.clone();
engine
let response = engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let column_metadatas =
parse_column_metadatas(&response.extensions, TABLE_COLUMN_METADATA_EXTENSION_KEY).unwrap();
assert_column_metadatas(
&[("tag_0", 0), ("field_0", 1), ("ts", 2)],
&column_metadatas,
);
let rows = Rows {
schema: column_schemas,
@@ -148,7 +167,7 @@ async fn test_alter_region() {
put_rows(&engine, region_id, rows).await;
let request = add_tag1();
engine
let response = engine
.handle_request(region_id, RegionRequest::Alter(request))
.await
.unwrap();
@@ -164,6 +183,18 @@ async fn test_alter_region() {
scan_check_after_alter(&engine, region_id, expected).await;
check_region_version(&engine, region_id, 1, 3, 1, 3);
let mut manifests = parse_manifest_infos_from_extensions(&response.extensions).unwrap();
assert_eq!(manifests.len(), 1);
let (return_region_id, manifest) = manifests.remove(0);
assert_eq!(return_region_id, region_id);
assert_eq!(manifest, RegionManifestInfo::mito(2, 1));
let column_metadatas =
parse_column_metadatas(&response.extensions, TABLE_COLUMN_METADATA_EXTENSION_KEY).unwrap();
assert_column_metadatas(
&[("tag_0", 0), ("field_0", 1), ("ts", 2), ("tag_1", 3)],
&column_metadatas,
);
// Reopen region.
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
engine

View File

@@ -96,6 +96,14 @@ pub enum Error {
error: serde_json::Error,
},
#[snafu(display("Failed to serialize column metadata"))]
SerializeColumnMetadata {
#[snafu(source)]
error: serde_json::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid scan index, start: {}, end: {}", start, end))]
InvalidScanIndex {
start: ManifestVersion,
@@ -1063,7 +1071,8 @@ impl ErrorExt for Error {
| NoCheckpoint { .. }
| NoManifests { .. }
| InstallManifestTo { .. }
| Unexpected { .. } => StatusCode::Unexpected,
| Unexpected { .. }
| SerializeColumnMetadata { .. } => StatusCode::Unexpected,
RegionNotFound { .. } => StatusCode::RegionNotFound,
ObjectStoreNotFound { .. }