From b650743785f8e5f66d4639a3e87b6155a93eb6e2 Mon Sep 17 00:00:00 2001
From: evenyag
Date: Sun, 9 Mar 2025 19:51:46 +0800
Subject: [PATCH] feat: implement converter convert

---
 src/ingester/src/main.rs                   |  11 +-
 src/mito2/src/region/opener.rs             |  13 +-
 src/sst-convert/src/converter.rs           |  72 +++++++---
 src/sst-convert/src/reader.rs              | 159 +++++++++++++++++++++
 src/sst-convert/src/reader/parquet.rs      |  19 ++-
 src/sst-convert/src/reader/remote_write.rs |  10 +-
 src/sst-convert/src/writer.rs              |  22 +--
 7 files changed, 244 insertions(+), 62 deletions(-)

diff --git a/src/ingester/src/main.rs b/src/ingester/src/main.rs
index b25e5f212e..1c7b36def7 100644
--- a/src/ingester/src/main.rs
+++ b/src/ingester/src/main.rs
@@ -18,7 +18,6 @@ use datanode::config::StorageConfig;
 use meta_client::MetaClientOptions;
 use mito2::sst::file::IndexType;
 use mito2::sst::parquet::SstInfo;
-use object_store::ObjectStore;
 use serde::{Deserialize, Serialize};
 use sst_convert::converter::{InputFile, InputFileType, SstConverter, SstConverterBuilder};

@@ -68,7 +67,7 @@ async fn main() {
         toml::from_str(&storage_config).expect("Failed to parse storage config");

     // TODO: build sst converter
-    let sst_converter = {
+    let mut sst_converter = {
         let mut builder = SstConverterBuilder::new_fs(args.input_dir)
             .with_meta_options(meta_options)
             .with_storage_config(storage_config);
@@ -83,7 +82,7 @@ async fn main() {
             .expect("Failed to build sst converter")
     };

-    let input_store: &ObjectStore = &sst_converter.input_store;
+    let input_store = sst_converter.input_store.clone();

     if let Some(parquet_dir) = args.parquet_dir {
         // using opendal to read parquet files in given input object store
@@ -111,7 +110,7 @@ async fn main() {
             })
             .collect::<Vec<_>>();

-        convert_and_send(&input_files, &sst_converter, &args.db_http_addr).await;
+        convert_and_send(&input_files, &mut sst_converter, &args.db_http_addr).await;
     }

     if let Some(json_dir) = args.json_dir {
@@ -140,13 +139,13 @@ async fn main() {
             })
             .collect::<Vec<_>>();

-        convert_and_send(&input_files, &sst_converter, &args.db_http_addr).await;
+        convert_and_send(&input_files, &mut sst_converter, &args.db_http_addr).await;
     }
 }

 async fn convert_and_send(
     input_files: &[InputFile],
-    sst_converter: &SstConverter,
+    sst_converter: &mut SstConverter,
     db_http_addr: &str,
 ) {
     let table_names = input_files
diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs
index 49804cc833..f18c9b55be 100644
--- a/src/mito2/src/region/opener.rs
+++ b/src/mito2/src/region/opener.rs
@@ -40,6 +40,7 @@ use crate::error::{
     EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
     Result, StaleLogEntrySnafu,
 };
+use crate::manifest::action::RegionManifest;
 use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
 use crate::manifest::storage::manifest_compress_type;
 use crate::memtable::time_partition::TimePartitions;
@@ -489,6 +490,16 @@ impl RegionMetadataLoader {
         region_dir: &str,
         region_options: &RegionOptions,
     ) -> Result<Option<RegionMetadataRef>> {
+        let manifest = self.load_manifest(region_dir, region_options).await?;
+        Ok(manifest.map(|m| m.metadata.clone()))
+    }
+
+    /// Loads the manifest of the region from the region dir.
+    pub async fn load_manifest(
+        &self,
+        region_dir: &str,
+        region_options: &RegionOptions,
+    ) -> Result<Option<Arc<RegionManifest>>> {
         let region_manifest_options = RegionOpener::manifest_options(
             &self.config,
             region_options,
@@ -503,7 +514,7 @@ impl RegionMetadataLoader {
         };

         let manifest = manifest_manager.manifest();
-        Ok(Some(manifest.metadata.clone()))
+        Ok(Some(manifest))
     }
 }

diff --git a/src/sst-convert/src/converter.rs b/src/sst-convert/src/converter.rs
index f754ecbb6a..ef5b1ba0ba 100644
--- a/src/sst-convert/src/converter.rs
+++ b/src/sst-convert/src/converter.rs
@@ -14,17 +14,24 @@

 //! SST converter.

+use std::sync::Arc;
+
 use datanode::config::{FileConfig, StorageConfig};
 use datanode::datanode::DatanodeBuilder;
 use datanode::store::fs::new_fs_object_store;
 use meta_client::MetaClientOptions;
 use mito2::access_layer::SstInfoArray;
+use mito2::config::MitoConfig;
+use mito2::read::Source;
+use mito2::sst::parquet::WriteOptions;
 use object_store::manager::ObjectStoreManagerRef;
 use object_store::ObjectStore;
 use snafu::ResultExt;

-use crate::error::{DatanodeSnafu, Result};
+use crate::error::{DatanodeSnafu, MitoSnafu, Result};
+use crate::reader::InputReaderBuilder;
 use crate::table::TableMetadataHelper;
+use crate::writer::RegionWriterBuilder;

 /// Input file type.
 pub enum InputFileType {
@@ -60,20 +67,19 @@ pub struct OutputSst {
 pub struct SstConverter {
     /// Object store for input files.
     pub input_store: ObjectStore,
-    /// Object store manager for output files.
-    output_store_manager: ObjectStoreManagerRef,
-    /// Helper to get table meta.
-    table_helper: TableMetadataHelper,
     /// Output path for the converted SST files.
     /// If it is not None, the converted SST files will be written to the specified path
     /// in the `input_store`.
     /// This is for debugging purposes.
     output_path: Option<String>,
+    reader_builder: InputReaderBuilder,
+    writer_builder: RegionWriterBuilder,
+    write_opts: WriteOptions,
 }

 impl SstConverter {
     /// Converts a list of input to a list of outputs.
-    pub async fn convert(&self, input: &[InputFile]) -> Result<Vec<OutputSst>> {
+    pub async fn convert(&mut self, input: &[InputFile]) -> Result<Vec<OutputSst>> {
         let mut outputs = Vec::with_capacity(input.len());
         for file in input {
             let output = self.convert_one(file).await?;
@@ -83,21 +89,24 @@ impl SstConverter {
     }

     /// Converts one input.
-    async fn convert_one(&self, input: &InputFile) -> Result<OutputSst> {
-        match input.file_type {
-            InputFileType::Parquet => self.convert_parquet(input).await,
-            InputFileType::RemoteWrite => self.convert_remote_write(input).await,
-        }
-    }
+    async fn convert_one(&mut self, input: &InputFile) -> Result<OutputSst> {
+        let reader_info = self.reader_builder.read_input(input).await?;
+        let source = Source::Reader(reader_info.reader);
+        let output_dir = self
+            .output_path
+            .as_deref()
+            .unwrap_or(&reader_info.region_dir);
+        let writer = self
+            .writer_builder
+            .build(reader_info.metadata, output_dir, reader_info.region_options)
+            .await
+            .context(MitoSnafu)?;

-    /// Converts a parquet input.
-    async fn convert_parquet(&self, input: &InputFile) -> Result<OutputSst> {
-        todo!()
-    }
-
-    /// Converts a remote write input.
-    async fn convert_remote_write(&self, input: &InputFile) -> Result<OutputSst> {
-        todo!()
+        let ssts = writer
+            .write_sst(source, &self.write_opts)
+            .await
+            .context(MitoSnafu)?;
+        Ok(OutputSst { ssts })
     }
 }

@@ -107,6 +116,7 @@ pub struct SstConverterBuilder {
     meta_options: MetaClientOptions,
     storage_config: StorageConfig,
     output_path: Option<String>,
+    config: MitoConfig,
 }

 impl SstConverterBuilder {
@@ -117,6 +127,7 @@ impl SstConverterBuilder {
             meta_options: MetaClientOptions::default(),
             storage_config: StorageConfig::default(),
             output_path: None,
+            config: MitoConfig::default(),
         }
     }

@@ -139,17 +150,34 @@ impl SstConverterBuilder {
         self
     }

+    /// Sets the mito config to use for conversion.
+    pub fn with_config(mut self, config: MitoConfig) -> Self {
+        self.config = config;
+        self
+    }
+
     /// Builds a SST converter.
     pub async fn build(self) -> Result<SstConverter> {
         let input_store = new_input_store(&self.input_path).await?;
         let output_store_manager = new_object_store_manager(&self.storage_config).await?;
         let table_helper = TableMetadataHelper::new(&self.meta_options).await?;
+        let config = Arc::new(self.config);
+        let reader_builder = InputReaderBuilder::new(
+            input_store.clone(),
+            table_helper,
+            output_store_manager.clone(),
+            config.clone(),
+        );
+        let writer_builder = RegionWriterBuilder::new(config, output_store_manager)
+            .await
+            .context(MitoSnafu)?;

         Ok(SstConverter {
             input_store,
-            output_store_manager,
-            table_helper,
             output_path: self.output_path,
+            reader_builder,
+            writer_builder,
+            write_opts: WriteOptions::default(),
         })
     }
 }
diff --git a/src/sst-convert/src/reader.rs b/src/sst-convert/src/reader.rs
index 4f24fd6d3b..e34ff04959 100644
--- a/src/sst-convert/src/reader.rs
+++ b/src/sst-convert/src/reader.rs
@@ -14,5 +14,164 @@

 //! Reader to read input data in different formats.

+use std::collections::HashMap;
+use std::sync::Arc;
+
+use mito2::config::MitoConfig;
+use mito2::read::BoxedBatchReader;
+use mito2::region::opener::RegionMetadataLoader;
+use mito2::region::options::RegionOptions;
+use object_store::manager::ObjectStoreManagerRef;
+use object_store::ObjectStore;
+use snafu::{OptionExt, ResultExt};
+use store_api::metadata::RegionMetadataRef;
+use store_api::path_utils::region_dir;
+use store_api::storage::{RegionId, SequenceNumber};
+use table::metadata::TableId;
+
+use crate::converter::{InputFile, InputFileType};
+use crate::error::{MissingTableSnafu, MitoSnafu, Result};
+use crate::reader::remote_write::RemoteWriteReader;
+use crate::table::TableMetadataHelper;
+use crate::OpenDALParquetReader;
+
 pub(crate) mod parquet;
 mod remote_write;
+
+/// A reader for one input file and the region context needed to convert it.
+pub struct ReaderInfo {
+    pub reader: BoxedBatchReader,
+    pub region_dir: String,
+    pub region_options: RegionOptions,
+    pub metadata: RegionMetadataRef,
+}
+
+/// Builder to build readers to read input files.
+pub(crate) struct InputReaderBuilder {
+    input_store: ObjectStore,
+    table_helper: TableMetadataHelper,
+    region_loader: RegionMetadataLoader,
+    /// Cached region infos for tables.
+    region_infos: HashMap<String, RegionInfo>,
+}
+
+impl InputReaderBuilder {
+    pub(crate) fn new(
+        input_store: ObjectStore,
+        table_helper: TableMetadataHelper,
+        object_store_manager: ObjectStoreManagerRef,
+        config: Arc<MitoConfig>,
+    ) -> Self {
+        let region_loader = RegionMetadataLoader::new(config, object_store_manager);
+
+        Self {
+            input_store,
+            table_helper,
+            region_loader,
+            region_infos: HashMap::new(),
+        }
+    }
+
+    /// Builds a reader to read the input file.
+    pub async fn read_input(&mut self, input: &InputFile) -> Result<ReaderInfo> {
+        match input.file_type {
+            InputFileType::Parquet => self.read_parquet(input).await,
+            InputFileType::RemoteWrite => self.read_remote_write(input).await,
+        }
+    }
+
+    /// Builds a reader to read the parquet file.
+    pub async fn read_parquet(&mut self, input: &InputFile) -> Result<ReaderInfo> {
+        let region_info = self.get_region_info(input).await?;
+        let reader = OpenDALParquetReader::new(
+            self.input_store.clone(),
+            &input.path,
+            region_info.metadata.clone(),
+            Some(region_info.flushed_sequence + 1),
+        )
+        .await?;
+
+        Ok(ReaderInfo {
+            reader: Box::new(reader),
+            region_dir: region_info.region_dir,
+            region_options: region_info.region_options,
+            metadata: region_info.metadata,
+        })
+    }
+
+    /// Builds a reader to read the remote write file.
+    pub async fn read_remote_write(&mut self, input: &InputFile) -> Result<ReaderInfo> {
+        let region_info = self.get_region_info(input).await?;
+        // TODO(yingwen): Should update the sequence.
+        let reader = RemoteWriteReader::open(
+            self.input_store.clone(),
+            &input.path,
+            &input.catalog,
+            &input.schema,
+            region_info.metadata.clone(),
+            self.table_helper.clone(),
+        )
+        .await?
+        .with_sequence(region_info.flushed_sequence + 1);
+
+        Ok(ReaderInfo {
+            reader: Box::new(reader),
+            region_dir: region_info.region_dir,
+            region_options: region_info.region_options,
+            metadata: region_info.metadata,
+        })
+    }
+
+    async fn get_region_info(&mut self, input: &InputFile) -> Result<RegionInfo> {
+        let cache_key = cache_key(&input.catalog, &input.schema, &input.table);
+        if let Some(region_info) = self.region_infos.get(&cache_key) {
+            return Ok(region_info.clone());
+        }
+
+        let table_info = self
+            .table_helper
+            .get_table(&input.catalog, &input.schema, &input.table)
+            .await?
+            .context(MissingTableSnafu {
+                table_name: &input.table,
+            })?;
+        let region_id = to_region_id(table_info.table_info.ident.table_id);
+        let opts = table_info.table_info.to_region_options();
+        // TODO(yingwen): We ignore WAL options now. We should `prepare_wal_options()` in the future.
+        let region_options = RegionOptions::try_from(&opts).context(MitoSnafu)?;
+        let region_dir = region_dir(&table_info.region_storage_path(), region_id);
+        let manifest = self
+            .region_loader
+            .load_manifest(&region_dir, &region_options)
+            .await
+            .context(MitoSnafu)?
+            .context(MissingTableSnafu {
+                table_name: &table_info.table_info.name,
+            })?;
+        let region_info = RegionInfo {
+            metadata: manifest.metadata.clone(),
+            flushed_sequence: manifest.flushed_sequence,
+            region_dir,
+            region_options,
+        };
+        self.region_infos.insert(cache_key, region_info.clone());
+
+        Ok(region_info)
+    }
+}
+
+fn to_region_id(table_id: TableId) -> RegionId {
+    RegionId::new(table_id, 0)
+}
+
+fn cache_key(catalog: &str, schema: &str, table: &str) -> String {
+    format!("{}/{}/{}", catalog, schema, table)
+}
+
+#[derive(Clone)]
+struct RegionInfo {
+    metadata: RegionMetadataRef,
+    flushed_sequence: SequenceNumber,
+    region_dir: String,
+    region_options: RegionOptions,
+}
diff --git a/src/sst-convert/src/reader/parquet.rs b/src/sst-convert/src/reader/parquet.rs
index 748c85d810..a981fad53e 100644
--- a/src/sst-convert/src/reader/parquet.rs
+++ b/src/sst-convert/src/reader/parquet.rs
@@ -29,7 +29,7 @@ use datatypes::schema::Schema;
 use datatypes::value::Value;
 use datatypes::vectors::{MutableVector, UInt64VectorBuilder, UInt8VectorBuilder};
 use futures_util::StreamExt;
-use mito2::error::{OpenDalSnafu, ReadParquetSnafu};
+use mito2::error::ReadParquetSnafu;
 use mito2::read::{Batch, BatchColumn, BatchReader};
 use mito2::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, SparsePrimaryKeyCodec};
 use object_store::ObjectStore;
@@ -40,6 +40,8 @@ use store_api::codec::PrimaryKeyEncoding;
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::{ColumnId, SequenceNumber};

+use crate::error::{MitoSnafu, ObjectStoreSnafu, Result};
+
 pub struct OpenDALParquetReader {
     inner: RawParquetReader,
 }
@@ -50,28 +52,23 @@ impl OpenDALParquetReader {
         path: &str,
         metadata: RegionMetadataRef,
         override_sequence: Option<SequenceNumber>,
-    ) -> Result<Self, BoxedError> {
-        let reader = operator
-            .reader_with(path)
-            .await
-            .context(OpenDalSnafu)
-            .map_err(BoxedError::new)?;
+    ) -> Result<Self> {
+        let reader = operator.reader_with(path).await.context(ObjectStoreSnafu)?;
         let content_len = operator
             .stat(path)
             .await
-            .context(OpenDalSnafu)
-            .map_err(BoxedError::new)?
+            .context(ObjectStoreSnafu)?
             .content_length();

         let reader = AsyncReader::new(reader, content_len).with_prefetch_footer_size(512 * 1024);

         let stream = ParquetRecordBatchStreamBuilder::new(reader)
             .await
             .context(ReadParquetSnafu { path })
-            .map_err(BoxedError::new)?
+            .context(MitoSnafu)?
             .build()
             .context(ReadParquetSnafu { path })
-            .map_err(BoxedError::new)?;
+            .context(MitoSnafu)?;
         Ok(Self {
             inner: RawParquetReader::new(stream, metadata, override_sequence, path),
         })
diff --git a/src/sst-convert/src/reader/remote_write.rs b/src/sst-convert/src/reader/remote_write.rs
index 9c45edee59..ddae378816 100644
--- a/src/sst-convert/src/reader/remote_write.rs
+++ b/src/sst-convert/src/reader/remote_write.rs
@@ -46,7 +46,7 @@ const METRIC_NAME_LABEL: &str = "__name__";
 const GREPTIME_VALUE: &str = "greptime_value";

 /// A reader that reads remote write file, sorts and outputs timeseries in the primary key order.
-struct RemoteWriteReader {
+pub struct RemoteWriteReader {
     /// Timeseries sorted by primary key in reverse order.
     /// So we can pop the series.
     series: Vec<(Vec<u8>, TimeSeries)>,
@@ -60,8 +60,8 @@ impl RemoteWriteReader {
     pub async fn open(
         operator: ObjectStore,
         path: &str,
-        catalog: String,
-        schema: String,
+        catalog: &str,
+        schema: &str,
         metadata: RegionMetadataRef,
         table_helper: TableMetadataHelper,
     ) -> Result<Self> {
@@ -73,8 +73,8 @@ impl RemoteWriteReader {
             })?
             .column_id;
         let encoder = PrimaryKeyEncoder {
-            catalog,
-            schema,
+            catalog: catalog.to_string(),
+            schema: schema.to_string(),
             metadata,
             table_helper,
             table_ids: HashMap::new(),
diff --git a/src/sst-convert/src/writer.rs b/src/sst-convert/src/writer.rs
index 74edc42928..07628f0a9c 100644
--- a/src/sst-convert/src/writer.rs
+++ b/src/sst-convert/src/writer.rs
@@ -21,15 +21,13 @@ use mito2::access_layer::{
 };
 use mito2::cache::CacheManager;
 use mito2::config::MitoConfig;
-use mito2::error::{RegionMetadataNotFoundSnafu, Result};
+use mito2::error::Result;
 use mito2::read::Source;
-use mito2::region::opener::RegionMetadataLoader;
 use mito2::region::options::RegionOptions;
 use mito2::sst::index::intermediate::IntermediateManager;
 use mito2::sst::index::puffin_manager::PuffinManagerFactory;
 use mito2::sst::parquet::WriteOptions;
 use object_store::manager::ObjectStoreManagerRef;
-use snafu::OptionExt;
 use store_api::metadata::RegionMetadataRef;

 /// A writer that can create multiple SST files for a region.
@@ -69,21 +67,19 @@ impl RegionWriter {
 }

 /// Creator to create [`RegionWriter`] for different regions.
-pub struct RegionWriterCreator {
+pub struct RegionWriterBuilder {
     /// Mito engine config.
     config: Arc<MitoConfig>,
     /// Object stores.
     object_store_manager: ObjectStoreManagerRef,
-    /// Loader to load region metadata.
-    metadata_loader: RegionMetadataLoader,
     puffin_manager_factory: PuffinManagerFactory,
     intermediate_manager: IntermediateManager,
 }

-impl RegionWriterCreator {
+impl RegionWriterBuilder {
     /// Create a new [`RegionContextCreator`].
     pub async fn new(
-        config: MitoConfig,
+        config: Arc<MitoConfig>,
         object_store_manager: ObjectStoreManagerRef,
     ) -> Result<Self> {
         let puffin_manager_factory = PuffinManagerFactory::new(
@@ -96,14 +92,10 @@ impl RegionWriterCreator {
         let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
             .await?
             .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
-        let config = Arc::new(config);
-        let metadata_loader =
-            RegionMetadataLoader::new(config.clone(), object_store_manager.clone());

         Ok(Self {
             config,
             object_store_manager,
-            metadata_loader,
             puffin_manager_factory,
             intermediate_manager,
         })
@@ -112,14 +104,10 @@ impl RegionWriterCreator {
     /// Builds a [`RegionWriter`] for the given region directory.
     pub async fn build(
         &self,
+        metadata: RegionMetadataRef,
         region_dir: &str,
         region_options: RegionOptions,
     ) -> Result<RegionWriter> {
-        let metadata = self
-            .metadata_loader
-            .load(region_dir, &region_options)
-            .await?
-            .context(RegionMetadataNotFoundSnafu)?;
         let object_store = mito2::region::opener::get_object_store(
             &region_options.storage,
             &self.object_store_manager,
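
Reviewer note (not part of the patch): a minimal sketch of how the new builder and convert flow is meant to be driven, mirroring src/ingester/src/main.rs above. It assumes SstConverterBuilder::new_fs takes the input directory as a String (as suggested by the builder's input_path field) and that InputFile is a plain struct with the catalog/schema/table/path/file_type fields used in src/sst-convert/src/reader.rs; the literal paths, catalog, and schema values are placeholders for illustration only.

    use datanode::config::StorageConfig;
    use meta_client::MetaClientOptions;
    use sst_convert::converter::{InputFile, InputFileType, SstConverterBuilder};

    async fn convert_example() {
        // Build the converter from a local input directory plus meta and storage
        // options, as main.rs does above. `with_config` (added in this patch)
        // could be chained here to override the default MitoConfig.
        let mut sst_converter = SstConverterBuilder::new_fs("/tmp/input".to_string())
            .with_meta_options(MetaClientOptions::default())
            .with_storage_config(StorageConfig::default())
            .build()
            .await
            .expect("Failed to build sst converter");

        // Describe one input file; the field names and values below are assumed,
        // based on how reader.rs consumes InputFile.
        let inputs = vec![InputFile {
            catalog: "greptime".to_string(),
            schema: "public".to_string(),
            table: "my_table".to_string(),
            path: "parquet/my_table.parquet".to_string(),
            file_type: InputFileType::Parquet,
        }];

        // `convert` now takes `&mut self` because InputReaderBuilder caches region
        // info per table; it returns one OutputSst per input file.
        let outputs = sst_converter
            .convert(&inputs)
            .await
            .expect("Failed to convert input files");
        assert_eq!(outputs.len(), inputs.len());
    }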