feat: add limit for the number of running procedures (#5793)

* refactor: remove unused `messages`

* feat: introduce running procedure num limit

* feat: update config

* chore: apply suggestions from CR

* feat: impl `status_code` for `log-store` crate
This commit is contained in:
Weny Xu
2025-03-31 14:14:21 +08:00
committed by GitHub
parent 5d688c6565
commit d3a60d8821
14 changed files with 196 additions and 50 deletions

View File

@@ -98,6 +98,7 @@
| `procedure` | -- | -- | Procedure storage options. |
| `procedure.max_retry_times` | Integer | `3` | Procedure max retry time. |
| `procedure.retry_delay` | String | `500ms` | Initial retry delay of procedures, increases exponentially |
| `procedure.max_running_procedures` | Integer | `128` | Max running procedures.<br/>The maximum number of procedures that can be running at the same time.<br/>If the number of running procedures exceeds this limit, the procedure will be rejected. |
| `flow` | -- | -- | flow engine options. |
| `flow.num_workers` | Integer | `0` | The number of flow worker in flownode.<br/>Not setting(or set to 0) this value will use the number of CPU cores divided by 2. |
| `storage` | -- | -- | The data storage options. |
@@ -328,6 +329,7 @@
| `procedure.max_retry_times` | Integer | `12` | Procedure max retry time. |
| `procedure.retry_delay` | String | `500ms` | Initial retry delay of procedures, increases exponentially |
| `procedure.max_metadata_value_size` | String | `1500KiB` | Auto split large value<br/>GreptimeDB procedure uses etcd as the default metadata storage backend.<br/>The etcd the maximum size of any request is 1.5 MiB<br/>1500KiB = 1536KiB (1.5MiB) - 36KiB (reserved size of key)<br/>Comments out the `max_metadata_value_size`, for don't split large value (no limit). |
| `procedure.max_running_procedures` | Integer | `128` | Max running procedures.<br/>The maximum number of procedures that can be running at the same time.<br/>If the number of running procedures exceeds this limit, the procedure will be rejected. |
| `failure_detector` | -- | -- | -- |
| `failure_detector.threshold` | Float | `8.0` | The threshold value used by the failure detector to determine failure conditions. |
| `failure_detector.min_std_deviation` | String | `100ms` | The minimum standard deviation of the heartbeat intervals, used to calculate acceptable variations. |

View File

@@ -79,6 +79,11 @@ retry_delay = "500ms"
## Comments out the `max_metadata_value_size`, for don't split large value (no limit).
max_metadata_value_size = "1500KiB"
## Max running procedures.
## The maximum number of procedures that can be running at the same time.
## If the number of running procedures exceeds this limit, the procedure will be rejected.
max_running_procedures = 128
# Failure detectors options.
[failure_detector]

View File

@@ -302,6 +302,10 @@ purge_interval = "1m"
max_retry_times = 3
## Initial retry delay of procedures, increases exponentially
retry_delay = "500ms"
## Max running procedures.
## The maximum number of procedures that can be running at the same time.
## If the number of running procedures exceeds this limit, the procedure will be rejected.
max_running_procedures = 128
## flow engine options.
[flow]

View File

@@ -58,6 +58,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Too many running procedures, max: {}", max_running_procedures))]
TooManyRunningProcedures {
max_running_procedures: usize,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to put state, key: '{key}'"))]
PutState {
key: String,
@@ -192,7 +199,8 @@ impl ErrorExt for Error {
| Error::FromJson { .. }
| Error::WaitWatcher { .. }
| Error::RetryLater { .. }
| Error::RollbackProcedureRecovered { .. } => StatusCode::Internal,
| Error::RollbackProcedureRecovered { .. }
| Error::TooManyRunningProcedures { .. } => StatusCode::Internal,
Error::RetryTimesExceeded { .. }
| Error::RollbackTimesExceeded { .. }

View File

@@ -15,7 +15,8 @@
mod runner;
mod rwlock;
use std::collections::{HashMap, VecDeque};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};
@@ -33,6 +34,7 @@ use self::rwlock::KeyRwLock;
use crate::error::{
self, DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu, Result,
StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu,
TooManyRunningProceduresSnafu,
};
use crate::local::runner::Runner;
use crate::procedure::{BoxedProcedureLoader, InitProcedureState, ProcedureInfo};
@@ -147,7 +149,6 @@ type ProcedureMetaRef = Arc<ProcedureMeta>;
/// Procedure loaded from store.
struct LoadedProcedure {
procedure: BoxedProcedure,
parent_id: Option<ProcedureId>,
step: u32,
}
@@ -157,8 +158,7 @@ pub(crate) struct ManagerContext {
loaders: Mutex<HashMap<String, BoxedProcedureLoader>>,
key_lock: KeyRwLock<String>,
procedures: RwLock<HashMap<ProcedureId, ProcedureMetaRef>>,
/// Messages loaded from the procedure store.
messages: Mutex<HashMap<ProcedureId, ProcedureMessage>>,
running_procedures: Mutex<HashSet<ProcedureId>>,
/// Ids and finished time of finished procedures.
finished_procedures: Mutex<VecDeque<(ProcedureId, Instant)>>,
/// Running flag.
@@ -179,7 +179,7 @@ impl ManagerContext {
key_lock: KeyRwLock::new(),
loaders: Mutex::new(HashMap::new()),
procedures: RwLock::new(HashMap::new()),
messages: Mutex::new(HashMap::new()),
running_procedures: Mutex::new(HashSet::new()),
finished_procedures: Mutex::new(VecDeque::new()),
running: Arc::new(AtomicBool::new(false)),
}
@@ -210,18 +210,27 @@ impl ManagerContext {
procedures.contains_key(&procedure_id)
}
/// Returns the number of running procedures.
fn num_running_procedures(&self) -> usize {
self.running_procedures.lock().unwrap().len()
}
/// Try to insert the `procedure` to the context if there is no procedure
/// with same [ProcedureId].
///
/// Returns `false` if there is already a procedure using the same [ProcedureId].
fn try_insert_procedure(&self, meta: ProcedureMetaRef) -> bool {
let procedure_id = meta.id;
let mut procedures = self.procedures.write().unwrap();
if procedures.contains_key(&meta.id) {
return false;
match procedures.entry(procedure_id) {
Entry::Occupied(_) => return false,
Entry::Vacant(vacant_entry) => {
vacant_entry.insert(meta);
}
}
let old = procedures.insert(meta.id, meta);
debug_assert!(old.is_none());
let mut running_procedures = self.running_procedures.lock().unwrap();
running_procedures.insert(procedure_id);
true
}
@@ -264,16 +273,6 @@ impl ManagerContext {
}
}
/// Load procedure with specific `procedure_id` from cached [ProcedureMessage]s.
fn load_one_procedure(&self, procedure_id: ProcedureId) -> Option<LoadedProcedure> {
let message = {
let messages = self.messages.lock().unwrap();
messages.get(&procedure_id).cloned()?
};
self.load_one_procedure_from_message(procedure_id, &message)
}
/// Load procedure from specific [ProcedureMessage].
fn load_one_procedure_from_message(
&self,
@@ -301,7 +300,6 @@ impl ManagerContext {
Some(LoadedProcedure {
procedure,
parent_id: message.parent_id,
step: message.step,
})
}
@@ -350,23 +348,19 @@ impl ManagerContext {
}
}
/// Remove cached [ProcedureMessage] by ids.
fn remove_messages(&self, procedure_ids: &[ProcedureId]) {
let mut messages = self.messages.lock().unwrap();
for procedure_id in procedure_ids {
let _ = messages.remove(procedure_id);
}
}
/// Clean resources of finished procedures.
fn on_procedures_finish(&self, procedure_ids: &[ProcedureId]) {
self.remove_messages(procedure_ids);
// Since users need to query the procedure state, so we can't remove the
// meta of the procedure directly.
let now = Instant::now();
let mut finished_procedures = self.finished_procedures.lock().unwrap();
finished_procedures.extend(procedure_ids.iter().map(|id| (*id, now)));
// Remove the procedures from the running set.
let mut running_procedures = self.running_procedures.lock().unwrap();
for procedure_id in procedure_ids {
running_procedures.remove(procedure_id);
}
}
/// Remove metadata of outdated procedures.
@@ -410,6 +404,7 @@ pub struct ManagerConfig {
pub retry_delay: Duration,
pub remove_outdated_meta_task_interval: Duration,
pub remove_outdated_meta_ttl: Duration,
pub max_running_procedures: usize,
}
impl Default for ManagerConfig {
@@ -420,6 +415,7 @@ impl Default for ManagerConfig {
retry_delay: Duration::from_millis(500),
remove_outdated_meta_task_interval: Duration::from_secs(60 * 10),
remove_outdated_meta_ttl: META_TTL,
max_running_procedures: 128,
}
}
}
@@ -492,6 +488,13 @@ impl LocalManager {
let watcher = meta.state_receiver.clone();
ensure!(
self.manager_ctx.num_running_procedures() < self.config.max_running_procedures,
TooManyRunningProceduresSnafu {
max_running_procedures: self.config.max_running_procedures,
}
);
// Inserts meta into the manager before actually spawnd the runner.
ensure!(
self.manager_ctx.try_insert_procedure(meta),
@@ -1119,6 +1122,7 @@ mod tests {
retry_delay: Duration::from_millis(500),
remove_outdated_meta_task_interval: Duration::from_millis(1),
remove_outdated_meta_ttl: Duration::from_millis(1),
max_running_procedures: 128,
};
let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
let manager = LocalManager::new(config, state_store);
@@ -1191,6 +1195,69 @@ mod tests {
.is_none());
}
#[tokio::test]
async fn test_too_many_running_procedures() {
let dir = create_temp_dir("too_many_running_procedures");
let config = ManagerConfig {
parent_path: "data/".to_string(),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
max_running_procedures: 1,
..Default::default()
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let manager = LocalManager::new(config, state_store);
manager.manager_ctx.set_running();
manager
.manager_ctx
.running_procedures
.lock()
.unwrap()
.insert(ProcedureId::random());
manager.start().await.unwrap();
// Submit a new procedure should fail.
let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single_exclusive("test.submit");
let procedure_id = ProcedureId::random();
let err = manager
.submit(ProcedureWithId {
id: procedure_id,
procedure: Box::new(procedure),
})
.await
.unwrap_err();
assert!(matches!(err, Error::TooManyRunningProcedures { .. }));
manager
.manager_ctx
.running_procedures
.lock()
.unwrap()
.clear();
// Submit a new procedure should succeed.
let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single_exclusive("test.submit");
assert!(manager
.submit(ProcedureWithId {
id: procedure_id,
procedure: Box::new(procedure),
})
.await
.is_ok());
assert!(manager
.procedure_state(procedure_id)
.await
.unwrap()
.is_some());
// Wait for the procedure done.
let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
watcher.changed().await.unwrap();
assert!(watcher.borrow().is_done());
}
#[derive(Debug)]
struct ProcedureToRecover {
content: String,

View File

@@ -348,24 +348,14 @@ impl Runner {
&self,
procedure_id: ProcedureId,
procedure_state: ProcedureState,
mut procedure: BoxedProcedure,
procedure: BoxedProcedure,
) {
if self.manager_ctx.contains_procedure(procedure_id) {
// If the parent has already submitted this procedure, don't submit it again.
return;
}
let mut step = 0;
if let Some(loaded_procedure) = self.manager_ctx.load_one_procedure(procedure_id) {
// Try to load procedure state from the message to avoid re-run the subprocedure
// from initial state.
assert_eq!(self.meta.id, loaded_procedure.parent_id.unwrap());
// Use the dumped procedure from the procedure store.
procedure = loaded_procedure.procedure;
// Update step number.
step = loaded_procedure.step;
}
let step = 0;
let meta = Arc::new(ProcedureMeta::new(
procedure_id,

View File

@@ -29,6 +29,8 @@ pub struct ProcedureConfig {
pub retry_delay: Duration,
/// `None` stands for no limit.
pub max_metadata_value_size: Option<ReadableSize>,
/// Max running procedures.
pub max_running_procedures: usize,
}
impl Default for ProcedureConfig {
@@ -37,6 +39,7 @@ impl Default for ProcedureConfig {
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
max_metadata_value_size: None,
max_running_procedures: 128,
}
}
}

View File

@@ -118,6 +118,7 @@ impl Instance {
let manager_config = ManagerConfig {
max_retry_times: procedure_config.max_retry_times,
retry_delay: procedure_config.retry_delay,
max_running_procedures: procedure_config.max_running_procedures,
..Default::default()
};
let procedure_manager = Arc::new(LocalManager::new(manager_config, state_store));

View File

@@ -15,6 +15,7 @@
use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_runtime::error::Error as RuntimeError;
use serde_json::error::Error as JsonError;
@@ -310,10 +311,72 @@ pub enum Error {
},
}
pub type Result<T> = std::result::Result<T, Error>;
fn rskafka_client_error_to_status_code(error: &rskafka::client::error::Error) -> StatusCode {
match error {
rskafka::client::error::Error::Connection(_)
| rskafka::client::error::Error::Request(_)
| rskafka::client::error::Error::InvalidResponse(_)
| rskafka::client::error::Error::ServerError { .. }
| rskafka::client::error::Error::RetryFailed(_) => StatusCode::Internal,
rskafka::client::error::Error::Timeout => StatusCode::StorageUnavailable,
_ => StatusCode::Internal,
}
}
impl ErrorExt for Error {
fn as_any(&self) -> &dyn Any {
self
}
}
pub type Result<T> = std::result::Result<T, Error>;
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
TlsConfig { .. }
| InvalidProvider { .. }
| IllegalNamespace { .. }
| MissingKey { .. }
| MissingValue { .. }
| OverrideCompactedEntry { .. } => StatusCode::InvalidArguments,
StartWalTask { .. }
| StopWalTask { .. }
| IllegalState { .. }
| ResolveKafkaEndpoint { .. }
| NoMaxValue { .. }
| Cast { .. }
| EncodeJson { .. }
| DecodeJson { .. }
| IllegalSequence { .. }
| DiscontinuousLogIndex { .. }
| OrderedBatchProducerStopped { .. }
| WaitProduceResultReceiver { .. }
| WaitDumpIndex { .. }
| MetaLengthExceededLimit { .. } => StatusCode::Internal,
// Object store related errors
CreateWriter { .. } | WriteIndex { .. } | ReadIndex { .. } | Io { .. } => {
StatusCode::StorageUnavailable
}
// Raft engine
FetchEntry { .. } | RaftEngine { .. } | AddEntryLogBatch { .. } => {
StatusCode::StorageUnavailable
}
// Kafka producer related errors
ProduceRecord { error, .. } => match error {
rskafka::client::producer::Error::Client(error) => {
rskafka_client_error_to_status_code(error)
}
rskafka::client::producer::Error::Aggregator(_)
| rskafka::client::producer::Error::FlushError(_)
| rskafka::client::producer::Error::TooLarge => StatusCode::Internal,
},
BuildClient { error, .. }
| BuildPartitionClient { error, .. }
| BatchProduce { error, .. }
| GetOffset { error, .. }
| ConsumeRecord { error, .. } => rskafka_client_error_to_status_code(error),
}
}
}

View File

@@ -182,6 +182,7 @@ impl Default for MetasrvOptions {
// The etcd the maximum size of any request is 1.5 MiB
// 1500KiB = 1536KiB (1.5MiB) - 36KiB (reserved size of key)
max_metadata_value_size: Some(ReadableSize::kb(1500)),
max_running_procedures: 128,
},
failure_detector: PhiAccrualFailureDetectorOptions::default(),
datanode: DatanodeClientOptions::default(),

View File

@@ -430,6 +430,7 @@ fn build_procedure_manager(
let manager_config = ManagerConfig {
max_retry_times: options.procedure.max_retry_times,
retry_delay: options.procedure.retry_delay,
max_running_procedures: options.procedure.max_running_procedures,
..Default::default()
};
let state_store = KvStateStore::new(kv_backend.clone()).with_max_value_size(

View File

@@ -1047,11 +1047,10 @@ impl ErrorExt for Error {
use Error::*;
match self {
OpenDal { .. }
| ReadParquet { .. }
| WriteWal { .. }
| ReadWal { .. }
| DeleteWal { .. } => StatusCode::StorageUnavailable,
OpenDal { .. } | ReadParquet { .. } => StatusCode::StorageUnavailable,
WriteWal { source, .. } | ReadWal { source, .. } | DeleteWal { source, .. } => {
source.status_code()
}
CompressObject { .. }
| DecompressObject { .. }
| SerdeJson { .. }

View File

@@ -191,6 +191,7 @@ impl GreptimeDbClusterBuilder {
max_retry_times: 5,
retry_delay: Duration::from_secs(1),
max_metadata_value_size: None,
max_running_procedures: 128,
},
wal: self.metasrv_wal_config.clone(),
server_addr: "127.0.0.1:3002".to_string(),

View File

@@ -1034,6 +1034,7 @@ purge_interval = "1m"
[procedure]
max_retry_times = 3
retry_delay = "500ms"
max_running_procedures = 128
[flow]