refactor: remove re-export from logging (#3865)

* refactor: remove re-export from logging

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix merge problem

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* run formatter

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2024-05-06 21:26:01 +08:00
committed by GitHub
parent 573c19be32
commit 530353785c
38 changed files with 112 additions and 120 deletions

View File

@@ -24,7 +24,7 @@ use common_meta::peer::Peer;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::logging::warn;
use common_telemetry::warn;
use common_time::timestamp::Timestamp;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;

View File

@@ -26,7 +26,7 @@ use common_error::ext::ErrorExt;
use common_meta::cache_invalidator::MultiCacheInvalidator;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::logging;
use common_telemetry::debug;
use either::Either;
use meta_client::client::MetaClientBuilder;
use query::datafusion::DatafusionQueryEngine;
@@ -78,7 +78,7 @@ impl Repl {
let history_file = history_file();
if let Err(e) = rl.load_history(&history_file) {
logging::debug!(
debug!(
"failed to load history file on {}, error: {e}",
history_file.display()
);
@@ -225,7 +225,7 @@ impl Drop for Repl {
if self.rl.helper().is_some() {
let history_file = history_file();
if let Err(e) = self.rl.save_history(&history_file) {
logging::debug!(
debug!(
"failed to save history file on {}, error: {e}",
history_file.display()
);

View File

@@ -18,7 +18,7 @@ use std::time::Duration;
use async_trait::async_trait;
use catalog::kvbackend::MetaKvBackend;
use clap::Parser;
use common_telemetry::{info, logging};
use common_telemetry::info;
use common_wal::config::DatanodeWalConfig;
use datanode::config::DatanodeOptions;
use datanode::datanode::{Datanode, DatanodeBuilder};
@@ -210,8 +210,8 @@ impl StartCommand {
.await
.context(StartDatanodeSnafu)?;
logging::info!("Datanode start command: {:#?}", self);
logging::info!("Datanode options: {:#?}", opts);
info!("Datanode start command: {:#?}", self);
info!("Datanode options: {:#?}", opts);
let node_id = opts
.node_id

View File

@@ -22,7 +22,7 @@ use client::client_manager::DatanodeClients;
use common_meta::cache_invalidator::MultiCacheInvalidator;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_telemetry::logging;
use common_telemetry::info;
use common_time::timezone::set_default_timezone;
use frontend::frontend::FrontendOptions;
use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
@@ -219,8 +219,8 @@ impl StartCommand {
.await
.context(StartFrontendSnafu)?;
logging::info!("Frontend start command: {:#?}", self);
logging::info!("Frontend options: {:#?}", opts);
info!("Frontend start command: {:#?}", self);
info!("Frontend options: {:#?}", opts);
set_default_timezone(opts.default_timezone.as_deref()).context(InitTimezoneSnafu)?;

View File

@@ -16,7 +16,7 @@ use std::time::Duration;
use async_trait::async_trait;
use clap::Parser;
use common_telemetry::logging;
use common_telemetry::info;
use meta_srv::bootstrap::MetasrvInstance;
use meta_srv::metasrv::MetasrvOptions;
use snafu::ResultExt;
@@ -198,8 +198,8 @@ impl StartCommand {
.await
.context(StartMetaServerSnafu)?;
logging::info!("Metasrv start command: {:#?}", self);
logging::info!("Metasrv options: {:#?}", opts);
info!("Metasrv start command: {:#?}", self);
info!("Metasrv options: {:#?}", opts);
let builder = meta_srv::bootstrap::metasrv_builder(&opts, plugins.clone(), None)
.await

View File

@@ -20,7 +20,7 @@ use common_meta::rpc::procedure::MigrateRegionRequest;
use common_query::error::Error::ThreadJoin;
use common_query::error::{InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use common_telemetry::logging::error;
use common_telemetry::error;
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{Value, ValueRef};

View File

@@ -24,7 +24,7 @@ use async_trait::async_trait;
use backon::ExponentialBuilder;
use common_runtime::{RepeatedTask, TaskFunction};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{info, logging, tracing};
use common_telemetry::{error, info, tracing};
use snafu::{ensure, ResultExt};
use tokio::sync::watch::{self, Receiver, Sender};
use tokio::sync::{Mutex as TokioMutex, Notify};
@@ -244,20 +244,18 @@ impl ManagerContext {
) -> Option<LoadedProcedure> {
let loaders = self.loaders.lock().unwrap();
let loader = loaders.get(&message.type_name).or_else(|| {
logging::error!(
error!(
"Loader not found, procedure_id: {}, type_name: {}",
procedure_id,
message.type_name
procedure_id, message.type_name
);
None
})?;
let procedure = loader(&message.data)
.map_err(|e| {
logging::error!(
error!(
"Failed to load procedure data, key: {}, source: {:?}",
procedure_id,
e
procedure_id, e
);
e
})
@@ -496,7 +494,7 @@ impl LocalManager {
continue;
};
logging::info!(
info!(
"Recover root procedure {}-{}, step: {}",
loaded_procedure.procedure.type_name(),
procedure_id,
@@ -521,7 +519,7 @@ impl LocalManager {
loaded_procedure.step,
loaded_procedure.procedure,
) {
logging::error!(e; "Failed to recover procedure {}", procedure_id);
error!(e; "Failed to recover procedure {}", procedure_id);
}
}
}
@@ -529,7 +527,7 @@ impl LocalManager {
/// Recovers unfinished procedures and reruns them.
async fn recover(&self) -> Result<()> {
logging::info!("LocalManager start to recover");
info!("LocalManager start to recover");
let recover_start = Instant::now();
let (messages, rollback_messages, finished_ids) =
@@ -539,19 +537,19 @@ impl LocalManager {
self.submit_recovered_messages(messages, InitProcedureState::Running);
if !finished_ids.is_empty() {
logging::info!(
info!(
"LocalManager try to clean finished procedures, num: {}",
finished_ids.len()
);
for procedure_id in finished_ids {
if let Err(e) = self.procedure_store.delete_procedure(procedure_id).await {
logging::error!(e; "Failed to delete procedure {}", procedure_id);
error!(e; "Failed to delete procedure {}", procedure_id);
}
}
}
logging::info!(
info!(
"LocalManager finish recovery, cost: {}ms",
recover_start.elapsed().as_millis()
);

View File

@@ -16,7 +16,7 @@ use std::sync::Arc;
use std::time::Duration;
use backon::{BackoffBuilder, ExponentialBuilder};
use common_telemetry::logging;
use common_telemetry::{debug, error, info};
use tokio::time;
use super::rwlock::OwnedKeyRwLockGuard;
@@ -54,7 +54,7 @@ impl ProcedureGuard {
impl Drop for ProcedureGuard {
fn drop(&mut self) {
if !self.finish {
logging::error!("Procedure {} exits unexpectedly", self.meta.id);
error!("Procedure {} exits unexpectedly", self.meta.id);
// Set state to failed. This is useful in test as runtime may not abort when the runner task panics.
// See https://github.com/tokio-rs/tokio/issues/2002 .
@@ -104,7 +104,7 @@ impl Runner {
// Ensure we can update the procedure state.
let mut guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone());
logging::info!(
info!(
"Runner {}-{} starts",
self.procedure.type_name(),
self.meta.id
@@ -149,7 +149,7 @@ impl Runner {
for id in procedure_ids {
if let Err(e) = self.store.delete_procedure(id).await {
logging::error!(
error!(
e;
"Runner {}-{} failed to delete procedure {}",
self.procedure.type_name(),
@@ -160,7 +160,7 @@ impl Runner {
}
}
logging::info!(
info!(
"Runner {}-{} exits",
self.procedure.type_name(),
self.meta.id
@@ -260,7 +260,7 @@ impl Runner {
ProcedureState::Running | ProcedureState::Retrying { .. } => {
match self.procedure.execute(ctx).await {
Ok(status) => {
logging::debug!(
debug!(
"Execute procedure {}-{} once, status: {:?}, need_persist: {}",
self.procedure.type_name(),
self.meta.id,
@@ -299,7 +299,7 @@ impl Runner {
}
}
Err(e) => {
logging::error!(
error!(
e;
"Failed to execute procedure {}-{}, retry: {}",
self.procedure.type_name(),
@@ -394,7 +394,7 @@ impl Runner {
/// Extend the retry time to wait for the next retry.
async fn wait_on_err(&mut self, d: Duration, i: u64) {
logging::info!(
info!(
"Procedure {}-{} retry for the {} times after {} millis",
self.procedure.type_name(),
self.meta.id,
@@ -407,7 +407,7 @@ impl Runner {
async fn on_suspended(&mut self, subprocedures: Vec<ProcedureWithId>) {
let has_child = !subprocedures.is_empty();
for subprocedure in subprocedures {
logging::info!(
info!(
"Procedure {}-{} submit subprocedure {}-{}",
self.procedure.type_name(),
self.meta.id,
@@ -422,7 +422,7 @@ impl Runner {
);
}
logging::info!(
info!(
"Procedure {}-{} is waiting for subprocedures",
self.procedure.type_name(),
self.meta.id,
@@ -432,7 +432,7 @@ impl Runner {
if has_child {
self.meta.child_notify.notified().await;
logging::info!(
info!(
"Procedure {}-{} is waked up",
self.procedure.type_name(),
self.meta.id,
@@ -454,7 +454,7 @@ impl Runner {
)
.await
.map_err(|e| {
logging::error!(
error!(
e; "Failed to persist procedure {}-{}",
self.procedure.type_name(),
self.meta.id
@@ -470,7 +470,7 @@ impl Runner {
.commit_procedure(self.meta.id, self.step)
.await
.map_err(|e| {
logging::error!(
error!(
e; "Failed to commit procedure {}-{}",
self.procedure.type_name(),
self.meta.id
@@ -496,7 +496,7 @@ impl Runner {
.rollback_procedure(self.meta.id, message)
.await
.map_err(|e| {
logging::error!(
error!(
e; "Failed to write rollback key for procedure {}-{}",
self.procedure.type_name(),
self.meta.id
@@ -509,7 +509,7 @@ impl Runner {
fn done(&self, output: Option<Output>) {
// TODO(yingwen): Add files to remove list.
logging::info!(
info!(
"Procedure {}-{} done",
self.procedure.type_name(),
self.meta.id,

View File

@@ -15,7 +15,7 @@
use std::collections::HashMap;
use std::fmt;
use common_telemetry::logging;
use common_telemetry::{debug, error, info, warn};
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
@@ -65,7 +65,7 @@ impl ProcedureStore {
/// Creates a new [ProcedureStore] from specific [StateStoreRef].
pub(crate) fn new(parent_path: &str, store: StateStoreRef) -> ProcedureStore {
let proc_path = format!("{}{PROC_PATH}", parent_path);
logging::info!("The procedure state store path is: {}", &proc_path);
info!("The procedure state store path is: {}", &proc_path);
ProcedureStore { proc_path, store }
}
@@ -154,7 +154,7 @@ impl ProcedureStore {
while let Some((key_set, _)) = key_values.try_next().await? {
let key = key_set.key();
let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, key) else {
logging::warn!("Unknown key while deleting procedures, key: {}", key);
warn!("Unknown key while deleting procedures, key: {}", key);
continue;
};
if curr_key.key_type == KeyType::Step {
@@ -165,11 +165,9 @@ impl ProcedureStore {
}
}
logging::debug!(
debug!(
"Delete keys for procedure {}, step_keys: {:?}, finish_keys: {:?}",
procedure_id,
step_keys,
finish_keys
procedure_id, step_keys, finish_keys
);
// We delete all step keys first.
self.store.batch_delete(step_keys.as_slice()).await?;
@@ -203,7 +201,7 @@ impl ProcedureStore {
while let Some((key_set, value)) = key_values.try_next().await? {
let key = key_set.key();
let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, key) else {
logging::warn!("Unknown key while loading procedures, key: {}", key);
warn!("Unknown key while loading procedures, key: {}", key);
continue;
};
@@ -251,7 +249,7 @@ impl ProcedureStore {
serde_json::from_slice(value)
.map_err(|e| {
// `e` doesn't impl ErrorExt so we print it as normal error.
logging::error!("Failed to parse value, key: {:?}, source: {:?}", key, e);
error!("Failed to parse value, key: {:?}, source: {:?}", key, e);
e
})
.ok()

View File

@@ -17,7 +17,7 @@ use std::sync::Mutex;
use std::time::Duration;
use common_error::ext::ErrorExt;
use common_telemetry::logging;
use common_telemetry::{debug, error};
use snafu::{ensure, ResultExt};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
@@ -128,17 +128,16 @@ impl<E: ErrorExt + 'static> RepeatedTask<E> {
}
}
if let Err(e) = task_fn.call().await {
logging::error!(e; "Failed to run repeated task: {}", task_fn.name());
error!(e; "Failed to run repeated task: {}", task_fn.name());
}
}
});
inner.task_handle = Some(handle);
self.started.store(true, Ordering::Relaxed);
logging::debug!(
debug!(
"Repeated task {} started with interval: {:?}",
self.name,
self.interval
self.name, self.interval
);
Ok(())
@@ -162,7 +161,7 @@ impl<E: ErrorExt + 'static> RepeatedTask<E> {
.await
.context(WaitGcTaskStopSnafu { name: &self.name })?;
logging::debug!("Repeated task {} stopped", self.name);
debug!("Repeated task {} stopped", self.name);
Ok(())
}

View File

@@ -32,7 +32,6 @@ use tracing_subscriber::prelude::*;
use tracing_subscriber::{filter, EnvFilter, Registry};
use crate::tracing_sampler::{create_sampler, TracingSampleOptions};
pub use crate::{debug, error, info, trace, warn};
const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";

View File

@@ -213,7 +213,7 @@ mod tests {
#[test]
fn test_log_error() {
crate::logging::init_default_ut_logging();
crate::init_default_ut_logging();
let err = MockError::new(StatusCode::Unknown);
let err_ref = &err;

View File

@@ -25,7 +25,7 @@ use std::time::Duration;
use std::{env, path};
use common_base::readable_size::ReadableSize;
use common_telemetry::logging::info;
use common_telemetry::info;
use object_store::layers::{LruCacheLayer, RetryLayer};
use object_store::services::Fs;
use object_store::util::{join_dir, normalize_dir, with_instrument_layers};

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use common_base::secrets::ExposeSecret;
use common_telemetry::logging::info;
use common_telemetry::info;
use object_store::services::Azblob;
use object_store::{util, ObjectStore};
use snafu::prelude::*;

View File

@@ -14,7 +14,7 @@
use std::{fs, path};
use common_telemetry::logging::info;
use common_telemetry::info;
use object_store::services::Fs;
use object_store::util::join_dir;
use object_store::ObjectStore;

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use common_base::secrets::ExposeSecret;
use common_telemetry::logging::info;
use common_telemetry::info;
use object_store::services::Gcs;
use object_store::{util, ObjectStore};
use snafu::prelude::*;

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use common_base::secrets::ExposeSecret;
use common_telemetry::logging::info;
use common_telemetry::info;
use object_store::services::Oss;
use object_store::{util, ObjectStore};
use snafu::prelude::*;

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use common_base::secrets::ExposeSecret;
use common_telemetry::logging::info;
use common_telemetry::info;
use object_store::services::S3;
use object_store::{util, ObjectStore};
use snafu::prelude::*;

View File

@@ -20,7 +20,7 @@ use arrow::datatypes::{DataType as ArrowDataType, Field};
use arrow_array::{Array, ListArray};
use common_base::bytes::{Bytes, StringBytes};
use common_decimal::Decimal128;
use common_telemetry::logging;
use common_telemetry::error;
use common_time::date::Date;
use common_time::datetime::DateTime;
use common_time::interval::IntervalUnit;
@@ -487,7 +487,7 @@ pub fn scalar_value_to_timestamp(
ScalarValue::Utf8(Some(s)) => match Timestamp::from_str(s, timezone) {
Ok(t) => Some(t),
Err(e) => {
logging::error!(e;"Failed to convert string literal {s} to timestamp");
error!(e;"Failed to convert string literal {s} to timestamp");
None
}
},

View File

@@ -26,7 +26,7 @@ use common_error::ext::BoxedError;
use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::{logging, tracing};
use common_telemetry::{debug, tracing};
use operator::insert::InserterRef;
use operator::statement::StatementExecutor;
use prost::Message;
@@ -119,7 +119,7 @@ impl Instance {
let logical_plan =
prom_store::query_to_plan(dataframe, query).context(PromStoreRemoteQueryPlanSnafu)?;
logging::debug!(
debug!(
"Prometheus remote read, table: {}, logical plan: {}",
table_name,
logical_plan.display_indent(),

View File

@@ -21,7 +21,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_base::BitVec;
use common_telemetry::logging;
use common_telemetry::{debug, error};
use futures::stream;
use crate::inverted_index::create::sort::external_provider::ExternalTempFileProvider;
@@ -254,9 +254,9 @@ impl ExternalSorter {
let entries = values.len();
IntermediateWriter::new(writer).write_all(values, bitmap_leading_zeros as _).await.inspect(|_|
logging::debug!("Dumped {entries} entries ({memory_usage} bytes) to intermediate file {file_id} for index {index_name}")
debug!("Dumped {entries} entries ({memory_usage} bytes) to intermediate file {file_id} for index {index_name}")
).inspect_err(|e|
logging::error!("Failed to dump {entries} entries to intermediate file {file_id} for index {index_name}. Error: {e}")
error!("Failed to dump {entries} entries to intermediate file {file_id} for index {index_name}. Error: {e}")
)
}

View File

@@ -47,7 +47,7 @@ mod tests {
#[test]
pub fn test_default_config() {
common_telemetry::logging::init_default_ut_logging();
common_telemetry::common_telemetry::init_default_ut_logging();
let default = LogConfig::default();
info!("LogConfig::default(): {:?}", default);
assert_eq!(1024 * 1024 * 1024, default.file_size);

View File

@@ -17,7 +17,7 @@ use std::pin::Pin;
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::{Arc, RwLock};
use common_telemetry::logging;
use common_telemetry::warn;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
@@ -149,7 +149,7 @@ impl Scheduler for LocalScheduler {
impl Drop for LocalScheduler {
fn drop(&mut self) {
if self.state.load(Ordering::Relaxed) != STATE_STOP {
logging::warn!("scheduler should be stopped before dropping, which means the state of scheduler must be STATE_STOP");
warn!("scheduler should be stopped before dropping, which means the state of scheduler must be STATE_STOP");
// We didn't call `stop()` so we cancel all background workers here.
self.sender.write().unwrap().take();

View File

@@ -22,7 +22,7 @@ use opendal::raw::{
};
use opendal::Result;
mod read_cache;
use common_telemetry::logging::info;
use common_telemetry::info;
use read_cache::ReadCache;
/// An opendal layer with local LRU file cache supporting.

View File

@@ -14,7 +14,7 @@
use std::sync::Arc;
use common_telemetry::logging::debug;
use common_telemetry::debug;
use futures::FutureExt;
use moka::future::Cache;
use moka::notification::ListenerFuture;

View File

@@ -16,7 +16,7 @@ use std::env;
use std::sync::Arc;
use anyhow::Result;
use common_telemetry::logging;
use common_telemetry::info;
use common_test_util::temp_dir::create_temp_dir;
use object_store::layers::LruCacheLayer;
use object_store::services::{Fs, S3};
@@ -109,10 +109,10 @@ async fn test_fs_backend() -> Result<()> {
#[tokio::test]
async fn test_s3_backend() -> Result<()> {
logging::init_default_ut_logging();
common_telemetry::init_default_ut_logging();
if let Ok(bucket) = env::var("GT_S3_BUCKET") {
if !bucket.is_empty() {
logging::info!("Running s3 test.");
info!("Running s3 test.");
let root = uuid::Uuid::new_v4().to_string();
@@ -138,10 +138,10 @@ async fn test_s3_backend() -> Result<()> {
#[tokio::test]
async fn test_oss_backend() -> Result<()> {
logging::init_default_ut_logging();
common_telemetry::init_default_ut_logging();
if let Ok(bucket) = env::var("GT_OSS_BUCKET") {
if !bucket.is_empty() {
logging::info!("Running oss test.");
info!("Running oss test.");
let root = uuid::Uuid::new_v4().to_string();
@@ -166,10 +166,10 @@ async fn test_oss_backend() -> Result<()> {
#[tokio::test]
async fn test_azblob_backend() -> Result<()> {
logging::init_default_ut_logging();
common_telemetry::init_default_ut_logging();
if let Ok(container) = env::var("GT_AZBLOB_CONTAINER") {
if !container.is_empty() {
logging::info!("Running azblob test.");
info!("Running azblob test.");
let root = uuid::Uuid::new_v4().to_string();
@@ -193,10 +193,10 @@ async fn test_azblob_backend() -> Result<()> {
#[tokio::test]
async fn test_gcs_backend() -> Result<()> {
logging::init_default_ut_logging();
common_telemetry::init_default_ut_logging();
if let Ok(container) = env::var("GT_AZBLOB_CONTAINER") {
if !container.is_empty() {
logging::info!("Running azblob test.");
info!("Running azblob test.");
let mut builder = Gcs::default();
builder
@@ -219,7 +219,7 @@ async fn test_gcs_backend() -> Result<()> {
#[tokio::test]
async fn test_file_backend_with_lru_cache() -> Result<()> {
logging::init_default_ut_logging();
common_telemetry::init_default_ut_logging();
let data_dir = create_temp_dir("test_file_backend_with_lru_cache");
let tmp_dir = create_temp_dir("test_file_backend_with_lru_cache");

View File

@@ -20,8 +20,8 @@ use catalog::CatalogManagerRef;
use common_catalog::build_db_string;
use common_meta::node_manager::{AffectedRows, NodeManagerRef};
use common_meta::peer::Peer;
use common_telemetry::logging::{error, info};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{error, info};
use futures_util::future;
use partition::manager::{PartitionInfo, PartitionRuleManagerRef};
use session::context::QueryContextRef;

View File

@@ -22,7 +22,7 @@ use catalog::{OpenSystemTableHook, RegisterSystemTableRequest};
use common_catalog::consts::{default_engine, DEFAULT_SCHEMA_NAME};
use common_error::ext::ErrorExt;
use common_query::Output;
use common_telemetry::logging;
use common_telemetry::info;
use futures::future::FutureExt;
use query::QueryEngineRef;
use servers::query_handler::grpc::GrpcQueryHandlerRef;
@@ -106,11 +106,11 @@ impl<E: ErrorExt + Send + Sync + 'static> ScriptManager<E> {
let mut compiled = self.compiled.write().unwrap();
let _ = compiled.insert(name.to_string(), script.clone());
}
logging::info!("Compiled and cached script: {}", name);
info!("Compiled and cached script: {}", name);
script.as_ref().register_udf().await;
logging::info!("Script register as UDF: {}", name);
info!("Script register as UDF: {}", name);
Ok(script)
}

View File

@@ -25,7 +25,7 @@ use catalog::error::CompileScriptInternalSnafu;
use common_error::ext::{BoxedError, ErrorExt};
use common_query::OutputData;
use common_recordbatch::{util as record_util, RecordBatch, SendableRecordBatchStream};
use common_telemetry::logging;
use common_telemetry::{debug, info, warn};
use common_time::util;
use datafusion::datasource::DefaultTableSource;
use datafusion::logical_expr::{and, col, lit};
@@ -127,7 +127,7 @@ impl<E: ErrorExt + Send + Sync + 'static> ScriptsTable<E> {
script_list.extend(part_of_scripts_list);
}
logging::info!(
info!(
"Found {} scripts in {}",
script_list.len(),
table_info.full_table_name()
@@ -137,16 +137,15 @@ impl<E: ErrorExt + Send + Sync + 'static> ScriptsTable<E> {
match PyScript::from_script(&script, query_engine.clone()) {
Ok(script) => {
script.register_udf().await;
logging::debug!(
debug!(
"Script in `scripts` system table re-register as UDF: {}",
name
);
}
Err(err) => {
logging::warn!(
warn!(
r#"Failed to compile script "{}"" in `scripts` table: {}"#,
name,
err
name, err
);
}
}
@@ -189,7 +188,7 @@ impl<E: ErrorExt + Send + Sync + 'static> ScriptsTable<E> {
.map_err(BoxedError::new)
.context(InsertScriptSnafu { name })?;
logging::info!(
info!(
"Inserted script: {} into scripts table: {}, output: {:?}.",
name,
table_info.full_table_name(),

View File

@@ -29,8 +29,7 @@ use async_trait::async_trait;
use common_grpc::channel_manager::{
DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
};
use common_telemetry::logging::info;
use common_telemetry::{error, warn};
use common_telemetry::{error, info, warn};
use futures::FutureExt;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::net::TcpListener;

View File

@@ -28,7 +28,7 @@ use common_error::status_code::StatusCode;
use common_query::Output;
use common_runtime::Runtime;
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{logging, tracing};
use common_telemetry::{debug, error, tracing};
use common_time::timezone::parse_timezone;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
@@ -91,10 +91,10 @@ impl GreptimeRequestHandler {
.await
.map_err(|e| {
if e.status_code().should_log_error() {
logging::error!(e; "Failed to handle request");
error!(e; "Failed to handle request");
} else {
// Currently, we still print a debug log.
logging::debug!("Failed to handle request, err: {:?}", e);
debug!("Failed to handle request, err: {:?}", e);
}
e
})

View File

@@ -31,7 +31,7 @@ use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatch;
use common_telemetry::logging::{error, info};
use common_telemetry::{error, info};
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datatypes::data_type::DataType;

View File

@@ -17,7 +17,7 @@ use axum::response::{IntoResponse, Response};
use axum::Json;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_telemetry::logging::{debug, error};
use common_telemetry::{debug, error};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

View File

@@ -23,7 +23,6 @@ pub mod handler {
use axum::extract::Query;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use common_telemetry::logging;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
@@ -64,7 +63,7 @@ pub mod handler {
#[axum_macros::debug_handler]
pub async fn pprof_handler(Query(req): Query<PprofQuery>) -> Result<impl IntoResponse> {
logging::info!("start pprof, request: {:?}", req);
info!("start pprof, request: {:?}", req);
let profiling = Profiling::new(Duration::from_secs(req.seconds), req.frequency.into());
let body = match req.output {
@@ -76,7 +75,7 @@ pub mod handler {
Output::Flamegraph => profiling.dump_flamegraph().await.context(DumpPprofSnafu)?,
};
logging::info!("finish pprof");
info!("finish pprof");
Ok((StatusCode::OK, body))
}

View File

@@ -24,7 +24,7 @@ use chrono::{NaiveDate, NaiveDateTime};
use common_catalog::parse_optional_catalog_and_schema_from_db_string;
use common_error::ext::ErrorExt;
use common_query::Output;
use common_telemetry::{debug, error, logging, tracing, warn};
use common_telemetry::{debug, error, tracing, warn};
use datatypes::prelude::ConcreteDataType;
use itertools::Itertools;
use opensrv_mysql::{
@@ -327,7 +327,7 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
}
};
logging::debug!("Mysql execute prepared plan: {}", plan.display_indent());
debug!("Mysql execute prepared plan: {}", plan.display_indent());
vec![
self.do_exec_plan(&sql_plan.query, plan, query_ctx.clone())
.await,
@@ -335,7 +335,7 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
}
None => {
let query = replace_params(params, sql_plan.query);
logging::debug!("Mysql execute replaced query: {}", query);
debug!("Mysql execute replaced query: {}", query);
self.do_query(&query, query_ctx.clone()).await
}
};

View File

@@ -18,7 +18,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_runtime::Runtime;
use common_telemetry::logging::{error, info};
use common_telemetry::{error, info};
use futures::future::{try_join_all, AbortHandle, AbortRegistration, Abortable};
use snafu::{ensure, ResultExt};
use tokio::sync::{Mutex, RwLock};

View File

@@ -30,7 +30,7 @@ use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_query::Output;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::RecordBatchStreamWrapper;
use common_telemetry::logging;
use common_telemetry::error;
use common_telemetry::tracing_context::W3cTrace;
use futures_util::StreamExt;
use prost::Message;
@@ -188,7 +188,7 @@ impl Database {
addr: client.addr().to_string(),
source: BoxedError::new(ServerSnafu { code, msg }.build()),
};
logging::error!(
error!(
"Failed to do Flight get, addr: {}, code: {}, source: {:?}",
client.addr(),
tonic_code,

View File

@@ -20,7 +20,6 @@ use client::OutputData;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_query::Output;
use common_recordbatch::util;
use common_telemetry::logging;
use common_test_util::recordbatch::check_output_stream;
use common_test_util::temp_dir;
use datatypes::vectors::{StringVector, TimestampMillisecondVector, UInt64Vector, VectorRef};
@@ -1629,7 +1628,9 @@ async fn test_execute_copy_to_s3(instance: Arc<dyn MockInstance>) {
#[apply(both_instances_cases)]
async fn test_execute_copy_from_s3(instance: Arc<dyn MockInstance>) {
logging::init_default_ut_logging();
use common_telemetry::info;
common_telemetry::init_default_ut_logging();
if let Ok(bucket) = env::var("GT_S3_BUCKET") {
if !bucket.is_empty() {
let instance = instance.frontend();
@@ -1706,7 +1707,7 @@ async fn test_execute_copy_from_s3(instance: Arc<dyn MockInstance>) {
"{} CONNECTION (ACCESS_KEY_ID='{}',SECRET_ACCESS_KEY='{}',REGION='{}')",
test.sql, key_id, key, region,
);
logging::info!("Running sql: {}", sql);
info!("Running sql: {}", sql);
let output = execute_sql(&instance, &sql).await.data;
assert!(matches!(output, OutputData::AffectedRows(2)));
@@ -1732,7 +1733,7 @@ async fn test_execute_copy_from_s3(instance: Arc<dyn MockInstance>) {
#[apply(both_instances_cases)]
async fn test_execute_copy_from_orc_with_cast(instance: Arc<dyn MockInstance>) {
logging::init_default_ut_logging();
common_telemetry::init_default_ut_logging();
let instance = instance.frontend();
// setups
@@ -1766,7 +1767,7 @@ async fn test_execute_copy_from_orc_with_cast(instance: Arc<dyn MockInstance>) {
#[apply(both_instances_cases)]
async fn test_execute_copy_from_orc(instance: Arc<dyn MockInstance>) {
logging::init_default_ut_logging();
common_telemetry::init_default_ut_logging();
let instance = instance.frontend();
// setups
@@ -1880,7 +1881,7 @@ async fn test_information_schema_dot_tables(instance: Arc<dyn MockInstance>) {
#[apply(both_instances_cases)]
async fn test_information_schema_dot_columns(instance: Arc<dyn MockInstance>) {
logging::init_default_ut_logging();
common_telemetry::init_default_ut_logging();
let instance = instance.frontend();
let sql = "create table another_table(i timestamp time index)";