diff --git a/chore.md b/chore.md index 0d50b78d58..9edb43221a 100644 --- a/chore.md +++ b/chore.md @@ -29,10 +29,10 @@ CREATE TABLE IF NOT EXISTS `app1` ( select count(*) from app1; -SELECT * FROM app1 ORDER BY greptime_timestamp ASC LIMIT 10\G +SELECT * FROM app1 ORDER BY greptime_timestamp DESC LIMIT 10\G ``` # then ingest ```bash -RUST_LOG="debug" cargo run --bin=ingester -- --input-dir="/home/discord9/greptimedb/" --parquet-dir="parquet_store/" --cfg="ingester.toml" --db-http-addr="http://127.0.0.1:4000/v1/sst/ingest_json" +RUST_LOG="debug" cargo run --bin=ingester -- --input-dir="/home/discord9/greptimedb/parquet_store_bk/" --parquet-dir="parquet_store/" --cfg="ingester.toml" --db-http-addr="http://127.0.0.1:4000/v1/sst/ingest_json" ``` \ No newline at end of file diff --git a/src/ingester/src/main.rs b/src/ingester/src/main.rs index 4923c76cda..01fc902a70 100644 --- a/src/ingester/src/main.rs +++ b/src/ingester/src/main.rs @@ -107,10 +107,7 @@ async fn main() { .iter() .map(|parquet| { let full_table_name = parquet.name().split("-").next().unwrap(); - let mut names = full_table_name.split('_').rev(); - let table_name = names.next().unwrap(); - let schema_name = names.next().unwrap_or("public"); - let catalog_name = names.next().unwrap_or("greptime"); + let (catalog_name, schema_name, table_name) = extract_name(full_table_name); info!( "catalog: {}, schema: {}, table: {}", @@ -146,10 +143,8 @@ async fn main() { .iter() .map(|entry| { let full_table_name = entry.name().split("-").next().unwrap(); - let mut names = full_table_name.split('_').rev(); - let table_name = names.next().unwrap(); - let schema_name = names.next().unwrap_or("public"); - let catalog_name = names.next().unwrap_or("greptime"); + let (catalog_name, schema_name, table_name) = extract_name(full_table_name); + info!( "catalog: {}, schema: {}, table: {}", catalog_name, schema_name, table_name @@ -201,6 +196,18 @@ async fn convert_and_send( .unwrap(); } +fn extract_name(full_table_name: &str) -> (String, String, String) { + let mut names = full_table_name.split('.').rev(); + let table_name = names.next().unwrap(); + let schema_name = names.next().unwrap_or("public"); + let catalog_name = names.next().unwrap_or("greptime"); + ( + catalog_name.to_string(), + schema_name.to_string(), + table_name.to_string(), + ) +} + async fn send_ingest_requests( addr: &str, reqs: Vec, @@ -229,6 +236,7 @@ pub(crate) struct ClientIngestSstRequest { pub available_indexes: Vec, /// Size of the index file. pub index_file_size: u64, + pub time_unit: u32, } fn to_ingest_sst_req( @@ -242,22 +250,18 @@ fn to_ingest_sst_req( schema: Some(schema_name.to_string()), table: table_name.to_string(), file_id: sst_info.file_id.to_string(), - min_ts: sst_info - .time_range - .0 - .convert_to(TimeUnit::Microsecond) - .unwrap() - .value(), - max_ts: sst_info - .time_range - .1 - .convert_to(TimeUnit::Microsecond) - .unwrap() - .value(), + min_ts: sst_info.time_range.0.value(), + max_ts: sst_info.time_range.1.value(), file_size: sst_info.file_size, rows: sst_info.num_rows as _, row_groups: sst_info.num_row_groups as _, available_indexes: available_indexs.to_vec(), index_file_size, + time_unit: match sst_info.time_range.0.unit() { + TimeUnit::Second => 0, + TimeUnit::Millisecond => 3, + TimeUnit::Microsecond => 6, + TimeUnit::Nanosecond => 9, + }, } }