chore: more logs

This commit is contained in:
discord9
2025-03-09 23:29:58 +08:00
parent e522e8959b
commit cbd58291da
4 changed files with 31 additions and 11 deletions

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use clap::Parser;
use common_telemetry::info;
use common_time::timestamp::TimeUnit;
use datanode::config::StorageConfig;
use meta_client::MetaClientOptions;
@@ -96,7 +97,7 @@ async fn main() {
.list(&parquet_dir)
.await
.expect("Failed to list parquet files");
info!("Listed all files in parquet directory: {:?}", all_parquets);
let all_parquets = all_parquets
.iter()
.filter(|parquet| parquet.name().ends_with(".parquet") && parquet.metadata().is_file())
@@ -106,11 +107,16 @@ async fn main() {
.iter()
.map(|parquet| {
let full_table_name = parquet.name().split("-").next().unwrap();
let mut names = full_table_name.split('.').rev();
let mut names = full_table_name.split('_').rev();
let table_name = names.next().unwrap();
let schema_name = names.next().unwrap();
let schema_name = names.next().unwrap_or("public");
let catalog_name = names.next().unwrap_or("greptime");
info!(
"catalog: {}, schema: {}, table: {}",
catalog_name, schema_name, table_name
);
InputFile {
catalog: catalog_name.to_string(),
schema: schema_name.to_string(),
@@ -140,11 +146,14 @@ async fn main() {
.iter()
.map(|entry| {
let full_table_name = entry.name().split("-").next().unwrap();
let mut names = full_table_name.split('.').rev();
let mut names = full_table_name.split('_').rev();
let table_name = names.next().unwrap();
let schema_name = names.next().unwrap();
let schema_name = names.next().unwrap_or("public");
let catalog_name = names.next().unwrap_or("greptime");
info!(
"catalog: {}, schema: {}, table: {}",
catalog_name, schema_name, table_name
);
InputFile {
catalog: catalog_name.to_string(),
schema: schema_name.to_string(),
@@ -198,7 +207,10 @@ async fn send_ingest_requests(
) -> Result<(), Box<dyn std::error::Error>> {
let client = reqwest::Client::new();
for req in reqs {
client.post(addr).json(&req).send().await?;
info!("ingesting sst: {req:?}");
let req = client.post(addr).json(&req);
let resp = req.send().await?;
info!("ingest response: {resp:?}");
}
Ok(())
}

View File

@@ -16,6 +16,7 @@
use std::sync::Arc;
use common_telemetry::info;
use datanode::config::StorageConfig;
use datanode::datanode::DatanodeBuilder;
use meta_client::MetaClientOptions;
@@ -214,9 +215,10 @@ pub async fn new_object_store_manager(config: &StorageConfig) -> Result<ObjectSt
/// Creates a input store from a path.
pub async fn new_input_store(path: &str) -> Result<ObjectStore> {
let builder = Fs::default().root(path);
info!("Creating input store, path: {}", path);
let object_store = ObjectStore::new(builder)
.context(ObjectStoreSnafu)?
.finish();
info!("Created input store: {:?}", object_store);
Ok(object_store)
}