From 37bc2e6b076dfa6f5e7d1cc558e19e0688551024 Mon Sep 17 00:00:00 2001
From: discord9 <55937128+discord9@users.noreply.github.com>
Date: Wed, 29 Oct 2025 10:59:36 +0800
Subject: [PATCH] feat: gc worker heartbeat instruction (#7118)

* again false by default
* test: config api
* refactor: per code review
* less info!
* even less info!!
* docs: gc regions instr
* refactor: grp by region id
* per code review
* per review
* error handling?
* test: fix todos aft rebase
* fix after refactor

Signed-off-by: discord9 <55937128+discord9@users.noreply.github.com>
---
 src/common/meta/src/datanode.rs             |  58 +++++-
 src/common/meta/src/instruction.rs          | 136 ++++++++++++-
 src/datanode/src/error.rs                   |  25 ++-
 src/datanode/src/heartbeat.rs               |  21 +-
 src/datanode/src/heartbeat/handler.rs       |  21 +-
 .../src/heartbeat/handler/file_ref.rs       |  62 ++++++
 .../src/heartbeat/handler/gc_worker.rs      | 156 +++++++++++++++
 src/datanode/src/region_server.rs           |  33 +++-
 src/meta-srv/src/service/admin/heartbeat.rs |   2 +-
 src/mito2/src/config.rs                     |   4 +
 src/mito2/src/engine.rs                     |  32 ++-
 src/mito2/src/error.rs                      |   8 +-
 src/mito2/src/gc.rs                         | 182 +++++++++++++-----
 src/mito2/src/metrics.rs                    |   6 +-
 src/mito2/src/region.rs                     |   4 +
 src/mito2/src/sst/file_purger.rs            |   1 +
 src/mito2/src/sst/file_ref.rs               | 145 +++++++-------
 src/mito2/src/worker.rs                     |  11 ++
 src/store-api/src/storage.rs                |   2 +-
 src/store-api/src/storage/descriptors.rs    |  26 ++-
 src/store-api/src/storage/file.rs           |  73 +++++++
 tests-integration/tests/http.rs             |   7 +
 22 files changed, 869 insertions(+), 146 deletions(-)
 create mode 100644 src/datanode/src/heartbeat/handler/file_ref.rs
 create mode 100644 src/datanode/src/heartbeat/handler/gc_worker.rs

diff --git a/src/common/meta/src/datanode.rs b/src/common/meta/src/datanode.rs
index 2083b5886b..ffa85b4a7e 100644
--- a/src/common/meta/src/datanode.rs
+++ b/src/common/meta/src/datanode.rs
@@ -25,8 +25,7 @@ use store_api::region_engine::{RegionRole, RegionStatistic};
 use store_api::storage::RegionId;
 use table::metadata::TableId;
 
-use crate::error;
-use crate::error::Result;
+use crate::error::{self, DeserializeFromJsonSnafu, Result};
 use crate::heartbeat::utils::get_datanode_workloads;
 
 const DATANODE_STAT_PREFIX: &str = "__meta_datanode_stat";
@@ -66,10 +65,12 @@ pub struct Stat {
     pub node_epoch: u64,
     /// The datanode workloads.
     pub datanode_workloads: DatanodeWorkloads,
+    /// The GC statistics of the datanode.
+    pub gc_stat: Option<GcStat>,
 }
 
 /// The statistics of a region.
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct RegionStat {
     /// The region_id.
     pub id: RegionId,
@@ -126,7 +127,7 @@ pub trait TopicStatsReporter: Send + Sync {
     fn reportable_topics(&mut self) -> Vec<TopicStat>;
 }
 
-#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
 pub enum RegionManifestInfo {
     Mito {
         manifest_version: u64,
@@ -222,11 +223,12 @@ impl TryFrom<&HeartbeatRequest> for Stat {
             node_epoch,
             node_workloads,
             topic_stats,
+            extensions,
             ..
        } = value;

        match (header, peer) {
-            (Some(_header), Some(peer)) => {
+            (Some(header), Some(peer)) => {
                 let region_stats = region_stats
                     .iter()
                     .map(RegionStat::from)
                     .collect::<Vec<_>>();
@@ -234,6 +236,14 @@ impl TryFrom<&HeartbeatRequest> for Stat {
                 let topic_stats = topic_stats.iter().map(TopicStat::from).collect::<Vec<_>>();
 
                 let datanode_workloads = get_datanode_workloads(node_workloads.as_ref());
+
+                let gc_stat = GcStat::from_extensions(extensions).map_err(|err| {
+                    common_telemetry::error!(
+                        "Failed to deserialize GcStat from extensions: {}",
+                        err
+                    );
+                    header.clone()
+                })?;
                 Ok(Self {
                     timestamp_millis: time_util::current_time_millis(),
                     // datanode id
@@ -247,6 +257,7 @@ impl TryFrom<&HeartbeatRequest> for Stat {
                     topic_stats,
                     node_epoch: *node_epoch,
                     datanode_workloads,
+                    gc_stat,
                 })
             }
             (header, _) => Err(header.clone()),
@@ -319,6 +330,43 @@ impl From<&api::v1::meta::TopicStat> for TopicStat {
     }
 }
 
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct GcStat {
+    /// Number of GC tasks currently running on the datanode.
+    pub running_gc_tasks: u32,
+    /// The maximum number of concurrent GC tasks the datanode can handle.
+    pub gc_concurrency: u32,
+}
+
+impl GcStat {
+    pub const GC_STAT_KEY: &str = "__gc_stat";
+
+    pub fn new(running_gc_tasks: u32, gc_concurrency: u32) -> Self {
+        Self {
+            running_gc_tasks,
+            gc_concurrency,
+        }
+    }
+
+    pub fn into_extensions(&self, extensions: &mut std::collections::HashMap<String, Vec<u8>>) {
+        let bytes = serde_json::to_vec(self).unwrap_or_default();
+        extensions.insert(Self::GC_STAT_KEY.to_string(), bytes);
+    }
+
+    pub fn from_extensions(
+        extensions: &std::collections::HashMap<String, Vec<u8>>,
+    ) -> Result<Option<Self>> {
+        extensions
+            .get(Self::GC_STAT_KEY)
+            .map(|bytes| {
+                serde_json::from_slice(bytes).with_context(|_| DeserializeFromJsonSnafu {
+                    input: String::from_utf8_lossy(bytes).to_string(),
+                })
+            })
+            .transpose()
+    }
+}
+
 /// The key of the datanode stat in the memory store.
 ///
 /// The format is `__meta_datanode_stat-0-{node_id}`.
diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs
index c7bd82d675..d8e5affe30 100644
--- a/src/common/meta/src/instruction.rs
+++ b/src/common/meta/src/instruction.rs
@@ -17,7 +17,7 @@ use std::fmt::{Display, Formatter};
 use std::time::Duration;
 
 use serde::{Deserialize, Deserializer, Serialize};
-use store_api::storage::{RegionId, RegionNumber};
+use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber};
 use strum::Display;
 use table::metadata::TableId;
 use table::table_name::TableName;
@@ -417,6 +417,88 @@ where
     })
 }
 
+/// Instruction to get file references for specified regions.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct GetFileRefs {
+    /// List of region IDs to get file references for.
+    pub region_ids: Vec<RegionId>,
+}
+
+impl Display for GetFileRefs {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "GetFileRefs(region_ids={:?})", self.region_ids)
+    }
+}
+
+/// Instruction to trigger garbage collection for regions of one table.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct GcRegions {
+    /// The region IDs to perform GC on.
+    pub regions: Vec<RegionId>,
+    /// The file references manifest containing temporary file references.
+    pub file_refs_manifest: FileRefsManifest,
+    /// Whether to perform a full file listing to find orphan files.
+ pub full_file_listing: bool, +} + +impl Display for GcRegions { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "GcRegion(regions={:?}, file_refs_count={}, full_file_listing={})", + self.regions, + self.file_refs_manifest.file_refs.len(), + self.full_file_listing + ) + } +} + +/// Reply for GetFileRefs instruction. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct GetFileRefsReply { + /// The file references manifest. + pub file_refs_manifest: FileRefsManifest, + /// Whether the operation was successful. + pub success: bool, + /// Error message if any. + pub error: Option, +} + +impl Display for GetFileRefsReply { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "GetFileRefsReply(success={}, file_refs_count={}, error={:?})", + self.success, + self.file_refs_manifest.file_refs.len(), + self.error + ) + } +} + +/// Reply for GC instruction. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct GcRegionsReply { + pub result: Result, +} + +impl Display for GcRegionsReply { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "GcReply(result={})", + match &self.result { + Ok(report) => format!( + "GcReport(deleted_files_count={}, need_retry_regions_count={})", + report.deleted_files.len(), + report.need_retry_regions.len() + ), + Err(err) => format!("Err({})", err), + } + ) + } +} + #[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)] pub enum Instruction { /// Opens regions. @@ -437,6 +519,10 @@ pub enum Instruction { InvalidateCaches(Vec), /// Flushes regions. FlushRegions(FlushRegions), + /// Gets file references for regions. + GetFileRefs(GetFileRefs), + /// Triggers garbage collection for a region. + GcRegions(GcRegions), } impl Instruction { @@ -479,6 +565,20 @@ impl Instruction { _ => None, } } + + pub fn into_get_file_refs(self) -> Option { + match self { + Self::GetFileRefs(get_file_refs) => Some(get_file_refs), + _ => None, + } + } + + pub fn into_gc_regions(self) -> Option { + match self { + Self::GcRegions(gc_regions) => Some(gc_regions), + _ => None, + } + } } /// The reply of [UpgradeRegion]. 
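To make the wire format of the new variants concrete, here is a small sketch (not part of the patch) that round-trips a `GcRegions` instruction through JSON and routes it with the `into_gc_regions` accessor added above; it assumes only the types shown in this diff:

```rust
use common_meta::instruction::{GcRegions, Instruction};
use store_api::storage::{FileRefsManifest, RegionId};

// Sketch: metasrv would serialize the instruction into the heartbeat
// mailbox; the datanode deserializes it and routes via `into_gc_regions`.
fn gc_regions_roundtrip() -> serde_json::Result<()> {
    let instruction = Instruction::GcRegions(GcRegions {
        regions: vec![RegionId::new(1024, 1), RegionId::new(1024, 2)],
        file_refs_manifest: FileRefsManifest::default(),
        full_file_listing: false,
    });
    let payload = serde_json::to_string(&instruction)?;
    let decoded: Instruction = serde_json::from_str(&payload)?;
    // `into_gc_regions` yields `None` for every other instruction kind.
    assert!(decoded.into_gc_regions().is_some());
    Ok(())
}
```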
@@ -549,6 +649,8 @@ pub enum InstructionReply { )] DowngradeRegions(DowngradeRegionsReply), FlushRegions(FlushRegionReply), + GetFileRefs(GetFileRefsReply), + GcRegions(GcRegionsReply), } impl Display for InstructionReply { @@ -561,6 +663,8 @@ impl Display for InstructionReply { write!(f, "InstructionReply::DowngradeRegions({:?})", reply) } Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply), + Self::GetFileRefs(reply) => write!(f, "InstructionReply::GetFileRefs({})", reply), + Self::GcRegions(reply) => write!(f, "InstructionReply::GcRegion({})", reply), } } } @@ -605,6 +709,10 @@ impl InstructionReply { #[cfg(test)] mod tests { + use std::collections::HashSet; + + use store_api::storage::FileId; + use super::*; #[test] @@ -903,4 +1011,30 @@ mod tests { _ => panic!("Expected FlushRegions instruction"), } } + + #[test] + fn test_serialize_get_file_refs_instruction_reply() { + let mut manifest = FileRefsManifest::default(); + let r0 = RegionId::new(1024, 1); + let r1 = RegionId::new(1024, 2); + manifest + .file_refs + .insert(r0, HashSet::from([FileId::random()])); + manifest + .file_refs + .insert(r1, HashSet::from([FileId::random()])); + manifest.manifest_version.insert(r0, 10); + manifest.manifest_version.insert(r1, 20); + + let instruction_reply = InstructionReply::GetFileRefs(GetFileRefsReply { + file_refs_manifest: manifest, + success: true, + error: None, + }); + + let serialized = serde_json::to_string(&instruction_reply).unwrap(); + let deserialized = serde_json::from_str(&serialized).unwrap(); + + assert_eq!(instruction_reply, deserialized); + } } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index eda483a1e2..74bddbaede 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -322,6 +322,21 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to run gc for region {}", region_id))] + GcMitoEngine { + region_id: RegionId, + source: mito2::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Invalid arguments for GC: {}", msg))] + InvalidGcArgs { + msg: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to list SST entries from storage"))] ListStorageSsts { #[snafu(implicit)] @@ -446,9 +461,11 @@ impl ErrorExt for Error { AsyncTaskExecute { source, .. } => source.status_code(), - CreateDir { .. } | RemoveDir { .. } | ShutdownInstance { .. } | DataFusion { .. } => { - StatusCode::Internal - } + CreateDir { .. } + | RemoveDir { .. } + | ShutdownInstance { .. } + | DataFusion { .. } + | InvalidGcArgs { .. } => StatusCode::Internal, RegionNotFound { .. } => StatusCode::RegionNotFound, RegionNotReady { .. } => StatusCode::RegionNotReady, @@ -466,7 +483,7 @@ impl ErrorExt for Error { StopRegionEngine { source, .. } => source.status_code(), FindLogicalRegions { source, .. } => source.status_code(), - BuildMitoEngine { source, .. } => source.status_code(), + BuildMitoEngine { source, .. } | GcMitoEngine { source, .. } => source.status_code(), BuildMetricEngine { source, .. } => source.status_code(), ListStorageSsts { source, .. } => source.status_code(), ConcurrentQueryLimiterClosed { .. } | ConcurrentQueryLimiterTimeout { .. 
} => { diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 607e031b43..33ba648830 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -36,14 +36,14 @@ use common_workload::DatanodeWorkloadType; use meta_client::MetaClientRef; use meta_client::client::{HeartbeatSender, MetaClient}; use servers::addrs; -use snafu::ResultExt; +use snafu::{OptionExt as _, ResultExt}; use tokio::sync::{Notify, mpsc}; use tokio::time::Instant; use self::handler::RegionHeartbeatResponseHandler; use crate::alive_keeper::{CountdownTaskHandlerExtRef, RegionAliveKeeper}; use crate::config::DatanodeOptions; -use crate::error::{self, MetaClientInitSnafu, Result}; +use crate::error::{self, MetaClientInitSnafu, RegionEngineNotFoundSnafu, Result}; use crate::event_listener::RegionServerEventReceiver; use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT}; use crate::region_server::RegionServer; @@ -242,12 +242,18 @@ impl HeartbeatTask { let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores(); let total_memory_bytes = self.resource_stat.get_total_memory_bytes(); let resource_stat = self.resource_stat.clone(); + let gc_limiter = self + .region_server + .mito_engine() + .context(RegionEngineNotFoundSnafu { name: "mito" })? + .gc_limiter(); common_runtime::spawn_hb(async move { let sleep = tokio::time::sleep(Duration::from_millis(0)); tokio::pin!(sleep); let build_info = common_version::build_info(); + let heartbeat_request = HeartbeatRequest { peer: self_peer, node_epoch, @@ -283,8 +289,13 @@ impl HeartbeatTask { if let Some(message) = message { match outgoing_message_to_mailbox_message(message) { Ok(message) => { + let mut extensions = heartbeat_request.extensions.clone(); + let gc_stat = gc_limiter.gc_stat(); + gc_stat.into_extensions(&mut extensions); + let req = HeartbeatRequest { mailbox_message: Some(message), + extensions, ..heartbeat_request.clone() }; HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc(); @@ -305,10 +316,16 @@ impl HeartbeatTask { let topic_stats = region_server_clone.topic_stats(); let now = Instant::now(); let duration_since_epoch = (now - epoch).as_millis() as u64; + + let mut extensions = heartbeat_request.extensions.clone(); + let gc_stat = gc_limiter.gc_stat(); + gc_stat.into_extensions(&mut extensions); + let mut req = HeartbeatRequest { region_stats, topic_stats, duration_since_epoch, + extensions, ..heartbeat_request.clone() }; diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 8566f8806c..8573314b82 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -20,16 +20,21 @@ use common_meta::heartbeat::handler::{ use common_meta::instruction::{Instruction, InstructionReply}; use common_telemetry::error; use snafu::OptionExt; +use store_api::storage::GcReport; mod close_region; mod downgrade_region; +mod file_ref; mod flush_region; +mod gc_worker; mod open_region; mod upgrade_region; use crate::heartbeat::handler::close_region::CloseRegionsHandler; use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler; +use crate::heartbeat::handler::file_ref::GetFileRefsHandler; use crate::heartbeat::handler::flush_region::FlushRegionsHandler; +use crate::heartbeat::handler::gc_worker::GcRegionsHandler; use crate::heartbeat::handler::open_region::OpenRegionsHandler; use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler; use crate::heartbeat::task_tracker::TaskTracker; @@ -43,6 +48,7 @@ pub 
struct RegionHeartbeatResponseHandler { downgrade_tasks: TaskTracker<()>, flush_tasks: TaskTracker<()>, open_region_parallelism: usize, + gc_tasks: TaskTracker, } #[async_trait::async_trait] @@ -61,6 +67,7 @@ pub struct HandlerContext { catchup_tasks: TaskTracker<()>, downgrade_tasks: TaskTracker<()>, flush_tasks: TaskTracker<()>, + gc_tasks: TaskTracker, } impl HandlerContext { @@ -71,6 +78,7 @@ impl HandlerContext { catchup_tasks: TaskTracker::new(), downgrade_tasks: TaskTracker::new(), flush_tasks: TaskTracker::new(), + gc_tasks: TaskTracker::new(), } } } @@ -85,6 +93,7 @@ impl RegionHeartbeatResponseHandler { flush_tasks: TaskTracker::new(), // Default to half of the number of CPUs. open_region_parallelism: (num_cpus::get() / 2).max(1), + gc_tasks: TaskTracker::new(), } } @@ -106,6 +115,8 @@ impl RegionHeartbeatResponseHandler { Instruction::FlushRegions(_) => Ok(Box::new(FlushRegionsHandler.into())), Instruction::DowngradeRegions(_) => Ok(Box::new(DowngradeRegionsHandler.into())), Instruction::UpgradeRegion(_) => Ok(Box::new(UpgradeRegionsHandler.into())), + Instruction::GetFileRefs(_) => Ok(Box::new(GetFileRefsHandler.into())), + Instruction::GcRegions(_) => Ok(Box::new(GcRegionsHandler.into())), Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(), } } @@ -118,6 +129,8 @@ pub enum InstructionHandlers { FlushRegions(FlushRegionsHandler), DowngradeRegions(DowngradeRegionsHandler), UpgradeRegions(UpgradeRegionsHandler), + GetFileRefs(GetFileRefsHandler), + GcRegions(GcRegionsHandler), } macro_rules! impl_from_handler { @@ -137,7 +150,9 @@ impl_from_handler!( OpenRegionsHandler => OpenRegions, FlushRegionsHandler => FlushRegions, DowngradeRegionsHandler => DowngradeRegions, - UpgradeRegionsHandler => UpgradeRegions + UpgradeRegionsHandler => UpgradeRegions, + GetFileRefsHandler => GetFileRefs, + GcRegionsHandler => GcRegions ); macro_rules! dispatch_instr { @@ -180,6 +195,8 @@ dispatch_instr!( FlushRegions => FlushRegions, DowngradeRegions => DowngradeRegions, UpgradeRegion => UpgradeRegions, + GetFileRefs => GetFileRefs, + GcRegions => GcRegions, ); #[async_trait] @@ -202,6 +219,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler { let catchup_tasks = self.catchup_tasks.clone(); let downgrade_tasks = self.downgrade_tasks.clone(); let flush_tasks = self.flush_tasks.clone(); + let gc_tasks = self.gc_tasks.clone(); let handler = self.build_handler(&instruction)?; let _handle = common_runtime::spawn_global(async move { let reply = handler @@ -211,6 +229,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler { catchup_tasks, downgrade_tasks, flush_tasks, + gc_tasks, }, instruction, ) diff --git a/src/datanode/src/heartbeat/handler/file_ref.rs b/src/datanode/src/heartbeat/handler/file_ref.rs new file mode 100644 index 0000000000..ccad7922b5 --- /dev/null +++ b/src/datanode/src/heartbeat/handler/file_ref.rs @@ -0,0 +1,62 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use common_error::ext::ErrorExt; +use common_meta::instruction::{GetFileRefs, GetFileRefsReply, InstructionReply}; +use store_api::storage::FileRefsManifest; + +use crate::heartbeat::handler::{HandlerContext, InstructionHandler}; + +pub struct GetFileRefsHandler; + +#[async_trait::async_trait] +impl InstructionHandler for GetFileRefsHandler { + type Instruction = GetFileRefs; + + async fn handle( + &self, + ctx: &HandlerContext, + get_file_refs: Self::Instruction, + ) -> Option { + let region_server = &ctx.region_server; + + // Get the MitoEngine + let Some(mito_engine) = region_server.mito_engine() else { + return Some(InstructionReply::GetFileRefs(GetFileRefsReply { + file_refs_manifest: FileRefsManifest::default(), + success: false, + error: Some("MitoEngine not found".to_string()), + })); + }; + + match mito_engine + .get_snapshot_of_unmanifested_refs(get_file_refs.region_ids) + .await + { + Ok(all_file_refs) => { + // Return the file references + Some(InstructionReply::GetFileRefs(GetFileRefsReply { + file_refs_manifest: all_file_refs, + success: true, + error: None, + })) + } + Err(e) => Some(InstructionReply::GetFileRefs(GetFileRefsReply { + file_refs_manifest: FileRefsManifest::default(), + success: false, + error: Some(format!("Failed to get file refs: {}", e.output_msg())), + })), + } + } +} diff --git a/src/datanode/src/heartbeat/handler/gc_worker.rs b/src/datanode/src/heartbeat/handler/gc_worker.rs new file mode 100644 index 0000000000..75b0005e93 --- /dev/null +++ b/src/datanode/src/heartbeat/handler/gc_worker.rs @@ -0,0 +1,156 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
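Two throttles interact in the handler that follows: `LocalGcWorker::try_new` claims a datanode-wide `GcLimiter` permit, while the per-key `TaskTracker` (keyed by the smallest region id of the batch) rejects a second GC request for the same key while one is in flight. The sketch below condenses that busy-or-wait flow; it assumes exactly the `TaskTracker` API the handler exercises (`try_register`, `is_busy`, `into_watcher`, `wait_until_finish`) and is illustrative, not part of the patch:

```rust
use common_meta::instruction::{GcRegionsReply, InstructionReply};
use mito2::gc::LocalGcWorker;
use snafu::ResultExt;
use store_api::storage::{GcReport, RegionId};

use crate::error::GcMitoEngineSnafu;
use crate::heartbeat::task_tracker::TaskTracker;

// Condensed busy-or-wait flow: register the GC future under the region key,
// reply "busy" if another task already holds it, otherwise await the watcher.
async fn run_exclusive_gc(
    gc_tasks: &TaskTracker<GcReport>,
    region_id: RegionId,
    worker: LocalGcWorker,
) -> InstructionReply {
    let registered = gc_tasks
        .try_register(
            region_id,
            Box::pin(async move {
                worker.run().await.context(GcMitoEngineSnafu { region_id })
            }),
        )
        .await;
    if registered.is_busy() {
        return InstructionReply::GcRegions(GcRegionsReply {
            result: Err(format!(
                "Another gc task is running for the region: {region_id}"
            )),
        });
    }
    let mut watcher = registered.into_watcher();
    let result = gc_tasks.wait_until_finish(&mut watcher).await;
    InstructionReply::GcRegions(GcRegionsReply {
        result: result.map_err(|err| format!("{err:?}")),
    })
}
```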
+ +use common_meta::instruction::{GcRegions, GcRegionsReply, InstructionReply}; +use common_telemetry::{debug, warn}; +use mito2::gc::LocalGcWorker; +use snafu::{OptionExt, ResultExt}; +use store_api::storage::{FileRefsManifest, RegionId}; + +use crate::error::{GcMitoEngineSnafu, InvalidGcArgsSnafu, Result, UnexpectedSnafu}; +use crate::heartbeat::handler::{HandlerContext, InstructionHandler}; + +pub struct GcRegionsHandler; + +#[async_trait::async_trait] +impl InstructionHandler for GcRegionsHandler { + type Instruction = GcRegions; + + async fn handle( + &self, + ctx: &HandlerContext, + gc_regions: Self::Instruction, + ) -> Option { + let region_ids = gc_regions.regions.clone(); + debug!("Received gc regions instruction: {:?}", region_ids); + + let is_same_table = region_ids.windows(2).all(|w| { + let t1 = w[0].table_id(); + let t2 = w[1].table_id(); + t1 == t2 + }); + if !is_same_table { + return Some(InstructionReply::GcRegions(GcRegionsReply { + result: Err(format!( + "Regions to GC should belong to the same table, found: {:?}", + region_ids + )), + })); + } + + let (region_id, gc_worker) = match self + .create_gc_worker( + ctx, + region_ids, + &gc_regions.file_refs_manifest, + gc_regions.full_file_listing, + ) + .await + { + Ok(worker) => worker, + Err(e) => { + return Some(InstructionReply::GcRegions(GcRegionsReply { + result: Err(format!("Failed to create GC worker: {}", e)), + })); + } + }; + + let register_result = ctx + .gc_tasks + .try_register( + region_id, + Box::pin(async move { + debug!("Starting gc worker for region {}", region_id); + let report = gc_worker + .run() + .await + .context(GcMitoEngineSnafu { region_id })?; + debug!("Gc worker for region {} finished", region_id); + Ok(report) + }), + ) + .await; + if register_result.is_busy() { + warn!("Another gc task is running for the region: {region_id}"); + return Some(InstructionReply::GcRegions(GcRegionsReply { + result: Err(format!( + "Another gc task is running for the region: {region_id}" + )), + })); + } + let mut watcher = register_result.into_watcher(); + let result = ctx.gc_tasks.wait_until_finish(&mut watcher).await; + match result { + Ok(report) => Some(InstructionReply::GcRegions(GcRegionsReply { + result: Ok(report), + })), + Err(err) => Some(InstructionReply::GcRegions(GcRegionsReply { + result: Err(format!("{err:?}")), + })), + } + } +} + +impl GcRegionsHandler { + async fn create_gc_worker( + &self, + ctx: &HandlerContext, + mut region_ids: Vec, + file_ref_manifest: &FileRefsManifest, + full_file_listing: bool, + ) -> Result<(RegionId, LocalGcWorker)> { + // always use the smallest region id on datanode as the target region id + region_ids.sort_by_key(|r| r.region_number()); + let mito_engine = ctx + .region_server + .mito_engine() + .with_context(|| UnexpectedSnafu { + violated: "MitoEngine not found".to_string(), + })?; + let region_id = *region_ids.first().with_context(|| UnexpectedSnafu { + violated: "No region ids provided".to_string(), + })?; + + let mito_config = mito_engine.mito_config(); + + // Find the access layer from one of the regions that exists on this datanode + let access_layer = region_ids + .iter() + .find_map(|rid| mito_engine.find_region(*rid)) + .with_context(|| InvalidGcArgsSnafu { + msg: format!( + "None of the regions is on current datanode:{:?}", + region_ids + ), + })? 
+ .access_layer(); + + let cache_manager = mito_engine.cache_manager(); + + let gc_worker = LocalGcWorker::try_new( + access_layer.clone(), + Some(cache_manager), + region_ids.into_iter().collect(), + Default::default(), + mito_config.clone().into(), + file_ref_manifest.clone(), + &mito_engine.gc_limiter(), + full_file_listing, + ) + .await + .context(GcMitoEngineSnafu { region_id })?; + + Ok((region_id, gc_worker)) + } +} diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 341ee9442c..70373ca10c 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -158,6 +158,27 @@ impl RegionServer { } } + /// Gets the MitoEngine if it's registered. + pub fn mito_engine(&self) -> Option { + if let Some(mito) = self.inner.mito_engine.read().unwrap().clone() { + Some(mito) + } else { + self.inner + .engines + .read() + .unwrap() + .get(MITO_ENGINE_NAME) + .cloned() + .and_then(|e| { + let mito = e.as_any().downcast_ref::().cloned(); + if mito.is_none() { + warn!("Mito engine not found in region server engines"); + } + mito + }) + } + } + #[tracing::instrument(skip_all)] pub async fn handle_batch_open_requests( &self, @@ -676,14 +697,14 @@ struct RegionServerInner { runtime: Runtime, event_listener: RegionServerEventListenerRef, table_provider_factory: TableProviderFactoryRef, - // The number of queries allowed to be executed at the same time. - // Act as last line of defense on datanode to prevent query overloading. + /// The number of queries allowed to be executed at the same time. + /// Act as last line of defense on datanode to prevent query overloading. parallelism: Option, - // The topic stats reporter. + /// The topic stats reporter. topic_stats_reporter: RwLock>>, - // HACK(zhongzc): Direct MitoEngine handle for diagnostics. This couples the - // server with a concrete engine; acceptable for now to fetch Mito-specific - // info (e.g., list SSTs). Consider a diagnostics trait later. + /// HACK(zhongzc): Direct MitoEngine handle for diagnostics. This couples the + /// server with a concrete engine; acceptable for now to fetch Mito-specific + /// info (e.g., list SSTs). Consider a diagnostics trait later. mito_engine: RwLock>, } diff --git a/src/meta-srv/src/service/admin/heartbeat.rs b/src/meta-srv/src/service/admin/heartbeat.rs index cb13764d30..35ada0d3ae 100644 --- a/src/meta-srv/src/service/admin/heartbeat.rs +++ b/src/meta-srv/src/service/admin/heartbeat.rs @@ -254,7 +254,7 @@ mod tests { assert_eq!(status, http::StatusCode::OK); assert_eq!( body, - "[[{\"timestamp_millis\":3,\"id\":0,\"addr\":\"127.0.0.1:3001\",\"rcus\":0,\"wcus\":0,\"region_num\":0,\"region_stats\":[],\"topic_stats\":[],\"node_epoch\":0,\"datanode_workloads\":{\"types\":[]}}]]" + "[[{\"timestamp_millis\":3,\"id\":0,\"addr\":\"127.0.0.1:3001\",\"rcus\":0,\"wcus\":0,\"region_num\":0,\"region_stats\":[],\"topic_stats\":[],\"node_epoch\":0,\"datanode_workloads\":{\"types\":[]},\"gc_stat\":null}]]" ); } } diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index edf0709960..e76a8dbe19 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -25,6 +25,7 @@ use serde::{Deserialize, Serialize}; use serde_with::serde_as; use crate::error::Result; +use crate::gc::GcConfig; use crate::memtable::MemtableConfig; use crate::sst::DEFAULT_WRITE_BUFFER_SIZE; @@ -148,6 +149,8 @@ pub struct MitoConfig { /// Whether to enable experimental flat format as the default format. /// When enabled, forces using BulkMemtable and BulkMemtableBuilder. 
pub default_experimental_flat_format: bool, + + pub gc: GcConfig, } impl Default for MitoConfig { @@ -186,6 +189,7 @@ impl Default for MitoConfig { memtable: MemtableConfig::default(), min_compaction_interval: Duration::from_secs(0), default_experimental_flat_format: false, + gc: GcConfig::default(), }; // Adjust buffer and cache size according to system memory if we can. diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 3fb3a8abd8..73cb930f77 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -102,7 +102,7 @@ use store_api::region_engine::{ }; use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest}; use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry}; -use store_api::storage::{FileId, RegionId, ScanRequest, SequenceNumber}; +use store_api::storage::{FileId, FileRefsManifest, RegionId, ScanRequest, SequenceNumber}; use tokio::sync::{Semaphore, oneshot}; use crate::access_layer::RegionFilePathFactory; @@ -115,6 +115,7 @@ use crate::error::{ }; #[cfg(feature = "enterprise")] use crate::extension::BoxedExtensionRangeProviderFactory; +use crate::gc::GcLimiterRef; use crate::manifest::action::RegionEdit; use crate::memtable::MemtableStats; use crate::metrics::HANDLE_REQUEST_ELAPSED; @@ -261,6 +262,33 @@ impl MitoEngine { self.inner.workers.file_ref_manager() } + pub fn gc_limiter(&self) -> GcLimiterRef { + self.inner.workers.gc_limiter() + } + + /// Get all tmp ref files for given region ids, excluding files that's already in manifest. + pub async fn get_snapshot_of_unmanifested_refs( + &self, + region_ids: impl IntoIterator, + ) -> Result { + let file_ref_mgr = self.file_ref_manager(); + + let region_ids = region_ids.into_iter().collect::>(); + + // Convert region IDs to MitoRegionRef objects, error if any region doesn't exist + let regions: Vec = region_ids + .into_iter() + .map(|region_id| { + self.find_region(region_id) + .with_context(|| RegionNotFoundSnafu { region_id }) + }) + .collect::>()?; + + file_ref_mgr + .get_snapshot_of_unmanifested_refs(regions) + .await + } + /// Returns true if the specific region exists. pub fn is_region_exists(&self, region_id: RegionId) -> bool { self.inner.workers.is_region_exists(region_id) @@ -357,7 +385,7 @@ impl MitoEngine { self.find_region(id) } - pub(crate) fn find_region(&self, region_id: RegionId) -> Option { + pub fn find_region(&self, region_id: RegionId) -> Option { self.inner.workers.get_region(region_id) } diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index ad6d7c7caa..2a6fc855bc 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -1121,6 +1121,12 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("GC job permit exhausted"))] + TooManyGcJobs { + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -1291,7 +1297,7 @@ impl ErrorExt for Error { InconsistentTimestampLength { .. } => StatusCode::InvalidArguments, - TooManyFilesToRead { .. } => StatusCode::RateLimited, + TooManyFilesToRead { .. } | TooManyGcJobs { .. } => StatusCode::RateLimited, } } diff --git a/src/mito2/src/gc.rs b/src/mito2/src/gc.rs index e4d384d0f9..f7cd266eb4 100644 --- a/src/mito2/src/gc.rs +++ b/src/mito2/src/gc.rs @@ -22,14 +22,17 @@ //! 
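The gc module now gates concurrency with a tokio semaphore. As orientation for the `GcLimiter` introduced below, this standalone sketch shows the underlying permit pattern in plain tokio (no GreptimeDB types): dropping the returned `OwnedSemaphorePermit` is what releases the slot, which is why `LocalGcWorker` holds its permit as a struct field for its whole lifetime:

```rust
use std::sync::Arc;

use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};

// Minimal permit pattern: capacity permits are shared via Arc, a job takes
// one with try_acquire_owned(), and the permit releases itself on drop.
struct Limiter {
    permits: Arc<Semaphore>,
    capacity: usize,
}

impl Limiter {
    fn new(capacity: usize) -> Self {
        Self {
            permits: Arc::new(Semaphore::new(capacity)),
            capacity,
        }
    }

    /// Jobs currently holding a permit, e.g. for heartbeat reporting.
    fn running(&self) -> usize {
        self.capacity - self.permits.available_permits()
    }

    /// `Err(TryAcquireError::NoPermits)` corresponds to the new
    /// `TooManyGcJobs` error in the real code.
    fn try_permit(&self) -> Result<OwnedSemaphorePermit, TryAcquireError> {
        self.permits.clone().try_acquire_owned()
    }
}
```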
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; +use std::sync::Arc; use std::time::Duration; -use common_telemetry::{error, info, warn}; +use common_meta::datanode::GcStat; +use common_telemetry::{debug, error, info, warn}; use common_time::Timestamp; use object_store::{Entry, Lister}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt as _, ensure}; -use store_api::storage::{FileId, RegionId}; +use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId}; +use tokio::sync::{OwnedSemaphorePermit, TryAcquireError}; use tokio_stream::StreamExt; use crate::access_layer::AccessLayerRef; @@ -37,26 +40,64 @@ use crate::cache::CacheManagerRef; use crate::config::MitoConfig; use crate::error::{ DurationOutOfRangeSnafu, EmptyRegionDirSnafu, JoinSnafu, OpenDalSnafu, RegionNotFoundSnafu, - Result, UnexpectedSnafu, + Result, TooManyGcJobsSnafu, UnexpectedSnafu, }; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions}; use crate::manifest::storage::manifest_compress_type; -use crate::metrics::GC_FILE_CNT; +use crate::metrics::GC_DEL_FILE_CNT; use crate::region::opener::new_manifest_dir; use crate::sst::file::delete_files; -use crate::sst::file_ref::TableFileRefsManifest; use crate::sst::location::{self, region_dir_from_table_dir}; -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub struct GcReport { - /// deleted files per region - pub deleted_files: HashMap>, - /// Regions that need retry in next gc round, usually because their tmp ref files are outdated - pub need_retry_regions: HashSet, +/// Limit the amount of concurrent GC jobs on the datanode +pub struct GcLimiter { + pub gc_job_limit: Arc, + gc_concurrency: usize, } -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub struct FileGcOption { +pub type GcLimiterRef = Arc; + +impl GcLimiter { + pub fn new(gc_concurrency: usize) -> Self { + Self { + gc_job_limit: Arc::new(tokio::sync::Semaphore::new(gc_concurrency)), + gc_concurrency, + } + } + + pub fn running_gc_tasks(&self) -> u32 { + (self.gc_concurrency - self.gc_job_limit.available_permits()) as u32 + } + + pub fn gc_concurrency(&self) -> u32 { + self.gc_concurrency as u32 + } + + pub fn gc_stat(&self) -> GcStat { + GcStat::new(self.running_gc_tasks(), self.gc_concurrency()) + } + + /// Try to acquire a permit for a GC job. + /// + /// If no permit is available, returns an `TooManyGcJobs` error. + pub fn permit(&self) -> Result { + self.gc_job_limit + .clone() + .try_acquire_owned() + .map_err(|e| match e { + TryAcquireError::Closed => UnexpectedSnafu { + reason: format!("Failed to acquire gc permit: {e}"), + } + .build(), + TryAcquireError::NoPermits => TooManyGcJobsSnafu {}.build(), + }) + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct GcConfig { + /// Whether GC is enabled. + pub enable: bool, /// Lingering time before deleting files. /// Should be long enough to allow long running queries to finish. /// @@ -73,16 +114,22 @@ pub struct FileGcOption { /// Maximum concurrent list operations per GC job. /// This is used to limit the number of concurrent listing operations and speed up listing. pub max_concurrent_lister_per_gc_job: usize, + /// Maximum concurrent GC jobs. + /// This is used to limit the number of concurrent GC jobs running on the datanode + /// to prevent too many concurrent GC jobs from overwhelming the datanode. 
+ pub max_concurrent_gc_job: usize, } -impl Default for FileGcOption { +impl Default for GcConfig { fn default() -> Self { Self { + enable: false, // expect long running queries to be finished within a reasonable time lingering_time: Duration::from_secs(60 * 5), // 6 hours, for unknown expel time, which is when this file get removed from manifest, it should rarely happen, can keep it longer unknown_file_lingering_time: Duration::from_secs(60 * 60 * 6), max_concurrent_lister_per_gc_job: 32, + max_concurrent_gc_job: 4, } } } @@ -92,13 +139,23 @@ pub struct LocalGcWorker { pub(crate) cache_manager: Option, pub(crate) manifest_mgrs: HashMap, /// Lingering time before deleting files. - pub(crate) opt: FileGcOption, + pub(crate) opt: GcConfig, pub(crate) manifest_open_config: ManifestOpenConfig, /// Tmp ref files manifest, used to determine which files are still in use by ongoing queries. /// /// Also contains manifest versions of regions when the tmp ref files are generated. /// Used to determine whether the tmp ref files are outdated. - pub(crate) file_ref_manifest: TableFileRefsManifest, + pub(crate) file_ref_manifest: FileRefsManifest, + _permit: OwnedSemaphorePermit, + /// Whether to perform full file listing during GC. + /// When set to false, GC will only delete files that are tracked in the manifest's removed_files, + /// which can significantly improve performance by avoiding expensive list operations. + /// When set to true, GC will perform a full listing to find and delete orphan files + /// (files not tracked in the manifest). + /// + /// Set to false for regular GC operations to optimize performance. + /// Set to true periodically or when you need to clean up orphan files. + pub full_file_listing: bool, } pub struct ManifestOpenConfig { @@ -125,13 +182,16 @@ impl LocalGcWorker { /// Create a new LocalGcWorker, with `regions_to_gc` regions to GC. /// The regions are specified by their `RegionId` and should all belong to the same table. /// + #[allow(clippy::too_many_arguments)] pub async fn try_new( access_layer: AccessLayerRef, cache_manager: Option, regions_to_gc: BTreeSet, - opt: FileGcOption, + opt: GcConfig, manifest_open_config: ManifestOpenConfig, - file_ref_manifest: TableFileRefsManifest, + file_ref_manifest: FileRefsManifest, + limiter: &GcLimiterRef, + full_file_listing: bool, ) -> Result { let table_id = regions_to_gc .first() @@ -139,6 +199,7 @@ impl LocalGcWorker { reason: "Expect at least one region, found none", })? 
.table_id(); + let permit = limiter.permit()?; let mut zelf = Self { access_layer, cache_manager, @@ -146,6 +207,8 @@ impl LocalGcWorker { opt, manifest_open_config, file_ref_manifest, + _permit: permit, + full_file_listing, }; // dedup just in case @@ -193,15 +256,15 @@ impl LocalGcWorker { // TODO(discord9): verify manifest version before reading tmp ref files let mut tmp_ref_files = HashMap::new(); - for file_ref in &self.file_ref_manifest.file_refs { - if outdated_regions.contains(&file_ref.region_id) { + for (region_id, file_refs) in &self.file_ref_manifest.file_refs { + if outdated_regions.contains(region_id) { // skip outdated regions continue; } tmp_ref_files - .entry(file_ref.region_id) + .entry(*region_id) .or_insert_with(HashSet::new) - .insert(file_ref.file_id); + .extend(file_refs.clone()); } Ok(tmp_ref_files) @@ -220,14 +283,14 @@ impl LocalGcWorker { let mut deleted_files = HashMap::new(); let tmp_ref_files = self.read_tmp_ref_files(&mut outdated_regions).await?; for region_id in self.manifest_mgrs.keys() { - info!("Doing gc for region {}", region_id); + debug!("Doing gc for region {}", region_id); let tmp_ref_files = tmp_ref_files .get(region_id) .cloned() .unwrap_or_else(HashSet::new); let files = self.do_region_gc(*region_id, &tmp_ref_files).await?; deleted_files.insert(*region_id, files); - info!("Gc for region {} finished", region_id); + debug!("Gc for region {} finished", region_id); } info!( "LocalGcWorker finished after {} secs.", @@ -244,7 +307,7 @@ impl LocalGcWorker { impl LocalGcWorker { /// concurrency of listing files per region. /// This is used to limit the number of concurrent listing operations and speed up listing - pub const CONCURRENCY_LIST_PER_FILES: usize = 512; + pub const CONCURRENCY_LIST_PER_FILES: usize = 1024; /// Perform GC for the region. /// 1. 
Get all the removed files in delta manifest files and their expel times @@ -259,7 +322,7 @@ impl LocalGcWorker { region_id: RegionId, tmp_ref_files: &HashSet, ) -> Result> { - info!("Doing gc for region {}", region_id); + debug!("Doing gc for region {}", region_id); let manifest = self .manifest_mgrs .get(®ion_id) @@ -272,10 +335,10 @@ impl LocalGcWorker { if recently_removed_files.is_empty() { // no files to remove, skip - info!("No recently removed files to gc for region {}", region_id); + debug!("No recently removed files to gc for region {}", region_id); } - info!( + debug!( "Found {} recently removed files sets for region {}", recently_removed_files.len(), region_id @@ -291,27 +354,20 @@ impl LocalGcWorker { .chain(tmp_ref_files.clone().into_iter()) .collect(); - let true_tmp_ref_files = tmp_ref_files - .iter() - .filter(|f| !current_files.contains_key(f)) - .collect::>(); - - info!("True tmp ref files: {:?}", true_tmp_ref_files); - let unused_files = self .list_to_be_deleted_files(region_id, in_used, recently_removed_files, concurrency) .await?; let unused_len = unused_files.len(); - info!( + debug!( "Found {} unused files to delete for region {}", unused_len, region_id ); self.delete_files(region_id, &unused_files).await?; - info!( + debug!( "Successfully deleted {} unused files for region {}", unused_len, region_id ); @@ -329,7 +385,8 @@ impl LocalGcWorker { ) .await?; - GC_FILE_CNT.add(file_ids.len() as i64); + // FIXME(discord9): if files are already deleted before calling delete_files, the metric will be inaccurate, no clean way to fix it now + GC_DEL_FILE_CNT.add(file_ids.len() as i64); Ok(()) } @@ -491,7 +548,7 @@ impl LocalGcWorker { entries: Vec, in_use_filenames: &HashSet<&FileId>, may_linger_filenames: &HashSet<&FileId>, - all_files_appear_in_delta_manifests: &HashSet<&FileId>, + eligible_for_removal: &HashSet<&FileId>, unknown_file_may_linger_until: chrono::DateTime, ) -> (Vec, HashSet) { let mut all_unused_files_ready_for_delete = vec![]; @@ -515,7 +572,7 @@ impl LocalGcWorker { let should_delete = !in_use_filenames.contains(&file_id) && !may_linger_filenames.contains(&file_id) && { - if !all_files_appear_in_delta_manifests.contains(&file_id) { + if !eligible_for_removal.contains(&file_id) { // if the file's expel time is unknown(because not appear in delta manifest), we keep it for a while // using it's last modified time // notice unknown files use a different lingering time @@ -541,6 +598,11 @@ impl LocalGcWorker { /// Concurrently list unused files in the region dir /// because there may be a lot of files in the region dir /// and listing them may take a long time. + /// + /// When `full_file_listing` is false, this method will only delete files tracked in + /// `recently_removed_files` without performing expensive list operations, which significantly + /// improves performance. When `full_file_listing` is true, it performs a full listing to + /// find and delete orphan files. 
pub async fn list_to_be_deleted_files( &self, region_id: RegionId, @@ -548,6 +610,7 @@ impl LocalGcWorker { recently_removed_files: BTreeMap>, concurrency: usize, ) -> Result> { + let start = tokio::time::Instant::now(); let now = chrono::Utc::now(); let may_linger_until = now - chrono::Duration::from_std(self.opt.lingering_time).with_context(|_| { @@ -569,7 +632,7 @@ impl LocalGcWorker { let may_linger_files = recently_removed_files.split_off(&threshold); let may_linger_filenames = may_linger_files.values().flatten().collect::>(); - let all_files_appear_in_delta_manifests = recently_removed_files + let eligible_for_removal = recently_removed_files .values() .flatten() .collect::>(); @@ -577,23 +640,56 @@ impl LocalGcWorker { // in use filenames, include sst and index files let in_use_filenames = in_used.iter().collect::>(); + // When full_file_listing is false, skip expensive list operations and only delete + // files that are tracked in recently_removed_files + if !self.full_file_listing { + // Only delete files that: + // 1. Are in recently_removed_files (tracked in manifest) + // 2. Are not in use + // 3. Have passed the lingering time + let files_to_delete: Vec = eligible_for_removal + .iter() + .filter(|file_id| !in_use_filenames.contains(*file_id)) + .map(|&f| *f) + .collect(); + + info!( + "gc: fast mode (no full listing) cost {} secs for region {}, found {} files to delete from manifest", + start.elapsed().as_secs_f64(), + region_id, + files_to_delete.len() + ); + + return Ok(files_to_delete); + } + + // Full file listing mode: perform expensive list operations to find orphan files // Step 1: Create partitioned listers for concurrent processing let listers = self.partition_region_files(region_id, concurrency).await?; + let lister_cnt = listers.len(); // Step 2: Concurrently list all files in the region directory let all_entries = self.list_region_files_concurrent(listers).await?; + let cnt = all_entries.len(); + // Step 3: Filter files to determine which ones can be deleted let (all_unused_files_ready_for_delete, all_in_exist_linger_files) = self .filter_deletable_files( all_entries, &in_use_filenames, &may_linger_filenames, - &all_files_appear_in_delta_manifests, + &eligible_for_removal, unknown_file_may_linger_until, ); - info!("All in exist linger files: {:?}", all_in_exist_linger_files); + info!( + "gc: full listing mode cost {} secs using {lister_cnt} lister for {cnt} files in region {}, found {} unused files to delete", + start.elapsed().as_secs_f64(), + region_id, + all_unused_files_ready_for_delete.len() + ); + debug!("All in exist linger files: {:?}", all_in_exist_linger_files); Ok(all_unused_files_ready_for_delete) } diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index 0f923f60a6..fd8110b526 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -437,7 +437,7 @@ lazy_static! { "mito stalled write request in each worker", &[WORKER_LABEL] ).unwrap(); - /// Number of ref files per table + /// Number of ref files pub static ref GC_REF_FILE_CNT: IntGauge = register_int_gauge!( "greptime_gc_ref_file_count", "gc ref file count", @@ -458,9 +458,9 @@ lazy_static! { .unwrap(); /// Counter for the number of files deleted by the GC worker. 
- pub static ref GC_FILE_CNT: IntGauge = + pub static ref GC_DEL_FILE_CNT: IntGauge = register_int_gauge!( - "greptime_mito_gc_file_count", + "greptime_mito_gc_delete_file_count", "mito gc deleted file count", ).unwrap(); } diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index ee49da763e..f4a9deb9c6 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -565,6 +565,10 @@ impl MitoRegion { Ok(()) } + pub fn access_layer(&self) -> AccessLayerRef { + self.access_layer.clone() + } + /// Returns the SST entries of the region. pub async fn manifest_sst_entries(&self) -> Vec { let table_dir = self.table_dir(); diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index c5197ea2fb..11f38ac1ad 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -162,6 +162,7 @@ impl FilePurger for ObjectStoreFilePurger { // notice that no matter whether the file is deleted or not, we need to remove the reference // because the file is no longer in use nonetheless. self.file_ref_manager.remove_file(&file_meta); + // TODO(discord9): consider impl a .tombstone file to reduce files needed to list } fn new_file(&self, file_meta: &FileMeta) { diff --git a/src/mito2/src/sst/file_ref.rs b/src/mito2/src/sst/file_ref.rs index de071b3f04..28f3e95f89 100644 --- a/src/mito2/src/sst/file_ref.rs +++ b/src/mito2/src/sst/file_ref.rs @@ -17,38 +17,23 @@ use std::sync::Arc; use common_telemetry::debug; use dashmap::{DashMap, Entry}; -use serde::{Deserialize, Serialize}; -use store_api::ManifestVersion; -use store_api::storage::{FileId, RegionId, TableId}; +use store_api::storage::{FileRef, FileRefsManifest, RegionId}; use crate::error::Result; use crate::metrics::GC_REF_FILE_CNT; -use crate::region::RegionMapRef; +use crate::region::MitoRegionRef; use crate::sst::file::FileMeta; -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -pub struct FileRef { - pub region_id: RegionId, - pub file_id: FileId, -} - -impl FileRef { - pub fn new(region_id: RegionId, file_id: FileId) -> Self { - Self { region_id, file_id } - } -} - -/// File references for a table. -/// It contains all files referenced by the table. +/// File references for a region. +/// It contains all files referenced by the region. #[derive(Debug, Clone, Default)] -pub struct TableFileRefs { +pub struct RegionFileRefs { /// (FileRef, Ref Count) meaning how many FileHandleInner is opened for this file. pub files: HashMap, } /// Manages all file references in one datanode. /// It keeps track of which files are referenced and group by table ids. -/// And periodically update the references to tmp file in object storage. /// This is useful for ensuring that files are not deleted while they are still in use by any /// query. #[derive(Debug)] @@ -56,33 +41,24 @@ pub struct FileReferenceManager { /// Datanode id. used to determine tmp ref file name. node_id: Option, /// TODO(discord9): use no hash hasher since table id is sequential. - files_per_table: DashMap, + files_per_region: DashMap, } pub type FileReferenceManagerRef = Arc; -/// The tmp file uploaded to object storage to record one table's file references. 
-#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] -pub struct TableFileRefsManifest { - pub file_refs: HashSet, - /// Manifest version when this manifest is read for it's files - pub manifest_version: HashMap, -} - impl FileReferenceManager { pub fn new(node_id: Option) -> Self { Self { node_id, - files_per_table: Default::default(), + files_per_region: Default::default(), } } - fn ref_file_set(&self, table_id: TableId) -> Option> { - let file_refs = if let Some(file_refs) = self.files_per_table.get(&table_id) { + fn ref_file_set(&self, region_id: RegionId) -> Option> { + let file_refs = if let Some(file_refs) = self.files_per_region.get(®ion_id) { file_refs.clone() } else { - // still return an empty manifest to indicate no files are referenced. - // and differentiate from error case where table_id not found. + // region id not found. return None; }; @@ -95,8 +71,8 @@ impl FileReferenceManager { let ref_file_set: HashSet = file_refs.files.keys().cloned().collect(); debug!( - "Get file refs for table {}, node {:?}, {} files", - table_id, + "Get file refs for region {}, node {:?}, {} files", + region_id, self.node_id, ref_file_set.len(), ); @@ -120,22 +96,19 @@ impl FileReferenceManager { #[allow(unused)] pub(crate) async fn get_snapshot_of_unmanifested_refs( &self, - table_id: TableId, - region_map: &RegionMapRef, - ) -> Result { - let Some(ref_files) = self.ref_file_set(table_id) else { - return Ok(Default::default()); - }; - let region_list = region_map.list_regions(); - let table_regions = region_list - .iter() - .filter(|r| r.region_id().table_id() == table_id) - .collect::>(); + regions: Vec, + ) -> Result { + let mut ref_files = HashMap::new(); + for region_id in regions.iter().map(|r| r.region_id()) { + if let Some(files) = self.ref_file_set(region_id) { + ref_files.insert(region_id, files); + } + } let mut in_manifest_files = HashSet::new(); let mut manifest_version = HashMap::new(); - for r in &table_regions { + for r in ®ions { let manifest = r.manifest_ctx.manifest().await; let files = manifest.files.keys().cloned().collect::>(); in_manifest_files.extend(files); @@ -144,11 +117,18 @@ impl FileReferenceManager { let ref_files_excluding_in_manifest = ref_files .iter() - .filter(|f| !in_manifest_files.contains(&f.file_id)) - .cloned() - .collect::>(); - - Ok(TableFileRefsManifest { + .map(|(r, f)| { + ( + *r, + f.iter() + .filter_map(|f| { + (!in_manifest_files.contains(&f.file_id)).then_some(f.file_id) + }) + .collect::>(), + ) + }) + .collect(); + Ok(FileRefsManifest { file_refs: ref_files_excluding_in_manifest, manifest_version, }) @@ -158,12 +138,12 @@ impl FileReferenceManager { /// Also records the access layer for the table if not exists. /// The access layer will be used to upload ref file to object storage. pub fn add_file(&self, file_meta: &FileMeta) { - let table_id = file_meta.region_id.table_id(); + let region_id = file_meta.region_id; let mut is_new = false; { let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id); - self.files_per_table - .entry(table_id) + self.files_per_region + .entry(region_id) .and_modify(|refs| { refs.files .entry(file_ref.clone()) @@ -173,7 +153,7 @@ impl FileReferenceManager { 1 }); }) - .or_insert_with(|| TableFileRefs { + .or_insert_with(|| RegionFileRefs { files: HashMap::from_iter([(file_ref, 1)]), }); } @@ -185,14 +165,14 @@ impl FileReferenceManager { /// Removes a file reference. /// If the reference count reaches zero, the file reference will be removed from the manager. 
pub fn remove_file(&self, file_meta: &FileMeta) { - let table_id = file_meta.region_id.table_id(); + let region_id = file_meta.region_id; let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id); let mut remove_table_entry = false; let mut remove_file_ref = false; let mut file_cnt = 0; - let table_ref = self.files_per_table.entry(table_id).and_modify(|refs| { + let region_ref = self.files_per_region.entry(region_id).and_modify(|refs| { let entry = refs.files.entry(file_ref.clone()).and_modify(|count| { if *count > 0 { *count -= 1; @@ -214,7 +194,7 @@ impl FileReferenceManager { } }); - if let Entry::Occupied(o) = table_ref + if let Entry::Occupied(o) = region_ref && remove_table_entry { o.remove_entry(); @@ -234,7 +214,7 @@ mod tests { use std::num::NonZeroU64; use smallvec::SmallVec; - use store_api::storage::RegionId; + use store_api::storage::{FileId, RegionId}; use super::*; use crate::sst::file::{FileMeta, FileTimeRange, IndexType, RegionFileId}; @@ -265,54 +245,69 @@ mod tests { file_ref_mgr.add_file(&file_meta); assert_eq!( - file_ref_mgr.files_per_table.get(&0).unwrap().files, + file_ref_mgr + .files_per_region + .get(&file_meta.region_id) + .unwrap() + .files, HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)]) ); file_ref_mgr.add_file(&file_meta); - let expected_table_ref_manifest = + let expected_region_ref_manifest = HashSet::from_iter([FileRef::new(file_meta.region_id, file_meta.file_id)]); assert_eq!( - file_ref_mgr.ref_file_set(0).unwrap(), - expected_table_ref_manifest + file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(), + expected_region_ref_manifest ); assert_eq!( - file_ref_mgr.files_per_table.get(&0).unwrap().files, + file_ref_mgr + .files_per_region + .get(&file_meta.region_id) + .unwrap() + .files, HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 2)]) ); assert_eq!( - file_ref_mgr.ref_file_set(0).unwrap(), - expected_table_ref_manifest + file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(), + expected_region_ref_manifest ); file_ref_mgr.remove_file(&file_meta); assert_eq!( - file_ref_mgr.files_per_table.get(&0).unwrap().files, + file_ref_mgr + .files_per_region + .get(&file_meta.region_id) + .unwrap() + .files, HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)]) ); assert_eq!( - file_ref_mgr.ref_file_set(0).unwrap(), - expected_table_ref_manifest + file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(), + expected_region_ref_manifest ); file_ref_mgr.remove_file(&file_meta); assert!( - file_ref_mgr.files_per_table.get(&0).is_none(), + file_ref_mgr + .files_per_region + .get(&file_meta.region_id) + .is_none(), "{:?}", - file_ref_mgr.files_per_table + file_ref_mgr.files_per_region ); assert!( - file_ref_mgr.ref_file_set(0).is_none(), + file_ref_mgr.ref_file_set(file_meta.region_id).is_none(), "{:?}", - file_ref_mgr.files_per_table + file_ref_mgr.files_per_region ); } } diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 322141fd1b..75aff36b52 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -58,6 +58,7 @@ use crate::compaction::CompactionScheduler; use crate::config::MitoConfig; use crate::error::{self, CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu}; use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef}; +use crate::gc::{GcLimiter, GcLimiterRef}; use crate::memtable::MemtableBuilderProvider; use crate::metrics::{REGION_COUNT, REQUEST_WAIT_TIME, WRITE_STALLING}; use 
crate::region::opener::PartitionExprFetcherRef;
@@ -138,6 +139,8 @@ pub(crate) struct WorkerGroup {
     cache_manager: CacheManagerRef,
     /// File reference manager.
     file_ref_manager: FileReferenceManagerRef,
+    /// Gc limiter to limit concurrent gc jobs.
+    gc_limiter: GcLimiterRef,
 }
 
 impl WorkerGroup {
@@ -196,6 +199,7 @@ impl WorkerGroup {
             .build(),
         );
         let time_provider = Arc::new(StdTimeProvider);
+        let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));
 
         let workers = (0..config.num_workers)
             .map(|id| {
@@ -234,6 +238,7 @@ impl WorkerGroup {
             purge_scheduler,
             cache_manager,
             file_ref_manager,
+            gc_limiter,
         })
     }
 
@@ -291,6 +296,10 @@ impl WorkerGroup {
         self.file_ref_manager.clone()
     }
 
+    pub(crate) fn gc_limiter(&self) -> GcLimiterRef {
+        self.gc_limiter.clone()
+    }
+
     /// Get worker for specific `region_id`.
     pub(crate) fn worker(&self, region_id: RegionId) -> &RegionWorker {
         let index = region_id_to_index(region_id, self.workers.len());
@@ -361,6 +370,7 @@ impl WorkerGroup {
             .write_cache(write_cache)
             .build(),
         );
+        let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));
         let workers = (0..config.num_workers)
             .map(|id| {
                 WorkerStarter {
@@ -398,6 +408,7 @@ impl WorkerGroup {
             purge_scheduler,
             cache_manager,
             file_ref_manager,
+            gc_limiter,
         })
     }
 
diff --git a/src/store-api/src/storage.rs b/src/store-api/src/storage.rs
index 1df7a0aff6..2cafaf027c 100644
--- a/src/store-api/src/storage.rs
+++ b/src/store-api/src/storage.rs
@@ -26,6 +26,6 @@ pub use datatypes::schema::{
 };
 
 pub use self::descriptors::*;
-pub use self::file::{FileId, ParseIdError};
+pub use self::file::{FileId, FileRef, FileRefsManifest, GcReport, ParseIdError};
 pub use self::requests::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
 pub use self::types::{SequenceNumber, SequenceRange};
 
diff --git a/src/store-api/src/storage/descriptors.rs b/src/store-api/src/storage/descriptors.rs
index 84bdbdf7a8..ae4f617b88 100644
--- a/src/store-api/src/storage/descriptors.rs
+++ b/src/store-api/src/storage/descriptors.rs
@@ -48,7 +48,24 @@ pub const MAX_REGION_SEQ: u32 = REGION_SEQ_MASK;
 /// Region Number(32)
 /// ```
 #[derive(Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
-pub struct RegionId(u64);
+pub struct RegionId(#[serde(deserialize_with = "str_or_u64")] u64);
+
+/// FIXME(discord9): workaround for serde issue: https://github.com/serde-rs/json/issues/1254
+fn str_or_u64<'de, D>(deserializer: D) -> Result<u64, D::Error>
+where
+    D: serde::Deserializer<'de>,
+{
+    #[derive(Deserialize)]
+    #[serde(untagged)]
+    enum StrOrU64 {
+        U64(u64),
+        Str(String),
+    }
+    match StrOrU64::deserialize(deserializer)? {
+        StrOrU64::U64(v) => Ok(v),
+        StrOrU64::Str(s) => s.parse::<u64>().map_err(serde::de::Error::custom),
+    }
+}
 
 impl RegionId {
     /// Construct a new [RegionId] from table id and region number.
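Why the helper is needed at all: `FileRefsManifest` and `GcReport` key their maps by `RegionId`, and JSON object keys are always strings, so serde_json writes a `u64`-backed key out as a string and hands a string back during deserialization. A small round-trip illustrating the failure mode the untagged helper papers over (a sketch; relies only on the patched `RegionId` above):

```rust
use std::collections::HashMap;

use store_api::storage::RegionId;

// serde_json emits integer map keys as strings: {"8589934602": 1}.
// Without str_or_u64, deserializing those keys back into RegionId fails.
fn region_keyed_map_roundtrip() -> serde_json::Result<()> {
    let mut versions: HashMap<RegionId, u64> = HashMap::new();
    versions.insert(RegionId::new(2, 10), 1);

    let json = serde_json::to_string(&versions)?;
    let back: HashMap<RegionId, u64> = serde_json::from_str(&json)?;
    assert_eq!(versions, back);
    Ok(())
}
```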
@@ -328,6 +345,13 @@ mod tests {
         assert_eq!(region_id, parsed);
     }
 
+    #[test]
+    fn test_region_id_from_str() {
+        let region_id_str = "\"8589934602\"";
+        let region_id: RegionId = serde_json::from_str(region_id_str).unwrap();
+        assert_eq!(RegionId::new(2, 10), region_id);
+    }
+
     #[test]
     fn test_retrieve_region_group_and_seq() {
         let region_id = RegionId::with_group_and_seq(111, 222, 333);
diff --git a/src/store-api/src/storage/file.rs b/src/store-api/src/storage/file.rs
index 6e2fa334e4..a028ec0401 100644
--- a/src/store-api/src/storage/file.rs
+++ b/src/store-api/src/storage/file.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::{HashMap, HashSet};
 use std::fmt;
 use std::fmt::Debug;
 use std::str::FromStr;
@@ -20,6 +21,9 @@ use serde::{Deserialize, Serialize};
 use snafu::{ResultExt, Snafu};
 use uuid::Uuid;
 
+use crate::ManifestVersion;
+use crate::storage::RegionId;
+
 #[derive(Debug, Snafu, PartialEq)]
 pub struct ParseIdError {
     source: uuid::Error,
 }
@@ -66,6 +70,60 @@ impl FromStr for FileId {
     }
 }
 
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
+pub struct FileRef {
+    pub region_id: RegionId,
+    pub file_id: FileId,
+}
+
+impl FileRef {
+    pub fn new(region_id: RegionId, file_id: FileId) -> Self {
+        Self { region_id, file_id }
+    }
+}
+
+/// The tmp file manifest which records a table's file references.
+/// Also records the manifest version when these tmp files are read.
+#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
+pub struct FileRefsManifest {
+    pub file_refs: HashMap<RegionId, HashSet<FileId>>,
+    /// Manifest version when this manifest is read for its files
+    pub manifest_version: HashMap<RegionId, ManifestVersion>,
+}
+
+#[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
+pub struct GcReport {
+    /// deleted files per region
+    pub deleted_files: HashMap<RegionId, Vec<FileId>>,
+    /// Regions that need retry in next gc round, usually because their tmp ref files are outdated
+    pub need_retry_regions: HashSet<RegionId>,
+}
+
+impl GcReport {
+    pub fn new(
+        deleted_files: HashMap<RegionId, Vec<FileId>>,
+        need_retry_regions: HashSet<RegionId>,
+    ) -> Self {
+        Self {
+            deleted_files,
+            need_retry_regions,
+        }
+    }
+
+    pub fn merge(&mut self, other: GcReport) {
+        for (region, files) in other.deleted_files {
+            let self_files = self.deleted_files.entry(region).or_default();
+            let dedup: HashSet<FileId> = HashSet::from_iter(
+                std::mem::take(self_files)
+                    .into_iter()
+                    .chain(files.iter().cloned()),
+            );
+            *self_files = dedup.into_iter().collect();
+        }
+        self.need_retry_regions.extend(other.need_retry_regions);
+    }
+}
+
 #[cfg(test)]
 mod tests {
 
@@ -92,4 +150,19 @@ mod tests {
         let parsed = serde_json::from_str(&json).unwrap();
         assert_eq!(id, parsed);
     }
+
+    #[test]
+    fn test_file_refs_manifest_serialization() {
+        let mut manifest = FileRefsManifest::default();
+        let r0 = RegionId::new(1024, 1);
+        let r1 = RegionId::new(1024, 2);
+        manifest.file_refs.insert(r0, [FileId::random()].into());
+        manifest.file_refs.insert(r1, [FileId::random()].into());
+        manifest.manifest_version.insert(r0, 10);
+        manifest.manifest_version.insert(r1, 20);
+
+        let json = serde_json::to_string(&manifest).unwrap();
+        let parsed: FileRefsManifest = serde_json::from_str(&json).unwrap();
+        assert_eq!(manifest, parsed);
+    }
 }
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 9113b356ae..bd193769ee 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -1534,6 +1534,13 @@ mem_threshold_on_create = "auto"
[region_engine.mito.memtable] type = "time_series" +[region_engine.mito.gc] +enable = false +lingering_time = "5m" +unknown_file_lingering_time = "6h" +max_concurrent_lister_per_gc_job = 32 +max_concurrent_gc_job = 4 + [[region_engine]] [region_engine.file]
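Tying the test config above back to the heartbeat plumbing at the top of the patch: on every beat the datanode serializes a `GcStat` into the request's extension map under `__gc_stat`, and metasrv decodes it in `TryFrom<&HeartbeatRequest> for Stat`. A minimal round-trip of that contract (a sketch using only the APIs added in this patch):

```rust
use std::collections::HashMap;

use common_meta::datanode::GcStat;

// GcStat travels as JSON bytes under the "__gc_stat" extension key.
fn gc_stat_roundtrip() {
    let mut extensions: HashMap<String, Vec<u8>> = HashMap::new();
    GcStat::new(1, 4).into_extensions(&mut extensions);

    let decoded = GcStat::from_extensions(&extensions)
        .expect("well-formed JSON")
        .expect("key is present");
    assert_eq!(decoded.running_gc_tasks, 1);
    assert_eq!(decoded.gc_concurrency, 4);
}
```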