feat: gc worker on dropped region (#7537)

* feat: allow clean up for dropped region

Signed-off-by: discord9 <discord9@163.com>

* clippy

Signed-off-by: discord9 <discord9@163.com>

* pcr

Signed-off-by: discord9 <discord9@163.com>

* fix: get access layer correct

Signed-off-by: discord9 <discord9@163.com>

* chore: invalid gc args

Signed-off-by: discord9 <discord9@163.com>

* chore: fix test

Signed-off-by: discord9 <discord9@163.com>

* feat: more defend check

Signed-off-by: discord9 <discord9@163.com>

* per review

Signed-off-by: discord9 <discord9@163.com>

* feat: messy impl of drop region

Signed-off-by: discord9 <discord9@163.com>

* feat: add dropped region GC handling module and integrate with GcScheduler

Signed-off-by: discord9 <discord9@163.com>

* refactor: simplify access layer creation

Signed-off-by: discord9 <discord9@163.com>

* c

Signed-off-by: discord9 <discord9@163.com>

* fix: path type

Signed-off-by: discord9 <discord9@163.com>

* feat: gc handle drop

Signed-off-by: discord9 <discord9@163.com>

* chore: use proper const

Signed-off-by: discord9 <discord9@163.com>

* fix: recursive list when check empty dir

Signed-off-by: discord9 <discord9@163.com>

* per review

Signed-off-by: discord9 <discord9@163.com>

* refactor: with gc only delete if metadata region

Signed-off-by: discord9 <discord9@163.com>

* feat: add batch_get_table_route method to SchedulerCtx and MockSchedulerCtx

Signed-off-by: discord9 <discord9@163.com>

* chore: comment

Signed-off-by: discord9 <discord9@163.com>

* refactor: retry delete method

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-01-20 19:45:37 +08:00
committed by GitHub
parent 25687bb282
commit 67e51b4573
36 changed files with 1277 additions and 245 deletions

1
Cargo.lock generated
View File

@@ -4280,6 +4280,7 @@ dependencies = [
"cache",
"client",
"common-base",
"common-catalog",
"common-config",
"common-error",
"common-function",

View File

@@ -15,6 +15,7 @@
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::fmt::Display;
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt as _, ResultExt, ensure};
use store_api::storage::RegionId;
@@ -28,7 +29,9 @@ use crate::key::{
};
use crate::kv_backend::KvBackendRef;
use crate::kv_backend::txn::Txn;
use crate::rpc::store::BatchGetRequest;
use crate::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
use crate::rpc::KeyValue;
use crate::rpc::store::{BatchGetRequest, RangeRequest};
/// The key stores table repartition metadata.
/// Specifically, it records the relation between source and destination regions after a repartition operation is completed.
@@ -138,6 +141,13 @@ impl MetadataValue for TableRepartValue {
pub type TableRepartValueDecodeResult =
Result<Option<DeserializedValueWithBytes<TableRepartValue>>>;
/// Converts a raw [KeyValue] into a typed ([TableRepartKey], [TableRepartValue]) pair.
///
/// The key bytes are parsed first, then the value bytes; a failure in either
/// step is returned to the caller.
pub fn table_repart_decoder(kv: KeyValue) -> Result<(TableRepartKey, TableRepartValue)> {
    Ok((
        TableRepartKey::from_bytes(&kv.key)?,
        TableRepartValue::try_from_raw_value(&kv.value)?,
    ))
}
pub struct TableRepartManager {
kv_backend: KvBackendRef,
}
@@ -147,6 +157,25 @@ impl TableRepartManager {
Self { kv_backend }
}
/// Lists every table repartition entry found under the [TableRepartKey] range prefix.
///
/// The range is scanned through a [PaginationStream] using `DEFAULT_PAGE_SIZE`,
/// each entry is decoded with [table_repart_decoder], and the result is returned
/// as `(table_id, value)` pairs. The first error encountered is returned.
pub async fn table_reparts(&self) -> Result<Vec<(TableId, TableRepartValue)>> {
    let request = RangeRequest::new().with_prefix(TableRepartKey::range_prefix());
    let entries = PaginationStream::new(
        self.kv_backend.clone(),
        request,
        DEFAULT_PAGE_SIZE,
        table_repart_decoder,
    )
    .into_stream()
    // Keep only the table id from the decoded key.
    .map_ok(|(key, value)| (key.table_id, value))
    .try_collect::<Vec<_>>()
    .await?;
    Ok(entries)
}
/// Builds a create table repart transaction.
/// It expects that `__table_repart/{table_id}` is not already occupied.
pub fn build_create_txn(

View File

@@ -18,6 +18,7 @@ async-trait.workspace = true
bytes.workspace = true
client.workspace = true
common-base.workspace = true
common-catalog.workspace = true
common-config.workspace = true
common-error.workspace = true
common-function.workspace = true

View File

@@ -244,7 +244,19 @@ impl DatanodeBuilder {
table_id_schema_cache,
schema_cache,
));
let file_ref_manager = Arc::new(FileReferenceManager::new(Some(node_id)));
let gc_enabled = self.opts.region_engine.iter().any(|engine| {
if let RegionEngineConfig::Mito(config) = engine {
config.gc.enable
} else {
false
}
});
let file_ref_manager = Arc::new(FileReferenceManager::with_gc_enabled(
Some(node_id),
gc_enabled,
));
let region_server = self
.new_region_server(
schema_metadata_manager,
@@ -331,6 +343,7 @@ impl DatanodeBuilder {
&self.opts,
region_server.clone(),
meta_client,
self.kv_backend.clone(),
cache_invalidator,
self.plugins.clone(),
stat,

View File

@@ -332,13 +332,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid arguments for GC: {}", msg))]
InvalidGcArgs {
msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to list SST entries from storage"))]
ListStorageSsts {
#[snafu(implicit)]
@@ -455,11 +448,9 @@ impl ErrorExt for Error {
AsyncTaskExecute { source, .. } => source.status_code(),
CreateDir { .. }
| RemoveDir { .. }
| ShutdownInstance { .. }
| DataFusion { .. }
| InvalidGcArgs { .. } => StatusCode::Internal,
CreateDir { .. } | RemoveDir { .. } | ShutdownInstance { .. } | DataFusion { .. } => {
StatusCode::Internal
}
RegionNotFound { .. } => StatusCode::RegionNotFound,
RegionNotReady { .. } => StatusCode::RegionNotReady,

View File

@@ -31,6 +31,7 @@ use common_meta::heartbeat::handler::{
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef};
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_meta::kv_backend::KvBackendRef;
use common_stat::ResourceStatRef;
use common_telemetry::{debug, error, info, trace, warn};
use common_workload::DatanodeWorkloadType;
@@ -79,6 +80,7 @@ impl HeartbeatTask {
opts: &DatanodeOptions,
region_server: RegionServer,
meta_client: MetaClientRef,
kv_backend: KvBackendRef,
cache_invalidator: CacheInvalidatorRef,
plugins: Plugins,
resource_stat: ResourceStatRef,
@@ -94,7 +96,7 @@ impl HeartbeatTask {
Arc::new(ParseMailboxMessageHandler),
Arc::new(SuspendHandler::new(region_server.suspend_state())),
Arc::new(
RegionHeartbeatResponseHandler::new(region_server.clone())
RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend)
.with_open_region_parallelism(opts.init_regions_parallelism),
),
Arc::new(InvalidateCacheHandler::new(cache_invalidator)),

View File

@@ -18,6 +18,7 @@ use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_meta::instruction::{Instruction, InstructionReply};
use common_meta::kv_backend::KvBackendRef;
use common_telemetry::error;
use snafu::OptionExt;
use store_api::storage::GcReport;
@@ -56,6 +57,7 @@ pub struct RegionHeartbeatResponseHandler {
flush_tasks: TaskTracker<()>,
open_region_parallelism: usize,
gc_tasks: TaskTracker<GcReport>,
kv_backend: KvBackendRef,
}
#[async_trait::async_trait]
@@ -70,27 +72,29 @@ pub trait InstructionHandler: Send + Sync {
#[derive(Clone)]
pub struct HandlerContext {
region_server: RegionServer,
downgrade_tasks: TaskTracker<()>,
flush_tasks: TaskTracker<()>,
gc_tasks: TaskTracker<GcReport>,
pub region_server: RegionServer,
pub downgrade_tasks: TaskTracker<()>,
pub flush_tasks: TaskTracker<()>,
pub gc_tasks: TaskTracker<GcReport>,
pub kv_backend: KvBackendRef,
}
impl HandlerContext {
#[cfg(test)]
pub fn new_for_test(region_server: RegionServer) -> Self {
pub fn new_for_test(region_server: RegionServer, kv_backend: KvBackendRef) -> Self {
Self {
region_server,
downgrade_tasks: TaskTracker::new(),
flush_tasks: TaskTracker::new(),
gc_tasks: TaskTracker::new(),
kv_backend,
}
}
}
impl RegionHeartbeatResponseHandler {
/// Returns the [RegionHeartbeatResponseHandler].
pub fn new(region_server: RegionServer) -> Self {
pub fn new(region_server: RegionServer, kv_backend: KvBackendRef) -> Self {
Self {
region_server,
downgrade_tasks: TaskTracker::new(),
@@ -98,6 +102,7 @@ impl RegionHeartbeatResponseHandler {
// Default to half of the number of CPUs.
open_region_parallelism: (num_cpus::get() / 2).max(1),
gc_tasks: TaskTracker::new(),
kv_backend,
}
}
@@ -254,6 +259,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
downgrade_tasks: self.downgrade_tasks.clone(),
flush_tasks: self.flush_tasks.clone(),
gc_tasks: self.gc_tasks.clone(),
kv_backend: self.kv_backend.clone(),
};
let _handle = common_runtime::spawn_global(async move {
let reply = handler.handle(&context, instruction).await;
@@ -285,6 +291,7 @@ mod tests {
use common_meta::instruction::{
DowngradeRegion, EnterStagingRegion, OpenRegion, UpgradeRegion,
};
use common_meta::kv_backend::memory::MemoryKvBackend;
use mito2::config::MitoConfig;
use mito2::engine::MITO_ENGINE_NAME;
use mito2::test_util::{CreateRequestBuilder, TestEnv};
@@ -330,7 +337,9 @@ mod tests {
fn test_is_acceptable() {
common_telemetry::init_default_ut_logging();
let region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
let kv_backend = Arc::new(MemoryKvBackend::new());
let heartbeat_handler =
RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);
let heartbeat_env = HeartbeatResponseTestEnv::new();
let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
@@ -409,7 +418,9 @@ mod tests {
common_telemetry::init_default_ut_logging();
let mut region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
let kv_backend = Arc::new(MemoryKvBackend::new());
let heartbeat_handler =
RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);
let mut engine_env = TestEnv::with_prefix("close-region").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
@@ -457,7 +468,9 @@ mod tests {
common_telemetry::init_default_ut_logging();
let mut region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
let kv_backend = Arc::new(MemoryKvBackend::new());
let heartbeat_handler =
RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);
let mut engine_env = TestEnv::with_prefix("open-region").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
@@ -505,7 +518,9 @@ mod tests {
common_telemetry::init_default_ut_logging();
let mut region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
let kv_backend = Arc::new(MemoryKvBackend::new());
let heartbeat_handler =
RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);
let mut engine_env = TestEnv::with_prefix("open-not-exists-region").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
@@ -537,7 +552,9 @@ mod tests {
common_telemetry::init_default_ut_logging();
let mut region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
let kv_backend = Arc::new(MemoryKvBackend::new());
let heartbeat_handler =
RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);
let mut engine_env = TestEnv::with_prefix("downgrade-region").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;

View File

@@ -117,6 +117,7 @@ mod tests {
use std::sync::Arc;
use common_meta::instruction::RemapManifest;
use common_meta::kv_backend::memory::MemoryKvBackend;
use datatypes::value::Value;
use mito2::config::MitoConfig;
use mito2::engine::MITO_ENGINE_NAME;
@@ -137,7 +138,8 @@ mod tests {
let mut mock_region_server = mock_region_server();
let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
mock_region_server.register_engine(mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
let region_id = RegionId::new(1024, 1);
let reply = ApplyStagingManifestsHandler
.handle(
@@ -168,7 +170,8 @@ mod tests {
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
let region_id = RegionId::new(1024, 1);
let reply = ApplyStagingManifestsHandler
.handle(
@@ -231,7 +234,8 @@ mod tests {
region_server.register_engine(Arc::new(engine.clone()));
prepare_region(&region_server).await;
let handler_context = HandlerContext::new_for_test(region_server);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(region_server, kv_backend);
let region_id2 = RegionId::new(1024, 2);
let reply = RemapManifestHandler
.handle(

View File

@@ -84,6 +84,7 @@ mod tests {
use common_meta::heartbeat::handler::{HandleControl, HeartbeatResponseHandler};
use common_meta::heartbeat::mailbox::MessageMeta;
use common_meta::instruction::Instruction;
use common_meta::kv_backend::memory::MemoryKvBackend;
use mito2::config::MitoConfig;
use mito2::engine::MITO_ENGINE_NAME;
use mito2::test_util::{CreateRequestBuilder, TestEnv};
@@ -113,7 +114,9 @@ mod tests {
common_telemetry::init_default_ut_logging();
let mut region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
let kv_backend = Arc::new(MemoryKvBackend::new());
let heartbeat_handler =
RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);
let mut engine_env = TestEnv::with_prefix("close-regions").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
region_server.register_engine(Arc::new(engine.clone()));

View File

@@ -232,6 +232,7 @@ mod tests {
use common_meta::heartbeat::handler::{HandleControl, HeartbeatResponseHandler};
use common_meta::heartbeat::mailbox::MessageMeta;
use common_meta::instruction::{DowngradeRegion, Instruction};
use common_meta::kv_backend::memory::MemoryKvBackend;
use mito2::config::MitoConfig;
use mito2::engine::MITO_ENGINE_NAME;
use mito2::test_util::{CreateRequestBuilder, TestEnv};
@@ -255,7 +256,8 @@ mod tests {
let mut mock_region_server = mock_region_server();
let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
mock_region_server.register_engine(mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
let region_id = RegionId::new(1024, 1);
let waits = vec![None, Some(Duration::from_millis(100u64))];
@@ -299,7 +301,8 @@ mod tests {
}))
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
let waits = vec![None, Some(Duration::from_millis(100u64))];
for flush_timeout in waits {
@@ -335,7 +338,8 @@ mod tests {
}))
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
let flush_timeout = Duration::from_millis(100);
let reply = DowngradeRegionsHandler
@@ -369,7 +373,8 @@ mod tests {
}))
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
let waits = vec![
Some(Duration::from_millis(100u64)),
@@ -432,7 +437,8 @@ mod tests {
}))
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
let waits = vec![
Some(Duration::from_millis(100u64)),
@@ -483,7 +489,8 @@ mod tests {
Some(Box::new(|_| Ok(SetRegionRoleStateResponse::NotFound)));
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
let reply = DowngradeRegionsHandler
.handle(
&handler_context,
@@ -514,7 +521,8 @@ mod tests {
}));
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
let reply = DowngradeRegionsHandler
.handle(
&handler_context,
@@ -541,7 +549,9 @@ mod tests {
common_telemetry::init_default_ut_logging();
let mut region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
let kv_backend = Arc::new(MemoryKvBackend::new());
let heartbeat_handler =
RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);
let mut engine_env = TestEnv::with_prefix("downgrade-regions").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
region_server.register_engine(Arc::new(engine.clone()));

View File

@@ -107,6 +107,7 @@ mod tests {
use std::sync::Arc;
use common_meta::instruction::EnterStagingRegion;
use common_meta::kv_backend::memory::MemoryKvBackend;
use mito2::config::MitoConfig;
use mito2::engine::MITO_ENGINE_NAME;
use mito2::test_util::{CreateRequestBuilder, TestEnv};
@@ -127,7 +128,8 @@ mod tests {
let mut mock_region_server = mock_region_server();
let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
mock_region_server.register_engine(mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
let region_id = RegionId::new(1024, 1);
let replies = EnterStagingRegionsHandler
.handle(
@@ -156,7 +158,8 @@ mod tests {
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
let replies = EnterStagingRegionsHandler
.handle(
&handler_context,
@@ -194,7 +197,8 @@ mod tests {
region_server.register_engine(Arc::new(engine.clone()));
prepare_region(&region_server).await;
let handler_context = HandlerContext::new_for_test(region_server);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(region_server, kv_backend);
let replies = EnterStagingRegionsHandler
.handle(
&handler_context,

View File

@@ -177,6 +177,7 @@ mod tests {
use std::sync::{Arc, RwLock};
use common_meta::instruction::{FlushErrorStrategy, FlushRegions};
use common_meta::kv_backend::memory::MemoryKvBackend;
use mito2::engine::MITO_ENGINE_NAME;
use store_api::storage::RegionId;
@@ -201,7 +202,8 @@ mod tests {
});
mock_region_server.register_test_region(*region_id, mock_engine);
}
let handler_context = HandlerContext::new_for_test(mock_region_server);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
// Async hint mode
let flush_instruction = FlushRegions::async_batch(region_ids.clone());
@@ -238,7 +240,8 @@ mod tests {
}))
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
let flush_instruction = FlushRegions::sync_single(region_id);
let reply = FlushRegionsHandler
@@ -273,7 +276,8 @@ mod tests {
}))
});
mock_region_server.register_test_region(region_ids[0], mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
// Sync batch with fail-fast strategy
let flush_instruction =
@@ -304,7 +308,8 @@ mod tests {
}))
});
mock_region_server.register_test_region(region_ids[0], mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
// Sync batch with try-all strategy
let flush_instruction =

View File

@@ -12,13 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::instruction::{GcRegions, GcRegionsReply, InstructionReply};
use common_telemetry::{debug, warn};
use mito2::gc::LocalGcWorker;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::{FileRefsManifest, RegionId};
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use crate::error::{GcMitoEngineSnafu, InvalidGcArgsSnafu, Result, UnexpectedSnafu};
use common_meta::instruction::{GcRegions, GcRegionsReply, InstructionReply};
use common_meta::key::table_info::TableInfoManager;
use common_meta::key::table_route::TableRouteManager;
use common_telemetry::{debug, warn};
use mito2::access_layer::{AccessLayer, AccessLayerRef};
use mito2::engine::MitoEngine;
use mito2::gc::LocalGcWorker;
use mito2::region::MitoRegionRef;
use snafu::{OptionExt, ResultExt};
use store_api::path_utils::table_dir;
use store_api::region_request::PathType;
use store_api::storage::{FileRefsManifest, GcReport, RegionId};
use table::requests::STORAGE_KEY;
use crate::error::{GcMitoEngineSnafu, GetMetadataSnafu, Result, UnexpectedSnafu};
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
pub struct GcRegionsHandler;
@@ -35,43 +46,80 @@ impl InstructionHandler for GcRegionsHandler {
let region_ids = gc_regions.regions.clone();
debug!("Received gc regions instruction: {:?}", region_ids);
let (region_id, gc_worker) = match self
.create_gc_worker(
ctx,
region_ids,
&gc_regions.file_refs_manifest,
gc_regions.full_file_listing,
)
.await
{
Ok(worker) => worker,
Err(e) => {
return Some(InstructionReply::GcRegions(GcRegionsReply {
result: Err(format!("Failed to create GC worker: {}", e)),
}));
}
};
if region_ids.is_empty() {
return Some(InstructionReply::GcRegions(GcRegionsReply {
result: Ok(GcReport::default()),
}));
}
// Always use the smallest region id on datanode as the target region id for task tracker
let mut sorted_region_ids = gc_regions.regions.clone();
sorted_region_ids.sort_by_key(|r| r.region_number());
let target_region_id = sorted_region_ids[0];
// Group regions by table_id
let mut table_to_regions: HashMap<u32, Vec<RegionId>> = HashMap::new();
for rid in region_ids {
table_to_regions
.entry(rid.table_id())
.or_default()
.push(rid);
}
let file_refs_manifest = gc_regions.file_refs_manifest.clone();
let full_file_listing = gc_regions.full_file_listing;
let ctx_clone = ctx.clone();
let register_result = ctx
.gc_tasks
.try_register(
region_id,
target_region_id,
Box::pin(async move {
debug!("Starting gc worker for region {}", region_id);
let report = gc_worker
.run()
.await
.context(GcMitoEngineSnafu { region_id })?;
debug!("Gc worker for region {} finished", region_id);
Ok(report)
let mut reports = Vec::with_capacity(table_to_regions.len());
for (table_id, regions) in table_to_regions {
debug!(
"Starting gc worker for table {}, regions: {:?}",
table_id, regions
);
let gc_worker = GcRegionsHandler::create_gc_worker(
&ctx_clone,
table_id,
regions,
&file_refs_manifest,
full_file_listing,
)
.await?;
let report = gc_worker.run().await.context(GcMitoEngineSnafu {
region_id: target_region_id,
})?;
debug!(
"Gc worker for table {} finished, report: {:?}",
table_id, report
);
reports.push(report);
}
// Merge reports
let mut merged_report = GcReport::default();
for report in reports {
merged_report
.deleted_files
.extend(report.deleted_files.into_iter());
merged_report
.deleted_indexes
.extend(report.deleted_indexes.into_iter());
}
Ok(merged_report)
}),
)
.await;
if register_result.is_busy() {
warn!("Another gc task is running for the region: {region_id}");
warn!("Another gc task is running for the region: {target_region_id}");
return Some(InstructionReply::GcRegions(GcRegionsReply {
result: Err(format!(
"Another gc task is running for the region: {region_id}"
"Another gc task is running for the region: {target_region_id}"
)),
}));
}
@@ -89,17 +137,15 @@ impl InstructionHandler for GcRegionsHandler {
}
impl GcRegionsHandler {
/// Create a GC worker for the given region IDs.
/// Return the first region ID(after sort by given region id) and the GC worker.
/// Create a GC worker for the given table and region IDs.
async fn create_gc_worker(
&self,
ctx: &HandlerContext,
mut region_ids: Vec<RegionId>,
table_id: u32,
region_ids: Vec<RegionId>,
file_ref_manifest: &FileRefsManifest,
full_file_listing: bool,
) -> Result<(RegionId, LocalGcWorker)> {
// always use the smallest region id on datanode as the target region id
region_ids.sort_by_key(|r| r.region_number());
) -> Result<LocalGcWorker> {
debug_assert!(!region_ids.is_empty(), "region_ids should not be empty");
let mito_engine = ctx
.region_server
@@ -108,50 +154,13 @@ impl GcRegionsHandler {
violated: "MitoEngine not found".to_string(),
})?;
let region_id = *region_ids.first().with_context(|| InvalidGcArgsSnafu {
msg: "No region ids provided".to_string(),
})?;
// also need to ensure all regions are on this datanode
ensure!(
region_ids
.iter()
.all(|rid| mito_engine.find_region(*rid).is_some()),
InvalidGcArgsSnafu {
msg: format!(
"Some regions are not on current datanode:{:?}",
region_ids
.iter()
.filter(|rid| mito_engine.find_region(**rid).is_none())
.collect::<Vec<_>>()
),
}
);
// Find the access layer from one of the regions that exists on this datanode
let access_layer = mito_engine
.find_region(region_id)
.with_context(|| InvalidGcArgsSnafu {
msg: format!(
"None of the regions is on current datanode:{:?}",
region_ids
),
})?
.access_layer();
// if region happen to be dropped before this but after gc scheduler send gc instr,
// need to deal with it properly(it is ok for region to be dropped after GC worker started)
// region not found here can only be drop table/database case, since region migration is prevented by lock in gc procedure
// TODO(discord9): add integration test for this drop case
let mito_regions = region_ids
.iter()
.filter_map(|rid| mito_engine.find_region(*rid).map(|r| (*rid, r)))
.collect();
let (access_layer, mito_regions) =
Self::get_access_layer(ctx, &mito_engine, table_id, &region_ids).await?;
let cache_manager = mito_engine.cache_manager();
let gc_worker = LocalGcWorker::try_new(
access_layer.clone(),
access_layer,
Some(cache_manager),
mito_regions,
mito_engine.mito_config().gc.clone(),
@@ -160,8 +169,197 @@ impl GcRegionsHandler {
full_file_listing,
)
.await
.context(GcMitoEngineSnafu { region_id })?;
.context(GcMitoEngineSnafu {
region_id: region_ids[0],
})?;
Ok((region_id, gc_worker))
Ok(gc_worker)
}
/// Resolves the access layer for `table_id` together with the engine's view of
/// each requested region.
///
/// The returned map has one entry per requested region id; the value is `None`
/// when the engine does not currently hold that region.
///
/// # Errors
///
/// * A region that is found in the engine but is a follower is rejected.
/// * Regions that are absent from the engine are checked against the table
///   route via `validate_regions_not_routed_elsewhere`; its error is propagated.
/// * Any failure while building the access layer from table metadata is propagated.
async fn get_access_layer(
    ctx: &HandlerContext,
    mito_engine: &MitoEngine,
    table_id: u32,
    region_ids: &[RegionId],
) -> Result<(AccessLayerRef, BTreeMap<RegionId, Option<MitoRegionRef>>)> {
    // Step 1: look every region up in the engine, refusing followers outright.
    let mut mito_regions = BTreeMap::new();
    for rid in region_ids {
        let found = mito_engine.find_region(*rid);
        match &found {
            Some(region) if region.is_follower() => {
                return UnexpectedSnafu {
                    violated: format!(
                        "Region {} is a follower, cannot perform GC on follower regions",
                        rid
                    ),
                }
                .fail();
            }
            _ => {}
        }
        mito_regions.insert(*rid, found);
    }

    // Step 2: regions the engine does not hold must not be routed anywhere else.
    // The list is taken from the map so it is ordered by region id.
    let absent: Vec<RegionId> = mito_regions
        .iter()
        .filter_map(|(rid, region)| region.is_none().then_some(*rid))
        .collect();
    if !absent.is_empty() {
        Self::validate_regions_not_routed_elsewhere(ctx, table_id, &absent).await?;
    }

    // Step 3: the access layer is always built from table metadata.
    let access_layer = Self::construct_access_layer(ctx, mito_engine, table_id).await?;
    Ok((access_layer, mito_regions))
}
/// Builds an [AccessLayer] for `table_id` from the table info stored in the kv backend.
///
/// The table directory is derived from the table's region storage path, the
/// path type from the table's engine name (`Data` for the metric engine,
/// `Bare` for everything else), and the object store from the table's
/// `STORAGE_KEY` option, falling back to the engine's default object store
/// when the option is not set.
///
/// # Errors
///
/// Fails when the table info cannot be read, when no table info exists for
/// `table_id`, or when the named object store is not registered in the engine.
async fn construct_access_layer(
    ctx: &HandlerContext,
    mito_engine: &MitoEngine,
    table_id: u32,
) -> Result<AccessLayerRef> {
    let table_info_value = TableInfoManager::new(ctx.kv_backend.clone())
        .get(table_id)
        .await
        .context(GetMetadataSnafu)?
        .with_context(|| UnexpectedSnafu {
            violated: format!("Table metadata not found for table {}", table_id),
        })?;

    let dir = table_dir(&table_info_value.region_storage_path(), table_id);
    let table_meta = &table_info_value.table_info.meta;
    let storage_name = table_meta.options.extra_options.get(STORAGE_KEY);

    // Only the metric engine uses the `Data` layout; every other engine name
    // (including unknown ones) maps to `Bare`.
    let path_type = if table_meta.engine.as_str() == common_catalog::consts::METRIC_ENGINE {
        PathType::Data
    } else {
        PathType::Bare
    };

    let store_manager = mito_engine.object_store_manager();
    let object_store = match storage_name {
        Some(name) => store_manager
            .find(name)
            .cloned()
            .with_context(|| UnexpectedSnafu {
                violated: format!("Object store {} not found", name),
            })?,
        None => store_manager.default_object_store().clone(),
    };

    let access_layer = AccessLayer::new(
        dir,
        path_type,
        object_store,
        mito_engine.puffin_manager_factory().clone(),
        mito_engine.intermediate_manager().clone(),
    );
    Ok(Arc::new(access_layer))
}
/// Checks that regions missing from this datanode are also absent from the
/// table route, so that GC does not touch files of a region that still exists.
///
/// Outcomes:
/// * empty `missing_region_ids`, no table route for `table_id`, or a table
///   route without region routes (logical table): accepted without further checks;
/// * a missing region that is not listed in the region routes: treated as
///   deleted and accepted;
/// * a missing region that is still listed in the region routes: rejected,
///   whether or not the route has a leader peer (the error message differs).
///
/// # Errors
///
/// Returns an error when the table route cannot be read, or on the first
/// missing region that is still present in the region routes.
async fn validate_regions_not_routed_elsewhere(
    ctx: &HandlerContext,
    table_id: u32,
    missing_region_ids: &[RegionId],
) -> Result<()> {
    if missing_region_ids.is_empty() {
        return Ok(());
    }

    let route_manager = TableRouteManager::new(ctx.kv_backend.clone());
    let Some(table_route) = route_manager
        .table_route_storage()
        .get(table_id)
        .await
        .context(GetMetadataSnafu)?
    else {
        // No route at all: the regions are considered deleted.
        debug!(
            "Table route not found for table {}, regions {:?} are considered deleted",
            table_id, missing_region_ids
        );
        return Ok(());
    };

    let Ok(region_routes) = table_route.region_routes() else {
        // No region routes available (logical table): nothing to validate.
        debug!(
            "Table {} is a logical table, skipping region route validation",
            table_id
        );
        return Ok(());
    };

    let routes_by_region: HashMap<RegionId, _> = region_routes
        .iter()
        .map(|route| (route.region.id, route))
        .collect();

    for region_id in missing_region_ids {
        // A region absent from the route table is treated as deleted: GC may proceed.
        let Some(route) = routes_by_region.get(region_id) else {
            continue;
        };

        // Any region still present in the route table is refused.
        let violated = match &route.leader_peer {
            Some(leader_peer) => format!(
                "Region {} is not on this datanode but is routed to datanode {}. \
                 GC request may have been sent to wrong datanode.",
                region_id, leader_peer.id
            ),
            None => format!(
                "Region {} has no leader in route table; refusing GC without explicit tombstone/deleted state.",
                region_id
            ),
        };
        return UnexpectedSnafu { violated }.fail();
    }

    Ok(())
}
}

View File

@@ -80,6 +80,7 @@ mod tests {
use common_meta::heartbeat::handler::{HandleControl, HeartbeatResponseHandler};
use common_meta::heartbeat::mailbox::MessageMeta;
use common_meta::instruction::{Instruction, OpenRegion};
use common_meta::kv_backend::memory::MemoryKvBackend;
use mito2::config::MitoConfig;
use mito2::engine::MITO_ENGINE_NAME;
use mito2::test_util::{CreateRequestBuilder, TestEnv};
@@ -119,7 +120,9 @@ mod tests {
common_telemetry::init_default_ut_logging();
let mut region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
let kv_backend = Arc::new(MemoryKvBackend::new());
let heartbeat_handler =
RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);
let mut engine_env = TestEnv::with_prefix("open-regions").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
region_server.register_engine(Arc::new(engine.clone()));

View File

@@ -97,6 +97,7 @@ mod tests {
use std::sync::Arc;
use common_meta::instruction::RemapManifest;
use common_meta::kv_backend::memory::MemoryKvBackend;
use datatypes::value::Value;
use mito2::config::MitoConfig;
use mito2::engine::MITO_ENGINE_NAME;
@@ -117,7 +118,8 @@ mod tests {
let mut mock_region_server = mock_region_server();
let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
mock_region_server.register_engine(mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
let region_id = RegionId::new(1024, 1);
let reply = RemapManifestHandler
.handle(
@@ -147,7 +149,8 @@ mod tests {
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
let reply = RemapManifestHandler
.handle(
&handler_context,
@@ -207,7 +210,8 @@ mod tests {
region_server.register_engine(Arc::new(engine.clone()));
prepare_region(&region_server).await;
let handler_context = HandlerContext::new_for_test(region_server);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(region_server, kv_backend);
let region_id2 = RegionId::new(1024, 2);
let reply = RemapManifestHandler
.handle(

View File

@@ -97,6 +97,9 @@ impl SyncRegionHandler {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_meta::kv_backend::memory::MemoryKvBackend;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{RegionRole, SyncRegionFromRequest};
use store_api::storage::RegionId;
@@ -111,7 +114,8 @@ mod tests {
let (mock_engine, _) = MockRegionEngine::new(METRIC_ENGINE_NAME);
mock_region_server.register_engine(mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
let handler = SyncRegionHandler;
let region_id = RegionId::new(1024, 1);
@@ -141,7 +145,8 @@ mod tests {
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
let handler = SyncRegionHandler;
let sync_region = common_meta::instruction::SyncRegion {
@@ -171,7 +176,8 @@ mod tests {
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
let handler = SyncRegionHandler;
let sync_region = common_meta::instruction::SyncRegion {

View File

@@ -220,9 +220,11 @@ impl InstructionHandler for UpgradeRegionsHandler {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use common_meta::instruction::UpgradeRegion;
use common_meta::kv_backend::memory::MemoryKvBackend;
use mito2::engine::MITO_ENGINE_NAME;
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
@@ -237,8 +239,8 @@ mod tests {
let mut mock_region_server = mock_region_server();
let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
mock_region_server.register_engine(mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
let region_id = RegionId::new(1024, 1);
let region_id2 = RegionId::new(1024, 2);
@@ -286,7 +288,8 @@ mod tests {
});
mock_region_server.register_test_region(region_id, mock_engine.clone());
mock_region_server.register_test_region(region_id2, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
let replay_timeout = Duration::from_millis(100u64);
let reply = UpgradeRegionsHandler::new_test()
.handle(
@@ -330,8 +333,8 @@ mod tests {
region_engine.handle_request_delay = Some(Duration::from_secs(100));
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
let replay_timeout = Duration::from_millis(100u64);
let reply = UpgradeRegionsHandler::new_test()
.handle(
@@ -365,7 +368,8 @@ mod tests {
});
mock_region_server.register_test_region(region_id, mock_engine);
let waits = vec![Duration::from_millis(100u64), Duration::from_millis(100u64)];
let handler_context = HandlerContext::new_for_test(mock_region_server);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
for replay_timeout in waits {
let reply = UpgradeRegionsHandler::new_test()
.handle(
@@ -420,8 +424,8 @@ mod tests {
region_engine.handle_request_delay = Some(Duration::from_millis(100));
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
let reply = UpgradeRegionsHandler::new_test()
.handle(
&handler_context,

View File

@@ -92,7 +92,9 @@ impl FrontendBuilder {
options: &FrontendOptions,
meta_client: meta_client::MetaClientRef,
) -> Self {
let kv_backend = Arc::new(common_meta::kv_backend::memory::MemoryKvBackend::new());
use common_meta::kv_backend::memory::MemoryKvBackend;
let kv_backend = Arc::new(MemoryKvBackend::new());
let layered_cache_registry = Arc::new(
common_meta::cache::LayeredCacheRegistryBuilder::default()

View File

@@ -19,6 +19,7 @@ use store_api::storage::RegionId;
mod candidate;
mod ctx;
mod dropped;
mod handler;
#[cfg(test)]
mod mock;
@@ -32,6 +33,8 @@ pub use options::GcSchedulerOptions;
pub use procedure::BatchGcProcedure;
pub(crate) use scheduler::{GcScheduler, GcTickerRef};
/// Mapping from region ID to its associated peers (leader and followers).
pub type Region2Peers = HashMap<RegionId, (Peer, Vec<Peer>)>;
/// Mapping from leader peer to the set of region IDs it is responsible for.
pub(crate) type Peer2Regions = HashMap<Peer, HashSet<RegionId>>;

View File

@@ -17,6 +17,7 @@ use std::time::Duration;
use common_meta::datanode::RegionStat;
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::table_repart::TableRepartValue;
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_procedure::{ProcedureManagerRef, ProcedureWithId, watcher};
use common_telemetry::debug;
@@ -26,6 +27,7 @@ use table::metadata::TableId;
use crate::cluster::MetaPeerClientRef;
use crate::error::{self, Result, TableMetadataManagerSnafu};
use crate::gc::Region2Peers;
use crate::gc::procedure::BatchGcProcedure;
use crate::service::mailbox::MailboxRef;
@@ -33,16 +35,24 @@ use crate::service::mailbox::MailboxRef;
pub(crate) trait SchedulerCtx: Send + Sync {
/// Returns region statistics grouped by the table they belong to.
async fn get_table_to_region_stats(&self) -> Result<HashMap<TableId, Vec<RegionStat>>>;
/// Returns every `(table id, repartition value)` pair known to the metadata store.
async fn get_table_reparts(&self) -> Result<Vec<(TableId, TableRepartValue)>>;
/// Returns the route of `table_id` together with a table id.
// NOTE(review): callers in this crate treat the returned id as the physical table id — confirm.
async fn get_table_route(
&self,
table_id: TableId,
) -> Result<(TableId, PhysicalTableRouteValue)>;
/// Returns the physical table routes for `table_ids`, keyed by table id.
async fn batch_get_table_route(
&self,
table_ids: &[TableId],
) -> Result<HashMap<TableId, PhysicalTableRouteValue>>;
/// Runs GC for `region_ids` and returns the resulting report.
///
/// `full_file_listing` selects full versus fast file listing, `timeout` bounds the
/// request, and `region_routes_override` supplies explicit region-to-peer routes.
// NOTE(review): presumably the override is meant for regions without an active route — confirm.
async fn gc_regions(
&self,
region_ids: &[RegionId],
full_file_listing: bool,
timeout: Duration,
region_routes_override: Region2Peers,
) -> Result<GcReport>;
}
@@ -99,6 +109,14 @@ impl SchedulerCtx for DefaultGcSchedulerCtx {
Ok(table_to_region_stats)
}
/// Load every table repartition record through the table metadata manager.
///
/// Failures of the underlying manager are wrapped as `TableMetadataManager` errors.
async fn get_table_reparts(&self) -> Result<Vec<(TableId, TableRepartValue)>> {
    let reparts = self
        .table_metadata_manager
        .table_repart_manager()
        .table_reparts()
        .await;
    reparts.context(TableMetadataManagerSnafu)
}
async fn get_table_route(
&self,
table_id: TableId,
@@ -110,14 +128,31 @@ impl SchedulerCtx for DefaultGcSchedulerCtx {
.context(TableMetadataManagerSnafu)
}
/// Fetch the physical table routes of `table_ids` in a single batched lookup.
///
/// Failures of the underlying manager are wrapped as `TableMetadataManager` errors.
async fn batch_get_table_route(
    &self,
    table_ids: &[TableId],
) -> Result<HashMap<TableId, PhysicalTableRouteValue>> {
    let routes = self
        .table_metadata_manager
        .table_route_manager()
        .batch_get_physical_table_routes(table_ids)
        .await;
    routes.context(TableMetadataManagerSnafu)
}
async fn gc_regions(
&self,
region_ids: &[RegionId],
full_file_listing: bool,
timeout: Duration,
region_routes_override: Region2Peers,
) -> Result<GcReport> {
self.gc_regions_inner(region_ids, full_file_listing, timeout)
.await
self.gc_regions_inner(
region_ids,
full_file_listing,
timeout,
region_routes_override,
)
.await
}
}
@@ -127,6 +162,7 @@ impl DefaultGcSchedulerCtx {
region_ids: &[RegionId],
full_file_listing: bool,
timeout: Duration,
region_routes_override: Region2Peers,
) -> Result<GcReport> {
debug!(
"Sending GC instruction for {} regions (full_file_listing: {})",
@@ -141,6 +177,7 @@ impl DefaultGcSchedulerCtx {
region_ids.to_vec(),
full_file_listing,
timeout,
region_routes_override,
);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));

View File

@@ -0,0 +1,288 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Dropped region GC handling.
//!
//! This module handles garbage collection for "dropped regions". A dropped region is:
//! 1. Recorded in `table_repart` metadata (the only place that still remembers it)
//! 2. No longer present in `table_route` metadata (removed from region routing)
//! 3. No longer present in any datanode's heartbeat (physically closed)
//!
//! This differs from "active regions" which exist in both metadata and heartbeats.
//! The `table_repart` entry serves as a tombstone that tracks which regions were
//! merged/split and need their files cleaned up.
use std::collections::{HashMap, HashSet};
use std::time::Instant;
use common_meta::key::table_repart::TableRepartValue;
use common_meta::peer::Peer;
use common_telemetry::{debug, warn};
use store_api::storage::RegionId;
use table::metadata::TableId;
use crate::error::Result;
use crate::gc::Region2Peers;
use crate::gc::ctx::SchedulerCtx;
use crate::gc::options::GcSchedulerOptions;
use crate::gc::tracker::RegionGcTracker;
/// Information about a dropped region ready for GC.
#[derive(Debug, Clone)]
pub(crate) struct DroppedRegionInfo {
/// ID of the dropped region whose files should be garbage collected.
pub region_id: RegionId,
/// ID of the table under which the region was recorded in the repartition metadata.
pub table_id: TableId,
/// The destination regions this region was split into (if any).
// Filled in from the repartition mapping but not read within this module,
// hence the lint allowance below.
#[allow(unused)]
pub dst_regions: HashSet<RegionId>,
}
/// Result of collecting and assigning dropped regions to peers.
///
/// All three maps are keyed by the peer chosen to run GC for a dropped region.
/// The `Default` value (all maps empty) means "nothing to collect".
#[derive(Debug, Default)]
pub(crate) struct DroppedRegionAssignment {
/// Dropped regions grouped by the peer responsible for GC.
pub regions_by_peer: HashMap<Peer, Vec<DroppedRegionInfo>>,
/// Regions that require full file listing (always true for dropped regions).
/// `assign_to_peers` adds every dropped region it assigns to this set.
pub force_full_listing: HashMap<Peer, HashSet<RegionId>>,
/// Override region routes for dropped regions (they have no active route).
/// Each dropped region maps to its assigned peer with an empty follower list.
pub region_routes_override: HashMap<Peer, Region2Peers>,
}
/// Collector for dropped regions that need GC.
///
/// Borrows everything it needs, so it is cheap to construct for a single GC cycle.
pub(crate) struct DroppedRegionCollector<'a> {
/// Source of table route metadata.
ctx: &'a dyn SchedulerCtx,
/// Scheduler options; `gc_cooldown_period` is read when filtering by cooldown.
config: &'a GcSchedulerOptions,
/// Per-region GC bookkeeping, consulted for each region's last GC time.
tracker: &'a tokio::sync::Mutex<RegionGcTracker>,
}
impl<'a> DroppedRegionCollector<'a> {
pub fn new(
ctx: &'a dyn SchedulerCtx,
config: &'a GcSchedulerOptions,
tracker: &'a tokio::sync::Mutex<RegionGcTracker>,
) -> Self {
Self {
ctx,
config,
tracker,
}
}
/// Find dropped regions in the repartition metadata and assign them to peers for GC.
///
/// Steps:
/// 1. Gather the region ids that are still present in the table routes of every
///    table mentioned in `table_reparts`.
/// 2. Keep the source regions of the repartition mapping that are not among them.
/// 3. Drop regions that are still within their GC cooldown period.
/// 4. Assign each remaining region to a peer for cleanup.
///
/// Returns an empty assignment as soon as one of the steps leaves nothing to do.
///
/// # Errors
///
/// Fails if the batched table route lookup or the peer assignment fails.
pub async fn collect_and_assign(
    &self,
    table_reparts: &[(TableId, TableRepartValue)],
) -> Result<DroppedRegionAssignment> {
    // Region ids that still exist in the routes of the tables being repartitioned.
    let table_ids: Vec<TableId> = table_reparts.iter().map(|(id, _)| *id).collect();
    let table_routes = self.ctx.batch_get_table_route(&table_ids).await?;
    let active_region_ids: HashSet<RegionId> = table_routes
        .values()
        .flat_map(|route| route.region_routes.iter().map(|r| r.region.id))
        .collect();

    let candidates = self.identify_dropped_regions(table_reparts, &active_region_ids);
    if candidates.is_empty() {
        return Ok(DroppedRegionAssignment::default());
    }

    let ready = self.filter_by_cooldown(candidates).await;
    if ready.is_empty() {
        return Ok(DroppedRegionAssignment::default());
    }

    self.assign_to_peers(ready).await
}
/// Identify dropped regions: source regions recorded in `table_repart` that are not
/// among `active_region_ids`.
///
/// The result maps each table id to its dropped regions, and each dropped region to
/// the set of destination regions recorded for it. Tables without any dropped region
/// do not appear in the result.
///
/// The `assign_to_peers` step later double-checks that the regions are also absent
/// from `table_route`.
fn identify_dropped_regions(
    &self,
    table_reparts: &[(TableId, TableRepartValue)],
    active_region_ids: &HashSet<RegionId>,
) -> HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>> {
    let mut dropped_regions: HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>> =
        HashMap::new();

    for (table_id, repart) in table_reparts {
        // Iterate by reference: only the ids of dropped regions need to be copied,
        // not the whole repartition value.
        for (src_region, dst_regions) in &repart.src_to_dst {
            if active_region_ids.contains(src_region) {
                continue;
            }
            // The per-table entry is created lazily, so tables whose source regions
            // are all still active never get an (empty) entry.
            dropped_regions
                .entry(*table_id)
                .or_default()
                .insert(*src_region, dst_regions.iter().copied().collect());
        }
    }

    dropped_regions
}
/// Remove dropped regions whose last GC happened less than `gc_cooldown_period` ago.
///
/// Regions without a tracker entry are always kept. Tables left without any region
/// after filtering are removed from the result.
async fn filter_by_cooldown(
    &self,
    dropped_regions: HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>>,
) -> HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>> {
    let now = Instant::now();
    let tracker = self.tracker.lock().await;
    let cooldown = self.config.gc_cooldown_period;

    dropped_regions
        .into_iter()
        .filter_map(|(table_id, regions)| {
            let ready: HashMap<RegionId, HashSet<RegionId>> = regions
                .into_iter()
                .filter(|(region_id, _)| {
                    let cooling_down = tracker.get(region_id).is_some_and(|gc_info| {
                        now.saturating_duration_since(gc_info.last_gc_time) < cooldown
                    });
                    if cooling_down {
                        debug!("Skipping dropped region {} due to cooldown", region_id);
                    }
                    !cooling_down
                })
                .collect();

            (!ready.is_empty()).then_some((table_id, ready))
        })
        .collect()
}
/// Assign dropped regions to available peers for GC execution.
///
/// For every table with dropped regions:
/// 1. Fetch the table's current route; on failure the table is skipped with a warning.
/// 2. Skip the table when the id returned with the route differs from `table_id`.
/// 3. Collect the distinct leader peers of the route (sorted by peer id); skip the
///    table with a warning when there is none.
/// 4. For each dropped region that is absent from the route, pick a peer with a plain
///    modulo (`region_id % peer_count`). This spreads regions over the peers, but it
///    is not consistent hashing: a change in the peer set may reassign any region.
/// 5. Record the region for that peer, force a full file listing for it, and add a
///    route override `(peer, no followers)` since the region has no active route.
async fn assign_to_peers(
    &self,
    dropped_regions: HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>>,
) -> Result<DroppedRegionAssignment> {
    let mut assignment = DroppedRegionAssignment::default();

    for (table_id, regions) in dropped_regions {
        let (phy_table_id, table_route) = match self.ctx.get_table_route(table_id).await {
            Ok(route) => route,
            Err(e) => {
                warn!(
                    "Failed to get table route for table {}: {}, skipping dropped regions",
                    table_id, e
                );
                continue;
            }
        };

        // NOTE(review): presumably a differing id means `table_id` is a logical table
        // resolved to its physical table; such tables are skipped — confirm.
        if phy_table_id != table_id {
            continue;
        }

        // Single pass over the route: remember which regions are still routed and
        // which peers currently lead a region of this table.
        let mut active_region_ids = HashSet::with_capacity(table_route.region_routes.len());
        let mut leader_peers: Vec<Peer> = Vec::new();
        for route in &table_route.region_routes {
            active_region_ids.insert(route.region.id);
            if let Some(leader) = &route.leader_peer {
                leader_peers.push(leader.clone());
            }
        }
        // A stable, id-ordered, duplicate-free list makes the modulo selection below
        // deterministic for a given set of leaders.
        leader_peers.sort_by_key(|peer| peer.id);
        leader_peers.dedup_by_key(|peer| peer.id);

        if leader_peers.is_empty() {
            warn!(
                "No leader peers found for table {}, skipping dropped regions",
                table_id
            );
            continue;
        }

        for (region_id, dst_regions) in regions {
            if active_region_ids.contains(&region_id) {
                debug!(
                    "Skipping dropped region {} since it still exists in table route",
                    region_id
                );
                continue;
            }

            // Take the modulo in u64 so the region id is never truncated before the
            // reduction; the result is < leader_peers.len() and therefore fits usize.
            let selected_idx = (region_id.as_u64() % leader_peers.len() as u64) as usize;
            let peer = &leader_peers[selected_idx];

            assignment
                .regions_by_peer
                .entry(peer.clone())
                .or_default()
                .push(DroppedRegionInfo {
                    region_id,
                    table_id,
                    dst_regions,
                });
            assignment
                .force_full_listing
                .entry(peer.clone())
                .or_default()
                .insert(region_id);
            assignment
                .region_routes_override
                .entry(peer.clone())
                .or_default()
                .insert(region_id, (peer.clone(), Vec::new()));
        }
    }

    Ok(assignment)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built `DroppedRegionInfo` exposes the ids it was constructed with.
    #[test]
    fn test_dropped_region_info() {
        let expected_region = RegionId::new(1, 1);
        let expected_table: TableId = 1;

        let info = DroppedRegionInfo {
            region_id: expected_region,
            table_id: expected_table,
            dst_regions: HashSet::new(),
        };

        assert_eq!(info.region_id, expected_region);
        assert_eq!(info.table_id, expected_table);
    }
}

View File

@@ -15,61 +15,71 @@
use std::collections::{HashMap, HashSet};
use std::time::Instant;
use common_catalog::consts::MITO_ENGINE;
use common_meta::datanode::{RegionManifestInfo, RegionStat};
use common_meta::peer::Peer;
use common_telemetry::{debug, error, info, warn};
use futures::StreamExt;
use itertools::Itertools;
use ordered_float::OrderedFloat;
use store_api::region_engine::RegionRole;
use store_api::storage::{GcReport, RegionId};
use table::metadata::TableId;
use crate::error::Result;
use crate::gc::Region2Peers;
use crate::gc::candidate::GcCandidate;
use crate::gc::dropped::DroppedRegionCollector;
use crate::gc::scheduler::{GcJobReport, GcScheduler};
use crate::gc::tracker::RegionGcInfo;
impl GcScheduler {
/// Iterate through all region stats, find region that might need gc, and send gc instruction to
/// the corresponding datanode with improved parallel processing and retry logic.
pub(crate) async fn trigger_gc(&self) -> Result<GcJobReport> {
let start_time = Instant::now();
info!("Starting GC cycle");
// Step 1: Get all region statistics
// limit gc region scope to regions whose datanode have reported stats(by heartbeat)
let table_to_region_stats = self.ctx.get_table_to_region_stats().await?;
info!(
"Fetched region stats for {} tables",
table_to_region_stats.len()
);
// Step 2: Select GC candidates based on our scoring algorithm
let per_table_candidates = self.select_gc_candidates(&table_to_region_stats).await?;
if per_table_candidates.is_empty() {
let table_reparts = self.ctx.get_table_reparts().await?;
let dropped_collector =
DroppedRegionCollector::new(self.ctx.as_ref(), &self.config, &self.region_gc_tracker);
let dropped_assignment = dropped_collector.collect_and_assign(&table_reparts).await?;
if per_table_candidates.is_empty() && dropped_assignment.regions_by_peer.is_empty() {
info!("No GC candidates found, skipping GC cycle");
return Ok(Default::default());
}
// Step 3: Aggregate candidates by datanode
let datanode_to_candidates = self
let mut datanode_to_candidates = self
.aggregate_candidates_by_datanode(per_table_candidates)
.await?;
// TODO(discord9): add deleted regions from repartition mapping
self.merge_dropped_regions(&mut datanode_to_candidates, &dropped_assignment);
if datanode_to_candidates.is_empty() {
info!("No valid datanode candidates found, skipping GC cycle");
return Ok(Default::default());
}
// Step 4: Process datanodes concurrently with limited parallelism
let report = self
.parallel_process_datanodes(datanode_to_candidates)
.parallel_process_datanodes(
datanode_to_candidates,
dropped_assignment.force_full_listing,
dropped_assignment.region_routes_override,
)
.await;
let duration = start_time.elapsed();
info!(
"Finished GC cycle. Processed {} datanodes ({} failed). Duration: {:?}",
report.per_datanode_reports.len(), // Reuse field for datanode count
report.per_datanode_reports.len(),
report.failed_datanodes.len(),
duration
);
@@ -78,7 +88,19 @@ impl GcScheduler {
Ok(report)
}
/// Append the dropped regions of `assignment` to the per-datanode candidate lists.
///
/// For each peer in `assignment.regions_by_peer`, every dropped region is turned into
/// a candidate via `dropped_candidate` and pushed onto that peer's list in
/// `datanode_to_candidates`; the list is created if the peer had no candidates yet.
fn merge_dropped_regions(
    &self,
    datanode_to_candidates: &mut HashMap<Peer, Vec<(TableId, GcCandidate)>>,
    assignment: &crate::gc::dropped::DroppedRegionAssignment,
) {
    for (peer, dropped_infos) in &assignment.regions_by_peer {
        let extra = dropped_infos
            .iter()
            .map(|info| (info.table_id, dropped_candidate(info.region_id)));
        datanode_to_candidates
            .entry(peer.clone())
            .or_default()
            .extend(extra);
    }
}
pub(crate) async fn aggregate_candidates_by_datanode(
&self,
per_table_candidates: HashMap<TableId, Vec<GcCandidate>>,
@@ -134,6 +156,8 @@ impl GcScheduler {
pub(crate) async fn parallel_process_datanodes(
&self,
datanode_to_candidates: HashMap<Peer, Vec<(TableId, GcCandidate)>>,
force_full_listing_by_peer: HashMap<Peer, HashSet<RegionId>>,
region_routes_override_by_peer: HashMap<Peer, Region2Peers>,
) -> GcJobReport {
let mut report = GcJobReport::default();
@@ -146,10 +170,25 @@ impl GcScheduler {
.map(|(peer, candidates)| {
let scheduler = self;
let peer_clone = peer.clone();
let force_full_listing = force_full_listing_by_peer
.get(&peer)
.cloned()
.unwrap_or_default();
let region_routes_override = region_routes_override_by_peer
.get(&peer)
.cloned()
.unwrap_or_default();
async move {
(
peer,
scheduler.process_datanode_gc(peer_clone, candidates).await,
scheduler
.process_datanode_gc(
peer_clone,
candidates,
force_full_listing,
region_routes_override,
)
.await,
)
}
})
@@ -181,6 +220,8 @@ impl GcScheduler {
&self,
peer: Peer,
candidates: Vec<(TableId, GcCandidate)>,
force_full_listing: HashSet<RegionId>,
region_routes_override: Region2Peers,
) -> Result<GcReport> {
info!(
"Starting GC for datanode {} with {} candidate regions",
@@ -198,8 +239,9 @@ impl GcScheduler {
let (gc_report, fully_listed_regions) = {
// Partition regions into full listing and fast listing in a single pass
let batch_full_listing_decisions =
self.batch_should_use_full_listing(&all_region_ids).await;
let batch_full_listing_decisions = self
.batch_should_use_full_listing(&all_region_ids, &force_full_listing)
.await;
let need_full_list_regions = batch_full_listing_decisions
.iter()
@@ -224,7 +266,12 @@ impl GcScheduler {
if !fast_list_regions.is_empty() {
match self
.ctx
.gc_regions(&fast_list_regions, false, self.config.mailbox_timeout)
.gc_regions(
&fast_list_regions,
false,
self.config.mailbox_timeout,
region_routes_override.clone(),
)
.await
{
Ok(report) => combined_report.merge(report),
@@ -245,7 +292,12 @@ impl GcScheduler {
if !need_full_list_regions.is_empty() {
match self
.ctx
.gc_regions(&need_full_list_regions, true, self.config.mailbox_timeout)
.gc_regions(
&need_full_list_regions,
true,
self.config.mailbox_timeout,
region_routes_override,
)
.await
{
Ok(report) => combined_report.merge(report),
@@ -288,11 +340,26 @@ impl GcScheduler {
async fn batch_should_use_full_listing(
&self,
region_ids: &[RegionId],
force_full_listing: &HashSet<RegionId>,
) -> HashMap<RegionId, bool> {
let mut result = HashMap::new();
let mut gc_tracker = self.region_gc_tracker.lock().await;
let now = Instant::now();
for &region_id in region_ids {
if force_full_listing.contains(&region_id) {
gc_tracker
.entry(region_id)
.and_modify(|info| {
info.last_full_listing_time = Some(now);
info.last_gc_time = now;
})
.or_insert_with(|| RegionGcInfo {
last_gc_time: now,
last_full_listing_time: Some(now),
});
result.insert(region_id, true);
continue;
}
let use_full_listing = {
if let Some(gc_info) = gc_tracker.get(&region_id) {
if let Some(last_full_listing) = gc_info.last_full_listing_time {
@@ -320,3 +387,36 @@ impl GcScheduler {
result
}
}
/// Build the GC candidate used for a dropped region: score `0.0` and a placeholder
/// region stat produced by `dropped_region_stat`.
fn dropped_candidate(region_id: RegionId) -> GcCandidate {
    let region_stat = dropped_region_stat(region_id);
    let score = OrderedFloat(0.0);
    GcCandidate {
        region_id,
        score,
        region_stat,
    }
}
/// Build a placeholder `RegionStat` for a dropped region.
///
/// Only the id is real; every counter and size is zero, the engine is `MITO_ENGINE`,
/// the role is `Leader`, and the manifest info is an all-zero `Mito` variant.
// NOTE(review): presumably needed because no datanode reports stats for a dropped region — confirm.
fn dropped_region_stat(region_id: RegionId) -> RegionStat {
    let region_manifest = RegionManifestInfo::Mito {
        manifest_version: 0,
        flushed_entry_id: 0,
        file_removed_cnt: 0,
    };

    RegionStat {
        // Identity.
        id: region_id,
        engine: MITO_ENGINE.to_string(),
        role: RegionRole::Leader,
        region_manifest,
        // Load and size metrics: nothing is known, so everything is zero.
        rcus: 0,
        wcus: 0,
        approximate_bytes: 0,
        num_rows: 0,
        memtable_size: 0,
        manifest_size: 0,
        sst_size: 0,
        sst_num: 0,
        index_size: 0,
        written_bytes: 0,
        // Log positions.
        data_topic_latest_entry_id: 0,
        metadata_topic_latest_entry_id: 0,
    }
}

View File

@@ -25,7 +25,9 @@ use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use common_catalog::consts::MITO_ENGINE;
use common_meta::datanode::{RegionManifestInfo, RegionStat};
use common_meta::key::table_repart::TableRepartValue;
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
@@ -37,6 +39,7 @@ use table::metadata::TableId;
use tokio::sync::mpsc::Sender;
use crate::error::Result;
use crate::gc::Region2Peers;
use crate::gc::candidate::GcCandidate;
use crate::gc::ctx::SchedulerCtx;
use crate::gc::options::GcSchedulerOptions;
@@ -61,6 +64,7 @@ pub fn new_empty_report_with(region_ids: impl IntoIterator<Item = RegionId>) ->
#[derive(Debug, Default)]
pub struct MockSchedulerCtx {
pub table_to_region_stats: Arc<Mutex<Option<HashMap<TableId, Vec<RegionStat>>>>>,
pub table_reparts: Arc<Mutex<HashMap<TableId, TableRepartValue>>>,
pub table_routes: Arc<Mutex<HashMap<TableId, (TableId, PhysicalTableRouteValue)>>>,
pub file_refs: Arc<Mutex<Option<FileRefsManifest>>>,
pub gc_reports: Arc<Mutex<HashMap<RegionId, GcReport>>>,
@@ -104,6 +108,12 @@ impl MockSchedulerCtx {
self
}
/// Replace the repartition records returned by `get_table_reparts` (builder style).
#[allow(dead_code)]
pub fn with_table_reparts(self, table_reparts: HashMap<TableId, TableRepartValue>) -> Self {
    {
        // Scope the guard so the lock is released before `self` is moved out.
        let mut stored = self.table_reparts.lock().unwrap();
        *stored = table_reparts;
    }
    self
}
/// Set an error to be returned by `get_table_to_region_stats`
#[allow(dead_code)]
pub fn with_get_table_to_region_stats_error(self, error: crate::error::Error) -> Self {
@@ -147,6 +157,16 @@ impl SchedulerCtx for MockSchedulerCtx {
.unwrap_or_default())
}
/// Return a snapshot of the configured repartition records as `(table id, value)` pairs.
async fn get_table_reparts(&self) -> Result<Vec<(TableId, TableRepartValue)>> {
    let pairs = {
        let reparts = self.table_reparts.lock().unwrap();
        let mut pairs = Vec::with_capacity(reparts.len());
        for (table_id, value) in reparts.iter() {
            pairs.push((*table_id, value.clone()));
        }
        pairs
    };
    Ok(pairs)
}
async fn get_table_route(
&self,
table_id: TableId,
@@ -165,11 +185,30 @@ impl SchedulerCtx for MockSchedulerCtx {
.unwrap_or_else(|| (table_id, PhysicalTableRouteValue::default())))
}
/// Return the configured physical route for each id in `table_ids`.
///
/// Ids without a configured route map to `PhysicalTableRouteValue::default()`, so the
/// result always contains an entry for every requested id.
async fn batch_get_table_route(
    &self,
    table_ids: &[TableId],
) -> Result<HashMap<TableId, PhysicalTableRouteValue>> {
    let result = {
        let routes = self.table_routes.lock().unwrap();
        table_ids
            .iter()
            .map(|&table_id| {
                let physical = routes
                    .get(&table_id)
                    .map(|(_, route)| route.clone())
                    .unwrap_or_default();
                (table_id, physical)
            })
            .collect()
    };
    Ok(result)
}
async fn gc_regions(
&self,
region_ids: &[RegionId],
_full_file_listing: bool,
_timeout: Duration,
_region_routes_override: Region2Peers,
) -> Result<GcReport> {
*self.gc_regions_calls.lock().unwrap() += 1;
@@ -368,7 +407,7 @@ fn mock_region_stat(
},
rcus: 0,
wcus: 0,
engine: "mito".to_string(),
engine: MITO_ENGINE.to_string(),
num_rows: 0,
memtable_size: 0,
manifest_size: 0,

View File

@@ -31,7 +31,7 @@ async fn test_parallel_process_datanodes_empty() {
let env = TestEnv::new();
let report = env
.scheduler
.parallel_process_datanodes(HashMap::new())
.parallel_process_datanodes(HashMap::new(), HashMap::new(), HashMap::new())
.await;
assert_eq!(report.per_datanode_reports.len(), 0);
@@ -85,7 +85,7 @@ async fn test_parallel_process_datanodes_with_candidates() {
)]);
let report = scheduler
.parallel_process_datanodes(datanode_to_candidates)
.parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new())
.await;
assert_eq!(report.per_datanode_reports.len(), 1);

View File

@@ -96,7 +96,7 @@ async fn test_concurrent_table_processing_limits() {
)]);
let report = scheduler
.parallel_process_datanodes(datanode_to_candidates)
.parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new())
.await;
// Should process all datanodes
@@ -168,7 +168,7 @@ async fn test_datanode_processes_tables_with_partial_gc_failures() {
)]);
let report = scheduler
.parallel_process_datanodes(datanode_to_candidates)
.parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new())
.await;
// Should have one datanode with mixed results
@@ -260,6 +260,8 @@ async fn test_region_gc_concurrency_limit() {
.process_datanode_gc(
peer,
candidates.into_iter().map(|c| (table_id, c)).collect(),
HashSet::new(),
HashMap::new(),
)
.await
.unwrap();
@@ -371,7 +373,7 @@ async fn test_region_gc_concurrency_with_partial_failures() {
)]);
let report = scheduler
.parallel_process_datanodes(datanode_to_candidates)
.parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new())
.await;
let report = report.per_datanode_reports.get(&peer.id).unwrap();
@@ -501,7 +503,7 @@ async fn test_region_gc_concurrency_with_retryable_errors() {
candidates.into_iter().map(|c| (table_id, c)).collect(),
)]);
let report = scheduler
.parallel_process_datanodes(datanode_to_candidates)
.parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new())
.await;
let report = report.per_datanode_reports.get(&peer.id).unwrap();

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
@@ -76,7 +76,12 @@ async fn test_full_file_listing_first_time_gc() {
// First GC - should use full listing since region has never been GC'd
let reports = scheduler
.process_datanode_gc(peer.clone(), vec![(table_id, mock_candidate(region_id))])
.process_datanode_gc(
peer.clone(),
vec![(table_id, mock_candidate(region_id))],
HashSet::new(),
HashMap::new(),
)
.await
.unwrap();
@@ -143,7 +148,12 @@ async fn test_full_file_listing_interval_enforcement() {
// First GC - should use full listing
let reports1 = scheduler
.process_datanode_gc(peer.clone(), vec![(table_id, mock_candidate(region_id))])
.process_datanode_gc(
peer.clone(),
vec![(table_id, mock_candidate(region_id))],
HashSet::new(),
HashMap::new(),
)
.await
.unwrap();
assert_eq!(reports1.deleted_files.len(), 1);
@@ -164,7 +174,12 @@ async fn test_full_file_listing_interval_enforcement() {
// Second GC - should use full listing again since interval has passed
let _reports2 = scheduler
.process_datanode_gc(peer.clone(), vec![(table_id, mock_candidate(region_id))])
.process_datanode_gc(
peer.clone(),
vec![(table_id, mock_candidate(region_id))],
HashSet::new(),
HashMap::new(),
)
.await
.unwrap();
@@ -233,7 +248,12 @@ async fn test_full_file_listing_no_interval_passed() {
// First GC - should use full listing
let reports1 = scheduler
.process_datanode_gc(peer.clone(), vec![(table_id, mock_candidate(region_id))])
.process_datanode_gc(
peer.clone(),
vec![(table_id, mock_candidate(region_id))],
HashSet::new(),
HashMap::new(),
)
.await
.unwrap();
assert_eq!(reports1.deleted_files.len(), 1);
@@ -251,7 +271,12 @@ async fn test_full_file_listing_no_interval_passed() {
// Second GC immediately - should NOT use full listing since interval hasn't passed
let reports2 = scheduler
.process_datanode_gc(peer.clone(), vec![(table_id, mock_candidate(region_id))])
.process_datanode_gc(
peer.clone(),
vec![(table_id, mock_candidate(region_id))],
HashSet::new(),
HashMap::new(),
)
.await
.unwrap();
assert_eq!(reports2.deleted_files.len(), 1);

View File

@@ -70,7 +70,7 @@ async fn test_empty_file_refs_manifest() {
)]);
let report = scheduler
.parallel_process_datanodes(datanode_to_candidates)
.parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new())
.await;
assert_eq!(report.per_datanode_reports.len(), 1);
@@ -147,7 +147,7 @@ async fn test_multiple_regions_per_table() {
)]);
let report = scheduler
.parallel_process_datanodes(datanode_to_candidates)
.parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new())
.await;
assert_eq!(report.per_datanode_reports.len(), 1);

View File

@@ -167,6 +167,9 @@ pub struct BatchGcData {
regions: Vec<RegionId>,
full_file_listing: bool,
region_routes: Region2Peers,
/// Routes assigned by the scheduler for regions missing from table routes.
#[serde(default)]
region_routes_override: Region2Peers,
/// Related regions (e.g., for shared files after repartition).
/// The source regions (where those files originally came from) are used as the key, and the destination regions (where files are currently stored) are used as the value.
related_regions: HashMap<RegionId, HashSet<RegionId>>,
@@ -199,6 +202,7 @@ impl BatchGcProcedure {
regions: Vec<RegionId>,
full_file_listing: bool,
timeout: Duration,
region_routes_override: Region2Peers,
) -> Self {
Self {
mailbox,
@@ -210,6 +214,7 @@ impl BatchGcProcedure {
full_file_listing,
timeout,
region_routes: HashMap::new(),
region_routes_override,
related_regions: HashMap::new(),
file_refs: FileRefsManifest::default(),
gc_report: None,
@@ -339,10 +344,16 @@ impl BatchGcProcedure {
let regions_to_discover = regions_set.into_iter().collect_vec();
let (region_to_peer, _) = self
let (mut region_to_peer, _) = self
.discover_route_for_regions(&regions_to_discover)
.await?;
for (region_id, route) in &self.data.region_routes_override {
region_to_peer
.entry(*region_id)
.or_insert_with(|| route.clone());
}
self.data.region_routes = region_to_peer;
Ok(())
@@ -356,11 +367,19 @@ impl BatchGcProcedure {
let related_regions = &self.data.related_regions;
let region_routes = &self.data.region_routes;
let timeout = self.data.timeout;
let dropped_regions = self
.data
.region_routes_override
.keys()
.collect::<HashSet<_>>();
// Group regions by datanode to minimize RPC calls
let mut datanode2query_regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();
for region_id in query_regions {
if dropped_regions.contains(region_id) {
continue;
}
if let Some((leader, followers)) = region_routes.get(region_id) {
datanode2query_regions
.entry(leader.clone())
@@ -403,12 +422,21 @@ impl BatchGcProcedure {
let mut all_manifest_versions = HashMap::new();
let mut all_cross_region_refs = HashMap::new();
for (peer, regions) in datanode2query_regions {
let mut peers = HashSet::new();
peers.extend(datanode2query_regions.keys().cloned());
peers.extend(datanode2related_regions.keys().cloned());
for peer in peers {
let regions = datanode2query_regions.remove(&peer).unwrap_or_default();
let related_regions_for_peer =
datanode2related_regions.remove(&peer).unwrap_or_default();
if regions.is_empty() && related_regions_for_peer.is_empty() {
continue;
}
let instruction = GetFileRefs {
query_regions: regions.clone(),
query_regions: regions,
related_regions: related_regions_for_peer,
};

View File

@@ -57,6 +57,13 @@ impl GcScheduler {
}
}
let table_reparts = self.ctx.get_table_reparts().await?;
for (_, repart) in table_reparts {
for src_region in repart.src_to_dst.keys() {
current_regions.insert(*src_region);
}
}
// Remove stale entries from tracker
let mut tracker = self.region_gc_tracker.lock().await;
let initial_count = tracker.len();

View File

@@ -147,6 +147,8 @@ use crate::region::opener::PartitionExprFetcherRef;
use crate::request::{RegionEditRequest, WorkerRequest};
use crate::sst::file::{FileMeta, RegionFileId, RegionIndexId};
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::wal::entry_distributor::{
DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, build_wal_entry_distributor_and_receivers,
};
@@ -303,6 +305,22 @@ impl MitoEngine {
self.inner.workers.gc_limiter()
}
/// Returns the object store manager, as exposed by the engine's workers.
pub fn object_store_manager(&self) -> &ObjectStoreManagerRef {
self.inner.workers.object_store_manager()
}
/// Returns the puffin manager factory, as exposed by the engine's workers.
pub fn puffin_manager_factory(&self) -> &PuffinManagerFactory {
self.inner.workers.puffin_manager_factory()
}
/// Returns the intermediate manager, as exposed by the engine's workers.
pub fn intermediate_manager(&self) -> &IntermediateManager {
self.inner.workers.intermediate_manager()
}
/// Returns the schema metadata manager, as exposed by the engine's workers.
pub fn schema_metadata_manager(&self) -> &SchemaMetadataManagerRef {
self.inner.workers.schema_metadata_manager()
}
/// Get all tmp ref files for the given region ids, excluding files that are already in the manifest.
pub async fn get_snapshot_of_file_refs(
&self,
@@ -467,6 +485,11 @@ impl MitoEngine {
self.inner.workers.get_region(region_id)
}
/// Returns all regions.
///
/// The result is a `Vec` collected from the workers' `all_regions()` iterator.
pub fn regions(&self) -> Vec<MitoRegionRef> {
self.inner.workers.all_regions().collect()
}
fn encode_manifest_info_to_extensions(
region_id: &RegionId,
manifest_info: RegionManifestInfo,

View File

@@ -31,7 +31,7 @@ use common_time::Timestamp;
use itertools::Itertools;
use object_store::{Entry, Lister};
use serde::{Deserialize, Serialize};
use snafu::ResultExt as _;
use snafu::{ResultExt as _, ensure};
use store_api::storage::{FileId, FileRef, FileRefsManifest, GcReport, IndexVersion, RegionId};
use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
use tokio_stream::StreamExt;
@@ -41,7 +41,8 @@ use crate::cache::CacheManagerRef;
use crate::cache::file_cache::FileType;
use crate::config::MitoConfig;
use crate::error::{
DurationOutOfRangeSnafu, JoinSnafu, OpenDalSnafu, Result, TooManyGcJobsSnafu, UnexpectedSnafu,
DurationOutOfRangeSnafu, InvalidRequestSnafu, JoinSnafu, OpenDalSnafu, Result,
TooManyGcJobsSnafu, UnexpectedSnafu,
};
use crate::manifest::action::{RegionManifest, RemovedFile};
use crate::metrics::{GC_DELETE_FILE_CNT, GC_ORPHANED_INDEX_FILES, GC_SKIPPED_UNPARSABLE_FILES};
@@ -174,7 +175,7 @@ impl Default for GcConfig {
pub struct LocalGcWorker {
pub(crate) access_layer: AccessLayerRef,
pub(crate) cache_manager: Option<CacheManagerRef>,
pub(crate) regions: BTreeMap<RegionId, MitoRegionRef>,
pub(crate) regions: BTreeMap<RegionId, Option<MitoRegionRef>>,
/// Lingering time before deleting files.
pub(crate) opt: GcConfig,
/// Tmp ref files manifest, used to determine which files are still in use by ongoing queries.
@@ -222,12 +223,28 @@ impl LocalGcWorker {
pub async fn try_new(
access_layer: AccessLayerRef,
cache_manager: Option<CacheManagerRef>,
regions_to_gc: BTreeMap<RegionId, MitoRegionRef>,
regions_to_gc: BTreeMap<RegionId, Option<MitoRegionRef>>,
opt: GcConfig,
file_ref_manifest: FileRefsManifest,
limiter: &GcLimiterRef,
full_file_listing: bool,
) -> Result<Self> {
if let Some(first_region_id) = regions_to_gc.keys().next() {
let table_id = first_region_id.table_id();
for region_id in regions_to_gc.keys() {
ensure!(
region_id.table_id() == table_id,
InvalidRequestSnafu {
region_id: *region_id,
reason: format!(
"Region {} does not belong to table {}",
region_id, table_id
),
}
);
}
}
let permit = limiter.permit()?;
Ok(Self {
@@ -269,7 +286,9 @@ impl LocalGcWorker {
let tmp_ref_files = self.read_tmp_ref_files().await?;
for (region_id, region) in &self.regions {
let per_region_time = std::time::Instant::now();
if region.manifest_ctx.current_state() == RegionRoleState::Follower {
if region.as_ref().map(|r| r.manifest_ctx.current_state())
== Some(RegionRoleState::Follower)
{
return UnexpectedSnafu {
reason: format!(
"Region {} is in Follower state, should not run GC on follower regions",
@@ -282,7 +301,9 @@ impl LocalGcWorker {
.get(region_id)
.cloned()
.unwrap_or_else(HashSet::new);
let files = self.do_region_gc(region.clone(), &tmp_ref_files).await?;
let files = self
.do_region_gc(*region_id, region.clone(), &tmp_ref_files)
.await?;
let index_files = files
.iter()
.filter_map(|f| f.index_version().map(|v| (f.file_id(), v)))
@@ -323,44 +344,73 @@ impl LocalGcWorker {
/// to avoid deleting files that are still needed.
pub async fn do_region_gc(
&self,
region: MitoRegionRef,
region_id: RegionId,
region: Option<MitoRegionRef>,
tmp_ref_files: &HashSet<FileRef>,
) -> Result<Vec<RemovedFile>> {
let region_id = region.region_id();
debug!(
"Doing gc for region {}, {}",
region_id,
if region.is_some() {
"region found"
} else {
"region not found, might be dropped"
}
);
debug!("Doing gc for region {}", region_id);
ensure!(
region.is_some() || self.full_file_listing,
InvalidRequestSnafu {
region_id,
reason: "region not found and full_file_listing is false; refusing GC without full listing".to_string(),
}
);
let manifest = region.manifest_ctx.manifest().await;
// If the manifest version does not match, skip GC for this region to avoid deleting files that are still in use.
let file_ref_manifest_version = self
.file_ref_manifest
.manifest_version
.get(&region.region_id())
.cloned();
if file_ref_manifest_version != Some(manifest.manifest_version) {
// should be rare enough(few seconds after leader update manifest version), just skip gc for this region
warn!(
"Tmp ref files manifest version {:?} does not match region {} manifest version {}, skip gc for this region",
file_ref_manifest_version,
region.region_id(),
manifest.manifest_version
);
return Ok(vec![]);
}
let manifest = if let Some(region) = &region {
let manifest = region.manifest_ctx.manifest().await;
// If the manifest version does not match, skip GC for this region to avoid deleting files that are still in use.
let file_ref_manifest_version = self
.file_ref_manifest
.manifest_version
.get(&region.region_id())
.cloned();
if file_ref_manifest_version != Some(manifest.manifest_version) {
// should be rare enough(few seconds after leader update manifest version), just skip gc for this region
warn!(
"Tmp ref files manifest version {:?} does not match region {} manifest version {}, skip gc for this region",
file_ref_manifest_version,
region.region_id(),
manifest.manifest_version
);
return Ok(vec![]);
}
Some(manifest)
} else {
None
};
// do the time consuming listing only when full_file_listing is true
// and do it first to make sure we have the latest manifest etc.
let all_entries = if self.full_file_listing {
self.list_from_object_store(region.region_id(), manifest.clone())
let all_entries = if let Some(manifest) = &manifest
&& self.full_file_listing
{
// do the time consuming listing only when full_file_listing is true(and region is open)
// and do it first to make sure we have the latest manifest etc.
self.list_from_object_store(region_id, manifest.files.len())
.await?
} else if manifest.is_none() && self.full_file_listing {
// if region is already dropped, we have no manifest to refer to,
// so only do gc if `full_file_listing` is true, otherwise just skip it
// TODO(discord9): is doing one serial listing enough here?
self.list_from_object_store(region_id, Self::CONCURRENCY_LIST_PER_FILES)
.await?
} else {
vec![]
};
let region_id = manifest.metadata.region_id;
let current_files = &manifest.files;
let recently_removed_files = self.get_removed_files_expel_times(&manifest).await?;
let recently_removed_files = if let Some(manifest) = &manifest {
self.get_removed_files_expel_times(manifest).await?
} else {
Default::default()
};
if recently_removed_files.is_empty() {
// no files to remove, skip
@@ -372,10 +422,16 @@ impl LocalGcWorker {
.map(|s| s.len())
.sum::<usize>();
let in_manifest = current_files
.iter()
.map(|(file_id, meta)| (*file_id, meta.index_version()))
.collect::<HashMap<_, _>>();
let current_files = manifest.as_ref().map(|m| &m.files);
let in_manifest = if let Some(current_files) = current_files {
current_files
.iter()
.map(|(file_id, meta)| (*file_id, meta.index_version()))
.collect::<HashMap<_, _>>()
} else {
Default::default()
};
let in_tmp_ref = tmp_ref_files
.iter()
@@ -395,8 +451,13 @@ impl LocalGcWorker {
let unused_file_cnt = deletable_files.len();
debug!(
"gc: for region {region_id}: In manifest files: {}, Tmp ref file cnt: {}, recently removed files: {}, Unused files to delete: {} ",
current_files.len(),
"gc: for region{}{region_id}: In manifest files: {}, Tmp ref file cnt: {}, recently removed files: {}, Unused files to delete: {}",
if region.is_none() {
"(region dropped)"
} else {
""
},
current_files.map(|c| c.len()).unwrap_or(0),
tmp_ref_files.len(),
removed_file_cnt,
deletable_files.len()
@@ -414,8 +475,10 @@ impl LocalGcWorker {
"Successfully deleted {} unused files for region {}",
unused_file_cnt, region_id
);
self.update_manifest_removed_files(&region, deletable_files.clone())
.await?;
if let Some(region) = &region {
self.update_manifest_removed_files(region, deletable_files.clone())
.await?;
}
Ok(deletable_files)
}
@@ -532,11 +595,10 @@ impl LocalGcWorker {
async fn list_from_object_store(
&self,
region_id: RegionId,
manifest: Arc<RegionManifest>,
file_cnt_hint: usize,
) -> Result<Vec<Entry>> {
let start = tokio::time::Instant::now();
let current_files = &manifest.files;
let concurrency = (current_files.len() / Self::CONCURRENCY_LIST_PER_FILES)
let concurrency = (file_cnt_hint / Self::CONCURRENCY_LIST_PER_FILES)
.max(1)
.min(self.opt.max_concurrent_lister_per_gc_job);
@@ -770,6 +832,8 @@ impl LocalGcWorker {
// files that may linger, which means they are not in use but may still be kept for a while
let threshold =
may_linger_until.map(|until| Timestamp::new_millisecond(until.timestamp_millis()));
// TODO(discord9): if the region is already closed, maybe handle the threshold differently?
// Is considering all files to be recently removed acceptable?
let mut recently_removed_files = recently_removed_files;
let may_linger_files = match threshold {
Some(threshold) => recently_removed_files.split_off(&threshold),

View File

@@ -33,11 +33,19 @@ use crate::test_util::{
async fn create_gc_worker(
mito_engine: &MitoEngine,
regions: BTreeMap<RegionId, MitoRegionRef>,
regions: BTreeMap<RegionId, Option<MitoRegionRef>>,
file_ref_manifest: &FileRefsManifest,
full_file_listing: bool,
) -> LocalGcWorker {
let access_layer = regions.first_key_value().unwrap().1.access_layer.clone();
let access_layer = regions
.first_key_value()
.as_ref()
.unwrap()
.1
.as_ref()
.unwrap()
.access_layer
.clone();
let cache_manager = mito_engine.cache_manager();
LocalGcWorker::try_new(
@@ -130,7 +138,7 @@ async fn test_gc_worker_basic_truncate() {
);
let version = manifest.manifest_version;
let regions = BTreeMap::from([(region_id, region.clone())]);
let regions = BTreeMap::from([(region_id, Some(region.clone()))]);
let file_ref_manifest = FileRefsManifest {
file_refs: Default::default(),
manifest_version: [(region_id, version)].into(),
@@ -225,7 +233,7 @@ async fn test_gc_worker_truncate_with_ref() {
);
let version = manifest.manifest_version;
let regions = BTreeMap::from([(region_id, region.clone())]);
let regions = BTreeMap::from([(region_id, Some(region.clone()))]);
let file_ref_manifest = FileRefsManifest {
file_refs: [(
region_id,
@@ -311,7 +319,7 @@ async fn test_gc_worker_basic_compact() {
let version = manifest.manifest_version;
let regions = BTreeMap::from([(region_id, region.clone())]);
let regions = BTreeMap::from([(region_id, Some(region.clone()))]);
let file_ref_manifest = FileRefsManifest {
file_refs: Default::default(),
manifest_version: [(region_id, version)].into(),
@@ -388,7 +396,7 @@ async fn test_gc_worker_compact_with_ref() {
let version = manifest.manifest_version;
let regions = BTreeMap::from([(region_id, region.clone())]);
let regions = BTreeMap::from([(region_id, Some(region.clone()))]);
let file_ref_manifest = FileRefsManifest {
file_refs: HashMap::from([(
region_id,

View File

@@ -42,6 +42,9 @@ pub struct FileReferenceManager {
node_id: Option<u64>,
/// TODO(discord9): use no hash hasher since table id is sequential.
files_per_region: DashMap<RegionId, RegionFileRefs>,
/// Whether global GC is enabled.
/// This is only meaningful when using object store (not local filesystem).
gc_enabled: bool,
}
pub type FileReferenceManagerRef = Arc<FileReferenceManager>;
@@ -51,9 +54,28 @@ impl FileReferenceManager {
Self {
node_id,
files_per_region: Default::default(),
gc_enabled: false,
}
}
/// Creates a new FileReferenceManager with GC configuration.
///
/// `node_id` and `gc_enabled` are stored as given; the per-region file
/// reference map starts out as its `Default` value.
pub fn with_gc_enabled(node_id: Option<u64>, gc_enabled: bool) -> Self {
Self {
node_id,
files_per_region: Default::default(),
gc_enabled,
}
}
/// Returns whether global GC is enabled.
///
/// This simply reports the `gc_enabled` flag stored on the manager.
///
/// This is useful for determining the file deletion strategy:
/// - If GC is enabled (and using object store), files are tracked but not immediately deleted
/// - If GC is disabled (or using local filesystem), files are deleted immediately
pub fn is_gc_enabled(&self) -> bool {
self.gc_enabled
}
fn ref_file_set(&self, region_id: RegionId) -> Option<HashSet<FileRef>> {
let file_refs = if let Some(file_refs) = self.files_per_region.get(&region_id) {
file_refs.clone()

View File

@@ -150,6 +150,14 @@ pub(crate) struct WorkerGroup {
file_ref_manager: FileReferenceManagerRef,
/// Gc limiter to limit concurrent gc jobs.
gc_limiter: GcLimiterRef,
/// Object store manager.
object_store_manager: ObjectStoreManagerRef,
/// Puffin manager factory.
puffin_manager_factory: PuffinManagerFactory,
/// Intermediate manager.
intermediate_manager: IntermediateManager,
/// Schema metadata manager.
schema_metadata_manager: SchemaMetadataManagerRef,
}
impl WorkerGroup {
@@ -260,6 +268,10 @@ impl WorkerGroup {
cache_manager,
file_ref_manager,
gc_limiter,
object_store_manager,
puffin_manager_factory,
intermediate_manager,
schema_metadata_manager,
})
}
@@ -338,6 +350,22 @@ impl WorkerGroup {
.iter()
.flat_map(|worker| worker.regions.list_regions())
}
/// Returns a reference to the object store manager held by this worker group.
pub(crate) fn object_store_manager(&self) -> &ObjectStoreManagerRef {
&self.object_store_manager
}
/// Returns a reference to the puffin manager factory held by this worker group.
pub(crate) fn puffin_manager_factory(&self) -> &PuffinManagerFactory {
&self.puffin_manager_factory
}
/// Returns a reference to the intermediate manager held by this worker group.
pub(crate) fn intermediate_manager(&self) -> &IntermediateManager {
&self.intermediate_manager
}
/// Returns a reference to the schema metadata manager held by this worker group.
pub(crate) fn schema_metadata_manager(&self) -> &SchemaMetadataManagerRef {
&self.schema_metadata_manager
}
}
// Tests methods.
@@ -447,6 +475,10 @@ impl WorkerGroup {
cache_manager,
file_ref_manager,
gc_limiter,
object_store_manager,
puffin_manager_factory,
intermediate_manager,
schema_metadata_manager,
})
}

View File

@@ -23,7 +23,7 @@ use object_store::util::join_path;
use object_store::{EntryMode, ObjectStore};
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::region_request::AffectedRows;
use store_api::region_request::{AffectedRows, PathType};
use store_api::storage::RegionId;
use tokio::time::sleep;
@@ -57,6 +57,7 @@ where
// Writes dropping marker
// We rarely drop a region so we still operate in the worker loop.
let region_dir = region.access_layer.build_region_dir(region_id);
let path_type = region.access_layer.path_type();
let table_dir = region.access_layer.table_dir().to_string();
let marker_path = join_path(&region_dir, DROPPING_MARKER_FILE);
region
@@ -104,34 +105,46 @@ where
self.region_count.dec();
// Detaches a background task to delete the region dir
let object_store = region.access_layer.object_store().clone();
let dropping_regions = self.dropping_regions.clone();
let listener = self.listener.clone();
let intm_manager = self.intermediate_manager.clone();
let cache_manager = self.cache_manager.clone();
let gc_enabled = self.file_ref_manager.is_gc_enabled();
common_runtime::spawn_global(async move {
let gc_duration = listener
.on_later_drop_begin(region_id)
.unwrap_or(Duration::from_secs(GC_TASK_INTERVAL_SEC));
let removed = later_drop_task(
region_id,
region_dir.clone(),
object_store,
dropping_regions,
gc_duration,
)
.await;
let removed = if gc_enabled {
later_drop_task_with_global_gc(
region_id,
region_dir.clone(),
path_type,
object_store,
dropping_regions,
gc_duration,
)
.await
} else {
later_drop_task_without_global_gc(
region_id,
region_dir.clone(),
object_store,
dropping_regions,
gc_duration,
)
.await
};
if let Err(err) = intm_manager.prune_region_dir(&region_id).await {
warn!(err; "Failed to prune intermediate region directory, region_id: {}", region_id);
}
// Clean manifest cache for the region
if let Some(write_cache) = cache_manager.write_cache()
&& let Some(manifest_cache) = write_cache.manifest_cache()
{
// We pass the table dir so we can remove the table dir in manifest cache
// when the last region in the same host is dropped.
manifest_cache.clean_manifests(&table_dir).await;
}
@@ -152,14 +165,32 @@ where
/// This task will retry on failure and keep running until finished. Any resource
/// captured by it will not be released before then. Be sure to only pass weak reference
/// if something is depended on ref-count mechanism.
async fn later_drop_task(
async fn later_drop_task_without_global_gc(
region_id: RegionId,
region_path: String,
object_store: ObjectStore,
dropping_regions: RegionMapRef,
gc_duration: Duration,
) -> bool {
let mut force = false;
remove_region_with_retry(
region_id,
region_path,
object_store,
dropping_regions,
gc_duration,
false,
)
.await
}
async fn remove_region_with_retry(
region_id: RegionId,
region_path: String,
object_store: ObjectStore,
dropping_regions: std::sync::Arc<crate::region::RegionMap>,
gc_duration: Duration,
mut force: bool,
) -> bool {
for _ in 0..MAX_RETRY_TIMES {
let result = remove_region_dir_once(&region_path, &object_store, force).await;
match result {
@@ -189,6 +220,31 @@ async fn later_drop_task(
false
}
/// Drop-task variant used when global GC is enabled.
///
/// Only a region whose `path_type` is `PathType::Metadata` has its directory
/// removed here, by calling `remove_region_with_retry` with `force = true`.
/// Any other region is just taken out of `dropping_regions`; its files are
/// left for the global GC.
///
/// Returns the outcome of `remove_region_with_retry` for metadata regions and
/// `true` for every other path type.
async fn later_drop_task_with_global_gc(
    region_id: RegionId,
    region_path: String,
    path_type: PathType,
    object_store: ObjectStore,
    dropping_regions: RegionMapRef,
    gc_duration: Duration,
) -> bool {
    if path_type != PathType::Metadata {
        // left for global gc
        dropping_regions.remove_region(region_id);
        return true;
    }

    // Metadata regions are removed eagerly and unconditionally.
    let force = true;
    remove_region_with_retry(
        region_id,
        region_path,
        object_store,
        dropping_regions,
        gc_duration,
        force,
    )
    .await
}
// TODO(ruihang): place the marker in a separate dir
/// Removes region dir if there is no parquet files, returns whether the directory is removed.
/// If `force = true`, always removes the dir.

View File

@@ -203,6 +203,7 @@ async fn test_gc_basic(store_type: &StorageType) {
regions.clone(),
false, // full_file_listing
Duration::from_secs(10), // timeout
Default::default(),
);
// Submit the procedure to the procedure manager