feat: table route for metric engine (#3053)

* feat: table route for metric engine

* feat: register logical regions

* fix: open logical region (#96)

---------

Co-authored-by: JeremyHi <jiachun_feng@proton.me>
This commit is contained in:
LFC
2024-01-04 14:30:17 +08:00
committed by GitHub
parent b025bed45c
commit ec43b9183d
20 changed files with 228 additions and 92 deletions

View File

@@ -272,6 +272,16 @@ pub enum Error {
location: Location,
source: BoxedError,
},
#[snafu(display(
"Failed to find logical regions in physical region {}",
physical_region_id
))]
FindLogicalRegions {
physical_region_id: RegionId,
source: metric_engine::error::Error,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -340,6 +350,8 @@ impl ErrorExt for Error {
}
HandleRegionRequest { source, .. } => source.status_code(),
StopRegionEngine { source, .. } => source.status_code(),
FindLogicalRegions { source, .. } => source.status_code(),
}
}

View File

@@ -43,6 +43,7 @@ use datafusion_common::DataFusionError;
use datafusion_expr::{Expr as DfExpr, TableProviderFilterPushDown, TableType};
use datatypes::arrow::datatypes::SchemaRef;
use futures_util::future::try_join_all;
use metric_engine::engine::MetricEngine;
use prost::Message;
use query::QueryEngineRef;
use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
@@ -51,6 +52,7 @@ use servers::grpc::region_server::RegionServerHandler;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
use store_api::region_engine::{RegionEngineRef, RegionRole, SetReadonlyResponse};
use store_api::region_request::{AffectedRows, RegionCloseRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
@@ -60,8 +62,9 @@ use tonic::{Request, Response, Result as TonicResult};
use crate::error::{
self, BuildRegionRequestsSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu,
GetRegionMetadataSnafu, HandleRegionRequestSnafu, RegionEngineNotFoundSnafu,
RegionNotFoundSnafu, Result, StopRegionEngineSnafu, UnsupportedOutputSnafu,
FindLogicalRegionsSnafu, GetRegionMetadataSnafu, HandleRegionRequestSnafu,
RegionEngineNotFoundSnafu, RegionNotFoundSnafu, Result, StopRegionEngineSnafu, UnexpectedSnafu,
UnsupportedOutputSnafu,
};
use crate::event_listener::RegionServerEventListenerRef;
@@ -369,7 +372,7 @@ impl RegionServerInner {
let current_region_status = self.region_map.get(&region_id);
let engine = match region_change {
RegionChange::Register(ref engine_type) => match current_region_status {
RegionChange::Register(ref engine_type, _) => match current_region_status {
Some(status) => match status.clone() {
RegionEngineWithStatus::Registering(_) => {
return Ok(CurrentEngine::EarlyReturn(0))
@@ -427,8 +430,12 @@ impl RegionServerInner {
.start_timer();
let region_change = match &request {
RegionRequest::Create(create) => RegionChange::Register(create.engine.clone()),
RegionRequest::Open(open) => RegionChange::Register(open.engine.clone()),
RegionRequest::Create(create) => RegionChange::Register(create.engine.clone(), false),
RegionRequest::Open(open) => {
let is_opening_physical_region =
open.options.contains_key(PHYSICAL_TABLE_METADATA_KEY);
RegionChange::Register(open.engine.clone(), is_opening_physical_region)
}
RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
RegionRequest::Put(_)
| RegionRequest::Delete(_)
@@ -460,7 +467,8 @@ impl RegionServerInner {
{
Ok(result) => {
// Sets corresponding region status to ready.
self.set_region_status_ready(region_id, engine, region_change);
self.set_region_status_ready(region_id, engine, region_change)
.await?;
Ok(result)
}
Err(err) => {
@@ -478,7 +486,7 @@ impl RegionServerInner {
region_change: &RegionChange,
) {
match region_change {
RegionChange::Register(_) => {
RegionChange::Register(_, _) => {
self.region_map.insert(
region_id,
RegionEngineWithStatus::Registering(engine.clone()),
@@ -497,7 +505,7 @@ impl RegionServerInner {
fn unset_region_status(&self, region_id: RegionId, region_change: RegionChange) {
match region_change {
RegionChange::None => {}
RegionChange::Register(_) | RegionChange::Deregisters => {
RegionChange::Register(_, _) | RegionChange::Deregisters => {
self.region_map
.remove(&region_id)
.map(|(id, engine)| engine.set_writable(id, false));
@@ -505,16 +513,20 @@ impl RegionServerInner {
}
}
fn set_region_status_ready(
async fn set_region_status_ready(
&self,
region_id: RegionId,
engine: RegionEngineRef,
region_change: RegionChange,
) {
) -> Result<()> {
let engine_type = engine.name();
match region_change {
RegionChange::None => {}
RegionChange::Register(_) => {
RegionChange::Register(_, is_opening_physical_region) => {
if is_opening_physical_region {
self.register_logical_regions(&engine, region_id).await?;
}
info!("Region {region_id} is registered to engine {engine_type}");
self.region_map
.insert(region_id, RegionEngineWithStatus::Ready(engine));
@@ -528,6 +540,37 @@ impl RegionServerInner {
self.event_listener.on_region_deregistered(region_id);
}
}
Ok(())
}
async fn register_logical_regions(
&self,
engine: &RegionEngineRef,
physical_region_id: RegionId,
) -> Result<()> {
let metric_engine =
engine
.as_any()
.downcast_ref::<MetricEngine>()
.context(UnexpectedSnafu {
violated: format!(
"expecting engine type '{}', actual '{}'",
METRIC_ENGINE_NAME,
engine.name(),
),
})?;
let logical_regions = metric_engine
.logical_regions(physical_region_id)
.await
.context(FindLogicalRegionsSnafu { physical_region_id })?;
for region in logical_regions {
self.region_map
.insert(region, RegionEngineWithStatus::Ready(engine.clone()));
info!("Logical region {} is registered!", region);
}
Ok(())
}
pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
@@ -622,7 +665,7 @@ impl RegionServerInner {
enum RegionChange {
None,
Register(String),
Register(String, bool),
Deregisters,
}
@@ -1051,7 +1094,7 @@ mod tests {
CurrentEngineTest {
region_id,
current_region_status: None,
region_change: RegionChange::Register(engine.name().to_string()),
region_change: RegionChange::Register(engine.name().to_string(), false),
assert: Box::new(|result| {
let current_engine = result.unwrap();
assert_matches!(current_engine, CurrentEngine::Engine(_));
@@ -1060,7 +1103,7 @@ mod tests {
CurrentEngineTest {
region_id,
current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
region_change: RegionChange::Register(engine.name().to_string()),
region_change: RegionChange::Register(engine.name().to_string(), false),
assert: Box::new(|result| {
let current_engine = result.unwrap();
assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
@@ -1069,7 +1112,7 @@ mod tests {
CurrentEngineTest {
region_id,
current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
region_change: RegionChange::Register(engine.name().to_string()),
region_change: RegionChange::Register(engine.name().to_string(), false),
assert: Box::new(|result| {
let err = result.unwrap_err();
assert_eq!(err.status_code(), StatusCode::RegionBusy);
@@ -1078,7 +1121,7 @@ mod tests {
CurrentEngineTest {
region_id,
current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
region_change: RegionChange::Register(engine.name().to_string()),
region_change: RegionChange::Register(engine.name().to_string(), false),
assert: Box::new(|result| {
let current_engine = result.unwrap();
assert_matches!(current_engine, CurrentEngine::Engine(_));

View File

@@ -207,4 +207,8 @@ impl RegionEngine for MockRegionEngine {
}
Some(RegionRole::Leader)
}
fn as_any(&self) -> &dyn Any {
self
}
}