diff --git a/Cargo.lock b/Cargo.lock index 51dd95a122..523b30086b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1591,7 +1591,6 @@ version = "0.3.2" dependencies = [ "anymap", "async-trait", - "build-data", "catalog", "chrono", "clap 3.2.25", @@ -1603,6 +1602,7 @@ dependencies = [ "common-recordbatch", "common-telemetry", "common-test-util", + "common-version", "config", "datanode", "datatypes", @@ -1756,6 +1756,25 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "common-greptimedb-telemetry" +version = "0.3.2" +dependencies = [ + "async-trait", + "common-error", + "common-runtime", + "common-telemetry", + "common-test-util", + "common-version", + "hyper", + "once_cell", + "reqwest", + "serde", + "serde_json", + "tokio", + "uuid", +] + [[package]] name = "common-grpc" version = "0.3.2" @@ -1835,9 +1854,11 @@ dependencies = [ "datatypes", "etcd-client", "futures", + "hyper", "lazy_static", "prost", "regex", + "reqwest", "serde", "serde_json", "snafu", @@ -1990,6 +2011,13 @@ dependencies = [ "snafu", ] +[[package]] +name = "common-version" +version = "0.3.2" +dependencies = [ + "build-data", +] + [[package]] name = "concurrent-queue" version = "2.2.0" @@ -2636,6 +2664,7 @@ dependencies = [ "common-datasource", "common-error", "common-function", + "common-greptimedb-telemetry", "common-grpc", "common-grpc-expr", "common-meta", @@ -2684,6 +2713,7 @@ dependencies = [ "tower", "tower-http", "url", + "uuid", ] [[package]] @@ -3224,6 +3254,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.0" @@ -4446,6 +4491,19 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "iana-time-zone" version = "0.1.56" @@ -5285,6 +5343,7 @@ dependencies = [ "common-base", "common-catalog", "common-error", + "common-greptimedb-telemetry", "common-grpc", "common-grpc-expr", "common-meta", @@ -5322,6 +5381,7 @@ dependencies = [ "tracing-subscriber", "typetag", "url", + "uuid", ] [[package]] @@ -5737,6 +5797,24 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "native-tls" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "new_debug_unreachable" version = "1.0.4" @@ -6096,12 +6174,50 @@ dependencies = [ "tokio-rustls 0.24.0", ] +[[package]] +name = "openssl" +version = "0.10.54" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69b3f656a17a6cbc115b5c7a40c616947d213ba182135b014d6051b73ab6f019" +dependencies = [ + "bitflags 1.3.2", + "cfg-if 1.0.0", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies 
= [ + "proc-macro2", + "quote", + "syn 2.0.18", +] + [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.88" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2ce0f250f34a308dcfdbb351f511359857d4ed2134ba715a4eadd46e1ffd617" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "opentelemetry" version = "0.17.0" @@ -7640,11 +7756,13 @@ dependencies = [ "http-body", "hyper", "hyper-rustls", + "hyper-tls", "ipnet", "js-sys", "log", "mime", "mime_guess", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -7655,6 +7773,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "tokio", + "tokio-native-tls", "tokio-rustls 0.24.0", "tokio-util", "tower-service", @@ -10130,6 +10249,16 @@ dependencies = [ "syn 2.0.18", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-postgres" version = "0.7.8" diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index 6846ff27bd..0c1bf280b9 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -13,6 +13,10 @@ path = "src/bin/greptime.rs" default = ["metrics-process"] tokio-console = ["common-telemetry/tokio-console"] metrics-process = ["servers/metrics-process"] +greptimedb-telemetry = [ + "datanode/greptimedb-telemetry", + "meta-srv/greptimedb-telemetry", +] [dependencies] anymap = "1.0.0-beta.2" @@ -61,4 +65,4 @@ serde.workspace = true toml.workspace = true [build-dependencies] -build-data = "0.1.4" +common-version = { path = "../common/version" } diff --git a/src/cmd/build.rs b/src/cmd/build.rs index d141625b65..85d72f6df2 100644 
--- a/src/cmd/build.rs +++ b/src/cmd/build.rs @@ -12,22 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -const DEFAULT_VALUE: &str = "unknown"; fn main() { - println!( - "cargo:rustc-env=GIT_COMMIT={}", - build_data::get_git_commit().unwrap_or_else(|_| DEFAULT_VALUE.to_string()) - ); - println!( - "cargo:rustc-env=GIT_COMMIT_SHORT={}", - build_data::get_git_commit_short().unwrap_or_else(|_| DEFAULT_VALUE.to_string()) - ); - println!( - "cargo:rustc-env=GIT_BRANCH={}", - build_data::get_git_branch().unwrap_or_else(|_| DEFAULT_VALUE.to_string()) - ); - println!( - "cargo:rustc-env=GIT_DIRTY={}", - build_data::get_git_dirty().map_or(DEFAULT_VALUE.to_string(), |v| v.to_string()) - ); + common_version::setup_git_versions(); } diff --git a/src/common/greptimedb-telemetry/Cargo.toml b/src/common/greptimedb-telemetry/Cargo.toml new file mode 100644 index 0000000000..63fb4cf84d --- /dev/null +++ b/src/common/greptimedb-telemetry/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "common-greptimedb-telemetry" +version.workspace = true +edition.workspace = true +license.workspace = true + +[dependencies] +async-trait.workspace = true +common-error = { path = "../error" } +common-runtime = { path = "../runtime" } +common-telemetry = { path = "../telemetry" } +serde.workspace = true +serde_json.workspace = true +tokio.workspace = true +reqwest = { version = "0.11", features = ["json"] } +uuid.workspace = true +once_cell = "1.17.0" + +[dev-dependencies] +hyper = { version = "0.14", features = ["full"] } +common-test-util = { path = "../test-util" } + +[build-dependencies] +common-version = { path = "../version" } diff --git a/src/common/greptimedb-telemetry/build.rs b/src/common/greptimedb-telemetry/build.rs new file mode 100644 index 0000000000..85d72f6df2 --- /dev/null +++ b/src/common/greptimedb-telemetry/build.rs @@ -0,0 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 
2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Build script entry point: delegates to `common_version::setup_git_versions()`,
+// which prints the `cargo:rustc-env=GIT_*` directives, so that `env!("GIT_COMMIT")`
+// used by this crate's `Collector::get_git_hash` resolves at compile time.
+fn main() {
+    common_version::setup_git_versions();
+}
diff --git a/src/common/greptimedb-telemetry/src/lib.rs b/src/common/greptimedb-telemetry/src/lib.rs
new file mode 100644
index 0000000000..551f137570
--- /dev/null
+++ b/src/common/greptimedb-telemetry/src/lib.rs
@@ -0,0 +1,371 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::env;
+use std::io::ErrorKind;
+use std::path::PathBuf;
+use std::time::Duration;
+
+use common_runtime::error::{Error, Result};
+use common_runtime::{BoxedTaskFunction, RepeatedTask, Runtime, TaskFunction};
+use common_telemetry::debug;
+use once_cell::sync::Lazy;
+use reqwest::{Client, Response};
+use serde::{Deserialize, Serialize};
+
+/// Endpoint that telemetry reports are POSTed to.
+pub const TELEMETRY_URL: &str = "https://api-preview.greptime.cloud/db/otel/statistics";
+
+/// Path of the file that persists the installation UUID between runs.
+///
+/// It lives in [`env::temp_dir`], so the platform-specific temporary directory
+/// is used on every operating system instead of a hard-coded `/tmp`.
+static TELEMETRY_UUID_FILE_NAME: Lazy<PathBuf> =
+    Lazy::new(|| env::temp_dir().join(".greptimedb-telemetry-uuid"));
+
+/// Interval between two telemetry reports (30 minutes).
+pub static TELEMETRY_INTERVAL: Duration = Duration::from_secs(60 * 30);
+
+const GREPTIMEDB_TELEMETRY_CLIENT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
+const GREPTIMEDB_TELEMETRY_CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
+
+/// Handle to the periodic telemetry job.
+///
+/// `Enable` wraps the repeated task that performs the reporting; `Disable` is a
+/// no-op placeholder whose `start` and `stop` always succeed, which lets callers
+/// hold a task unconditionally.
+pub enum GreptimeDBTelemetryTask {
+    Enable(RepeatedTask<Error>),
+    Disable,
+}
+
+impl GreptimeDBTelemetryTask {
+    /// Creates an enabled task that runs `task_fn` every `interval`.
+    pub fn enable(interval: Duration, task_fn: BoxedTaskFunction<Error>) -> Self {
+        GreptimeDBTelemetryTask::Enable(RepeatedTask::new(interval, task_fn))
+    }
+
+    /// Creates a task that never reports anything.
+    pub fn disable() -> Self {
+        GreptimeDBTelemetryTask::Disable
+    }
+
+    /// Starts the wrapped repeated task on `runtime`; a disabled task returns `Ok(())`.
+    pub fn start(&self, runtime: Runtime) -> Result<()> {
+        match self {
+            GreptimeDBTelemetryTask::Enable(task) => task.start(runtime),
+            GreptimeDBTelemetryTask::Disable => Ok(()),
+        }
+    }
+
+    /// Stops the wrapped repeated task; a disabled task returns `Ok(())`.
+    pub async fn stop(&self) -> Result<()> {
+        match self {
+            GreptimeDBTelemetryTask::Enable(task) => task.stop().await,
+            GreptimeDBTelemetryTask::Disable => Ok(()),
+        }
+    }
+}
+
+/// JSON payload sent to the telemetry endpoint.
+#[derive(Serialize, Deserialize, Debug)]
+struct StatisticData {
+    pub os: String,
+    pub version: String,
+    pub arch: String,
+    pub mode: Mode,
+    pub git_commit: String,
+    /// Number of nodes, or `None` when the collector could not determine it.
+    pub nodes: Option<i32>,
+    pub uuid: String,
+}
+
+/// Deployment mode reported in the payload; serialized in lowercase.
+#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
+#[serde(rename_all = "lowercase")]
+pub enum Mode {
+    Distributed,
+    Standalone,
+}
+
+/// Source of the values that make up one telemetry report.
+///
+/// Implementors supply the deployment mode, the node count and the storage for
+/// the cached UUID / retry counter; everything else has a default.
+#[async_trait::async_trait]
+pub trait Collector {
+    /// Crate version, captured at compile time.
+    fn get_version(&self) -> String {
+        env!("CARGO_PKG_VERSION").to_string()
+    }
+
+    /// Git commit exported by the build script as `GIT_COMMIT`.
+    fn get_git_hash(&self) -> String {
+        env!("GIT_COMMIT").to_string()
+    }
+
+    fn get_os(&self) -> String {
+        env::consts::OS.to_string()
+    }
+
+    fn get_arch(&self) -> String {
+        env::consts::ARCH.to_string()
+    }
+
+    fn get_mode(&self) -> Mode;
+
+    /// Number of failed attempts to obtain the UUID so far.
+    fn get_retry(&self) -> i32;
+
+    /// Records one more failed attempt to obtain the UUID.
+    fn inc_retry(&mut self);
+
+    fn set_uuid_cache(&mut self, uuid: String);
+
+    fn get_uuid_cache(&self) -> Option<String>;
+
+    /// Number of nodes to report, or `None` when it is unknown.
+    async fn get_nodes(&self) -> Option<i32>;
+
+    /// Returns the installation UUID, preferring the cached value.
+    ///
+    /// On a cache miss the UUID is loaded through [`default_get_uuid`] and cached.
+    /// Each failure bumps the retry counter; once it exceeds 3 no further
+    /// attempts are made and `None` is returned.
+    fn get_uuid(&mut self) -> Option<String> {
+        if let Some(uuid) = self.get_uuid_cache() {
+            return Some(uuid);
+        }
+        if self.get_retry() > 3 {
+            return None;
+        }
+        match default_get_uuid() {
+            Some(uuid) => {
+                self.set_uuid_cache(uuid.clone());
+                Some(uuid)
+            }
+            None => {
+                self.inc_retry();
+                None
+            }
+        }
+    }
+}
+
+/// Loads the installation UUID from the telemetry UUID file, creating it on first use.
+///
+/// * An existing, non-blank file yields its content with surrounding whitespace
+///   trimmed.
+/// * A missing or blank file (e.g. left behind by an interrupted write) yields a
+///   freshly generated v4 UUID, which is written back on a best-effort basis: a
+///   failed write is ignored and the new UUID is still returned.
+/// * Any other read error yields `None`.
+pub fn default_get_uuid() -> Option<String> {
+    let path = TELEMETRY_UUID_FILE_NAME.as_path();
+    match std::fs::read(path) {
+        Ok(bytes) => {
+            let content = String::from_utf8_lossy(&bytes);
+            let cached = content.trim();
+            if !cached.is_empty() {
+                return Some(cached.to_string());
+            }
+            // Blank file: fall through and regenerate instead of reporting "".
+        }
+        Err(e) if e.kind() == ErrorKind::NotFound => {}
+        Err(_) => return None,
+    }
+
+    let uuid = uuid::Uuid::new_v4().to_string();
+    // Best effort: without the file a new UUID is simply generated next time.
+    let _ = std::fs::write(path, uuid.as_bytes());
+    Some(uuid)
+}
+
+/// Reports version info to GreptimeDB.
+/// We do not collect any identity-sensitive information.
+/// This task is scheduled to run every 30 minutes.
+/// The task is disabled by default; it can be enabled by setting the build feature `greptimedb-telemetry`.
+/// `statistics` is the collector that gathers the version info; it can be implemented by different components.
+/// `client` is used to send the HTTP request to GreptimeDB.
+/// `telemetry_url` is the GreptimeDB URL.
+pub struct GreptimeDBTelemetry {
+    statistics: Box<dyn Collector + Send + Sync>,
+    // `None` when the HTTP client could not be built; reporting is then skipped.
+    client: Option<Client>,
+    telemetry_url: &'static str,
+}
+
+#[async_trait::async_trait]
+impl TaskFunction<Error> for GreptimeDBTelemetry {
+    fn name(&self) -> &str {
+        "Greptimedb-telemetry-task"
+    }
+
+    /// Sends one report. Always returns `Ok(())`: a failed report is ignored so
+    /// the task keeps being scheduled.
+    async fn call(&mut self) -> Result<()> {
+        self.report_telemetry_info().await;
+        Ok(())
+    }
+}
+
+impl GreptimeDBTelemetry {
+    /// Creates a reporter that posts the data gathered by `statistics` to
+    /// [`TELEMETRY_URL`], using a client with 10-second connect and request timeouts.
+    pub fn new(statistics: Box<dyn Collector + Send + Sync>) -> Self {
+        let client = Client::builder()
+            .connect_timeout(GREPTIMEDB_TELEMETRY_CLIENT_CONNECT_TIMEOUT)
+            .timeout(GREPTIMEDB_TELEMETRY_CLIENT_TIMEOUT)
+            .build();
+        Self {
+            statistics,
+            client: client.ok(),
+            telemetry_url: TELEMETRY_URL,
+        }
+    }
+
+    /// Collects the statistics and posts them as JSON to the telemetry URL.
+    ///
+    /// Returns the HTTP response, or `None` when no UUID is available, no HTTP
+    /// client could be built, or the request failed.
+    pub async fn report_telemetry_info(&mut self) -> Option<Response> {
+        let uuid = self.statistics.get_uuid()?;
+        // Bail out before gathering anything (notably the awaited node count)
+        // when there is no client to send it with.
+        let client = self.client.as_ref()?;
+
+        let data = StatisticData {
+            os: self.statistics.get_os(),
+            version: self.statistics.get_version(),
+            git_commit: self.statistics.get_git_hash(),
+            arch: self.statistics.get_arch(),
+            mode: self.statistics.get_mode(),
+            nodes: self.statistics.get_nodes().await,
+            uuid,
+        };
+
+        debug!("report version: {:?}", data);
+        let result = client.post(self.telemetry_url).json(&data).send().await;
+        debug!("report version result: {:?}", result);
+        result.ok()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::convert::Infallible;
+    use std::env;
+    use std::sync::atomic::AtomicUsize;
+    use std::time::Duration;
+
+    use common_test_util::ports;
+    use hyper::service::{make_service_fn, service_fn};
+    use hyper::Server;
+    use reqwest::Client;
+    use tokio::spawn;
+
+    use crate::{Collector, GreptimeDBTelemetry, Mode, StatisticData};
+
+    // Number of non-`/req-cnt` requests the echo server has received.
+    static COUNT: AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
+
+    // Echoes the request body back, except for `/req-cnt`, which returns `COUNT`.
+    async fn echo(
+        req: hyper::Request<hyper::Body>,
+    ) -> hyper::Result<hyper::Response<hyper::Body>> {
+        let path = req.uri().path();
+        if path == "/req-cnt" {
+            let body = hyper::Body::from(format!(
+                "{}",
+                COUNT.load(std::sync::atomic::Ordering::SeqCst)
+            ));
+            Ok(hyper::Response::new(body))
+        } else {
+            COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+            Ok(hyper::Response::new(req.into_body()))
+        }
+    }
+
+    #[tokio::test]
+    async fn test_greptimedb_telemetry() {
+        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
+        let port: u16 = ports::get_port() as u16;
+        spawn(async move {
+            let make_svc = make_service_fn(|_conn| {
+                // This is the `Service` that will handle the connection.
+                // `service_fn` is a helper to convert a function that
+                // returns a Response into a `Service`.
+                async { Ok::<_, Infallible>(service_fn(echo)) }
+            });
+            let addr = ([127, 0, 0, 1], port).into();
+
+            let server = Server::bind(&addr).serve(make_svc);
+            let graceful = server.with_graceful_shutdown(async {
+                rx.await.ok();
+            });
+            let _ = graceful.await;
+            Ok::<_, Infallible>(())
+        });
+        struct TestStatistic;
+
+        struct FailedStatistic;
+
+        #[async_trait::async_trait]
+        impl Collector for TestStatistic {
+            fn get_mode(&self) -> Mode {
+                Mode::Standalone
+            }
+
+            async fn get_nodes(&self) -> Option<i32> {
+                Some(1)
+            }
+
+            fn get_retry(&self) -> i32 {
+                unimplemented!()
+            }
+
+            fn inc_retry(&mut self) {
+                unimplemented!()
+            }
+
+            fn set_uuid_cache(&mut self, _: String) {
+                unimplemented!()
+            }
+
+            fn get_uuid_cache(&self) -> Option<String> {
+                unimplemented!()
+            }
+
+            fn get_uuid(&mut self) -> Option<String> {
+                Some("test".to_string())
+            }
+        }
+
+        #[async_trait::async_trait]
+        impl Collector for FailedStatistic {
+            fn get_mode(&self) -> Mode {
+                Mode::Standalone
+            }
+
+            async fn get_nodes(&self) -> Option<i32> {
+                None
+            }
+
+            fn get_retry(&self) -> i32 {
+                unimplemented!()
+            }
+
+            fn inc_retry(&mut self) {
+                unimplemented!()
+            }
+
+            fn set_uuid_cache(&mut self, _: String) {
+                unimplemented!()
+            }
+
+            fn get_uuid_cache(&self) -> Option<String> {
+                unimplemented!()
+            }
+
+            fn get_uuid(&mut self) -> Option<String> {
+                None
+            }
+        }
+
+        let test_statistic = Box::new(TestStatistic);
+        let mut test_report = GreptimeDBTelemetry::new(test_statistic);
+        let url = Box::leak(format!("{}:{}", "http://localhost", port).into_boxed_str());
+        test_report.telemetry_url = url;
+        let response = test_report.report_telemetry_info().await.unwrap();
+
+        let body = response.json::<StatisticData>().await.unwrap();
+        assert_eq!(env::consts::ARCH, body.arch);
+        assert_eq!(env::consts::OS, body.os);
+        assert_eq!(env!("CARGO_PKG_VERSION"), body.version);
+        assert_eq!(env!("GIT_COMMIT"), body.git_commit);
+        assert_eq!(Mode::Standalone, body.mode);
+        assert_eq!(1, body.nodes.unwrap());
+
+        // A collector without a UUID must not send anything.
+        let failed_statistic = Box::new(FailedStatistic);
+        let mut failed_report = GreptimeDBTelemetry::new(failed_statistic);
+        failed_report.telemetry_url = url;
+        let response = failed_report.report_telemetry_info().await;
+        assert!(response.is_none());
+
+        let client = Client::builder()
+            .connect_timeout(Duration::from_secs(3))
+            .timeout(Duration::from_secs(3))
+            .build()
+            .unwrap();
+
+        // Only the successful report above may have reached the server.
+        let cnt_url = format!("{}/req-cnt", url);
+        let response = client.get(cnt_url).send().await.unwrap();
+        let body = response.text().await.unwrap();
+        assert_eq!("1", body);
+        tx.send(()).unwrap();
+    }
+}
diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml
index 027fe0e047..9474ccc20d 100644
--- a/src/common/meta/Cargo.toml
+++ b/src/common/meta/Cargo.toml
@@ -24,7 +24,9 @@ snafu.workspace = true
 store-api = { path = "../../store-api" }
 table = { path = "../../table" }
 tokio.workspace = true
+reqwest = { version = "0.11", features = ["json"] }
 
 [dev-dependencies]
 chrono.workspace = true
 datatypes = { path = "../../datatypes" }
+hyper = { version = "0.14", features = ["full"] }
diff --git a/src/common/version/Cargo.toml b/src/common/version/Cargo.toml
new file mode 100644
index 0000000000..7dd296e0d5
--- /dev/null
+++ b/src/common/version/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "common-version"
+version.workspace = true
+edition.workspace = true
+license.workspace = true
+
+[dependencies]
+build-data = "0.1.4"
diff --git a/src/common/version/src/lib.rs
b/src/common/version/src/lib.rs
new file mode 100644
index 0000000000..28ceaf8129
--- /dev/null
+++ b/src/common/version/src/lib.rs
@@ -0,0 +1,35 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/// Value exported for any variable that cannot be read from git.
+const DEFAULT_VALUE: &str = "unknown";
+
+/// Prints the `cargo:rustc-env` directives for `GIT_COMMIT`, `GIT_COMMIT_SHORT`,
+/// `GIT_BRANCH` and `GIT_DIRTY`; meant to be called from a build script.
+///
+/// A value that `build_data` fails to provide is replaced by [`DEFAULT_VALUE`].
+#[allow(clippy::print_stdout)]
+pub fn setup_git_versions() {
+    let fallback = || DEFAULT_VALUE.to_string();
+
+    let commit = build_data::get_git_commit().unwrap_or_else(|_| fallback());
+    let commit_short = build_data::get_git_commit_short().unwrap_or_else(|_| fallback());
+    let branch = build_data::get_git_branch().unwrap_or_else(|_| fallback());
+    let dirty = match build_data::get_git_dirty() {
+        Ok(flag) => flag.to_string(),
+        Err(_) => fallback(),
+    };
+
+    println!("cargo:rustc-env=GIT_COMMIT={}", commit);
+    println!("cargo:rustc-env=GIT_COMMIT_SHORT={}", commit_short);
+    println!("cargo:rustc-env=GIT_BRANCH={}", branch);
+    println!("cargo:rustc-env=GIT_DIRTY={}", dirty);
+}
diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml
index 10bc3c068a..ab40b469fb 100644
--- a/src/datanode/Cargo.toml
+++ b/src/datanode/Cargo.toml
@@ -4,6 +4,9 @@ version.workspace = true
 edition.workspace = true
 license.workspace = true
 
+[features]
+greptimedb-telemetry = []
+
 [dependencies]
 async-compat = "0.2"
 async-stream.workspace = true
@@ -27,6 +30,7 @@ common-recordbatch = { path = "../common/recordbatch" }
 common-runtime = { path = "../common/runtime" }
 common-telemetry = { path = "../common/telemetry" }
 common-time = { path = "../common/time" }
+common-greptimedb-telemetry = {
path = "../common/greptimedb-telemetry" } datafusion.workspace = true datafusion-common.workspace = true datafusion-expr.workspace = true @@ -65,6 +69,7 @@ tonic.workspace = true tower = { version = "0.4", features = ["full"] } tower-http = { version = "0.3", features = ["full"] } url = "2.3.1" +uuid.workspace = true [dev-dependencies] axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", branch = "patch-1" } diff --git a/src/datanode/src/greptimedb_telemetry.rs b/src/datanode/src/greptimedb_telemetry.rs new file mode 100644 index 0000000000..48feba1727 --- /dev/null +++ b/src/datanode/src/greptimedb_telemetry.rs @@ -0,0 +1,83 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+/// Telemetry wiring used when the `greptimedb-telemetry` feature is enabled.
+#[cfg(feature = "greptimedb-telemetry")]
+pub mod telemetry {
+    use std::sync::Arc;
+
+    use async_trait::async_trait;
+    use common_greptimedb_telemetry::{
+        default_get_uuid, Collector, GreptimeDBTelemetry, GreptimeDBTelemetryTask,
+        Mode as VersionReporterMode, TELEMETRY_INTERVAL,
+    };
+    use servers::Mode;
+
+    /// Collector for a standalone instance: always reports a single node.
+    struct StandaloneGreptimeDBTelemetryCollector {
+        /// Cached installation UUID, if it could be loaded.
+        uuid: Option<String>,
+        /// Number of failed attempts to load the UUID.
+        retry: i32,
+    }
+
+    #[async_trait]
+    impl Collector for StandaloneGreptimeDBTelemetryCollector {
+        fn get_mode(&self) -> VersionReporterMode {
+            VersionReporterMode::Standalone
+        }
+
+        async fn get_nodes(&self) -> Option<i32> {
+            Some(1)
+        }
+
+        fn get_retry(&self) -> i32 {
+            self.retry
+        }
+
+        fn inc_retry(&mut self) {
+            self.retry += 1;
+        }
+
+        fn set_uuid_cache(&mut self, uuid: String) {
+            self.uuid = Some(uuid);
+        }
+
+        fn get_uuid_cache(&self) -> Option<String> {
+            self.uuid.clone()
+        }
+    }
+
+    /// Builds the telemetry task for this datanode.
+    ///
+    /// Only `Mode::Standalone` gets an enabled task (reporting every
+    /// `TELEMETRY_INTERVAL`); `Mode::Distributed` gets a disabled one.
+    pub async fn get_greptimedb_telemetry_task(mode: &Mode) -> Arc<GreptimeDBTelemetryTask> {
+        match mode {
+            Mode::Standalone => {
+                let collector = StandaloneGreptimeDBTelemetryCollector {
+                    uuid: default_get_uuid(),
+                    retry: 0,
+                };
+                Arc::new(GreptimeDBTelemetryTask::enable(
+                    TELEMETRY_INTERVAL,
+                    Box::new(GreptimeDBTelemetry::new(Box::new(collector))),
+                ))
+            }
+            Mode::Distributed => Arc::new(GreptimeDBTelemetryTask::disable()),
+        }
+    }
+}
+
+/// Stand-in used when the `greptimedb-telemetry` feature is disabled.
+#[cfg(not(feature = "greptimedb-telemetry"))]
+pub mod telemetry {
+    use std::sync::Arc;
+
+    use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
+    use servers::Mode;
+
+    /// Always returns a disabled task, whatever the mode.
+    pub async fn get_greptimedb_telemetry_task(_: &Mode) -> Arc<GreptimeDBTelemetryTask> {
+        Arc::new(GreptimeDBTelemetryTask::disable())
+    }
+}
diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs
index 04fdf3f8ae..3196551489 100644
--- a/src/datanode/src/instance.rs
+++ b/src/datanode/src/instance.rs
@@ -25,6 +25,7 @@ use common_base::paths::{CLUSTER_DIR, WAL_DIR};
 use common_base::Plugins;
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
 use common_error::ext::BoxedError;
+use
common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::HandlerGroupExecutor; @@ -32,7 +33,7 @@ use common_meta::key::TableMetadataManager; use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::store::state_store::ObjectStateStore; use common_procedure::ProcedureManagerRef; -use common_telemetry::logging::info; +use common_telemetry::logging::{debug, info}; use file_table_engine::engine::immutable::ImmutableFileTableEngine; use log_store::raft_engine::log_store::RaftEngineLogStore; use log_store::LogConfig; @@ -68,6 +69,7 @@ use crate::heartbeat::handler::open_region::OpenRegionHandler; use crate::heartbeat::HeartbeatTask; use crate::sql::{SqlHandler, SqlRequest}; use crate::store; +use crate::telemetry::get_greptimedb_telemetry_task; mod grpc; pub mod sql; @@ -81,6 +83,7 @@ pub struct Instance { pub(crate) catalog_manager: CatalogManagerRef, pub(crate) table_id_provider: Option, procedure_manager: ProcedureManagerRef, + greptimedb_telemerty_task: Arc, } pub type InstanceRef = Arc; @@ -302,6 +305,7 @@ impl Instance { catalog_manager: catalog_manager.clone(), table_id_provider, procedure_manager, + greptimedb_telemerty_task: get_greptimedb_telemetry_task(&opts.mode).await, }); let heartbeat_task = Instance::build_heartbeat_task( @@ -330,6 +334,13 @@ impl Instance { self.procedure_manager .start() .context(StartProcedureManagerSnafu)?; + let _ = self + .greptimedb_telemerty_task + .start(common_runtime::bg_runtime()) + .map_err(|e| { + debug!("Failed to start greptimedb telemetry task: {}", e); + }); + Ok(()) } diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs index 26ebb69295..0d6cfdd34c 100644 --- a/src/datanode/src/lib.rs +++ b/src/datanode/src/lib.rs @@ -17,6 +17,7 @@ pub mod datanode; pub mod error; +mod greptimedb_telemetry; pub mod 
heartbeat; pub mod instance; pub mod metrics; @@ -26,3 +27,5 @@ pub mod sql; mod store; #[cfg(test)] mod tests; + +use greptimedb_telemetry::telemetry; diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 5e39e94140..faaab3224a 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -6,6 +6,7 @@ license.workspace = true [features] mock = [] +greptimedb-telemetry = [] [dependencies] anymap = "1.0.0-beta.2" @@ -24,6 +25,7 @@ common-procedure = { path = "../common/procedure" } common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } +common-greptimedb-telemetry = { path = "../common/greptimedb-telemetry" } dashmap = "5.4" datatypes = { path = "../datatypes" } derive_builder.workspace = true @@ -51,6 +53,7 @@ tower = "0.4" typetag = "0.2" url = "2.3" servers = { path = "../servers" } +uuid.workspace = true [dev-dependencies] chrono.workspace = true diff --git a/src/meta-srv/src/cluster.rs b/src/meta-srv/src/cluster.rs index 68e47117ce..14ba5b7354 100644 --- a/src/meta-srv/src/cluster.rs +++ b/src/meta-srv/src/cluster.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::Duration; @@ -50,16 +50,26 @@ pub struct MetaPeerClient { } impl MetaPeerClient { - // Get all datanode stat kvs from leader meta. - pub async fn get_all_dn_stat_kvs(&self) -> Result> { + async fn get_dn_key_value(&self, keys_only: bool) -> Result> { let key = format!("{DN_STAT_PREFIX}-").into_bytes(); let range_end = util::get_prefix_end_key(&key); + self.range(key, range_end, keys_only).await + } - let kvs = self.range(key, range_end).await?; - + // Get all datanode stat kvs from leader meta. 
+ pub async fn get_all_dn_stat_kvs(&self) -> Result> { + let kvs = self.get_dn_key_value(false).await?; to_stat_kv_map(kvs) } + pub async fn get_node_cnt(&self) -> Result { + let kvs = self.get_dn_key_value(true).await?; + kvs.into_iter() + .map(|kv| kv.key.try_into()) + .collect::>>() + .map(|hash_set| hash_set.len() as i32) + } + // Get datanode stat kvs from leader meta by input keys. pub async fn get_dn_stat_kvs(&self, keys: Vec) -> Result> { let stat_keys = keys.into_iter().map(|key| key.into()).collect(); @@ -70,7 +80,12 @@ impl MetaPeerClient { } // Range kv information from the leader's in_mem kv store - pub async fn range(&self, key: Vec, range_end: Vec) -> Result> { + pub async fn range( + &self, + key: Vec, + range_end: Vec, + keys_only: bool, + ) -> Result> { if self.is_leader() { let request = RangeRequest { key, @@ -85,7 +100,10 @@ impl MetaPeerClient { let retry_interval_ms = self.retry_interval_ms; for _ in 0..max_retry_count { - match self.remote_range(key.clone(), range_end.clone()).await { + match self + .remote_range(key.clone(), range_end.clone(), keys_only) + .await + { Ok(kvs) => return Ok(kvs), Err(e) => { if need_retry(&e) { @@ -105,7 +123,12 @@ impl MetaPeerClient { .fail() } - async fn remote_range(&self, key: Vec, range_end: Vec) -> Result> { + async fn remote_range( + &self, + key: Vec, + range_end: Vec, + keys_only: bool, + ) -> Result> { // Safety: when self.is_leader() == false, election must not empty. 
let election = self.election.as_ref().unwrap(); @@ -119,6 +142,7 @@ impl MetaPeerClient { let request = tonic::Request::new(PbRangeRequest { key, range_end, + keys_only, ..Default::default() }); diff --git a/src/meta-srv/src/greptimedb_telemetry.rs b/src/meta-srv/src/greptimedb_telemetry.rs new file mode 100644 index 0000000000..377fe4d214 --- /dev/null +++ b/src/meta-srv/src/greptimedb_telemetry.rs @@ -0,0 +1,88 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+/// Telemetry wiring used when the `greptimedb-telemetry` feature is enabled.
+#[cfg(feature = "greptimedb-telemetry")]
+pub mod telemetry {
+    use std::sync::Arc;
+
+    use async_trait::async_trait;
+    use common_greptimedb_telemetry::{
+        default_get_uuid, Collector, GreptimeDBTelemetry, GreptimeDBTelemetryTask,
+        Mode as VersionReporterMode, TELEMETRY_INTERVAL,
+    };
+
+    use crate::cluster::MetaPeerClientRef;
+
+    /// Collector for a distributed deployment: the node count is obtained
+    /// through the meta peer client.
+    struct DistributedGreptimeDBTelemetryCollector {
+        meta_peer_client: MetaPeerClientRef,
+        /// Cached installation UUID, if it could be loaded.
+        uuid: Option<String>,
+        /// Number of failed attempts to load the UUID.
+        retry: i32,
+    }
+
+    #[async_trait]
+    impl Collector for DistributedGreptimeDBTelemetryCollector {
+        fn get_mode(&self) -> VersionReporterMode {
+            VersionReporterMode::Distributed
+        }
+
+        /// Returns `None` when the node count cannot be fetched.
+        async fn get_nodes(&self) -> Option<i32> {
+            self.meta_peer_client.get_node_cnt().await.ok()
+        }
+
+        fn get_retry(&self) -> i32 {
+            self.retry
+        }
+
+        fn inc_retry(&mut self) {
+            self.retry += 1;
+        }
+
+        fn set_uuid_cache(&mut self, uuid: String) {
+            self.uuid = Some(uuid);
+        }
+
+        fn get_uuid_cache(&self) -> Option<String> {
+            self.uuid.clone()
+        }
+    }
+
+    /// Builds an enabled telemetry task that reports every `TELEMETRY_INTERVAL`.
+    pub async fn get_greptimedb_telemetry_task(
+        meta_peer_client: MetaPeerClientRef,
+    ) -> Arc<GreptimeDBTelemetryTask> {
+        let collector = DistributedGreptimeDBTelemetryCollector {
+            meta_peer_client,
+            uuid: default_get_uuid(),
+            retry: 0,
+        };
+        Arc::new(GreptimeDBTelemetryTask::enable(
+            TELEMETRY_INTERVAL,
+            Box::new(GreptimeDBTelemetry::new(Box::new(collector))),
+        ))
+    }
+}
+
+/// Stand-in used when the `greptimedb-telemetry` feature is disabled.
+#[cfg(not(feature = "greptimedb-telemetry"))]
+pub mod telemetry {
+    use std::sync::Arc;
+
+    use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
+
+    use crate::cluster::MetaPeerClientRef;
+
+    /// Always returns a disabled task; the client is ignored.
+    pub async fn get_greptimedb_telemetry_task(
+        _: MetaPeerClientRef,
+    ) -> Arc<GreptimeDBTelemetryTask> {
+        Arc::new(GreptimeDBTelemetryTask::disable())
+    }
+}
diff --git a/src/meta-srv/src/lease.rs b/src/meta-srv/src/lease.rs
index c62d894ec1..7da1c640f3 100644
--- a/src/meta-srv/src/lease.rs
+++ b/src/meta-srv/src/lease.rs
@@ -44,7 +44,7 @@ where
     let key = get_lease_prefix(cluster_id);
     let range_end = util::get_prefix_end_key(&key);
 
-    let kvs = meta_peer_client.range(key, range_end).await?;
+    let kvs =
meta_peer_client.range(key, range_end, false).await?; let mut lease_kvs = HashMap::new(); for kv in kvs { let lease_key: LeaseKey = kv.key.try_into()?; diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index 5a0032fa78..a45764609f 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -42,3 +42,6 @@ pub use crate::error::Result; mod inactive_node_manager; #[cfg(test)] mod test_util; + +mod greptimedb_telemetry; +use greptimedb_telemetry::telemetry; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index e81c1c632e..94fb593258 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -19,12 +19,13 @@ use std::sync::Arc; use api::v1::meta::Peer; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_grpc::channel_manager; use common_meta::key::TableMetadataManagerRef; use common_procedure::options::ProcedureConfig; use common_procedure::ProcedureManagerRef; use common_telemetry::logging::LoggingOptions; -use common_telemetry::{error, info, warn}; +use common_telemetry::{debug, error, info, warn}; use serde::{Deserialize, Serialize}; use servers::http::HttpOptions; use snafu::ResultExt; @@ -175,6 +176,7 @@ pub struct MetaSrv { mailbox: MailboxRef, ddl_manager: DdlManagerRef, table_metadata_manager: TableMetadataManagerRef, + greptimedb_telemerty_task: Arc, } impl MetaSrv { @@ -195,6 +197,7 @@ impl MetaSrv { let in_memory = self.in_memory.clone(); let leader_cached_kv_store = self.leader_cached_kv_store.clone(); let mut rx = election.subscribe_leader_change(); + let task_handler = self.greptimedb_telemerty_task.clone(); let _handle = common_runtime::spawn_bg(async move { loop { match rx.recv().await { @@ -210,9 +213,18 @@ impl MetaSrv { if let Err(e) = procedure_manager.recover().await { error!("Failed to recover procedures, error: {e}"); } + let _ = task_handler.start(common_runtime::bg_runtime()) + .map_err(|e| { 
+ debug!("Failed to start greptimedb telemetry task, error: {e}"); + }); } LeaderChangeMessage::StepDown(leader) => { error!("Leader :{:?} step down", leader); + let _ = task_handler.stop().await.map_err(|e| { + debug!( + "Failed to stop greptimedb telemetry task, error: {e}" + ); + }); } } } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 9b24e208e3..fe562cc0ae 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -46,6 +46,7 @@ use crate::service::mailbox::MailboxRef; use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvStore}; use crate::service::store::kv::{KvBackendAdapter, KvStoreRef, ResettableKvStoreRef}; use crate::service::store::memory::MemStore; +use crate::telemetry::get_greptimedb_telemetry_task; // TODO(fys): try use derive_builder macro pub struct MetaSrvBuilder { @@ -224,7 +225,7 @@ impl MetaSrvBuilder { in_memory, kv_store, leader_cached_kv_store, - meta_peer_client, + meta_peer_client: meta_peer_client.clone(), table_id_sequence, selector, handler_group, @@ -235,6 +236,7 @@ impl MetaSrvBuilder { mailbox, ddl_manager, table_metadata_manager, + greptimedb_telemerty_task: get_greptimedb_telemetry_task(meta_peer_client).await, }) } }