refactor: remove useless region follower legacy code (#5787)

chore: remove region follower procedure
This commit is contained in:
Weny Xu
2025-03-27 19:50:29 +08:00
committed by GitHub
parent 09ef24fd75
commit c2ba7fb16c
7 changed files with 0 additions and 831 deletions

View File

@@ -787,41 +787,6 @@ pub enum Error {
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Logical table cannot add follower: {table_id}"))]
LogicalTableCannotAddFollower {
table_id: TableId,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"A region follower cannot be placed on the same node as the leader: {region_id}, {peer_id}"
))]
RegionFollowerLeaderConflict {
region_id: RegionId,
peer_id: u64,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Multiple region followers cannot be placed on the same node: {region_id}, {peer_id}"
))]
MultipleRegionFollowersOnSameNode {
region_id: RegionId,
peer_id: u64,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Region follower not exists: {region_id}, {peer_id}"))]
RegionFollowerNotExists {
region_id: RegionId,
peer_id: u64,
#[snafu(implicit)]
location: Location,
},
}
impl Error {
@@ -891,10 +856,6 @@ impl ErrorExt for Error {
| Error::ProcedureNotFound { .. }
| Error::TooManyPartitions { .. }
| Error::TomlFormat { .. }
| Error::LogicalTableCannotAddFollower { .. }
| Error::RegionFollowerLeaderConflict { .. }
| Error::MultipleRegionFollowersOnSameNode { .. }
| Error::RegionFollowerNotExists { .. }
| Error::HandlerNotFound { .. } => StatusCode::InvalidArguments,
Error::LeaseKeyFromUtf8 { .. }
| Error::LeaseValueFromUtf8 { .. }

View File

@@ -55,8 +55,6 @@ use crate::lease::MetaPeerLookupService;
use crate::metasrv::{
ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, SelectorContext, SelectorRef, TABLE_ID_SEQ,
};
use crate::procedure::region_follower::manager::RegionFollowerManager;
use crate::procedure::region_follower::Context as ArfContext;
use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::region_migration::DefaultContextFactory;
use crate::region::supervisor::{
@@ -345,19 +343,6 @@ impl MetasrvBuilder {
.context(error::InitDdlManagerSnafu)?,
);
// alter region follower manager
let region_follower_manager = Arc::new(RegionFollowerManager::new(
procedure_manager.clone(),
ArfContext {
table_metadata_manager: table_metadata_manager.clone(),
mailbox: mailbox.clone(),
server_addr: options.server_addr.clone(),
cache_invalidator: cache_invalidator.clone(),
meta_peer_client: meta_peer_client.clone(),
},
));
region_follower_manager.try_start()?;
let handler_group_builder = match handler_group_builder {
Some(handler_group_builder) => handler_group_builder,
None => {

View File

@@ -18,7 +18,6 @@ use common_meta::leadership_notifier::LeadershipChangeListener;
use common_procedure::ProcedureManagerRef;
use snafu::ResultExt;
pub mod region_follower;
pub mod region_migration;
#[cfg(test)]
mod test_util;

View File

@@ -1,252 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::{Duration, Instant};
use api::v1::meta::MailboxMessage;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
use common_meta::key::datanode_table::RegionInfo;
use common_meta::peer::Peer;
use common_meta::RegionIdent;
use common_telemetry::info;
use snafu::ResultExt;
use store_api::storage::RegionId;
use super::Context;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::service::mailbox::Channel;
/// Uses lease time of a region as the timeout of opening a follower region.
const OPEN_REGION_FOLLOWER_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS);
pub(crate) struct CreateFollower {
region_id: RegionId,
// The peer of the datanode to add region follower.
peer: Peer,
}
impl CreateFollower {
pub fn new(region_id: RegionId, peer: Peer) -> Self {
Self { region_id, peer }
}
/// Builds the open region instruction for the region follower.
pub(crate) async fn build_open_region_instruction(
&self,
region_info: RegionInfo,
) -> Result<Instruction> {
let datanode_id = self.peer.id;
let table_id = self.region_id.table_id();
let region_number = self.region_id.region_number();
let RegionInfo {
region_storage_path,
region_options,
region_wal_options,
engine,
} = region_info;
let region_ident = RegionIdent {
datanode_id,
table_id,
region_number,
engine,
};
let open_instruction = Instruction::OpenRegion(OpenRegion::new(
region_ident,
&region_storage_path,
region_options,
region_wal_options,
true,
));
Ok(open_instruction)
}
/// Sends the open region instruction to the datanode.
pub(crate) async fn send_open_region_instruction(
&self,
ctx: &Context,
instruction: Instruction,
) -> Result<()> {
// TODO(jeremy): register the opening_region_keeper
let msg = MailboxMessage::json_message(
&format!("Open a follower region: {}", self.region_id),
&format!("Metasrv@{}", ctx.server_addr),
&format!("Datanode-{}@{}", self.peer.id, self.peer.addr),
common_time::util::current_time_millis(),
&instruction,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: instruction.to_string(),
})?;
let ch = Channel::Datanode(self.peer.id);
let now = Instant::now();
let receiver = ctx
.mailbox
.send(&ch, msg, OPEN_REGION_FOLLOWER_TIMEOUT)
.await?;
match receiver.await? {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received open region follower reply: {:?}, region: {}, elapsed: {:?}",
reply,
self.region_id,
now.elapsed()
);
let InstructionReply::OpenRegion(SimpleReply { result, error }) = reply else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect open region follower reply",
}
.fail();
};
if result {
Ok(())
} else {
error::RetryLaterSnafu {
reason: format!(
"Region {} is not opened by datanode {:?}, error: {error:?}, elapsed: {:?}",
self.region_id,
&self.peer,
now.elapsed()
),
}
.fail()
}
}
Err(error::Error::MailboxTimeout { .. }) => {
let reason = format!(
"Mailbox received timeout for open region follower {} on datanode {:?}, elapsed: {:?}",
self.region_id,
&self.peer,
now.elapsed()
);
error::RetryLaterSnafu { reason }.fail()
}
Err(e) => Err(e),
}
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use common_meta::DatanodeId;
use super::*;
use crate::error::Error;
use crate::procedure::region_follower::test_util::TestingEnv;
use crate::procedure::test_util::{new_close_region_reply, send_mock_reply};
#[tokio::test]
async fn test_datanode_is_unreachable() {
let env = TestingEnv::new();
let ctx = env.new_context();
let region_id = RegionId::new(1, 1);
let peer = Peer::new(1, "127.0.0.1:8080");
let create_follower = CreateFollower::new(region_id, peer.clone());
let instruction = mock_open_region_instruction(peer.id, region_id);
let err = create_follower
.send_open_region_instruction(&ctx, instruction)
.await
.unwrap_err();
assert_matches!(err, Error::PusherNotFound { .. });
assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_unexpected_instruction_reply() {
let mut env = TestingEnv::new();
let ctx = env.new_context();
let mailbox_ctx = env.mailbox_context_mut();
let mailbox = mailbox_ctx.mailbox().clone();
let region_id = RegionId::new(1, 1);
let peer = Peer::new(1, "127.0.0.1:8080");
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Datanode(peer.id), tx)
.await;
// Sends an timeout error.
send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));
let create_follower = CreateFollower::new(region_id, peer.clone());
let instruction = mock_open_region_instruction(peer.id, region_id);
let err = create_follower
.send_open_region_instruction(&ctx, instruction)
.await
.unwrap_err();
assert_matches!(err, Error::UnexpectedInstructionReply { .. });
assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_instruction_exceeded_deadline() {
let mut env = TestingEnv::new();
let ctx = env.new_context();
let mailbox_ctx = env.mailbox_context_mut();
let mailbox = mailbox_ctx.mailbox().clone();
let region_id = RegionId::new(1, 1);
let peer = Peer::new(1, "127.0.0.1:8080");
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Datanode(peer.id), tx)
.await;
// Sends an timeout error.
send_mock_reply(mailbox, rx, |id| {
Err(error::MailboxTimeoutSnafu { id }.build())
});
let create_follower = CreateFollower::new(region_id, peer.clone());
let instruction = mock_open_region_instruction(peer.id, region_id);
let err = create_follower
.send_open_region_instruction(&ctx, instruction)
.await
.unwrap_err();
assert_matches!(err, Error::RetryLater { .. });
assert!(err.is_retryable());
}
fn mock_open_region_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction {
Instruction::OpenRegion(OpenRegion {
region_ident: RegionIdent {
datanode_id,
table_id: region_id.table_id(),
region_number: region_id.region_number(),
engine: "mito2".to_string(),
},
region_storage_path: "/tmp".to_string(),
region_options: Default::default(),
region_wal_options: Default::default(),
skip_wal_replay: true,
})
}
}

