feat: Add an option to limit the number of files read simultaneously (#6635)

* feat: limits the max number of files to scan at the same time

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: make max_concurrent_scan_files configurable

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: reduce concurrent scan files to 128

Signed-off-by: evenyag <realevenyag@gmail.com>

* docs: update config example

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: add test for max_concurrent_scan_files

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: fix clippy

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: update config test

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
Author: Yingwen
Date: 2025-08-04 15:18:58 +08:00
Parent: 5d330fad17
Commit: b62f219810
12 changed files with 181 additions and 4 deletions

View File

@@ -147,6 +147,7 @@
| `region_engine.mito.write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.max_concurrent_scan_files` | Integer | `128` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
@@ -496,6 +497,7 @@
| `region_engine.mito.write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.max_concurrent_scan_files` | Integer | `128` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |

View File

@@ -474,6 +474,9 @@ sst_write_buffer_size = "8MB"
## Capacity of the channel to send data from parallel scan tasks to the main task.
parallel_scan_channel_size = 32
## Maximum number of SST files to scan concurrently.
max_concurrent_scan_files = 128
## Whether to allow stale WAL entries read during replay.
allow_stale_entries = false

View File

@@ -565,6 +565,9 @@ sst_write_buffer_size = "8MB"
## Capacity of the channel to send data from parallel scan tasks to the main task.
parallel_scan_channel_size = 32
## Maximum number of SST files to scan concurrently.
max_concurrent_scan_files = 128
## Whether to allow stale WAL entries read during replay.
allow_stale_entries = false

View File

@@ -30,6 +30,8 @@ use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
/// Default channel size for parallel scan task.
pub(crate) const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
/// Default maximum number of SST files to scan concurrently.
pub(crate) const DEFAULT_MAX_CONCURRENT_SCAN_FILES: usize = 128;
// Use `1/GLOBAL_WRITE_BUFFER_SIZE_FACTOR` of OS memory as global write buffer size in default mode
const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8;
@@ -107,6 +109,8 @@ pub struct MitoConfig {
pub sst_write_buffer_size: ReadableSize,
/// Capacity of the channel to send data from parallel scan tasks to the main task (default 32).
pub parallel_scan_channel_size: usize,
/// Maximum number of SST files to scan concurrently (default 128).
pub max_concurrent_scan_files: usize,
/// Whether to allow stale entries read during replay.
pub allow_stale_entries: bool,
@@ -152,6 +156,7 @@ impl Default for MitoConfig {
write_cache_ttl: None,
sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
allow_stale_entries: false,
index: IndexConfig::default(),
inverted_index: InvertedIndexConfig::default(),
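
For illustration, a minimal sketch (not part of this change) of overriding the new field from application code, mirroring what the engine test below does. The `mito2::config` import path is an assumption based on this crate's layout; struct-update syntax keeps every other option at its default:

use mito2::config::MitoConfig; // import path assumed

fn main() {
    // Cap each scan at 64 concurrently open SST files instead of the default 128;
    // all other options keep their default values via struct-update syntax.
    let config = MitoConfig {
        max_concurrent_scan_files: 64,
        ..Default::default()
    };
    assert_eq!(64, config.max_concurrent_scan_files);
}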

View File

@@ -506,6 +506,7 @@ impl EngineInner {
CacheStrategy::EnableAll(cache_manager),
)
.with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
.with_max_concurrent_scan_files(self.config.max_concurrent_scan_files)
.with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
.with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
.with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())

View File

@@ -13,6 +13,8 @@
// limitations under the License.
use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::TryStreamExt;
@@ -151,6 +153,58 @@ async fn test_scan_with_min_sst_sequence() {
.await;
}
#[tokio::test]
async fn test_max_concurrent_scan_files() {
let mut env = TestEnv::with_prefix("test_max_concurrent_scan_files").await;
let config = MitoConfig {
max_concurrent_scan_files: 2,
..Default::default()
};
let engine = env.create_engine(config).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let column_schemas = test_util::rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let put_and_flush = async |start, end| {
let rows = Rows {
schema: column_schemas.clone(),
rows: test_util::build_rows(start, end),
};
test_util::put_rows(&engine, region_id, rows).await;
test_util::flush_region(&engine, region_id, None).await;
};
// Write overlapping files.
put_and_flush(0, 4).await;
put_and_flush(3, 7).await;
put_and_flush(6, 9).await;
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).await.unwrap();
let Scanner::Seq(scanner) = scanner else {
panic!("Scanner should be seq scan");
};
let error = scanner.check_scan_limit().unwrap_err();
assert_eq!(StatusCode::RateLimited, error.status_code());
let request = ScanRequest {
distribution: Some(TimeSeriesDistribution::PerSeries),
..Default::default()
};
let scanner = engine.scanner(region_id, request).await.unwrap();
let Scanner::Series(scanner) = scanner else {
panic!("Scanner should be series scan");
};
let error = scanner.check_scan_limit().unwrap_err();
assert_eq!(StatusCode::RateLimited, error.status_code());
}
#[tokio::test]
async fn test_series_scan() {
let mut env = TestEnv::with_prefix("test_series_scan").await;

View File

@@ -1032,6 +1032,18 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Too many files to read concurrently: {}, max allowed: {}",
actual,
max
))]
TooManyFilesToRead {
actual: usize,
max: usize,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1189,6 +1201,8 @@ impl ErrorExt for Error {
Encode { source, .. } | Decode { source, .. } => source.status_code(),
InconsistentTimestampLength { .. } => StatusCode::InvalidArguments,
TooManyFilesToRead { .. } => StatusCode::RateLimited,
}
}
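
Because the new error maps to `StatusCode::RateLimited`, a caller can treat it as a retryable, back-off condition. A minimal sketch of such a check using the `ErrorExt` trait imported in the test above; the helper name is hypothetical:

use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;

/// Hypothetical caller-side helper: true when a scan was rejected because it
/// would open too many SST files at once and may be retried later.
fn is_scan_rate_limited(err: &impl ErrorExt) -> bool {
    err.status_code() == StatusCode::RateLimited
}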

View File

@@ -39,7 +39,7 @@ use tokio_stream::wrappers::ReceiverStream;
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheStrategy;
use crate::config::DEFAULT_SCAN_CHANNEL_SIZE;
use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE};
use crate::error::Result;
use crate::memtable::MemtableRange;
use crate::metrics::READ_SST_COUNT;
@@ -187,6 +187,8 @@ pub(crate) struct ScanRegion {
cache_strategy: CacheStrategy,
/// Capacity of the channel to send data from parallel scan tasks to the main task.
parallel_scan_channel_size: usize,
/// Maximum number of SST files to scan concurrently.
max_concurrent_scan_files: usize,
/// Whether to ignore inverted index.
ignore_inverted_index: bool,
/// Whether to ignore fulltext index.
@@ -214,6 +216,7 @@ impl ScanRegion {
request,
cache_strategy,
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
ignore_inverted_index: false,
ignore_fulltext_index: false,
ignore_bloom_filter: false,
@@ -232,6 +235,16 @@ impl ScanRegion {
self
}
/// Sets maximum number of SST files to scan concurrently.
#[must_use]
pub(crate) fn with_max_concurrent_scan_files(
mut self,
max_concurrent_scan_files: usize,
) -> Self {
self.max_concurrent_scan_files = max_concurrent_scan_files;
self
}
/// Sets whether to ignore inverted index.
#[must_use]
pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
@@ -421,6 +434,7 @@ impl ScanRegion {
.with_bloom_filter_index_applier(bloom_filter_applier)
.with_fulltext_index_applier(fulltext_index_applier)
.with_parallel_scan_channel_size(self.parallel_scan_channel_size)
.with_max_concurrent_scan_files(self.max_concurrent_scan_files)
.with_start_time(self.start_time)
.with_append_mode(self.version.options.append_mode)
.with_filter_deleted(self.filter_deleted)
@@ -597,6 +611,8 @@ pub struct ScanInput {
ignore_file_not_found: bool,
/// Capacity of the channel to send data from parallel scan tasks to the main task.
pub(crate) parallel_scan_channel_size: usize,
/// Maximum number of SST files to scan concurrently.
pub(crate) max_concurrent_scan_files: usize,
/// Index appliers.
inverted_index_applier: Option<InvertedIndexApplierRef>,
bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
@@ -629,6 +645,7 @@ impl ScanInput {
cache_strategy: CacheStrategy::Disabled,
ignore_file_not_found: false,
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
inverted_index_applier: None,
bloom_filter_index_applier: None,
fulltext_index_applier: None,
@@ -693,6 +710,16 @@ impl ScanInput {
self
}
/// Sets maximum number of SST files to scan concurrently.
#[must_use]
pub(crate) fn with_max_concurrent_scan_files(
mut self,
max_concurrent_scan_files: usize,
) -> Self {
self.max_concurrent_scan_files = max_concurrent_scan_files;
self
}
/// Sets inverted index applier.
#[must_use]
pub(crate) fn with_inverted_index_applier(

View File

@@ -33,11 +33,11 @@ use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, Sc
use store_api::storage::TimeSeriesRowSelector;
use tokio::sync::Semaphore;
use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::last_row::LastRowReader;
use crate::read::merge::MergeReaderBuilder;
use crate::read::range::RangeBuilderList;
use crate::read::range::{RangeBuilderList, RangeMeta};
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{
scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
@@ -347,6 +347,40 @@ impl SeqScan {
metrics
}
/// Finds the maximum number of files to read in a single partition range.
fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
partition_ranges
.iter()
.map(|part_range| {
let range_meta = &ranges[part_range.identifier];
range_meta.indices.len()
})
.max()
.unwrap_or(0)
}
/// Checks resource limit for the scanner.
pub(crate) fn check_scan_limit(&self) -> Result<()> {
// Check max file count limit for all partitions since we scan them in parallel.
let total_max_files: usize = self
.properties
.partitions
.iter()
.map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
.sum();
let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
if total_max_files > max_concurrent_files {
return TooManyFilesToReadSnafu {
actual: total_max_files,
max: max_concurrent_files,
}
.fail();
}
Ok(())
}
}
impl RegionScanner for SeqScan {
@@ -372,6 +406,9 @@ impl RegionScanner for SeqScan {
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
self.properties.prepare(request);
self.check_scan_limit().map_err(BoxedError::new)?;
Ok(())
}
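
To make the SeqScan limit check concrete, here is a small self-contained sketch of the same arithmetic on hypothetical per-range file counts (plain integers instead of the engine's `RangeMeta`/`PartitionRange` types): within each partition take the largest file count among its ranges, then sum those maxima across partitions, because partitions run in parallel.

// Each inner Vec models one partition; each number is how many SST files one
// of its partition ranges would read.
fn total_max_files(partitions: &[Vec<usize>]) -> usize {
    partitions
        .iter()
        .map(|ranges| ranges.iter().copied().max().unwrap_or(0))
        .sum()
}

fn main() {
    // Partition 0 peaks at 3 files and partition 1 at 2, so the scan may hold
    // up to 5 files open at once; with max_concurrent_scan_files = 4 it would
    // be rejected with a RateLimited error.
    let partitions = vec![vec![1, 3, 2], vec![2, 1]];
    assert_eq!(5, total_max_files(&partitions));
}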

View File

@@ -37,7 +37,7 @@ use tokio::sync::Semaphore;
use crate::error::{
ComputeArrowSnafu, Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result,
ScanMultiTimesSnafu, ScanSeriesSnafu,
ScanMultiTimesSnafu, ScanSeriesSnafu, TooManyFilesToReadSnafu,
};
use crate::read::range::RangeBuilderList;
use crate::read::scan_region::{ScanInput, StreamContext};
@@ -201,6 +201,32 @@ impl SeriesScan {
let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
Ok(Box::pin(chained_stream))
}
/// Checks resource limit for the scanner.
pub(crate) fn check_scan_limit(&self) -> Result<()> {
// Sum the total number of files across all partitions
let total_files: usize = self
.properties
.partitions
.iter()
.flat_map(|partition| partition.iter())
.map(|part_range| {
let range_meta = &self.stream_ctx.ranges[part_range.identifier];
range_meta.indices.len()
})
.sum();
let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
if total_files > max_concurrent_files {
return TooManyFilesToReadSnafu {
actual: total_files,
max: max_concurrent_files,
}
.fail();
}
Ok(())
}
}
fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
@@ -236,6 +262,9 @@ impl RegionScanner for SeriesScan {
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
self.properties.prepare(request);
self.check_scan_limit().map_err(BoxedError::new)?;
Ok(())
}
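
For contrast with the SeqScan check, SeriesScan counts every file in every partition range, since it drives all partitions at once. A tiny sketch of that arithmetic on the same kind of hypothetical per-range file counts used in the earlier sketch:

// Every range in every partition contributes its full file count.
fn total_files(partitions: &[Vec<usize>]) -> usize {
    partitions
        .iter()
        .flat_map(|ranges| ranges.iter().copied())
        .sum()
}

fn main() {
    // The same layout as before now counts 1 + 3 + 2 + 2 + 1 = 9 files, so a
    // limit of 8 would reject the scan while the default of 128 passes.
    let partitions = vec![vec![1, 3, 2], vec![2, 1]];
    assert_eq!(9, total_files(&partitions));
}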

View File

@@ -242,6 +242,7 @@ impl RegionScanner for UnorderedScan {
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
self.properties.prepare(request);
// UnorderedScan only scans one row group per partition so the resource requirement won't be too high.
Ok(())
}

View File

@@ -1142,6 +1142,7 @@ write_cache_path = ""
write_cache_size = "5GiB"
sst_write_buffer_size = "8MiB"
parallel_scan_channel_size = 32
max_concurrent_scan_files = 128
allow_stale_entries = false
min_compaction_interval = "0s"