diff --git a/Cargo.lock b/Cargo.lock index 8ff06da6c9..de440ec733 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1631,10 +1631,12 @@ version = "0.6.0" dependencies = [ "common-base", "humantime-serde", + "num_cpus", "rskafka", "serde", "serde_json", "serde_with", + "sysinfo", "toml 0.8.8", ] @@ -3966,7 +3968,7 @@ dependencies = [ "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows-core", + "windows-core 0.51.1", ] [[package]] @@ -4998,7 +5000,6 @@ dependencies = [ "log-store", "memcomparable", "moka", - "num_cpus", "object-store", "parquet", "paste", @@ -5288,6 +5289,15 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" +[[package]] +name = "ntapi" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4" +dependencies = [ + "winapi", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -8028,7 +8038,7 @@ dependencies = [ "which", "widestring", "winapi", - "windows", + "windows 0.39.0", "winreg 0.10.1", ] @@ -9353,6 +9363,21 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "sysinfo" +version = "0.30.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fb4f3438c8f6389c864e61221cbc97e9bca98b4daf39a5beb7bea660f528bb2" +dependencies = [ + "cfg-if 1.0.0", + "core-foundation-sys", + "libc", + "ntapi", + "once_cell", + "rayon", + "windows 0.52.0", +] + [[package]] name = "system-configuration" version = "0.5.1" @@ -9530,7 +9555,6 @@ dependencies = [ "meta-client", "meta-srv", "mysql_async", - "num_cpus", "object-store", "once_cell", "opentelemetry-proto 0.3.0", @@ -10930,6 +10954,16 @@ dependencies = [ "windows_x86_64_msvc 0.39.0", ] +[[package]] +name = "windows" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" +dependencies = [ + "windows-core 0.52.0", + "windows-targets 0.52.0", +] + [[package]] name = "windows-core" version = "0.51.1" @@ -10939,6 +10973,15 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.0", +] + [[package]] name = "windows-sys" version = "0.45.0" diff --git a/Cargo.toml b/Cargo.toml index d934d7d4c9..b5e5331073 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -99,6 +99,7 @@ lazy_static = "1.4" meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "abbd357c1e193cd270ea65ee7652334a150b628f" } mockall = "0.11.4" moka = "0.12" +num_cpus = "1.16" once_cell = "1.18" opentelemetry-proto = { git = "https://github.com/waynexia/opentelemetry-rust.git", rev = "33841b38dda79b15f2024952be5f32533325ca02", features = [ "gen-tonic", @@ -125,6 +126,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" smallvec = { version = "1", features = ["serde"] } snafu = "0.7" +sysinfo = "0.30" # on branch v0.38.x sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "6a93567ae38d42be5c8d08b13c8ff4dde26502ef", features = [ "visitor", diff --git a/src/common/base/src/readable_size.rs b/src/common/base/src/readable_size.rs index 100d0f56a8..8c539f3f78 100644 --- a/src/common/base/src/readable_size.rs +++ b/src/common/base/src/readable_size.rs @@ -33,7 +33,7 @@ pub const GIB: u64 = MIB * BINARY_DATA_MAGNITUDE; pub const TIB: u64 = GIB * BINARY_DATA_MAGNITUDE; pub const PIB: u64 = TIB * BINARY_DATA_MAGNITUDE; -#[derive(Clone, Copy, PartialEq, Eq, PartialOrd)] +#[derive(Clone, Copy, PartialEq, Eq, Ord, PartialOrd)] pub struct ReadableSize(pub u64); impl ReadableSize { diff --git a/src/common/config/Cargo.toml b/src/common/config/Cargo.toml index ce779d9043..12029609e1 100644 --- a/src/common/config/Cargo.toml +++ b/src/common/config/Cargo.toml @@ -7,8 +7,10 @@ license.workspace = true [dependencies] common-base.workspace = true humantime-serde.workspace = true +num_cpus.workspace = true rskafka.workspace = true serde.workspace = true serde_json.workspace = true serde_with = "3" +sysinfo.workspace = true toml.workspace = true diff --git a/src/common/config/src/lib.rs b/src/common/config/src/lib.rs index 33b07c0498..6ad928dc3c 100644 --- a/src/common/config/src/lib.rs +++ b/src/common/config/src/lib.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod utils; pub mod wal; use common_base::readable_size::ReadableSize; diff --git a/src/common/config/src/utils.rs b/src/common/config/src/utils.rs new file mode 100644 index 0000000000..6055041d89 --- /dev/null +++ b/src/common/config/src/utils.rs @@ -0,0 +1,55 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_base::readable_size::ReadableSize; +use sysinfo::System; + +/// Get the CPU core number of system, aware of cgroups. +pub fn get_cpus() -> usize { + // This function will check cgroups + num_cpus::get() +} + +/// Get the total memory of the system. +/// If `cgroup_limits` is enabled, it will also check it. +pub fn get_sys_total_memory() -> Option { + if sysinfo::IS_SUPPORTED_SYSTEM { + let mut sys_info = System::new(); + sys_info.refresh_memory(); + let mut total_memory = sys_info.total_memory(); + // Compare with cgroups memory limit, use smaller values + // This method is only implemented for Linux. It always returns None for all other systems. + if let Some(cgroup_limits) = sys_info.cgroup_limits() { + total_memory = total_memory.min(cgroup_limits.total_memory) + } + Some(ReadableSize(total_memory)) + } else { + None + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_get_cpus() { + assert!(get_cpus() > 0); + } + + #[test] + fn test_get_sys_total_memory() { + assert!(get_sys_total_memory().unwrap() > ReadableSize::mb(0)); + } +} diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 917785fcff..f33511cf0b 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -44,7 +44,6 @@ lazy_static = "1.4" log-store = { workspace = true, optional = true } memcomparable = "0.2" moka = { workspace = true, features = ["sync", "future"] } -num_cpus = "1.13" object-store.workspace = true parquet = { workspace = true, features = ["async"] } paste.workspace = true diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 748401b622..dbd8989e0a 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -14,6 +14,7 @@ //! Configurations. +use std::cmp; use std::time::Duration; use common_base::readable_size::ReadableSize; @@ -23,6 +24,7 @@ use serde::{Deserialize, Serialize}; use serde_with::{serde_as, NoneAsEmptyString}; use crate::error::Result; +use crate::sst::DEFAULT_WRITE_BUFFER_SIZE; /// Default max running background job. const DEFAULT_MAX_BG_JOB: usize = 4; @@ -59,17 +61,17 @@ pub struct MitoConfig { /// Interval to auto flush a region if it has not flushed yet (default 30 min). #[serde(with = "humantime_serde")] pub auto_flush_interval: Duration, - /// Global write buffer size threshold to trigger flush (default 1G). + /// Global write buffer size threshold to trigger flush. pub global_write_buffer_size: ReadableSize, - /// Global write buffer size threshold to reject write requests (default 2G). + /// Global write buffer size threshold to reject write requests. pub global_write_buffer_reject_size: ReadableSize, // Cache configs: - /// Cache size for SST metadata (default 128MB). Setting it to 0 to disable the cache. + /// Cache size for SST metadata. Setting it to 0 to disable the cache. pub sst_meta_cache_size: ReadableSize, - /// Cache size for vectors and arrow arrays (default 512MB). Setting it to 0 to disable the cache. + /// Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache. pub vector_cache_size: ReadableSize, - /// Cache size for pages of SST row groups (default 512MB). Setting it to 0 to disable the cache. + /// Cache size for pages of SST row groups. Setting it to 0 to disable the cache. pub page_cache_size: ReadableSize, /// Whether to enable the experimental write cache. pub enable_experimental_write_cache: bool, @@ -97,7 +99,7 @@ pub struct MitoConfig { impl Default for MitoConfig { fn default() -> Self { - MitoConfig { + let mut mito_config = MitoConfig { num_workers: divide_num_cpus(2), worker_channel_size: 128, worker_request_batch_size: 64, @@ -113,12 +115,19 @@ impl Default for MitoConfig { enable_experimental_write_cache: false, experimental_write_cache_path: String::new(), experimental_write_cache_size: ReadableSize::mb(512), - sst_write_buffer_size: ReadableSize::mb(8), + sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE, scan_parallelism: divide_num_cpus(4), parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE, allow_stale_entries: false, inverted_index: InvertedIndexConfig::default(), + }; + + // Adjust buffer and cache size according to system memory if we can. + if let Some(sys_memory) = common_config::utils::get_sys_total_memory() { + mito_config.adjust_buffer_and_cache_size(sys_memory); } + + mito_config } } @@ -182,6 +191,23 @@ impl MitoConfig { Ok(()) } + fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) { + // Use 1/64 of OS memory as global write buffer size, it shouldn't be greater than 1G in default mode. + let global_write_buffer_size = cmp::min(sys_memory / 64, ReadableSize::gb(1)); + // Use 2x of global write buffer size as global write buffer reject size. + let global_write_buffer_reject_size = global_write_buffer_size * 2; + // Use 1/256 of OS memory size as SST meta cache size, it shouldn't be greater than 128MB in default mode. + let sst_meta_cache_size = cmp::min(sys_memory / 256, ReadableSize::mb(128)); + // Use 1/128 of OS memory size as mem cache size, it shouldn't be greater than 512MB in default mode. + let mem_cache_size = cmp::min(sys_memory / 128, ReadableSize::mb(512)); + + self.global_write_buffer_size = global_write_buffer_size; + self.global_write_buffer_reject_size = global_write_buffer_reject_size; + self.sst_meta_cache_size = sst_meta_cache_size; + self.vector_cache_size = mem_cache_size; + self.page_cache_size = mem_cache_size; + } + /// Enable experimental write cache. #[cfg(test)] pub fn enable_write_cache(mut self, path: String, size: ReadableSize) -> Self { @@ -259,7 +285,7 @@ impl InvertedIndexConfig { /// Divide cpu num by a non-zero `divisor` and returns at least 1. fn divide_num_cpus(divisor: usize) -> usize { debug_assert!(divisor > 0); - let cores = num_cpus::get(); + let cores = common_config::utils::get_cpus(); debug_assert!(cores > 0); (cores + divisor - 1) / divisor diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index b32cb58565..7abbc495ac 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -73,7 +73,6 @@ uuid.workspace = true datafusion-expr.workspace = true datafusion.workspace = true itertools.workspace = true -num_cpus = "1.13" opentelemetry-proto.workspace = true partition.workspace = true paste.workspace = true diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index cbe18e7fbf..8eb15c2f87 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -647,6 +647,7 @@ pub async fn test_config_api(store_type: StorageType) { let res_get = client.get("/config").send().await; assert_eq!(res_get.status(), StatusCode::OK); + let expected_toml_str = format!( r#" [procedure] @@ -765,11 +766,6 @@ manifest_checkpoint_distance = 10 compress_manifest = false max_background_jobs = 4 auto_flush_interval = "30m" -global_write_buffer_size = "1GiB" -global_write_buffer_reject_size = "2GiB" -sst_meta_cache_size = "128MiB" -vector_cache_size = "512MiB" -page_cache_size = "512MiB" enable_experimental_write_cache = false experimental_write_cache_path = "" experimental_write_cache_size = "512MiB" @@ -822,6 +818,11 @@ fn drop_lines_with_inconsistent_results(input: String) -> String { "scope =", "num_workers =", "scan_parallelism =", + "global_write_buffer_size =", + "global_write_buffer_reject_size =", + "sst_meta_cache_size =", + "vector_cache_size =", + "page_cache_size =", ]; input