View File

@@ -1,193 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::rpc::procedure::{AddRegionFollowerRequest, RemoveRegionFollowerRequest};
use common_procedure::{watcher, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId};
use common_telemetry::info;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::table_name::TableName;
use super::remove_region_follower::RemoveRegionFollowerProcedure;
use crate::error::{self, Result};
use crate::procedure::region_follower::add_region_follower::AddRegionFollowerProcedure;
use crate::procedure::region_follower::Context;
pub struct RegionFollowerManager {
procedure_manager: ProcedureManagerRef,
default_context: Context,
}
impl RegionFollowerManager {
pub fn new(procedure_manager: ProcedureManagerRef, default_context: Context) -> Self {
Self {
procedure_manager,
default_context,
}
}
pub fn new_context(&self) -> Context {
self.default_context.clone()
}
pub(crate) fn try_start(&self) -> Result<()> {
// register add region follower procedure
let context = self.new_context();
let type_name = AddRegionFollowerProcedure::TYPE_NAME;
self.procedure_manager
.register_loader(
type_name,
Box::new(move |json| {
let context = context.clone();
AddRegionFollowerProcedure::from_json(json, context).map(|p| Box::new(p) as _)
}),
)
.context(error::RegisterProcedureLoaderSnafu { type_name })?;
// register remove region follower procedure
let context = self.new_context();
let type_name = RemoveRegionFollowerProcedure::TYPE_NAME;
self.procedure_manager
.register_loader(
type_name,
Box::new(move |json| {
let context = context.clone();
RemoveRegionFollowerProcedure::from_json(json, context)
.map(|p| Box::new(p) as _)
}),
)
.context(error::RegisterProcedureLoaderSnafu { type_name })?;
Ok(())
}
pub async fn submit_add_follower_procedure(
&self,
req: AddRegionFollowerRequest,
) -> Result<(ProcedureId, Option<Output>)> {
let AddRegionFollowerRequest { region_id, peer_id } = req;
let region_id = RegionId::from_u64(region_id);
let table_id = region_id.table_id();
let ctx = self.new_context();
// get the table info
let table_info = ctx
.table_metadata_manager
.table_info_manager()
.get(table_id)
.await
.context(error::TableMetadataManagerSnafu)?
.context(error::TableInfoNotFoundSnafu { table_id })?
.into_inner();
let TableName {
catalog_name,
schema_name,
..
} = table_info.table_name();
let procedure =
AddRegionFollowerProcedure::new(catalog_name, schema_name, region_id, peer_id, ctx);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let procedure_id = procedure_with_id.id;
info!("Starting add region follower procedure {procedure_id} for {req:?}");
let mut watcher = self
.procedure_manager
.submit(procedure_with_id)
.await
.context(error::SubmitProcedureSnafu)?;
let output = watcher::wait(&mut watcher)
.await
.context(error::WaitProcedureSnafu)?;
Ok((procedure_id, output))
}
pub async fn submit_remove_follower_procedure(
&self,
req: RemoveRegionFollowerRequest,
) -> Result<(ProcedureId, Option<Output>)> {
let RemoveRegionFollowerRequest { region_id, peer_id } = req;
let region_id = RegionId::from_u64(region_id);
let table_id = region_id.table_id();
let ctx = self.new_context();
// get the table info
let table_info = ctx
.table_metadata_manager
.table_info_manager()
.get(table_id)
.await
.context(error::TableMetadataManagerSnafu)?
.context(error::TableInfoNotFoundSnafu { table_id })?
.into_inner();
let TableName {
catalog_name,
schema_name,
..
} = table_info.table_name();
let procedure =
RemoveRegionFollowerProcedure::new(catalog_name, schema_name, region_id, peer_id, ctx);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let procedure_id = procedure_with_id.id;
info!("Starting remove region follower procedure {procedure_id} for {req:?}");
let mut watcher = self
.procedure_manager
.submit(procedure_with_id)
.await
.context(error::SubmitProcedureSnafu)?;
let output = watcher::wait(&mut watcher)
.await
.context(error::WaitProcedureSnafu)?;
Ok((procedure_id, output))
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use super::*;
use crate::procedure::region_follower::test_util::TestingEnv;
#[tokio::test]
async fn test_submit_procedure_table_not_found() {
let env = TestingEnv::new();
let ctx = env.new_context();
let region_follower_manager = RegionFollowerManager::new(env.procedure_manager(), ctx);
let req = AddRegionFollowerRequest {
region_id: 1,
peer_id: 2,
};
let err = region_follower_manager
.submit_add_follower_procedure(req)
.await
.unwrap_err();
assert_matches!(err, error::Error::TableInfoNotFound { .. });
let req = RemoveRegionFollowerRequest {
region_id: 1,
peer_id: 2,
};
let err = region_follower_manager
.submit_remove_follower_procedure(req)
.await
.unwrap_err();
assert_matches!(err, error::Error::TableInfoNotFound { .. });
}
}

View File

@@ -1,234 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::{Duration, Instant};
use api::v1::meta::MailboxMessage;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::key::datanode_table::RegionInfo;
use common_meta::peer::Peer;
use common_meta::RegionIdent;
use common_telemetry::info;
use snafu::ResultExt;
use store_api::storage::RegionId;
use super::Context;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::service::mailbox::Channel;
/// Uses lease time of a region as the timeout of closing a follower region.
const CLOSE_REGION_FOLLOWER_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS);
pub(crate) struct RemoveFollower {
region_id: RegionId,
// The peer of the datanode to add region follower.
peer: Peer,
}
impl RemoveFollower {
pub fn new(region_id: RegionId, peer: Peer) -> Self {
Self { region_id, peer }
}
/// Builds the close region instruction for the region follower.
pub(crate) async fn build_close_region_instruction(
&self,
region_info: RegionInfo,
) -> Result<Instruction> {
let datanode_id = self.peer.id;
let table_id = self.region_id.table_id();
let region_number = self.region_id.region_number();
let RegionInfo { engine, .. } = region_info;
let region_ident = RegionIdent {
datanode_id,
table_id,
region_number,
engine,
};
let close_instruction = Instruction::CloseRegion(region_ident);
Ok(close_instruction)
}
/// Sends the close region instruction to the datanode.
pub(crate) async fn send_close_region_instruction(
&self,
ctx: &Context,
instruction: Instruction,
) -> Result<()> {
let msg = MailboxMessage::json_message(
&format!("Close a follower region: {}", self.region_id),
&format!("Metasrv@{}", ctx.server_addr),
&format!("Datanode-{}@{}", self.peer.id, self.peer.addr),
common_time::util::current_time_millis(),
&instruction,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: instruction.to_string(),
})?;
let ch = Channel::Datanode(self.peer.id);
let now = Instant::now();
let receiver = ctx
.mailbox
.send(&ch, msg, CLOSE_REGION_FOLLOWER_TIMEOUT)
.await?;
match receiver.await? {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received close region follower reply: {:?}, region: {}, elapsed: {:?}",
reply,
self.region_id,
now.elapsed()
);
let InstructionReply::CloseRegion(SimpleReply { result, error }) = reply else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect close region follower reply",
}
.fail();
};
if result {
Ok(())
} else {
error::RetryLaterSnafu {
reason: format!(
"Region {} is not closed by datanode {:?}, error: {error:?}, elapsed: {:?}",
self.region_id,
&self.peer,
now.elapsed()
),
}
.fail()
}
}
Err(error::Error::MailboxTimeout { .. }) => {
let reason = format!(
"Mailbox received timeout for close region follower {} on datanode {:?}, elapsed: {:?}",
self.region_id,
&self.peer,
now.elapsed()
);
error::RetryLaterSnafu { reason }.fail()
}
Err(e) => Err(e),
}
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use common_meta::DatanodeId;
use super::*;
use crate::error::Error;
use crate::procedure::region_follower::test_util::TestingEnv;
use crate::procedure::test_util::{new_open_region_reply, send_mock_reply};
#[tokio::test]
async fn test_datanode_is_unreachable() {
let env = TestingEnv::new();
let ctx = env.new_context();
let region_id = RegionId::new(1, 1);
let peer = Peer::new(1, "127.0.0.1:8080");
let remove_follower = RemoveFollower::new(region_id, peer.clone());
let instruction = mock_close_region_instruction(peer.id, region_id);
let err = remove_follower
.send_close_region_instruction(&ctx, instruction)
.await
.unwrap_err();
assert_matches!(err, Error::PusherNotFound { .. });
assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_unexpected_instruction_reply() {
let mut env = TestingEnv::new();
let ctx = env.new_context();
let mailbox_ctx = env.mailbox_context_mut();
let mailbox = mailbox_ctx.mailbox().clone();
let region_id = RegionId::new(1, 1);
let peer = Peer::new(1, "127.0.0.1:8080");
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Datanode(peer.id), tx)
.await;
// Sends an timeout error.
send_mock_reply(mailbox, rx, |id| Ok(new_open_region_reply(id, false, None)));
let remove_follower = RemoveFollower::new(region_id, peer.clone());
let instruction = mock_close_region_instruction(peer.id, region_id);
let err = remove_follower
.send_close_region_instruction(&ctx, instruction)
.await
.unwrap_err();
assert_matches!(err, Error::UnexpectedInstructionReply { .. });
assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_instruction_exceeded_deadline() {
let mut env = TestingEnv::new();
let ctx = env.new_context();
let mailbox_ctx = env.mailbox_context_mut();
let mailbox = mailbox_ctx.mailbox().clone();
let region_id = RegionId::new(1, 1);
let peer = Peer::new(1, "127.0.0.1:8080");
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Datanode(peer.id), tx)
.await;
// Sends an timeout error.
send_mock_reply(mailbox, rx, |id| {
Err(error::MailboxTimeoutSnafu { id }.build())
});
let remove_follower = RemoveFollower::new(region_id, peer.clone());
let instruction = mock_close_region_instruction(peer.id, region_id);
let err = remove_follower
.send_close_region_instruction(&ctx, instruction)
.await
.unwrap_err();
assert_matches!(err, Error::RetryLater { .. });
assert!(err.is_retryable());
}
fn mock_close_region_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction {
Instruction::CloseRegion(RegionIdent {
datanode_id,
table_id: region_id.table_id(),
region_number: region_id.region_number(),
engine: "mito2".to_string(),
})
}
}

