diff --git a/Cargo.lock b/Cargo.lock index 9a77225fb5..de6ba8d9a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3635,7 +3635,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=20cdc57c3f320345b122eea43bc549a19d342e51#20cdc57c3f320345b122eea43bc549a19d342e51" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=3137cd184770e03f6a4dc191deaf02beb11fae7d#3137cd184770e03f6a4dc191deaf02beb11fae7d" dependencies = [ "prost 0.12.1", "serde", diff --git a/Cargo.toml b/Cargo.toml index 2ec02a2783..8b2c47f7bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,7 +79,7 @@ derive_builder = "0.12" etcd-client = "0.11" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "20cdc57c3f320345b122eea43bc549a19d342e51" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "3137cd184770e03f6a4dc191deaf02beb11fae7d" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 11772298b1..a829d969df 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -16,7 +16,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use api::v1::meta::{HeartbeatRequest, Peer, RegionStat, Role}; +use api::v1::meta::{HeartbeatRequest, Peer, RegionRole, RegionStat, Role}; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; @@ -282,14 +282,15 @@ impl HeartbeatTask { let regions = region_server.opened_regions(); let mut region_stats = Vec::new(); - for (region_id, engine) in regions { + for stat in regions { let approximate_bytes = region_server - .region_disk_usage(region_id) + .region_disk_usage(stat.region_id) .await .unwrap_or(0); let region_stat = RegionStat { - region_id: region_id.as_u64(), - engine, + region_id: stat.region_id.as_u64(), + engine: stat.engine, + role: RegionRole::from(stat.role).into(), approximate_bytes, // TODO(ruihang): scratch more info ..Default::default() diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index bcd17119ab..521dfe7c4e 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -47,7 +47,7 @@ use servers::grpc::region_server::RegionServerHandler; use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; -use store_api::region_engine::RegionEngineRef; +use store_api::region_engine::{RegionEngineRef, RegionRole}; use store_api::region_request::{RegionCloseRequest, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; @@ -66,6 +66,12 @@ pub struct RegionServer { inner: Arc, } +pub struct RegionStat { + pub region_id: RegionId, + pub engine: String, + pub role: RegionRole, +} + impl RegionServer { pub fn new( query_engine: QueryEngineRef, @@ -97,11 +103,19 @@ impl RegionServer { self.inner.handle_read(request).await } - pub fn opened_regions(&self) -> Vec<(RegionId, String)> { + pub fn opened_regions(&self) -> Vec { self.inner .region_map .iter() - .map(|e| (*e.key(), e.value().name().to_string())) + .filter_map(|e| { + let region_id = *e.key(); + // Filters out any regions whose role equals None. + e.role(region_id).map(|role| RegionStat { + region_id, + engine: e.value().name().to_string(), + role, + }) + }) .collect() } diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index eb0e7003e4..20896efd25 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -37,7 +37,7 @@ use query::query_engine::DescribeResult; use query::QueryEngine; use session::context::QueryContextRef; use store_api::metadata::RegionMetadataRef; -use store_api::region_engine::RegionEngine; +use store_api::region_engine::{RegionEngine, RegionRole}; use store_api::region_request::RegionRequest; use store_api::storage::{RegionId, ScanRequest}; use table::TableRef; @@ -190,4 +190,8 @@ impl RegionEngine for MockRegionEngine { fn set_writable(&self, _region_id: RegionId, _writable: bool) -> Result<(), BoxedError> { Ok(()) } + + fn role(&self, _region_id: RegionId) -> Option { + Some(RegionRole::Leader) + } } diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index a2c808ea10..867d39a282 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::collections::HashMap; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use async_trait::async_trait; use common_catalog::consts::FILE_ENGINE; @@ -24,12 +24,12 @@ use common_telemetry::{error, info}; use object_store::ObjectStore; use snafu::{ensure, OptionExt}; use store_api::metadata::RegionMetadataRef; -use store_api::region_engine::RegionEngine; +use store_api::region_engine::{RegionEngine, RegionRole}; use store_api::region_request::{ RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest, RegionRequest, }; use store_api::storage::{RegionId, ScanRequest}; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::Mutex; use crate::config::EngineConfig; use crate::error::{ @@ -102,6 +102,10 @@ impl RegionEngine for FileRegionEngine { .set_writable(region_id, writable) .map_err(BoxedError::new) } + + fn role(&self, region_id: RegionId) -> Option { + self.inner.state(region_id) + } } struct EngineInner { @@ -147,7 +151,7 @@ impl EngineInner { async fn stop(&self) -> EngineResult<()> { let _lock = self.region_mutex.lock().await; - self.regions.write().await.clear(); + self.regions.write().unwrap().clear(); Ok(()) } @@ -155,6 +159,14 @@ impl EngineInner { // TODO(zhongzc): Improve the semantics and implementation of this API. Ok(()) } + + fn state(&self, region_id: RegionId) -> Option { + if self.regions.read().unwrap().get(®ion_id).is_some() { + Some(RegionRole::Leader) + } else { + None + } + } } impl EngineInner { @@ -189,7 +201,7 @@ impl EngineInner { region_id, err ); })?; - self.regions.write().await.insert(region_id, region); + self.regions.write().unwrap().insert(region_id, region); info!("A new region is created, region_id: {}", region_id); Ok(Output::AffectedRows(0)) @@ -219,7 +231,7 @@ impl EngineInner { region_id, err ); })?; - self.regions.write().await.insert(region_id, region); + self.regions.write().unwrap().insert(region_id, region); info!("Region opened, region_id: {}", region_id); Ok(Output::AffectedRows(0)) @@ -232,7 +244,7 @@ impl EngineInner { ) -> EngineResult { let _lock = self.region_mutex.lock().await; - let mut regions = self.regions.write().await; + let mut regions = self.regions.write().unwrap(); if regions.remove(®ion_id).is_some() { info!("Region closed, region_id: {}", region_id); } @@ -263,17 +275,17 @@ impl EngineInner { ); })?; } - let _ = self.regions.write().await.remove(®ion_id); + let _ = self.regions.write().unwrap().remove(®ion_id); info!("Region dropped, region_id: {}", region_id); Ok(Output::AffectedRows(0)) } async fn get_region(&self, region_id: RegionId) -> Option { - self.regions.read().await.get(®ion_id).cloned() + self.regions.read().unwrap().get(®ion_id).cloned() } async fn exists(&self, region_id: RegionId) -> bool { - self.regions.read().await.contains_key(®ion_id) + self.regions.read().unwrap().contains_key(®ion_id) } } diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 293107ca9a..6251a79376 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -50,7 +50,7 @@ use object_store::manager::ObjectStoreManagerRef; use snafu::{OptionExt, ResultExt}; use store_api::logstore::LogStore; use store_api::metadata::RegionMetadataRef; -use store_api::region_engine::RegionEngine; +use store_api::region_engine::{RegionEngine, RegionRole}; use store_api::region_request::RegionRequest; use store_api::storage::{RegionId, ScanRequest}; @@ -184,6 +184,16 @@ impl EngineInner { region.set_writable(writable); Ok(()) } + + fn role(&self, region_id: RegionId) -> Option { + self.workers.get_region(region_id).map(|region| { + if region.is_writable() { + RegionRole::Leader + } else { + RegionRole::Follower + } + }) + } } #[async_trait] @@ -247,6 +257,10 @@ impl RegionEngine for MitoEngine { .set_writable(region_id, writable) .map_err(BoxedError::new) } + + fn role(&self, region_id: RegionId) -> Option { + self.inner.role(region_id) + } } // Tests methods. diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs index e8254cf71d..74cc1e0df8 100644 --- a/src/mito2/src/engine/open_test.rs +++ b/src/mito2/src/engine/open_test.rs @@ -18,7 +18,7 @@ use std::time::Duration; use api::v1::Rows; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; -use store_api::region_engine::RegionEngine; +use store_api::region_engine::{RegionEngine, RegionRole}; use store_api::region_request::{ RegionCloseRequest, RegionOpenRequest, RegionPutRequest, RegionRequest, }; @@ -49,6 +49,8 @@ async fn test_engine_open_empty() { assert_eq!(StatusCode::RegionNotFound, err.status_code()); let err = engine.set_writable(region_id, true).unwrap_err(); assert_eq!(StatusCode::RegionNotFound, err.status_code()); + let role = engine.role(region_id); + assert_eq!(role, None); } #[tokio::test] @@ -124,8 +126,11 @@ async fn test_engine_open_readonly() { .unwrap_err(); assert_eq!(StatusCode::RegionReadonly, err.status_code()); + assert_eq!(Some(RegionRole::Follower), engine.role(region_id)); // Set writable and write. engine.set_writable(region_id, true).unwrap(); + assert_eq!(Some(RegionRole::Leader), engine.role(region_id)); + put_rows(&engine, region_id, rows).await; } diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 85a6be02f2..7f6762562a 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -16,6 +16,7 @@ use std::sync::Arc; +use api::greptime_proto::v1::meta::RegionRole as PbRegionRole; use async_trait::async_trait; use common_error::ext::BoxedError; use common_query::Output; @@ -25,6 +26,23 @@ use crate::metadata::RegionMetadataRef; use crate::region_request::RegionRequest; use crate::storage::{RegionId, ScanRequest}; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RegionRole { + // Readonly region(mito2), Readonly region(file). + Follower, + // Writable region(mito2). + Leader, +} + +impl From for PbRegionRole { + fn from(value: RegionRole) -> Self { + match value { + RegionRole::Follower => PbRegionRole::Follower, + RegionRole::Leader => PbRegionRole::Leader, + } + } +} + #[async_trait] pub trait RegionEngine: Send + Sync { /// Name of this engine @@ -61,6 +79,11 @@ pub trait RegionEngine: Send + Sync { /// the region as readonly doesn't guarantee that write operations in progress will not /// take effect. fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), BoxedError>; + + /// Indicates region role. + /// + /// Returns the `None` if the region is not found. + fn role(&self, region_id: RegionId) -> Option; } pub type RegionEngineRef = Arc;