diff --git a/src/ingester/src/main.rs b/src/ingester/src/main.rs index 01fc902a70..d65043b1bf 100644 --- a/src/ingester/src/main.rs +++ b/src/ingester/src/main.rs @@ -34,7 +34,7 @@ struct Args { parquet_dir: Option, /// Directory of input json files, relative to input_dir #[arg(short, long)] - json_dir: Option, + remote_write_dir: Option, /// Config file #[arg(short, long)] cfg: String, @@ -127,22 +127,22 @@ async fn main() { convert_and_send(&input_files, &mut sst_converter, &args.db_http_addr).await; } - if let Some(json_dir) = args.json_dir { - // using opendal to read json files in given input object store - let all_jsons = input_store - .list(&json_dir) + if let Some(remote_write_dir) = args.remote_write_dir { + // using opendal to read parquet files in given input object store + let all_parquets = input_store + .list(&remote_write_dir) .await - .expect("Failed to list json files"); + .expect("Failed to list parquet files"); - let all_jsons = all_jsons + let all_parquets = all_parquets .iter() - .filter(|json| json.name().ends_with(".json") && json.metadata().is_file()) + .filter(|parquet| parquet.name().ends_with(".parquet") && parquet.metadata().is_file()) .collect::>(); - let input_files = all_jsons + let input_files = all_parquets .iter() - .map(|entry| { - let full_table_name = entry.name().split("-").next().unwrap(); + .map(|parquet| { + let full_table_name = parquet.name().split("-").next().unwrap(); let (catalog_name, schema_name, table_name) = extract_name(full_table_name); info!( @@ -153,7 +153,7 @@ async fn main() { catalog: catalog_name.to_string(), schema: schema_name.to_string(), table: table_name.to_string(), - path: entry.path().to_string(), + path: parquet.path().to_string(), file_type: InputFileType::RemoteWrite, } })