diff --git a/Cargo.lock b/Cargo.lock index c525c24055..a460533e72 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4690,7 +4690,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?branch=zhongzc%2Falter-fulltext-backend#b794184a4ce71e7fb7e1dfe17821c5a472a79588" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=fb8e20ce29afd81835e3ea3c1164c8ce10de2c65#fb8e20ce29afd81835e3ea3c1164c8ce10de2c65" dependencies = [ "prost 0.13.3", "serde", diff --git a/Cargo.toml b/Cargo.toml index 6cce8055ca..8b432668be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -130,8 +130,7 @@ etcd-client = "0.14" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", branch = "zhongzc/alter-fulltext-backend" } -# greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "103948cbce833e1a17ee7083f5ba79564d08d6ec" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "fb8e20ce29afd81835e3ea3c1164c8ce10de2c65" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/datanode/src/alive_keeper.rs b/src/datanode/src/alive_keeper.rs index 2fdd311b35..8a84693666 100644 --- a/src/datanode/src/alive_keeper.rs +++ b/src/datanode/src/alive_keeper.rs @@ -58,17 +58,24 @@ pub struct RegionAliveKeeper { /// non-decreasing). The heartbeat requests will carry the duration since this epoch, and the /// duration acts like an "invariant point" for region's keep alive lease. epoch: Instant, + + countdown_task_ext_handler: Option, } impl RegionAliveKeeper { /// Returns an empty [RegionAliveKeeper]. - pub fn new(region_server: RegionServer, heartbeat_interval_millis: u64) -> Self { + pub fn new( + region_server: RegionServer, + countdown_task_ext_handler: Option, + heartbeat_interval_millis: u64, + ) -> Self { Self { region_server, tasks: Arc::new(Mutex::new(HashMap::new())), heartbeat_interval_millis, started: Arc::new(AtomicBool::new(false)), epoch: Instant::now(), + countdown_task_ext_handler, } } @@ -85,6 +92,7 @@ impl RegionAliveKeeper { let handle = Arc::new(CountdownTaskHandle::new( self.region_server.clone(), + self.countdown_task_ext_handler.clone(), region_id, )); @@ -114,7 +122,9 @@ impl RegionAliveKeeper { for region in regions { let (role, region_id) = (region.role().into(), RegionId::from(region.region_id)); if let Some(handle) = self.find_handle(region_id).await { - handle.reset_deadline(role, deadline).await; + handle + .reset_deadline(role, deadline, region.extensions.clone()) + .await; } else { warn!( "Trying to renew the lease for region {region_id}, the keeper handler is not found!" @@ -265,13 +275,27 @@ enum CountdownCommand { /// 4 * `heartbeat_interval_millis` Start(u64), /// Reset countdown deadline to the given instance. - /// (NextRole, Deadline) - Reset((RegionRole, Instant)), + /// (NextRole, Deadline, ExtensionInfo) + Reset((RegionRole, Instant, HashMap>)), /// Returns the current deadline of the countdown task. #[cfg(test)] Deadline(oneshot::Sender), } +pub type CountdownTaskHandlerExtRef = Arc; + +/// Extension trait for [CountdownTaskHandle] to reset deadline method. +#[async_trait] +pub trait CountdownTaskExtHandler: Send + Sync { + async fn reset_deadline( + &self, + region_server: &RegionServer, + role: RegionRole, + deadline: Instant, + extension_info: HashMap>, + ); +} + struct CountdownTaskHandle { tx: mpsc::Sender, handler: JoinHandle<()>, @@ -280,11 +304,16 @@ struct CountdownTaskHandle { impl CountdownTaskHandle { /// Creates a new [CountdownTaskHandle] and starts the countdown task. - fn new(region_server: RegionServer, region_id: RegionId) -> Self { + fn new( + region_server: RegionServer, + handler_ext: Option, + region_id: RegionId, + ) -> Self { let (tx, rx) = mpsc::channel(1024); let mut countdown_task = CountdownTask { region_server, + handler_ext, region_id, rx, }; @@ -323,10 +352,15 @@ impl CountdownTaskHandle { None } - async fn reset_deadline(&self, role: RegionRole, deadline: Instant) { + async fn reset_deadline( + &self, + role: RegionRole, + deadline: Instant, + extension_info: HashMap>, + ) { if let Err(e) = self .tx - .send(CountdownCommand::Reset((role, deadline))) + .send(CountdownCommand::Reset((role, deadline, extension_info))) .await { warn!( @@ -350,6 +384,7 @@ impl Drop for CountdownTaskHandle { struct CountdownTask { region_server: RegionServer, region_id: RegionId, + handler_ext: Option, rx: mpsc::Receiver, } @@ -379,10 +414,18 @@ impl CountdownTask { started = true; } }, - Some(CountdownCommand::Reset((role, deadline))) => { + Some(CountdownCommand::Reset((role, deadline, extension_info))) => { if let Err(err) = self.region_server.set_region_role(self.region_id, role) { error!(err; "Failed to set region role to {role} for region {region_id}"); } + if let Some(ext_handler) = self.handler_ext.as_ref() { + ext_handler.reset_deadline( + &self.region_server, + role, + deadline, + extension_info, + ).await; + } trace!( "Reset deadline of region {region_id} to approximately {} seconds later.", (deadline - Instant::now()).as_secs_f32(), @@ -435,7 +478,7 @@ mod test { let engine = Arc::new(engine); region_server.register_engine(engine.clone()); - let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server.clone(), 100)); + let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server.clone(), None, 100)); let region_id = RegionId::new(1024, 1); let builder = CreateRequestBuilder::new(); @@ -472,7 +515,7 @@ mod test { &[GrantedRegion { region_id: region_id.as_u64(), role: api::v1::meta::RegionRole::Leader.into(), - manifest_version: 0, + extensions: HashMap::new(), }], Instant::now() + Duration::from_millis(200), ) @@ -497,7 +540,8 @@ mod test { async fn countdown_task() { let region_server = mock_region_server(); - let countdown_handle = CountdownTaskHandle::new(region_server, RegionId::new(9999, 2)); + let countdown_handle = + CountdownTaskHandle::new(region_server, None, RegionId::new(9999, 2)); // If countdown task is not started, its deadline is set to far future. assert!( @@ -527,6 +571,7 @@ mod test { .reset_deadline( RegionRole::Leader, Instant::now() + Duration::from_millis(heartbeat_interval_millis * 5), + HashMap::new(), ) .await; assert!( diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 4c352f9ac1..b32a1668c6 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -265,6 +265,7 @@ impl DatanodeBuilder { region_server.clone(), meta_client, cache_registry, + self.plugins.clone(), ) .await?, ) diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 04d38a3524..606144ee38 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use std::time::Duration; use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat}; +use common_base::Plugins; use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::datanode::REGION_STATISTIC_KEY; use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS; @@ -37,7 +38,7 @@ use tokio::sync::{mpsc, Notify}; use tokio::time::Instant; use self::handler::RegionHeartbeatResponseHandler; -use crate::alive_keeper::RegionAliveKeeper; +use crate::alive_keeper::{CountdownTaskHandlerExtRef, RegionAliveKeeper}; use crate::config::DatanodeOptions; use crate::error::{self, MetaClientInitSnafu, Result}; use crate::event_listener::RegionServerEventReceiver; @@ -73,9 +74,12 @@ impl HeartbeatTask { region_server: RegionServer, meta_client: MetaClientRef, cache_invalidator: CacheInvalidatorRef, + plugins: Plugins, ) -> Result { + let countdown_task_handler_ext = plugins.get::(); let region_alive_keeper = Arc::new(RegionAliveKeeper::new( region_server.clone(), + countdown_task_handler_ext, opts.heartbeat.interval.as_millis() as u64, )); let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![ diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 92e6e9138c..c05291050c 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -55,7 +55,7 @@ use store_api::metric_engine_consts::{ FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, }; use store_api::region_engine::{ - RegionEngineRef, RegionRole, RegionStatistic, SetRegionRoleStateResponse, + RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, }; use store_api::region_request::{ @@ -308,6 +308,22 @@ impl RegionServer { .with_context(|_| HandleRegionRequestSnafu { region_id }) } + pub async fn sync_region_manifest( + &self, + region_id: RegionId, + manifest_info: RegionManifestInfo, + ) -> Result<()> { + let engine = self + .inner + .region_map + .get(®ion_id) + .with_context(|| RegionNotFoundSnafu { region_id })?; + engine + .sync_region(region_id, manifest_info) + .await + .with_context(|_| HandleRegionRequestSnafu { region_id }) + } + /// Set region role state gracefully. /// /// For [SettableRegionRoleState::Follower]: diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index cd911f49aa..b349024cc9 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -32,8 +32,8 @@ use query::{QueryEngine, QueryEngineContext}; use session::context::QueryContextRef; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ - RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetRegionRoleStateResponse, - SettableRegionRoleState, + RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, + SetRegionRoleStateResponse, SettableRegionRoleState, }; use store_api::region_request::{AffectedRows, RegionRequest}; use store_api::storage::{RegionId, ScanRequest, SequenceNumber}; @@ -246,7 +246,11 @@ impl RegionEngine for MockRegionEngine { Some(RegionRole::Leader) } - async fn sync_region(&self, _region_id: RegionId, _version: u64) -> Result<(), BoxedError> { + async fn sync_region( + &self, + _region_id: RegionId, + _manifest_info: RegionManifestInfo, + ) -> Result<(), BoxedError> { unimplemented!() } diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index 77c935b2b3..9e9d1aa405 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -24,11 +24,10 @@ use common_recordbatch::SendableRecordBatchStream; use common_telemetry::{error, info}; use object_store::ObjectStore; use snafu::{ensure, OptionExt}; -use store_api::manifest::ManifestVersion; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ - RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetRegionRoleStateResponse, - SettableRegionRoleState, SinglePartitionScanner, + RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, + SetRegionRoleStateResponse, SettableRegionRoleState, SinglePartitionScanner, }; use store_api::region_request::{ AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest, @@ -142,7 +141,7 @@ impl RegionEngine for FileRegionEngine { async fn sync_region( &self, _region_id: RegionId, - _manifest_version: ManifestVersion, + _manifest_info: RegionManifestInfo, ) -> Result<(), BoxedError> { // File engine doesn't need to sync region manifest. Ok(()) diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 42994c0437..d1e837d078 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -36,7 +36,6 @@ use common_error::status_code::StatusCode; use mito2::engine::MitoEngine; pub(crate) use options::IndexOptions; use snafu::ResultExt; -use store_api::manifest::ManifestVersion; use store_api::metadata::RegionMetadataRef; use store_api::metric_engine_consts::METRIC_ENGINE_NAME; use store_api::region_engine::{ @@ -49,7 +48,7 @@ use store_api::storage::{RegionId, ScanRequest, SequenceNumber}; use self::state::MetricEngineState; use crate::config::EngineConfig; use crate::data_region::DataRegion; -use crate::error::{self, Result, UnsupportedRegionRequestSnafu, UnsupportedSyncRegionSnafu}; +use crate::error::{self, MetricManifestInfoSnafu, Result, UnsupportedRegionRequestSnafu}; use crate::metadata_region::MetadataRegion; use crate::row_modifier::RowModifier; use crate::utils; @@ -274,10 +273,10 @@ impl RegionEngine for MetricEngine { sst_size: metadata_stat.sst_size + data_stat.sst_size, index_size: metadata_stat.index_size + data_stat.index_size, manifest: RegionManifestInfo::Metric { - data_flushed_entry_id: data_stat.manifest.flushed_entry_id(), - data_manifest_version: data_stat.manifest.manifest_version(), - metadata_flushed_entry_id: metadata_stat.manifest.flushed_entry_id(), - metadata_manifest_version: metadata_stat.manifest.manifest_version(), + data_flushed_entry_id: data_stat.manifest.data_flushed_entry_id(), + data_manifest_version: data_stat.manifest.data_manifest_version(), + metadata_flushed_entry_id: metadata_stat.manifest.data_flushed_entry_id(), + metadata_manifest_version: metadata_stat.manifest.data_manifest_version(), }, }), _ => None, @@ -310,11 +309,42 @@ impl RegionEngine for MetricEngine { async fn sync_region( &self, - _region_id: RegionId, - _manifest_version: ManifestVersion, + region_id: RegionId, + manifest_info: RegionManifestInfo, ) -> Result<(), BoxedError> { - // TODO(weny): implement it later. - Err(BoxedError::new(UnsupportedSyncRegionSnafu {}.build())) + if !manifest_info.is_metric() { + return Err(BoxedError::new( + MetricManifestInfoSnafu { region_id }.build(), + )); + } + + let metadata_region_id = utils::to_metadata_region_id(region_id); + // checked by ensure above + let metadata_manifest_version = manifest_info + .metadata_manifest_version() + .unwrap_or_default(); + let metadata_flushed_entry_id = manifest_info + .metadata_flushed_entry_id() + .unwrap_or_default(); + let metadata_region_manifest = + RegionManifestInfo::mito(metadata_manifest_version, metadata_flushed_entry_id); + self.inner + .mito + .sync_region(metadata_region_id, metadata_region_manifest) + .await?; + + let data_region_id = utils::to_data_region_id(region_id); + let data_manifest_version = manifest_info.data_manifest_version(); + let data_flushed_entry_id = manifest_info.data_flushed_entry_id(); + let data_region_manifest = + RegionManifestInfo::mito(data_manifest_version, data_flushed_entry_id); + + self.inner + .mito + .sync_region(data_region_id, data_region_manifest) + .await?; + + Ok(()) } async fn set_region_role_state_gracefully( diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index cd321a8841..8be535ec9f 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -260,8 +260,9 @@ pub enum Error { location: Location, }, - #[snafu(display("Unsupported sync region request"))] - UnsupportedSyncRegion { + #[snafu(display("Expected metric manifest info, region: {}", region_id))] + MetricManifestInfo { + region_id: RegionId, #[snafu(implicit)] location: Location, }, @@ -286,9 +287,9 @@ impl ErrorExt for Error { | UnexpectedRequest { .. } | UnsupportedAlterKind { .. } => StatusCode::InvalidArguments, - ForbiddenPhysicalAlter { .. } - | UnsupportedRegionRequest { .. } - | UnsupportedSyncRegion { .. } => StatusCode::Unsupported, + ForbiddenPhysicalAlter { .. } | UnsupportedRegionRequest { .. } => { + StatusCode::Unsupported + } DeserializeColumnMetadata { .. } | SerializeColumnMetadata { .. } @@ -315,6 +316,8 @@ impl ErrorExt for Error { EncodePrimaryKey { source, .. } => source.status_code(), CollectRecordBatchStream { source, .. } => source.status_code(), + + MetricManifestInfo { .. } => StatusCode::Internal, } } diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 6203f5884a..110f79b875 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -81,8 +81,8 @@ use store_api::logstore::LogStore; use store_api::manifest::ManifestVersion; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ - BatchResponses, RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, - SetRegionRoleStateResponse, SettableRegionRoleState, + BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, + RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, }; use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest}; use store_api::storage::{RegionId, ScanRequest, SequenceNumber}; @@ -91,7 +91,8 @@ use tokio::sync::{oneshot, Semaphore}; use crate::cache::CacheStrategy; use crate::config::MitoConfig; use crate::error::{ - InvalidRequestSnafu, JoinSnafu, RecvSnafu, RegionNotFoundSnafu, Result, SerdeJsonSnafu, + InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result, + SerdeJsonSnafu, }; use crate::manifest::action::RegionEdit; use crate::metrics::HANDLE_REQUEST_ELAPSED; @@ -494,8 +495,10 @@ impl EngineInner { async fn sync_region( &self, region_id: RegionId, - manifest_version: ManifestVersion, + manifest_info: RegionManifestInfo, ) -> Result { + ensure!(manifest_info.is_mito(), MitoManifestInfoSnafu); + let manifest_version = manifest_info.data_manifest_version(); let (request, receiver) = WorkerRequest::new_sync_region_request(region_id, manifest_version); self.workers.submit_to_worker(region_id, request).await?; @@ -627,10 +630,10 @@ impl RegionEngine for MitoEngine { async fn sync_region( &self, region_id: RegionId, - manifest_version: ManifestVersion, + manifest_info: RegionManifestInfo, ) -> Result<(), BoxedError> { self.inner - .sync_region(region_id, manifest_version) + .sync_region(region_id, manifest_info) .await .map_err(BoxedError::new) .map(|_| ()) diff --git a/src/mito2/src/engine/sync_test.rs b/src/mito2/src/engine/sync_test.rs index 44f7e33c4e..3c0120fd4d 100644 --- a/src/mito2/src/engine/sync_test.rs +++ b/src/mito2/src/engine/sync_test.rs @@ -20,7 +20,7 @@ use common_recordbatch::RecordBatches; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use store_api::metadata::ColumnMetadata; -use store_api::region_engine::RegionEngine; +use store_api::region_engine::{RegionEngine, RegionManifestInfo}; use store_api::region_request::{ AddColumn, AddColumnLocation, AlterKind, RegionAlterRequest, RegionOpenRequest, RegionRequest, }; @@ -134,11 +134,19 @@ async fn test_sync_after_flush_region() { scan_check(&follower_engine, region_id, expected, 0, 0).await; // Returns error since the max manifest is 1 - let err = follower_engine.sync_region(region_id, 2).await.unwrap_err(); + let manifest_info = RegionManifestInfo::mito(2, 0); + let err = follower_engine + .sync_region(region_id, manifest_info) + .await + .unwrap_err(); let err = err.as_any().downcast_ref::().unwrap(); assert_matches!(err, Error::InstallManifestTo { .. }); - follower_engine.sync_region(region_id, 1).await.unwrap(); + let manifest_info = RegionManifestInfo::mito(1, 0); + follower_engine + .sync_region(region_id, manifest_info) + .await + .unwrap(); common_telemetry::info!("Scan the region on the follower engine after sync"); // Scan the region on the follower engine let expected = "\ @@ -222,7 +230,11 @@ async fn test_sync_after_alter_region() { scan_check(&follower_engine, region_id, expected, 0, 0).await; // Sync the region from the leader engine to the follower engine - follower_engine.sync_region(region_id, 2).await.unwrap(); + let manifest_info = RegionManifestInfo::mito(2, 0); + follower_engine + .sync_region(region_id, manifest_info) + .await + .unwrap(); let expected = "\ +-------+-------+---------+---------------------+ | tag_1 | tag_0 | field_0 | ts | diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index e89c318020..5bd183ee99 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -1023,6 +1023,12 @@ pub enum Error { #[snafu(display("Incompatible WAL provider change. This is typically caused by changing WAL provider in database config file without completely cleaning existing files. Global provider: {}, region provider: {}", global, region))] IncompatibleWalProviderChange { global: String, region: String }, + + #[snafu(display("Expected mito manifest info"))] + MitoManifestInfo { + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -1099,7 +1105,8 @@ impl ErrorExt for Error { | ReadDataPart { .. } | CorruptedEntry { .. } | BuildEntry { .. } - | Metadata { .. } => StatusCode::Internal, + | Metadata { .. } + | MitoManifestInfo { .. } => StatusCode::Internal, OpenRegion { source, .. } => source.status_code(), diff --git a/src/query/src/optimizer/test_util.rs b/src/query/src/optimizer/test_util.rs index a02d2ead5a..72e4ad093a 100644 --- a/src/query/src/optimizer/test_util.rs +++ b/src/query/src/optimizer/test_util.rs @@ -24,13 +24,12 @@ use async_trait::async_trait; use common_error::ext::{BoxedError, PlainError}; use common_error::status_code::StatusCode; use datatypes::schema::ColumnSchema; -use store_api::manifest::ManifestVersion; use store_api::metadata::{ ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef, }; use store_api::region_engine::{ - RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetRegionRoleStateResponse, - SettableRegionRoleState, + RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, + SetRegionRoleStateResponse, SettableRegionRoleState, }; use store_api::region_request::RegionRequest; use store_api::storage::{ConcreteDataType, RegionId, ScanRequest, SequenceNumber}; @@ -113,7 +112,7 @@ impl RegionEngine for MetaRegionEngine { async fn sync_region( &self, _region_id: RegionId, - _manifest_version: ManifestVersion, + _manifest_info: RegionManifestInfo, ) -> Result<(), BoxedError> { unimplemented!() } diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 62a310700c..d4df5216f4 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -34,7 +34,6 @@ use serde::{Deserialize, Serialize}; use tokio::sync::Semaphore; use crate::logstore::entry; -use crate::manifest::ManifestVersion; use crate::metadata::RegionMetadataRef; use crate::region_request::{ BatchRegionDdlRequest, RegionOpenRequest, RegionRequest, RegionSequencesRequest, @@ -81,11 +80,11 @@ impl SetRegionRoleStateResponse { } } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct GrantedRegion { pub region_id: RegionId, pub region_role: RegionRole, - pub manifest_version: u64, + pub extensions: HashMap>, } impl GrantedRegion { @@ -93,7 +92,7 @@ impl GrantedRegion { Self { region_id, region_role, - manifest_version: 0, + extensions: HashMap::new(), } } } @@ -103,7 +102,7 @@ impl From for PbGrantedRegion { PbGrantedRegion { region_id: value.region_id.as_u64(), role: PbRegionRole::from(value.region_role).into(), - manifest_version: value.manifest_version, + extensions: value.extensions, } } } @@ -113,7 +112,7 @@ impl From for GrantedRegion { GrantedRegion { region_id: RegionId::from_u64(value.region_id), region_role: value.role().into(), - manifest_version: value.manifest_version, + extensions: value.extensions, } } } @@ -387,6 +386,7 @@ pub struct RegionStatistic { pub manifest: RegionManifestInfo, } +/// The manifest info of a region. #[derive(Debug, Clone, Serialize, Deserialize)] pub enum RegionManifestInfo { Mito { @@ -402,8 +402,41 @@ pub enum RegionManifestInfo { } impl RegionManifestInfo { + /// Creates a new [RegionManifestInfo] for mito2 engine. + pub fn mito(manifest_version: u64, flushed_entry_id: u64) -> Self { + Self::Mito { + manifest_version, + flushed_entry_id, + } + } + + /// Creates a new [RegionManifestInfo] for metric engine. + pub fn metric( + data_manifest_version: u64, + data_flushed_entry_id: u64, + metadata_manifest_version: u64, + metadata_flushed_entry_id: u64, + ) -> Self { + Self::Metric { + data_manifest_version, + data_flushed_entry_id, + metadata_manifest_version, + metadata_flushed_entry_id, + } + } + + /// Returns true if the region is a mito2 region. + pub fn is_mito(&self) -> bool { + matches!(self, RegionManifestInfo::Mito { .. }) + } + + /// Returns true if the region is a metric region. + pub fn is_metric(&self) -> bool { + matches!(self, RegionManifestInfo::Metric { .. }) + } + /// Returns the flushed entry id of the data region. - pub fn flushed_entry_id(&self) -> u64 { + pub fn data_flushed_entry_id(&self) -> u64 { match self { RegionManifestInfo::Mito { flushed_entry_id, .. @@ -416,15 +449,37 @@ impl RegionManifestInfo { } /// Returns the manifest version of the data region. - pub fn manifest_version(&self) -> u64 { + pub fn data_manifest_version(&self) -> u64 { match self { RegionManifestInfo::Mito { manifest_version, .. } => *manifest_version, + RegionManifestInfo::Metric { + data_manifest_version, + .. + } => *data_manifest_version, + } + } + + /// Returns the manifest version of the metadata region. + pub fn metadata_manifest_version(&self) -> Option { + match self { + RegionManifestInfo::Mito { .. } => None, RegionManifestInfo::Metric { metadata_manifest_version, .. - } => *metadata_manifest_version, + } => Some(*metadata_manifest_version), + } + } + + /// Returns the flushed entry id of the metadata region. + pub fn metadata_flushed_entry_id(&self) -> Option { + match self { + RegionManifestInfo::Mito { .. } => None, + RegionManifestInfo::Metric { + metadata_flushed_entry_id, + .. + } => Some(*metadata_flushed_entry_id), } } } @@ -566,7 +621,7 @@ pub trait RegionEngine: Send + Sync { async fn sync_region( &self, region_id: RegionId, - manifest_version: ManifestVersion, + manifest_info: RegionManifestInfo, ) -> Result<(), BoxedError>; /// Sets region role state gracefully.