fix: can't print log because the tracing guard is dropped (#4005)

* fix: avoid logging guard drop

* chore: remove unused '#[allow(dead_code)]'
This commit is contained in:
zyy17
2024-05-22 11:24:40 +08:00
committed by GitHub
parent e070ba3c32
commit b86d79b906
10 changed files with 76 additions and 35 deletions

1
Cargo.lock generated
View File

@@ -1662,6 +1662,7 @@ dependencies = [
"tikv-jemallocator",
"tokio",
"toml 0.8.12",
"tracing-appender",
]
[[package]]

View File

@@ -74,6 +74,7 @@ substrait.workspace = true
table.workspace = true
tokio.workspace = true
toml.workspace = true
tracing-appender = "0.2"
[target.'cfg(not(windows))'.dependencies]
tikv-jemallocator = "0.5"

View File

@@ -31,6 +31,7 @@ use async_trait::async_trait;
use bench::BenchTableMetadataCommand;
use clap::Parser;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use tracing_appender::non_blocking::WorkerGuard;
// pub use repl::Repl;
use upgrade::UpgradeCommand;
@@ -48,11 +49,17 @@ pub trait Tool: Send + Sync {
pub struct Instance {
tool: Box<dyn Tool>,
// Keep the logging guard to prevent the worker from being dropped.
_guard: Vec<WorkerGuard>,
}
impl Instance {
fn new(tool: Box<dyn Tool>) -> Self {
Self { tool }
fn new(tool: Box<dyn Tool>, guard: Vec<WorkerGuard>) -> Self {
Self {
tool,
_guard: guard,
}
}
}
@@ -83,14 +90,14 @@ pub struct Command {
impl Command {
pub async fn build(&self, opts: LoggingOptions) -> Result<Instance> {
let _guard = common_telemetry::init_global_logging(
let guard = common_telemetry::init_global_logging(
APP_NAME,
&opts,
&TracingOptions::default(),
None,
);
self.cmd.build().await
self.cmd.build(guard).await
}
pub fn load_options(&self, global_options: &GlobalOptions) -> Result<LoggingOptions> {
@@ -115,12 +122,12 @@ enum SubCommand {
}
impl SubCommand {
async fn build(&self) -> Result<Instance> {
async fn build(&self, guard: Vec<WorkerGuard>) -> Result<Instance> {
match self {
// SubCommand::Attach(cmd) => cmd.build().await,
SubCommand::Upgrade(cmd) => cmd.build().await,
SubCommand::Bench(cmd) => cmd.build().await,
SubCommand::Export(cmd) => cmd.build().await,
SubCommand::Upgrade(cmd) => cmd.build(guard).await,
SubCommand::Bench(cmd) => cmd.build(guard).await,
SubCommand::Export(cmd) => cmd.build(guard).await,
}
}
}

View File

@@ -30,6 +30,7 @@ use datatypes::schema::{ColumnSchema, RawSchema};
use rand::Rng;
use store_api::storage::RegionNumber;
use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableType};
use tracing_appender::non_blocking::WorkerGuard;
use self::metadata::TableMetadataBencher;
use crate::cli::{Instance, Tool};
@@ -61,7 +62,7 @@ pub struct BenchTableMetadataCommand {
}
impl BenchTableMetadataCommand {
pub async fn build(&self) -> Result<Instance> {
pub async fn build(&self, guard: Vec<WorkerGuard>) -> Result<Instance> {
let etcd_store = EtcdStore::with_endpoints([&self.etcd_addr], 128)
.await
.unwrap();
@@ -72,7 +73,7 @@ impl BenchTableMetadataCommand {
table_metadata_manager,
count: self.count,
};
Ok(Instance::new(Box::new(tool)))
Ok(Instance::new(Box::new(tool), guard))
}
}

View File

@@ -30,6 +30,7 @@ use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::sync::Semaphore;
use tokio::time::Instant;
use tracing_appender::non_blocking::WorkerGuard;
use crate::cli::{Instance, Tool};
use crate::error::{
@@ -80,7 +81,7 @@ pub struct ExportCommand {
}
impl ExportCommand {
pub async fn build(&self) -> Result<Instance> {
pub async fn build(&self, guard: Vec<WorkerGuard>) -> Result<Instance> {
let (catalog, schema) = split_database(&self.database)?;
let auth_header = if let Some(basic) = &self.auth_basic {
@@ -90,15 +91,18 @@ impl ExportCommand {
None
};
Ok(Instance::new(Box::new(Export {
addr: self.addr.clone(),
catalog,
schema,
output_dir: self.output_dir.clone(),
parallelism: self.export_jobs,
target: self.target.clone(),
auth_header,
})))
Ok(Instance::new(
Box::new(Export {
addr: self.addr.clone(),
catalog,
schema,
output_dir: self.output_dir.clone(),
parallelism: self.export_jobs,
target: self.target.clone(),
auth_header,
}),
guard,
))
}
}

View File

@@ -40,6 +40,7 @@ use etcd_client::Client;
use futures::TryStreamExt;
use prost::Message;
use snafu::ResultExt;
use tracing_appender::non_blocking::WorkerGuard;
use v1_helper::{CatalogKey as v1CatalogKey, SchemaKey as v1SchemaKey, TableGlobalValue};
use crate::cli::{Instance, Tool};
@@ -63,7 +64,7 @@ pub struct UpgradeCommand {
}
impl UpgradeCommand {
pub async fn build(&self) -> Result<Instance> {
pub async fn build(&self, guard: Vec<WorkerGuard>) -> Result<Instance> {
let client = Client::connect([&self.etcd_addr], None)
.await
.context(ConnectEtcdSnafu {
@@ -77,7 +78,7 @@ impl UpgradeCommand {
skip_schema_keys: self.skip_schema_keys,
skip_table_route_keys: self.skip_table_route_keys,
};
Ok(Instance::new(Box::new(tool)))
Ok(Instance::new(Box::new(tool), guard))
}
}

View File

@@ -29,6 +29,7 @@ use datanode::service::DatanodeServiceBuilder;
use meta_client::MetaClientOptions;
use servers::Mode;
use snafu::{OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{
LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu,
@@ -40,11 +41,17 @@ pub const APP_NAME: &str = "greptime-datanode";
pub struct Instance {
datanode: Datanode,
// Keep the logging guard to prevent the worker from being dropped.
_guard: Vec<WorkerGuard>,
}
impl Instance {
pub fn new(datanode: Datanode) -> Self {
Self { datanode }
pub fn new(datanode: Datanode, guard: Vec<WorkerGuard>) -> Self {
Self {
datanode,
_guard: guard,
}
}
pub fn datanode_mut(&mut self) -> &mut Datanode {
@@ -228,7 +235,7 @@ impl StartCommand {
}
async fn build(&self, mut opts: DatanodeOptions) -> Result<Instance> {
let _guard = common_telemetry::init_global_logging(
let guard = common_telemetry::init_global_logging(
APP_NAME,
&opts.logging,
&opts.tracing,
@@ -274,7 +281,7 @@ impl StartCommand {
.context(StartDatanodeSnafu)?;
datanode.setup_services(services);
Ok(Instance::new(datanode))
Ok(Instance::new(datanode, guard))
}
}

View File

@@ -42,6 +42,7 @@ use meta_client::MetaClientOptions;
use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
use snafu::{OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{
self, InitTimezoneSnafu, LoadLayeredConfigSnafu, MissingConfigSnafu, Result, StartFrontendSnafu,
@@ -51,13 +52,19 @@ use crate::{log_versions, App};
pub struct Instance {
frontend: FeInstance,
// Keep the logging guard to prevent the worker from being dropped.
_guard: Vec<WorkerGuard>,
}
pub const APP_NAME: &str = "greptime-frontend";
impl Instance {
pub fn new(frontend: FeInstance) -> Self {
Self { frontend }
pub fn new(frontend: FeInstance, guard: Vec<WorkerGuard>) -> Self {
Self {
frontend,
_guard: guard,
}
}
pub fn mut_inner(&mut self) -> &mut FeInstance {
@@ -241,7 +248,7 @@ impl StartCommand {
}
async fn build(&self, mut opts: FrontendOptions) -> Result<Instance> {
let _guard = common_telemetry::init_global_logging(
let guard = common_telemetry::init_global_logging(
APP_NAME,
&opts.logging,
&opts.tracing,
@@ -358,7 +365,7 @@ impl StartCommand {
.build_servers(opts, servers)
.context(StartFrontendSnafu)?;
Ok(Instance::new(instance))
Ok(Instance::new(instance, guard))
}
}

View File

@@ -23,6 +23,7 @@ use common_version::{short_version, version};
use meta_srv::bootstrap::MetasrvInstance;
use meta_srv::metasrv::MetasrvOptions;
use snafu::ResultExt;
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{self, LoadLayeredConfigSnafu, Result, StartMetaServerSnafu};
use crate::options::GlobalOptions;
@@ -32,11 +33,17 @@ pub const APP_NAME: &str = "greptime-metasrv";
pub struct Instance {
instance: MetasrvInstance,
// Keep the logging guard to prevent the worker from being dropped.
_guard: Vec<WorkerGuard>,
}
impl Instance {
fn new(instance: MetasrvInstance) -> Self {
Self { instance }
fn new(instance: MetasrvInstance, guard: Vec<WorkerGuard>) -> Self {
Self {
instance,
_guard: guard,
}
}
}
@@ -214,7 +221,7 @@ impl StartCommand {
}
async fn build(&self, mut opts: MetasrvOptions) -> Result<Instance> {
let _guard =
let guard =
common_telemetry::init_global_logging(APP_NAME, &opts.logging, &opts.tracing, None);
log_versions(version!(), short_version!());
@@ -234,7 +241,7 @@ impl StartCommand {
.await
.context(error::BuildMetaServerSnafu)?;
Ok(Instance::new(instance))
Ok(Instance::new(instance, guard))
}
}

View File

@@ -62,6 +62,7 @@ use servers::http::HttpOptions;
use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
use snafu::{OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{
BuildCacheRegistrySnafu, CacheRequiredSnafu, CreateDirSnafu, IllegalConfigSnafu,
@@ -210,6 +211,9 @@ pub struct Instance {
frontend: FeInstance,
procedure_manager: ProcedureManagerRef,
wal_options_allocator: WalOptionsAllocatorRef,
// Keep the logging guard to prevent the worker from being dropped.
_guard: Vec<WorkerGuard>,
}
#[async_trait]
@@ -375,7 +379,7 @@ impl StartCommand {
#[allow(unused_variables)]
#[allow(clippy::diverging_sub_expression)]
async fn build(&self, opts: StandaloneOptions) -> Result<Instance> {
let _guard =
let guard =
common_telemetry::init_global_logging(APP_NAME, &opts.logging, &opts.tracing, None);
log_versions(version!(), short_version!());
@@ -521,6 +525,7 @@ impl StartCommand {
frontend,
procedure_manager,
wal_options_allocator,
_guard: guard,
})
}