@@ -31,7 +31,7 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_catalog::format_full_table_name;
 use common_error::prelude::BoxedError;
 use common_query::Output;
-use common_telemetry::{debug, info};
+use common_telemetry::debug;
 use datanode::instance::sql::table_idents_to_full_name;
 use datanode::sql::SqlHandler;
 use datatypes::prelude::ConcreteDataType;
@@ -134,7 +134,7 @@ impl DistInstance {
             }
         );
         let table_route = table_routes.first().unwrap();
-        info!(
+        debug!(
             "Creating distributed table {table_name} with table routes: {}",
             serde_json::to_string_pretty(table_route)
                 .unwrap_or_else(|_| format!("{table_route:#?}"))
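A note on the pattern in the hunk above: serde_json::to_string_pretty can fail, so the code falls back to the Debug representation rather than letting a serialization error abort the log statement. A minimal standalone sketch of the same idea (the TableRoute struct below is illustrative, not the real type):

    use serde::Serialize;

    #[derive(Serialize, Debug)]
    struct TableRoute {
        table: String,
        regions: Vec<u32>,
    }

    fn main() {
        let route = TableRoute {
            table: "metrics".into(),
            regions: vec![0, 1, 2],
        };
        // Prefer pretty JSON; if serialization fails, fall back to the
        // Debug output so the log line is still emitted.
        let rendered = serde_json::to_string_pretty(&route)
            .unwrap_or_else(|_| format!("{route:#?}"));
        println!("table routes: {rendered}");
    }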
@@ -415,7 +415,11 @@ impl<S: StorageEngine> MitoEngineInner<S> {
                 .await
                 .map_err(BoxedError::new)
                 .context(error::CreateRegionSnafu)?;
-            info!("Mito engine created region: {:?}", region.id());
+            info!(
+                "Mito engine created region: {}, id: {}",
+                region.name(),
+                region.id()
+            );
             regions.insert(*region_number, region);
         }
@@ -450,7 +454,12 @@ impl<S: StorageEngine> MitoEngineInner<S> {
             .await?,
         );

-        logging::info!("Mito engine created table: {:?}.", table.table_info());
+        logging::info!(
+            "Mito engine created table: {} in schema: {}, table_id: {}.",
+            table_name,
+            schema_name,
+            table_id
+        );

         self.tables
             .write()
@@ -541,7 +550,11 @@ impl<S: StorageEngine> MitoEngineInner<S> {
             Some(table as _)
         };

-        logging::info!("Mito engine opened table {}", table_name);
+        logging::info!(
+            "Mito engine opened table: {} in schema: {}",
+            table_name,
+            schema_name
+        );

         Ok(table)
     }
@@ -16,7 +16,7 @@ use std::collections::HashSet;
 use std::fmt::{Debug, Formatter};

 use common_base::readable_size::ReadableSize;
-use common_telemetry::{error, info};
+use common_telemetry::{debug, error};
 use store_api::logstore::LogStore;
 use store_api::storage::RegionId;
@@ -115,7 +115,7 @@ impl<S: LogStore> CompactionTaskImpl<S> {
             files_to_add: Vec::from_iter(output.into_iter()),
             files_to_remove: Vec::from_iter(input.into_iter()),
         };
-        info!(
+        debug!(
            "Compacted region: {}, region edit: {:?}",
            version.metadata().name(),
            edit
@@ -17,7 +17,7 @@ use std::sync::{Arc, RwLock};
 use std::time::Duration;

 use async_trait::async_trait;
-use common_telemetry::logging::info;
+use common_telemetry::logging::debug;
 use object_store::{util, ObjectStore};
 use snafu::ResultExt;
 use store_api::logstore::LogStore;
@@ -306,7 +306,11 @@ impl<S: LogStore> EngineInner<S> {
             Some(v) => v,
         };
         guard.update(RegionSlot::Ready(region.clone()));
-        info!("Storage engine open region {}", region.id());
+        debug!(
+            "Storage engine open region {}, id: {}",
+            region.name(),
+            region.id()
+        );
         Ok(Some(region))
     }
@@ -343,7 +347,11 @@ impl<S: LogStore> EngineInner<S> {

         guard.update(RegionSlot::Ready(region.clone()));

-        info!("Storage engine create region {}", region.id());
+        debug!(
+            "Storage engine create region {}, id: {}",
+            region.name(),
+            region.id()
+        );

         Ok(region)
     }
@@ -17,7 +17,7 @@ use std::time::Duration;

 use common_base::readable_size::ReadableSize;
 use common_error::prelude::BoxedError;
-use common_telemetry::tracing::log::info;
+use common_telemetry::tracing::log::{debug, info};
 use common_telemetry::{error, logging};
 use futures::TryStreamExt;
 use snafu::{ensure, ResultExt};
@@ -770,7 +770,7 @@ impl WriterInner {
             .file_num();

         if level0_file_num <= max_files_in_l0 {
-            info!(
+            debug!(
                 "No enough SST files in level 0 (threshold: {}), skip compaction",
                 max_files_in_l0
             );
@@ -24,7 +24,7 @@
 use async_trait::async_trait;
 use common_base::readable_size::ReadableSize;
 use common_recordbatch::SendableRecordBatchStream;
-use common_telemetry::{error, info};
+use common_telemetry::{debug, error};
 use common_time::range::TimestampRange;
 use common_time::Timestamp;
 use datatypes::schema::SchemaRef;
@@ -60,13 +60,21 @@ type LevelMetaVec = [LevelMeta; MAX_LEVEL as usize];
 /// Metadata of all SSTs under a region.
 ///
 /// Files are organized into multiple level, though there may be only one level.
-#[derive(Debug, Clone)]
+#[derive(Clone)]
 pub struct LevelMetas {
     levels: LevelMetaVec,
     sst_layer: AccessLayerRef,
     file_purger: FilePurgerRef,
 }

+impl std::fmt::Debug for LevelMetas {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("LevelMetas")
+            .field("levels", &self.levels)
+            .finish()
+    }
+}
+
 impl LevelMetas {
     /// Create a new LevelMetas and initialized each level.
     pub fn new(sst_layer: AccessLayerRef, file_purger: FilePurgerRef) -> LevelMetas {
@@ -119,7 +127,7 @@ impl LevelMetas {
 }

 /// Metadata of files in same SST level.
-#[derive(Debug, Default, Clone)]
+#[derive(Default, Clone)]
 pub struct LevelMeta {
     level: Level,
     /// Handles to the files in this level.
@@ -128,6 +136,15 @@ pub struct LevelMeta {
     files: HashMap<FileId, FileHandle>,
 }

+impl std::fmt::Debug for LevelMeta {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("LevelMeta")
+            .field("level", &self.level)
+            .field("files", &self.files.keys())
+            .finish()
+    }
+}
+
 impl LevelMeta {
     pub fn new(level: Level) -> Self {
         Self {
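Both hand-written impls above replace a derived Debug with a condensed one: LevelMetas omits its sst_layer and file_purger fields, and LevelMeta prints only the keys of its file map instead of every handle, which keeps the "SST files: {:?}" log line later in this diff to one short line per level. A reduced, self-contained sketch of the pattern (the toy FileHandle below stands in for the real handle type):

    use std::collections::HashMap;
    use std::fmt;

    struct FileHandle; // stand-in for a large payload we don't want in logs

    struct LevelMeta {
        level: u8,
        files: HashMap<u64, FileHandle>,
    }

    impl fmt::Debug for LevelMeta {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            // Report the level and the file ids only; the handles
            // themselves are skipped entirely.
            f.debug_struct("LevelMeta")
                .field("level", &self.level)
                .field("files", &self.files.keys())
                .finish()
        }
    }

    fn main() {
        let mut files = HashMap::new();
        files.insert(42u64, FileHandle);
        println!("{:?}", LevelMeta { level: 0, files });
    }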
@@ -292,7 +309,7 @@ impl Drop for FileHandleInner {
         };
         match self.file_purger.schedule(request) {
             Ok(res) => {
-                info!(
+                debug!(
                     "Scheduled SST purge task, region: {}, name: {}, res: {}",
                     self.meta.region_id,
                     self.meta.file_id.as_parquet(),
@@ -301,7 +318,7 @@ impl Drop for FileHandleInner {
             }
             Err(e) => {
                 error!(e; "Failed to schedule SST purge task, region: {}, name: {}",
-                    self.meta.region_id, self.meta.file_id.as_parquet());
+                       self.meta.region_id, self.meta.file_id.as_parquet());
             }
         }
     }
@@ -24,7 +24,7 @@
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;

-use common_telemetry::info;
+use common_telemetry::{debug, info};
 use store_api::manifest::ManifestVersion;
 use store_api::storage::{SchemaRef, SequenceNumber};
@@ -237,7 +237,8 @@ impl Version {
         self.manifest_version = manifest_version;
         let ssts = self.ssts.merge(files, std::iter::empty());
         info!(
-            "After applying checkpoint, region: {}, flushed_sequence: {}, manifest_version: {}",
+            "After applying checkpoint, region: {}, id: {}, flushed_sequence: {}, manifest_version: {}",
             self.metadata.name(),
+            self.metadata.id(),
             self.flushed_sequence,
             self.manifest_version,
@@ -267,8 +268,9 @@ impl Version {
             .ssts
             .merge(handles_to_add, edit.files_to_remove.into_iter());

-        info!(
-            "After applying edit, region: {}, SST files: {:?}",
+        debug!(
+            "After applying edit, region: {}, id: {}, SST files: {:?}",
             self.metadata.name(),
+            self.metadata.id(),
             merged_ssts
         );
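One practical effect of the info!-to-debug! demotions throughout this change, beyond quieter logs: with the log/tracing macro families, format arguments sit behind the level check, so expensive values (such as the pretty-printed table route earlier in this diff) are not even computed unless debug logging is enabled. A minimal sketch using the plain log crate with env_logger (assumed stand-ins here; GreptimeDB routes logging through its common_telemetry crate):

    use log::{debug, info, LevelFilter};

    fn expensive_summary() -> String {
        // Imagine walking a large SST index here.
        (0..10_000).map(|i| i.to_string()).collect::<Vec<_>>().join(",")
    }

    fn main() {
        env_logger::builder().filter_level(LevelFilter::Info).init();

        info!("engine started"); // printed at the default Info level

        // Not printed, and expensive_summary() is never called: the debug!
        // macro checks the enabled level before evaluating its arguments.
        debug!("sst summary: {}", expensive_summary());
    }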