feat: ingester (WIP)

Add a WIP `ingester` tool that lists input Parquet / remote-write JSON files,
converts them to SSTs via `sst-convert`, and registers the resulting files
with the DB over HTTP.

Author: discord9
Date:   2025-03-09 16:57:26 +08:00
Parent: f929d751a5
Commit: ec8a15cadd

5 changed files with 262 additions and 1 deletion

Cargo.lock (generated)

@@ -5628,6 +5628,24 @@ dependencies = [
  "snafu 0.7.5",
 ]
 
+[[package]]
+name = "ingester"
+version = "0.13.0"
+dependencies = [
+ "clap 4.5.19",
+ "common-time",
+ "datanode",
+ "meta-client",
+ "mito2",
+ "object-store",
+ "reqwest",
+ "serde",
+ "serde_json",
+ "sst-convert",
+ "tokio",
+ "toml 0.8.19",
+]
+
 [[package]]
 name = "inotify"
 version = "0.9.6"

Cargo.toml

@@ -41,6 +41,7 @@ members = [
     "src/flow",
     "src/frontend",
     "src/index",
+    "src/ingester",
     "src/log-query",
     "src/log-store",
     "src/meta-client",

src/ingester/Cargo.toml (new file)

@@ -0,0 +1,22 @@
[package]
name = "ingester"
version.workspace = true
edition.workspace = true
license.workspace = true

[dependencies]
clap.workspace = true
common-time.workspace = true
datanode.workspace = true
meta-client.workspace = true
mito2.workspace = true
object-store.workspace = true
reqwest.workspace = true
serde.workspace = true
serde_json.workspace = true
sst-convert.workspace = true
tokio.workspace = true
toml.workspace = true

[lints]
workspace = true

src/ingester/src/main.rs (new file)

@@ -0,0 +1,220 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use clap::Parser;
use common_time::timestamp::TimeUnit;
use datanode::config::StorageConfig;
use meta_client::MetaClientOptions;
use mito2::sst::file::IndexType;
use mito2::sst::parquet::SstInfo;
use object_store::ObjectStore;
use serde::{Deserialize, Serialize};
use sst_convert::converter::{InputFile, InputFileType, SstConverter, SstConverterBuilder};

#[derive(Parser, Debug)]
#[command(version, about = "Greptime Ingester", long_about = None)]
struct Args {
    /// Input directory
    input_dir: String,
    /// Directory of input parquet files, relative to input_dir
    #[arg(short, long)]
    parquet_dir: Option<String>,
    /// Directory of input json files, relative to input_dir
    #[arg(short, long)]
    json_dir: Option<String>,
    /// Output storage config file
    #[arg(short, long)]
    output_storage_config_file: String,
    /// Meta client config file
    #[arg(short, long)]
    meta_client_config_file: String,
    /// DB HTTP address
    #[arg(short, long)]
    db_http_addr: String,
}

#[tokio::main]
async fn main() {
    let args = Args::parse();

    let meta_client_config = std::fs::read_to_string(&args.meta_client_config_file)
        .expect("Failed to read meta client config file");
    let meta_options: MetaClientOptions =
        toml::from_str(&meta_client_config).expect("Failed to parse meta client config");

    let storage_config = std::fs::read_to_string(&args.output_storage_config_file)
        .expect("Failed to read storage config file");
    let storage_config: StorageConfig =
        toml::from_str(&storage_config).expect("Failed to parse storage config");

    let sst_converter = SstConverterBuilder::new_fs(args.input_dir)
        .with_meta_options(meta_options)
        .with_storage_config(storage_config)
        .build()
        .await
        .expect("Failed to build sst converter");

    let input_store: &ObjectStore = &sst_converter.input_store;

    if let Some(parquet_dir) = args.parquet_dir {
        // Use opendal to list parquet files in the given input object store.
        let all_parquets = input_store
            .list(&parquet_dir)
            .await
            .expect("Failed to list parquet files");
        let input_files = all_parquets
            .iter()
            .map(|parquet| {
                // File names are expected to be `<catalog>.<schema>.<table>-<suffix>`;
                // the catalog part is optional and defaults to "greptime".
                let full_table_name = parquet.name().split('-').next().unwrap();
                let mut names = full_table_name.split('.').rev();
                let table_name = names.next().unwrap();
                let schema_name = names.next().unwrap();
                let catalog_name = names.next().unwrap_or("greptime");
                InputFile {
                    catalog: catalog_name.to_string(),
                    schema: schema_name.to_string(),
                    table: table_name.to_string(),
                    path: parquet.path().to_string(),
                    file_type: InputFileType::Parquet,
                }
            })
            .collect::<Vec<_>>();
        convert_and_send(&input_files, &sst_converter, &args.db_http_addr).await;
    }

    if let Some(json_dir) = args.json_dir {
        // Use opendal to list json files in the given input object store.
        let all_jsons = input_store
            .list(&json_dir)
            .await
            .expect("Failed to list json files");
        let input_files = all_jsons
            .iter()
            .map(|entry| {
                // Same `<catalog>.<schema>.<table>-<suffix>` naming convention as above.
                let full_table_name = entry.name().split('-').next().unwrap();
                let mut names = full_table_name.split('.').rev();
                let table_name = names.next().unwrap();
                let schema_name = names.next().unwrap();
                let catalog_name = names.next().unwrap_or("greptime");
                InputFile {
                    catalog: catalog_name.to_string(),
                    schema: schema_name.to_string(),
                    table: table_name.to_string(),
                    path: entry.path().to_string(),
                    file_type: InputFileType::RemoteWrite,
                }
            })
            .collect::<Vec<_>>();
        convert_and_send(&input_files, &sst_converter, &args.db_http_addr).await;
    }
}

async fn convert_and_send(
    input_files: &[InputFile],
    sst_converter: &SstConverter,
    db_http_addr: &str,
) {
    let table_names = input_files
        .iter()
        .map(|f| (f.schema.clone(), f.table.clone()))
        .collect::<Vec<_>>();
    let sst_infos = sst_converter
        .convert(input_files)
        .await
        .expect("Failed to convert input files");
    // One input file may yield several SSTs, so emit one request per SST.
    let ingest_reqs = table_names
        .iter()
        .zip(sst_infos.iter())
        .flat_map(|(schema_name, sst_info)| {
            sst_info
                .ssts
                .iter()
                .map(|sst| to_ingest_sst_req(&schema_name.0, &schema_name.1, sst))
                .collect::<Vec<_>>()
        })
        .collect::<Vec<_>>();
    // Send ingest requests to the DB.
    send_ingest_requests(db_http_addr, ingest_reqs)
        .await
        .expect("Failed to send ingest requests");
}

async fn send_ingest_requests(
    addr: &str,
    reqs: Vec<ClientIngestSstRequest>,
) -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();
    for req in reqs {
        client.post(addr).json(&req).send().await?;
    }
    Ok(())
}

#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct ClientIngestSstRequest {
    schema: Option<String>,
    table: String,
    pub(crate) file_id: String,
    pub(crate) min_ts: i64,
    pub(crate) max_ts: i64,
    pub(crate) file_size: u64,
    pub(crate) rows: u32,
    pub(crate) row_groups: u32,
    /// Available indexes of the file.
    pub available_indexes: Vec<IndexType>,
    /// Size of the index file.
    pub index_file_size: u64,
}

fn to_ingest_sst_req(
    schema_name: &str,
    table_name: &str,
    sst_info: &SstInfo,
) -> ClientIngestSstRequest {
    let index_file_size = sst_info.index_metadata.file_size;
    let available_indexes = sst_info.index_metadata.build_available_indexes();
    ClientIngestSstRequest {
        schema: Some(schema_name.to_string()),
        table: table_name.to_string(),
        file_id: sst_info.file_id.to_string(),
        // Normalize both ends of the SST's time range to microseconds.
        min_ts: sst_info
            .time_range
            .0
            .convert_to(TimeUnit::Microsecond)
            .unwrap()
            .value(),
        max_ts: sst_info
            .time_range
            .1
            .convert_to(TimeUnit::Microsecond)
            .unwrap()
            .value(),
        file_size: sst_info.file_size,
        rows: sst_info.num_rows as _,
        row_groups: sst_info.num_row_groups as _,
        available_indexes: available_indexes.to_vec(),
        index_file_size,
    }
}
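
For reference, a hypothetical invocation of the tool (the binary path, directories,
and endpoint URL are made up; the long flag names follow from the clap derive above,
which maps underscores to hyphens, and the two TOML files must deserialize into
MetaClientOptions and StorageConfig respectively):

ingester /data/input \
    --parquet-dir parquet \
    --json-dir json \
    --output-storage-config-file storage.toml \
    --meta-client-config-file meta.toml \
    --db-http-addr http://127.0.0.1:4000/v1/ingest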
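
For each converted SST, send_ingest_requests POSTs one JSON document whose shape
follows the serde derive on ClientIngestSstRequest. A sketch with invented values:
min_ts/max_ts are microseconds per the convert_to(TimeUnit::Microsecond) calls,
file_id is the SST's file id rendered as a string, and the serialized form of
IndexType is assumed here to be its variant name:

{
  "schema": "public",
  "table": "cpu_usage",
  "file_id": "9f2c3a4e-5b6d-4e7f-8a9b-0c1d2e3f4a5b",
  "min_ts": 1741400000000000,
  "max_ts": 1741486400000000,
  "file_size": 10485760,
  "rows": 1000000,
  "row_groups": 10,
  "available_indexes": ["InvertedIndex"],
  "index_file_size": 65536
}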

src/sst-convert/src/converter.rs

@@ -59,7 +59,7 @@ pub struct OutputSst
 /// SST converter takes a list of source files and converts them to SST files.
 pub struct SstConverter {
     /// Object store for input files.
-    input_store: ObjectStore,
+    pub input_store: ObjectStore,
     /// Object store manager for output files.
     output_store_manager: ObjectStoreManagerRef,
     /// Helper to get table meta.
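
Making input_store public is what lets the new ingester binary reuse the
converter's own object store to list input files (&sst_converter.input_store in
main.rs above) instead of constructing a second store from the same config.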