refactor: clean unnecessary disabled lints (#2338)

* clean manifest

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean engine

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean region

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean asscess_layer

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean manifest manager

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean row_converter

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean scheduler

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean worker

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-09-06 02:20:50 -05:00
parent 2ee2d29085
commit 4b2b59c31b
10 changed files with 14 additions and 189 deletions

View File

@@ -92,8 +92,6 @@ impl MitoEngine {
struct EngineInner {
/// Region workers group.
workers: WorkerGroup,
/// Shared object store of all regions.
object_store: ObjectStore,
}
impl EngineInner {
@@ -105,7 +103,6 @@ impl EngineInner {
) -> EngineInner {
EngineInner {
workers: WorkerGroup::start(config, log_store, object_store.clone()),
object_store,
}
}

View File

@@ -20,35 +20,27 @@
pub mod test_util;
// TODO(yingwen): Remove all `allow(dead_code)` after finish refactoring mito.
#[allow(dead_code)]
mod access_layer;
pub mod config;
#[allow(dead_code)]
pub mod engine;
pub mod error;
#[allow(dead_code)]
mod flush;
#[allow(dead_code)]
#[allow(unused_variables)]
pub mod manifest;
#[allow(dead_code)]
pub mod memtable;
mod metrics;
#[allow(dead_code)]
pub mod read;
#[allow(dead_code)]
mod region;
mod region_write_ctx;
#[allow(dead_code)]
pub mod request;
#[allow(dead_code)]
mod row_converter;
#[allow(dead_code)]
pub(crate) mod schedule;
#[allow(dead_code)]
pub mod sst;
pub mod wal;
#[allow(dead_code)]
mod worker;
#[cfg_attr(doc, aquamarine::aquamarine)]

View File

@@ -15,7 +15,6 @@
//! manifest storage
pub mod action;
#[allow(unused_variables)]
pub mod manager;
pub mod storage;
#[cfg(test)]

View File

@@ -18,7 +18,6 @@ use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::manifest::action::{ProtocolAction, ProtocolVersion};
use store_api::manifest::ManifestVersion;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
@@ -30,8 +29,6 @@ use crate::wal::EntryId;
/// Actions that can be applied to region manifest.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub enum RegionMetaAction {
/// Set the min/max supported protocol version
Protocol(ProtocolAction),
/// Change region's metadata for request like ALTER
Change(RegionChange),
/// Edit region's state for changing options or file list.
@@ -176,11 +173,6 @@ impl RegionMetaActionList {
}
impl RegionMetaActionList {
fn set_protocol(&mut self, action: ProtocolAction) {
// The protocol action should be the first action in action list by convention.
self.actions.insert(0, RegionMetaAction::Protocol(action));
}
/// Encode self into json in the form of string lines.
pub fn encode(&self) -> Result<Vec<u8>> {
let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
@@ -195,22 +187,6 @@ impl RegionMetaActionList {
}
}
pub struct RegionMetaActionIter {
// log_iter: ObjectStoreLogIterator,
reader_version: ProtocolVersion,
last_protocol: Option<ProtocolAction>,
}
impl RegionMetaActionIter {
pub fn last_protocol(&self) -> Option<ProtocolAction> {
self.last_protocol.clone()
}
async fn next_action(&mut self) -> Result<Option<(ManifestVersion, RegionMetaActionList)>> {
todo!()
}
}
#[cfg(test)]
mod tests {
@@ -262,19 +238,6 @@ mod tests {
let region_remove = r#"{"region_id":42}"#;
let _ = serde_json::from_str::<RegionRemove>(region_remove).unwrap();
let protocol_action = r#"{"min_reader_version":1,"min_writer_version":2}"#;
let _ = serde_json::from_str::<ProtocolAction>(protocol_action).unwrap();
}
fn mock_file_meta() -> FileMeta {
FileMeta {
region_id: 0.into(),
file_id: FileId::random(),
time_range: (0.into(), 10000.into()),
level: 0,
file_size: 1024,
}
}
#[test]

View File

@@ -273,7 +273,7 @@ impl RegionManifestManagerInner {
RegionMetaAction::Edit(action) => {
manifest_builder.apply_edit(manifest_version, action);
}
RegionMetaAction::Remove(_) | RegionMetaAction::Protocol(_) => {
RegionMetaAction::Remove(_) => {
debug!(
"Unhandled action in {}, action: {:?}",
options.manifest_dir, action
@@ -323,7 +323,7 @@ impl RegionManifestManagerInner {
RegionMetaAction::Edit(action) => {
manifest_builder.apply_edit(version, action);
}
RegionMetaAction::Remove(_) | RegionMetaAction::Protocol(_) => {
RegionMetaAction::Remove(_) => {
debug!(
"Unhandled action for region {}, action: {:?}",
self.manifest.metadata.region_id, action
@@ -394,7 +394,7 @@ impl RegionManifestManagerInner {
RegionMetaAction::Edit(action) => {
manifest_builder.apply_edit(version, action);
}
RegionMetaAction::Remove(_) | RegionMetaAction::Protocol(_) => {
RegionMetaAction::Remove(_) => {
debug!(
"Unhandled action for region {}, action: {:?}",
self.manifest.metadata.region_id, action
@@ -436,7 +436,7 @@ impl RegionManifestManagerInner {
) -> Result<Option<RegionCheckpoint>> {
let last_checkpoint = store.load_last_checkpoint().await?;
if let Some((version, bytes)) = last_checkpoint {
if let Some((_, bytes)) = last_checkpoint {
let checkpoint = RegionCheckpoint::decode(&bytes)?;
Ok(Some(checkpoint))
} else {

View File

@@ -17,10 +17,10 @@ use std::iter::Iterator;
use std::str::FromStr;
use common_datasource::compression::CompressionType;
use common_telemetry::{debug, info};
use common_telemetry::debug;
use futures::TryStreamExt;
use lazy_static::lazy_static;
use object_store::{raw_normalize_path, util, Entry, ErrorKind, ObjectStore};
use object_store::{util, Entry, ErrorKind, ObjectStore};
use regex::Regex;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
@@ -184,10 +184,6 @@ impl ManifestObjectStore {
.context(OpenDalSnafu)
}
pub(crate) fn path(&self) -> &str {
&self.path
}
pub async fn scan(
&self,
start: ManifestVersion,
@@ -289,45 +285,6 @@ impl ManifestObjectStore {
Ok(ret)
}
async fn delete_all(&self, remove_action_manifest: ManifestVersion) -> Result<()> {
let entries: Vec<Entry> = self.get_paths(Some).await?;
// Filter out the latest delta file.
let paths: Vec<_> = entries
.iter()
.filter(|e| {
let name = e.name();
if is_delta_file(name) && file_version(name) == remove_action_manifest {
return false;
}
true
})
.map(|e| e.path().to_string())
.collect();
info!(
"Deleting {} from manifest storage path {} paths: {:?}",
paths.len(),
self.path,
paths,
);
// Delete all files except the latest delta file.
self.object_store
.remove(paths)
.await
.context(OpenDalSnafu)?;
// Delete the latest delta file and the manifest directory.
self.object_store
.remove_all(&self.path)
.await
.context(OpenDalSnafu)?;
info!("Deleted manifest storage path {}", self.path);
Ok(())
}
pub async fn save(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
let path = self.delta_file_path(version);
debug!("Save log to manifest storage, version: {}", version);
@@ -345,36 +302,6 @@ impl ManifestObjectStore {
.context(OpenDalSnafu)
}
async fn delete(&self, start: ManifestVersion, end: ManifestVersion) -> Result<()> {
ensure!(start <= end, InvalidScanIndexSnafu { start, end });
// Due to backward compatibility, it is possible that the user's log between start and end has not been compressed,
// so we need to delete the uncompressed file corresponding to that version, even if the uncompressed file in that version do not exist.
let mut paths = Vec::with_capacity(((end - start) * 2) as usize);
for version in start..end {
paths.push(raw_normalize_path(&self.delta_file_path(version)));
if self.compress_type != FALL_BACK_COMPRESS_TYPE {
paths.push(raw_normalize_path(&gen_path(
&self.path,
&delta_file(version),
FALL_BACK_COMPRESS_TYPE,
)));
}
}
debug!(
"Deleting logs from manifest storage, start: {}, end: {}",
start, end
);
self.object_store
.remove(paths.clone())
.await
.context(OpenDalSnafu)?;
Ok(())
}
pub async fn save_checkpoint(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
let path = self.checkpoint_file_path(version);
let data = self
@@ -469,29 +396,6 @@ impl ManifestObjectStore {
Ok(checkpoint_data.map(|data| (version, data)))
}
async fn delete_checkpoint(&self, version: ManifestVersion) -> Result<()> {
// Due to backward compatibility, it is possible that the user's checkpoint file has not been compressed,
// so we need to delete the uncompressed checkpoint file corresponding to that version, even if the uncompressed checkpoint file in that version do not exist.
let paths = if self.compress_type != FALL_BACK_COMPRESS_TYPE {
vec![
raw_normalize_path(&self.checkpoint_file_path(version)),
raw_normalize_path(&gen_path(
&self.path,
&checkpoint_file(version),
FALL_BACK_COMPRESS_TYPE,
)),
]
} else {
vec![raw_normalize_path(&self.checkpoint_file_path(version))]
};
self.object_store
.remove(paths.clone())
.await
.context(OpenDalSnafu)?;
Ok(())
}
/// Load the latest checkpoint.
/// Return manifest version and the raw [RegionCheckpoint](crate::manifest::action::RegionCheckpoint) content if any
pub async fn load_last_checkpoint(&self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
@@ -609,18 +513,6 @@ mod tests {
}
assert!(it.next_log().await.unwrap().is_none());
// Delete [0, 3)
log_store.delete(0, 3).await.unwrap();
// [3, 5) remains
let mut it = log_store.scan(0, 11).await.unwrap();
for v in 3..5 {
let (version, bytes) = it.next_log().await.unwrap().unwrap();
assert_eq!(v, version);
assert_eq!(format!("hello, {v}").as_bytes(), bytes);
}
assert!(it.next_log().await.unwrap().is_none());
// test checkpoint
assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
log_store
@@ -702,19 +594,9 @@ mod tests {
assert_eq!(v, 10);
assert_eq!(checkpoint, "checkpoint_compressed".as_bytes());
// Delete previously uncompressed checkpoint
log_store.delete_checkpoint(5).await.unwrap();
assert!(log_store.load_checkpoint(5).await.unwrap().is_none());
// Delete [3, 7), contain uncompressed/compressed data
log_store.delete(3, 7).await.unwrap();
// [3, 7) deleted
let mut it = log_store.scan(3, 7).await.unwrap();
assert!(it.next_log().await.unwrap().is_none());
// Delete util 10, contain uncompressed/compressed data
// log 0, 1, 2, 7, 8, 9 will be delete
assert_eq!(6, log_store.delete_until(10, false).await.unwrap());
assert_eq!(11, log_store.delete_until(10, false).await.unwrap());
let mut it = log_store.scan(0, 10).await.unwrap();
assert!(it.next_log().await.unwrap().is_none());
}

View File

@@ -66,7 +66,7 @@ async fn manager_without_checkpoint() {
let (_env, manager) = build_manager(0, CompressionType::Uncompressed).await;
// apply 10 actions
for i in 0..10 {
for _ in 0..10 {
manager.update(nop_action()).await.unwrap();
}
@@ -110,7 +110,7 @@ async fn manager_with_checkpoint_distance_1() {
let (env, manager) = build_manager(1, CompressionType::Uncompressed).await;
// apply 10 actions
for i in 0..10 {
for _ in 0..10 {
manager.update(nop_action()).await.unwrap();
}
@@ -194,7 +194,7 @@ async fn generate_checkpoint_with_compression_types(
compress_type: CompressionType,
actions: Vec<RegionMetaActionList>,
) -> RegionCheckpoint {
let (env, manager) = build_manager(1, CompressionType::Uncompressed).await;
let (_env, manager) = build_manager(1, compress_type).await;
for action in actions {
manager.update(action).await.unwrap();

View File

@@ -32,9 +32,6 @@ use crate::manifest::manager::RegionManifestManager;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::sst::file_purger::FilePurgerRef;
/// Type to store region version.
pub type VersionNumber = u32;
/// Metadata and runtime status of a region.
///
/// Writing and reading a region follow a single-writer-multi-reader rule:

View File

@@ -105,7 +105,7 @@ impl LocalScheduler {
}
#[inline]
fn running(&self) -> bool {
fn is_running(&self) -> bool {
self.state.load(Ordering::Relaxed) == STATE_RUNNING
}
}
@@ -113,10 +113,7 @@ impl LocalScheduler {
#[async_trait::async_trait]
impl Scheduler for LocalScheduler {
fn schedule(&self, job: Job) -> Result<()> {
ensure!(
self.state.load(Ordering::Relaxed) == STATE_RUNNING,
InvalidSchedulerStateSnafu
);
ensure!(self.is_running(), InvalidSchedulerStateSnafu);
self.sender
.read()
@@ -129,10 +126,7 @@ impl Scheduler for LocalScheduler {
/// if await_termination is true, scheduler will wait all tasks finished before stopping
async fn stop(&self, await_termination: bool) -> Result<()> {
ensure!(
self.state.load(Ordering::Relaxed) == STATE_RUNNING,
InvalidSchedulerStateSnafu
);
ensure!(self.is_running(), InvalidSchedulerStateSnafu);
let state = if await_termination {
STATE_AWAIT_TERMINATION
} else {

View File

@@ -59,6 +59,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region
.version_control
.apply_edit(edit, region.file_purger.clone());
region.update_flush_millis();
// Delete wal.
info!(