diff --git a/Cargo.lock b/Cargo.lock index f35dedf864..003457d582 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1722,10 +1722,10 @@ dependencies = [ "common-test-util", "common-version", "hyper", - "once_cell", "reqwest", "serde", "serde_json", + "tempfile", "tokio", "uuid", ] diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 828a6da353..439f599d65 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -38,8 +38,9 @@ sync_write = false # Storage options, see `standalone.example.toml`. [storage] -type = "File" +# The working home directory. data_home = "/tmp/greptimedb/" +type = "File" # TTL for all tables. Disabled by default. # global_ttl = "7d" diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 874d13a244..647d98c0ec 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -1,3 +1,5 @@ +# The working home directory. +data_home = "/tmp/metasrv/" # The bind address of metasrv, "127.0.0.1:3002" by default. bind_addr = "127.0.0.1:3002" # The communication server address for frontend and datanode to connect to metasrv, "127.0.0.1:3002" by default for localhost. @@ -13,6 +15,8 @@ datanode_lease_secs = 15 selector = "LeaseBased" # Store data in memory, false by default. use_memory_store = false +# Whether to enable greptimedb telemetry, true by default. +enable_telemetry = true # Log options, see `standalone.example.toml` # [logging] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 5e846908a7..ed2b4645b9 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -2,6 +2,8 @@ mode = "standalone" # Whether to use in-memory catalog, `false` by default. enable_memory_catalog = false +# Whether to enable greptimedb telemetry, true by default. +enable_telemetry = true # HTTP server options. [http_options] @@ -96,10 +98,10 @@ sync_write = false # Storage options. [storage] +# The working home directory. +data_home = "/tmp/greptimedb/" # Storage type. type = "File" -# Data directory, "/tmp/greptimedb/data" by default. -data_home = "/tmp/greptimedb/" # TTL for all tables. Disabled by default. # global_ttl = "7d" diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index 037e151820..974c56b22f 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -13,10 +13,6 @@ path = "src/bin/greptime.rs" default = ["metrics-process"] tokio-console = ["common-telemetry/tokio-console"] metrics-process = ["servers/metrics-process"] -greptimedb-telemetry = [ - "datanode/greptimedb-telemetry", - "meta-srv/greptimedb-telemetry", -] [dependencies] anymap = "1.0.0-beta.2" diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index ad5e6840a6..64b9567efe 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -16,7 +16,7 @@ use std::time::Duration; use clap::Parser; use common_telemetry::logging; -use datanode::datanode::{Datanode, DatanodeOptions, FileConfig, ObjectStoreConfig}; +use datanode::datanode::{Datanode, DatanodeOptions}; use meta_client::MetaClientOptions; use servers::Mode; use snafu::ResultExt; @@ -143,9 +143,7 @@ impl StartCommand { } if let Some(data_home) = &self.data_home { - opts.storage.store = ObjectStoreConfig::File(FileConfig { - data_home: data_home.clone(), - }); + opts.storage.data_home = data_home.clone(); } if let Some(wal_dir) = &self.wal_dir { @@ -185,7 +183,9 @@ mod tests { use common_base::readable_size::ReadableSize; use common_test_util::temp_dir::create_named_temp_file; - use datanode::datanode::{CompactionConfig, ObjectStoreConfig, RegionManifestConfig}; + use datanode::datanode::{ + CompactionConfig, FileConfig, ObjectStoreConfig, RegionManifestConfig, + }; use servers::Mode; use super::*; @@ -270,16 +270,11 @@ mod tests { assert_eq!(10000, ddl_timeout_millis); assert_eq!(3000, timeout_millis); assert!(tcp_nodelay); - - match &options.storage.store { - ObjectStoreConfig::File(FileConfig { data_home, .. }) => { - assert_eq!("/tmp/greptimedb/", data_home) - } - ObjectStoreConfig::S3 { .. } => unreachable!(), - ObjectStoreConfig::Oss { .. } => unreachable!(), - ObjectStoreConfig::Azblob { .. } => unreachable!(), - ObjectStoreConfig::Gcs { .. } => unreachable!(), - }; + assert_eq!("/tmp/greptimedb/", options.storage.data_home); + assert!(matches!( + &options.storage.store, + ObjectStoreConfig::File(FileConfig { .. }) + )); assert_eq!( CompactionConfig { diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 076ce37848..312eda5f7f 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -83,6 +83,7 @@ impl SubCommand { pub struct StandaloneOptions { pub mode: Mode, pub enable_memory_catalog: bool, + pub enable_telemetry: bool, pub http_options: Option, pub grpc_options: Option, pub mysql_options: Option, @@ -102,6 +103,7 @@ impl Default for StandaloneOptions { Self { mode: Mode::Standalone, enable_memory_catalog: false, + enable_telemetry: true, http_options: Some(HttpOptions::default()), grpc_options: Some(GrpcOptions::default()), mysql_options: Some(MysqlOptions::default()), @@ -139,6 +141,7 @@ impl StandaloneOptions { fn datanode_options(self) -> DatanodeOptions { DatanodeOptions { enable_memory_catalog: self.enable_memory_catalog, + enable_telemetry: self.enable_telemetry, wal: self.wal, storage: self.storage, procedure: self.procedure, diff --git a/src/common/greptimedb-telemetry/Cargo.toml b/src/common/greptimedb-telemetry/Cargo.toml index 79d418ca39..0888cb59f0 100644 --- a/src/common/greptimedb-telemetry/Cargo.toml +++ b/src/common/greptimedb-telemetry/Cargo.toml @@ -9,7 +9,6 @@ async-trait.workspace = true common-error = { workspace = true } common-runtime = { workspace = true } common-telemetry = { workspace = true } -once_cell = "1.17.0" reqwest = { version = "0.11", features = [ "json", "rustls-tls", @@ -22,6 +21,7 @@ uuid.workspace = true [dev-dependencies] common-test-util = { workspace = true } hyper = { version = "0.14", features = ["full"] } +tempfile.workspace = true [build-dependencies] common-version = { workspace = true } diff --git a/src/common/greptimedb-telemetry/src/lib.rs b/src/common/greptimedb-telemetry/src/lib.rs index e701276dec..4431b9c972 100644 --- a/src/common/greptimedb-telemetry/src/lib.rs +++ b/src/common/greptimedb-telemetry/src/lib.rs @@ -14,30 +14,26 @@ use std::env; use std::io::ErrorKind; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::time::Duration; use common_runtime::error::{Error, Result}; use common_runtime::{BoxedTaskFunction, RepeatedTask, Runtime, TaskFunction}; use common_telemetry::{debug, info}; -use once_cell::sync::Lazy; use reqwest::{Client, Response}; use serde::{Deserialize, Serialize}; +/// The URL to report telemetry data. pub const TELEMETRY_URL: &str = "https://api.greptime.cloud/db/otel/statistics"; +/// The local installation uuid cache file +const UUID_FILE_NAME: &str = ".greptimedb-telemetry-uuid"; -// Getting the right path when running on windows -static TELEMETRY_UUID_FILE_NAME: Lazy = Lazy::new(|| { - let mut path = PathBuf::new(); - path.push(env::temp_dir()); - path.push(".greptimedb-telemetry-uuid"); - path -}); - +/// The default interval of reporting telemetry data to greptime cloud pub static TELEMETRY_INTERVAL: Duration = Duration::from_secs(60 * 30); - +/// The default connect timeout to greptime cloud. const GREPTIMEDB_TELEMETRY_CLIENT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10); -const GREPTIMEDB_TELEMETRY_CLIENT_TIMEOUT: Duration = Duration::from_secs(10); +/// The default request timeout to greptime cloud. +const GREPTIMEDB_TELEMETRY_CLIENT_REQUEST_TIMEOUT: Duration = Duration::from_secs(10); pub enum GreptimeDBTelemetryTask { Enable(RepeatedTask), @@ -54,6 +50,8 @@ impl GreptimeDBTelemetryTask { } pub fn start(&self, runtime: Runtime) -> Result<()> { + print_anonymous_usage_data_disclaimer(); + match self { GreptimeDBTelemetryTask::Enable(task) => task.start(runtime), GreptimeDBTelemetryTask::Disable => Ok(()), @@ -68,14 +66,22 @@ impl GreptimeDBTelemetryTask { } } +/// Telemetry data to report #[derive(Serialize, Deserialize, Debug)] struct StatisticData { + /// Operating system name, such as `linux`, `windows` etc. pub os: String, + /// The greptimedb version pub version: String, + /// The architecture of the CPU, such as `x86`, `x86_64` etc. pub arch: String, + /// The running mode, `standalone` or `distributed`. pub mode: Mode, + /// The git commit revision of greptimedb pub git_commit: String, + /// The node number pub nodes: Option, + /// The local installation uuid pub uuid: String, } @@ -116,14 +122,14 @@ pub trait Collector { async fn get_nodes(&self) -> Option; - fn get_uuid(&mut self) -> Option { + fn get_uuid(&mut self, working_home: &Option) -> Option { match self.get_uuid_cache() { Some(uuid) => Some(uuid), None => { if self.get_retry() > 3 { return None; } - match default_get_uuid() { + match default_get_uuid(working_home) { Some(uuid) => { self.set_uuid_cache(uuid.clone()); Some(uuid) @@ -138,8 +144,26 @@ pub trait Collector { } } -pub fn default_get_uuid() -> Option { - let path = (*TELEMETRY_UUID_FILE_NAME).as_path(); +fn print_anonymous_usage_data_disclaimer() { + info!("Attention: GreptimeDB now collects anonymous usage data to help improve its roadmap and prioritize features."); + info!( + "To learn more about this anonymous program and how to deactivate it if you don't want to participate, please visit the following URL: "); + info!("https://docs.greptime.com/reference/telemetry"); +} + +pub fn default_get_uuid(working_home: &Option) -> Option { + let temp_dir = env::temp_dir(); + + let mut path = PathBuf::new(); + path.push( + working_home + .as_ref() + .map(Path::new) + .unwrap_or_else(|| temp_dir.as_path()), + ); + path.push(UUID_FILE_NAME); + + let path = path.as_path(); match std::fs::read(path) { Ok(bytes) => Some(String::from_utf8_lossy(&bytes).to_string()), Err(e) => { @@ -164,6 +188,7 @@ pub fn default_get_uuid() -> Option { pub struct GreptimeDBTelemetry { statistics: Box, client: Option, + working_home: Option, telemetry_url: &'static str, } @@ -180,12 +205,13 @@ impl TaskFunction for GreptimeDBTelemetry { } impl GreptimeDBTelemetry { - pub fn new(statistics: Box) -> Self { + pub fn new(working_home: Option, statistics: Box) -> Self { let client = Client::builder() .connect_timeout(GREPTIMEDB_TELEMETRY_CLIENT_CONNECT_TIMEOUT) - .timeout(GREPTIMEDB_TELEMETRY_CLIENT_TIMEOUT) + .timeout(GREPTIMEDB_TELEMETRY_CLIENT_REQUEST_TIMEOUT) .build(); Self { + working_home, statistics, client: client.ok(), telemetry_url: TELEMETRY_URL, @@ -193,7 +219,7 @@ impl GreptimeDBTelemetry { } pub async fn report_telemetry_info(&mut self) -> Option { - match self.statistics.get_uuid() { + match self.statistics.get_uuid(&self.working_home) { Some(uuid) => { let data = StatisticData { os: self.statistics.get_os(), @@ -232,7 +258,7 @@ mod tests { use reqwest::Client; use tokio::spawn; - use crate::{Collector, GreptimeDBTelemetry, Mode, StatisticData}; + use crate::{default_get_uuid, Collector, GreptimeDBTelemetry, Mode, StatisticData}; static COUNT: AtomicUsize = std::sync::atomic::AtomicUsize::new(0); @@ -300,7 +326,7 @@ mod tests { unimplemented!() } - fn get_uuid(&mut self) -> Option { + fn get_uuid(&mut self, _working_home: &Option) -> Option { Some("test".to_string()) } } @@ -331,13 +357,19 @@ mod tests { unimplemented!() } - fn get_uuid(&mut self) -> Option { + fn get_uuid(&mut self, _working_home: &Option) -> Option { None } } + let working_home_temp = tempfile::Builder::new() + .prefix("greptimedb_telemetry") + .tempdir() + .unwrap(); + let working_home = working_home_temp.path().to_str().unwrap().to_string(); + let test_statistic = Box::new(TestStatistic); - let mut test_report = GreptimeDBTelemetry::new(test_statistic); + let mut test_report = GreptimeDBTelemetry::new(Some(working_home.clone()), test_statistic); let url = Box::leak(format!("{}:{}", "http://localhost", port).into_boxed_str()); test_report.telemetry_url = url; let response = test_report.report_telemetry_info().await.unwrap(); @@ -351,7 +383,7 @@ mod tests { assert_eq!(1, body.nodes.unwrap()); let failed_statistic = Box::new(FailedStatistic); - let mut failed_report = GreptimeDBTelemetry::new(failed_statistic); + let mut failed_report = GreptimeDBTelemetry::new(Some(working_home), failed_statistic); failed_report.telemetry_url = url; let response = failed_report.report_telemetry_info().await; assert!(response.is_none()); @@ -368,4 +400,18 @@ mod tests { assert_eq!("1", body); tx.send(()).unwrap(); } + + #[test] + fn test_get_uuid() { + let working_home_temp = tempfile::Builder::new() + .prefix("greptimedb_telemetry") + .tempdir() + .unwrap(); + let working_home = working_home_temp.path().to_str().unwrap().to_string(); + + let uuid = default_get_uuid(&Some(working_home.clone())); + assert!(uuid.is_some()); + assert_eq!(uuid, default_get_uuid(&Some(working_home.clone()))); + assert_eq!(uuid, default_get_uuid(&Some(working_home.clone()))); + } } diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 312ff13cfd..333d341b8f 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -5,7 +5,6 @@ edition.workspace = true license.workspace = true [features] -greptimedb-telemetry = [] testing = ["meta-srv/mock"] [dependencies] diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 7352fae347..9b84f6eba9 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -58,7 +58,7 @@ pub enum ObjectStoreConfig { } /// Storage engine config -#[derive(Debug, Clone, Serialize, Deserialize, Default)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] pub struct StorageConfig { /// Retention period for all tables. @@ -68,6 +68,8 @@ pub struct StorageConfig { /// The precedence order is: ttl in table options > global ttl. #[serde(with = "humantime_serde")] pub global_ttl: Option, + /// The working directory of database + pub data_home: String, #[serde(flatten)] pub store: ObjectStoreConfig, pub compaction: CompactionConfig, @@ -75,11 +77,22 @@ pub struct StorageConfig { pub flush: FlushConfig, } +impl Default for StorageConfig { + fn default() -> Self { + Self { + global_ttl: None, + data_home: DEFAULT_DATA_HOME.to_string(), + store: ObjectStoreConfig::default(), + compaction: CompactionConfig::default(), + manifest: RegionManifestConfig::default(), + flush: FlushConfig::default(), + } + } +} + #[derive(Debug, Clone, Serialize, Default, Deserialize)] #[serde(default)] -pub struct FileConfig { - pub data_home: String, -} +pub struct FileConfig {} #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] @@ -198,9 +211,7 @@ impl Default for GcsConfig { impl Default for ObjectStoreConfig { fn default() -> Self { - ObjectStoreConfig::File(FileConfig { - data_home: DEFAULT_DATA_HOME.to_string(), - }) + ObjectStoreConfig::File(FileConfig {}) } } @@ -362,6 +373,7 @@ pub struct DatanodeOptions { pub storage: StorageConfig, pub procedure: ProcedureConfig, pub logging: LoggingOptions, + pub enable_telemetry: bool, } impl Default for DatanodeOptions { @@ -380,6 +392,7 @@ impl Default for DatanodeOptions { procedure: ProcedureConfig::default(), logging: LoggingOptions::default(), heartbeat: HeartbeatOptions::default(), + enable_telemetry: true, } } } diff --git a/src/datanode/src/greptimedb_telemetry.rs b/src/datanode/src/greptimedb_telemetry.rs index 48feba1727..e05be6c2ac 100644 --- a/src/datanode/src/greptimedb_telemetry.rs +++ b/src/datanode/src/greptimedb_telemetry.rs @@ -12,72 +12,66 @@ // See the License for the specific language governing permissions and // limitations under the License. -#[cfg(feature = "greptimedb-telemetry")] -pub mod telemetry { - use std::sync::Arc; +use std::sync::Arc; - use async_trait::async_trait; - use common_greptimedb_telemetry::{ - default_get_uuid, Collector, GreptimeDBTelemetry, GreptimeDBTelemetryTask, - Mode as VersionReporterMode, TELEMETRY_INTERVAL, - }; - use servers::Mode; +use async_trait::async_trait; +use common_greptimedb_telemetry::{ + default_get_uuid, Collector, GreptimeDBTelemetry, GreptimeDBTelemetryTask, + Mode as VersionReporterMode, TELEMETRY_INTERVAL, +}; +use servers::Mode; - struct StandaloneGreptimeDBTelemetryCollector { - uuid: Option, - retry: i32, - } - #[async_trait] - impl Collector for StandaloneGreptimeDBTelemetryCollector { - fn get_mode(&self) -> VersionReporterMode { - VersionReporterMode::Standalone - } - - async fn get_nodes(&self) -> Option { - Some(1) - } - - fn get_retry(&self) -> i32 { - self.retry - } - - fn inc_retry(&mut self) { - self.retry += 1; - } - - fn set_uuid_cache(&mut self, uuid: String) { - self.uuid = Some(uuid); - } - - fn get_uuid_cache(&self) -> Option { - self.uuid.clone() - } +struct StandaloneGreptimeDBTelemetryCollector { + uuid: Option, + retry: i32, +} +#[async_trait] +impl Collector for StandaloneGreptimeDBTelemetryCollector { + fn get_mode(&self) -> VersionReporterMode { + VersionReporterMode::Standalone } - pub async fn get_greptimedb_telemetry_task(mode: &Mode) -> Arc { - match mode { - Mode::Standalone => Arc::new(GreptimeDBTelemetryTask::enable( - TELEMETRY_INTERVAL, - Box::new(GreptimeDBTelemetry::new(Box::new( - StandaloneGreptimeDBTelemetryCollector { - uuid: default_get_uuid(), - retry: 0, - }, - ))), + async fn get_nodes(&self) -> Option { + Some(1) + } + + fn get_retry(&self) -> i32 { + self.retry + } + + fn inc_retry(&mut self) { + self.retry += 1; + } + + fn set_uuid_cache(&mut self, uuid: String) { + self.uuid = Some(uuid); + } + + fn get_uuid_cache(&self) -> Option { + self.uuid.clone() + } +} + +pub async fn get_greptimedb_telemetry_task( + working_home: Option, + mode: &Mode, + enable: bool, +) -> Arc { + if !enable || cfg!(test) { + return Arc::new(GreptimeDBTelemetryTask::disable()); + } + + match mode { + Mode::Standalone => Arc::new(GreptimeDBTelemetryTask::enable( + TELEMETRY_INTERVAL, + Box::new(GreptimeDBTelemetry::new( + working_home.clone(), + Box::new(StandaloneGreptimeDBTelemetryCollector { + uuid: default_get_uuid(&working_home), + retry: 0, + }), )), - Mode::Distributed => Arc::new(GreptimeDBTelemetryTask::disable()), - } - } -} - -#[cfg(not(feature = "greptimedb-telemetry"))] -pub mod telemetry { - use std::sync::Arc; - - use common_greptimedb_telemetry::GreptimeDBTelemetryTask; - use servers::Mode; - - pub async fn get_greptimedb_telemetry_task(_: &Mode) -> Arc { - Arc::new(GreptimeDBTelemetryTask::disable()) + )), + Mode::Distributed => Arc::new(GreptimeDBTelemetryTask::disable()), } } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index ed229060b0..589ee18519 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -64,12 +64,12 @@ use crate::error::{ MissingNodeIdSnafu, NewCatalogSnafu, OpenLogStoreSnafu, RecoverProcedureSnafu, Result, ShutdownInstanceSnafu, StartProcedureManagerSnafu, StopProcedureManagerSnafu, }; +use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::heartbeat::handler::close_region::CloseRegionHandler; use crate::heartbeat::handler::open_region::OpenRegionHandler; use crate::heartbeat::HeartbeatTask; use crate::sql::{SqlHandler, SqlRequest}; use crate::store; -use crate::telemetry::get_greptimedb_telemetry_task; mod grpc; pub mod sql; @@ -164,8 +164,11 @@ impl Instance { compaction_scheduler: CompactionSchedulerRef, plugins: Arc, ) -> Result<(InstanceRef, Option)> { - let object_store = store::new_object_store(&opts.storage.store).await?; - let log_store = Arc::new(create_log_store(&opts.storage.store, &opts.wal).await?); + let data_home = util::normalize_dir(&opts.storage.data_home); + info!("The working home directory is: {}", data_home); + let object_store = store::new_object_store(&data_home, &opts.storage.store).await?; + let log_store = + Arc::new(create_log_store(&data_home, &opts.storage.store, &opts.wal).await?); let mito_engine = Arc::new(DefaultEngine::new( TableEngineConfig { @@ -305,7 +308,12 @@ impl Instance { catalog_manager: catalog_manager.clone(), table_id_provider, procedure_manager, - greptimedb_telemetry_task: get_greptimedb_telemetry_task(&opts.mode).await, + greptimedb_telemetry_task: get_greptimedb_telemetry_task( + Some(opts.storage.data_home.clone()), + &opts.mode, + opts.enable_telemetry, + ) + .await, }); let heartbeat_task = Instance::build_heartbeat_task( @@ -444,13 +452,14 @@ async fn new_metasrv_client(node_id: u64, meta_config: &MetaClientOptions) -> Re } pub(crate) async fn create_log_store( + data_home: &str, store_config: &ObjectStoreConfig, wal_config: &WalConfig, ) -> Result { let wal_dir = match (&wal_config.dir, store_config) { (Some(dir), _) => dir.to_string(), - (None, ObjectStoreConfig::File(file_config)) => { - format!("{}{WAL_DIR}", util::normalize_dir(&file_config.data_home)) + (None, ObjectStoreConfig::File(_file_config)) => { + format!("{}{WAL_DIR}", data_home) } _ => return error::MissingWalDirConfigSnafu {}.fail(), }; diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs index c5c36e531d..24a010d317 100644 --- a/src/datanode/src/lib.rs +++ b/src/datanode/src/lib.rs @@ -27,7 +27,5 @@ pub mod server; pub mod sql; mod store; -use greptimedb_telemetry::telemetry; - #[cfg(test)] mod tests; diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs index faaddf0460..0002e78e5f 100644 --- a/src/datanode/src/store.rs +++ b/src/datanode/src/store.rs @@ -33,9 +33,14 @@ use snafu::prelude::*; use crate::datanode::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE}; use crate::error::{self, Result}; -pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result { +pub(crate) async fn new_object_store( + data_home: &str, + store_config: &ObjectStoreConfig, +) -> Result { let object_store = match store_config { - ObjectStoreConfig::File(file_config) => fs::new_fs_object_store(file_config).await, + ObjectStoreConfig::File(file_config) => { + fs::new_fs_object_store(data_home, file_config).await + } ObjectStoreConfig::S3(s3_config) => s3::new_s3_object_store(s3_config).await, ObjectStoreConfig::Oss(oss_config) => oss::new_oss_object_store(oss_config).await, ObjectStoreConfig::Azblob(azblob_config) => { diff --git a/src/datanode/src/store/fs.rs b/src/datanode/src/store/fs.rs index bb5c64d308..a065a3d27c 100644 --- a/src/datanode/src/store/fs.rs +++ b/src/datanode/src/store/fs.rs @@ -16,24 +16,26 @@ use std::{fs, path}; use common_telemetry::logging::info; use object_store::services::Fs as FsBuilder; -use object_store::{util, ObjectStore}; +use object_store::ObjectStore; use snafu::prelude::*; use crate::datanode::FileConfig; use crate::error::{self, Result}; use crate::store; -pub(crate) async fn new_fs_object_store(file_config: &FileConfig) -> Result { - let data_home = util::normalize_dir(&file_config.data_home); +pub(crate) async fn new_fs_object_store( + data_home: &str, + _file_config: &FileConfig, +) -> Result { fs::create_dir_all(path::Path::new(&data_home)) - .context(error::CreateDirSnafu { dir: &data_home })?; - info!("The file storage home is: {}", &data_home); + .context(error::CreateDirSnafu { dir: data_home })?; + info!("The file storage home is: {}", data_home); let atomic_write_dir = format!("{data_home}.tmp/"); store::clean_temp_dir(&atomic_write_dir)?; let mut builder = FsBuilder::default(); - let _ = builder.root(&data_home).atomic_write_dir(&atomic_write_dir); + let _ = builder.root(data_home).atomic_write_dir(&atomic_write_dir); let object_store = ObjectStore::new(builder) .context(error::InitBackendSnafu)? diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index 1d0f828cba..9176f5fb6d 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -74,9 +74,8 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) ..Default::default() }, storage: StorageConfig { - store: ObjectStoreConfig::File(FileConfig { - data_home: data_tmp_dir.path().to_str().unwrap().to_string(), - }), + data_home: data_tmp_dir.path().to_str().unwrap().to_string(), + store: ObjectStoreConfig::File(FileConfig {}), ..Default::default() }, mode: Mode::Standalone, diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index d55c100547..488c28fabe 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -6,7 +6,6 @@ license.workspace = true [features] mock = [] -greptimedb-telemetry = [] [dependencies] anymap = "1.0.0-beta.2" diff --git a/src/meta-srv/src/greptimedb_telemetry.rs b/src/meta-srv/src/greptimedb_telemetry.rs index 377fe4d214..ea0685a53f 100644 --- a/src/meta-srv/src/greptimedb_telemetry.rs +++ b/src/meta-srv/src/greptimedb_telemetry.rs @@ -12,77 +12,67 @@ // See the License for the specific language governing permissions and // limitations under the License. -#[cfg(feature = "greptimedb-telemetry")] -pub mod telemetry { - use std::sync::Arc; +use std::sync::Arc; - use async_trait::async_trait; - use common_greptimedb_telemetry::{ - default_get_uuid, Collector, GreptimeDBTelemetry, GreptimeDBTelemetryTask, - Mode as VersionReporterMode, TELEMETRY_INTERVAL, - }; +use async_trait::async_trait; +use common_greptimedb_telemetry::{ + default_get_uuid, Collector, GreptimeDBTelemetry, GreptimeDBTelemetryTask, + Mode as VersionReporterMode, TELEMETRY_INTERVAL, +}; - use crate::cluster::MetaPeerClientRef; +use crate::cluster::MetaPeerClientRef; - struct DistributedGreptimeDBTelemetryCollector { - meta_peer_client: MetaPeerClientRef, - uuid: Option, - retry: i32, +struct DistributedGreptimeDBTelemetryCollector { + meta_peer_client: MetaPeerClientRef, + uuid: Option, + retry: i32, +} + +#[async_trait] +impl Collector for DistributedGreptimeDBTelemetryCollector { + fn get_mode(&self) -> VersionReporterMode { + VersionReporterMode::Distributed } - #[async_trait] - impl Collector for DistributedGreptimeDBTelemetryCollector { - fn get_mode(&self) -> VersionReporterMode { - VersionReporterMode::Distributed - } - - async fn get_nodes(&self) -> Option { - self.meta_peer_client.get_node_cnt().await.ok() - } - - fn get_retry(&self) -> i32 { - self.retry - } - - fn inc_retry(&mut self) { - self.retry += 1; - } - - fn set_uuid_cache(&mut self, uuid: String) { - self.uuid = Some(uuid); - } - - fn get_uuid_cache(&self) -> Option { - self.uuid.clone() - } + async fn get_nodes(&self) -> Option { + self.meta_peer_client.get_node_cnt().await.ok() } - pub async fn get_greptimedb_telemetry_task( - meta_peer_client: MetaPeerClientRef, - ) -> Arc { - Arc::new(GreptimeDBTelemetryTask::enable( - TELEMETRY_INTERVAL, - Box::new(GreptimeDBTelemetry::new(Box::new( - DistributedGreptimeDBTelemetryCollector { - meta_peer_client, - uuid: default_get_uuid(), - retry: 0, - }, - ))), - )) + fn get_retry(&self) -> i32 { + self.retry + } + + fn inc_retry(&mut self) { + self.retry += 1; + } + + fn set_uuid_cache(&mut self, uuid: String) { + self.uuid = Some(uuid); + } + + fn get_uuid_cache(&self) -> Option { + self.uuid.clone() } } -#[cfg(not(feature = "greptimedb-telemetry"))] -pub mod telemetry { - use std::sync::Arc; - - use common_greptimedb_telemetry::GreptimeDBTelemetryTask; - - use crate::cluster::MetaPeerClientRef; - pub async fn get_greptimedb_telemetry_task( - _: MetaPeerClientRef, - ) -> Arc { - Arc::new(GreptimeDBTelemetryTask::disable()) +pub async fn get_greptimedb_telemetry_task( + working_home: Option, + meta_peer_client: MetaPeerClientRef, + enable: bool, +) -> Arc { + if !enable || cfg!(test) { + return Arc::new(GreptimeDBTelemetryTask::disable()); } + + Arc::new(GreptimeDBTelemetryTask::enable( + TELEMETRY_INTERVAL, + Box::new(GreptimeDBTelemetry::new( + working_home.clone(), + Box::new(DistributedGreptimeDBTelemetryCollector { + meta_peer_client, + uuid: default_get_uuid(&working_home), + retry: 0, + }), + )), + )) } diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index e081d3fb8e..31b0a68888 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -42,7 +42,6 @@ pub use crate::error::Result; mod inactive_node_manager; mod greptimedb_telemetry; -use greptimedb_telemetry::telemetry; #[cfg(test)] mod test_util; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 6e6a6961b3..2afc782dac 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -44,6 +44,7 @@ use crate::sequence::SequenceRef; use crate::service::mailbox::MailboxRef; use crate::service::store::kv::{KvStoreRef, ResettableKvStoreRef}; pub const TABLE_ID_SEQ: &str = "table_id"; +const METASRV_HOME: &str = "/tmp/metasrv"; #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(default)] @@ -59,6 +60,8 @@ pub struct MetaSrvOptions { pub logging: LoggingOptions, pub procedure: ProcedureConfig, pub datanode: DatanodeOptions, + pub enable_telemetry: bool, + pub data_home: String, } impl Default for MetaSrvOptions { @@ -72,9 +75,14 @@ impl Default for MetaSrvOptions { use_memory_store: false, disable_region_failover: false, http_opts: HttpOptions::default(), - logging: LoggingOptions::default(), + logging: LoggingOptions { + dir: format!("{METASRV_HOME}/logs"), + ..Default::default() + }, procedure: ProcedureConfig::default(), datanode: DatanodeOptions::default(), + enable_telemetry: true, + data_home: METASRV_HOME.to_string(), } } } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index ebfbb5da31..8b76a537e3 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -25,6 +25,7 @@ use common_procedure::ProcedureManagerRef; use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; use crate::ddl::{DdlManager, DdlManagerRef}; use crate::error::Result; +use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::handler::mailbox_handler::MailboxHandler; use crate::handler::publish_heartbeat_handler::PublishHeartbeatHandler; use crate::handler::region_lease_handler::RegionLeaseHandler; @@ -48,7 +49,6 @@ use crate::service::mailbox::MailboxRef; use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvStore}; use crate::service::store::kv::{KvBackendAdapter, KvStoreRef, ResettableKvStoreRef}; use crate::service::store::memory::MemStore; -use crate::telemetry::get_greptimedb_telemetry_task; // TODO(fys): try use derive_builder macro pub struct MetaSrvBuilder { @@ -234,6 +234,9 @@ impl MetaSrvBuilder { } }; + let enable_telemetry = options.enable_telemetry; + let metasrv_home = options.data_home.to_string(); + Ok(MetaSrv { started, options, @@ -251,7 +254,12 @@ impl MetaSrvBuilder { mailbox, ddl_manager, table_metadata_manager, - greptimedb_telemetry_task: get_greptimedb_telemetry_task(meta_peer_client).await, + greptimedb_telemetry_task: get_greptimedb_telemetry_task( + Some(metasrv_home), + meta_peer_client, + enable_telemetry, + ) + .await, pubsub, }) } diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 683ed91055..8b5cff7663 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -44,12 +44,12 @@ use tonic::transport::Server; use tower::service_fn; use crate::test_util::{ - create_datanode_opts, create_tmp_dir_and_datanode_opts, StorageGuard, StorageType, WalGuard, + create_datanode_opts, create_tmp_dir_and_datanode_opts, FileDirGuard, StorageGuard, StorageType, }; pub struct GreptimeDbCluster { pub storage_guards: Vec, - _wal_guards: Vec, + pub _dir_guards: Vec, pub datanode_instances: HashMap>, pub datanode_heartbeat_tasks: HashMap>, @@ -93,7 +93,7 @@ impl GreptimeDbClusterBuilder { let meta_srv = self.build_metasrv(datanode_clients.clone()).await; - let (datanode_instances, heartbeat_tasks, storage_guards, wal_guards) = + let (datanode_instances, heartbeat_tasks, storage_guards, dir_guards) = self.build_datanodes(meta_srv.clone(), datanodes).await; build_datanode_clients(datanode_clients.clone(), &datanode_instances, datanodes).await; @@ -109,7 +109,7 @@ impl GreptimeDbClusterBuilder { GreptimeDbCluster { storage_guards, - _wal_guards: wal_guards, + _dir_guards: dir_guards, datanode_instances, datanode_heartbeat_tasks: heartbeat_tasks, kv_store: self.kv_store.clone(), @@ -149,22 +149,26 @@ impl GreptimeDbClusterBuilder { HashMap>, HashMap>, Vec, - Vec, + Vec, ) { let mut instances = HashMap::with_capacity(datanodes as usize); let mut heartbeat_tasks = HashMap::with_capacity(datanodes as usize); let mut storage_guards = Vec::with_capacity(datanodes as usize); - let mut wal_guards = Vec::with_capacity(datanodes as usize); + let mut dir_guards = Vec::with_capacity(datanodes as usize); for i in 0..datanodes { let datanode_id = i as u64 + 1; let mut opts = if let Some(store_config) = &self.store_config { + let home_tmp_dir = create_temp_dir(&format!("gt_home_{}", &self.cluster_name)); + let home_dir = home_tmp_dir.path().to_str().unwrap().to_string(); + let wal_tmp_dir = create_temp_dir(&format!("gt_wal_{}", &self.cluster_name)); let wal_dir = wal_tmp_dir.path().to_str().unwrap().to_string(); - wal_guards.push(WalGuard(wal_tmp_dir)); + dir_guards.push(FileDirGuard::new(home_tmp_dir, false)); + dir_guards.push(FileDirGuard::new(wal_tmp_dir, true)); - create_datanode_opts(store_config.clone(), wal_dir) + create_datanode_opts(store_config.clone(), home_dir, wal_dir) } else { let (opts, guard) = create_tmp_dir_and_datanode_opts( StorageType::File, @@ -172,7 +176,8 @@ impl GreptimeDbClusterBuilder { ); storage_guards.push(guard.storage_guard); - wal_guards.push(guard.wal_guard); + dir_guards.push(guard.home_guard); + dir_guards.push(guard.wal_guard); opts }; @@ -184,7 +189,7 @@ impl GreptimeDbClusterBuilder { let _ = instances.insert(datanode_id, dn_instance.0.clone()); let _ = heartbeat_tasks.insert(datanode_id, dn_instance.1); } - (instances, heartbeat_tasks, storage_guards, wal_guards) + (instances, heartbeat_tasks, storage_guards, dir_guards) } async fn wait_datanodes_alive( diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index b1a10beb75..e1f08960d8 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -135,10 +135,7 @@ fn s3_test_config() -> S3Config { } } -pub fn get_test_store_config( - store_type: &StorageType, - name: &str, -) -> (ObjectStoreConfig, TempDirGuard) { +pub fn get_test_store_config(store_type: &StorageType) -> (ObjectStoreConfig, TempDirGuard) { let _ = dotenv::dotenv(); match store_type { @@ -243,21 +240,12 @@ pub fn get_test_store_config( (config, TempDirGuard::S3(TempFolder::new(&store, "/"))) } - StorageType::File => { - let data_tmp_dir = create_temp_dir(&format!("gt_data_{name}")); - - ( - ObjectStoreConfig::File(FileConfig { - data_home: data_tmp_dir.path().to_str().unwrap().to_string(), - }), - TempDirGuard::File(data_tmp_dir), - ) - } + StorageType::File => (ObjectStoreConfig::File(FileConfig {}), TempDirGuard::None), } } pub enum TempDirGuard { - File(TempDir), + None, S3(TempFolder), Oss(TempFolder), Azblob(TempFolder), @@ -265,11 +253,21 @@ pub enum TempDirGuard { } pub struct TestGuard { - pub wal_guard: WalGuard, + pub home_guard: FileDirGuard, + pub wal_guard: FileDirGuard, pub storage_guard: StorageGuard, } -pub struct WalGuard(pub TempDir); +pub struct FileDirGuard { + pub temp_dir: TempDir, + pub is_wal: bool, +} + +impl FileDirGuard { + pub fn new(temp_dir: TempDir, is_wal: bool) -> Self { + Self { temp_dir, is_wal } + } +} pub struct StorageGuard(pub TempDirGuard); @@ -289,28 +287,36 @@ pub fn create_tmp_dir_and_datanode_opts( store_type: StorageType, name: &str, ) -> (DatanodeOptions, TestGuard) { + let home_tmp_dir = create_temp_dir(&format!("gt_data_{name}")); let wal_tmp_dir = create_temp_dir(&format!("gt_wal_{name}")); + let home_dir = home_tmp_dir.path().to_str().unwrap().to_string(); let wal_dir = wal_tmp_dir.path().to_str().unwrap().to_string(); - let (store, data_tmp_dir) = get_test_store_config(&store_type, name); - let opts = create_datanode_opts(store, wal_dir); + let (store, data_tmp_dir) = get_test_store_config(&store_type); + let opts = create_datanode_opts(store, home_dir, wal_dir); ( opts, TestGuard { - wal_guard: WalGuard(wal_tmp_dir), + home_guard: FileDirGuard::new(home_tmp_dir, false), + wal_guard: FileDirGuard::new(wal_tmp_dir, true), storage_guard: StorageGuard(data_tmp_dir), }, ) } -pub fn create_datanode_opts(store: ObjectStoreConfig, wal_dir: String) -> DatanodeOptions { +pub fn create_datanode_opts( + store: ObjectStoreConfig, + home_dir: String, + wal_dir: String, +) -> DatanodeOptions { DatanodeOptions { wal: WalConfig { dir: Some(wal_dir), ..Default::default() }, storage: StorageConfig { + data_home: home_dir, store, ..Default::default() }, diff --git a/tests-integration/src/tests.rs b/tests-integration/src/tests.rs index 436ac5f195..a987b2fe11 100644 --- a/tests-integration/src/tests.rs +++ b/tests-integration/src/tests.rs @@ -27,21 +27,16 @@ use frontend::instance::Instance; use table::engine::{region_name, table_dir}; use crate::cluster::{GreptimeDbCluster, GreptimeDbClusterBuilder}; -use crate::test_util::{create_tmp_dir_and_datanode_opts, StorageType, TempDirGuard, TestGuard}; +use crate::test_util::{create_tmp_dir_and_datanode_opts, StorageType, TestGuard}; pub struct MockDistributedInstance(GreptimeDbCluster); impl MockDistributedInstance { pub fn data_tmp_dirs(&self) -> Vec<&TempDir> { self.0 - .storage_guards + ._dir_guards .iter() - .map(|g| { - let TempDirGuard::File(dir) = &g.0 else { - unreachable!() - }; - dir - }) + .filter_map(|d| if !d.is_wal { Some(&d.temp_dir) } else { None }) .collect() } @@ -65,10 +60,7 @@ pub struct MockStandaloneInstance { impl MockStandaloneInstance { pub fn data_tmp_dir(&self) -> &TempDir { - let TempDirGuard::File(dir) = &self._guard.storage_guard.0 else { - unreachable!() - }; - dir + &self._guard.home_guard.temp_dir } } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index d408137b75..97dac9b595 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -597,6 +597,7 @@ pub async fn test_config_api(store_type: StorageType) { enable_memory_catalog = false rpc_addr = "127.0.0.1:3001" rpc_runtime_size = 8 + enable_telemetry = true [heartbeat] interval_millis = 5000 diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs index e904f75274..8af0d59d05 100644 --- a/tests-integration/tests/region_failover.rs +++ b/tests-integration/tests/region_failover.rs @@ -79,13 +79,18 @@ macro_rules! region_failover_tests { } pub async fn test_region_failover(store_type: StorageType) { + if store_type == StorageType::File { + // Region failover doesn't make sense when using local file storage. + return; + } common_telemetry::init_default_ut_logging(); + info!("Running region failover test for {}", store_type); let mut logical_timer = 1685508715000; let cluster_name = "test_region_failover"; - let (store_config, _guard) = get_test_store_config(&store_type, cluster_name); + let (store_config, _guard) = get_test_store_config(&store_type); let datanodes = 5u64; let cluster = GreptimeDbClusterBuilder::new(cluster_name)