feat: time unit

This commit is contained in:
discord9
2025-03-10 11:25:08 +08:00
parent 8aadd1e59a
commit 738c23beb0
2 changed files with 26 additions and 22 deletions

View File

@@ -29,10 +29,10 @@ CREATE TABLE IF NOT EXISTS `app1` (
select count(*) from app1;
SELECT * FROM app1 ORDER BY greptime_timestamp ASC LIMIT 10\G
SELECT * FROM app1 ORDER BY greptime_timestamp DESC LIMIT 10\G
```
# then ingest
```bash
RUST_LOG="debug" cargo run --bin=ingester -- --input-dir="/home/discord9/greptimedb/" --parquet-dir="parquet_store/" --cfg="ingester.toml" --db-http-addr="http://127.0.0.1:4000/v1/sst/ingest_json"
RUST_LOG="debug" cargo run --bin=ingester -- --input-dir="/home/discord9/greptimedb/parquet_store_bk/" --parquet-dir="parquet_store/" --cfg="ingester.toml" --db-http-addr="http://127.0.0.1:4000/v1/sst/ingest_json"
```

View File

@@ -107,10 +107,7 @@ async fn main() {
.iter()
.map(|parquet| {
let full_table_name = parquet.name().split("-").next().unwrap();
let mut names = full_table_name.split('_').rev();
let table_name = names.next().unwrap();
let schema_name = names.next().unwrap_or("public");
let catalog_name = names.next().unwrap_or("greptime");
let (catalog_name, schema_name, table_name) = extract_name(full_table_name);
info!(
"catalog: {}, schema: {}, table: {}",
@@ -146,10 +143,8 @@ async fn main() {
.iter()
.map(|entry| {
let full_table_name = entry.name().split("-").next().unwrap();
let mut names = full_table_name.split('_').rev();
let table_name = names.next().unwrap();
let schema_name = names.next().unwrap_or("public");
let catalog_name = names.next().unwrap_or("greptime");
let (catalog_name, schema_name, table_name) = extract_name(full_table_name);
info!(
"catalog: {}, schema: {}, table: {}",
catalog_name, schema_name, table_name
@@ -201,6 +196,18 @@ async fn convert_and_send(
.unwrap();
}
fn extract_name(full_table_name: &str) -> (String, String, String) {
let mut names = full_table_name.split('.').rev();
let table_name = names.next().unwrap();
let schema_name = names.next().unwrap_or("public");
let catalog_name = names.next().unwrap_or("greptime");
(
catalog_name.to_string(),
schema_name.to_string(),
table_name.to_string(),
)
}
async fn send_ingest_requests(
addr: &str,
reqs: Vec<ClientIngestSstRequest>,
@@ -229,6 +236,7 @@ pub(crate) struct ClientIngestSstRequest {
pub available_indexes: Vec<IndexType>,
/// Size of the index file.
pub index_file_size: u64,
pub time_unit: u32,
}
fn to_ingest_sst_req(
@@ -242,22 +250,18 @@ fn to_ingest_sst_req(
schema: Some(schema_name.to_string()),
table: table_name.to_string(),
file_id: sst_info.file_id.to_string(),
min_ts: sst_info
.time_range
.0
.convert_to(TimeUnit::Microsecond)
.unwrap()
.value(),
max_ts: sst_info
.time_range
.1
.convert_to(TimeUnit::Microsecond)
.unwrap()
.value(),
min_ts: sst_info.time_range.0.value(),
max_ts: sst_info.time_range.1.value(),
file_size: sst_info.file_size,
rows: sst_info.num_rows as _,
row_groups: sst_info.num_row_groups as _,
available_indexes: available_indexs.to_vec(),
index_file_size,
time_unit: match sst_info.time_range.0.unit() {
TimeUnit::Second => 0,
TimeUnit::Millisecond => 3,
TimeUnit::Microsecond => 6,
TimeUnit::Nanosecond => 9,
},
}
}