From ec8a15caddf9e5a95cf5bc61e567c866ac18bccb Mon Sep 17 00:00:00 2001
From: discord9
Date: Sun, 9 Mar 2025 16:57:26 +0800
Subject: [PATCH] feat: ingester(WIP)

---
 Cargo.lock                       |  18 +++
 Cargo.toml                       |   1 +
 src/ingester/Cargo.toml          |  22 ++++
 src/ingester/src/main.rs         | 220 +++++++++++++++++++++++++++++++
 src/sst-convert/src/converter.rs |   2 +-
 5 files changed, 262 insertions(+), 1 deletion(-)
 create mode 100644 src/ingester/Cargo.toml
 create mode 100644 src/ingester/src/main.rs

diff --git a/Cargo.lock b/Cargo.lock
index 2fec91b987..11ba34bd71 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5628,6 +5628,24 @@ dependencies = [
  "snafu 0.7.5",
 ]
 
+[[package]]
+name = "ingester"
+version = "0.13.0"
+dependencies = [
+ "clap 4.5.19",
+ "common-time",
+ "datanode",
+ "meta-client",
+ "mito2",
+ "object-store",
+ "reqwest",
+ "serde",
+ "serde_json",
+ "sst-convert",
+ "tokio",
+ "toml 0.8.19",
+]
+
 [[package]]
 name = "inotify"
 version = "0.9.6"