View File

@@ -1,97 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::ResettableKvBackendRef;
use common_meta::sequence::SequenceBuilder;
use common_meta::state_store::KvStateStore;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
use super::Context;
use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::cluster::MetaPeerClientBuilder;
use crate::metasrv::MetasrvInfo;
use crate::procedure::test_util::MailboxContext;
/// `TestingEnv` provides components during the tests.
pub struct TestingEnv {
table_metadata_manager: TableMetadataManagerRef,
mailbox_ctx: MailboxContext,
server_addr: String,
procedure_manager: ProcedureManagerRef,
in_memory: ResettableKvBackendRef,
}
impl Default for TestingEnv {
fn default() -> Self {
Self::new()
}
}
impl TestingEnv {
pub fn new() -> Self {
let kv_backend = Arc::new(MemoryKvBackend::new());
let in_memory = Arc::new(MemoryKvBackend::new());
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let mailbox_sequence =
SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build();
let mailbox_ctx = MailboxContext::new(mailbox_sequence);
let state_store = Arc::new(KvStateStore::new(kv_backend));
let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store));
Self {
table_metadata_manager,
mailbox_ctx,
server_addr: "localhost".to_string(),
procedure_manager,
in_memory,
}
}
pub fn new_context(&self) -> Context {
Context {
table_metadata_manager: self.table_metadata_manager.clone(),
mailbox: self.mailbox_ctx.mailbox().clone(),
server_addr: self.server_addr.clone(),
cache_invalidator: Arc::new(MetasrvCacheInvalidator::new(
self.mailbox_ctx.mailbox().clone(),
MetasrvInfo {
server_addr: self.server_addr.to_string(),
},
)),
meta_peer_client: MetaPeerClientBuilder::default()
.election(None)
.in_memory(self.in_memory.clone())
.build()
.map(Arc::new)
// Safety: all required fields set at initialization
.unwrap(),
}
}
pub fn mailbox_context_mut(&mut self) -> &mut MailboxContext {
&mut self.mailbox_ctx
}
pub fn procedure_manager(&self) -> ProcedureManagerRef {
self.procedure_manager.clone()
}
}