feat: auto config cache size according to memory size (#3165)

* feat: auto config cache and buffer size according to mem size

* feat: utils

* refactor: add util function to common config

* refactor: check cgroups

* refactor: code

* fix: test

* fix: test

* chore: cr comment

Co-authored-by: Yingwen <realevenyag@gmail.com>
Co-authored-by: Dennis Zhuang <killme2008@gmail.com>

* chore: remove default comment

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
Co-authored-by: Dennis Zhuang <killme2008@gmail.com>
This commit is contained in:
Wei
2024-01-17 22:35:35 +08:00
committed by GitHub
parent 3cfd60e139
commit 3d7d2fdb4a
10 changed files with 148 additions and 20 deletions

51
Cargo.lock generated
View File

@@ -1631,10 +1631,12 @@ version = "0.6.0"
dependencies = [
"common-base",
"humantime-serde",
"num_cpus",
"rskafka",
"serde",
"serde_json",
"serde_with",
"sysinfo",
"toml 0.8.8",
]
@@ -3966,7 +3968,7 @@ dependencies = [
"iana-time-zone-haiku",
"js-sys",
"wasm-bindgen",
"windows-core",
"windows-core 0.51.1",
]
[[package]]
@@ -4998,7 +5000,6 @@ dependencies = [
"log-store",
"memcomparable",
"moka",
"num_cpus",
"object-store",
"parquet",
"paste",
@@ -5288,6 +5289,15 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be"
[[package]]
name = "ntapi"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4"
dependencies = [
"winapi",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
@@ -8028,7 +8038,7 @@ dependencies = [
"which",
"widestring",
"winapi",
"windows",
"windows 0.39.0",
"winreg 0.10.1",
]
@@ -9353,6 +9363,21 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
[[package]]
name = "sysinfo"
version = "0.30.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fb4f3438c8f6389c864e61221cbc97e9bca98b4daf39a5beb7bea660f528bb2"
dependencies = [
"cfg-if 1.0.0",
"core-foundation-sys",
"libc",
"ntapi",
"once_cell",
"rayon",
"windows 0.52.0",
]
[[package]]
name = "system-configuration"
version = "0.5.1"
@@ -9530,7 +9555,6 @@ dependencies = [
"meta-client",
"meta-srv",
"mysql_async",
"num_cpus",
"object-store",
"once_cell",
"opentelemetry-proto 0.3.0",
@@ -10930,6 +10954,16 @@ dependencies = [
"windows_x86_64_msvc 0.39.0",
]
[[package]]
name = "windows"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be"
dependencies = [
"windows-core 0.52.0",
"windows-targets 0.52.0",
]
[[package]]
name = "windows-core"
version = "0.51.1"
@@ -10939,6 +10973,15 @@ dependencies = [
"windows-targets 0.48.5",
]
[[package]]
name = "windows-core"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
dependencies = [
"windows-targets 0.52.0",
]
[[package]]
name = "windows-sys"
version = "0.45.0"

View File

@@ -99,6 +99,7 @@ lazy_static = "1.4"
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "abbd357c1e193cd270ea65ee7652334a150b628f" }
mockall = "0.11.4"
moka = "0.12"
num_cpus = "1.16"
once_cell = "1.18"
opentelemetry-proto = { git = "https://github.com/waynexia/opentelemetry-rust.git", rev = "33841b38dda79b15f2024952be5f32533325ca02", features = [
"gen-tonic",
@@ -125,6 +126,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
smallvec = { version = "1", features = ["serde"] }
snafu = "0.7"
sysinfo = "0.30"
# on branch v0.38.x
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "6a93567ae38d42be5c8d08b13c8ff4dde26502ef", features = [
"visitor",

View File

@@ -33,7 +33,7 @@ pub const GIB: u64 = MIB * BINARY_DATA_MAGNITUDE;
pub const TIB: u64 = GIB * BINARY_DATA_MAGNITUDE;
pub const PIB: u64 = TIB * BINARY_DATA_MAGNITUDE;
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd)]
#[derive(Clone, Copy, PartialEq, Eq, Ord, PartialOrd)]
pub struct ReadableSize(pub u64);
impl ReadableSize {

View File

@@ -7,8 +7,10 @@ license.workspace = true
[dependencies]
common-base.workspace = true
humantime-serde.workspace = true
num_cpus.workspace = true
rskafka.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with = "3"
sysinfo.workspace = true
toml.workspace = true

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod utils;
pub mod wal;
use common_base::readable_size::ReadableSize;

View File

@@ -0,0 +1,55 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::readable_size::ReadableSize;
use sysinfo::System;
/// Get the CPU core number of system, aware of cgroups.
///
/// This is a thin wrapper that returns whatever `num_cpus::get()` reports, so
/// that callers can obtain the core count through this crate instead of
/// depending on `num_cpus` themselves.
///
/// NOTE(review): cgroup awareness is inherited from `num_cpus`, not implemented
/// here — confirm the exact behavior against the `num_cpus` documentation.
pub fn get_cpus() -> usize {
// This function will check cgroups
// (the check happens inside `num_cpus::get()`; nothing is done locally).
num_cpus::get()
}
/// Returns the total amount of memory usable by this process.
///
/// The value is the total memory reported by `sysinfo`, lowered to the cgroup
/// memory limit when such a limit is reported and is smaller.
///
/// Returns `None` when `sysinfo` does not support the current platform.
pub fn get_sys_total_memory() -> Option<ReadableSize> {
    // Nothing can be probed on platforms `sysinfo` does not support.
    if !sysinfo::IS_SUPPORTED_SYSTEM {
        return None;
    }

    let mut system = System::new();
    // Memory statistics must be refreshed before they (or the cgroup limits) are read.
    system.refresh_memory();
    let host_total = system.total_memory();

    // `cgroup_limits` is only implemented for Linux and returns `None` everywhere
    // else; when a limit is present, the smaller of the two values wins.
    let effective_total = match system.cgroup_limits() {
        Some(limits) => host_total.min(limits.total_memory),
        None => host_total,
    };

    Some(ReadableSize(effective_total))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The detected core count must be at least one.
    #[test]
    fn test_get_cpus() {
        let cores = get_cpus();
        assert!(cores > 0);
    }

    /// Memory detection must succeed and report a non-zero size.
    #[test]
    fn test_get_sys_total_memory() {
        let total = get_sys_total_memory().unwrap();
        assert!(total > ReadableSize::mb(0));
    }
}

View File

@@ -44,7 +44,6 @@ lazy_static = "1.4"
log-store = { workspace = true, optional = true }
memcomparable = "0.2"
moka = { workspace = true, features = ["sync", "future"] }
num_cpus = "1.13"
object-store.workspace = true
parquet = { workspace = true, features = ["async"] }
paste.workspace = true

View File

@@ -14,6 +14,7 @@
//! Configurations.
use std::cmp;
use std::time::Duration;
use common_base::readable_size::ReadableSize;
@@ -23,6 +24,7 @@ use serde::{Deserialize, Serialize};
use serde_with::{serde_as, NoneAsEmptyString};
use crate::error::Result;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
/// Default max running background job.
const DEFAULT_MAX_BG_JOB: usize = 4;
@@ -59,17 +61,17 @@ pub struct MitoConfig {
/// Interval to auto flush a region if it has not flushed yet (default 30 min).
#[serde(with = "humantime_serde")]
pub auto_flush_interval: Duration,
/// Global write buffer size threshold to trigger flush (default 1G).
/// Global write buffer size threshold to trigger flush.
pub global_write_buffer_size: ReadableSize,
/// Global write buffer size threshold to reject write requests (default 2G).
/// Global write buffer size threshold to reject write requests.
pub global_write_buffer_reject_size: ReadableSize,
// Cache configs:
/// Cache size for SST metadata (default 128MB). Setting it to 0 to disable the cache.
/// Cache size for SST metadata. Setting it to 0 to disable the cache.
pub sst_meta_cache_size: ReadableSize,
/// Cache size for vectors and arrow arrays (default 512MB). Setting it to 0 to disable the cache.
/// Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.
pub vector_cache_size: ReadableSize,
/// Cache size for pages of SST row groups (default 512MB). Setting it to 0 to disable the cache.
/// Cache size for pages of SST row groups. Setting it to 0 to disable the cache.
pub page_cache_size: ReadableSize,
/// Whether to enable the experimental write cache.
pub enable_experimental_write_cache: bool,
@@ -97,7 +99,7 @@ pub struct MitoConfig {
impl Default for MitoConfig {
fn default() -> Self {
MitoConfig {
let mut mito_config = MitoConfig {
num_workers: divide_num_cpus(2),
worker_channel_size: 128,
worker_request_batch_size: 64,
@@ -113,12 +115,19 @@ impl Default for MitoConfig {
enable_experimental_write_cache: false,
experimental_write_cache_path: String::new(),
experimental_write_cache_size: ReadableSize::mb(512),
sst_write_buffer_size: ReadableSize::mb(8),
sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
scan_parallelism: divide_num_cpus(4),
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
allow_stale_entries: false,
inverted_index: InvertedIndexConfig::default(),
};
// Adjust buffer and cache size according to system memory if we can.
if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
mito_config.adjust_buffer_and_cache_size(sys_memory);
}
mito_config
}
}
@@ -182,6 +191,23 @@ impl MitoConfig {
Ok(())
}
/// Derives the write buffer and cache sizes from the total system memory.
///
/// Every size is a fixed fraction of `sys_memory`, capped at a fixed upper bound:
/// - global write buffer: 1/64 of memory, at most 1G;
/// - global write buffer reject size: twice the global write buffer;
/// - SST meta cache: 1/256 of memory, at most 128MB;
/// - vector cache and page cache: 1/128 of memory each, at most 512MB.
fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
    // Flush threshold first; the reject threshold is defined relative to it.
    let flush_threshold = cmp::min(sys_memory / 64, ReadableSize::gb(1));
    self.global_write_buffer_size = flush_threshold;
    self.global_write_buffer_reject_size = flush_threshold * 2;

    // SST metadata gets the smallest share of memory.
    self.sst_meta_cache_size = cmp::min(sys_memory / 256, ReadableSize::mb(128));

    // The vector cache and the page cache share the same sizing rule.
    let per_cache_budget = cmp::min(sys_memory / 128, ReadableSize::mb(512));
    self.vector_cache_size = per_cache_budget;
    self.page_cache_size = per_cache_budget;
}
/// Enable experimental write cache.
#[cfg(test)]
pub fn enable_write_cache(mut self, path: String, size: ReadableSize) -> Self {
@@ -259,7 +285,7 @@ impl InvertedIndexConfig {
/// Divides the number of CPU cores by a non-zero `divisor`, rounding up, so the result is at least 1.
fn divide_num_cpus(divisor: usize) -> usize {
debug_assert!(divisor > 0);
let cores = num_cpus::get();
let cores = common_config::utils::get_cpus();
debug_assert!(cores > 0);
(cores + divisor - 1) / divisor

View File

@@ -73,7 +73,6 @@ uuid.workspace = true
datafusion-expr.workspace = true
datafusion.workspace = true
itertools.workspace = true
num_cpus = "1.13"
opentelemetry-proto.workspace = true
partition.workspace = true
paste.workspace = true

View File

@@ -647,6 +647,7 @@ pub async fn test_config_api(store_type: StorageType) {
let res_get = client.get("/config").send().await;
assert_eq!(res_get.status(), StatusCode::OK);
let expected_toml_str = format!(
r#"
[procedure]
@@ -765,11 +766,6 @@ manifest_checkpoint_distance = 10
compress_manifest = false
max_background_jobs = 4
auto_flush_interval = "30m"
global_write_buffer_size = "1GiB"
global_write_buffer_reject_size = "2GiB"
sst_meta_cache_size = "128MiB"
vector_cache_size = "512MiB"
page_cache_size = "512MiB"
enable_experimental_write_cache = false
experimental_write_cache_path = ""
experimental_write_cache_size = "512MiB"
@@ -822,6 +818,11 @@ fn drop_lines_with_inconsistent_results(input: String) -> String {
"scope =",
"num_workers =",
"scan_parallelism =",
"global_write_buffer_size =",
"global_write_buffer_reject_size =",
"sst_meta_cache_size =",
"vector_cache_size =",
"page_cache_size =",
];
input