diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 65bc4f0082..1af79daff6 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -98,7 +98,7 @@ use common_meta::key::SchemaMetadataManagerRef; use common_recordbatch::{MemoryPermit, QueryMemoryTracker, SendableRecordBatchStream}; use common_stat::get_total_memory_bytes; use common_telemetry::{info, tracing, warn}; -use common_wal::options::{WAL_OPTIONS_KEY, WalOptions}; +use common_wal::options::WalOptions; use futures::future::{join_all, try_join_all}; use futures::stream::{self, Stream, StreamExt}; use object_store::manager::ObjectStoreManagerRef; @@ -144,6 +144,7 @@ use crate::read::scan_region::{ScanRegion, Scanner}; use crate::read::stream::ScanBatchStream; use crate::region::MitoRegionRef; use crate::region::opener::PartitionExprFetcherRef; +use crate::region::options::parse_wal_options; use crate::request::{RegionEditRequest, WorkerRequest}; use crate::sst::file::{FileMeta, RegionFileId, RegionIndexId}; use crate::sst::file_ref::FileReferenceManagerRef; @@ -726,12 +727,7 @@ fn prepare_batch_open_requests( let mut topic_to_regions: HashMap> = HashMap::new(); let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new(); for (region_id, request) in requests { - let options = if let Some(options) = request.options.get(WAL_OPTIONS_KEY) { - serde_json::from_str(options).context(SerdeJsonSnafu)? - } else { - WalOptions::RaftEngine - }; - match options { + match parse_wal_options(&request.options).context(SerdeJsonSnafu)? { WalOptions::Kafka(options) => { topic_to_regions .entry(options.topic) @@ -1128,6 +1124,18 @@ impl EngineInner { } } +fn map_batch_responses(responses: Vec<(RegionId, Result)>) -> BatchResponses { + responses + .into_iter() + .map(|(region_id, response)| { + ( + region_id, + response.map(RegionResponse::new).map_err(BoxedError::new), + ) + }) + .collect() +} + #[async_trait] impl RegionEngine for MitoEngine { fn name(&self) -> &str { @@ -1144,17 +1152,7 @@ impl RegionEngine for MitoEngine { self.inner .handle_batch_open_requests(parallelism, requests) .await - .map(|responses| { - responses - .into_iter() - .map(|(region_id, response)| { - ( - region_id, - response.map(RegionResponse::new).map_err(BoxedError::new), - ) - }) - .collect::>() - }) + .map(map_batch_responses) .map_err(BoxedError::new) } @@ -1167,17 +1165,7 @@ impl RegionEngine for MitoEngine { self.inner .handle_batch_catchup_requests(parallelism, requests) .await - .map(|responses| { - responses - .into_iter() - .map(|(region_id, response)| { - ( - region_id, - response.map(RegionResponse::new).map_err(BoxedError::new), - ) - }) - .collect::>() - }) + .map(map_batch_responses) .map_err(BoxedError::new) } diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs index f1034de379..0fe0a8f12a 100644 --- a/src/mito2/src/region/options.rs +++ b/src/mito2/src/region/options.rs @@ -39,6 +39,16 @@ use crate::sst::FormatType; const DEFAULT_INDEX_SEGMENT_ROW_COUNT: usize = 1024; +pub(crate) fn parse_wal_options( + options_map: &HashMap, +) -> std::result::Result { + options_map + .get(WAL_OPTIONS_KEY) + .map_or(Ok(WalOptions::default()), |encoded_wal_options| { + serde_json::from_str(encoded_wal_options) + }) +} + /// Mode to handle duplicate rows while merging. #[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumString)] #[serde(rename_all = "snake_case")] @@ -134,13 +144,7 @@ impl TryFrom<&HashMap> for RegionOptions { CompactionOptions::default() }; - // Tries to decode the wal options from the map or sets to the default if there's none wal options in the map. - let wal_options = options_map.get(WAL_OPTIONS_KEY).map_or_else( - || Ok(WalOptions::default()), - |encoded_wal_options| { - serde_json::from_str(encoded_wal_options).context(JsonOptionsSnafu) - }, - )?; + let wal_options = parse_wal_options(options_map).context(JsonOptionsSnafu)?; let index_options: IndexOptions = serde_json::from_str(&json).context(JsonOptionsSnafu)?; let memtable = if validate_enum_options(options_map, "memtable.type")? { diff --git a/src/mito2/src/region/utils.rs b/src/mito2/src/region/utils.rs index 25c084ef7a..6917315757 100644 --- a/src/mito2/src/region/utils.rs +++ b/src/mito2/src/region/utils.rs @@ -108,6 +108,21 @@ impl FileDescriptor { FileDescriptor::Data { size, .. } => *size, } } + + fn file_path(&self, region_id: RegionId, table_dir: &str, path_type: PathType) -> String { + match *self { + FileDescriptor::Index { + file_id, version, .. + } => location::index_file_path( + table_dir, + RegionIndexId::new(RegionFileId::new(region_id, file_id), version), + path_type, + ), + FileDescriptor::Data { file_id, .. } => { + location::sst_file_path(table_dir, RegionFileId::new(region_id, file_id), path_type) + } + } + } } /// Builds the source and target file paths for a given file descriptor. @@ -128,34 +143,10 @@ fn build_copy_file_paths( table_dir: &str, path_type: PathType, ) -> (String, String) { - match file_descriptor { - FileDescriptor::Index { - file_id, version, .. - } => ( - location::index_file_path( - table_dir, - RegionIndexId::new(RegionFileId::new(source_region_id, file_id), version), - path_type, - ), - location::index_file_path( - table_dir, - RegionIndexId::new(RegionFileId::new(target_region_id, file_id), version), - path_type, - ), - ), - FileDescriptor::Data { file_id, .. } => ( - location::sst_file_path( - table_dir, - RegionFileId::new(source_region_id, file_id), - path_type, - ), - location::sst_file_path( - table_dir, - RegionFileId::new(target_region_id, file_id), - path_type, - ), - ), - } + ( + file_descriptor.file_path(source_region_id, table_dir, path_type), + file_descriptor.file_path(target_region_id, table_dir, path_type), + ) } fn build_delete_file_path( @@ -164,20 +155,7 @@ fn build_delete_file_path( table_dir: &str, path_type: PathType, ) -> String { - match file_descriptor { - FileDescriptor::Index { - file_id, version, .. - } => location::index_file_path( - table_dir, - RegionIndexId::new(RegionFileId::new(target_region_id, file_id), version), - path_type, - ), - FileDescriptor::Data { file_id, .. } => location::sst_file_path( - table_dir, - RegionFileId::new(target_region_id, file_id), - path_type, - ), - } + file_descriptor.file_path(target_region_id, table_dir, path_type) } impl RegionFileCopier { @@ -342,4 +320,28 @@ mod tests { ) ); } + + #[test] + fn test_build_delete_file_path() { + common_telemetry::init_default_ut_logging(); + let file_id = FileId::random(); + let target_region_id = RegionId::new(1, 2); + let table_dir = "/table_dir"; + let path_type = PathType::Bare; + + let file_descriptor = FileDescriptor::Data { file_id, size: 100 }; + let path = build_delete_file_path(target_region_id, file_descriptor, table_dir, path_type); + assert_eq!(path, format!("/table_dir/1_0000000002/{}.parquet", file_id)); + + let file_descriptor = FileDescriptor::Index { + file_id, + version: 1, + size: 100, + }; + let path = build_delete_file_path(target_region_id, file_descriptor, table_dir, path_type); + assert_eq!( + path, + format!("/table_dir/1_0000000002/index/{}.1.puffin", file_id) + ); + } } diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index 79391e324d..1e6b8386b3 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -163,15 +163,9 @@ impl VersionControl { /// Mark all opened files as deleted and set the delete marker in [VersionControlData] pub(crate) fn mark_dropped(&self) { let version = self.current().version; - let part_duration = Some(version.memtables.mutable.part_duration()); - let next_memtable_id = version.memtables.mutable.next_memtable_id(); let memtable_builder = version.memtables.mutable.memtable_builder().clone(); - let new_mutable = Arc::new(TimePartitions::new( - version.metadata.clone(), - memtable_builder, - next_memtable_id, - part_duration, - )); + let new_mutable = + Self::new_mutable_from_version(&version, version.metadata.clone(), memtable_builder); let mut data = self.data.write().unwrap(); data.is_dropped = true; @@ -188,15 +182,9 @@ impl VersionControl { /// new schema. Memtables of the version must be empty. pub(crate) fn alter_schema(&self, metadata: RegionMetadataRef) { let version = self.current().version; - let part_duration = Some(version.memtables.mutable.part_duration()); - let next_memtable_id = version.memtables.mutable.next_memtable_id(); let memtable_builder = version.memtables.mutable.memtable_builder().clone(); - let new_mutable = Arc::new(TimePartitions::new( - metadata.clone(), - memtable_builder, - next_memtable_id, - part_duration, - )); + let new_mutable = + Self::new_mutable_from_version(&version, metadata.clone(), memtable_builder); debug_assert!(version.memtables.mutable.is_empty()); debug_assert!(version.memtables.immutables().is_empty()); let new_version = Arc::new( @@ -221,15 +209,9 @@ impl VersionControl { memtable_builder: MemtableBuilderRef, ) { let version = self.current().version; - let part_duration = Some(version.memtables.mutable.part_duration()); - let next_memtable_id = version.memtables.mutable.next_memtable_id(); // Use the new metadata to build `TimePartitions`. - let new_mutable = Arc::new(TimePartitions::new( - metadata.clone(), - memtable_builder, - next_memtable_id, - part_duration, - )); + let new_mutable = + Self::new_mutable_from_version(&version, metadata.clone(), memtable_builder); debug_assert!(version.memtables.mutable.is_empty()); debug_assert!(version.memtables.immutables().is_empty()); let new_version = Arc::new( @@ -248,20 +230,17 @@ impl VersionControl { pub(crate) fn truncate(&self, truncate_kind: TruncateKind) { let version = self.current().version; - let part_duration = version.memtables.mutable.part_duration(); - let next_memtable_id = version.memtables.mutable.next_memtable_id(); - let memtable_builder = version.memtables.mutable.memtable_builder().clone(); - let new_mutable = Arc::new(TimePartitions::new( - version.metadata.clone(), - memtable_builder, - next_memtable_id, - Some(part_duration), - )); match truncate_kind { TruncateKind::All { truncated_entry_id, truncated_sequence, } => { + let memtable_builder = version.memtables.mutable.memtable_builder().clone(); + let new_mutable = Self::new_mutable_from_version( + &version, + version.metadata.clone(), + memtable_builder, + ); let new_version = Arc::new( VersionBuilder::from_version(version) .memtables(MemtableVersion::new(new_mutable)) @@ -295,6 +274,19 @@ impl VersionControl { let mut version_data = self.data.write().unwrap(); version_data.version = version; } + + fn new_mutable_from_version( + version: &Version, + metadata: RegionMetadataRef, + memtable_builder: MemtableBuilderRef, + ) -> TimePartitionsRef { + Arc::new(TimePartitions::new( + metadata, + memtable_builder, + version.memtables.mutable.next_memtable_id(), + Some(version.memtables.mutable.part_duration()), + )) + } } pub(crate) type VersionControlRef = Arc; diff --git a/src/mito2/src/wal.rs b/src/mito2/src/wal.rs index f4515070b7..cc6facce3d 100644 --- a/src/mito2/src/wal.rs +++ b/src/mito2/src/wal.rs @@ -67,7 +67,7 @@ impl Wal { impl Clone for Wal { fn clone(&self) -> Self { Self { - store: self.store.clone(), + store: Arc::clone(&self.store), } } } @@ -137,18 +137,8 @@ impl Wal { start_id: EntryId, provider: &'a Provider, ) -> Result> { - match provider { - Provider::RaftEngine(_) => { - LogStoreEntryReader::new(LogStoreRawEntryReader::new(self.store.clone())) - .read(provider, start_id) - } - Provider::Kafka(_) => LogStoreEntryReader::new(RegionRawEntryReader::new( - LogStoreRawEntryReader::new(self.store.clone()), - region_id, - )) - .read(provider, start_id), - Provider::Noop => Ok(Box::pin(futures::stream::empty())), - } + let mut reader = self.wal_entry_reader(provider, region_id, None); + reader.read(provider, start_id) } /// Mark entries whose ids `<= last_id` as deleted.