diff --git a/chore.md b/chore.md
index 8880ad57bb..0d50b78d58 100644
--- a/chore.md
+++ b/chore.md
@@ -4,7 +4,9 @@
 mysql --host=127.0.0.1 --port=19195 --database=public;
 ```
 ```sql
-CREATE TABLE IF NOT EXISTS `public`.`logsbench` (
+CREATE DATABASE IF NOT EXISTS `cluster1`;
+USE `cluster1`;
+CREATE TABLE IF NOT EXISTS `app1` (
   `greptime_timestamp` TimestampNanosecond NOT NULL TIME INDEX,
   `app` STRING NULL INVERTED INDEX,
   `cluster` STRING NULL INVERTED INDEX,
@@ -24,9 +26,13 @@ CREATE TABLE IF NOT EXISTS `public`.`logsbench` (
   'compaction.twcs.max_inactive_window_files' = '4',
   'compaction.twcs.max_inactive_window_runs' = '2',
 );
+
+select count(*) from app1;
+
+SELECT * FROM app1 ORDER BY greptime_timestamp ASC LIMIT 10\G
 ```
 
 # then ingest
 ```bash
-cargo run --bin=ingester -- --input-dir="/home/discord9/greptimedb/parquet_store" --parquet-dir="." --cfg="ingester.toml" --db-http-addr="http://127.0.0.1:4000"
+RUST_LOG="debug" cargo run --bin=ingester -- --input-dir="/home/discord9/greptimedb/" --parquet-dir="parquet_store/" --cfg="ingester.toml" --db-http-addr="http://127.0.0.1:4000/v1/sst/ingest_json"
 ```
\ No newline at end of file
diff --git a/ingester.toml b/ingester.toml
index 4998387b07..51765255de 100644
--- a/ingester.toml
+++ b/ingester.toml
@@ -30,6 +30,6 @@ metadata_cache_tti = "5m"
 ## The data storage options.
 [storage]
 ## The working home directory.
-data_home = "/tmp/greptimedb/"
+data_home = "/tmp/greptimedb-cluster/datanode0"
 type = "File"
 [mito]
\ No newline at end of file
diff --git a/src/ingester/src/main.rs b/src/ingester/src/main.rs
index 54e7502dbb..4923c76cda 100644
--- a/src/ingester/src/main.rs
+++ b/src/ingester/src/main.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use clap::Parser;
+use common_telemetry::info;
 use common_time::timestamp::TimeUnit;
 use datanode::config::StorageConfig;
 use meta_client::MetaClientOptions;
@@ -96,7 +97,7 @@ async fn main() {
         .list(&parquet_dir)
         .await
         .expect("Failed to list parquet files");
-
+    info!("Listed all files in parquet directory: {:?}", all_parquets);
     let all_parquets = all_parquets
         .iter()
         .filter(|parquet| parquet.name().ends_with(".parquet") && parquet.metadata().is_file())
@@ -106,11 +107,16 @@
         .iter()
         .map(|parquet| {
             let full_table_name = parquet.name().split("-").next().unwrap();
-            let mut names = full_table_name.split('.').rev();
+            let mut names = full_table_name.split('_').rev();
             let table_name = names.next().unwrap();
-            let schema_name = names.next().unwrap();
+            let schema_name = names.next().unwrap_or("public");
             let catalog_name = names.next().unwrap_or("greptime");
+            info!(
+                "catalog: {}, schema: {}, table: {}",
+                catalog_name, schema_name, table_name
+            );
+
 
             InputFile {
                 catalog: catalog_name.to_string(),
                 schema: schema_name.to_string(),
@@ -140,11 +146,14 @@
         .iter()
         .map(|entry| {
             let full_table_name = entry.name().split("-").next().unwrap();
-            let mut names = full_table_name.split('.').rev();
+            let mut names = full_table_name.split('_').rev();
             let table_name = names.next().unwrap();
-            let schema_name = names.next().unwrap();
+            let schema_name = names.next().unwrap_or("public");
             let catalog_name = names.next().unwrap_or("greptime");
-
+            info!(
+                "catalog: {}, schema: {}, table: {}",
+                catalog_name, schema_name, table_name
+            );
             InputFile {
                 catalog: catalog_name.to_string(),
                 schema: schema_name.to_string(),
@@ -198,7 +207,10 @@ async fn send_ingest_requests(
 ) -> Result<(), Box<dyn std::error::Error>> {
     let client = reqwest::Client::new();
     for req in reqs {
-        client.post(addr).json(&req).send().await?;
+        info!("ingesting sst: {req:?}");
+        let req = client.post(addr).json(&req);
+        let resp = req.send().await?;
+        info!("ingest response: {resp:?}");
     }
     Ok(())
 }
diff --git a/src/sst-convert/src/converter.rs b/src/sst-convert/src/converter.rs
index b71411b0ec..81620ea6d9 100644
--- a/src/sst-convert/src/converter.rs
+++ b/src/sst-convert/src/converter.rs
@@ -16,6 +17,7 @@
 
 use std::sync::Arc;
 
+use common_telemetry::info;
 use datanode::config::StorageConfig;
 use datanode::datanode::DatanodeBuilder;
 use meta_client::MetaClientOptions;
@@ -214,9 +215,10 @@ pub async fn new_object_store_manager(config: &StorageConfig) -> Result<ObjectSt
 
 fn new_input_store(path: &str) -> Result<ObjectStore> {
     let builder = Fs::default().root(path);
-
+    info!("Creating input store, path: {}", path);
     let object_store = ObjectStore::new(builder)
         .context(ObjectStoreSnafu)?
         .finish();
+    info!("Created input store: {:?}", object_store);
     Ok(object_store)
 }