refactor(mito): mv mito2 request (#2086)

* refactor: mv request mod to crate level

* refactor: mv SkippedFields
This commit is contained in:
Yingwen
2023-08-03 12:38:46 +09:00
committed by GitHub
parent 90b2200cc8
commit fdd4929c8f
10 changed files with 54 additions and 55 deletions

View File

@@ -26,8 +26,8 @@ use store_api::storage::RegionId;
use crate::config::MitoConfig;
use crate::error::{RecvSnafu, Result};
pub use crate::worker::request::CreateRequest;
use crate::worker::request::{CloseRequest, OpenRequest, RegionRequest, RequestBody};
pub use crate::request::CreateRequest;
use crate::request::{CloseRequest, OpenRequest, RegionRequest, RequestBody};
use crate::worker::WorkerGroup;
/// Region engine implementation for timeseries data.

View File

@@ -18,8 +18,8 @@ use store_api::storage::RegionId;
use super::*;
use crate::error::Error;
use crate::request::RegionOptions;
use crate::test_util::{CreateRequestBuilder, TestEnv};
use crate::worker::request::RegionOptions;
#[tokio::test]
async fn test_engine_new_stop() {

View File

@@ -35,6 +35,8 @@ pub mod read;
#[allow(dead_code)]
mod region;
#[allow(dead_code)]
pub mod request;
#[allow(dead_code)]
pub mod sst;
#[allow(dead_code)]
mod worker;

View File

@@ -119,53 +119,7 @@ impl RegionMetadata {
pub fn to_json(&self) -> Result<String> {
serde_json::to_string(&self).context(SerdeJsonSnafu)
}
}
/// Fields skipped in serialization.
struct SkippedFields {
/// Last schema.
schema: SchemaRef,
/// Id of the time index column.
time_index: ColumnId,
/// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
id_to_index: HashMap<ColumnId, usize>,
}
impl SkippedFields {
/// Constructs skipped fields from `column_metadatas`.
fn new(column_metadatas: &[ColumnMetadata]) -> Result<SkippedFields> {
let column_schemas = column_metadatas
.iter()
.map(|column_metadata| column_metadata.column_schema.clone())
.collect();
let schema = Arc::new(Schema::try_new(column_schemas).context(InvalidSchemaSnafu)?);
let time_index = column_metadatas
.iter()
.find_map(|col| {
if col.semantic_type == SemanticType::Timestamp {
Some(col.column_id)
} else {
None
}
})
.context(InvalidMetaSnafu {
reason: "time index not found",
})?;
let id_to_index = column_metadatas
.iter()
.enumerate()
.map(|(idx, col)| (col.column_id, idx))
.collect();
Ok(SkippedFields {
schema,
time_index,
id_to_index,
})
}
}
impl RegionMetadata {
/// Find column by id.
pub(crate) fn column_by_id(&self, column_id: ColumnId) -> Option<&ColumnMetadata> {
self.id_to_index
@@ -366,6 +320,50 @@ pub enum SemanticType {
Timestamp,
}
/// Fields skipped in serialization.
struct SkippedFields {
/// Last schema.
schema: SchemaRef,
/// Id of the time index column.
time_index: ColumnId,
/// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
id_to_index: HashMap<ColumnId, usize>,
}
impl SkippedFields {
/// Constructs skipped fields from `column_metadatas`.
fn new(column_metadatas: &[ColumnMetadata]) -> Result<SkippedFields> {
let column_schemas = column_metadatas
.iter()
.map(|column_metadata| column_metadata.column_schema.clone())
.collect();
let schema = Arc::new(Schema::try_new(column_schemas).context(InvalidSchemaSnafu)?);
let time_index = column_metadatas
.iter()
.find_map(|col| {
if col.semantic_type == SemanticType::Timestamp {
Some(col.column_id)
} else {
None
}
})
.context(InvalidMetaSnafu {
reason: "time index not found",
})?;
let id_to_index = column_metadatas
.iter()
.enumerate()
.map(|(idx, col)| (col.column_id, idx))
.collect();
Ok(SkippedFields {
schema,
time_index,
id_to_index,
})
}
}
#[cfg(test)]
mod test {
use datatypes::prelude::ConcreteDataType;

View File

@@ -33,7 +33,7 @@ use crate::error::Result;
use crate::manifest::manager::RegionManifestManager;
use crate::manifest::options::RegionManifestOptions;
use crate::metadata::{ColumnMetadata, RegionMetadataRef, SemanticType};
use crate::worker::request::{CreateRequest, RegionOptions};
use crate::request::{CreateRequest, RegionOptions};
use crate::worker::WorkerGroup;
/// Env to test mito engine.

View File

@@ -17,7 +17,6 @@
mod handle_close;
mod handle_create;
mod handle_open;
pub(crate) mod request;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
@@ -38,7 +37,7 @@ use crate::config::MitoConfig;
use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu};
use crate::memtable::{DefaultMemtableBuilder, MemtableBuilderRef};
use crate::region::{RegionMap, RegionMapRef};
use crate::worker::request::{RegionRequest, RequestBody, WorkerRequest};
use crate::request::{RegionRequest, RequestBody, WorkerRequest};
/// Identifier for a worker.
pub(crate) type WorkerId = u32;

View File

@@ -17,7 +17,7 @@
use common_telemetry::info;
use crate::error::Result;
use crate::worker::request::CloseRequest;
use crate::request::CloseRequest;
use crate::worker::RegionWorkerLoop;
impl<S> RegionWorkerLoop<S> {

View File

@@ -22,7 +22,7 @@ use snafu::ensure;
use crate::error::{RegionExistsSnafu, Result};
use crate::metadata::{RegionMetadataBuilder, INIT_REGION_VERSION};
use crate::region::opener::RegionOpener;
use crate::worker::request::CreateRequest;
use crate::request::CreateRequest;
use crate::worker::RegionWorkerLoop;
impl<S> RegionWorkerLoop<S> {

View File

@@ -20,7 +20,7 @@ use common_telemetry::info;
use crate::error::Result;
use crate::region::opener::RegionOpener;
use crate::worker::request::OpenRequest;
use crate::request::OpenRequest;
use crate::worker::RegionWorkerLoop;
impl<S> RegionWorkerLoop<S> {