diff --git a/config/config.md b/config/config.md
index 2953fdb007..7826b897a8 100644
--- a/config/config.md
+++ b/config/config.md
@@ -98,6 +98,7 @@
| `procedure` | -- | -- | Procedure storage options. |
| `procedure.max_retry_times` | Integer | `3` | Procedure max retry time. |
| `procedure.retry_delay` | String | `500ms` | Initial retry delay of procedures, increases exponentially |
+| `procedure.max_running_procedures` | Integer | `128` | Max running procedures.<br/>The maximum number of procedures that can be running at the same time.<br/>If the number of running procedures exceeds this limit, newly submitted procedures are rejected. |
| `flow` | -- | -- | flow engine options. |
| `flow.num_workers` | Integer | `0` | The number of flow worker in flownode.<br/>Not setting(or set to 0) this value will use the number of CPU cores divided by 2. |
| `storage` | -- | -- | The data storage options. |
@@ -328,6 +329,7 @@
| `procedure.max_retry_times` | Integer | `12` | Procedure max retry time. |
| `procedure.retry_delay` | String | `500ms` | Initial retry delay of procedures, increases exponentially |
| `procedure.max_metadata_value_size` | String | `1500KiB` | Auto split large value<br/>GreptimeDB procedure uses etcd as the default metadata storage backend.<br/>The etcd the maximum size of any request is 1.5 MiB<br/>1500KiB = 1536KiB (1.5MiB) - 36KiB (reserved size of key)<br/>Comments out the `max_metadata_value_size`, for don't split large value (no limit). |
+| `procedure.max_running_procedures` | Integer | `128` | Max running procedures.<br/>The maximum number of procedures that can be running at the same time.<br/>If the number of running procedures exceeds this limit, newly submitted procedures are rejected. |
| `failure_detector` | -- | -- | -- |
| `failure_detector.threshold` | Float | `8.0` | The threshold value used by the failure detector to determine failure conditions. |
| `failure_detector.min_std_deviation` | String | `100ms` | The minimum standard deviation of the heartbeat intervals, used to calculate acceptable variations. |
diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml
index e2b86a98f3..6909726c73 100644
--- a/config/metasrv.example.toml
+++ b/config/metasrv.example.toml
@@ -79,6 +79,11 @@ retry_delay = "500ms"
## Comments out the `max_metadata_value_size`, for don't split large value (no limit).
max_metadata_value_size = "1500KiB"
+## Max running procedures.
+## The maximum number of procedures that can be running at the same time.
+## If the number of running procedures exceeds this limit, newly submitted procedures are rejected.
+max_running_procedures = 128
+
# Failure detectors options.
[failure_detector]
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 0d4285d9ec..ec1c73fc3e 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -302,6 +302,10 @@ purge_interval = "1m"
max_retry_times = 3
## Initial retry delay of procedures, increases exponentially
retry_delay = "500ms"
+## Max running procedures.
+## The maximum number of procedures that can be running at the same time.
+## If the number of running procedures exceeds this limit, newly submitted procedures are rejected.
+max_running_procedures = 128
## flow engine options.
[flow]
diff --git a/src/common/procedure/src/error.rs b/src/common/procedure/src/error.rs
index 44b827b81c..c1efe25734 100644
--- a/src/common/procedure/src/error.rs
+++ b/src/common/procedure/src/error.rs
@@ -58,6 +58,13 @@ pub enum Error {
location: Location,
},
+ #[snafu(display("Too many running procedures, max: {}", max_running_procedures))]
+ TooManyRunningProcedures {
+ max_running_procedures: usize,
+ #[snafu(implicit)]
+ location: Location,
+ },
+
#[snafu(display("Failed to put state, key: '{key}'"))]
PutState {
key: String,
@@ -192,7 +199,8 @@ impl ErrorExt for Error {
| Error::FromJson { .. }
| Error::WaitWatcher { .. }
| Error::RetryLater { .. }
- | Error::RollbackProcedureRecovered { .. } => StatusCode::Internal,
+ | Error::RollbackProcedureRecovered { .. }
+ | Error::TooManyRunningProcedures { .. } => StatusCode::Internal,
Error::RetryTimesExceeded { .. }
| Error::RollbackTimesExceeded { .. }
diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs
index da456fb37a..9554b325c5 100644
--- a/src/common/procedure/src/local.rs
+++ b/src/common/procedure/src/local.rs
@@ -15,7 +15,8 @@
mod runner;
mod rwlock;
-use std::collections::{HashMap, VecDeque};
+use std::collections::hash_map::Entry;
+use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};
@@ -33,6 +34,7 @@ use self::rwlock::KeyRwLock;
use crate::error::{
self, DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu, Result,
StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu,
+ TooManyRunningProceduresSnafu,
};
use crate::local::runner::Runner;
use crate::procedure::{BoxedProcedureLoader, InitProcedureState, ProcedureInfo};
@@ -147,7 +149,6 @@ type ProcedureMetaRef = Arc<ProcedureMeta>;
/// Procedure loaded from store.
struct LoadedProcedure {
procedure: BoxedProcedure,
- parent_id: Option<ProcedureId>,
step: u32,
}
@@ -157,8 +158,7 @@ pub(crate) struct ManagerContext {
loaders: Mutex<HashMap<String, BoxedProcedureLoader>>,
key_lock: KeyRwLock<String>,
procedures: RwLock<HashMap<ProcedureId, ProcedureMetaRef>>,
- /// Messages loaded from the procedure store.
- messages: Mutex<HashMap<ProcedureId, ProcedureMessage>>,
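+ /// Ids of procedures that are currently running.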
+ running_procedures: Mutex<HashSet<ProcedureId>>,
/// Ids and finished time of finished procedures.
finished_procedures: Mutex<VecDeque<(ProcedureId, Instant)>>,
/// Running flag.
@@ -179,7 +179,7 @@ impl ManagerContext {
key_lock: KeyRwLock::new(),
loaders: Mutex::new(HashMap::new()),
procedures: RwLock::new(HashMap::new()),
- messages: Mutex::new(HashMap::new()),
+ running_procedures: Mutex::new(HashSet::new()),
finished_procedures: Mutex::new(VecDeque::new()),
running: Arc::new(AtomicBool::new(false)),
}
@@ -210,18 +210,27 @@ impl ManagerContext {
procedures.contains_key(&procedure_id)
}
+ /// Returns the number of running procedures.
+ fn num_running_procedures(&self) -> usize {
+ self.running_procedures.lock().unwrap().len()
+ }
+
/// Try to insert the `procedure` to the context if there is no procedure
/// with same [ProcedureId].
///
/// Returns `false` if there is already a procedure using the same [ProcedureId].
fn try_insert_procedure(&self, meta: ProcedureMetaRef) -> bool {
+ let procedure_id = meta.id;
let mut procedures = self.procedures.write().unwrap();
- if procedures.contains_key(&meta.id) {
- return false;
+ match procedures.entry(procedure_id) {
+ Entry::Occupied(_) => return false,
+ Entry::Vacant(vacant_entry) => {
+ vacant_entry.insert(meta);
+ }
}
- let old = procedures.insert(meta.id, meta);
- debug_assert!(old.is_none());
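+ // Record the id in the running set so `submit` can enforce `max_running_procedures`.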
+ let mut running_procedures = self.running_procedures.lock().unwrap();
+ running_procedures.insert(procedure_id);
true
}
@@ -264,16 +273,6 @@ impl ManagerContext {
}
}
- /// Load procedure with specific `procedure_id` from cached [ProcedureMessage]s.
- fn load_one_procedure(&self, procedure_id: ProcedureId) -> Option<LoadedProcedure> {
- let message = {
- let messages = self.messages.lock().unwrap();
- messages.get(&procedure_id).cloned()?
- };
-
- self.load_one_procedure_from_message(procedure_id, &message)
- }
-
/// Load procedure from specific [ProcedureMessage].
fn load_one_procedure_from_message(
&self,
@@ -301,7 +300,6 @@ impl ManagerContext {
Some(LoadedProcedure {
procedure,
- parent_id: message.parent_id,
step: message.step,
})
}
@@ -350,23 +348,19 @@ impl ManagerContext {
}
}
- /// Remove cached [ProcedureMessage] by ids.
- fn remove_messages(&self, procedure_ids: &[ProcedureId]) {
- let mut messages = self.messages.lock().unwrap();
- for procedure_id in procedure_ids {
- let _ = messages.remove(procedure_id);
- }
- }
-
/// Clean resources of finished procedures.
fn on_procedures_finish(&self, procedure_ids: &[ProcedureId]) {
- self.remove_messages(procedure_ids);
-
// Since users need to query the procedure state, so we can't remove the
// meta of the procedure directly.
let now = Instant::now();
let mut finished_procedures = self.finished_procedures.lock().unwrap();
finished_procedures.extend(procedure_ids.iter().map(|id| (*id, now)));
+
+ // Remove the procedures from the running set.
+ let mut running_procedures = self.running_procedures.lock().unwrap();
+ for procedure_id in procedure_ids {
+ running_procedures.remove(procedure_id);
+ }
}
/// Remove metadata of outdated procedures.
@@ -410,6 +404,7 @@ pub struct ManagerConfig {
pub retry_delay: Duration,
pub remove_outdated_meta_task_interval: Duration,
pub remove_outdated_meta_ttl: Duration,
+ pub max_running_procedures: usize,
}
impl Default for ManagerConfig {
@@ -420,6 +415,7 @@ impl Default for ManagerConfig {
retry_delay: Duration::from_millis(500),
remove_outdated_meta_task_interval: Duration::from_secs(60 * 10),
remove_outdated_meta_ttl: META_TTL,
+ max_running_procedures: 128,
}
}
}
@@ -492,6 +488,13 @@ impl LocalManager {
let watcher = meta.state_receiver.clone();
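+ // Reject the submission if the number of running procedures has reached the limit.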
+ ensure!(
+ self.manager_ctx.num_running_procedures() < self.config.max_running_procedures,
+ TooManyRunningProceduresSnafu {
+ max_running_procedures: self.config.max_running_procedures,
+ }
+ );
+
// Inserts meta into the manager before actually spawning the runner.
ensure!(
self.manager_ctx.try_insert_procedure(meta),
@@ -1119,6 +1122,7 @@ mod tests {
retry_delay: Duration::from_millis(500),
remove_outdated_meta_task_interval: Duration::from_millis(1),
remove_outdated_meta_ttl: Duration::from_millis(1),
+ max_running_procedures: 128,
};
let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
let manager = LocalManager::new(config, state_store);
@@ -1191,6 +1195,69 @@ mod tests {
.is_none());
}
+ #[tokio::test]
+ async fn test_too_many_running_procedures() {
+ let dir = create_temp_dir("too_many_running_procedures");
+ let config = ManagerConfig {
+ parent_path: "data/".to_string(),
+ max_retry_times: 3,
+ retry_delay: Duration::from_millis(500),
+ max_running_procedures: 1,
+ ..Default::default()
+ };
+ let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
+ let manager = LocalManager::new(config, state_store);
+ manager.manager_ctx.set_running();
+
+ manager
+ .manager_ctx
+ .running_procedures
+ .lock()
+ .unwrap()
+ .insert(ProcedureId::random());
+ manager.start().await.unwrap();
+
+ // Submitting a new procedure should fail.
+ let mut procedure = ProcedureToLoad::new("submit");
+ procedure.lock_key = LockKey::single_exclusive("test.submit");
+ let procedure_id = ProcedureId::random();
+ let err = manager
+ .submit(ProcedureWithId {
+ id: procedure_id,
+ procedure: Box::new(procedure),
+ })
+ .await
+ .unwrap_err();
+ assert!(matches!(err, Error::TooManyRunningProcedures { .. }));
+
+ manager
+ .manager_ctx
+ .running_procedures
+ .lock()
+ .unwrap()
+ .clear();
+
+ // Submitting a new procedure should succeed.
+ let mut procedure = ProcedureToLoad::new("submit");
+ procedure.lock_key = LockKey::single_exclusive("test.submit");
+ assert!(manager
+ .submit(ProcedureWithId {
+ id: procedure_id,
+ procedure: Box::new(procedure),
+ })
+ .await
+ .is_ok());
+ assert!(manager
+ .procedure_state(procedure_id)
+ .await
+ .unwrap()
+ .is_some());
+ // Wait for the procedure to finish.
+ let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
+ watcher.changed().await.unwrap();
+ assert!(watcher.borrow().is_done());
+ }
+
#[derive(Debug)]
struct ProcedureToRecover {
content: String,
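As a usage note, here is a hedged sketch of how a submitter could react to the new `TooManyRunningProcedures` rejection. The `submit_with_backoff` helper, the retry policy, and the exact import paths are illustrative assumptions rather than part of this change:

```rust
use std::time::Duration;

use common_procedure::error::{Error, Result};
use common_procedure::{ProcedureManagerRef, ProcedureWithId, Watcher};

/// Illustrative only: resubmit with exponential backoff when the manager reports
/// that `max_running_procedures` has been reached.
async fn submit_with_backoff(
    manager: &ProcedureManagerRef,
    mut build: impl FnMut() -> ProcedureWithId,
) -> Result<Watcher> {
    let mut delay = Duration::from_millis(100);
    for _ in 0..5 {
        match manager.submit(build()).await {
            Err(Error::TooManyRunningProcedures { .. }) => {
                // The manager is saturated; wait and try again.
                tokio::time::sleep(delay).await;
                delay *= 2;
            }
            other => return other,
        }
    }
    // Final attempt; propagate the rejection if the manager is still saturated.
    manager.submit(build()).await
}
```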
diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs
index 85cfe2b625..a7972d8860 100644
--- a/src/common/procedure/src/local/runner.rs
+++ b/src/common/procedure/src/local/runner.rs
@@ -348,24 +348,14 @@ impl Runner {
&self,
procedure_id: ProcedureId,
procedure_state: ProcedureState,
- mut procedure: BoxedProcedure,
+ procedure: BoxedProcedure,
) {
if self.manager_ctx.contains_procedure(procedure_id) {
// If the parent has already submitted this procedure, don't submit it again.
return;
}
- let mut step = 0;
- if let Some(loaded_procedure) = self.manager_ctx.load_one_procedure(procedure_id) {
- // Try to load procedure state from the message to avoid re-run the subprocedure
- // from initial state.
- assert_eq!(self.meta.id, loaded_procedure.parent_id.unwrap());
-
- // Use the dumped procedure from the procedure store.
- procedure = loaded_procedure.procedure;
- // Update step number.
- step = loaded_procedure.step;
- }
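+ // The subprocedure is submitted from its initial state (step 0).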
+ let step = 0;
let meta = Arc::new(ProcedureMeta::new(
procedure_id,
diff --git a/src/common/procedure/src/options.rs b/src/common/procedure/src/options.rs
index fe54cff1da..9b934ef0e2 100644
--- a/src/common/procedure/src/options.rs
+++ b/src/common/procedure/src/options.rs
@@ -29,6 +29,8 @@ pub struct ProcedureConfig {
pub retry_delay: Duration,
/// `None` stands for no limit.
pub max_metadata_value_size: Option<ReadableSize>,
+ /// Max running procedures.
+ pub max_running_procedures: usize,
}
impl Default for ProcedureConfig {
@@ -37,6 +39,7 @@ impl Default for ProcedureConfig {
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
max_metadata_value_size: None,
+ max_running_procedures: 128,
}
}
}
diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs
index e2022ed08a..9153520a44 100644
--- a/src/frontend/src/instance.rs
+++ b/src/frontend/src/instance.rs
@@ -118,6 +118,7 @@ impl Instance {
let manager_config = ManagerConfig {
max_retry_times: procedure_config.max_retry_times,
retry_delay: procedure_config.retry_delay,
+ max_running_procedures: procedure_config.max_running_procedures,
..Default::default()
};
let procedure_manager = Arc::new(LocalManager::new(manager_config, state_store));
diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs
index 5f9a5fc4e3..c8b9469542 100644
--- a/src/log-store/src/error.rs
+++ b/src/log-store/src/error.rs
@@ -15,6 +15,7 @@
use std::any::Any;
use common_error::ext::ErrorExt;
+use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_runtime::error::Error as RuntimeError;
use serde_json::error::Error as JsonError;
@@ -310,10 +311,72 @@ pub enum Error {
},
}
+pub type Result<T> = std::result::Result<T, Error>;
+
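+/// Maps an rskafka client error to a GreptimeDB status code: connection, protocol and
+/// retry failures are treated as internal errors, while timeouts surface as storage unavailability.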
+fn rskafka_client_error_to_status_code(error: &rskafka::client::error::Error) -> StatusCode {
+ match error {
+ rskafka::client::error::Error::Connection(_)
+ | rskafka::client::error::Error::Request(_)
+ | rskafka::client::error::Error::InvalidResponse(_)
+ | rskafka::client::error::Error::ServerError { .. }
+ | rskafka::client::error::Error::RetryFailed(_) => StatusCode::Internal,
+ rskafka::client::error::Error::Timeout => StatusCode::StorageUnavailable,
+ _ => StatusCode::Internal,
+ }
+}
+
impl ErrorExt for Error {
fn as_any(&self) -> &dyn Any {
self
}
-}
-pub type Result<T> = std::result::Result<T, Error>;
+ fn status_code(&self) -> StatusCode {
+ use Error::*;
+
+ match self {
+ TlsConfig { .. }
+ | InvalidProvider { .. }
+ | IllegalNamespace { .. }
+ | MissingKey { .. }
+ | MissingValue { .. }
+ | OverrideCompactedEntry { .. } => StatusCode::InvalidArguments,
+ StartWalTask { .. }
+ | StopWalTask { .. }
+ | IllegalState { .. }
+ | ResolveKafkaEndpoint { .. }
+ | NoMaxValue { .. }
+ | Cast { .. }
+ | EncodeJson { .. }
+ | DecodeJson { .. }
+ | IllegalSequence { .. }
+ | DiscontinuousLogIndex { .. }
+ | OrderedBatchProducerStopped { .. }
+ | WaitProduceResultReceiver { .. }
+ | WaitDumpIndex { .. }
+ | MetaLengthExceededLimit { .. } => StatusCode::Internal,
+
+ // Object store related errors
+ CreateWriter { .. } | WriteIndex { .. } | ReadIndex { .. } | Io { .. } => {
+ StatusCode::StorageUnavailable
+ }
+ // Raft engine
+ FetchEntry { .. } | RaftEngine { .. } | AddEntryLogBatch { .. } => {
+ StatusCode::StorageUnavailable
+ }
+ // Kafka producer related errors
+ ProduceRecord { error, .. } => match error {
+ rskafka::client::producer::Error::Client(error) => {
+ rskafka_client_error_to_status_code(error)
+ }
+ rskafka::client::producer::Error::Aggregator(_)
+ | rskafka::client::producer::Error::FlushError(_)
+ | rskafka::client::producer::Error::TooLarge => StatusCode::Internal,
+ },
+ BuildClient { error, .. }
+ | BuildPartitionClient { error, .. }
+ | BatchProduce { error, .. }
+ | GetOffset { error, .. }
+ | ConsumeRecord { error, .. } => rskafka_client_error_to_status_code(error),
+ }
+ }
+}
diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs
index ca9f3ad87d..115f1e5365 100644
--- a/src/meta-srv/src/metasrv.rs
+++ b/src/meta-srv/src/metasrv.rs
@@ -182,6 +182,7 @@ impl Default for MetasrvOptions {
// The etcd the maximum size of any request is 1.5 MiB
// 1500KiB = 1536KiB (1.5MiB) - 36KiB (reserved size of key)
max_metadata_value_size: Some(ReadableSize::kb(1500)),
+ max_running_procedures: 128,
},
failure_detector: PhiAccrualFailureDetectorOptions::default(),
datanode: DatanodeClientOptions::default(),
diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs
index e7a9edb563..14fcc2ffc3 100644
--- a/src/meta-srv/src/metasrv/builder.rs
+++ b/src/meta-srv/src/metasrv/builder.rs
@@ -430,6 +430,7 @@ fn build_procedure_manager(
let manager_config = ManagerConfig {
max_retry_times: options.procedure.max_retry_times,
retry_delay: options.procedure.retry_delay,
+ max_running_procedures: options.procedure.max_running_procedures,
..Default::default()
};
let state_store = KvStateStore::new(kv_backend.clone()).with_max_value_size(
diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs
index 7bcb58e45c..e89c318020 100644
--- a/src/mito2/src/error.rs
+++ b/src/mito2/src/error.rs
@@ -1047,11 +1047,10 @@ impl ErrorExt for Error {
use Error::*;
match self {
- OpenDal { .. }
- | ReadParquet { .. }
- | WriteWal { .. }
- | ReadWal { .. }
- | DeleteWal { .. } => StatusCode::StorageUnavailable,
+ OpenDal { .. } | ReadParquet { .. } => StatusCode::StorageUnavailable,
+ WriteWal { source, .. } | ReadWal { source, .. } | DeleteWal { source, .. } => {
+ source.status_code()
+ }
CompressObject { .. }
| DecompressObject { .. }
| SerdeJson { .. }
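To make the intent of the two error changes above concrete, here is a minimal sketch of the status-code delegation idea, using toy types in place of the real `ErrorExt` machinery; all names below are illustrative:

```rust
#[derive(Debug, PartialEq)]
enum StatusCode {
    Internal,
    StorageUnavailable,
}

// Toy WAL-layer error that knows its own status code, standing in for the
// log-store `Error`, which now implements `status_code()`.
#[derive(Debug)]
enum WalError {
    Timeout,
    Corrupted,
}

impl WalError {
    fn status_code(&self) -> StatusCode {
        match self {
            WalError::Timeout => StatusCode::StorageUnavailable,
            WalError::Corrupted => StatusCode::Internal,
        }
    }
}

// Toy region-layer error wrapping the WAL error, standing in for mito2's `Error`.
#[derive(Debug)]
enum RegionError {
    WriteWal { source: WalError },
    OpenDal,
}

impl RegionError {
    fn status_code(&self) -> StatusCode {
        match self {
            // Delegate instead of hard-coding StorageUnavailable, so different WAL
            // failures can surface different codes to the client.
            RegionError::WriteWal { source } => source.status_code(),
            RegionError::OpenDal => StatusCode::StorageUnavailable,
        }
    }
}

fn main() {
    let err = RegionError::WriteWal { source: WalError::Timeout };
    assert_eq!(err.status_code(), StatusCode::StorageUnavailable);
}
```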
diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs
index a3a5c1cb50..3755f68665 100644
--- a/tests-integration/src/cluster.rs
+++ b/tests-integration/src/cluster.rs
@@ -191,6 +191,7 @@ impl GreptimeDbClusterBuilder {
max_retry_times: 5,
retry_delay: Duration::from_secs(1),
max_metadata_value_size: None,
+ max_running_procedures: 128,
},
wal: self.metasrv_wal_config.clone(),
server_addr: "127.0.0.1:3002".to_string(),
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 34d3b48083..cea00cf978 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -1034,6 +1034,7 @@ purge_interval = "1m"
[procedure]
max_retry_times = 3
retry_delay = "500ms"
+max_running_procedures = 128
[flow]