Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2025-12-22 22:20:02 +00:00
feat: gc worker heartbeat instruction (#7118)
Squashed commit messages:
- again false by default
- test: config api
- refactor: per code review
- less info! even less info!!
- docs: gc regions instr
- refactor: grp by region id
- per code review
- per review error handling?
- test: fix todos aft rebase
- fix after refactor

Signed-off-by: discord9 <discord9@163.com>
@@ -25,8 +25,7 @@ use store_api::region_engine::{RegionRole, RegionStatistic};
use store_api::storage::RegionId;
use table::metadata::TableId;

use crate::error;
use crate::error::Result;
use crate::error::{self, DeserializeFromJsonSnafu, Result};
use crate::heartbeat::utils::get_datanode_workloads;

const DATANODE_STAT_PREFIX: &str = "__meta_datanode_stat";
@@ -66,10 +65,12 @@ pub struct Stat {
    pub node_epoch: u64,
    /// The datanode workloads.
    pub datanode_workloads: DatanodeWorkloads,
    /// The GC statistics of the datanode.
    pub gc_stat: Option<GcStat>,
}

/// The statistics of a region.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RegionStat {
    /// The region_id.
    pub id: RegionId,
@@ -126,7 +127,7 @@ pub trait TopicStatsReporter: Send + Sync {
    fn reportable_topics(&mut self) -> Vec<TopicStat>;
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum RegionManifestInfo {
    Mito {
        manifest_version: u64,
@@ -222,11 +223,12 @@ impl TryFrom<&HeartbeatRequest> for Stat {
            node_epoch,
            node_workloads,
            topic_stats,
            extensions,
            ..
        } = value;

        match (header, peer) {
            (Some(_header), Some(peer)) => {
            (Some(header), Some(peer)) => {
                let region_stats = region_stats
                    .iter()
                    .map(RegionStat::from)
@@ -234,6 +236,14 @@ impl TryFrom<&HeartbeatRequest> for Stat {
                let topic_stats = topic_stats.iter().map(TopicStat::from).collect::<Vec<_>>();

                let datanode_workloads = get_datanode_workloads(node_workloads.as_ref());

                let gc_stat = GcStat::from_extensions(extensions).map_err(|err| {
                    common_telemetry::error!(
                        "Failed to deserialize GcStat from extensions: {}",
                        err
                    );
                    header.clone()
                })?;
                Ok(Self {
                    timestamp_millis: time_util::current_time_millis(),
                    // datanode id
@@ -247,6 +257,7 @@ impl TryFrom<&HeartbeatRequest> for Stat {
                    topic_stats,
                    node_epoch: *node_epoch,
                    datanode_workloads,
                    gc_stat,
                })
            }
            (header, _) => Err(header.clone()),
@@ -319,6 +330,43 @@ impl From<&api::v1::meta::TopicStat> for TopicStat {
    }
}

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct GcStat {
    /// Number of GC tasks currently running on the datanode.
    pub running_gc_tasks: u32,
    /// The maximum number of concurrent GC tasks the datanode can handle.
    pub gc_concurrency: u32,
}

impl GcStat {
    pub const GC_STAT_KEY: &str = "__gc_stat";

    pub fn new(running_gc_tasks: u32, gc_concurrency: u32) -> Self {
        Self {
            running_gc_tasks,
            gc_concurrency,
        }
    }

    pub fn into_extensions(&self, extensions: &mut std::collections::HashMap<String, Vec<u8>>) {
        let bytes = serde_json::to_vec(self).unwrap_or_default();
        extensions.insert(Self::GC_STAT_KEY.to_string(), bytes);
    }

    pub fn from_extensions(
        extensions: &std::collections::HashMap<String, Vec<u8>>,
    ) -> Result<Option<Self>> {
        extensions
            .get(Self::GC_STAT_KEY)
            .map(|bytes| {
                serde_json::from_slice(bytes).with_context(|_| DeserializeFromJsonSnafu {
                    input: String::from_utf8_lossy(bytes).to_string(),
                })
            })
            .transpose()
    }
}

/// The key of the datanode stat in the memory store.
///
/// The format is `__meta_datanode_stat-0-{node_id}`.
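
The two helpers above give GcStat a JSON round trip through the heartbeat `extensions` map, keyed by `__gc_stat`. A minimal standalone sketch of that flow (toy types; only `serde`/`serde_json` assumed, with `unwrap` standing in for the crate's snafu-based error handling):

    use std::collections::HashMap;

    use serde::{Deserialize, Serialize};

    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    struct GcStat {
        running_gc_tasks: u32,
        gc_concurrency: u32,
    }

    const GC_STAT_KEY: &str = "__gc_stat";

    fn main() {
        // Datanode side: serialize the stat into the heartbeat extensions.
        let stat = GcStat { running_gc_tasks: 1, gc_concurrency: 4 };
        let mut extensions: HashMap<String, Vec<u8>> = HashMap::new();
        extensions.insert(GC_STAT_KEY.to_string(), serde_json::to_vec(&stat).unwrap());

        // Metasrv side: a missing key decodes to `None`, a present key to `Some(stat)`.
        let decoded: Option<GcStat> = extensions
            .get(GC_STAT_KEY)
            .map(|bytes| serde_json::from_slice(bytes).unwrap());
        assert_eq!(decoded, Some(stat));
    }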

@@ -17,7 +17,7 @@ use std::fmt::{Display, Formatter};
use std::time::Duration;

use serde::{Deserialize, Deserializer, Serialize};
use store_api::storage::{RegionId, RegionNumber};
use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber};
use strum::Display;
use table::metadata::TableId;
use table::table_name::TableName;
@@ -417,6 +417,88 @@ where
    })
}

/// Instruction to get file references for specified regions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GetFileRefs {
    /// List of region IDs to get file references for.
    pub region_ids: Vec<RegionId>,
}

impl Display for GetFileRefs {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "GetFileRefs(region_ids={:?})", self.region_ids)
    }
}

/// Instruction to trigger garbage collection for regions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GcRegions {
    /// The region IDs to perform GC on.
    pub regions: Vec<RegionId>,
    /// The file references manifest containing temporary file references.
    pub file_refs_manifest: FileRefsManifest,
    /// Whether to perform a full file listing to find orphan files.
    pub full_file_listing: bool,
}

impl Display for GcRegions {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
"GcRegion(regions={:?}, file_refs_count={}, full_file_listing={})",
            self.regions,
            self.file_refs_manifest.file_refs.len(),
            self.full_file_listing
        )
    }
}

/// Reply for GetFileRefs instruction.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GetFileRefsReply {
    /// The file references manifest.
    pub file_refs_manifest: FileRefsManifest,
    /// Whether the operation was successful.
    pub success: bool,
    /// Error message if any.
    pub error: Option<String>,
}

impl Display for GetFileRefsReply {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "GetFileRefsReply(success={}, file_refs_count={}, error={:?})",
            self.success,
            self.file_refs_manifest.file_refs.len(),
            self.error
        )
    }
}

/// Reply for GC instruction.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GcRegionsReply {
    pub result: Result<GcReport, String>,
}

impl Display for GcRegionsReply {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
"GcReply(result={})",
            match &self.result {
                Ok(report) => format!(
                    "GcReport(deleted_files_count={}, need_retry_regions_count={})",
                    report.deleted_files.len(),
                    report.need_retry_regions.len()
                ),
                Err(err) => format!("Err({})", err),
            }
        )
    }
}

#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
pub enum Instruction {
    /// Opens regions.
@@ -437,6 +519,10 @@ pub enum Instruction {
    InvalidateCaches(Vec<CacheIdent>),
    /// Flushes regions.
    FlushRegions(FlushRegions),
    /// Gets file references for regions.
    GetFileRefs(GetFileRefs),
    /// Triggers garbage collection for regions.
    GcRegions(GcRegions),
}

impl Instruction {
@@ -479,6 +565,20 @@ impl Instruction {
            _ => None,
        }
    }

    pub fn into_get_file_refs(self) -> Option<GetFileRefs> {
        match self {
            Self::GetFileRefs(get_file_refs) => Some(get_file_refs),
            _ => None,
        }
    }

    pub fn into_gc_regions(self) -> Option<GcRegions> {
        match self {
            Self::GcRegions(gc_regions) => Some(gc_regions),
            _ => None,
        }
    }
}
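
A sketch of how such an instruction round-trips as JSON and is then unpacked with the accessor pattern above (a toy enum standing in for the real `Instruction`, with `RegionId` simplified to `u64` and the manifest field omitted):

    use serde::{Deserialize, Serialize};

    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct GcRegions {
        regions: Vec<u64>, // stand-in for Vec<RegionId>
        full_file_listing: bool,
    }

    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    enum Instruction {
        GcRegions(GcRegions),
    }

    impl Instruction {
        fn into_gc_regions(self) -> Option<GcRegions> {
            match self {
                Self::GcRegions(gc_regions) => Some(gc_regions),
            }
        }
    }

    fn main() {
        let instr = Instruction::GcRegions(GcRegions {
            regions: vec![1, 2],
            full_file_listing: false,
        });
        // Instructions travel as JSON through the heartbeat mailbox.
        let json = serde_json::to_string(&instr).unwrap();
        let back: Instruction = serde_json::from_str(&json).unwrap();
        let gc = back.into_gc_regions().expect("expected a GcRegions instruction");
        assert_eq!(gc.regions, vec![1, 2]);
    }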

/// The reply of [UpgradeRegion].
@@ -549,6 +649,8 @@ pub enum InstructionReply {
    )]
    DowngradeRegions(DowngradeRegionsReply),
    FlushRegions(FlushRegionReply),
    GetFileRefs(GetFileRefsReply),
    GcRegions(GcRegionsReply),
}

impl Display for InstructionReply {
@@ -561,6 +663,8 @@ impl Display for InstructionReply {
                write!(f, "InstructionReply::DowngradeRegions({:?})", reply)
            }
            Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
            Self::GetFileRefs(reply) => write!(f, "InstructionReply::GetFileRefs({})", reply),
            Self::GcRegions(reply) => write!(f, "InstructionReply::GcRegions({})", reply),
        }
    }
}
@@ -605,6 +709,10 @@ impl InstructionReply {

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use store_api::storage::FileId;

    use super::*;

    #[test]
@@ -903,4 +1011,30 @@ mod tests {
            _ => panic!("Expected FlushRegions instruction"),
        }
    }

    #[test]
    fn test_serialize_get_file_refs_instruction_reply() {
        let mut manifest = FileRefsManifest::default();
        let r0 = RegionId::new(1024, 1);
        let r1 = RegionId::new(1024, 2);
        manifest
            .file_refs
            .insert(r0, HashSet::from([FileId::random()]));
        manifest
            .file_refs
            .insert(r1, HashSet::from([FileId::random()]));
        manifest.manifest_version.insert(r0, 10);
        manifest.manifest_version.insert(r1, 20);

        let instruction_reply = InstructionReply::GetFileRefs(GetFileRefsReply {
            file_refs_manifest: manifest,
            success: true,
            error: None,
        });

        let serialized = serde_json::to_string(&instruction_reply).unwrap();
        let deserialized = serde_json::from_str(&serialized).unwrap();

        assert_eq!(instruction_reply, deserialized);
    }
}

@@ -322,6 +322,21 @@ pub enum Error {
        location: Location,
    },

    #[snafu(display("Failed to run gc for region {}", region_id))]
    GcMitoEngine {
        region_id: RegionId,
        source: mito2::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Invalid arguments for GC: {}", msg))]
    InvalidGcArgs {
        msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Failed to list SST entries from storage"))]
    ListStorageSsts {
        #[snafu(implicit)]
@@ -446,9 +461,11 @@ impl ErrorExt for Error {

            AsyncTaskExecute { source, .. } => source.status_code(),

            CreateDir { .. } | RemoveDir { .. } | ShutdownInstance { .. } | DataFusion { .. } => {
                StatusCode::Internal
            }
            CreateDir { .. }
            | RemoveDir { .. }
            | ShutdownInstance { .. }
            | DataFusion { .. }
            | InvalidGcArgs { .. } => StatusCode::Internal,

            RegionNotFound { .. } => StatusCode::RegionNotFound,
            RegionNotReady { .. } => StatusCode::RegionNotReady,
@@ -466,7 +483,7 @@ impl ErrorExt for Error {
            StopRegionEngine { source, .. } => source.status_code(),

            FindLogicalRegions { source, .. } => source.status_code(),
            BuildMitoEngine { source, .. } => source.status_code(),
            BuildMitoEngine { source, .. } | GcMitoEngine { source, .. } => source.status_code(),
            BuildMetricEngine { source, .. } => source.status_code(),
            ListStorageSsts { source, .. } => source.status_code(),
            ConcurrentQueryLimiterClosed { .. } | ConcurrentQueryLimiterTimeout { .. } => {

@@ -36,14 +36,14 @@ use common_workload::DatanodeWorkloadType;
use meta_client::MetaClientRef;
use meta_client::client::{HeartbeatSender, MetaClient};
use servers::addrs;
use snafu::ResultExt;
use snafu::{OptionExt as _, ResultExt};
use tokio::sync::{Notify, mpsc};
use tokio::time::Instant;

use self::handler::RegionHeartbeatResponseHandler;
use crate::alive_keeper::{CountdownTaskHandlerExtRef, RegionAliveKeeper};
use crate::config::DatanodeOptions;
use crate::error::{self, MetaClientInitSnafu, Result};
use crate::error::{self, MetaClientInitSnafu, RegionEngineNotFoundSnafu, Result};
use crate::event_listener::RegionServerEventReceiver;
use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
use crate::region_server::RegionServer;
@@ -242,12 +242,18 @@ impl HeartbeatTask {
        let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores();
        let total_memory_bytes = self.resource_stat.get_total_memory_bytes();
        let resource_stat = self.resource_stat.clone();
        let gc_limiter = self
            .region_server
            .mito_engine()
            .context(RegionEngineNotFoundSnafu { name: "mito" })?
            .gc_limiter();

        common_runtime::spawn_hb(async move {
            let sleep = tokio::time::sleep(Duration::from_millis(0));
            tokio::pin!(sleep);

            let build_info = common_version::build_info();

            let heartbeat_request = HeartbeatRequest {
                peer: self_peer,
                node_epoch,
@@ -283,8 +289,13 @@ impl HeartbeatTask {
                    if let Some(message) = message {
                        match outgoing_message_to_mailbox_message(message) {
                            Ok(message) => {
                                let mut extensions = heartbeat_request.extensions.clone();
                                let gc_stat = gc_limiter.gc_stat();
                                gc_stat.into_extensions(&mut extensions);

                                let req = HeartbeatRequest {
                                    mailbox_message: Some(message),
                                    extensions,
                                    ..heartbeat_request.clone()
                                };
                                HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
@@ -305,10 +316,16 @@ impl HeartbeatTask {
                        let topic_stats = region_server_clone.topic_stats();
                        let now = Instant::now();
                        let duration_since_epoch = (now - epoch).as_millis() as u64;

                        let mut extensions = heartbeat_request.extensions.clone();
                        let gc_stat = gc_limiter.gc_stat();
                        gc_stat.into_extensions(&mut extensions);

                        let mut req = HeartbeatRequest {
                            region_stats,
                            topic_stats,
                            duration_since_epoch,
                            extensions,
                            ..heartbeat_request.clone()
                        };

@@ -20,16 +20,21 @@ use common_meta::heartbeat::handler::{
use common_meta::instruction::{Instruction, InstructionReply};
use common_telemetry::error;
use snafu::OptionExt;
use store_api::storage::GcReport;

mod close_region;
mod downgrade_region;
mod file_ref;
mod flush_region;
mod gc_worker;
mod open_region;
mod upgrade_region;

use crate::heartbeat::handler::close_region::CloseRegionsHandler;
use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler;
use crate::heartbeat::handler::file_ref::GetFileRefsHandler;
use crate::heartbeat::handler::flush_region::FlushRegionsHandler;
use crate::heartbeat::handler::gc_worker::GcRegionsHandler;
use crate::heartbeat::handler::open_region::OpenRegionsHandler;
use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
use crate::heartbeat::task_tracker::TaskTracker;
@@ -43,6 +48,7 @@ pub struct RegionHeartbeatResponseHandler {
    downgrade_tasks: TaskTracker<()>,
    flush_tasks: TaskTracker<()>,
    open_region_parallelism: usize,
    gc_tasks: TaskTracker<GcReport>,
}

#[async_trait::async_trait]
@@ -61,6 +67,7 @@ pub struct HandlerContext {
    catchup_tasks: TaskTracker<()>,
    downgrade_tasks: TaskTracker<()>,
    flush_tasks: TaskTracker<()>,
    gc_tasks: TaskTracker<GcReport>,
}

impl HandlerContext {
@@ -71,6 +78,7 @@ impl HandlerContext {
            catchup_tasks: TaskTracker::new(),
            downgrade_tasks: TaskTracker::new(),
            flush_tasks: TaskTracker::new(),
            gc_tasks: TaskTracker::new(),
        }
    }
}
@@ -85,6 +93,7 @@ impl RegionHeartbeatResponseHandler {
            flush_tasks: TaskTracker::new(),
            // Default to half of the number of CPUs.
            open_region_parallelism: (num_cpus::get() / 2).max(1),
            gc_tasks: TaskTracker::new(),
        }
    }

@@ -106,6 +115,8 @@ impl RegionHeartbeatResponseHandler {
            Instruction::FlushRegions(_) => Ok(Box::new(FlushRegionsHandler.into())),
            Instruction::DowngradeRegions(_) => Ok(Box::new(DowngradeRegionsHandler.into())),
            Instruction::UpgradeRegion(_) => Ok(Box::new(UpgradeRegionsHandler.into())),
            Instruction::GetFileRefs(_) => Ok(Box::new(GetFileRefsHandler.into())),
            Instruction::GcRegions(_) => Ok(Box::new(GcRegionsHandler.into())),
            Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
        }
    }
@@ -118,6 +129,8 @@ pub enum InstructionHandlers {
    FlushRegions(FlushRegionsHandler),
    DowngradeRegions(DowngradeRegionsHandler),
    UpgradeRegions(UpgradeRegionsHandler),
    GetFileRefs(GetFileRefsHandler),
    GcRegions(GcRegionsHandler),
}

macro_rules! impl_from_handler {
@@ -137,7 +150,9 @@ impl_from_handler!(
    OpenRegionsHandler => OpenRegions,
    FlushRegionsHandler => FlushRegions,
    DowngradeRegionsHandler => DowngradeRegions,
    UpgradeRegionsHandler => UpgradeRegions
    UpgradeRegionsHandler => UpgradeRegions,
    GetFileRefsHandler => GetFileRefs,
    GcRegionsHandler => GcRegions
);

macro_rules! dispatch_instr {
@@ -180,6 +195,8 @@ dispatch_instr!(
    FlushRegions => FlushRegions,
    DowngradeRegions => DowngradeRegions,
    UpgradeRegion => UpgradeRegions,
    GetFileRefs => GetFileRefs,
    GcRegions => GcRegions,
);

#[async_trait]
@@ -202,6 +219,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
        let catchup_tasks = self.catchup_tasks.clone();
        let downgrade_tasks = self.downgrade_tasks.clone();
        let flush_tasks = self.flush_tasks.clone();
        let gc_tasks = self.gc_tasks.clone();
        let handler = self.build_handler(&instruction)?;
        let _handle = common_runtime::spawn_global(async move {
            let reply = handler
@@ -211,6 +229,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
                    catchup_tasks,
                    downgrade_tasks,
                    flush_tasks,
                    gc_tasks,
                },
                instruction,
            )

src/datanode/src/heartbeat/handler/file_ref.rs (new file, 62 lines)
@@ -0,0 +1,62 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_error::ext::ErrorExt;
use common_meta::instruction::{GetFileRefs, GetFileRefsReply, InstructionReply};
use store_api::storage::FileRefsManifest;

use crate::heartbeat::handler::{HandlerContext, InstructionHandler};

pub struct GetFileRefsHandler;

#[async_trait::async_trait]
impl InstructionHandler for GetFileRefsHandler {
    type Instruction = GetFileRefs;

    async fn handle(
        &self,
        ctx: &HandlerContext,
        get_file_refs: Self::Instruction,
    ) -> Option<InstructionReply> {
        let region_server = &ctx.region_server;

        // Get the MitoEngine
        let Some(mito_engine) = region_server.mito_engine() else {
            return Some(InstructionReply::GetFileRefs(GetFileRefsReply {
                file_refs_manifest: FileRefsManifest::default(),
                success: false,
                error: Some("MitoEngine not found".to_string()),
            }));
        };

        match mito_engine
            .get_snapshot_of_unmanifested_refs(get_file_refs.region_ids)
            .await
        {
            Ok(all_file_refs) => {
                // Return the file references
                Some(InstructionReply::GetFileRefs(GetFileRefsReply {
                    file_refs_manifest: all_file_refs,
                    success: true,
                    error: None,
                }))
            }
            Err(e) => Some(InstructionReply::GetFileRefs(GetFileRefsReply {
                file_refs_manifest: FileRefsManifest::default(),
                success: false,
                error: Some(format!("Failed to get file refs: {}", e.output_msg())),
            })),
        }
    }
}

src/datanode/src/heartbeat/handler/gc_worker.rs (new file, 156 lines)
@@ -0,0 +1,156 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_meta::instruction::{GcRegions, GcRegionsReply, InstructionReply};
use common_telemetry::{debug, warn};
use mito2::gc::LocalGcWorker;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{FileRefsManifest, RegionId};

use crate::error::{GcMitoEngineSnafu, InvalidGcArgsSnafu, Result, UnexpectedSnafu};
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};

pub struct GcRegionsHandler;

#[async_trait::async_trait]
impl InstructionHandler for GcRegionsHandler {
    type Instruction = GcRegions;

    async fn handle(
        &self,
        ctx: &HandlerContext,
        gc_regions: Self::Instruction,
    ) -> Option<InstructionReply> {
        let region_ids = gc_regions.regions.clone();
        debug!("Received gc regions instruction: {:?}", region_ids);

        let is_same_table = region_ids.windows(2).all(|w| {
            let t1 = w[0].table_id();
            let t2 = w[1].table_id();
            t1 == t2
        });
        if !is_same_table {
            return Some(InstructionReply::GcRegions(GcRegionsReply {
                result: Err(format!(
                    "Regions to GC should belong to the same table, found: {:?}",
                    region_ids
                )),
            }));
        }

        let (region_id, gc_worker) = match self
            .create_gc_worker(
                ctx,
                region_ids,
                &gc_regions.file_refs_manifest,
                gc_regions.full_file_listing,
            )
            .await
        {
            Ok(worker) => worker,
            Err(e) => {
                return Some(InstructionReply::GcRegions(GcRegionsReply {
                    result: Err(format!("Failed to create GC worker: {}", e)),
                }));
            }
        };

        let register_result = ctx
            .gc_tasks
            .try_register(
                region_id,
                Box::pin(async move {
                    debug!("Starting gc worker for region {}", region_id);
                    let report = gc_worker
                        .run()
                        .await
                        .context(GcMitoEngineSnafu { region_id })?;
                    debug!("Gc worker for region {} finished", region_id);
                    Ok(report)
                }),
            )
            .await;
        if register_result.is_busy() {
            warn!("Another gc task is running for the region: {region_id}");
            return Some(InstructionReply::GcRegions(GcRegionsReply {
                result: Err(format!(
                    "Another gc task is running for the region: {region_id}"
                )),
            }));
        }
        let mut watcher = register_result.into_watcher();
        let result = ctx.gc_tasks.wait_until_finish(&mut watcher).await;
        match result {
            Ok(report) => Some(InstructionReply::GcRegions(GcRegionsReply {
                result: Ok(report),
            })),
            Err(err) => Some(InstructionReply::GcRegions(GcRegionsReply {
                result: Err(format!("{err:?}")),
            })),
        }
    }
}

impl GcRegionsHandler {
    async fn create_gc_worker(
        &self,
        ctx: &HandlerContext,
        mut region_ids: Vec<RegionId>,
        file_ref_manifest: &FileRefsManifest,
        full_file_listing: bool,
    ) -> Result<(RegionId, LocalGcWorker)> {
        // always use the smallest region id on datanode as the target region id
        region_ids.sort_by_key(|r| r.region_number());
        let mito_engine = ctx
            .region_server
            .mito_engine()
            .with_context(|| UnexpectedSnafu {
                violated: "MitoEngine not found".to_string(),
            })?;
        let region_id = *region_ids.first().with_context(|| UnexpectedSnafu {
            violated: "No region ids provided".to_string(),
        })?;

        let mito_config = mito_engine.mito_config();

        // Find the access layer from one of the regions that exists on this datanode
        let access_layer = region_ids
            .iter()
            .find_map(|rid| mito_engine.find_region(*rid))
            .with_context(|| InvalidGcArgsSnafu {
                msg: format!(
"None of the regions is on current datanode:{:?}",
                    region_ids
                ),
            })?
            .access_layer();

        let cache_manager = mito_engine.cache_manager();

        let gc_worker = LocalGcWorker::try_new(
            access_layer.clone(),
            Some(cache_manager),
            region_ids.into_iter().collect(),
            Default::default(),
            mito_config.clone().into(),
            file_ref_manifest.clone(),
            &mito_engine.gc_limiter(),
            full_file_listing,
        )
        .await
        .context(GcMitoEngineSnafu { region_id })?;

        Ok((region_id, gc_worker))
    }
}

@@ -158,6 +158,27 @@ impl RegionServer {
        }
    }

    /// Gets the MitoEngine if it's registered.
    pub fn mito_engine(&self) -> Option<MitoEngine> {
        if let Some(mito) = self.inner.mito_engine.read().unwrap().clone() {
            Some(mito)
        } else {
            self.inner
                .engines
                .read()
                .unwrap()
                .get(MITO_ENGINE_NAME)
                .cloned()
                .and_then(|e| {
                    let mito = e.as_any().downcast_ref::<MitoEngine>().cloned();
                    if mito.is_none() {
                        warn!("Mito engine not found in region server engines");
                    }
                    mito
                })
        }
    }

    #[tracing::instrument(skip_all)]
    pub async fn handle_batch_open_requests(
        &self,
@@ -676,14 +697,14 @@ struct RegionServerInner {
    runtime: Runtime,
    event_listener: RegionServerEventListenerRef,
    table_provider_factory: TableProviderFactoryRef,
    // The number of queries allowed to be executed at the same time.
    // Act as last line of defense on datanode to prevent query overloading.
    /// The number of queries allowed to be executed at the same time.
    /// Act as last line of defense on datanode to prevent query overloading.
    parallelism: Option<RegionServerParallelism>,
    // The topic stats reporter.
    /// The topic stats reporter.
    topic_stats_reporter: RwLock<Option<Box<dyn TopicStatsReporter>>>,
    // HACK(zhongzc): Direct MitoEngine handle for diagnostics. This couples the
    // server with a concrete engine; acceptable for now to fetch Mito-specific
    // info (e.g., list SSTs). Consider a diagnostics trait later.
    /// HACK(zhongzc): Direct MitoEngine handle for diagnostics. This couples the
    /// server with a concrete engine; acceptable for now to fetch Mito-specific
    /// info (e.g., list SSTs). Consider a diagnostics trait later.
    mito_engine: RwLock<Option<MitoEngine>>,
}

@@ -254,7 +254,7 @@ mod tests {
        assert_eq!(status, http::StatusCode::OK);
        assert_eq!(
            body,
            "[[{\"timestamp_millis\":3,\"id\":0,\"addr\":\"127.0.0.1:3001\",\"rcus\":0,\"wcus\":0,\"region_num\":0,\"region_stats\":[],\"topic_stats\":[],\"node_epoch\":0,\"datanode_workloads\":{\"types\":[]}}]]"
            "[[{\"timestamp_millis\":3,\"id\":0,\"addr\":\"127.0.0.1:3001\",\"rcus\":0,\"wcus\":0,\"region_num\":0,\"region_stats\":[],\"topic_stats\":[],\"node_epoch\":0,\"datanode_workloads\":{\"types\":[]},\"gc_stat\":null}]]"
        );
    }
}

@@ -25,6 +25,7 @@ use serde::{Deserialize, Serialize};
use serde_with::serde_as;

use crate::error::Result;
use crate::gc::GcConfig;
use crate::memtable::MemtableConfig;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;

@@ -148,6 +149,8 @@ pub struct MitoConfig {
    /// Whether to enable experimental flat format as the default format.
    /// When enabled, forces using BulkMemtable and BulkMemtableBuilder.
    pub default_experimental_flat_format: bool,

    pub gc: GcConfig,
}

impl Default for MitoConfig {
@@ -186,6 +189,7 @@ impl Default for MitoConfig {
            memtable: MemtableConfig::default(),
            min_compaction_interval: Duration::from_secs(0),
            default_experimental_flat_format: false,
            gc: GcConfig::default(),
        };

        // Adjust buffer and cache size according to system memory if we can.

@@ -102,7 +102,7 @@ use store_api::region_engine::{
};
use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
use store_api::storage::{FileId, RegionId, ScanRequest, SequenceNumber};
use store_api::storage::{FileId, FileRefsManifest, RegionId, ScanRequest, SequenceNumber};
use tokio::sync::{Semaphore, oneshot};

use crate::access_layer::RegionFilePathFactory;
@@ -115,6 +115,7 @@ use crate::error::{
};
#[cfg(feature = "enterprise")]
use crate::extension::BoxedExtensionRangeProviderFactory;
use crate::gc::GcLimiterRef;
use crate::manifest::action::RegionEdit;
use crate::memtable::MemtableStats;
use crate::metrics::HANDLE_REQUEST_ELAPSED;
@@ -261,6 +262,33 @@ impl MitoEngine {
        self.inner.workers.file_ref_manager()
    }

    pub fn gc_limiter(&self) -> GcLimiterRef {
        self.inner.workers.gc_limiter()
    }

    /// Get all tmp ref files for the given region ids, excluding files that are already in the manifest.
    pub async fn get_snapshot_of_unmanifested_refs(
        &self,
        region_ids: impl IntoIterator<Item = RegionId>,
    ) -> Result<FileRefsManifest> {
        let file_ref_mgr = self.file_ref_manager();

        let region_ids = region_ids.into_iter().collect::<Vec<_>>();

        // Convert region IDs to MitoRegionRef objects, error if any region doesn't exist
        let regions: Vec<MitoRegionRef> = region_ids
            .into_iter()
            .map(|region_id| {
                self.find_region(region_id)
                    .with_context(|| RegionNotFoundSnafu { region_id })
            })
            .collect::<Result<_>>()?;

        file_ref_mgr
            .get_snapshot_of_unmanifested_refs(regions)
            .await
    }

    /// Returns true if the specific region exists.
    pub fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_exists(region_id)
@@ -357,7 +385,7 @@ impl MitoEngine {
        self.find_region(id)
    }

    pub(crate) fn find_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
    pub fn find_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.inner.workers.get_region(region_id)
    }

@@ -1121,6 +1121,12 @@ pub enum Error {
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("GC job permit exhausted"))]
    TooManyGcJobs {
        #[snafu(implicit)]
        location: Location,
    },
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1291,7 +1297,7 @@ impl ErrorExt for Error {

    InconsistentTimestampLength { .. } => StatusCode::InvalidArguments,

            TooManyFilesToRead { .. } => StatusCode::RateLimited,
            TooManyFilesToRead { .. } | TooManyGcJobs { .. } => StatusCode::RateLimited,
        }
    }

@@ -22,14 +22,17 @@
//!

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use common_telemetry::{error, info, warn};
use common_meta::datanode::GcStat;
use common_telemetry::{debug, error, info, warn};
use common_time::Timestamp;
use object_store::{Entry, Lister};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt as _, ensure};
use store_api::storage::{FileId, RegionId};
use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
use tokio_stream::StreamExt;

use crate::access_layer::AccessLayerRef;
@@ -37,26 +40,64 @@ use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
    DurationOutOfRangeSnafu, EmptyRegionDirSnafu, JoinSnafu, OpenDalSnafu, RegionNotFoundSnafu,
    Result, UnexpectedSnafu,
    Result, TooManyGcJobsSnafu, UnexpectedSnafu,
};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::metrics::GC_FILE_CNT;
use crate::metrics::GC_DEL_FILE_CNT;
use crate::region::opener::new_manifest_dir;
use crate::sst::file::delete_files;
use crate::sst::file_ref::TableFileRefsManifest;
use crate::sst::location::{self, region_dir_from_table_dir};

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GcReport {
    /// deleted files per region
    pub deleted_files: HashMap<RegionId, Vec<FileId>>,
    /// Regions that need retry in next gc round, usually because their tmp ref files are outdated
    pub need_retry_regions: HashSet<RegionId>,
/// Limit the amount of concurrent GC jobs on the datanode
pub struct GcLimiter {
    pub gc_job_limit: Arc<tokio::sync::Semaphore>,
    gc_concurrency: usize,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct FileGcOption {
pub type GcLimiterRef = Arc<GcLimiter>;

impl GcLimiter {
    pub fn new(gc_concurrency: usize) -> Self {
        Self {
            gc_job_limit: Arc::new(tokio::sync::Semaphore::new(gc_concurrency)),
            gc_concurrency,
        }
    }

    pub fn running_gc_tasks(&self) -> u32 {
        (self.gc_concurrency - self.gc_job_limit.available_permits()) as u32
    }

    pub fn gc_concurrency(&self) -> u32 {
        self.gc_concurrency as u32
    }

    pub fn gc_stat(&self) -> GcStat {
        GcStat::new(self.running_gc_tasks(), self.gc_concurrency())
    }

    /// Try to acquire a permit for a GC job.
    ///
    /// If no permit is available, returns a `TooManyGcJobs` error.
    pub fn permit(&self) -> Result<OwnedSemaphorePermit> {
        self.gc_job_limit
            .clone()
            .try_acquire_owned()
            .map_err(|e| match e {
                TryAcquireError::Closed => UnexpectedSnafu {
                    reason: format!("Failed to acquire gc permit: {e}"),
                }
                .build(),
                TryAcquireError::NoPermits => TooManyGcJobsSnafu {}.build(),
            })
    }
}
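
GcLimiter is a thin wrapper over a tokio semaphore: LocalGcWorker::try_new takes a permit up front and holds it in its `_permit` field, so the slot is released when the worker is dropped. A minimal sketch of that RAII behavior (only the `tokio` crate assumed; `try_acquire_owned` is synchronous, so no async runtime is needed):

    use std::sync::Arc;

    use tokio::sync::Semaphore;

    fn main() {
        let gc_job_limit = Arc::new(Semaphore::new(2)); // max_concurrent_gc_job = 2

        let p1 = gc_job_limit.clone().try_acquire_owned().unwrap();
        let _p2 = gc_job_limit.clone().try_acquire_owned().unwrap();
        // A third concurrent job is rejected rather than queued,
        // which maps to the TooManyGcJobs error above.
        assert!(gc_job_limit.clone().try_acquire_owned().is_err());

        drop(p1); // worker finished: dropping the permit frees a slot
        assert!(gc_job_limit.clone().try_acquire_owned().is_ok());
    }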

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct GcConfig {
    /// Whether GC is enabled.
    pub enable: bool,
    /// Lingering time before deleting files.
    /// Should be long enough to allow long running queries to finish.
    ///
@@ -73,16 +114,22 @@ pub struct FileGcOption {
    /// Maximum concurrent list operations per GC job.
    /// This is used to limit the number of concurrent listing operations and speed up listing.
    pub max_concurrent_lister_per_gc_job: usize,
    /// Maximum concurrent GC jobs.
    /// This is used to limit the number of concurrent GC jobs running on the datanode
    /// to prevent too many concurrent GC jobs from overwhelming the datanode.
    pub max_concurrent_gc_job: usize,
}

impl Default for FileGcOption {
impl Default for GcConfig {
    fn default() -> Self {
        Self {
            enable: false,
            // expect long running queries to be finished within a reasonable time
            lingering_time: Duration::from_secs(60 * 5),
            // 6 hours, for files whose expel time is unknown (i.e., when the file was removed from the manifest); this should rarely happen, so they can be kept longer
            unknown_file_lingering_time: Duration::from_secs(60 * 60 * 6),
            max_concurrent_lister_per_gc_job: 32,
            max_concurrent_gc_job: 4,
        }
    }
}
@@ -92,13 +139,23 @@ pub struct LocalGcWorker {
    pub(crate) cache_manager: Option<CacheManagerRef>,
    pub(crate) manifest_mgrs: HashMap<RegionId, RegionManifestManager>,
    /// Lingering time before deleting files.
    pub(crate) opt: FileGcOption,
    pub(crate) opt: GcConfig,
    pub(crate) manifest_open_config: ManifestOpenConfig,
    /// Tmp ref files manifest, used to determine which files are still in use by ongoing queries.
    ///
    /// Also contains manifest versions of regions when the tmp ref files are generated.
    /// Used to determine whether the tmp ref files are outdated.
    pub(crate) file_ref_manifest: TableFileRefsManifest,
    pub(crate) file_ref_manifest: FileRefsManifest,
    _permit: OwnedSemaphorePermit,
    /// Whether to perform full file listing during GC.
    /// When set to false, GC will only delete files that are tracked in the manifest's removed_files,
    /// which can significantly improve performance by avoiding expensive list operations.
    /// When set to true, GC will perform a full listing to find and delete orphan files
    /// (files not tracked in the manifest).
    ///
    /// Set to false for regular GC operations to optimize performance.
    /// Set to true periodically or when you need to clean up orphan files.
    pub full_file_listing: bool,
}

pub struct ManifestOpenConfig {
@@ -125,13 +182,16 @@ impl LocalGcWorker {
    /// Create a new LocalGcWorker, with `regions_to_gc` regions to GC.
    /// The regions are specified by their `RegionId` and should all belong to the same table.
    ///
    #[allow(clippy::too_many_arguments)]
    pub async fn try_new(
        access_layer: AccessLayerRef,
        cache_manager: Option<CacheManagerRef>,
        regions_to_gc: BTreeSet<RegionId>,
        opt: FileGcOption,
        opt: GcConfig,
        manifest_open_config: ManifestOpenConfig,
        file_ref_manifest: TableFileRefsManifest,
        file_ref_manifest: FileRefsManifest,
        limiter: &GcLimiterRef,
        full_file_listing: bool,
    ) -> Result<Self> {
        let table_id = regions_to_gc
            .first()
@@ -139,6 +199,7 @@ impl LocalGcWorker {
                reason: "Expect at least one region, found none",
            })?
            .table_id();
        let permit = limiter.permit()?;
        let mut zelf = Self {
            access_layer,
            cache_manager,
@@ -146,6 +207,8 @@ impl LocalGcWorker {
            opt,
            manifest_open_config,
            file_ref_manifest,
            _permit: permit,
            full_file_listing,
        };

        // dedup just in case
@@ -193,15 +256,15 @@ impl LocalGcWorker {
        // TODO(discord9): verify manifest version before reading tmp ref files

        let mut tmp_ref_files = HashMap::new();
        for file_ref in &self.file_ref_manifest.file_refs {
            if outdated_regions.contains(&file_ref.region_id) {
        for (region_id, file_refs) in &self.file_ref_manifest.file_refs {
            if outdated_regions.contains(region_id) {
                // skip outdated regions
                continue;
            }
            tmp_ref_files
                .entry(file_ref.region_id)
                .entry(*region_id)
                .or_insert_with(HashSet::new)
                .insert(file_ref.file_id);
                .extend(file_refs.clone());
        }

        Ok(tmp_ref_files)
@@ -220,14 +283,14 @@ impl LocalGcWorker {
        let mut deleted_files = HashMap::new();
        let tmp_ref_files = self.read_tmp_ref_files(&mut outdated_regions).await?;
        for region_id in self.manifest_mgrs.keys() {
            info!("Doing gc for region {}", region_id);
            debug!("Doing gc for region {}", region_id);
            let tmp_ref_files = tmp_ref_files
                .get(region_id)
                .cloned()
                .unwrap_or_else(HashSet::new);
            let files = self.do_region_gc(*region_id, &tmp_ref_files).await?;
            deleted_files.insert(*region_id, files);
            info!("Gc for region {} finished", region_id);
            debug!("Gc for region {} finished", region_id);
        }
        info!(
            "LocalGcWorker finished after {} secs.",
@@ -244,7 +307,7 @@ impl LocalGcWorker {
impl LocalGcWorker {
    /// concurrency of listing files per region.
    /// This is used to limit the number of concurrent listing operations and speed up listing
    pub const CONCURRENCY_LIST_PER_FILES: usize = 512;
    pub const CONCURRENCY_LIST_PER_FILES: usize = 1024;

    /// Perform GC for the region.
    /// 1. Get all the removed files in delta manifest files and their expel times
@@ -259,7 +322,7 @@ impl LocalGcWorker {
        region_id: RegionId,
        tmp_ref_files: &HashSet<FileId>,
    ) -> Result<Vec<FileId>> {
        info!("Doing gc for region {}", region_id);
        debug!("Doing gc for region {}", region_id);
        let manifest = self
            .manifest_mgrs
            .get(&region_id)
@@ -272,10 +335,10 @@ impl LocalGcWorker {

        if recently_removed_files.is_empty() {
            // no files to remove, skip
            info!("No recently removed files to gc for region {}", region_id);
            debug!("No recently removed files to gc for region {}", region_id);
        }

        info!(
        debug!(
            "Found {} recently removed files sets for region {}",
            recently_removed_files.len(),
            region_id
@@ -291,27 +354,20 @@ impl LocalGcWorker {
            .chain(tmp_ref_files.clone().into_iter())
            .collect();

        let true_tmp_ref_files = tmp_ref_files
            .iter()
            .filter(|f| !current_files.contains_key(f))
            .collect::<HashSet<_>>();

        info!("True tmp ref files: {:?}", true_tmp_ref_files);

        let unused_files = self
            .list_to_be_deleted_files(region_id, in_used, recently_removed_files, concurrency)
            .await?;

        let unused_len = unused_files.len();

        info!(
        debug!(
            "Found {} unused files to delete for region {}",
            unused_len, region_id
        );

        self.delete_files(region_id, &unused_files).await?;

        info!(
        debug!(
            "Successfully deleted {} unused files for region {}",
            unused_len, region_id
        );
@@ -329,7 +385,8 @@ impl LocalGcWorker {
        )
        .await?;

        GC_FILE_CNT.add(file_ids.len() as i64);
        // FIXME(discord9): if files are already deleted before calling delete_files, the metric will be inaccurate, no clean way to fix it now
        GC_DEL_FILE_CNT.add(file_ids.len() as i64);

        Ok(())
    }
@@ -491,7 +548,7 @@ impl LocalGcWorker {
        entries: Vec<Entry>,
        in_use_filenames: &HashSet<&FileId>,
        may_linger_filenames: &HashSet<&FileId>,
        all_files_appear_in_delta_manifests: &HashSet<&FileId>,
        eligible_for_removal: &HashSet<&FileId>,
        unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
    ) -> (Vec<FileId>, HashSet<FileId>) {
        let mut all_unused_files_ready_for_delete = vec![];
@@ -515,7 +572,7 @@ impl LocalGcWorker {
            let should_delete = !in_use_filenames.contains(&file_id)
                && !may_linger_filenames.contains(&file_id)
                && {
                    if !all_files_appear_in_delta_manifests.contains(&file_id) {
                    if !eligible_for_removal.contains(&file_id) {
                        // if the file's expel time is unknown (it does not appear in a delta manifest),
                        // we keep it for a while, using its last modified time.
                        // Notice that unknown files use a different lingering time.
@@ -541,6 +598,11 @@ impl LocalGcWorker {
    /// Concurrently list unused files in the region dir
    /// because there may be a lot of files in the region dir
    /// and listing them may take a long time.
    ///
    /// When `full_file_listing` is false, this method will only delete files tracked in
    /// `recently_removed_files` without performing expensive list operations, which significantly
    /// improves performance. When `full_file_listing` is true, it performs a full listing to
    /// find and delete orphan files.
    pub async fn list_to_be_deleted_files(
        &self,
        region_id: RegionId,
@@ -548,6 +610,7 @@ impl LocalGcWorker {
        recently_removed_files: BTreeMap<Timestamp, HashSet<FileId>>,
        concurrency: usize,
    ) -> Result<Vec<FileId>> {
        let start = tokio::time::Instant::now();
        let now = chrono::Utc::now();
        let may_linger_until = now
            - chrono::Duration::from_std(self.opt.lingering_time).with_context(|_| {
@@ -569,7 +632,7 @@ impl LocalGcWorker {
        let may_linger_files = recently_removed_files.split_off(&threshold);
        let may_linger_filenames = may_linger_files.values().flatten().collect::<HashSet<_>>();

        let all_files_appear_in_delta_manifests = recently_removed_files
        let eligible_for_removal = recently_removed_files
            .values()
            .flatten()
            .collect::<HashSet<_>>();
@@ -577,23 +640,56 @@ impl LocalGcWorker {
        // in use filenames, include sst and index files
        let in_use_filenames = in_used.iter().collect::<HashSet<_>>();

        // When full_file_listing is false, skip expensive list operations and only delete
        // files that are tracked in recently_removed_files
        if !self.full_file_listing {
            // Only delete files that:
            // 1. Are in recently_removed_files (tracked in manifest)
            // 2. Are not in use
            // 3. Have passed the lingering time
            let files_to_delete: Vec<FileId> = eligible_for_removal
                .iter()
                .filter(|file_id| !in_use_filenames.contains(*file_id))
                .map(|&f| *f)
                .collect();

            info!(
                "gc: fast mode (no full listing) cost {} secs for region {}, found {} files to delete from manifest",
                start.elapsed().as_secs_f64(),
                region_id,
                files_to_delete.len()
            );

            return Ok(files_to_delete);
        }

        // Full file listing mode: perform expensive list operations to find orphan files
        // Step 1: Create partitioned listers for concurrent processing
        let listers = self.partition_region_files(region_id, concurrency).await?;
        let lister_cnt = listers.len();

        // Step 2: Concurrently list all files in the region directory
        let all_entries = self.list_region_files_concurrent(listers).await?;

        let cnt = all_entries.len();

        // Step 3: Filter files to determine which ones can be deleted
        let (all_unused_files_ready_for_delete, all_in_exist_linger_files) = self
            .filter_deletable_files(
                all_entries,
                &in_use_filenames,
                &may_linger_filenames,
                &all_files_appear_in_delta_manifests,
                &eligible_for_removal,
                unknown_file_may_linger_until,
            );

        info!("All in exist linger files: {:?}", all_in_exist_linger_files);
        info!(
            "gc: full listing mode cost {} secs using {lister_cnt} lister for {cnt} files in region {}, found {} unused files to delete",
            start.elapsed().as_secs_f64(),
            region_id,
            all_unused_files_ready_for_delete.len()
        );
        debug!("All in exist linger files: {:?}", all_in_exist_linger_files);

        Ok(all_unused_files_ready_for_delete)
    }
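
The fast path above boils down to set arithmetic: delete the manifest-tracked removal candidates minus anything still referenced. A standalone sketch of that filter (with `FileId` reduced to `u32`; the real code also splits candidates by lingering time first):

    use std::collections::HashSet;

    fn main() {
        let eligible_for_removal: HashSet<u32> = HashSet::from([1, 2, 3, 4]);
        let in_use: HashSet<u32> = HashSet::from([2, 4]);

        // Fast mode: no object-store listing, only manifest-tracked candidates.
        let files_to_delete: Vec<u32> = eligible_for_removal
            .iter()
            .filter(|file_id| !in_use.contains(file_id))
            .copied()
            .collect();

        assert_eq!(files_to_delete.len(), 2); // files 1 and 3, in unspecified order
    }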
|
||||
|
||||
@@ -437,7 +437,7 @@ lazy_static! {
|
||||
"mito stalled write request in each worker",
|
||||
&[WORKER_LABEL]
|
||||
).unwrap();
|
||||
/// Number of ref files per table
|
||||
/// Number of ref files
|
||||
pub static ref GC_REF_FILE_CNT: IntGauge = register_int_gauge!(
|
||||
"greptime_gc_ref_file_count",
|
||||
"gc ref file count",
|
||||
@@ -458,9 +458,9 @@ lazy_static! {
|
||||
.unwrap();
|
||||
|
||||
/// Counter for the number of files deleted by the GC worker.
|
||||
pub static ref GC_FILE_CNT: IntGauge =
|
||||
pub static ref GC_DEL_FILE_CNT: IntGauge =
|
||||
register_int_gauge!(
|
||||
"greptime_mito_gc_file_count",
|
||||
"greptime_mito_gc_delete_file_count",
|
||||
"mito gc deleted file count",
|
||||
).unwrap();
|
||||
}
|
||||
|
||||
@@ -565,6 +565,10 @@ impl MitoRegion {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn access_layer(&self) -> AccessLayerRef {
|
||||
self.access_layer.clone()
|
||||
}
|
||||
|
||||
/// Returns the SST entries of the region.
|
||||
pub async fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
|
||||
let table_dir = self.table_dir();
|
||||
|
||||
@@ -162,6 +162,7 @@ impl FilePurger for ObjectStoreFilePurger {
|
||||
// notice that no matter whether the file is deleted or not, we need to remove the reference
|
||||
// because the file is no longer in use nonetheless.
|
||||
self.file_ref_manager.remove_file(&file_meta);
|
||||
// TODO(discord9): consider impl a .tombstone file to reduce files needed to list
|
||||
}
|
||||
|
||||
fn new_file(&self, file_meta: &FileMeta) {
|
||||
|
||||
@@ -17,38 +17,23 @@ use std::sync::Arc;
|
||||
|
||||
use common_telemetry::debug;
|
||||
use dashmap::{DashMap, Entry};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use store_api::ManifestVersion;
|
||||
use store_api::storage::{FileId, RegionId, TableId};
|
||||
use store_api::storage::{FileRef, FileRefsManifest, RegionId};
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::metrics::GC_REF_FILE_CNT;
|
||||
use crate::region::RegionMapRef;
|
||||
use crate::region::MitoRegionRef;
|
||||
use crate::sst::file::FileMeta;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||
pub struct FileRef {
|
||||
pub region_id: RegionId,
|
||||
pub file_id: FileId,
|
||||
}
|
||||
|
||||
impl FileRef {
|
||||
pub fn new(region_id: RegionId, file_id: FileId) -> Self {
|
||||
Self { region_id, file_id }
|
||||
}
|
||||
}
|
||||
|
||||
/// File references for a table.
|
||||
/// It contains all files referenced by the table.
|
||||
/// File references for a region.
|
||||
/// It contains all files referenced by the region.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct TableFileRefs {
|
||||
pub struct RegionFileRefs {
|
||||
/// (FileRef, Ref Count) meaning how many FileHandleInner is opened for this file.
|
||||
pub files: HashMap<FileRef, usize>,
|
||||
}
|
||||
|
||||
/// Manages all file references in one datanode.
|
||||
/// It keeps track of which files are referenced and group by table ids.
|
||||
/// And periodically update the references to tmp file in object storage.
|
||||
/// This is useful for ensuring that files are not deleted while they are still in use by any
|
||||
/// query.
|
||||
#[derive(Debug)]
|
||||
@@ -56,33 +41,24 @@ pub struct FileReferenceManager {
|
||||
/// Datanode id. used to determine tmp ref file name.
|
||||
node_id: Option<u64>,
|
||||
/// TODO(discord9): use no hash hasher since table id is sequential.
|
||||
files_per_table: DashMap<TableId, TableFileRefs>,
|
||||
files_per_region: DashMap<RegionId, RegionFileRefs>,
|
||||
}
|
||||
|
||||
pub type FileReferenceManagerRef = Arc<FileReferenceManager>;
|
||||
|
||||
/// The tmp file uploaded to object storage to record one table's file references.
|
||||
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct TableFileRefsManifest {
|
||||
pub file_refs: HashSet<FileRef>,
|
||||
/// Manifest version when this manifest is read for it's files
|
||||
pub manifest_version: HashMap<RegionId, ManifestVersion>,
|
||||
}
|
||||
|
||||
impl FileReferenceManager {
|
||||
pub fn new(node_id: Option<u64>) -> Self {
|
||||
Self {
|
||||
node_id,
|
||||
files_per_table: Default::default(),
|
||||
files_per_region: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
fn ref_file_set(&self, table_id: TableId) -> Option<HashSet<FileRef>> {
|
||||
let file_refs = if let Some(file_refs) = self.files_per_table.get(&table_id) {
|
||||
fn ref_file_set(&self, region_id: RegionId) -> Option<HashSet<FileRef>> {
|
||||
let file_refs = if let Some(file_refs) = self.files_per_region.get(®ion_id) {
|
||||
file_refs.clone()
|
||||
} else {
|
||||
// still return an empty manifest to indicate no files are referenced.
|
||||
// and differentiate from error case where table_id not found.
|
||||
// region id not found.
|
||||
return None;
|
||||
};
|
||||
|
||||
@@ -95,8 +71,8 @@ impl FileReferenceManager {
|
||||
let ref_file_set: HashSet<FileRef> = file_refs.files.keys().cloned().collect();
|
||||
|
||||
debug!(
|
||||
"Get file refs for table {}, node {:?}, {} files",
|
||||
table_id,
|
||||
"Get file refs for region {}, node {:?}, {} files",
|
||||
region_id,
|
||||
self.node_id,
|
||||
ref_file_set.len(),
|
||||
);
|
||||
@@ -120,22 +96,19 @@ impl FileReferenceManager {
|
||||
#[allow(unused)]
|
||||
pub(crate) async fn get_snapshot_of_unmanifested_refs(
|
||||
&self,
|
||||
table_id: TableId,
|
||||
region_map: &RegionMapRef,
|
||||
) -> Result<TableFileRefsManifest> {
|
||||
let Some(ref_files) = self.ref_file_set(table_id) else {
|
||||
return Ok(Default::default());
|
||||
};
|
||||
let region_list = region_map.list_regions();
|
||||
let table_regions = region_list
|
||||
.iter()
|
||||
.filter(|r| r.region_id().table_id() == table_id)
|
||||
.collect::<Vec<_>>();
|
||||
regions: Vec<MitoRegionRef>,
|
||||
) -> Result<FileRefsManifest> {
|
||||
let mut ref_files = HashMap::new();
|
||||
for region_id in regions.iter().map(|r| r.region_id()) {
|
||||
if let Some(files) = self.ref_file_set(region_id) {
|
||||
ref_files.insert(region_id, files);
|
||||
}
|
||||
}
|
||||
|
||||
let mut in_manifest_files = HashSet::new();
|
||||
let mut manifest_version = HashMap::new();
|
||||
|
||||
for r in &table_regions {
|
||||
for r in ®ions {
|
||||
let manifest = r.manifest_ctx.manifest().await;
|
||||
let files = manifest.files.keys().cloned().collect::<Vec<_>>();
|
||||
in_manifest_files.extend(files);
|
||||
@@ -144,11 +117,18 @@ impl FileReferenceManager {
 
         let ref_files_excluding_in_manifest = ref_files
             .iter()
-            .filter(|f| !in_manifest_files.contains(&f.file_id))
-            .cloned()
-            .collect::<HashSet<_>>();
-
-        Ok(TableFileRefsManifest {
+            .map(|(r, f)| {
+                (
+                    *r,
+                    f.iter()
+                        .filter_map(|f| {
+                            (!in_manifest_files.contains(&f.file_id)).then_some(f.file_id)
+                        })
+                        .collect::<HashSet<_>>(),
+                )
+            })
+            .collect();
+        Ok(FileRefsManifest {
             file_refs: ref_files_excluding_in_manifest,
             manifest_version,
         })
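The hunk above turns the snapshot from table-granular into region-granular: for each region it collects the in-memory file references, then drops every reference whose file already appears in that region's persisted manifest, so only "unmanifested" files still need protection from GC. A minimal standalone sketch of that filtering step, with plain u64/u32 stand-ins for RegionId and FileId (the stand-ins and names are illustrative, not the crate's API):

use std::collections::{HashMap, HashSet};

type RegionId = u64; // stand-in for store_api::storage::RegionId
type FileId = u32;   // stand-in for store_api::storage::FileId

/// Keep only the referenced files that no manifest covers yet; these are the
/// files a GC pass must not delete.
fn unmanifested_refs(
    refs: &HashMap<RegionId, HashSet<FileId>>,
    in_manifest: &HashSet<FileId>,
) -> HashMap<RegionId, HashSet<FileId>> {
    refs.iter()
        .map(|(region, files)| {
            let kept = files
                .iter()
                .copied()
                .filter(|f| !in_manifest.contains(f))
                .collect::<HashSet<_>>();
            (*region, kept)
        })
        .collect()
}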
@@ -158,12 +138,12 @@ impl FileReferenceManager {
     /// Also records the access layer for the table if not exists.
     /// The access layer will be used to upload ref file to object storage.
     pub fn add_file(&self, file_meta: &FileMeta) {
-        let table_id = file_meta.region_id.table_id();
+        let region_id = file_meta.region_id;
         let mut is_new = false;
         {
             let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
-            self.files_per_table
-                .entry(table_id)
+            self.files_per_region
+                .entry(region_id)
                 .and_modify(|refs| {
                     refs.files
                         .entry(file_ref.clone())
@@ -173,7 +153,7 @@ impl FileReferenceManager {
                             1
                         });
                 })
-                .or_insert_with(|| TableFileRefs {
+                .or_insert_with(|| RegionFileRefs {
                     files: HashMap::from_iter([(file_ref, 1)]),
                 });
         }
@@ -185,14 +165,14 @@ impl FileReferenceManager {
     /// Removes a file reference.
    /// If the reference count reaches zero, the file reference will be removed from the manager.
     pub fn remove_file(&self, file_meta: &FileMeta) {
-        let table_id = file_meta.region_id.table_id();
+        let region_id = file_meta.region_id;
         let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id);
 
         let mut remove_table_entry = false;
         let mut remove_file_ref = false;
         let mut file_cnt = 0;
 
-        let table_ref = self.files_per_table.entry(table_id).and_modify(|refs| {
+        let region_ref = self.files_per_region.entry(region_id).and_modify(|refs| {
             let entry = refs.files.entry(file_ref.clone()).and_modify(|count| {
                 if *count > 0 {
                     *count -= 1;
@@ -214,7 +194,7 @@ impl FileReferenceManager {
             }
         });
 
-        if let Entry::Occupied(o) = table_ref
+        if let Entry::Occupied(o) = region_ref
             && remove_table_entry
         {
             o.remove_entry();
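remove_file mirrors add_file: each FileRef carries a per-region reference count, and the map entry is dropped once the count reaches zero. A toy version of that counting discipline, using a plain HashMap instead of the crate's DashMap (names here are illustrative):

use std::collections::HashMap;

fn add_ref(refs: &mut HashMap<u64, usize>, file: u64) {
    // First reference creates the entry; later ones just bump the count.
    *refs.entry(file).or_insert(0) += 1;
}

fn remove_ref(refs: &mut HashMap<u64, usize>, file: u64) {
    if let Some(count) = refs.get_mut(&file) {
        *count = count.saturating_sub(1);
        if *count == 0 {
            // Drop the entry once nothing references the file anymore.
            refs.remove(&file);
        }
    }
}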
@@ -234,7 +214,7 @@ mod tests {
     use std::num::NonZeroU64;
 
     use smallvec::SmallVec;
-    use store_api::storage::RegionId;
+    use store_api::storage::{FileId, RegionId};
 
     use super::*;
     use crate::sst::file::{FileMeta, FileTimeRange, IndexType, RegionFileId};
@@ -265,54 +245,69 @@ mod tests {
         file_ref_mgr.add_file(&file_meta);
 
         assert_eq!(
-            file_ref_mgr.files_per_table.get(&0).unwrap().files,
+            file_ref_mgr
+                .files_per_region
+                .get(&file_meta.region_id)
+                .unwrap()
+                .files,
             HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
         );
 
         file_ref_mgr.add_file(&file_meta);
 
-        let expected_table_ref_manifest =
+        let expected_region_ref_manifest =
             HashSet::from_iter([FileRef::new(file_meta.region_id, file_meta.file_id)]);
 
         assert_eq!(
-            file_ref_mgr.ref_file_set(0).unwrap(),
-            expected_table_ref_manifest
+            file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
+            expected_region_ref_manifest
         );
 
         assert_eq!(
-            file_ref_mgr.files_per_table.get(&0).unwrap().files,
+            file_ref_mgr
+                .files_per_region
+                .get(&file_meta.region_id)
+                .unwrap()
+                .files,
             HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 2)])
         );
 
         assert_eq!(
-            file_ref_mgr.ref_file_set(0).unwrap(),
-            expected_table_ref_manifest
+            file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
+            expected_region_ref_manifest
         );
 
         file_ref_mgr.remove_file(&file_meta);
 
         assert_eq!(
-            file_ref_mgr.files_per_table.get(&0).unwrap().files,
+            file_ref_mgr
+                .files_per_region
+                .get(&file_meta.region_id)
+                .unwrap()
+                .files,
             HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)])
         );
 
         assert_eq!(
-            file_ref_mgr.ref_file_set(0).unwrap(),
-            expected_table_ref_manifest
+            file_ref_mgr.ref_file_set(file_meta.region_id).unwrap(),
+            expected_region_ref_manifest
         );
 
         file_ref_mgr.remove_file(&file_meta);
 
         assert!(
-            file_ref_mgr.files_per_table.get(&0).is_none(),
+            file_ref_mgr
+                .files_per_region
+                .get(&file_meta.region_id)
+                .is_none(),
             "{:?}",
-            file_ref_mgr.files_per_table
+            file_ref_mgr.files_per_region
         );
 
         assert!(
-            file_ref_mgr.ref_file_set(0).is_none(),
+            file_ref_mgr.ref_file_set(file_meta.region_id).is_none(),
             "{:?}",
-            file_ref_mgr.files_per_table
+            file_ref_mgr.files_per_region
         );
     }
 }
@@ -58,6 +58,7 @@ use crate::compaction::CompactionScheduler;
 use crate::config::MitoConfig;
 use crate::error::{self, CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
 use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
+use crate::gc::{GcLimiter, GcLimiterRef};
 use crate::memtable::MemtableBuilderProvider;
 use crate::metrics::{REGION_COUNT, REQUEST_WAIT_TIME, WRITE_STALLING};
 use crate::region::opener::PartitionExprFetcherRef;
@@ -138,6 +139,8 @@ pub(crate) struct WorkerGroup {
     cache_manager: CacheManagerRef,
     /// File reference manager.
     file_ref_manager: FileReferenceManagerRef,
+    /// Gc limiter to limit concurrent gc jobs.
+    gc_limiter: GcLimiterRef,
 }
 
 impl WorkerGroup {
@@ -196,6 +199,7 @@ impl WorkerGroup {
                 .build(),
         );
         let time_provider = Arc::new(StdTimeProvider);
+        let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));
 
         let workers = (0..config.num_workers)
             .map(|id| {
@@ -234,6 +238,7 @@ impl WorkerGroup {
             purge_scheduler,
             cache_manager,
             file_ref_manager,
+            gc_limiter,
         })
     }
@@ -291,6 +296,10 @@ impl WorkerGroup {
         self.file_ref_manager.clone()
     }
 
+    pub(crate) fn gc_limiter(&self) -> GcLimiterRef {
+        self.gc_limiter.clone()
+    }
+
     /// Get worker for specific `region_id`.
     pub(crate) fn worker(&self, region_id: RegionId) -> &RegionWorker {
         let index = region_id_to_index(region_id, self.workers.len());
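GcLimiter's body is not shown in this diff; from its call sites (constructed with config.gc.max_concurrent_gc_job, shared via gc_limiter()) it acts as a cap on concurrent GC jobs. A plausible sketch under that assumption, built on a tokio semaphore; the real implementation may differ:

use std::sync::Arc;

use tokio::sync::{OwnedSemaphorePermit, Semaphore};

pub struct GcLimiter {
    permits: Arc<Semaphore>,
}

impl GcLimiter {
    pub fn new(max_concurrent_gc_job: usize) -> Self {
        Self {
            permits: Arc::new(Semaphore::new(max_concurrent_gc_job)),
        }
    }

    /// Waits until a GC slot is free; the job holds the permit for its whole
    /// run, so at most `max_concurrent_gc_job` jobs proceed at once.
    pub async fn acquire(&self) -> OwnedSemaphorePermit {
        self.permits
            .clone()
            .acquire_owned()
            .await
            .expect("gc limiter semaphore closed")
    }
}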
@@ -361,6 +370,7 @@ impl WorkerGroup {
                 .write_cache(write_cache)
                 .build(),
         );
+        let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));
         let workers = (0..config.num_workers)
             .map(|id| {
                 WorkerStarter {
@@ -398,6 +408,7 @@ impl WorkerGroup {
             purge_scheduler,
             cache_manager,
             file_ref_manager,
+            gc_limiter,
         })
     }
@@ -26,6 +26,6 @@ pub use datatypes::schema::{
 };
 
 pub use self::descriptors::*;
-pub use self::file::{FileId, ParseIdError};
+pub use self::file::{FileId, FileRef, FileRefsManifest, GcReport, ParseIdError};
 pub use self::requests::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
 pub use self::types::{SequenceNumber, SequenceRange};
@@ -48,7 +48,24 @@ pub const MAX_REGION_SEQ: u32 = REGION_SEQ_MASK;
 /// Region Number(32)
 /// ```
 #[derive(Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
-pub struct RegionId(u64);
+pub struct RegionId(#[serde(deserialize_with = "str_or_u64")] u64);
+
+/// FIXME(discord9): workaround for serde issue: https://github.com/serde-rs/json/issues/1254
+fn str_or_u64<'de, D>(deserializer: D) -> Result<u64, D::Error>
+where
+    D: serde::Deserializer<'de>,
+{
+    #[derive(Deserialize)]
+    #[serde(untagged)]
+    enum StrOrU64 {
+        U64(u64),
+        Str(String),
+    }
+    match StrOrU64::deserialize(deserializer)? {
+        StrOrU64::U64(v) => Ok(v),
+        StrOrU64::Str(s) => s.parse::<u64>().map_err(serde::de::Error::custom),
+    }
+}
 
 impl RegionId {
     /// Construct a new [RegionId] from table id and region number.
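The str_or_u64 fallback matters because JSON object keys are always strings: once RegionId is used as a HashMap key (as FileRefsManifest below does), serde_json writes the key as "8589934602", and a plain u64 deserializer would reject it on the way back. A hypothetical round-trip illustrating the behavior:

use std::collections::HashMap;

// Serializing a RegionId-keyed map stringifies the keys...
let versions = HashMap::from([(RegionId::new(2, 10), 10u64)]);
let json = serde_json::to_string(&versions).unwrap(); // {"8589934602":10}

// ...and str_or_u64 lets deserialization accept the string form again.
let parsed: HashMap<RegionId, u64> = serde_json::from_str(&json).unwrap();
assert_eq!(parsed, versions);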
@@ -328,6 +345,13 @@ mod tests {
         assert_eq!(region_id, parsed);
     }
 
+    #[test]
+    fn test_region_id_from_str() {
+        let region_id_str = "\"8589934602\"";
+        let region_id: RegionId = serde_json::from_str(region_id_str).unwrap();
+        assert_eq!(RegionId::new(2, 10), region_id);
+    }
+
     #[test]
     fn test_retrieve_region_group_and_seq() {
         let region_id = RegionId::with_group_and_seq(111, 222, 333);
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::{HashMap, HashSet};
 use std::fmt;
 use std::fmt::Debug;
 use std::str::FromStr;
@@ -20,6 +21,9 @@ use serde::{Deserialize, Serialize};
 use snafu::{ResultExt, Snafu};
 use uuid::Uuid;
 
+use crate::ManifestVersion;
+use crate::storage::RegionId;
+
 #[derive(Debug, Snafu, PartialEq)]
 pub struct ParseIdError {
     source: uuid::Error,
@@ -66,6 +70,60 @@ impl FromStr for FileId {
     }
 }
 
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
+pub struct FileRef {
+    pub region_id: RegionId,
+    pub file_id: FileId,
+}
+
+impl FileRef {
+    pub fn new(region_id: RegionId, file_id: FileId) -> Self {
+        Self { region_id, file_id }
+    }
+}
+
+/// The tmp file manifest which records a table's file references.
+/// Also records the manifest version when these tmp files are read.
+#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
+pub struct FileRefsManifest {
+    pub file_refs: HashMap<RegionId, HashSet<FileId>>,
+    /// Manifest version when this manifest is read for its files
+    pub manifest_version: HashMap<RegionId, ManifestVersion>,
+}
+
+#[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
+pub struct GcReport {
+    /// Deleted files per region
+    pub deleted_files: HashMap<RegionId, Vec<FileId>>,
+    /// Regions that need retry in the next gc round, usually because their tmp ref files are outdated
+    pub need_retry_regions: HashSet<RegionId>,
+}
+
+impl GcReport {
+    pub fn new(
+        deleted_files: HashMap<RegionId, Vec<FileId>>,
+        need_retry_regions: HashSet<RegionId>,
+    ) -> Self {
+        Self {
+            deleted_files,
+            need_retry_regions,
+        }
+    }
+
+    pub fn merge(&mut self, other: GcReport) {
+        for (region, files) in other.deleted_files {
+            let self_files = self.deleted_files.entry(region).or_default();
+            let dedup: HashSet<FileId> = HashSet::from_iter(
+                std::mem::take(self_files)
+                    .into_iter()
+                    .chain(files.iter().cloned()),
+            );
+            *self_files = dedup.into_iter().collect();
+        }
+        self.need_retry_regions.extend(other.need_retry_regions);
+    }
+}
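merge is written so that several GC rounds can be folded into one report: per-region deleted-file lists are unioned with duplicates removed, and the retry sets accumulate. A small usage sketch (assuming FileId is Copy, which the types above do not show):

let region = RegionId::new(1024, 1);
let file = FileId::random();

let mut first = GcReport::new(HashMap::from([(region, vec![file])]), HashSet::new());
let second = GcReport::new(HashMap::from([(region, vec![file])]), HashSet::from([region]));

first.merge(second);
assert_eq!(first.deleted_files[&region], vec![file]); // duplicate collapsed
assert!(first.need_retry_regions.contains(&region)); // retry flag carried over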
 
 #[cfg(test)]
 mod tests {
@@ -92,4 +150,19 @@ mod tests {
         let parsed = serde_json::from_str(&json).unwrap();
         assert_eq!(id, parsed);
     }
+
+    #[test]
+    fn test_file_refs_manifest_serialization() {
+        let mut manifest = FileRefsManifest::default();
+        let r0 = RegionId::new(1024, 1);
+        let r1 = RegionId::new(1024, 2);
+        manifest.file_refs.insert(r0, [FileId::random()].into());
+        manifest.file_refs.insert(r1, [FileId::random()].into());
+        manifest.manifest_version.insert(r0, 10);
+        manifest.manifest_version.insert(r1, 20);
+
+        let json = serde_json::to_string(&manifest).unwrap();
+        let parsed: FileRefsManifest = serde_json::from_str(&json).unwrap();
+        assert_eq!(manifest, parsed);
+    }
 }
@@ -1534,6 +1534,13 @@ mem_threshold_on_create = "auto"
 [region_engine.mito.memtable]
 type = "time_series"
 
+[region_engine.mito.gc]
+enable = false
+lingering_time = "5m"
+unknown_file_lingering_time = "6h"
+max_concurrent_lister_per_gc_job = 32
+max_concurrent_gc_job = 4
+
 [[region_engine]]
 
 [region_engine.file]
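The new [region_engine.mito.gc] block in the test config exercises the GC options this change introduces; GC stays disabled by default (enable = false, matching the commit message). A rough Rust-side shape for these keys, assuming humantime-style duration strings and these exact field names, which the diff does not confirm:

use std::time::Duration;

#[derive(Debug, Clone)]
pub struct GcConfig {
    /// GC is opt-in; false unless explicitly enabled.
    pub enable: bool,
    /// How long removed files linger before deletion ("5m" above).
    pub lingering_time: Duration,
    /// Lingering time for files of unknown provenance ("6h" above).
    pub unknown_file_lingering_time: Duration,
    /// Concurrent object-store listers inside one GC job.
    pub max_concurrent_lister_per_gc_job: usize,
    /// Cap on simultaneous GC jobs; feeds GcLimiter::new.
    pub max_concurrent_gc_job: usize,
}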