From bc9614e22c0fd1fef4f23ad98790d0e038e074bc Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 10 Mar 2025 20:59:38 +0800 Subject: [PATCH] feat: file parallel --- chore.md | 5 +++- src/ingester/src/main.rs | 51 ++++++++++++++++++++++++-------- src/mito2/src/sst/parquet.rs | 1 + src/sst-convert/src/converter.rs | 8 +++-- 4 files changed, 50 insertions(+), 15 deletions(-) diff --git a/chore.md b/chore.md index cf9641d0dd..6628ab66ba 100644 --- a/chore.md +++ b/chore.md @@ -40,13 +40,16 @@ RUST_LOG="debug" cargo run --bin=ingester -- --input-dir="/home/discord9/greptim # metrics!!!!!!! ```bash -mysql --host=127.0.0.1 --port=19195 --database=public < output.sql +mysql --host=127.0.0.1 --port=19195 --database=public < public.greptime_physical_table-create-tables.sql ``` ## then ingest ```bash RUST_LOG="debug" cargo run --bin=ingester -- --input-dir="/home/discord9/greptimedb/parquet_store_bk/" --remote-write-dir="metrics_parquet/" --cfg="ingester.toml" --db-http-addr="http://127.0.0.1:4000/v1/sst/ingest_json" +# perf it +cargo build --release --bin=ingester +samply record target/release/ingester --input-dir="/home/discord9/greptimedb/parquet_store_bk/" --remote-write-dir="metrics_parquet/" --cfg="ingester.toml" --db-http-addr="http://127.0.0.1:4000/v1/sst/ingest_json" ``` ## check data diff --git a/src/ingester/src/main.rs b/src/ingester/src/main.rs index d65043b1bf..4c4e217bd6 100644 --- a/src/ingester/src/main.rs +++ b/src/ingester/src/main.rs @@ -21,7 +21,8 @@ use mito2::config::MitoConfig; use mito2::sst::file::IndexType; use mito2::sst::parquet::SstInfo; use serde::{Deserialize, Serialize}; -use sst_convert::converter::{InputFile, InputFileType, SstConverter, SstConverterBuilder}; +use sst_convert::converter::{InputFile, InputFileType, SstConverterBuilder}; +use tokio::sync::oneshot; #[derive(Parser, Debug)] #[command(version, about = "Greptime Ingester", long_about = None)] @@ -73,7 +74,7 @@ async fn main() { let cfg_file = 
std::fs::read_to_string(&args.cfg).expect("Failed to read config file"); let cfg: IngesterConfig = toml::from_str(&cfg_file).expect("Failed to parse config"); - let mut sst_converter = { + let sst_builder = { let mut builder = SstConverterBuilder::new_fs(args.input_dir) .with_meta_options(cfg.meta_client) .with_storage_config(cfg.storage) @@ -84,11 +85,14 @@ async fn main() { } builder - .build() - .await - .expect("Failed to build sst converter") }; + let sst_converter = sst_builder + .clone() + .build() + .await + .expect("Failed to build sst converter"); + let input_store = sst_converter.input_store.clone(); if let Some(parquet_dir) = args.parquet_dir { @@ -124,7 +128,7 @@ async fn main() { }) .collect::>(); - convert_and_send(&input_files, &mut sst_converter, &args.db_http_addr).await; + convert_and_send(&input_files, sst_builder.clone(), &args.db_http_addr).await; } if let Some(remote_write_dir) = args.remote_write_dir { @@ -159,24 +163,47 @@ async fn main() { }) .collect::>(); - convert_and_send(&input_files, &mut sst_converter, &args.db_http_addr).await; + convert_and_send(&input_files, sst_builder.clone(), &args.db_http_addr).await; } } async fn convert_and_send( input_files: &[InputFile], - sst_converter: &mut SstConverter, + sst_builder: SstConverterBuilder, db_http_addr: &str, ) { let table_names = input_files .iter() .map(|f| (f.schema.clone(), f.table.clone())) .collect::>(); + let mut rxs = Vec::new(); - let sst_infos = sst_converter - .convert(input_files) - .await - .expect("Failed to convert parquet files"); + // Spawn a task for each input file + info!("Spawning tasks for {} input files", input_files.len()); + for input_file in input_files.iter() { + let (tx, rx) = oneshot::channel(); + let sst_builder = sst_builder.clone(); + let input_file = (*input_file).clone(); + tokio::task::spawn(async move { + let mut sst_converter = sst_builder + .build() + .await + .expect("Failed to build sst converter"); + let sst_info = sst_converter + 
.convert_one(&input_file) + .await + .expect("Failed to convert parquet files"); + tx.send(sst_info).unwrap(); + }); + rxs.push(rx); + } + + let mut sst_infos = Vec::new(); + for rx in rxs { + sst_infos.push(rx.await.unwrap()); + } + + info!("Converted {} input files", sst_infos.len()); let ingest_reqs = table_names .iter() diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 14496312e3..00a8b3ab3a 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -61,6 +61,7 @@ impl Default for WriteOptions { } /// Parquet SST info returned by the writer. +#[derive(Debug)] pub struct SstInfo { /// SST file id. pub file_id: FileId, diff --git a/src/sst-convert/src/converter.rs b/src/sst-convert/src/converter.rs index cebc011c49..e361df497e 100644 --- a/src/sst-convert/src/converter.rs +++ b/src/sst-convert/src/converter.rs @@ -35,7 +35,7 @@ use crate::table::TableMetadataHelper; use crate::writer::RegionWriterBuilder; /// Input file type. -#[derive(Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub enum InputFileType { /// File type is Parquet. Parquet, @@ -44,6 +44,7 @@ pub enum InputFileType { } /// Description of a file to convert. +#[derive(Debug, Clone)] pub struct InputFile { /// Catalog of the table. pub catalog: String, @@ -60,6 +61,7 @@ pub struct InputFile { /// Description of converted files for an input file. /// A input file can be converted to multiple output files. +#[derive(Debug)] pub struct OutputSst { /// Meta of output SST files. pub ssts: SstInfoArray, @@ -99,7 +101,7 @@ impl SstConverter { } /// Converts one input. 
- async fn convert_one(&mut self, input: &InputFile) -> Result { + pub async fn convert_one(&mut self, input: &InputFile) -> Result { common_telemetry::info!( "Converting input file, input_path: {}, output_path: {}", input.path, @@ -122,11 +124,13 @@ impl SstConverter { .write_sst(source, &self.write_opts) .await .context(MitoSnafu)?; + common_telemetry::info!("Converted input file, input_path: {}", input.path); Ok(OutputSst { ssts }) } } /// Builder to build a SST converter. +#[derive(Clone)] pub struct SstConverterBuilder { input_path: String, meta_options: MetaClientOptions,