feat: file parallel

This commit is contained in:
discord9
2025-03-10 20:59:38 +08:00
parent 7dd9e98ff6
commit bc9614e22c
4 changed files with 50 additions and 15 deletions

View File

@@ -40,13 +40,16 @@ RUST_LOG="debug" cargo run --bin=ingester -- --input-dir="/home/discord9/greptim
# metrics!!!!!!!
```bash
mysql --host=127.0.0.1 --port=19195 --database=public < output.sql
mysql --host=127.0.0.1 --port=19195 --database=public < public.greptime_physical_table-create-tables.sql
```
## then ingest
```bash
RUST_LOG="debug"
cargo run --bin=ingester -- --input-dir="/home/discord9/greptimedb/parquet_store_bk/" --remote-write-dir="metrics_parquet/" --cfg="ingester.toml" --db-http-addr="http://127.0.0.1:4000/v1/sst/ingest_json"
# perf it
cargo build --release ---bin=ingester
samply record target/release/ingester --input-dir="/home/discord9/greptimedb/parquet_store_bk/" --remote-write-dir="metrics_parquet/" --cfg="ingester.toml" --db-http-addr="http://127.0.0.1:4000/v1/sst/ingest_json"
```
## check data

View File

@@ -21,7 +21,8 @@ use mito2::config::MitoConfig;
use mito2::sst::file::IndexType;
use mito2::sst::parquet::SstInfo;
use serde::{Deserialize, Serialize};
use sst_convert::converter::{InputFile, InputFileType, SstConverter, SstConverterBuilder};
use sst_convert::converter::{InputFile, InputFileType, SstConverterBuilder};
use tokio::sync::oneshot;
#[derive(Parser, Debug)]
#[command(version, about = "Greptime Ingester", long_about = None)]
@@ -73,7 +74,7 @@ async fn main() {
let cfg_file = std::fs::read_to_string(&args.cfg).expect("Failed to read config file");
let cfg: IngesterConfig = toml::from_str(&cfg_file).expect("Failed to parse config");
let mut sst_converter = {
let sst_builder = {
let mut builder = SstConverterBuilder::new_fs(args.input_dir)
.with_meta_options(cfg.meta_client)
.with_storage_config(cfg.storage)
@@ -84,11 +85,14 @@ async fn main() {
}
builder
.build()
.await
.expect("Failed to build sst converter")
};
let sst_converter = sst_builder
.clone()
.build()
.await
.expect("Failed to build sst converter");
let input_store = sst_converter.input_store.clone();
if let Some(parquet_dir) = args.parquet_dir {
@@ -124,7 +128,7 @@ async fn main() {
})
.collect::<Vec<_>>();
convert_and_send(&input_files, &mut sst_converter, &args.db_http_addr).await;
convert_and_send(&input_files, sst_builder.clone(), &args.db_http_addr).await;
}
if let Some(remote_write_dir) = args.remote_write_dir {
@@ -159,24 +163,47 @@ async fn main() {
})
.collect::<Vec<_>>();
convert_and_send(&input_files, &mut sst_converter, &args.db_http_addr).await;
convert_and_send(&input_files, sst_builder.clone(), &args.db_http_addr).await;
}
}
async fn convert_and_send(
input_files: &[InputFile],
sst_converter: &mut SstConverter,
sst_builder: SstConverterBuilder,
db_http_addr: &str,
) {
let table_names = input_files
.iter()
.map(|f| (f.schema.clone(), f.table.clone()))
.collect::<Vec<_>>();
let mut rxs = Vec::new();
let sst_infos = sst_converter
.convert(input_files)
.await
.expect("Failed to convert parquet files");
// Spawn a task for each input file
info!("Spawning tasks for {} input files", input_files.len());
for input_file in input_files.iter() {
let (tx, rx) = oneshot::channel();
let sst_builder = sst_builder.clone();
let input_file = (*input_file).clone();
tokio::task::spawn(async move {
let mut sst_converter = sst_builder
.build()
.await
.expect("Failed to build sst converter");
let sst_info = sst_converter
.convert_one(&input_file)
.await
.expect("Failed to convert parquet files");
tx.send(sst_info).unwrap();
});
rxs.push(rx);
}
let mut sst_infos = Vec::new();
for rx in rxs {
sst_infos.push(rx.await.unwrap());
}
info!("Converted {} input files", sst_infos.len());
let ingest_reqs = table_names
.iter()

View File

@@ -61,6 +61,7 @@ impl Default for WriteOptions {
}
/// Parquet SST info returned by the writer.
#[derive(Debug)]
pub struct SstInfo {
/// SST file id.
pub file_id: FileId,

View File

@@ -35,7 +35,7 @@ use crate::table::TableMetadataHelper;
use crate::writer::RegionWriterBuilder;
/// Input file type.
#[derive(Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InputFileType {
/// File type is Parquet.
Parquet,
@@ -44,6 +44,7 @@ pub enum InputFileType {
}
/// Description of a file to convert.
#[derive(Debug, Clone)]
pub struct InputFile {
/// Catalog of the table.
pub catalog: String,
@@ -60,6 +61,7 @@ pub struct InputFile {
/// Description of converted files for an input file.
/// A input file can be converted to multiple output files.
#[derive(Debug)]
pub struct OutputSst {
/// Meta of output SST files.
pub ssts: SstInfoArray,
@@ -99,7 +101,7 @@ impl SstConverter {
}
/// Converts one input.
async fn convert_one(&mut self, input: &InputFile) -> Result<OutputSst> {
pub async fn convert_one(&mut self, input: &InputFile) -> Result<OutputSst> {
common_telemetry::info!(
"Converting input file, input_path: {}, output_path: {}",
input.path,
@@ -122,11 +124,13 @@ impl SstConverter {
.write_sst(source, &self.write_opts)
.await
.context(MitoSnafu)?;
common_telemetry::info!("Converted input file, input_path: {}", input.path);
Ok(OutputSst { ssts })
}
}
/// Builder to build a SST converter.
#[derive(Clone)]
pub struct SstConverterBuilder {
input_path: String,
meta_options: MetaClientOptions,