feat!: enable telemetry by default (#2137)

* feat: remove greptimedb-telemetry feature

* feat: adds enable_telemetry option to metasrv and datanode

* refactor: move data_home from file config to storage config

* feat: store the installation uuid into datanode and metasrv working home

* fix: cargo toml fmt

* test: ignore region failver test when using local fle storage

* test: ignore telemetry reporter in test mode

* feat: print warning log when enabling telemetry

* chore: the telemetry doc link

* chore: remove enable_telemetry from datanode example config file

* refactor: rename GREPTIMEDB_TELEMETRY_CLIENT_REQUEST_TIMEOUT

* chore: rename print_warn_log to print_anonymous_usage_data_disclaimer
This commit is contained in:
dennis zhuang
2023-08-11 22:50:40 +08:00
committed by GitHub
parent 0b05c22be1
commit 6f40128058
27 changed files with 327 additions and 248 deletions

2
Cargo.lock generated
View File

@@ -1722,10 +1722,10 @@ dependencies = [
"common-test-util",
"common-version",
"hyper",
"once_cell",
"reqwest",
"serde",
"serde_json",
"tempfile",
"tokio",
"uuid",
]

View File

@@ -38,8 +38,9 @@ sync_write = false
# Storage options, see `standalone.example.toml`.
[storage]
type = "File"
# The working home directory.
data_home = "/tmp/greptimedb/"
type = "File"
# TTL for all tables. Disabled by default.
# global_ttl = "7d"

View File

@@ -1,3 +1,5 @@
# The working home directory.
data_home = "/tmp/metasrv/"
# The bind address of metasrv, "127.0.0.1:3002" by default.
bind_addr = "127.0.0.1:3002"
# The communication server address for frontend and datanode to connect to metasrv, "127.0.0.1:3002" by default for localhost.
@@ -13,6 +15,8 @@ datanode_lease_secs = 15
selector = "LeaseBased"
# Store data in memory, false by default.
use_memory_store = false
# Whether to enable greptimedb telemetry, true by default.
enable_telemetry = true
# Log options, see `standalone.example.toml`
# [logging]

View File

@@ -2,6 +2,8 @@
mode = "standalone"
# Whether to use in-memory catalog, `false` by default.
enable_memory_catalog = false
# Whether to enable greptimedb telemetry, true by default.
enable_telemetry = true
# HTTP server options.
[http_options]
@@ -96,10 +98,10 @@ sync_write = false
# Storage options.
[storage]
# The working home directory.
data_home = "/tmp/greptimedb/"
# Storage type.
type = "File"
# Data directory, "/tmp/greptimedb/data" by default.
data_home = "/tmp/greptimedb/"
# TTL for all tables. Disabled by default.
# global_ttl = "7d"

View File

@@ -13,10 +13,6 @@ path = "src/bin/greptime.rs"
default = ["metrics-process"]
tokio-console = ["common-telemetry/tokio-console"]
metrics-process = ["servers/metrics-process"]
greptimedb-telemetry = [
"datanode/greptimedb-telemetry",
"meta-srv/greptimedb-telemetry",
]
[dependencies]
anymap = "1.0.0-beta.2"

View File

@@ -16,7 +16,7 @@ use std::time::Duration;
use clap::Parser;
use common_telemetry::logging;
use datanode::datanode::{Datanode, DatanodeOptions, FileConfig, ObjectStoreConfig};
use datanode::datanode::{Datanode, DatanodeOptions};
use meta_client::MetaClientOptions;
use servers::Mode;
use snafu::ResultExt;
@@ -143,9 +143,7 @@ impl StartCommand {
}
if let Some(data_home) = &self.data_home {
opts.storage.store = ObjectStoreConfig::File(FileConfig {
data_home: data_home.clone(),
});
opts.storage.data_home = data_home.clone();
}
if let Some(wal_dir) = &self.wal_dir {
@@ -185,7 +183,9 @@ mod tests {
use common_base::readable_size::ReadableSize;
use common_test_util::temp_dir::create_named_temp_file;
use datanode::datanode::{CompactionConfig, ObjectStoreConfig, RegionManifestConfig};
use datanode::datanode::{
CompactionConfig, FileConfig, ObjectStoreConfig, RegionManifestConfig,
};
use servers::Mode;
use super::*;
@@ -270,16 +270,11 @@ mod tests {
assert_eq!(10000, ddl_timeout_millis);
assert_eq!(3000, timeout_millis);
assert!(tcp_nodelay);
match &options.storage.store {
ObjectStoreConfig::File(FileConfig { data_home, .. }) => {
assert_eq!("/tmp/greptimedb/", data_home)
}
ObjectStoreConfig::S3 { .. } => unreachable!(),
ObjectStoreConfig::Oss { .. } => unreachable!(),
ObjectStoreConfig::Azblob { .. } => unreachable!(),
ObjectStoreConfig::Gcs { .. } => unreachable!(),
};
assert_eq!("/tmp/greptimedb/", options.storage.data_home);
assert!(matches!(
&options.storage.store,
ObjectStoreConfig::File(FileConfig { .. })
));
assert_eq!(
CompactionConfig {

View File

@@ -83,6 +83,7 @@ impl SubCommand {
pub struct StandaloneOptions {
pub mode: Mode,
pub enable_memory_catalog: bool,
pub enable_telemetry: bool,
pub http_options: Option<HttpOptions>,
pub grpc_options: Option<GrpcOptions>,
pub mysql_options: Option<MysqlOptions>,
@@ -102,6 +103,7 @@ impl Default for StandaloneOptions {
Self {
mode: Mode::Standalone,
enable_memory_catalog: false,
enable_telemetry: true,
http_options: Some(HttpOptions::default()),
grpc_options: Some(GrpcOptions::default()),
mysql_options: Some(MysqlOptions::default()),
@@ -139,6 +141,7 @@ impl StandaloneOptions {
fn datanode_options(self) -> DatanodeOptions {
DatanodeOptions {
enable_memory_catalog: self.enable_memory_catalog,
enable_telemetry: self.enable_telemetry,
wal: self.wal,
storage: self.storage,
procedure: self.procedure,

View File

@@ -9,7 +9,6 @@ async-trait.workspace = true
common-error = { workspace = true }
common-runtime = { workspace = true }
common-telemetry = { workspace = true }
once_cell = "1.17.0"
reqwest = { version = "0.11", features = [
"json",
"rustls-tls",
@@ -22,6 +21,7 @@ uuid.workspace = true
[dev-dependencies]
common-test-util = { workspace = true }
hyper = { version = "0.14", features = ["full"] }
tempfile.workspace = true
[build-dependencies]
common-version = { workspace = true }

View File

@@ -14,30 +14,26 @@
use std::env;
use std::io::ErrorKind;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::time::Duration;
use common_runtime::error::{Error, Result};
use common_runtime::{BoxedTaskFunction, RepeatedTask, Runtime, TaskFunction};
use common_telemetry::{debug, info};
use once_cell::sync::Lazy;
use reqwest::{Client, Response};
use serde::{Deserialize, Serialize};
/// The URL to report telemetry data.
pub const TELEMETRY_URL: &str = "https://api.greptime.cloud/db/otel/statistics";
/// The local installation uuid cache file
const UUID_FILE_NAME: &str = ".greptimedb-telemetry-uuid";
// Getting the right path when running on windows
static TELEMETRY_UUID_FILE_NAME: Lazy<PathBuf> = Lazy::new(|| {
let mut path = PathBuf::new();
path.push(env::temp_dir());
path.push(".greptimedb-telemetry-uuid");
path
});
/// The default interval of reporting telemetry data to greptime cloud
pub static TELEMETRY_INTERVAL: Duration = Duration::from_secs(60 * 30);
/// The default connect timeout to greptime cloud.
const GREPTIMEDB_TELEMETRY_CLIENT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
const GREPTIMEDB_TELEMETRY_CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
/// The default request timeout to greptime cloud.
const GREPTIMEDB_TELEMETRY_CLIENT_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
pub enum GreptimeDBTelemetryTask {
Enable(RepeatedTask<Error>),
@@ -54,6 +50,8 @@ impl GreptimeDBTelemetryTask {
}
pub fn start(&self, runtime: Runtime) -> Result<()> {
print_anonymous_usage_data_disclaimer();
match self {
GreptimeDBTelemetryTask::Enable(task) => task.start(runtime),
GreptimeDBTelemetryTask::Disable => Ok(()),
@@ -68,14 +66,22 @@ impl GreptimeDBTelemetryTask {
}
}
/// Telemetry data to report
#[derive(Serialize, Deserialize, Debug)]
struct StatisticData {
/// Operating system name, such as `linux`, `windows` etc.
pub os: String,
/// The greptimedb version
pub version: String,
/// The architecture of the CPU, such as `x86`, `x86_64` etc.
pub arch: String,
/// The running mode, `standalone` or `distributed`.
pub mode: Mode,
/// The git commit revision of greptimedb
pub git_commit: String,
/// The node number
pub nodes: Option<i32>,
/// The local installation uuid
pub uuid: String,
}
@@ -116,14 +122,14 @@ pub trait Collector {
async fn get_nodes(&self) -> Option<i32>;
fn get_uuid(&mut self) -> Option<String> {
fn get_uuid(&mut self, working_home: &Option<String>) -> Option<String> {
match self.get_uuid_cache() {
Some(uuid) => Some(uuid),
None => {
if self.get_retry() > 3 {
return None;
}
match default_get_uuid() {
match default_get_uuid(working_home) {
Some(uuid) => {
self.set_uuid_cache(uuid.clone());
Some(uuid)
@@ -138,8 +144,26 @@ pub trait Collector {
}
}
pub fn default_get_uuid() -> Option<String> {
let path = (*TELEMETRY_UUID_FILE_NAME).as_path();
fn print_anonymous_usage_data_disclaimer() {
info!("Attention: GreptimeDB now collects anonymous usage data to help improve its roadmap and prioritize features.");
info!(
"To learn more about this anonymous program and how to deactivate it if you don't want to participate, please visit the following URL: ");
info!("https://docs.greptime.com/reference/telemetry");
}
pub fn default_get_uuid(working_home: &Option<String>) -> Option<String> {
let temp_dir = env::temp_dir();
let mut path = PathBuf::new();
path.push(
working_home
.as_ref()
.map(Path::new)
.unwrap_or_else(|| temp_dir.as_path()),
);
path.push(UUID_FILE_NAME);
let path = path.as_path();
match std::fs::read(path) {
Ok(bytes) => Some(String::from_utf8_lossy(&bytes).to_string()),
Err(e) => {
@@ -164,6 +188,7 @@ pub fn default_get_uuid() -> Option<String> {
pub struct GreptimeDBTelemetry {
statistics: Box<dyn Collector + Send + Sync>,
client: Option<Client>,
working_home: Option<String>,
telemetry_url: &'static str,
}
@@ -180,12 +205,13 @@ impl TaskFunction<Error> for GreptimeDBTelemetry {
}
impl GreptimeDBTelemetry {
pub fn new(statistics: Box<dyn Collector + Send + Sync>) -> Self {
pub fn new(working_home: Option<String>, statistics: Box<dyn Collector + Send + Sync>) -> Self {
let client = Client::builder()
.connect_timeout(GREPTIMEDB_TELEMETRY_CLIENT_CONNECT_TIMEOUT)
.timeout(GREPTIMEDB_TELEMETRY_CLIENT_TIMEOUT)
.timeout(GREPTIMEDB_TELEMETRY_CLIENT_REQUEST_TIMEOUT)
.build();
Self {
working_home,
statistics,
client: client.ok(),
telemetry_url: TELEMETRY_URL,
@@ -193,7 +219,7 @@ impl GreptimeDBTelemetry {
}
pub async fn report_telemetry_info(&mut self) -> Option<Response> {
match self.statistics.get_uuid() {
match self.statistics.get_uuid(&self.working_home) {
Some(uuid) => {
let data = StatisticData {
os: self.statistics.get_os(),
@@ -232,7 +258,7 @@ mod tests {
use reqwest::Client;
use tokio::spawn;
use crate::{Collector, GreptimeDBTelemetry, Mode, StatisticData};
use crate::{default_get_uuid, Collector, GreptimeDBTelemetry, Mode, StatisticData};
static COUNT: AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
@@ -300,7 +326,7 @@ mod tests {
unimplemented!()
}
fn get_uuid(&mut self) -> Option<String> {
fn get_uuid(&mut self, _working_home: &Option<String>) -> Option<String> {
Some("test".to_string())
}
}
@@ -331,13 +357,19 @@ mod tests {
unimplemented!()
}
fn get_uuid(&mut self) -> Option<String> {
fn get_uuid(&mut self, _working_home: &Option<String>) -> Option<String> {
None
}
}
let working_home_temp = tempfile::Builder::new()
.prefix("greptimedb_telemetry")
.tempdir()
.unwrap();
let working_home = working_home_temp.path().to_str().unwrap().to_string();
let test_statistic = Box::new(TestStatistic);
let mut test_report = GreptimeDBTelemetry::new(test_statistic);
let mut test_report = GreptimeDBTelemetry::new(Some(working_home.clone()), test_statistic);
let url = Box::leak(format!("{}:{}", "http://localhost", port).into_boxed_str());
test_report.telemetry_url = url;
let response = test_report.report_telemetry_info().await.unwrap();
@@ -351,7 +383,7 @@ mod tests {
assert_eq!(1, body.nodes.unwrap());
let failed_statistic = Box::new(FailedStatistic);
let mut failed_report = GreptimeDBTelemetry::new(failed_statistic);
let mut failed_report = GreptimeDBTelemetry::new(Some(working_home), failed_statistic);
failed_report.telemetry_url = url;
let response = failed_report.report_telemetry_info().await;
assert!(response.is_none());
@@ -368,4 +400,18 @@ mod tests {
assert_eq!("1", body);
tx.send(()).unwrap();
}
#[test]
fn test_get_uuid() {
let working_home_temp = tempfile::Builder::new()
.prefix("greptimedb_telemetry")
.tempdir()
.unwrap();
let working_home = working_home_temp.path().to_str().unwrap().to_string();
let uuid = default_get_uuid(&Some(working_home.clone()));
assert!(uuid.is_some());
assert_eq!(uuid, default_get_uuid(&Some(working_home.clone())));
assert_eq!(uuid, default_get_uuid(&Some(working_home.clone())));
}
}

View File

@@ -5,7 +5,6 @@ edition.workspace = true
license.workspace = true
[features]
greptimedb-telemetry = []
testing = ["meta-srv/mock"]
[dependencies]

View File

@@ -58,7 +58,7 @@ pub enum ObjectStoreConfig {
}
/// Storage engine config
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct StorageConfig {
/// Retention period for all tables.
@@ -68,6 +68,8 @@ pub struct StorageConfig {
/// The precedence order is: ttl in table options > global ttl.
#[serde(with = "humantime_serde")]
pub global_ttl: Option<Duration>,
/// The working directory of database
pub data_home: String,
#[serde(flatten)]
pub store: ObjectStoreConfig,
pub compaction: CompactionConfig,
@@ -75,11 +77,22 @@ pub struct StorageConfig {
pub flush: FlushConfig,
}
impl Default for StorageConfig {
fn default() -> Self {
Self {
global_ttl: None,
data_home: DEFAULT_DATA_HOME.to_string(),
store: ObjectStoreConfig::default(),
compaction: CompactionConfig::default(),
manifest: RegionManifestConfig::default(),
flush: FlushConfig::default(),
}
}
}
#[derive(Debug, Clone, Serialize, Default, Deserialize)]
#[serde(default)]
pub struct FileConfig {
pub data_home: String,
}
pub struct FileConfig {}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
@@ -198,9 +211,7 @@ impl Default for GcsConfig {
impl Default for ObjectStoreConfig {
fn default() -> Self {
ObjectStoreConfig::File(FileConfig {
data_home: DEFAULT_DATA_HOME.to_string(),
})
ObjectStoreConfig::File(FileConfig {})
}
}
@@ -362,6 +373,7 @@ pub struct DatanodeOptions {
pub storage: StorageConfig,
pub procedure: ProcedureConfig,
pub logging: LoggingOptions,
pub enable_telemetry: bool,
}
impl Default for DatanodeOptions {
@@ -380,6 +392,7 @@ impl Default for DatanodeOptions {
procedure: ProcedureConfig::default(),
logging: LoggingOptions::default(),
heartbeat: HeartbeatOptions::default(),
enable_telemetry: true,
}
}
}

View File

@@ -12,72 +12,66 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(feature = "greptimedb-telemetry")]
pub mod telemetry {
use std::sync::Arc;
use std::sync::Arc;
use async_trait::async_trait;
use common_greptimedb_telemetry::{
default_get_uuid, Collector, GreptimeDBTelemetry, GreptimeDBTelemetryTask,
Mode as VersionReporterMode, TELEMETRY_INTERVAL,
};
use servers::Mode;
use async_trait::async_trait;
use common_greptimedb_telemetry::{
default_get_uuid, Collector, GreptimeDBTelemetry, GreptimeDBTelemetryTask,
Mode as VersionReporterMode, TELEMETRY_INTERVAL,
};
use servers::Mode;
struct StandaloneGreptimeDBTelemetryCollector {
uuid: Option<String>,
retry: i32,
}
#[async_trait]
impl Collector for StandaloneGreptimeDBTelemetryCollector {
fn get_mode(&self) -> VersionReporterMode {
VersionReporterMode::Standalone
}
async fn get_nodes(&self) -> Option<i32> {
Some(1)
}
fn get_retry(&self) -> i32 {
self.retry
}
fn inc_retry(&mut self) {
self.retry += 1;
}
fn set_uuid_cache(&mut self, uuid: String) {
self.uuid = Some(uuid);
}
fn get_uuid_cache(&self) -> Option<String> {
self.uuid.clone()
}
struct StandaloneGreptimeDBTelemetryCollector {
uuid: Option<String>,
retry: i32,
}
#[async_trait]
impl Collector for StandaloneGreptimeDBTelemetryCollector {
fn get_mode(&self) -> VersionReporterMode {
VersionReporterMode::Standalone
}
pub async fn get_greptimedb_telemetry_task(mode: &Mode) -> Arc<GreptimeDBTelemetryTask> {
match mode {
Mode::Standalone => Arc::new(GreptimeDBTelemetryTask::enable(
TELEMETRY_INTERVAL,
Box::new(GreptimeDBTelemetry::new(Box::new(
StandaloneGreptimeDBTelemetryCollector {
uuid: default_get_uuid(),
retry: 0,
},
))),
async fn get_nodes(&self) -> Option<i32> {
Some(1)
}
fn get_retry(&self) -> i32 {
self.retry
}
fn inc_retry(&mut self) {
self.retry += 1;
}
fn set_uuid_cache(&mut self, uuid: String) {
self.uuid = Some(uuid);
}
fn get_uuid_cache(&self) -> Option<String> {
self.uuid.clone()
}
}
pub async fn get_greptimedb_telemetry_task(
working_home: Option<String>,
mode: &Mode,
enable: bool,
) -> Arc<GreptimeDBTelemetryTask> {
if !enable || cfg!(test) {
return Arc::new(GreptimeDBTelemetryTask::disable());
}
match mode {
Mode::Standalone => Arc::new(GreptimeDBTelemetryTask::enable(
TELEMETRY_INTERVAL,
Box::new(GreptimeDBTelemetry::new(
working_home.clone(),
Box::new(StandaloneGreptimeDBTelemetryCollector {
uuid: default_get_uuid(&working_home),
retry: 0,
}),
)),
Mode::Distributed => Arc::new(GreptimeDBTelemetryTask::disable()),
}
}
}
#[cfg(not(feature = "greptimedb-telemetry"))]
pub mod telemetry {
use std::sync::Arc;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use servers::Mode;
pub async fn get_greptimedb_telemetry_task(_: &Mode) -> Arc<GreptimeDBTelemetryTask> {
Arc::new(GreptimeDBTelemetryTask::disable())
)),
Mode::Distributed => Arc::new(GreptimeDBTelemetryTask::disable()),
}
}

View File

@@ -64,12 +64,12 @@ use crate::error::{
MissingNodeIdSnafu, NewCatalogSnafu, OpenLogStoreSnafu, RecoverProcedureSnafu, Result,
ShutdownInstanceSnafu, StartProcedureManagerSnafu, StopProcedureManagerSnafu,
};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::heartbeat::handler::close_region::CloseRegionHandler;
use crate::heartbeat::handler::open_region::OpenRegionHandler;
use crate::heartbeat::HeartbeatTask;
use crate::sql::{SqlHandler, SqlRequest};
use crate::store;
use crate::telemetry::get_greptimedb_telemetry_task;
mod grpc;
pub mod sql;
@@ -164,8 +164,11 @@ impl Instance {
compaction_scheduler: CompactionSchedulerRef<RaftEngineLogStore>,
plugins: Arc<Plugins>,
) -> Result<(InstanceRef, Option<HeartbeatTask>)> {
let object_store = store::new_object_store(&opts.storage.store).await?;
let log_store = Arc::new(create_log_store(&opts.storage.store, &opts.wal).await?);
let data_home = util::normalize_dir(&opts.storage.data_home);
info!("The working home directory is: {}", data_home);
let object_store = store::new_object_store(&data_home, &opts.storage.store).await?;
let log_store =
Arc::new(create_log_store(&data_home, &opts.storage.store, &opts.wal).await?);
let mito_engine = Arc::new(DefaultEngine::new(
TableEngineConfig {
@@ -305,7 +308,12 @@ impl Instance {
catalog_manager: catalog_manager.clone(),
table_id_provider,
procedure_manager,
greptimedb_telemetry_task: get_greptimedb_telemetry_task(&opts.mode).await,
greptimedb_telemetry_task: get_greptimedb_telemetry_task(
Some(opts.storage.data_home.clone()),
&opts.mode,
opts.enable_telemetry,
)
.await,
});
let heartbeat_task = Instance::build_heartbeat_task(
@@ -444,13 +452,14 @@ async fn new_metasrv_client(node_id: u64, meta_config: &MetaClientOptions) -> Re
}
pub(crate) async fn create_log_store(
data_home: &str,
store_config: &ObjectStoreConfig,
wal_config: &WalConfig,
) -> Result<RaftEngineLogStore> {
let wal_dir = match (&wal_config.dir, store_config) {
(Some(dir), _) => dir.to_string(),
(None, ObjectStoreConfig::File(file_config)) => {
format!("{}{WAL_DIR}", util::normalize_dir(&file_config.data_home))
(None, ObjectStoreConfig::File(_file_config)) => {
format!("{}{WAL_DIR}", data_home)
}
_ => return error::MissingWalDirConfigSnafu {}.fail(),
};

View File

@@ -27,7 +27,5 @@ pub mod server;
pub mod sql;
mod store;
use greptimedb_telemetry::telemetry;
#[cfg(test)]
mod tests;

View File

@@ -33,9 +33,14 @@ use snafu::prelude::*;
use crate::datanode::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, Result};
pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
pub(crate) async fn new_object_store(
data_home: &str,
store_config: &ObjectStoreConfig,
) -> Result<ObjectStore> {
let object_store = match store_config {
ObjectStoreConfig::File(file_config) => fs::new_fs_object_store(file_config).await,
ObjectStoreConfig::File(file_config) => {
fs::new_fs_object_store(data_home, file_config).await
}
ObjectStoreConfig::S3(s3_config) => s3::new_s3_object_store(s3_config).await,
ObjectStoreConfig::Oss(oss_config) => oss::new_oss_object_store(oss_config).await,
ObjectStoreConfig::Azblob(azblob_config) => {

View File

@@ -16,24 +16,26 @@ use std::{fs, path};
use common_telemetry::logging::info;
use object_store::services::Fs as FsBuilder;
use object_store::{util, ObjectStore};
use object_store::ObjectStore;
use snafu::prelude::*;
use crate::datanode::FileConfig;
use crate::error::{self, Result};
use crate::store;
pub(crate) async fn new_fs_object_store(file_config: &FileConfig) -> Result<ObjectStore> {
let data_home = util::normalize_dir(&file_config.data_home);
pub(crate) async fn new_fs_object_store(
data_home: &str,
_file_config: &FileConfig,
) -> Result<ObjectStore> {
fs::create_dir_all(path::Path::new(&data_home))
.context(error::CreateDirSnafu { dir: &data_home })?;
info!("The file storage home is: {}", &data_home);
.context(error::CreateDirSnafu { dir: data_home })?;
info!("The file storage home is: {}", data_home);
let atomic_write_dir = format!("{data_home}.tmp/");
store::clean_temp_dir(&atomic_write_dir)?;
let mut builder = FsBuilder::default();
let _ = builder.root(&data_home).atomic_write_dir(&atomic_write_dir);
let _ = builder.root(data_home).atomic_write_dir(&atomic_write_dir);
let object_store = ObjectStore::new(builder)
.context(error::InitBackendSnafu)?

View File

@@ -74,9 +74,8 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard)
..Default::default()
},
storage: StorageConfig {
store: ObjectStoreConfig::File(FileConfig {
data_home: data_tmp_dir.path().to_str().unwrap().to_string(),
}),
data_home: data_tmp_dir.path().to_str().unwrap().to_string(),
store: ObjectStoreConfig::File(FileConfig {}),
..Default::default()
},
mode: Mode::Standalone,

View File

@@ -6,7 +6,6 @@ license.workspace = true
[features]
mock = []
greptimedb-telemetry = []
[dependencies]
anymap = "1.0.0-beta.2"

View File

@@ -12,77 +12,67 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(feature = "greptimedb-telemetry")]
pub mod telemetry {
use std::sync::Arc;
use std::sync::Arc;
use async_trait::async_trait;
use common_greptimedb_telemetry::{
default_get_uuid, Collector, GreptimeDBTelemetry, GreptimeDBTelemetryTask,
Mode as VersionReporterMode, TELEMETRY_INTERVAL,
};
use async_trait::async_trait;
use common_greptimedb_telemetry::{
default_get_uuid, Collector, GreptimeDBTelemetry, GreptimeDBTelemetryTask,
Mode as VersionReporterMode, TELEMETRY_INTERVAL,
};
use crate::cluster::MetaPeerClientRef;
use crate::cluster::MetaPeerClientRef;
struct DistributedGreptimeDBTelemetryCollector {
meta_peer_client: MetaPeerClientRef,
uuid: Option<String>,
retry: i32,
struct DistributedGreptimeDBTelemetryCollector {
meta_peer_client: MetaPeerClientRef,
uuid: Option<String>,
retry: i32,
}
#[async_trait]
impl Collector for DistributedGreptimeDBTelemetryCollector {
fn get_mode(&self) -> VersionReporterMode {
VersionReporterMode::Distributed
}
#[async_trait]
impl Collector for DistributedGreptimeDBTelemetryCollector {
fn get_mode(&self) -> VersionReporterMode {
VersionReporterMode::Distributed
}
async fn get_nodes(&self) -> Option<i32> {
self.meta_peer_client.get_node_cnt().await.ok()
}
fn get_retry(&self) -> i32 {
self.retry
}
fn inc_retry(&mut self) {
self.retry += 1;
}
fn set_uuid_cache(&mut self, uuid: String) {
self.uuid = Some(uuid);
}
fn get_uuid_cache(&self) -> Option<String> {
self.uuid.clone()
}
async fn get_nodes(&self) -> Option<i32> {
self.meta_peer_client.get_node_cnt().await.ok()
}
pub async fn get_greptimedb_telemetry_task(
meta_peer_client: MetaPeerClientRef,
) -> Arc<GreptimeDBTelemetryTask> {
Arc::new(GreptimeDBTelemetryTask::enable(
TELEMETRY_INTERVAL,
Box::new(GreptimeDBTelemetry::new(Box::new(
DistributedGreptimeDBTelemetryCollector {
meta_peer_client,
uuid: default_get_uuid(),
retry: 0,
},
))),
))
fn get_retry(&self) -> i32 {
self.retry
}
fn inc_retry(&mut self) {
self.retry += 1;
}
fn set_uuid_cache(&mut self, uuid: String) {
self.uuid = Some(uuid);
}
fn get_uuid_cache(&self) -> Option<String> {
self.uuid.clone()
}
}
#[cfg(not(feature = "greptimedb-telemetry"))]
pub mod telemetry {
use std::sync::Arc;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use crate::cluster::MetaPeerClientRef;
pub async fn get_greptimedb_telemetry_task(
_: MetaPeerClientRef,
) -> Arc<GreptimeDBTelemetryTask> {
Arc::new(GreptimeDBTelemetryTask::disable())
pub async fn get_greptimedb_telemetry_task(
working_home: Option<String>,
meta_peer_client: MetaPeerClientRef,
enable: bool,
) -> Arc<GreptimeDBTelemetryTask> {
if !enable || cfg!(test) {
return Arc::new(GreptimeDBTelemetryTask::disable());
}
Arc::new(GreptimeDBTelemetryTask::enable(
TELEMETRY_INTERVAL,
Box::new(GreptimeDBTelemetry::new(
working_home.clone(),
Box::new(DistributedGreptimeDBTelemetryCollector {
meta_peer_client,
uuid: default_get_uuid(&working_home),
retry: 0,
}),
)),
))
}

View File

@@ -42,7 +42,6 @@ pub use crate::error::Result;
mod inactive_node_manager;
mod greptimedb_telemetry;
use greptimedb_telemetry::telemetry;
#[cfg(test)]
mod test_util;

View File

@@ -44,6 +44,7 @@ use crate::sequence::SequenceRef;
use crate::service::mailbox::MailboxRef;
use crate::service::store::kv::{KvStoreRef, ResettableKvStoreRef};
pub const TABLE_ID_SEQ: &str = "table_id";
const METASRV_HOME: &str = "/tmp/metasrv";
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
@@ -59,6 +60,8 @@ pub struct MetaSrvOptions {
pub logging: LoggingOptions,
pub procedure: ProcedureConfig,
pub datanode: DatanodeOptions,
pub enable_telemetry: bool,
pub data_home: String,
}
impl Default for MetaSrvOptions {
@@ -72,9 +75,14 @@ impl Default for MetaSrvOptions {
use_memory_store: false,
disable_region_failover: false,
http_opts: HttpOptions::default(),
logging: LoggingOptions::default(),
logging: LoggingOptions {
dir: format!("{METASRV_HOME}/logs"),
..Default::default()
},
procedure: ProcedureConfig::default(),
datanode: DatanodeOptions::default(),
enable_telemetry: true,
data_home: METASRV_HOME.to_string(),
}
}
}

View File

@@ -25,6 +25,7 @@ use common_procedure::ProcedureManagerRef;
use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::ddl::{DdlManager, DdlManagerRef};
use crate::error::Result;
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::handler::mailbox_handler::MailboxHandler;
use crate::handler::publish_heartbeat_handler::PublishHeartbeatHandler;
use crate::handler::region_lease_handler::RegionLeaseHandler;
@@ -48,7 +49,6 @@ use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvStore};
use crate::service::store::kv::{KvBackendAdapter, KvStoreRef, ResettableKvStoreRef};
use crate::service::store::memory::MemStore;
use crate::telemetry::get_greptimedb_telemetry_task;
// TODO(fys): try use derive_builder macro
pub struct MetaSrvBuilder {
@@ -234,6 +234,9 @@ impl MetaSrvBuilder {
}
};
let enable_telemetry = options.enable_telemetry;
let metasrv_home = options.data_home.to_string();
Ok(MetaSrv {
started,
options,
@@ -251,7 +254,12 @@ impl MetaSrvBuilder {
mailbox,
ddl_manager,
table_metadata_manager,
greptimedb_telemetry_task: get_greptimedb_telemetry_task(meta_peer_client).await,
greptimedb_telemetry_task: get_greptimedb_telemetry_task(
Some(metasrv_home),
meta_peer_client,
enable_telemetry,
)
.await,
pubsub,
})
}

View File

@@ -44,12 +44,12 @@ use tonic::transport::Server;
use tower::service_fn;
use crate::test_util::{
create_datanode_opts, create_tmp_dir_and_datanode_opts, StorageGuard, StorageType, WalGuard,
create_datanode_opts, create_tmp_dir_and_datanode_opts, FileDirGuard, StorageGuard, StorageType,
};
pub struct GreptimeDbCluster {
pub storage_guards: Vec<StorageGuard>,
_wal_guards: Vec<WalGuard>,
pub _dir_guards: Vec<FileDirGuard>,
pub datanode_instances: HashMap<DatanodeId, Arc<DatanodeInstance>>,
pub datanode_heartbeat_tasks: HashMap<DatanodeId, Option<HeartbeatTask>>,
@@ -93,7 +93,7 @@ impl GreptimeDbClusterBuilder {
let meta_srv = self.build_metasrv(datanode_clients.clone()).await;
let (datanode_instances, heartbeat_tasks, storage_guards, wal_guards) =
let (datanode_instances, heartbeat_tasks, storage_guards, dir_guards) =
self.build_datanodes(meta_srv.clone(), datanodes).await;
build_datanode_clients(datanode_clients.clone(), &datanode_instances, datanodes).await;
@@ -109,7 +109,7 @@ impl GreptimeDbClusterBuilder {
GreptimeDbCluster {
storage_guards,
_wal_guards: wal_guards,
_dir_guards: dir_guards,
datanode_instances,
datanode_heartbeat_tasks: heartbeat_tasks,
kv_store: self.kv_store.clone(),
@@ -149,22 +149,26 @@ impl GreptimeDbClusterBuilder {
HashMap<DatanodeId, Arc<DatanodeInstance>>,
HashMap<DatanodeId, Option<HeartbeatTask>>,
Vec<StorageGuard>,
Vec<WalGuard>,
Vec<FileDirGuard>,
) {
let mut instances = HashMap::with_capacity(datanodes as usize);
let mut heartbeat_tasks = HashMap::with_capacity(datanodes as usize);
let mut storage_guards = Vec::with_capacity(datanodes as usize);
let mut wal_guards = Vec::with_capacity(datanodes as usize);
let mut dir_guards = Vec::with_capacity(datanodes as usize);
for i in 0..datanodes {
let datanode_id = i as u64 + 1;
let mut opts = if let Some(store_config) = &self.store_config {
let home_tmp_dir = create_temp_dir(&format!("gt_home_{}", &self.cluster_name));
let home_dir = home_tmp_dir.path().to_str().unwrap().to_string();
let wal_tmp_dir = create_temp_dir(&format!("gt_wal_{}", &self.cluster_name));
let wal_dir = wal_tmp_dir.path().to_str().unwrap().to_string();
wal_guards.push(WalGuard(wal_tmp_dir));
dir_guards.push(FileDirGuard::new(home_tmp_dir, false));
dir_guards.push(FileDirGuard::new(wal_tmp_dir, true));
create_datanode_opts(store_config.clone(), wal_dir)
create_datanode_opts(store_config.clone(), home_dir, wal_dir)
} else {
let (opts, guard) = create_tmp_dir_and_datanode_opts(
StorageType::File,
@@ -172,7 +176,8 @@ impl GreptimeDbClusterBuilder {
);
storage_guards.push(guard.storage_guard);
wal_guards.push(guard.wal_guard);
dir_guards.push(guard.home_guard);
dir_guards.push(guard.wal_guard);
opts
};
@@ -184,7 +189,7 @@ impl GreptimeDbClusterBuilder {
let _ = instances.insert(datanode_id, dn_instance.0.clone());
let _ = heartbeat_tasks.insert(datanode_id, dn_instance.1);
}
(instances, heartbeat_tasks, storage_guards, wal_guards)
(instances, heartbeat_tasks, storage_guards, dir_guards)
}
async fn wait_datanodes_alive(

View File

@@ -135,10 +135,7 @@ fn s3_test_config() -> S3Config {
}
}
pub fn get_test_store_config(
store_type: &StorageType,
name: &str,
) -> (ObjectStoreConfig, TempDirGuard) {
pub fn get_test_store_config(store_type: &StorageType) -> (ObjectStoreConfig, TempDirGuard) {
let _ = dotenv::dotenv();
match store_type {
@@ -243,21 +240,12 @@ pub fn get_test_store_config(
(config, TempDirGuard::S3(TempFolder::new(&store, "/")))
}
StorageType::File => {
let data_tmp_dir = create_temp_dir(&format!("gt_data_{name}"));
(
ObjectStoreConfig::File(FileConfig {
data_home: data_tmp_dir.path().to_str().unwrap().to_string(),
}),
TempDirGuard::File(data_tmp_dir),
)
}
StorageType::File => (ObjectStoreConfig::File(FileConfig {}), TempDirGuard::None),
}
}
pub enum TempDirGuard {
File(TempDir),
None,
S3(TempFolder),
Oss(TempFolder),
Azblob(TempFolder),
@@ -265,11 +253,21 @@ pub enum TempDirGuard {
}
pub struct TestGuard {
pub wal_guard: WalGuard,
pub home_guard: FileDirGuard,
pub wal_guard: FileDirGuard,
pub storage_guard: StorageGuard,
}
pub struct WalGuard(pub TempDir);
pub struct FileDirGuard {
pub temp_dir: TempDir,
pub is_wal: bool,
}
impl FileDirGuard {
pub fn new(temp_dir: TempDir, is_wal: bool) -> Self {
Self { temp_dir, is_wal }
}
}
pub struct StorageGuard(pub TempDirGuard);
@@ -289,28 +287,36 @@ pub fn create_tmp_dir_and_datanode_opts(
store_type: StorageType,
name: &str,
) -> (DatanodeOptions, TestGuard) {
let home_tmp_dir = create_temp_dir(&format!("gt_data_{name}"));
let wal_tmp_dir = create_temp_dir(&format!("gt_wal_{name}"));
let home_dir = home_tmp_dir.path().to_str().unwrap().to_string();
let wal_dir = wal_tmp_dir.path().to_str().unwrap().to_string();
let (store, data_tmp_dir) = get_test_store_config(&store_type, name);
let opts = create_datanode_opts(store, wal_dir);
let (store, data_tmp_dir) = get_test_store_config(&store_type);
let opts = create_datanode_opts(store, home_dir, wal_dir);
(
opts,
TestGuard {
wal_guard: WalGuard(wal_tmp_dir),
home_guard: FileDirGuard::new(home_tmp_dir, false),
wal_guard: FileDirGuard::new(wal_tmp_dir, true),
storage_guard: StorageGuard(data_tmp_dir),
},
)
}
pub fn create_datanode_opts(store: ObjectStoreConfig, wal_dir: String) -> DatanodeOptions {
pub fn create_datanode_opts(
store: ObjectStoreConfig,
home_dir: String,
wal_dir: String,
) -> DatanodeOptions {
DatanodeOptions {
wal: WalConfig {
dir: Some(wal_dir),
..Default::default()
},
storage: StorageConfig {
data_home: home_dir,
store,
..Default::default()
},

View File

@@ -27,21 +27,16 @@ use frontend::instance::Instance;
use table::engine::{region_name, table_dir};
use crate::cluster::{GreptimeDbCluster, GreptimeDbClusterBuilder};
use crate::test_util::{create_tmp_dir_and_datanode_opts, StorageType, TempDirGuard, TestGuard};
use crate::test_util::{create_tmp_dir_and_datanode_opts, StorageType, TestGuard};
pub struct MockDistributedInstance(GreptimeDbCluster);
impl MockDistributedInstance {
pub fn data_tmp_dirs(&self) -> Vec<&TempDir> {
self.0
.storage_guards
._dir_guards
.iter()
.map(|g| {
let TempDirGuard::File(dir) = &g.0 else {
unreachable!()
};
dir
})
.filter_map(|d| if !d.is_wal { Some(&d.temp_dir) } else { None })
.collect()
}
@@ -65,10 +60,7 @@ pub struct MockStandaloneInstance {
impl MockStandaloneInstance {
pub fn data_tmp_dir(&self) -> &TempDir {
let TempDirGuard::File(dir) = &self._guard.storage_guard.0 else {
unreachable!()
};
dir
&self._guard.home_guard.temp_dir
}
}

View File

@@ -597,6 +597,7 @@ pub async fn test_config_api(store_type: StorageType) {
enable_memory_catalog = false
rpc_addr = "127.0.0.1:3001"
rpc_runtime_size = 8
enable_telemetry = true
[heartbeat]
interval_millis = 5000

View File

@@ -79,13 +79,18 @@ macro_rules! region_failover_tests {
}
pub async fn test_region_failover(store_type: StorageType) {
if store_type == StorageType::File {
// Region failover doesn't make sense when using local file storage.
return;
}
common_telemetry::init_default_ut_logging();
info!("Running region failover test for {}", store_type);
let mut logical_timer = 1685508715000;
let cluster_name = "test_region_failover";
let (store_config, _guard) = get_test_store_config(&store_type, cluster_name);
let (store_config, _guard) = get_test_store_config(&store_type);
let datanodes = 5u64;
let cluster = GreptimeDbClusterBuilder::new(cluster_name)