diff --git a/Cargo.toml b/Cargo.toml
index 2da9147a51..c7cbed5991 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -41,6 +41,7 @@ members = [
     "src/flow",
     "src/frontend",
     "src/index",
+    "src/ingester",
     "src/log-query",
     "src/log-store",
     "src/meta-client",
diff --git a/src/ingester/Cargo.toml b/src/ingester/Cargo.toml
new file mode 100644
index 0000000000..576e7b21db
--- /dev/null
+++ b/src/ingester/Cargo.toml
@@ -0,0 +1,22 @@
+[package]
+name = "ingester"
+version.workspace = true
+edition.workspace = true
+license.workspace = true
+
+[dependencies]
+clap.workspace = true
+common-time.workspace = true
+datanode.workspace = true
+meta-client.workspace = true
+mito2.workspace = true
+object-store.workspace = true
+reqwest.workspace = true
+serde.workspace = true
+serde_json.workspace = true
+sst-convert.workspace = true
+tokio.workspace = true
+toml.workspace = true
+
+[lints]
+workspace = true
diff --git a/src/ingester/src/main.rs b/src/ingester/src/main.rs
new file mode 100644
index 0000000000..276cdcf1e1
--- /dev/null
+++ b/src/ingester/src/main.rs
@@ -0,0 +1,265 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Command line tool that converts input files to SSTs with `sst-convert` and
+//! posts one ingest request per produced SST to a database HTTP endpoint.
+
+use clap::Parser;
+use common_time::timestamp::TimeUnit;
+use datanode::config::StorageConfig;
+use meta_client::MetaClientOptions;
+use mito2::sst::file::IndexType;
+use mito2::sst::parquet::SstInfo;
+use object_store::ObjectStore;
+use serde::{Deserialize, Serialize};
+use sst_convert::converter::{InputFile, InputFileType, SstConverter, SstConverterBuilder};
+
+/// Catalog used when an input file name does not carry a catalog component.
+const DEFAULT_CATALOG: &str = "greptime";
+
+// Command line arguments. A plain comment is used here because clap would turn
+// a doc comment on the struct into help text.
+#[derive(Parser, Debug)]
+#[command(version, about = "Greptime Ingester", long_about = None)]
+struct Args {
+    /// Input directory
+    input_dir: String,
+    /// Directory of input parquet files, relative to input_dir
+    #[arg(short, long)]
+    parquet_dir: Option<String>,
+    /// Directory of input json files, relative to input_dir
+    #[arg(short, long)]
+    json_dir: Option<String>,
+    /// Output storage config file
+    #[arg(short, long)]
+    output_storage_config_file: String,
+    /// Meta client config file
+    #[arg(short, long)]
+    meta_client_config_file: String,
+    /// DB HTTP address
+    #[arg(short, long)]
+    db_http_addr: String,
+}
+
+/// Loads both config files, builds the SST converter, then converts and
+/// submits the parquet and/or JSON inputs selected on the command line.
+///
+/// # Panics
+///
+/// Panics if a config file cannot be read or parsed, if the converter cannot
+/// be built, or if listing, converting or submitting the inputs fails.
+#[tokio::main]
+async fn main() {
+    let args = Args::parse();
+
+    let meta_client_config = std::fs::read_to_string(&args.meta_client_config_file)
+        .expect("Failed to read meta client config file");
+    let meta_options: MetaClientOptions =
+        toml::from_str(&meta_client_config).expect("Failed to parse meta client config");
+
+    let storage_config = std::fs::read_to_string(&args.output_storage_config_file)
+        .expect("Failed to read storage config file");
+    let storage_config: StorageConfig =
+        toml::from_str(&storage_config).expect("Failed to parse storage config");
+
+    let sst_converter = SstConverterBuilder::new_fs(args.input_dir)
+        .with_meta_options(meta_options)
+        .with_storage_config(storage_config)
+        .build()
+        .await
+        .expect("Failed to build sst converter");
+
+    let input_store: &ObjectStore = &sst_converter.input_store;
+
+    if let Some(parquet_dir) = args.parquet_dir {
+        // List the parquet files through the converter's input object store.
+        let all_parquets = input_store
+            .list(&parquet_dir)
+            .await
+            .expect("Failed to list parquet files");
+
+        let input_files = all_parquets
+            .iter()
+            .map(|entry| to_input_file(entry.name(), entry.path(), InputFileType::Parquet))
+            .collect::<Vec<_>>();
+
+        convert_and_send(&input_files, &sst_converter, &args.db_http_addr).await;
+    }
+
+    if let Some(json_dir) = args.json_dir {
+        // List the JSON files through the converter's input object store.
+        let all_jsons = input_store
+            .list(&json_dir)
+            .await
+            .expect("Failed to list json files");
+
+        let input_files = all_jsons
+            .iter()
+            .map(|entry| to_input_file(entry.name(), entry.path(), InputFileType::RemoteWrite))
+            .collect::<Vec<_>>();
+
+        convert_and_send(&input_files, &sst_converter, &args.db_http_addr).await;
+    }
+}
+
+/// Builds an [`InputFile`] from the name and path of an object store entry.
+///
+/// The part of `file_name` before the first `-` is read as a dotted table
+/// reference, `[<catalog>.]<schema>.<table>`, from right to left. A missing
+/// catalog falls back to [`DEFAULT_CATALOG`].
+///
+/// # Panics
+///
+/// Panics if the table reference has no schema component.
+fn to_input_file(file_name: &str, path: &str, file_type: InputFileType) -> InputFile {
+    // `split` yields at least one item, so the fallback is never taken.
+    let full_table_name = file_name.split('-').next().unwrap_or(file_name);
+    let mut names = full_table_name.rsplit('.');
+    // Likewise, `rsplit` always yields a first item.
+    let table_name = names.next().unwrap_or(full_table_name);
+    let schema_name = names.next().unwrap_or_else(|| {
+        panic!("Input file name `{file_name}` does not start with `<schema>.<table>`")
+    });
+    let catalog_name = names.next().unwrap_or(DEFAULT_CATALOG);
+
+    InputFile {
+        catalog: catalog_name.to_string(),
+        schema: schema_name.to_string(),
+        table: table_name.to_string(),
+        path: path.to_string(),
+        file_type,
+    }
+}
+
+/// Converts `input_files` to SSTs and posts one ingest request per SST to
+/// `db_http_addr`.
+///
+/// # Panics
+///
+/// Panics if the conversion fails or if sending an ingest request fails.
+async fn convert_and_send(
+    input_files: &[InputFile],
+    sst_converter: &SstConverter,
+    db_http_addr: &str,
+) {
+    let sst_infos = sst_converter
+        .convert(input_files)
+        .await
+        .expect("Failed to convert input files");
+
+    // NOTE(review): pairs the i-th output with the i-th input, which assumes that
+    // `convert` returns one output per input file, in input order -- confirm.
+    let ingest_reqs = input_files
+        .iter()
+        .zip(sst_infos.iter())
+        .flat_map(|(input_file, sst_info)| {
+            sst_info
+                .ssts
+                .iter()
+                .map(move |sst| to_ingest_sst_req(&input_file.schema, &input_file.table, sst))
+        })
+        .collect::<Vec<_>>();
+
+    // send ingest requests to DB
+    send_ingest_requests(db_http_addr, ingest_reqs)
+        .await
+        .expect("Failed to send ingest requests");
+}
+
+/// Posts every request in `reqs` to `addr` as JSON, one at a time.
+///
+/// # Errors
+///
+/// Returns the first error hit: a failure to send a request, or a response
+/// with a 4xx/5xx status. Requests after the failing one are not sent.
+async fn send_ingest_requests(
+    addr: &str,
+    reqs: Vec<ClientIngestSstRequest>,
+) -> Result<(), Box<dyn std::error::Error>> {
+    let client = reqwest::Client::new();
+    for req in reqs {
+        client
+            .post(addr)
+            .json(&req)
+            .send()
+            .await?
+            // `send` succeeds on any HTTP response, so reject 4xx/5xx explicitly.
+            .error_for_status()?;
+    }
+    Ok(())
+}
+
+/// JSON body of one ingest request, describing a single SST file.
+#[derive(Debug, Serialize, Deserialize)]
+pub(crate) struct ClientIngestSstRequest {
+    /// Schema of the table the file belongs to.
+    schema: Option<String>,
+    /// Name of the table the file belongs to.
+    table: String,
+    /// Id of the SST file.
+    pub(crate) file_id: String,
+    /// Lower bound of the file's time range, in microseconds.
+    pub(crate) min_ts: i64,
+    /// Upper bound of the file's time range, in microseconds.
+    pub(crate) max_ts: i64,
+    /// Size of the SST file.
+    pub(crate) file_size: u64,
+    /// Number of rows in the file.
+    pub(crate) rows: u32,
+    /// Number of row groups in the file.
+    pub(crate) row_groups: u32,
+    /// Available indexes of the file.
+    pub available_indexes: Vec<IndexType>,
+    /// Size of the index file.
+    pub index_file_size: u64,
+}
+
+/// Builds the ingest request for one converted SST of `schema_name.table_name`.
+///
+/// Both ends of the SST time range are converted to microseconds.
+///
+/// # Panics
+///
+/// Panics if either end of the time range cannot be converted to microseconds.
+fn to_ingest_sst_req(
+    schema_name: &str,
+    table_name: &str,
+    sst_info: &SstInfo,
+) -> ClientIngestSstRequest {
+    let index_file_size = sst_info.index_metadata.file_size;
+    let available_indexes = sst_info.index_metadata.build_available_indexes();
+    ClientIngestSstRequest {
+        schema: Some(schema_name.to_string()),
+        table: table_name.to_string(),
+        file_id: sst_info.file_id.to_string(),
+        min_ts: sst_info
+            .time_range
+            .0
+            .convert_to(TimeUnit::Microsecond)
+            .expect("Failed to convert min timestamp to microseconds")
+            .value(),
+        max_ts: sst_info
+            .time_range
+            .1
+            .convert_to(TimeUnit::Microsecond)
+            .expect("Failed to convert max timestamp to microseconds")
+            .value(),
+        file_size: sst_info.file_size,
+        // NOTE(review): `as _` silently truncates counts that do not fit in `u32`.
+        rows: sst_info.num_rows as _,
+        row_groups: sst_info.num_row_groups as _,
+        available_indexes: available_indexes.to_vec(),
+        index_file_size,
+    }
+}
diff --git a/src/sst-convert/src/converter.rs b/src/sst-convert/src/converter.rs
index 1c6ec87ca8..f754ecbb6a 100644
--- a/src/sst-convert/src/converter.rs
+++ b/src/sst-convert/src/converter.rs
@@ -59,7 +59,7 @@ pub struct OutputSst {
 /// SST converter takes a list of source files and converts them to SST files.
 pub struct SstConverter {
     /// Object store for input files.
-    input_store: ObjectStore,
+    pub input_store: ObjectStore,
     /// Object store manager for output files.
     output_store_manager: ObjectStoreManagerRef,
     /// Helper to get table meta.