mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 21:02:58 +00:00
chore: add version reporter (#2007)
* chore: add version reporter * chore: add uuid for version report * chore: add file license * chore: format code * chore: fix by pr comment * chore: change version report api url * chore: change greptimedb opentelemetry crate name * chore: minor code beautification * chore: add keys only option when range etcd * chore: fix by pr comment * chore: fix by pr comment * chore: change uuid file location * chore: only run telemetry in meta leader * chore: add more test and some minor fix * chore: make clippy happy * chore: fix by pr comment * chore: fix by pr comment * chore: add debug log for greptimedb telemetry
This commit is contained in:
131
Cargo.lock
generated
131
Cargo.lock
generated
@@ -1591,7 +1591,6 @@ version = "0.3.2"
|
||||
dependencies = [
|
||||
"anymap",
|
||||
"async-trait",
|
||||
"build-data",
|
||||
"catalog",
|
||||
"chrono",
|
||||
"clap 3.2.25",
|
||||
@@ -1603,6 +1602,7 @@ dependencies = [
|
||||
"common-recordbatch",
|
||||
"common-telemetry",
|
||||
"common-test-util",
|
||||
"common-version",
|
||||
"config",
|
||||
"datanode",
|
||||
"datatypes",
|
||||
@@ -1756,6 +1756,25 @@ dependencies = [
|
||||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "common-greptimedb-telemetry"
|
||||
version = "0.3.2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"common-error",
|
||||
"common-runtime",
|
||||
"common-telemetry",
|
||||
"common-test-util",
|
||||
"common-version",
|
||||
"hyper",
|
||||
"once_cell",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "common-grpc"
|
||||
version = "0.3.2"
|
||||
@@ -1835,9 +1854,11 @@ dependencies = [
|
||||
"datatypes",
|
||||
"etcd-client",
|
||||
"futures",
|
||||
"hyper",
|
||||
"lazy_static",
|
||||
"prost",
|
||||
"regex",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"snafu",
|
||||
@@ -1990,6 +2011,13 @@ dependencies = [
|
||||
"snafu",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "common-version"
|
||||
version = "0.3.2"
|
||||
dependencies = [
|
||||
"build-data",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "concurrent-queue"
|
||||
version = "2.2.0"
|
||||
@@ -2636,6 +2664,7 @@ dependencies = [
|
||||
"common-datasource",
|
||||
"common-error",
|
||||
"common-function",
|
||||
"common-greptimedb-telemetry",
|
||||
"common-grpc",
|
||||
"common-grpc-expr",
|
||||
"common-meta",
|
||||
@@ -2684,6 +2713,7 @@ dependencies = [
|
||||
"tower",
|
||||
"tower-http",
|
||||
"url",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3224,6 +3254,21 @@ version = "1.0.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
|
||||
|
||||
[[package]]
|
||||
name = "foreign-types"
|
||||
version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
|
||||
dependencies = [
|
||||
"foreign-types-shared",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "foreign-types-shared"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
|
||||
|
||||
[[package]]
|
||||
name = "form_urlencoded"
|
||||
version = "1.2.0"
|
||||
@@ -4446,6 +4491,19 @@ dependencies = [
|
||||
"tokio-io-timeout",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hyper-tls"
|
||||
version = "0.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"hyper",
|
||||
"native-tls",
|
||||
"tokio",
|
||||
"tokio-native-tls",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "iana-time-zone"
|
||||
version = "0.1.56"
|
||||
@@ -5285,6 +5343,7 @@ dependencies = [
|
||||
"common-base",
|
||||
"common-catalog",
|
||||
"common-error",
|
||||
"common-greptimedb-telemetry",
|
||||
"common-grpc",
|
||||
"common-grpc-expr",
|
||||
"common-meta",
|
||||
@@ -5322,6 +5381,7 @@ dependencies = [
|
||||
"tracing-subscriber",
|
||||
"typetag",
|
||||
"url",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -5737,6 +5797,24 @@ dependencies = [
|
||||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "native-tls"
|
||||
version = "0.2.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e"
|
||||
dependencies = [
|
||||
"lazy_static",
|
||||
"libc",
|
||||
"log",
|
||||
"openssl",
|
||||
"openssl-probe",
|
||||
"openssl-sys",
|
||||
"schannel",
|
||||
"security-framework",
|
||||
"security-framework-sys",
|
||||
"tempfile",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "new_debug_unreachable"
|
||||
version = "1.0.4"
|
||||
@@ -6096,12 +6174,50 @@ dependencies = [
|
||||
"tokio-rustls 0.24.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "openssl"
|
||||
version = "0.10.54"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "69b3f656a17a6cbc115b5c7a40c616947d213ba182135b014d6051b73ab6f019"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"cfg-if 1.0.0",
|
||||
"foreign-types",
|
||||
"libc",
|
||||
"once_cell",
|
||||
"openssl-macros",
|
||||
"openssl-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "openssl-macros"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.18",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "openssl-probe"
|
||||
version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
|
||||
|
||||
[[package]]
|
||||
name = "openssl-sys"
|
||||
version = "0.9.88"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c2ce0f250f34a308dcfdbb351f511359857d4ed2134ba715a4eadd46e1ffd617"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
"pkg-config",
|
||||
"vcpkg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry"
|
||||
version = "0.17.0"
|
||||
@@ -7640,11 +7756,13 @@ dependencies = [
|
||||
"http-body",
|
||||
"hyper",
|
||||
"hyper-rustls",
|
||||
"hyper-tls",
|
||||
"ipnet",
|
||||
"js-sys",
|
||||
"log",
|
||||
"mime",
|
||||
"mime_guess",
|
||||
"native-tls",
|
||||
"once_cell",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
@@ -7655,6 +7773,7 @@ dependencies = [
|
||||
"serde_json",
|
||||
"serde_urlencoded",
|
||||
"tokio",
|
||||
"tokio-native-tls",
|
||||
"tokio-rustls 0.24.0",
|
||||
"tokio-util",
|
||||
"tower-service",
|
||||
@@ -10130,6 +10249,16 @@ dependencies = [
|
||||
"syn 2.0.18",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-native-tls"
|
||||
version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2"
|
||||
dependencies = [
|
||||
"native-tls",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-postgres"
|
||||
version = "0.7.8"
|
||||
|
||||
@@ -13,6 +13,10 @@ path = "src/bin/greptime.rs"
|
||||
default = ["metrics-process"]
|
||||
tokio-console = ["common-telemetry/tokio-console"]
|
||||
metrics-process = ["servers/metrics-process"]
|
||||
greptimedb-telemetry = [
|
||||
"datanode/greptimedb-telemetry",
|
||||
"meta-srv/greptimedb-telemetry",
|
||||
]
|
||||
|
||||
[dependencies]
|
||||
anymap = "1.0.0-beta.2"
|
||||
@@ -61,4 +65,4 @@ serde.workspace = true
|
||||
toml.workspace = true
|
||||
|
||||
[build-dependencies]
|
||||
build-data = "0.1.4"
|
||||
common-version = { path = "../common/version" }
|
||||
|
||||
@@ -12,22 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
const DEFAULT_VALUE: &str = "unknown";
|
||||
fn main() {
|
||||
println!(
|
||||
"cargo:rustc-env=GIT_COMMIT={}",
|
||||
build_data::get_git_commit().unwrap_or_else(|_| DEFAULT_VALUE.to_string())
|
||||
);
|
||||
println!(
|
||||
"cargo:rustc-env=GIT_COMMIT_SHORT={}",
|
||||
build_data::get_git_commit_short().unwrap_or_else(|_| DEFAULT_VALUE.to_string())
|
||||
);
|
||||
println!(
|
||||
"cargo:rustc-env=GIT_BRANCH={}",
|
||||
build_data::get_git_branch().unwrap_or_else(|_| DEFAULT_VALUE.to_string())
|
||||
);
|
||||
println!(
|
||||
"cargo:rustc-env=GIT_DIRTY={}",
|
||||
build_data::get_git_dirty().map_or(DEFAULT_VALUE.to_string(), |v| v.to_string())
|
||||
);
|
||||
common_version::setup_git_versions();
|
||||
}
|
||||
|
||||
24
src/common/greptimedb-telemetry/Cargo.toml
Normal file
24
src/common/greptimedb-telemetry/Cargo.toml
Normal file
@@ -0,0 +1,24 @@
|
||||
[package]
|
||||
name = "common-greptimedb-telemetry"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
async-trait.workspace = true
|
||||
common-error = { path = "../error" }
|
||||
common-runtime = { path = "../runtime" }
|
||||
common-telemetry = { path = "../telemetry" }
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
tokio.workspace = true
|
||||
reqwest = { version = "0.11", features = ["json"] }
|
||||
uuid.workspace = true
|
||||
once_cell = "1.17.0"
|
||||
|
||||
[dev-dependencies]
|
||||
hyper = { version = "0.14", features = ["full"] }
|
||||
common-test-util = { path = "../test-util" }
|
||||
|
||||
[build-dependencies]
|
||||
common-version = { path = "../version" }
|
||||
17
src/common/greptimedb-telemetry/build.rs
Normal file
17
src/common/greptimedb-telemetry/build.rs
Normal file
@@ -0,0 +1,17 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
fn main() {
|
||||
common_version::setup_git_versions();
|
||||
}
|
||||
371
src/common/greptimedb-telemetry/src/lib.rs
Normal file
371
src/common/greptimedb-telemetry/src/lib.rs
Normal file
@@ -0,0 +1,371 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::env;
|
||||
use std::io::ErrorKind;
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
use common_runtime::error::{Error, Result};
|
||||
use common_runtime::{BoxedTaskFunction, RepeatedTask, Runtime, TaskFunction};
|
||||
use common_telemetry::debug;
|
||||
use once_cell::sync::Lazy;
|
||||
use reqwest::{Client, Response};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub const TELEMETRY_URL: &str = "https://api-preview.greptime.cloud/db/otel/statistics";
|
||||
|
||||
// Getting the right path when running on windows
|
||||
static TELEMETRY_UUID_FILE_NAME: Lazy<PathBuf> = Lazy::new(|| {
|
||||
let mut path = PathBuf::new();
|
||||
path.push(env::temp_dir());
|
||||
path.push(".greptimedb-telemetry-uuid");
|
||||
path
|
||||
});
|
||||
|
||||
pub static TELEMETRY_INTERVAL: Duration = Duration::from_secs(60 * 30);
|
||||
|
||||
const GREPTIMEDB_TELEMETRY_CLIENT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const GREPTIMEDB_TELEMETRY_CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
pub enum GreptimeDBTelemetryTask {
|
||||
Enable(RepeatedTask<Error>),
|
||||
Disable,
|
||||
}
|
||||
|
||||
impl GreptimeDBTelemetryTask {
|
||||
pub fn enable(interval: Duration, task_fn: BoxedTaskFunction<Error>) -> Self {
|
||||
GreptimeDBTelemetryTask::Enable(RepeatedTask::new(interval, task_fn))
|
||||
}
|
||||
|
||||
pub fn disable() -> Self {
|
||||
GreptimeDBTelemetryTask::Disable
|
||||
}
|
||||
|
||||
pub fn start(&self, runtime: Runtime) -> Result<()> {
|
||||
match self {
|
||||
GreptimeDBTelemetryTask::Enable(task) => task.start(runtime),
|
||||
GreptimeDBTelemetryTask::Disable => Ok(()),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn stop(&self) -> Result<()> {
|
||||
match self {
|
||||
GreptimeDBTelemetryTask::Enable(task) => task.stop().await,
|
||||
GreptimeDBTelemetryTask::Disable => Ok(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
struct StatisticData {
|
||||
pub os: String,
|
||||
pub version: String,
|
||||
pub arch: String,
|
||||
pub mode: Mode,
|
||||
pub git_commit: String,
|
||||
pub nodes: Option<i32>,
|
||||
pub uuid: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum Mode {
|
||||
Distributed,
|
||||
Standalone,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait Collector {
|
||||
fn get_version(&self) -> String {
|
||||
env!("CARGO_PKG_VERSION").to_string()
|
||||
}
|
||||
|
||||
fn get_git_hash(&self) -> String {
|
||||
env!("GIT_COMMIT").to_string()
|
||||
}
|
||||
|
||||
fn get_os(&self) -> String {
|
||||
env::consts::OS.to_string()
|
||||
}
|
||||
|
||||
fn get_arch(&self) -> String {
|
||||
env::consts::ARCH.to_string()
|
||||
}
|
||||
|
||||
fn get_mode(&self) -> Mode;
|
||||
|
||||
fn get_retry(&self) -> i32;
|
||||
|
||||
fn inc_retry(&mut self);
|
||||
|
||||
fn set_uuid_cache(&mut self, uuid: String);
|
||||
|
||||
fn get_uuid_cache(&self) -> Option<String>;
|
||||
|
||||
async fn get_nodes(&self) -> Option<i32>;
|
||||
|
||||
fn get_uuid(&mut self) -> Option<String> {
|
||||
match self.get_uuid_cache() {
|
||||
Some(uuid) => Some(uuid),
|
||||
None => {
|
||||
if self.get_retry() > 3 {
|
||||
return None;
|
||||
}
|
||||
match default_get_uuid() {
|
||||
Some(uuid) => {
|
||||
self.set_uuid_cache(uuid.clone());
|
||||
Some(uuid)
|
||||
}
|
||||
None => {
|
||||
self.inc_retry();
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn default_get_uuid() -> Option<String> {
|
||||
let path = (*TELEMETRY_UUID_FILE_NAME).as_path();
|
||||
match std::fs::read(path) {
|
||||
Ok(bytes) => Some(String::from_utf8_lossy(&bytes).to_string()),
|
||||
Err(e) => {
|
||||
if e.kind() == ErrorKind::NotFound {
|
||||
let uuid = uuid::Uuid::new_v4().to_string();
|
||||
let _ = std::fs::write(path, uuid.as_bytes());
|
||||
Some(uuid)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Report version info to GreptimeDB.
|
||||
/// We do not collect any identity-sensitive information.
|
||||
/// This task is scheduled to run every 30 minutes.
|
||||
/// The task will be disabled default. It can be enabled by setting the build feature `greptimedb-telemetry`
|
||||
/// Collector is used to collect the version info. It can be implemented by different components.
|
||||
/// client is used to send the HTTP request to GreptimeDB.
|
||||
/// telemetry_url is the GreptimeDB url.
|
||||
pub struct GreptimeDBTelemetry {
|
||||
statistics: Box<dyn Collector + Send + Sync>,
|
||||
client: Option<Client>,
|
||||
telemetry_url: &'static str,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl TaskFunction<Error> for GreptimeDBTelemetry {
|
||||
fn name(&self) -> &str {
|
||||
"Greptimedb-telemetry-task"
|
||||
}
|
||||
|
||||
async fn call(&mut self) -> Result<()> {
|
||||
self.report_telemetry_info().await;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl GreptimeDBTelemetry {
|
||||
pub fn new(statistics: Box<dyn Collector + Send + Sync>) -> Self {
|
||||
let client = Client::builder()
|
||||
.connect_timeout(GREPTIMEDB_TELEMETRY_CLIENT_CONNECT_TIMEOUT)
|
||||
.timeout(GREPTIMEDB_TELEMETRY_CLIENT_TIMEOUT)
|
||||
.build();
|
||||
Self {
|
||||
statistics,
|
||||
client: client.ok(),
|
||||
telemetry_url: TELEMETRY_URL,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn report_telemetry_info(&mut self) -> Option<Response> {
|
||||
match self.statistics.get_uuid() {
|
||||
Some(uuid) => {
|
||||
let data = StatisticData {
|
||||
os: self.statistics.get_os(),
|
||||
version: self.statistics.get_version(),
|
||||
git_commit: self.statistics.get_git_hash(),
|
||||
arch: self.statistics.get_arch(),
|
||||
mode: self.statistics.get_mode(),
|
||||
nodes: self.statistics.get_nodes().await,
|
||||
uuid,
|
||||
};
|
||||
|
||||
if let Some(client) = self.client.as_ref() {
|
||||
debug!("report version: {:?}", data);
|
||||
let result = client.post(self.telemetry_url).json(&data).send().await;
|
||||
debug!("report version result: {:?}", result);
|
||||
result.ok()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::convert::Infallible;
|
||||
use std::env;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::time::Duration;
|
||||
|
||||
use common_test_util::ports;
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use hyper::Server;
|
||||
use reqwest::Client;
|
||||
use tokio::spawn;
|
||||
|
||||
use crate::{Collector, GreptimeDBTelemetry, Mode, StatisticData};
|
||||
|
||||
static COUNT: AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
|
||||
|
||||
async fn echo(req: hyper::Request<hyper::Body>) -> hyper::Result<hyper::Response<hyper::Body>> {
|
||||
let path = req.uri().path();
|
||||
if path == "/req-cnt" {
|
||||
let body = hyper::Body::from(format!(
|
||||
"{}",
|
||||
COUNT.load(std::sync::atomic::Ordering::SeqCst)
|
||||
));
|
||||
Ok(hyper::Response::new(body))
|
||||
} else {
|
||||
COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
|
||||
Ok(hyper::Response::new(req.into_body()))
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_gretimedb_telemetry() {
|
||||
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
|
||||
let port: u16 = ports::get_port() as u16;
|
||||
spawn(async move {
|
||||
let make_svc = make_service_fn(|_conn| {
|
||||
// This is the `Service` that will handle the connection.
|
||||
// `service_fn` is a helper to convert a function that
|
||||
// returns a Response into a `Service`.
|
||||
async { Ok::<_, Infallible>(service_fn(echo)) }
|
||||
});
|
||||
let addr = ([127, 0, 0, 1], port).into();
|
||||
|
||||
let server = Server::bind(&addr).serve(make_svc);
|
||||
let graceful = server.with_graceful_shutdown(async {
|
||||
rx.await.ok();
|
||||
});
|
||||
let _ = graceful.await;
|
||||
Ok::<_, Infallible>(())
|
||||
});
|
||||
struct TestStatistic;
|
||||
|
||||
struct FailedStatistic;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Collector for TestStatistic {
|
||||
fn get_mode(&self) -> Mode {
|
||||
Mode::Standalone
|
||||
}
|
||||
|
||||
async fn get_nodes(&self) -> Option<i32> {
|
||||
Some(1)
|
||||
}
|
||||
|
||||
fn get_retry(&self) -> i32 {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn inc_retry(&mut self) {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn set_uuid_cache(&mut self, _: String) {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn get_uuid_cache(&self) -> Option<String> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn get_uuid(&mut self) -> Option<String> {
|
||||
Some("test".to_string())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Collector for FailedStatistic {
|
||||
fn get_mode(&self) -> Mode {
|
||||
Mode::Standalone
|
||||
}
|
||||
|
||||
async fn get_nodes(&self) -> Option<i32> {
|
||||
None
|
||||
}
|
||||
|
||||
fn get_retry(&self) -> i32 {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn inc_retry(&mut self) {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn set_uuid_cache(&mut self, _: String) {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn get_uuid_cache(&self) -> Option<String> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn get_uuid(&mut self) -> Option<String> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
let test_statistic = Box::new(TestStatistic);
|
||||
let mut test_report = GreptimeDBTelemetry::new(test_statistic);
|
||||
let url = Box::leak(format!("{}:{}", "http://localhost", port).into_boxed_str());
|
||||
test_report.telemetry_url = url;
|
||||
let response = test_report.report_telemetry_info().await.unwrap();
|
||||
|
||||
let body = response.json::<StatisticData>().await.unwrap();
|
||||
assert_eq!(env::consts::ARCH, body.arch);
|
||||
assert_eq!(env::consts::OS, body.os);
|
||||
assert_eq!(env!("CARGO_PKG_VERSION"), body.version);
|
||||
assert_eq!(env!("GIT_COMMIT"), body.git_commit);
|
||||
assert_eq!(Mode::Standalone, body.mode);
|
||||
assert_eq!(1, body.nodes.unwrap());
|
||||
|
||||
let failed_statistic = Box::new(FailedStatistic);
|
||||
let mut failed_report = GreptimeDBTelemetry::new(failed_statistic);
|
||||
failed_report.telemetry_url = url;
|
||||
let response = failed_report.report_telemetry_info().await;
|
||||
assert!(response.is_none());
|
||||
|
||||
let client = Client::builder()
|
||||
.connect_timeout(Duration::from_secs(3))
|
||||
.timeout(Duration::from_secs(3))
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let cnt_url = format!("{}/req-cnt", url);
|
||||
let response = client.get(cnt_url).send().await.unwrap();
|
||||
let body = response.text().await.unwrap();
|
||||
assert_eq!("1", body);
|
||||
tx.send(()).unwrap();
|
||||
}
|
||||
}
|
||||
@@ -24,7 +24,9 @@ snafu.workspace = true
|
||||
store-api = { path = "../../store-api" }
|
||||
table = { path = "../../table" }
|
||||
tokio.workspace = true
|
||||
reqwest = { version = "0.11", features = ["json"] }
|
||||
|
||||
[dev-dependencies]
|
||||
chrono.workspace = true
|
||||
datatypes = { path = "../../datatypes" }
|
||||
hyper = { version = "0.14", features = ["full"] }
|
||||
|
||||
8
src/common/version/Cargo.toml
Normal file
8
src/common/version/Cargo.toml
Normal file
@@ -0,0 +1,8 @@
|
||||
[package]
|
||||
name = "common-version"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
build-data = "0.1.4"
|
||||
35
src/common/version/src/lib.rs
Normal file
35
src/common/version/src/lib.rs
Normal file
@@ -0,0 +1,35 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
const DEFAULT_VALUE: &str = "unknown";
|
||||
|
||||
#[allow(clippy::print_stdout)]
|
||||
pub fn setup_git_versions() {
|
||||
println!(
|
||||
"cargo:rustc-env=GIT_COMMIT={}",
|
||||
build_data::get_git_commit().unwrap_or_else(|_| DEFAULT_VALUE.to_string())
|
||||
);
|
||||
println!(
|
||||
"cargo:rustc-env=GIT_COMMIT_SHORT={}",
|
||||
build_data::get_git_commit_short().unwrap_or_else(|_| DEFAULT_VALUE.to_string())
|
||||
);
|
||||
println!(
|
||||
"cargo:rustc-env=GIT_BRANCH={}",
|
||||
build_data::get_git_branch().unwrap_or_else(|_| DEFAULT_VALUE.to_string())
|
||||
);
|
||||
println!(
|
||||
"cargo:rustc-env=GIT_DIRTY={}",
|
||||
build_data::get_git_dirty().map_or(DEFAULT_VALUE.to_string(), |v| v.to_string())
|
||||
);
|
||||
}
|
||||
@@ -4,6 +4,9 @@ version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[features]
|
||||
greptimedb-telemetry = []
|
||||
|
||||
[dependencies]
|
||||
async-compat = "0.2"
|
||||
async-stream.workspace = true
|
||||
@@ -27,6 +30,7 @@ common-recordbatch = { path = "../common/recordbatch" }
|
||||
common-runtime = { path = "../common/runtime" }
|
||||
common-telemetry = { path = "../common/telemetry" }
|
||||
common-time = { path = "../common/time" }
|
||||
common-greptimedb-telemetry = { path = "../common/greptimedb-telemetry" }
|
||||
datafusion.workspace = true
|
||||
datafusion-common.workspace = true
|
||||
datafusion-expr.workspace = true
|
||||
@@ -65,6 +69,7 @@ tonic.workspace = true
|
||||
tower = { version = "0.4", features = ["full"] }
|
||||
tower-http = { version = "0.3", features = ["full"] }
|
||||
url = "2.3.1"
|
||||
uuid.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", branch = "patch-1" }
|
||||
|
||||
83
src/datanode/src/greptimedb_telemetry.rs
Normal file
83
src/datanode/src/greptimedb_telemetry.rs
Normal file
@@ -0,0 +1,83 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#[cfg(feature = "greptimedb-telemetry")]
|
||||
pub mod telemetry {
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_greptimedb_telemetry::{
|
||||
default_get_uuid, Collector, GreptimeDBTelemetry, GreptimeDBTelemetryTask,
|
||||
Mode as VersionReporterMode, TELEMETRY_INTERVAL,
|
||||
};
|
||||
use servers::Mode;
|
||||
|
||||
struct StandaloneGreptimeDBTelemetryCollector {
|
||||
uuid: Option<String>,
|
||||
retry: i32,
|
||||
}
|
||||
#[async_trait]
|
||||
impl Collector for StandaloneGreptimeDBTelemetryCollector {
|
||||
fn get_mode(&self) -> VersionReporterMode {
|
||||
VersionReporterMode::Standalone
|
||||
}
|
||||
|
||||
async fn get_nodes(&self) -> Option<i32> {
|
||||
Some(1)
|
||||
}
|
||||
|
||||
fn get_retry(&self) -> i32 {
|
||||
self.retry
|
||||
}
|
||||
|
||||
fn inc_retry(&mut self) {
|
||||
self.retry += 1;
|
||||
}
|
||||
|
||||
fn set_uuid_cache(&mut self, uuid: String) {
|
||||
self.uuid = Some(uuid);
|
||||
}
|
||||
|
||||
fn get_uuid_cache(&self) -> Option<String> {
|
||||
self.uuid.clone()
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_greptimedb_telemetry_task(mode: &Mode) -> Arc<GreptimeDBTelemetryTask> {
|
||||
match mode {
|
||||
Mode::Standalone => Arc::new(GreptimeDBTelemetryTask::enable(
|
||||
TELEMETRY_INTERVAL,
|
||||
Box::new(GreptimeDBTelemetry::new(Box::new(
|
||||
StandaloneGreptimeDBTelemetryCollector {
|
||||
uuid: default_get_uuid(),
|
||||
retry: 0,
|
||||
},
|
||||
))),
|
||||
)),
|
||||
Mode::Distributed => Arc::new(GreptimeDBTelemetryTask::disable()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "greptimedb-telemetry"))]
|
||||
pub mod telemetry {
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
|
||||
use servers::Mode;
|
||||
|
||||
pub async fn get_greptimedb_telemetry_task(_: &Mode) -> Arc<GreptimeDBTelemetryTask> {
|
||||
Arc::new(GreptimeDBTelemetryTask::disable())
|
||||
}
|
||||
}
|
||||
@@ -25,6 +25,7 @@ use common_base::paths::{CLUSTER_DIR, WAL_DIR};
|
||||
use common_base::Plugins;
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
|
||||
use common_error::ext::BoxedError;
|
||||
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
|
||||
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
|
||||
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
|
||||
use common_meta::heartbeat::handler::HandlerGroupExecutor;
|
||||
@@ -32,7 +33,7 @@ use common_meta::key::TableMetadataManager;
|
||||
use common_procedure::local::{LocalManager, ManagerConfig};
|
||||
use common_procedure::store::state_store::ObjectStateStore;
|
||||
use common_procedure::ProcedureManagerRef;
|
||||
use common_telemetry::logging::info;
|
||||
use common_telemetry::logging::{debug, info};
|
||||
use file_table_engine::engine::immutable::ImmutableFileTableEngine;
|
||||
use log_store::raft_engine::log_store::RaftEngineLogStore;
|
||||
use log_store::LogConfig;
|
||||
@@ -68,6 +69,7 @@ use crate::heartbeat::handler::open_region::OpenRegionHandler;
|
||||
use crate::heartbeat::HeartbeatTask;
|
||||
use crate::sql::{SqlHandler, SqlRequest};
|
||||
use crate::store;
|
||||
use crate::telemetry::get_greptimedb_telemetry_task;
|
||||
|
||||
mod grpc;
|
||||
pub mod sql;
|
||||
@@ -81,6 +83,7 @@ pub struct Instance {
|
||||
pub(crate) catalog_manager: CatalogManagerRef,
|
||||
pub(crate) table_id_provider: Option<TableIdProviderRef>,
|
||||
procedure_manager: ProcedureManagerRef,
|
||||
greptimedb_telemerty_task: Arc<GreptimeDBTelemetryTask>,
|
||||
}
|
||||
|
||||
pub type InstanceRef = Arc<Instance>;
|
||||
@@ -302,6 +305,7 @@ impl Instance {
|
||||
catalog_manager: catalog_manager.clone(),
|
||||
table_id_provider,
|
||||
procedure_manager,
|
||||
greptimedb_telemerty_task: get_greptimedb_telemetry_task(&opts.mode).await,
|
||||
});
|
||||
|
||||
let heartbeat_task = Instance::build_heartbeat_task(
|
||||
@@ -330,6 +334,13 @@ impl Instance {
|
||||
self.procedure_manager
|
||||
.start()
|
||||
.context(StartProcedureManagerSnafu)?;
|
||||
let _ = self
|
||||
.greptimedb_telemerty_task
|
||||
.start(common_runtime::bg_runtime())
|
||||
.map_err(|e| {
|
||||
debug!("Failed to start greptimedb telemetry task: {}", e);
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
|
||||
pub mod datanode;
|
||||
pub mod error;
|
||||
mod greptimedb_telemetry;
|
||||
pub mod heartbeat;
|
||||
pub mod instance;
|
||||
pub mod metrics;
|
||||
@@ -26,3 +27,5 @@ pub mod sql;
|
||||
mod store;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
use greptimedb_telemetry::telemetry;
|
||||
|
||||
@@ -6,6 +6,7 @@ license.workspace = true
|
||||
|
||||
[features]
|
||||
mock = []
|
||||
greptimedb-telemetry = []
|
||||
|
||||
[dependencies]
|
||||
anymap = "1.0.0-beta.2"
|
||||
@@ -24,6 +25,7 @@ common-procedure = { path = "../common/procedure" }
|
||||
common-runtime = { path = "../common/runtime" }
|
||||
common-telemetry = { path = "../common/telemetry" }
|
||||
common-time = { path = "../common/time" }
|
||||
common-greptimedb-telemetry = { path = "../common/greptimedb-telemetry" }
|
||||
dashmap = "5.4"
|
||||
datatypes = { path = "../datatypes" }
|
||||
derive_builder.workspace = true
|
||||
@@ -51,6 +53,7 @@ tower = "0.4"
|
||||
typetag = "0.2"
|
||||
url = "2.3"
|
||||
servers = { path = "../servers" }
|
||||
uuid.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
chrono.workspace = true
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -50,16 +50,26 @@ pub struct MetaPeerClient {
|
||||
}
|
||||
|
||||
impl MetaPeerClient {
|
||||
// Get all datanode stat kvs from leader meta.
|
||||
pub async fn get_all_dn_stat_kvs(&self) -> Result<HashMap<StatKey, StatValue>> {
|
||||
async fn get_dn_key_value(&self, keys_only: bool) -> Result<Vec<KeyValue>> {
|
||||
let key = format!("{DN_STAT_PREFIX}-").into_bytes();
|
||||
let range_end = util::get_prefix_end_key(&key);
|
||||
self.range(key, range_end, keys_only).await
|
||||
}
|
||||
|
||||
let kvs = self.range(key, range_end).await?;
|
||||
|
||||
// Get all datanode stat kvs from leader meta.
|
||||
pub async fn get_all_dn_stat_kvs(&self) -> Result<HashMap<StatKey, StatValue>> {
|
||||
let kvs = self.get_dn_key_value(false).await?;
|
||||
to_stat_kv_map(kvs)
|
||||
}
|
||||
|
||||
pub async fn get_node_cnt(&self) -> Result<i32> {
|
||||
let kvs = self.get_dn_key_value(true).await?;
|
||||
kvs.into_iter()
|
||||
.map(|kv| kv.key.try_into())
|
||||
.collect::<Result<HashSet<StatKey>>>()
|
||||
.map(|hash_set| hash_set.len() as i32)
|
||||
}
|
||||
|
||||
// Get datanode stat kvs from leader meta by input keys.
|
||||
pub async fn get_dn_stat_kvs(&self, keys: Vec<StatKey>) -> Result<HashMap<StatKey, StatValue>> {
|
||||
let stat_keys = keys.into_iter().map(|key| key.into()).collect();
|
||||
@@ -70,7 +80,12 @@ impl MetaPeerClient {
|
||||
}
|
||||
|
||||
// Range kv information from the leader's in_mem kv store
|
||||
pub async fn range(&self, key: Vec<u8>, range_end: Vec<u8>) -> Result<Vec<KeyValue>> {
|
||||
pub async fn range(
|
||||
&self,
|
||||
key: Vec<u8>,
|
||||
range_end: Vec<u8>,
|
||||
keys_only: bool,
|
||||
) -> Result<Vec<KeyValue>> {
|
||||
if self.is_leader() {
|
||||
let request = RangeRequest {
|
||||
key,
|
||||
@@ -85,7 +100,10 @@ impl MetaPeerClient {
|
||||
let retry_interval_ms = self.retry_interval_ms;
|
||||
|
||||
for _ in 0..max_retry_count {
|
||||
match self.remote_range(key.clone(), range_end.clone()).await {
|
||||
match self
|
||||
.remote_range(key.clone(), range_end.clone(), keys_only)
|
||||
.await
|
||||
{
|
||||
Ok(kvs) => return Ok(kvs),
|
||||
Err(e) => {
|
||||
if need_retry(&e) {
|
||||
@@ -105,7 +123,12 @@ impl MetaPeerClient {
|
||||
.fail()
|
||||
}
|
||||
|
||||
async fn remote_range(&self, key: Vec<u8>, range_end: Vec<u8>) -> Result<Vec<KeyValue>> {
|
||||
async fn remote_range(
|
||||
&self,
|
||||
key: Vec<u8>,
|
||||
range_end: Vec<u8>,
|
||||
keys_only: bool,
|
||||
) -> Result<Vec<KeyValue>> {
|
||||
// Safety: when self.is_leader() == false, election must not empty.
|
||||
let election = self.election.as_ref().unwrap();
|
||||
|
||||
@@ -119,6 +142,7 @@ impl MetaPeerClient {
|
||||
let request = tonic::Request::new(PbRangeRequest {
|
||||
key,
|
||||
range_end,
|
||||
keys_only,
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
|
||||
88
src/meta-srv/src/greptimedb_telemetry.rs
Normal file
88
src/meta-srv/src/greptimedb_telemetry.rs
Normal file
@@ -0,0 +1,88 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#[cfg(feature = "greptimedb-telemetry")]
|
||||
pub mod telemetry {
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_greptimedb_telemetry::{
|
||||
default_get_uuid, Collector, GreptimeDBTelemetry, GreptimeDBTelemetryTask,
|
||||
Mode as VersionReporterMode, TELEMETRY_INTERVAL,
|
||||
};
|
||||
|
||||
use crate::cluster::MetaPeerClientRef;
|
||||
|
||||
struct DistributedGreptimeDBTelemetryCollector {
|
||||
meta_peer_client: MetaPeerClientRef,
|
||||
uuid: Option<String>,
|
||||
retry: i32,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Collector for DistributedGreptimeDBTelemetryCollector {
|
||||
fn get_mode(&self) -> VersionReporterMode {
|
||||
VersionReporterMode::Distributed
|
||||
}
|
||||
|
||||
async fn get_nodes(&self) -> Option<i32> {
|
||||
self.meta_peer_client.get_node_cnt().await.ok()
|
||||
}
|
||||
|
||||
fn get_retry(&self) -> i32 {
|
||||
self.retry
|
||||
}
|
||||
|
||||
fn inc_retry(&mut self) {
|
||||
self.retry += 1;
|
||||
}
|
||||
|
||||
fn set_uuid_cache(&mut self, uuid: String) {
|
||||
self.uuid = Some(uuid);
|
||||
}
|
||||
|
||||
fn get_uuid_cache(&self) -> Option<String> {
|
||||
self.uuid.clone()
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_greptimedb_telemetry_task(
|
||||
meta_peer_client: MetaPeerClientRef,
|
||||
) -> Arc<GreptimeDBTelemetryTask> {
|
||||
Arc::new(GreptimeDBTelemetryTask::enable(
|
||||
TELEMETRY_INTERVAL,
|
||||
Box::new(GreptimeDBTelemetry::new(Box::new(
|
||||
DistributedGreptimeDBTelemetryCollector {
|
||||
meta_peer_client,
|
||||
uuid: default_get_uuid(),
|
||||
retry: 0,
|
||||
},
|
||||
))),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "greptimedb-telemetry"))]
|
||||
pub mod telemetry {
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
|
||||
|
||||
use crate::cluster::MetaPeerClientRef;
|
||||
pub async fn get_greptimedb_telemetry_task(
|
||||
_: MetaPeerClientRef,
|
||||
) -> Arc<GreptimeDBTelemetryTask> {
|
||||
Arc::new(GreptimeDBTelemetryTask::disable())
|
||||
}
|
||||
}
|
||||
@@ -44,7 +44,7 @@ where
|
||||
let key = get_lease_prefix(cluster_id);
|
||||
let range_end = util::get_prefix_end_key(&key);
|
||||
|
||||
let kvs = meta_peer_client.range(key, range_end).await?;
|
||||
let kvs = meta_peer_client.range(key, range_end, false).await?;
|
||||
let mut lease_kvs = HashMap::new();
|
||||
for kv in kvs {
|
||||
let lease_key: LeaseKey = kv.key.try_into()?;
|
||||
|
||||
@@ -42,3 +42,6 @@ pub use crate::error::Result;
|
||||
mod inactive_node_manager;
|
||||
#[cfg(test)]
|
||||
mod test_util;
|
||||
|
||||
mod greptimedb_telemetry;
|
||||
use greptimedb_telemetry::telemetry;
|
||||
|
||||
@@ -19,12 +19,13 @@ use std::sync::Arc;
|
||||
|
||||
use api::v1::meta::Peer;
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
|
||||
use common_grpc::channel_manager;
|
||||
use common_meta::key::TableMetadataManagerRef;
|
||||
use common_procedure::options::ProcedureConfig;
|
||||
use common_procedure::ProcedureManagerRef;
|
||||
use common_telemetry::logging::LoggingOptions;
|
||||
use common_telemetry::{error, info, warn};
|
||||
use common_telemetry::{debug, error, info, warn};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use servers::http::HttpOptions;
|
||||
use snafu::ResultExt;
|
||||
@@ -175,6 +176,7 @@ pub struct MetaSrv {
|
||||
mailbox: MailboxRef,
|
||||
ddl_manager: DdlManagerRef,
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
greptimedb_telemerty_task: Arc<GreptimeDBTelemetryTask>,
|
||||
}
|
||||
|
||||
impl MetaSrv {
|
||||
@@ -195,6 +197,7 @@ impl MetaSrv {
|
||||
let in_memory = self.in_memory.clone();
|
||||
let leader_cached_kv_store = self.leader_cached_kv_store.clone();
|
||||
let mut rx = election.subscribe_leader_change();
|
||||
let task_handler = self.greptimedb_telemerty_task.clone();
|
||||
let _handle = common_runtime::spawn_bg(async move {
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
@@ -210,9 +213,18 @@ impl MetaSrv {
|
||||
if let Err(e) = procedure_manager.recover().await {
|
||||
error!("Failed to recover procedures, error: {e}");
|
||||
}
|
||||
let _ = task_handler.start(common_runtime::bg_runtime())
|
||||
.map_err(|e| {
|
||||
debug!("Failed to start greptimedb telemetry task, error: {e}");
|
||||
});
|
||||
}
|
||||
LeaderChangeMessage::StepDown(leader) => {
|
||||
error!("Leader :{:?} step down", leader);
|
||||
let _ = task_handler.stop().await.map_err(|e| {
|
||||
debug!(
|
||||
"Failed to stop greptimedb telemetry task, error: {e}"
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -46,6 +46,7 @@ use crate::service::mailbox::MailboxRef;
|
||||
use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvStore};
|
||||
use crate::service::store::kv::{KvBackendAdapter, KvStoreRef, ResettableKvStoreRef};
|
||||
use crate::service::store::memory::MemStore;
|
||||
use crate::telemetry::get_greptimedb_telemetry_task;
|
||||
|
||||
// TODO(fys): try use derive_builder macro
|
||||
pub struct MetaSrvBuilder {
|
||||
@@ -224,7 +225,7 @@ impl MetaSrvBuilder {
|
||||
in_memory,
|
||||
kv_store,
|
||||
leader_cached_kv_store,
|
||||
meta_peer_client,
|
||||
meta_peer_client: meta_peer_client.clone(),
|
||||
table_id_sequence,
|
||||
selector,
|
||||
handler_group,
|
||||
@@ -235,6 +236,7 @@ impl MetaSrvBuilder {
|
||||
mailbox,
|
||||
ddl_manager,
|
||||
table_metadata_manager,
|
||||
greptimedb_telemerty_task: get_greptimedb_telemetry_task(meta_peer_client).await,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user