refactor: remove duplications from mito (#7632)

* parse wal options

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* new memtable from version

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* file path

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use wal entry reader

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* map batch responses

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-01-28 17:03:22 +08:00
committed by GitHub
parent 00f568ed28
commit d99a946d33
5 changed files with 100 additions and 124 deletions

View File

@@ -98,7 +98,7 @@ use common_meta::key::SchemaMetadataManagerRef;
use common_recordbatch::{MemoryPermit, QueryMemoryTracker, SendableRecordBatchStream};
use common_stat::get_total_memory_bytes;
use common_telemetry::{info, tracing, warn};
use common_wal::options::{WAL_OPTIONS_KEY, WalOptions};
use common_wal::options::WalOptions;
use futures::future::{join_all, try_join_all};
use futures::stream::{self, Stream, StreamExt};
use object_store::manager::ObjectStoreManagerRef;
@@ -144,6 +144,7 @@ use crate::read::scan_region::{ScanRegion, Scanner};
use crate::read::stream::ScanBatchStream;
use crate::region::MitoRegionRef;
use crate::region::opener::PartitionExprFetcherRef;
use crate::region::options::parse_wal_options;
use crate::request::{RegionEditRequest, WorkerRequest};
use crate::sst::file::{FileMeta, RegionFileId, RegionIndexId};
use crate::sst::file_ref::FileReferenceManagerRef;
@@ -726,12 +727,7 @@ fn prepare_batch_open_requests(
let mut topic_to_regions: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> = HashMap::new();
let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new();
for (region_id, request) in requests {
let options = if let Some(options) = request.options.get(WAL_OPTIONS_KEY) {
serde_json::from_str(options).context(SerdeJsonSnafu)?
} else {
WalOptions::RaftEngine
};
match options {
match parse_wal_options(&request.options).context(SerdeJsonSnafu)? {
WalOptions::Kafka(options) => {
topic_to_regions
.entry(options.topic)
@@ -1128,6 +1124,18 @@ impl EngineInner {
}
}
fn map_batch_responses(responses: Vec<(RegionId, Result<AffectedRows>)>) -> BatchResponses {
responses
.into_iter()
.map(|(region_id, response)| {
(
region_id,
response.map(RegionResponse::new).map_err(BoxedError::new),
)
})
.collect()
}
#[async_trait]
impl RegionEngine for MitoEngine {
fn name(&self) -> &str {
@@ -1144,17 +1152,7 @@ impl RegionEngine for MitoEngine {
self.inner
.handle_batch_open_requests(parallelism, requests)
.await
.map(|responses| {
responses
.into_iter()
.map(|(region_id, response)| {
(
region_id,
response.map(RegionResponse::new).map_err(BoxedError::new),
)
})
.collect::<Vec<_>>()
})
.map(map_batch_responses)
.map_err(BoxedError::new)
}
@@ -1167,17 +1165,7 @@ impl RegionEngine for MitoEngine {
self.inner
.handle_batch_catchup_requests(parallelism, requests)
.await
.map(|responses| {
responses
.into_iter()
.map(|(region_id, response)| {
(
region_id,
response.map(RegionResponse::new).map_err(BoxedError::new),
)
})
.collect::<Vec<_>>()
})
.map(map_batch_responses)
.map_err(BoxedError::new)
}

View File

@@ -39,6 +39,16 @@ use crate::sst::FormatType;
const DEFAULT_INDEX_SEGMENT_ROW_COUNT: usize = 1024;
pub(crate) fn parse_wal_options(
options_map: &HashMap<String, String>,
) -> std::result::Result<WalOptions, serde_json::Error> {
options_map
.get(WAL_OPTIONS_KEY)
.map_or(Ok(WalOptions::default()), |encoded_wal_options| {
serde_json::from_str(encoded_wal_options)
})
}
/// Mode to handle duplicate rows while merging.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumString)]
#[serde(rename_all = "snake_case")]
@@ -134,13 +144,7 @@ impl TryFrom<&HashMap<String, String>> for RegionOptions {
CompactionOptions::default()
};
// Tries to decode the wal options from the map or sets to the default if there's none wal options in the map.
let wal_options = options_map.get(WAL_OPTIONS_KEY).map_or_else(
|| Ok(WalOptions::default()),
|encoded_wal_options| {
serde_json::from_str(encoded_wal_options).context(JsonOptionsSnafu)
},
)?;
let wal_options = parse_wal_options(options_map).context(JsonOptionsSnafu)?;
let index_options: IndexOptions = serde_json::from_str(&json).context(JsonOptionsSnafu)?;
let memtable = if validate_enum_options(options_map, "memtable.type")? {

View File

@@ -108,6 +108,21 @@ impl FileDescriptor {
FileDescriptor::Data { size, .. } => *size,
}
}
fn file_path(&self, region_id: RegionId, table_dir: &str, path_type: PathType) -> String {
match *self {
FileDescriptor::Index {
file_id, version, ..
} => location::index_file_path(
table_dir,
RegionIndexId::new(RegionFileId::new(region_id, file_id), version),
path_type,
),
FileDescriptor::Data { file_id, .. } => {
location::sst_file_path(table_dir, RegionFileId::new(region_id, file_id), path_type)
}
}
}
}
/// Builds the source and target file paths for a given file descriptor.
@@ -128,34 +143,10 @@ fn build_copy_file_paths(
table_dir: &str,
path_type: PathType,
) -> (String, String) {
match file_descriptor {
FileDescriptor::Index {
file_id, version, ..
} => (
location::index_file_path(
table_dir,
RegionIndexId::new(RegionFileId::new(source_region_id, file_id), version),
path_type,
),
location::index_file_path(
table_dir,
RegionIndexId::new(RegionFileId::new(target_region_id, file_id), version),
path_type,
),
),
FileDescriptor::Data { file_id, .. } => (
location::sst_file_path(
table_dir,
RegionFileId::new(source_region_id, file_id),
path_type,
),
location::sst_file_path(
table_dir,
RegionFileId::new(target_region_id, file_id),
path_type,
),
),
}
(
file_descriptor.file_path(source_region_id, table_dir, path_type),
file_descriptor.file_path(target_region_id, table_dir, path_type),
)
}
fn build_delete_file_path(
@@ -164,20 +155,7 @@ fn build_delete_file_path(
table_dir: &str,
path_type: PathType,
) -> String {
match file_descriptor {
FileDescriptor::Index {
file_id, version, ..
} => location::index_file_path(
table_dir,
RegionIndexId::new(RegionFileId::new(target_region_id, file_id), version),
path_type,
),
FileDescriptor::Data { file_id, .. } => location::sst_file_path(
table_dir,
RegionFileId::new(target_region_id, file_id),
path_type,
),
}
file_descriptor.file_path(target_region_id, table_dir, path_type)
}
impl RegionFileCopier {
@@ -342,4 +320,28 @@ mod tests {
)
);
}
#[test]
fn test_build_delete_file_path() {
common_telemetry::init_default_ut_logging();
let file_id = FileId::random();
let target_region_id = RegionId::new(1, 2);
let table_dir = "/table_dir";
let path_type = PathType::Bare;
let file_descriptor = FileDescriptor::Data { file_id, size: 100 };
let path = build_delete_file_path(target_region_id, file_descriptor, table_dir, path_type);
assert_eq!(path, format!("/table_dir/1_0000000002/{}.parquet", file_id));
let file_descriptor = FileDescriptor::Index {
file_id,
version: 1,
size: 100,
};
let path = build_delete_file_path(target_region_id, file_descriptor, table_dir, path_type);
assert_eq!(
path,
format!("/table_dir/1_0000000002/index/{}.1.puffin", file_id)
);
}
}

View File

@@ -163,15 +163,9 @@ impl VersionControl {
/// Mark all opened files as deleted and set the delete marker in [VersionControlData]
pub(crate) fn mark_dropped(&self) {
let version = self.current().version;
let part_duration = Some(version.memtables.mutable.part_duration());
let next_memtable_id = version.memtables.mutable.next_memtable_id();
let memtable_builder = version.memtables.mutable.memtable_builder().clone();
let new_mutable = Arc::new(TimePartitions::new(
version.metadata.clone(),
memtable_builder,
next_memtable_id,
part_duration,
));
let new_mutable =
Self::new_mutable_from_version(&version, version.metadata.clone(), memtable_builder);
let mut data = self.data.write().unwrap();
data.is_dropped = true;
@@ -188,15 +182,9 @@ impl VersionControl {
/// new schema. Memtables of the version must be empty.
pub(crate) fn alter_schema(&self, metadata: RegionMetadataRef) {
let version = self.current().version;
let part_duration = Some(version.memtables.mutable.part_duration());
let next_memtable_id = version.memtables.mutable.next_memtable_id();
let memtable_builder = version.memtables.mutable.memtable_builder().clone();
let new_mutable = Arc::new(TimePartitions::new(
metadata.clone(),
memtable_builder,
next_memtable_id,
part_duration,
));
let new_mutable =
Self::new_mutable_from_version(&version, metadata.clone(), memtable_builder);
debug_assert!(version.memtables.mutable.is_empty());
debug_assert!(version.memtables.immutables().is_empty());
let new_version = Arc::new(
@@ -221,15 +209,9 @@ impl VersionControl {
memtable_builder: MemtableBuilderRef,
) {
let version = self.current().version;
let part_duration = Some(version.memtables.mutable.part_duration());
let next_memtable_id = version.memtables.mutable.next_memtable_id();
// Use the new metadata to build `TimePartitions`.
let new_mutable = Arc::new(TimePartitions::new(
metadata.clone(),
memtable_builder,
next_memtable_id,
part_duration,
));
let new_mutable =
Self::new_mutable_from_version(&version, metadata.clone(), memtable_builder);
debug_assert!(version.memtables.mutable.is_empty());
debug_assert!(version.memtables.immutables().is_empty());
let new_version = Arc::new(
@@ -248,20 +230,17 @@ impl VersionControl {
pub(crate) fn truncate(&self, truncate_kind: TruncateKind) {
let version = self.current().version;
let part_duration = version.memtables.mutable.part_duration();
let next_memtable_id = version.memtables.mutable.next_memtable_id();
let memtable_builder = version.memtables.mutable.memtable_builder().clone();
let new_mutable = Arc::new(TimePartitions::new(
version.metadata.clone(),
memtable_builder,
next_memtable_id,
Some(part_duration),
));
match truncate_kind {
TruncateKind::All {
truncated_entry_id,
truncated_sequence,
} => {
let memtable_builder = version.memtables.mutable.memtable_builder().clone();
let new_mutable = Self::new_mutable_from_version(
&version,
version.metadata.clone(),
memtable_builder,
);
let new_version = Arc::new(
VersionBuilder::from_version(version)
.memtables(MemtableVersion::new(new_mutable))
@@ -295,6 +274,19 @@ impl VersionControl {
let mut version_data = self.data.write().unwrap();
version_data.version = version;
}
fn new_mutable_from_version(
version: &Version,
metadata: RegionMetadataRef,
memtable_builder: MemtableBuilderRef,
) -> TimePartitionsRef {
Arc::new(TimePartitions::new(
metadata,
memtable_builder,
version.memtables.mutable.next_memtable_id(),
Some(version.memtables.mutable.part_duration()),
))
}
}
pub(crate) type VersionControlRef = Arc<VersionControl>;

View File

@@ -67,7 +67,7 @@ impl<S> Wal<S> {
impl<S> Clone for Wal<S> {
fn clone(&self) -> Self {
Self {
store: self.store.clone(),
store: Arc::clone(&self.store),
}
}
}
@@ -137,18 +137,8 @@ impl<S: LogStore> Wal<S> {
start_id: EntryId,
provider: &'a Provider,
) -> Result<WalEntryStream<'a>> {
match provider {
Provider::RaftEngine(_) => {
LogStoreEntryReader::new(LogStoreRawEntryReader::new(self.store.clone()))
.read(provider, start_id)
}
Provider::Kafka(_) => LogStoreEntryReader::new(RegionRawEntryReader::new(
LogStoreRawEntryReader::new(self.store.clone()),
region_id,
))
.read(provider, start_id),
Provider::Noop => Ok(Box::pin(futures::stream::empty())),
}
let mut reader = self.wal_entry_reader(provider, region_id, None);
reader.read(provider, start_id)
}
/// Mark entries whose ids `<= last_id` as deleted.