mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-10 07:12:54 +00:00
feat: make RepeatedTask invoke remove_outdated_meta method (#1578)
* feat: make RepeatedTask invoke remove_outdated_meta method * fix: typo * chore: improve error message
This commit is contained in:
@@ -121,6 +121,18 @@ pub enum Error {
|
||||
|
||||
#[snafu(display("Corrupted data, error: {source}"))]
|
||||
CorruptedData { source: FromUtf8Error },
|
||||
|
||||
#[snafu(display("Failed to start the remove_outdated_meta method, error: {}", source))]
|
||||
StartRemoveOutdatedMetaTask {
|
||||
source: common_runtime::error::Error,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to stop the remove_outdated_meta method, error: {}", source))]
|
||||
StopRemoveOutdatedMetaTask {
|
||||
source: common_runtime::error::Error,
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -145,6 +157,8 @@ impl ErrorExt for Error {
|
||||
}
|
||||
Error::ProcedurePanic { .. } | Error::CorruptedData { .. } => StatusCode::Unexpected,
|
||||
Error::ProcedureExec { source, .. } => source.status_code(),
|
||||
Error::StartRemoveOutdatedMetaTask { source, .. }
|
||||
| Error::StopRemoveOutdatedMetaTask { source, .. } => source.status_code(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -21,12 +21,16 @@ use std::time::{Duration, Instant};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use backon::ExponentialBuilder;
|
||||
use common_runtime::{RepeatedTask, TaskFunction};
|
||||
use common_telemetry::logging;
|
||||
use snafu::ensure;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use tokio::sync::watch::{self, Receiver, Sender};
|
||||
use tokio::sync::Notify;
|
||||
|
||||
use crate::error::{DuplicateProcedureSnafu, LoaderConflictSnafu, Result};
|
||||
use crate::error::{
|
||||
DuplicateProcedureSnafu, Error, LoaderConflictSnafu, Result, StartRemoveOutdatedMetaTaskSnafu,
|
||||
StopRemoveOutdatedMetaTaskSnafu,
|
||||
};
|
||||
use crate::local::lock::LockMap;
|
||||
use crate::local::runner::Runner;
|
||||
use crate::procedure::BoxedProcedureLoader;
|
||||
@@ -341,6 +345,8 @@ impl ManagerContext {
|
||||
pub struct ManagerConfig {
|
||||
pub max_retry_times: usize,
|
||||
pub retry_delay: Duration,
|
||||
pub remove_outdated_meta_task_interval: Duration,
|
||||
pub remove_outdated_meta_ttl: Duration,
|
||||
}
|
||||
|
||||
impl Default for ManagerConfig {
|
||||
@@ -348,6 +354,8 @@ impl Default for ManagerConfig {
|
||||
Self {
|
||||
max_retry_times: 3,
|
||||
retry_delay: Duration::from_millis(500),
|
||||
remove_outdated_meta_task_interval: Duration::from_secs(60 * 10),
|
||||
remove_outdated_meta_ttl: META_TTL,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -358,16 +366,26 @@ pub struct LocalManager {
|
||||
state_store: StateStoreRef,
|
||||
max_retry_times: usize,
|
||||
retry_delay: Duration,
|
||||
remove_outdated_meta_task: RepeatedTask<Error>,
|
||||
}
|
||||
|
||||
impl LocalManager {
|
||||
/// Create a new [LocalManager] with specific `config`.
|
||||
pub fn new(config: ManagerConfig, state_store: StateStoreRef) -> LocalManager {
|
||||
let manager_ctx = Arc::new(ManagerContext::new());
|
||||
let remove_outdated_meta_task = RepeatedTask::new(
|
||||
config.remove_outdated_meta_task_interval,
|
||||
Box::new(RemoveOutdatedMetaFunction {
|
||||
manager_ctx: manager_ctx.clone(),
|
||||
ttl: config.remove_outdated_meta_ttl,
|
||||
}),
|
||||
);
|
||||
LocalManager {
|
||||
manager_ctx: Arc::new(ManagerContext::new()),
|
||||
manager_ctx,
|
||||
state_store,
|
||||
max_retry_times: config.max_retry_times,
|
||||
retry_delay: config.retry_delay,
|
||||
remove_outdated_meta_task,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -419,6 +437,21 @@ impl ProcedureManager for LocalManager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn start(&self) -> Result<()> {
|
||||
self.remove_outdated_meta_task
|
||||
.start(common_runtime::bg_runtime())
|
||||
.context(StartRemoveOutdatedMetaTaskSnafu)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn stop(&self) -> Result<()> {
|
||||
self.remove_outdated_meta_task
|
||||
.stop()
|
||||
.await
|
||||
.context(StopRemoveOutdatedMetaTaskSnafu)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn submit(&self, procedure: ProcedureWithId) -> Result<Watcher> {
|
||||
let procedure_id = procedure.id;
|
||||
ensure!(
|
||||
@@ -426,9 +459,6 @@ impl ProcedureManager for LocalManager {
|
||||
DuplicateProcedureSnafu { procedure_id }
|
||||
);
|
||||
|
||||
// TODO(yingwen): We can use a repeated task to remove outdated meta.
|
||||
self.manager_ctx.remove_outdated_meta(META_TTL);
|
||||
|
||||
self.submit_root(procedure.id, 0, procedure.procedure)
|
||||
}
|
||||
|
||||
@@ -487,18 +517,31 @@ impl ProcedureManager for LocalManager {
|
||||
}
|
||||
|
||||
async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
|
||||
self.manager_ctx.remove_outdated_meta(META_TTL);
|
||||
|
||||
Ok(self.manager_ctx.state(procedure_id))
|
||||
}
|
||||
|
||||
fn procedure_watcher(&self, procedure_id: ProcedureId) -> Option<Watcher> {
|
||||
self.manager_ctx.remove_outdated_meta(META_TTL);
|
||||
|
||||
self.manager_ctx.watcher(procedure_id)
|
||||
}
|
||||
}
|
||||
|
||||
struct RemoveOutdatedMetaFunction {
|
||||
manager_ctx: Arc<ManagerContext>,
|
||||
ttl: Duration,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl TaskFunction<Error> for RemoveOutdatedMetaFunction {
|
||||
fn name(&self) -> &str {
|
||||
"ProcedureManager-remove-outdated-meta-task"
|
||||
}
|
||||
|
||||
async fn call(&mut self) -> Result<()> {
|
||||
self.manager_ctx.remove_outdated_meta(self.ttl);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new [ProcedureMeta] for test purpose.
|
||||
#[cfg(test)]
|
||||
mod test_util {
|
||||
@@ -639,6 +682,7 @@ mod tests {
|
||||
let config = ManagerConfig {
|
||||
max_retry_times: 3,
|
||||
retry_delay: Duration::from_millis(500),
|
||||
..Default::default()
|
||||
};
|
||||
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
|
||||
let manager = LocalManager::new(config, state_store);
|
||||
@@ -660,6 +704,7 @@ mod tests {
|
||||
let config = ManagerConfig {
|
||||
max_retry_times: 3,
|
||||
retry_delay: Duration::from_millis(500),
|
||||
..Default::default()
|
||||
};
|
||||
let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
|
||||
let manager = LocalManager::new(config, state_store);
|
||||
@@ -706,6 +751,7 @@ mod tests {
|
||||
let config = ManagerConfig {
|
||||
max_retry_times: 3,
|
||||
retry_delay: Duration::from_millis(500),
|
||||
..Default::default()
|
||||
};
|
||||
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
|
||||
let manager = LocalManager::new(config, state_store);
|
||||
@@ -754,6 +800,7 @@ mod tests {
|
||||
let config = ManagerConfig {
|
||||
max_retry_times: 3,
|
||||
retry_delay: Duration::from_millis(500),
|
||||
..Default::default()
|
||||
};
|
||||
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
|
||||
let manager = LocalManager::new(config, state_store);
|
||||
@@ -807,4 +854,59 @@ mod tests {
|
||||
check_procedure(MockProcedure { panic: false }).await;
|
||||
check_procedure(MockProcedure { panic: true }).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_remove_outdated_meta_task() {
|
||||
let dir = create_temp_dir("remove_outdated_meta_task");
|
||||
let object_store = test_util::new_object_store(&dir);
|
||||
let config = ManagerConfig {
|
||||
max_retry_times: 3,
|
||||
retry_delay: Duration::from_millis(500),
|
||||
remove_outdated_meta_task_interval: Duration::from_millis(1),
|
||||
remove_outdated_meta_ttl: Duration::from_millis(1),
|
||||
};
|
||||
let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
|
||||
let manager = LocalManager::new(config, state_store);
|
||||
|
||||
let mut procedure = ProcedureToLoad::new("submit");
|
||||
procedure.lock_key = LockKey::single("test.submit");
|
||||
let procedure_id = ProcedureId::random();
|
||||
manager
|
||||
.submit(ProcedureWithId {
|
||||
id: procedure_id,
|
||||
procedure: Box::new(procedure),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
|
||||
watcher.changed().await.unwrap();
|
||||
manager.start().unwrap();
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
assert!(manager
|
||||
.procedure_state(procedure_id)
|
||||
.await
|
||||
.unwrap()
|
||||
.is_none());
|
||||
|
||||
// The remove_outdated_meta method has been stopped, so any procedure meta-data will not be automatically removed.
|
||||
manager.stop().await.unwrap();
|
||||
let mut procedure = ProcedureToLoad::new("submit");
|
||||
procedure.lock_key = LockKey::single("test.submit");
|
||||
let procedure_id = ProcedureId::random();
|
||||
manager
|
||||
.submit(ProcedureWithId {
|
||||
id: procedure_id,
|
||||
procedure: Box::new(procedure),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
|
||||
watcher.changed().await.unwrap();
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
assert!(manager
|
||||
.procedure_state(procedure_id)
|
||||
.await
|
||||
.unwrap()
|
||||
.is_some());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -260,6 +260,10 @@ pub trait ProcedureManager: Send + Sync + 'static {
|
||||
/// Registers loader for specific procedure type `name`.
|
||||
fn register_loader(&self, name: &str, loader: BoxedProcedureLoader) -> Result<()>;
|
||||
|
||||
fn start(&self) -> Result<()>;
|
||||
|
||||
async fn stop(&self) -> Result<()>;
|
||||
|
||||
/// Submits a procedure to execute.
|
||||
///
|
||||
/// Returns a [Watcher] to watch the created procedure.
|
||||
|
||||
@@ -482,6 +482,16 @@ pub enum Error {
|
||||
|
||||
#[snafu(display("Payload not exist"))]
|
||||
PayloadNotExist { location: Location },
|
||||
|
||||
#[snafu(display("Failed to start the procedure manager"))]
|
||||
StartProcedureManager {
|
||||
source: common_procedure::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to stop the procedure manager"))]
|
||||
StopProcedureManager {
|
||||
source: common_procedure::error::Error,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -581,6 +591,9 @@ impl ErrorExt for Error {
|
||||
source.status_code()
|
||||
}
|
||||
WaitProcedure { source, .. } => source.status_code(),
|
||||
StartProcedureManager { source } | StopProcedureManager { source } => {
|
||||
source.status_code()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -62,6 +62,7 @@ use crate::datanode::{
|
||||
use crate::error::{
|
||||
self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu,
|
||||
NewCatalogSnafu, OpenLogStoreSnafu, RecoverProcedureSnafu, Result, ShutdownInstanceSnafu,
|
||||
StartProcedureManagerSnafu, StopProcedureManagerSnafu,
|
||||
};
|
||||
use crate::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
|
||||
use crate::heartbeat::handler::HandlerGroupExecutor;
|
||||
@@ -258,10 +259,17 @@ impl Instance {
|
||||
.recover()
|
||||
.await
|
||||
.context(RecoverProcedureSnafu)?;
|
||||
self.procedure_manager
|
||||
.start()
|
||||
.context(StartProcedureManagerSnafu)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn shutdown(&self) -> Result<()> {
|
||||
self.procedure_manager
|
||||
.stop()
|
||||
.await
|
||||
.context(StopProcedureManagerSnafu)?;
|
||||
if let Some(heartbeat_task) = &self.heartbeat_task {
|
||||
heartbeat_task
|
||||
.close()
|
||||
@@ -568,6 +576,7 @@ pub(crate) async fn create_procedure_manager(
|
||||
let manager_config = ManagerConfig {
|
||||
max_retry_times: procedure_config.max_retry_times,
|
||||
retry_delay: procedure_config.retry_delay,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
Ok(Arc::new(LocalManager::new(manager_config, state_store)))
|
||||
|
||||
@@ -73,6 +73,7 @@ impl TestEnv {
|
||||
let config = ManagerConfig {
|
||||
max_retry_times: 3,
|
||||
retry_delay: Duration::from_secs(500),
|
||||
..Default::default()
|
||||
};
|
||||
let state_store = Arc::new(ObjectStateStore::new(object_store));
|
||||
let procedure_manager = Arc::new(LocalManager::new(config, state_store));
|
||||
|
||||
Reference in New Issue
Block a user