diff --git a/config/config.md b/config/config.md
index 3aea01a581..cc787ae8f9 100644
--- a/config/config.md
+++ b/config/config.md
@@ -147,6 +147,7 @@
| `region_engine.mito.write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
+| `region_engine.mito.max_concurrent_scan_files` | Integer | `128` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.
To align with the old behavior, the default value is 0 (no restrictions). |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
@@ -513,6 +514,7 @@
| `region_engine.mito.write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
+| `region_engine.mito.max_concurrent_scan_files` | Integer | `128` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.
To align with the old behavior, the default value is 0 (no restrictions). |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 0d865d4034..e70a0e24bb 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -474,6 +474,9 @@ sst_write_buffer_size = "8MB"
## Capacity of the channel to send data from parallel scan tasks to the main task.
parallel_scan_channel_size = 32
+## Maximum number of SST files to scan concurrently.
+max_concurrent_scan_files = 128
+
## Whether to allow stale WAL entries read during replay.
allow_stale_entries = false
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 22913dbea2..a315ca75d2 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -565,6 +565,9 @@ sst_write_buffer_size = "8MB"
## Capacity of the channel to send data from parallel scan tasks to the main task.
parallel_scan_channel_size = 32
+## Maximum number of SST files to scan concurrently.
+max_concurrent_scan_files = 128
+
## Whether to allow stale WAL entries read during replay.
allow_stale_entries = false
diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs
index b9613292c8..2c527b5341 100644
--- a/src/mito2/src/config.rs
+++ b/src/mito2/src/config.rs
@@ -30,6 +30,8 @@ use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
/// Default channel size for parallel scan task.
pub(crate) const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
+/// Default maximum number of SST files to scan concurrently.
+pub(crate) const DEFAULT_MAX_CONCURRENT_SCAN_FILES: usize = 128;
// Use `1/GLOBAL_WRITE_BUFFER_SIZE_FACTOR` of OS memory as global write buffer size in default mode
const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8;
@@ -107,6 +109,8 @@ pub struct MitoConfig {
pub sst_write_buffer_size: ReadableSize,
/// Capacity of the channel to send data from parallel scan tasks to the main task (default 32).
pub parallel_scan_channel_size: usize,
+ /// Maximum number of SST files to scan concurrently (default 128).
+ pub max_concurrent_scan_files: usize,
/// Whether to allow stale entries read during replay.
pub allow_stale_entries: bool,
@@ -152,6 +156,7 @@ impl Default for MitoConfig {
write_cache_ttl: None,
sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
+ max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
allow_stale_entries: false,
index: IndexConfig::default(),
inverted_index: InvertedIndexConfig::default(),
diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs
index edd7029b80..d9e9800eea 100644
--- a/src/mito2/src/engine.rs
+++ b/src/mito2/src/engine.rs
@@ -599,6 +599,7 @@ impl EngineInner {
CacheStrategy::EnableAll(cache_manager),
)
.with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
+ .with_max_concurrent_scan_files(self.config.max_concurrent_scan_files)
.with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
.with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
.with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
diff --git a/src/mito2/src/engine/scan_test.rs b/src/mito2/src/engine/scan_test.rs
index 81029ad13c..63e703a1c4 100644
--- a/src/mito2/src/engine/scan_test.rs
+++ b/src/mito2/src/engine/scan_test.rs
@@ -13,6 +13,8 @@
// limitations under the License.
use api::v1::Rows;
+use common_error::ext::ErrorExt;
+use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::TryStreamExt;
@@ -151,6 +153,58 @@ async fn test_scan_with_min_sst_sequence() {
.await;
}
+#[tokio::test]
+async fn test_max_concurrent_scan_files() {
+ let mut env = TestEnv::with_prefix("test_max_concurrent_scan_files").await;
+ let config = MitoConfig {
+ max_concurrent_scan_files: 2,
+ ..Default::default()
+ };
+ let engine = env.create_engine(config).await;
+
+ let region_id = RegionId::new(1, 1);
+ let request = CreateRequestBuilder::new().build();
+ let column_schemas = test_util::rows_schema(&request);
+
+ engine
+ .handle_request(region_id, RegionRequest::Create(request))
+ .await
+ .unwrap();
+
+ let put_and_flush = async |start, end| {
+ let rows = Rows {
+ schema: column_schemas.clone(),
+ rows: test_util::build_rows(start, end),
+ };
+ test_util::put_rows(&engine, region_id, rows).await;
+ test_util::flush_region(&engine, region_id, None).await;
+ };
+
+ // Write overlapping files.
+ put_and_flush(0, 4).await;
+ put_and_flush(3, 7).await;
+ put_and_flush(6, 9).await;
+
+ let request = ScanRequest::default();
+ let scanner = engine.scanner(region_id, request).await.unwrap();
+ let Scanner::Seq(scanner) = scanner else {
+ panic!("Scanner should be seq scan");
+ };
+ let error = scanner.check_scan_limit().unwrap_err();
+ assert_eq!(StatusCode::RateLimited, error.status_code());
+
+ let request = ScanRequest {
+ distribution: Some(TimeSeriesDistribution::PerSeries),
+ ..Default::default()
+ };
+ let scanner = engine.scanner(region_id, request).await.unwrap();
+ let Scanner::Series(scanner) = scanner else {
+ panic!("Scanner should be series scan");
+ };
+ let error = scanner.check_scan_limit().unwrap_err();
+ assert_eq!(StatusCode::RateLimited, error.status_code());
+}
+
#[tokio::test]
async fn test_series_scan() {
let mut env = TestEnv::with_prefix("test_series_scan").await;
diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs
index 4fa3e700f6..aa3d91ff4b 100644
--- a/src/mito2/src/error.rs
+++ b/src/mito2/src/error.rs
@@ -1029,6 +1029,18 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
+
+ #[snafu(display(
+ "Too many files to read concurrently: {}, max allowed: {}",
+ actual,
+ max
+ ))]
+ TooManyFilesToRead {
+ actual: usize,
+ max: usize,
+ #[snafu(implicit)]
+ location: Location,
+ },
}
pub type Result = std::result::Result;
@@ -1189,6 +1201,8 @@ impl ErrorExt for Error {
ScanExternalRange { source, .. } => source.status_code(),
InconsistentTimestampLength { .. } => StatusCode::InvalidArguments,
+
+ TooManyFilesToRead { .. } => StatusCode::RateLimited,
}
}
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index 92815ef26d..f15b526028 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -39,7 +39,7 @@ use tokio_stream::wrappers::ReceiverStream;
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheStrategy;
-use crate::config::DEFAULT_SCAN_CHANNEL_SIZE;
+use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE};
use crate::error::Result;
#[cfg(feature = "enterprise")]
use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
@@ -199,6 +199,8 @@ pub(crate) struct ScanRegion {
cache_strategy: CacheStrategy,
/// Capacity of the channel to send data from parallel scan tasks to the main task.
parallel_scan_channel_size: usize,
+ /// Maximum number of SST files to scan concurrently.
+ max_concurrent_scan_files: usize,
/// Whether to ignore inverted index.
ignore_inverted_index: bool,
/// Whether to ignore fulltext index.
@@ -228,6 +230,7 @@ impl ScanRegion {
request,
cache_strategy,
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
+ max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
ignore_inverted_index: false,
ignore_fulltext_index: false,
ignore_bloom_filter: false,
@@ -248,6 +251,16 @@ impl ScanRegion {
self
}
+ /// Sets maximum number of SST files to scan concurrently.
+ #[must_use]
+ pub(crate) fn with_max_concurrent_scan_files(
+ mut self,
+ max_concurrent_scan_files: usize,
+ ) -> Self {
+ self.max_concurrent_scan_files = max_concurrent_scan_files;
+ self
+ }
+
/// Sets whether to ignore inverted index.
#[must_use]
pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
@@ -452,6 +465,7 @@ impl ScanRegion {
.with_bloom_filter_index_applier(bloom_filter_applier)
.with_fulltext_index_applier(fulltext_index_applier)
.with_parallel_scan_channel_size(self.parallel_scan_channel_size)
+ .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
.with_start_time(self.start_time)
.with_append_mode(self.version.options.append_mode)
.with_filter_deleted(self.filter_deleted)
@@ -640,6 +654,8 @@ pub struct ScanInput {
ignore_file_not_found: bool,
/// Capacity of the channel to send data from parallel scan tasks to the main task.
pub(crate) parallel_scan_channel_size: usize,
+ /// Maximum number of SST files to scan concurrently.
+ pub(crate) max_concurrent_scan_files: usize,
/// Index appliers.
inverted_index_applier: Option,
bloom_filter_index_applier: Option,
@@ -674,6 +690,7 @@ impl ScanInput {
cache_strategy: CacheStrategy::Disabled,
ignore_file_not_found: false,
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
+ max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
inverted_index_applier: None,
bloom_filter_index_applier: None,
fulltext_index_applier: None,
@@ -740,6 +757,16 @@ impl ScanInput {
self
}
+ /// Sets maximum number of SST files to scan concurrently.
+ #[must_use]
+ pub(crate) fn with_max_concurrent_scan_files(
+ mut self,
+ max_concurrent_scan_files: usize,
+ ) -> Self {
+ self.max_concurrent_scan_files = max_concurrent_scan_files;
+ self
+ }
+
/// Sets invereted index applier.
#[must_use]
pub(crate) fn with_inverted_index_applier(
diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs
index d53dc1e8f2..a2ed0bda37 100644
--- a/src/mito2/src/read/seq_scan.rs
+++ b/src/mito2/src/read/seq_scan.rs
@@ -35,11 +35,11 @@ use store_api::region_engine::{
use store_api::storage::TimeSeriesRowSelector;
use tokio::sync::Semaphore;
-use crate::error::{PartitionOutOfRangeSnafu, Result};
+use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::last_row::LastRowReader;
use crate::read::merge::MergeReaderBuilder;
-use crate::read::range::RangeBuilderList;
+use crate::read::range::{RangeBuilderList, RangeMeta};
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{
scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
@@ -377,6 +377,40 @@ impl SeqScan {
metrics
}
+
+ /// Finds the maximum number of files to read in a single partition range.
+ fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
+ partition_ranges
+ .iter()
+ .map(|part_range| {
+ let range_meta = &ranges[part_range.identifier];
+ range_meta.indices.len()
+ })
+ .max()
+ .unwrap_or(0)
+ }
+
+ /// Checks resource limit for the scanner.
+ pub(crate) fn check_scan_limit(&self) -> Result<()> {
+ // Check max file count limit for all partitions since we scan them in parallel.
+ let total_max_files: usize = self
+ .properties
+ .partitions
+ .iter()
+ .map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
+ .sum();
+
+ let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
+ if total_max_files > max_concurrent_files {
+ return TooManyFilesToReadSnafu {
+ actual: total_max_files,
+ max: max_concurrent_files,
+ }
+ .fail();
+ }
+
+ Ok(())
+ }
}
impl RegionScanner for SeqScan {
@@ -404,6 +438,9 @@ impl RegionScanner for SeqScan {
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
self.properties.prepare(request);
+
+ self.check_scan_limit().map_err(BoxedError::new)?;
+
Ok(())
}
diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs
index 1985481b60..2acb64fe8e 100644
--- a/src/mito2/src/read/series_scan.rs
+++ b/src/mito2/src/read/series_scan.rs
@@ -38,7 +38,7 @@ use tokio::sync::Semaphore;
use crate::error::{
Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
- ScanSeriesSnafu,
+ ScanSeriesSnafu, TooManyFilesToReadSnafu,
};
use crate::read::range::RangeBuilderList;
use crate::read::scan_region::{ScanInput, StreamContext};
@@ -241,6 +241,32 @@ impl SeriesScan {
Ok(Box::pin(futures::stream::iter(streams).flatten()))
}
+
+ /// Checks resource limit for the scanner.
+ pub(crate) fn check_scan_limit(&self) -> Result<()> {
+ // Sum the total number of files across all partitions
+ let total_files: usize = self
+ .properties
+ .partitions
+ .iter()
+ .flat_map(|partition| partition.iter())
+ .map(|part_range| {
+ let range_meta = &self.stream_ctx.ranges[part_range.identifier];
+ range_meta.indices.len()
+ })
+ .sum();
+
+ let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
+ if total_files > max_concurrent_files {
+ return TooManyFilesToReadSnafu {
+ actual: total_files,
+ max: max_concurrent_files,
+ }
+ .fail();
+ }
+
+ Ok(())
+ }
}
fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
@@ -278,6 +304,9 @@ impl RegionScanner for SeriesScan {
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
self.properties.prepare(request);
+
+ self.check_scan_limit().map_err(BoxedError::new)?;
+
Ok(())
}
diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs
index a32d5e23b2..91e1e36612 100644
--- a/src/mito2/src/read/unordered_scan.rs
+++ b/src/mito2/src/read/unordered_scan.rs
@@ -298,6 +298,7 @@ impl RegionScanner for UnorderedScan {
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
self.properties.prepare(request);
+ // UnorderedScan only scans one row group per partition so the resource requirement won't be too high.
Ok(())
}
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 4fafc35442..0eff0cfd02 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -1176,6 +1176,7 @@ write_cache_path = ""
write_cache_size = "5GiB"
sst_write_buffer_size = "8MiB"
parallel_scan_channel_size = 32
+max_concurrent_scan_files = 128
allow_stale_entries = false
min_compaction_interval = "0s"