feat: heartbeat task & peer lookup in proc (#4179)

* feat: heartbeat task

* feat: use real flow peer allocator when building

* feat: add peer look up in ddl context

* fix: drop flow test

* refactor: per review(WIP)

* refactor: do not check if alive

* refactor: per review

* refactor: remove useless `reset`

* refactor: per bot advice

* refactor: alive peer

* chore: bot review
discord9 authored on 2024-06-24 23:06:33 +08:00, committed by GitHub
Parent: 77904adaaf
Commit: a44fe627ce
20 changed files with 459 additions and 27 deletions

Cargo.lock (generated)

@@ -3917,6 +3917,7 @@ dependencies = [
"greptime-proto",
"hydroflow",
"itertools 0.10.5",
"meta-client",
"minstant",
"nom",
"num-traits",


@@ -31,6 +31,7 @@ use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::peer::StandalonePeerLookupService;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::{WalOptionsAllocator, WalOptionsAllocatorRef};
@@ -557,6 +558,7 @@ impl StartCommand {
table_metadata_allocator,
flow_metadata_manager,
flow_metadata_allocator,
peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
},
procedure_manager,
true,


@@ -26,6 +26,7 @@ use crate::key::flow::FlowMetadataManagerRef;
use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::TableMetadataManagerRef;
use crate::node_manager::NodeManagerRef;
use crate::peer::PeerLookupServiceRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
@@ -118,4 +119,6 @@ pub struct DdlContext {
pub flow_metadata_manager: FlowMetadataManagerRef,
/// Allocator for flow metadata.
pub flow_metadata_allocator: FlowMetadataAllocatorRef,
/// look up peer by id.
pub peer_lookup_service: PeerLookupServiceRef,
}


@@ -23,10 +23,11 @@ impl CreateFlowProcedure {
pub(crate) async fn allocate_flow_id(&mut self) -> Result<()> {
// TODO(weny, ruihang): We don't support partitions yet; it's always 1 for now.
let partitions = 1;
let cluster_id = self.data.cluster_id;
let (flow_id, peers) = self
.context
.flow_metadata_allocator
.create(partitions)
.create(cluster_id, partitions)
.await?;
self.data.flow_id = Some(flow_id);
self.data.peers = peers;


@@ -25,18 +25,17 @@ use common_procedure::{
use common_telemetry::info;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use strum::AsRefStr;
use super::utils::{add_peer_context_if_needed, handle_retry_error};
use crate::cache_invalidator::Context;
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::error::{self, Result, UnexpectedSnafu};
use crate::flow_name::FlowName;
use crate::instruction::{CacheIdent, DropFlow};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::lock_key::{CatalogLock, FlowLock};
use crate::peer::Peer;
use crate::rpc::ddl::DropFlowTask;
use crate::{metrics, ClusterId};
@@ -103,10 +102,17 @@ impl DropFlowProcedure {
let flownode_ids = &self.data.flow_info_value.as_ref().unwrap().flownode_ids;
let flow_id = self.data.task.flow_id;
let mut drop_flow_tasks = Vec::with_capacity(flownode_ids.len());
let cluster_id = self.data.cluster_id;
for flownode in flownode_ids.values() {
// TODO(weny): use the real peer.
let peer = Peer::new(*flownode, "");
let peer = self
.context
.peer_lookup_service
.flownode(cluster_id, *flownode)
.await?
.with_context(|| UnexpectedSnafu {
err_msg: "Attempted to drop flow on a node that could not be found. Consider verifying node availability.",
})?;
let requester = self.context.node_manager.flownode(&peer).await;
let request = FlowRequest {
body: Some(flow_request::Body::Drop(DropRequest {


@@ -20,6 +20,7 @@ use crate::error::Result;
use crate::key::FlowId;
use crate::peer::Peer;
use crate::sequence::SequenceRef;
use crate::ClusterId;
/// The reference of [FlowMetadataAllocator].
pub type FlowMetadataAllocatorRef = Arc<FlowMetadataAllocator>;
@@ -42,6 +43,16 @@ impl FlowMetadataAllocator {
}
}
pub fn with_peer_allocator(
flow_id_sequence: SequenceRef,
peer_allocator: Arc<dyn PartitionPeerAllocator>,
) -> Self {
Self {
flow_id_sequence,
partition_peer_allocator: peer_allocator,
}
}
/// Allocates a [FlowId].
pub(crate) async fn allocate_flow_id(&self) -> Result<FlowId> {
let flow_id = self.flow_id_sequence.next().await? as FlowId;
@@ -49,9 +60,16 @@ impl FlowMetadataAllocator {
}
/// Allocates the [FlowId] and [Peer]s.
pub async fn create(&self, partitions: usize) -> Result<(FlowId, Vec<Peer>)> {
pub async fn create(
&self,
cluster_id: ClusterId,
partitions: usize,
) -> Result<(FlowId, Vec<Peer>)> {
let flow_id = self.allocate_flow_id().await?;
let peers = self.partition_peer_allocator.alloc(partitions).await?;
let peers = self
.partition_peer_allocator
.alloc(cluster_id, partitions)
.await?;
Ok((flow_id, peers))
}
@@ -61,7 +79,7 @@ impl FlowMetadataAllocator {
#[async_trait]
pub trait PartitionPeerAllocator: Send + Sync {
/// Allocates [Peer] nodes for storing partitions.
async fn alloc(&self, partitions: usize) -> Result<Vec<Peer>>;
async fn alloc(&self, cluster_id: ClusterId, partitions: usize) -> Result<Vec<Peer>>;
}
/// [PartitionPeerAllocatorRef] allocates [Peer]s for partitions.
@@ -71,7 +89,7 @@ struct NoopPartitionPeerAllocator;
#[async_trait]
impl PartitionPeerAllocator for NoopPartitionPeerAllocator {
async fn alloc(&self, partitions: usize) -> Result<Vec<Peer>> {
async fn alloc(&self, _cluster_id: ClusterId, partitions: usize) -> Result<Vec<Peer>> {
Ok(vec![Peer::default(); partitions])
}
}
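
With the trait now taking a `cluster_id`, a deployment can plug in its own placement policy through `with_peer_allocator`. Below is a minimal sketch of a custom allocator, assuming the module paths shown in this diff; the fixed pool and the helper function are illustrative, not part of this change:

use std::sync::Arc;

use async_trait::async_trait;
use common_meta::ddl::flow_meta::{FlowMetadataAllocator, PartitionPeerAllocator};
use common_meta::error::Result;
use common_meta::peer::Peer;
use common_meta::sequence::SequenceRef;
use common_meta::ClusterId;

/// Illustrative policy: hand out peers from a fixed pool, cycling when
/// `partitions` exceeds the pool size (an empty pool yields no peers).
struct FixedPoolAllocator {
    pool: Vec<Peer>,
}

#[async_trait]
impl PartitionPeerAllocator for FixedPoolAllocator {
    async fn alloc(&self, _cluster_id: ClusterId, partitions: usize) -> Result<Vec<Peer>> {
        Ok(self.pool.iter().cycle().take(partitions).cloned().collect())
    }
}

fn flow_allocator(seq: SequenceRef, pool: Vec<Peer>) -> Arc<FlowMetadataAllocator> {
    Arc::new(FlowMetadataAllocator::with_peer_allocator(
        seq,
        Arc::new(FixedPoolAllocator { pool }),
    ))
}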


@@ -810,7 +810,7 @@ mod tests {
use crate::key::TableMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::node_manager::{DatanodeRef, FlownodeRef, NodeManager};
use crate::peer::Peer;
use crate::peer::{Peer, StandalonePeerLookupService};
use crate::region_keeper::MemoryRegionKeeper;
use crate::sequence::SequenceBuilder;
use crate::state_store::KvStateStore;
@@ -855,6 +855,7 @@ mod tests {
flow_metadata_manager,
flow_metadata_allocator,
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
},
procedure_manager.clone(),
true,


@@ -13,10 +13,14 @@
// limitations under the License.
use std::fmt::{Display, Formatter};
use std::sync::Arc;
use api::v1::meta::Peer as PbPeer;
use serde::{Deserialize, Serialize};
use crate::error::Error;
use crate::{ClusterId, DatanodeId, FlownodeId};
#[derive(Debug, Default, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)]
pub struct Peer {
/// Node identifier. Unique in a cluster.
@@ -64,3 +68,50 @@ impl Display for Peer {
write!(f, "peer-{}({})", self.id, self.addr)
}
}
/// Looks up a [`Peer`] given a node id.
#[async_trait::async_trait]
pub trait PeerLookupService {
async fn datanode(&self, cluster_id: ClusterId, id: DatanodeId) -> Result<Option<Peer>, Error>;
async fn flownode(&self, cluster_id: ClusterId, id: FlownodeId) -> Result<Option<Peer>, Error>;
}
pub type PeerLookupServiceRef = Arc<dyn PeerLookupService + Send + Sync>;
/// Always returns `Peer::new(0, "")` for any query.
pub struct StandalonePeerLookupService {
default_peer: Peer,
}
impl StandalonePeerLookupService {
pub fn new() -> Self {
Self {
default_peer: Peer::new(0, ""),
}
}
}
impl Default for StandalonePeerLookupService {
fn default() -> Self {
Self::new()
}
}
#[async_trait::async_trait]
impl PeerLookupService for StandalonePeerLookupService {
async fn datanode(
&self,
_cluster_id: ClusterId,
_id: DatanodeId,
) -> Result<Option<Peer>, Error> {
Ok(Some(self.default_peer.clone()))
}
async fn flownode(
&self,
_cluster_id: ClusterId,
_id: FlownodeId,
) -> Result<Option<Peer>, Error> {
Ok(Some(self.default_peer.clone()))
}
}
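
Downstream code consumes the trait through `PeerLookupServiceRef`. A minimal usage sketch against the standalone implementation (the `main` wrapper is illustrative):

use std::sync::Arc;

use common_meta::peer::{Peer, PeerLookupServiceRef, StandalonePeerLookupService};

#[tokio::main]
async fn main() -> Result<(), common_meta::error::Error> {
    let lookup: PeerLookupServiceRef = Arc::new(StandalonePeerLookupService::new());
    // Standalone mode maps every node id to the same local default peer.
    let peer = lookup.flownode(0, 42).await?;
    assert_eq!(peer, Some(Peer::new(0, "")));
    Ok(())
}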


@@ -33,7 +33,7 @@ use crate::kv_backend::KvBackendRef;
use crate::node_manager::{
Datanode, DatanodeRef, Flownode, FlownodeRef, NodeManager, NodeManagerRef,
};
use crate::peer::Peer;
use crate::peer::{Peer, StandalonePeerLookupService};
use crate::region_keeper::MemoryRegionKeeper;
use crate::sequence::SequenceBuilder;
use crate::wal_options_allocator::WalOptionsAllocator;
@@ -180,5 +180,6 @@ pub fn new_ddl_context_with_kv_backend(
table_metadata_manager,
flow_metadata_allocator,
flow_metadata_manager,
peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
}
}


@@ -38,6 +38,7 @@ greptime-proto.workspace = true
# otherwise it is the same with upstream repo
hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" }
itertools.workspace = true
meta-client.workspace = true
minstant = "0.1.7"
nom = "7.1.3"
num-traits = "0.2"


@@ -34,6 +34,7 @@ use greptime_proto::v1;
use itertools::Itertools;
use query::{QueryEngine, QueryEngineFactory};
use serde::{Deserialize, Serialize};
use servers::grpc::GrpcOptions;
use session::context::QueryContext;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{ConcreteDataType, RegionId};
@@ -78,8 +79,8 @@ pub type TableName = [String; 3];
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct FlownodeOptions {
/// rpc address
pub rpc_addr: String,
pub node_id: Option<u64>,
pub grpc: GrpcOptions,
}
/// Flownode Builder
@@ -497,6 +498,7 @@ impl FlownodeManager {
/// Runs in the common_runtime background runtime.
pub fn run_background(self: Arc<Self>) -> JoinHandle<()> {
info!("Starting flownode manager's background task");
// TODO(discord9): add heartbeat tasks here
common_runtime::spawn_bg(async move {
self.run().await;
})

src/flow/src/heartbeat.rs (new file, 198 lines)

@@ -0,0 +1,198 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Sends heartbeats from the flownode to metasrv.
use std::sync::Arc;
use api::v1::meta::{HeartbeatRequest, Peer};
use common_error::ext::BoxedError;
use common_meta::heartbeat::handler::{
HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_telemetry::{debug, error, info};
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
use servers::heartbeat_options::HeartbeatOptions;
use snafu::ResultExt;
use tokio::sync::mpsc;
use tokio::time::{Duration, Instant};
use crate::adapter::error::ExternalSnafu;
use crate::{Error, FlownodeOptions};
/// The flownode heartbeat task, which periodically sends [`HeartbeatRequest`]s to Metasrv in the background.
#[derive(Clone)]
pub struct HeartbeatTask {
node_id: u64,
server_addr: String,
meta_client: Arc<MetaClient>,
report_interval: Duration,
retry_interval: Duration,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
}
impl HeartbeatTask {
pub fn new(
opts: &FlownodeOptions,
meta_client: Arc<MetaClient>,
heartbeat_opts: HeartbeatOptions,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
) -> Self {
Self {
node_id: opts.node_id.unwrap_or(0),
server_addr: opts.grpc.addr.clone(),
meta_client,
report_interval: heartbeat_opts.interval,
retry_interval: heartbeat_opts.retry_interval,
resp_handler_executor,
}
}
pub async fn start(&self) -> Result<(), Error> {
info!("Start to establish the heartbeat connection to metasrv.");
let (req_sender, resp_stream) = self
.meta_client
.heartbeat()
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
info!("Flownode's heartbeat connection has been established with metasrv");
let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
self.start_handle_resp_stream(resp_stream, mailbox);
self.start_heartbeat_report(req_sender, outgoing_rx);
Ok(())
}
fn create_heartbeat_request(
message: OutgoingMessage,
self_peer: &Option<Peer>,
) -> Option<HeartbeatRequest> {
match outgoing_message_to_mailbox_message(message) {
Ok(message) => {
let req = HeartbeatRequest {
mailbox_message: Some(message),
peer: self_peer.clone(),
..Default::default()
};
Some(req)
}
Err(e) => {
error!(e; "Failed to encode mailbox messages");
None
}
}
}
fn start_heartbeat_report(
&self,
req_sender: HeartbeatSender,
mut outgoing_rx: mpsc::Receiver<OutgoingMessage>,
) {
let report_interval = self.report_interval;
let self_peer = Some(Peer {
id: self.node_id,
addr: self.server_addr.clone(),
});
common_runtime::spawn_hb(async move {
// Note that `interval` fires immediately, so the first heartbeat is sent right away.
let mut interval = tokio::time::interval(report_interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
let req = tokio::select! {
message = outgoing_rx.recv() => {
if let Some(message) = message {
Self::create_heartbeat_request(message, &self_peer)
} else {
// Receiving `None` means the sender was dropped; break out of the loop.
break
}
}
_ = interval.tick() => {
let req = HeartbeatRequest {
peer: self_peer.clone(),
..Default::default()
};
Some(req)
}
};
if let Some(req) = req {
if let Err(e) = req_sender.send(req.clone()).await {
error!(e; "Failed to send heartbeat to metasrv");
break;
} else {
debug!("Send a heartbeat request to metasrv, content: {:?}", req);
}
}
}
});
}
fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream, mailbox: MailboxRef) {
let capture_self = self.clone();
let retry_interval = self.retry_interval;
let _handle = common_runtime::spawn_hb(async move {
loop {
match resp_stream.message().await {
Ok(Some(resp)) => {
debug!("Receiving heartbeat response: {:?}", resp);
let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
if let Err(e) = capture_self.handle_response(ctx).await {
error!(e; "Error while handling heartbeat response");
}
}
Ok(None) => break,
Err(e) => {
error!(e; "Occur error while reading heartbeat response");
capture_self.start_with_retry(retry_interval).await;
break;
}
}
}
});
}
async fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<(), Error> {
self.resp_handler_executor
.handle(ctx)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)
}
async fn start_with_retry(&self, retry_interval: Duration) {
loop {
tokio::time::sleep(retry_interval).await;
info!("Try to re-establish the heartbeat connection to metasrv.");
if self.start().await.is_ok() {
break;
}
}
}
}
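
The heart of `start_heartbeat_report` is the select loop multiplexing mailbox replies with the periodic tick. A stripped-down, self-contained sketch of the same pattern, with `String` standing in for `HeartbeatRequest` and a print in place of the gRPC sender:

use tokio::sync::mpsc;
use tokio::time::{interval, Duration, MissedTickBehavior};

async fn report_loop(mut outgoing_rx: mpsc::Receiver<String>) {
    let mut ticker = interval(Duration::from_secs(3));
    // On a missed tick (e.g. a slow send), wait a full period instead of
    // bursting to catch up.
    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
    loop {
        let req = tokio::select! {
            message = outgoing_rx.recv() => match message {
                Some(m) => format!("mailbox: {m}"),
                // Sender dropped: the node is shutting down.
                None => break,
            },
            _ = ticker.tick() => "periodic heartbeat".to_string(),
        };
        // The real task sends `req` over the bidirectional heartbeat stream.
        println!("send {req}");
    }
}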


@@ -20,16 +20,17 @@
#![feature(duration_abs_diff)]
#![allow(dead_code)]
#![allow(unused_imports)]
#![warn(missing_docs)]
#![warn(clippy::missing_docs_in_private_items)]
#![warn(clippy::too_many_lines)]
// Allow unused for now; it should be used later.
mod adapter;
mod compute;
mod expr;
mod heartbeat;
mod plan;
mod repr;
mod transform;
mod utils;
pub use adapter::error::{Error, Result};
pub use adapter::{FlownodeBuilder, FlownodeManager, FlownodeManagerRef, FlownodeOptions};


@@ -0,0 +1,55 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_error::ext::BoxedError;
use common_meta::ddl::flow_meta::PartitionPeerAllocator;
use common_meta::peer::Peer;
use common_meta::ClusterId;
use snafu::ResultExt;
use crate::metasrv::{SelectorContext, SelectorRef};
use crate::selector::SelectorOptions;
pub struct FlowPeerAllocator {
ctx: SelectorContext,
selector: SelectorRef,
}
impl FlowPeerAllocator {
pub fn new(ctx: SelectorContext, selector: SelectorRef) -> Self {
Self { ctx, selector }
}
}
#[async_trait::async_trait]
impl PartitionPeerAllocator for FlowPeerAllocator {
async fn alloc(
&self,
cluster_id: ClusterId,
partitions: usize,
) -> common_meta::error::Result<Vec<Peer>> {
self.selector
.select(
cluster_id,
&self.ctx,
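// Allow the same flownode to serve multiple partitions when there
// are fewer alive flownodes than requested partitions.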
SelectorOptions {
min_required_items: partitions,
allow_duplication: true,
},
)
.await
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)
}
}


@@ -15,10 +15,12 @@
use std::collections::HashMap;
use std::hash::Hash;
use common_error::ext::BoxedError;
use common_meta::kv_backend::KvBackend;
use common_meta::peer::Peer;
use common_meta::{util, ClusterId};
use common_meta::peer::{Peer, PeerLookupService};
use common_meta::{util, ClusterId, DatanodeId, FlownodeId};
use common_time::util as time_util;
use snafu::ResultExt;
use crate::cluster::MetaPeerClientRef;
use crate::error::{Error, Result};
@@ -31,9 +33,10 @@ fn build_lease_filter(lease_secs: u64) -> impl Fn(&LeaseValue) -> bool {
}
}
pub async fn lookup_alive_datanode_peer(
/// Looks up a [`Peer`] given a [`ClusterId`] and [`DatanodeId`]; returns `Some` only if the node is alive within the given `lease_secs`.
pub async fn lookup_datanode_peer(
cluster_id: ClusterId,
datanode_id: u64,
datanode_id: DatanodeId,
meta_peer_client: &MetaPeerClientRef,
lease_secs: u64,
) -> Result<Option<Peer>> {
@@ -47,7 +50,8 @@ pub async fn lookup_alive_datanode_peer(
return Ok(None);
};
let lease_value: LeaseValue = kv.value.try_into()?;
if lease_filter(&lease_value) {
let is_alive = lease_filter(&lease_value);
if is_alive {
Ok(Some(Peer {
id: lease_key.node_id,
addr: lease_value.node_addr,
@@ -57,6 +61,7 @@ pub async fn lookup_alive_datanode_peer(
}
}
/// Find all alive datanodes
pub async fn alive_datanodes(
cluster_id: ClusterId,
meta_peer_client: &MetaPeerClientRef,
@@ -71,6 +76,36 @@ pub async fn alive_datanodes(
.await
}
/// Looks up a [`Peer`] given a [`ClusterId`] and [`FlownodeId`]; returns `Some` only if the node is alive within the given `lease_secs`.
pub async fn lookup_flownode_peer(
cluster_id: ClusterId,
flownode_id: FlownodeId,
meta_peer_client: &MetaPeerClientRef,
lease_secs: u64,
) -> Result<Option<Peer>> {
let lease_filter = build_lease_filter(lease_secs);
let lease_key = FlownodeLeaseKey {
cluster_id,
node_id: flownode_id,
};
let lease_key_bytes: Vec<u8> = lease_key.clone().try_into()?;
let Some(kv) = meta_peer_client.get(&lease_key_bytes).await? else {
return Ok(None);
};
let lease_value: LeaseValue = kv.value.try_into()?;
let is_alive = lease_filter(&lease_value);
if is_alive {
Ok(Some(Peer {
id: lease_key.node_id,
addr: lease_value.node_addr,
}))
} else {
Ok(None)
}
}
/// Find all alive flownodes
pub async fn alive_flownodes(
cluster_id: ClusterId,
meta_peer_client: &MetaPeerClientRef,
@@ -114,3 +149,38 @@ where
Ok(lease_kvs)
}
#[derive(Clone)]
pub struct MetaPeerLookupService {
pub meta_peer_client: MetaPeerClientRef,
}
impl MetaPeerLookupService {
pub fn new(meta_peer_client: MetaPeerClientRef) -> Self {
Self { meta_peer_client }
}
}
#[async_trait::async_trait]
impl PeerLookupService for MetaPeerLookupService {
async fn datanode(
&self,
cluster_id: ClusterId,
id: DatanodeId,
) -> common_meta::error::Result<Option<Peer>> {
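// `u64::MAX` as `lease_secs` effectively skips the aliveness check:
// the peer is returned even if its lease has lapsed.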
lookup_datanode_peer(cluster_id, id, &self.meta_peer_client, u64::MAX)
.await
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)
}
async fn flownode(
&self,
cluster_id: ClusterId,
id: FlownodeId,
) -> common_meta::error::Result<Option<Peer>> {
lookup_flownode_peer(cluster_id, id, &self.meta_peer_client, u64::MAX)
.await
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)
}
}
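
`build_lease_filter` appears in the hunk above only as closing braces; here is a plausible reconstruction of what such a filter checks, assuming `LeaseValue` exposes a `timestamp_millis` of the last renewal (both the field name and the arithmetic are assumptions, not shown in this diff):

use common_time::util as time_util;

// Hypothetical sketch: a node counts as alive if its lease was renewed
// within the last `lease_secs` seconds.
fn build_lease_filter(lease_secs: u64) -> impl Fn(&LeaseValue) -> bool {
    move |v: &LeaseValue| {
        let elapsed_ms = time_util::current_time_millis().saturating_sub(v.timestamp_millis);
        // Saturate so huge `lease_secs` values don't overflow.
        elapsed_ms.max(0) as u64 <= lease_secs.saturating_mul(1000)
    }
}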


@@ -24,6 +24,7 @@ pub mod cluster;
pub mod election;
pub mod error;
mod failure_detector;
pub mod flow_meta_alloc;
pub mod handler;
pub mod key;
pub mod lease;


@@ -50,7 +50,7 @@ use crate::error::{
};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::handler::HeartbeatHandlerGroup;
use crate::lease::lookup_alive_datanode_peer;
use crate::lease::lookup_datanode_peer;
use crate::lock::DistLockRef;
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
@@ -484,7 +484,7 @@ impl Metasrv {
cluster_id: ClusterId,
peer_id: u64,
) -> Result<Option<Peer>> {
lookup_alive_datanode_peer(
lookup_datanode_peer(
cluster_id,
peer_id,
&self.meta_peer_client,


@@ -42,6 +42,7 @@ use super::{SelectTarget, FLOW_ID_SEQ};
use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::error::{self, Result};
use crate::flow_meta_alloc::FlowPeerAllocator;
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::handler::check_leader_handler::CheckLeaderHandler;
use crate::handler::collect_cluster_info_handler::{
@@ -58,6 +59,7 @@ use crate::handler::publish_heartbeat_handler::PublishHeartbeatHandler;
use crate::handler::region_lease_handler::RegionLeaseHandler;
use crate::handler::response_header_handler::ResponseHeaderHandler;
use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pushers};
use crate::lease::MetaPeerLookupService;
use crate::lock::memory::MemLock;
use crate::lock::DistLockRef;
use crate::metasrv::{
@@ -240,14 +242,26 @@ impl MetasrvBuilder {
peer_allocator,
))
});
// TODO(weny): use the real allocator.
let flow_metadata_allocator =
Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(Arc::new(
let flow_metadata_allocator = {
// For now, flownodes just use a round-robin selector.
let flow_selector = RoundRobinSelector::new(SelectTarget::Flownode);
let flow_selector_ctx = selector_ctx.clone();
let peer_allocator = Arc::new(FlowPeerAllocator::new(
flow_selector_ctx,
Arc::new(flow_selector),
));
let seq = Arc::new(
SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
.initial(MIN_USER_FLOW_ID as u64)
.step(10)
.build(),
)));
);
Arc::new(FlowMetadataAllocator::with_peer_allocator(
seq,
peer_allocator,
))
};
let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
let node_manager = node_manager.unwrap_or_else(|| {
let datanode_client_channel_config = ChannelConfig::new()
@@ -276,6 +290,9 @@ impl MetasrvBuilder {
table_metadata_allocator: table_metadata_allocator.clone(),
flow_metadata_manager: flow_metadata_manager.clone(),
flow_metadata_allocator: flow_metadata_allocator.clone(),
peer_lookup_service: Arc::new(MetaPeerLookupService::new(
meta_peer_client.clone(),
)),
},
procedure_manager.clone(),
true,


@@ -119,7 +119,7 @@ pub mod test_data {
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::node_manager::NodeManagerRef;
use common_meta::peer::Peer;
use common_meta::peer::{Peer, StandalonePeerLookupService};
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::rpc::router::RegionRoute;
use common_meta::sequence::SequenceBuilder;
@@ -225,6 +225,7 @@ pub mod test_data {
flow_metadata_manager,
flow_metadata_allocator,
memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
}
}
}


@@ -28,6 +28,7 @@ use common_meta::ddl_manager::DdlManager;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::StandalonePeerLookupService;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::WalOptionsAllocator;
@@ -197,6 +198,7 @@ impl GreptimeDbStandaloneBuilder {
table_metadata_allocator,
flow_metadata_manager,
flow_metadata_allocator,
peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
},
procedure_manager.clone(),
register_procedure_loaders,