From 88eb69530a2e97bcb53fe8f886b82d877e345bf1 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 31 Oct 2023 13:54:56 +0900 Subject: [PATCH] refactor: remove router rpc (#2646) --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/common/meta/src/rpc/router.rs | 335 +----------------- src/meta-client/src/client.rs | 66 +--- src/meta-client/src/client/router.rs | 173 --------- src/meta-srv/src/bootstrap.rs | 2 - .../src/handler/region_lease_handler.rs | 8 +- src/meta-srv/src/lib.rs | 1 - src/meta-srv/src/metrics.rs | 1 - src/meta-srv/src/mocks.rs | 2 - src/meta-srv/src/procedure/region_failover.rs | 8 +- src/meta-srv/src/service.rs | 1 - src/meta-srv/src/service/router.rs | 100 ------ src/meta-srv/src/table_routes.rs | 141 -------- src/meta-srv/src/test_util.rs | 65 +++- src/partition/src/lib.rs | 1 - src/partition/src/manager.rs | 3 + src/partition/src/metrics.rs | 1 - src/partition/src/route.rs | 90 ----- 19 files changed, 76 insertions(+), 926 deletions(-) delete mode 100644 src/meta-client/src/client/router.rs delete mode 100644 src/meta-srv/src/service/router.rs delete mode 100644 src/meta-srv/src/table_routes.rs delete mode 100644 src/partition/src/route.rs diff --git a/Cargo.lock b/Cargo.lock index 0be1a8021a..9a77225fb5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3635,7 +3635,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=1d7dc6f18b310355a4c16fb453d3ca7ed09f048d#1d7dc6f18b310355a4c16fb453d3ca7ed09f048d" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=20cdc57c3f320345b122eea43bc549a19d342e51#20cdc57c3f320345b122eea43bc549a19d342e51" dependencies = [ "prost 0.12.1", "serde", diff --git a/Cargo.toml b/Cargo.toml index a018cdf484..2ec02a2783 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,7 +79,7 @@ derive_builder = "0.12" etcd-client = "0.11" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1d7dc6f18b310355a4c16fb453d3ca7ed09f048d" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "20cdc57c3f320345b122eea43bc549a19d342e51" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs index 63d4880c23..449628aa44 100644 --- a/src/common/meta/src/rpc/router.rs +++ b/src/common/meta/src/rpc/router.rs @@ -15,65 +15,19 @@ use std::collections::{BTreeMap, HashMap, HashSet}; use api::v1::meta::{ - Partition as PbPartition, Peer as PbPeer, Region as PbRegion, RegionRoute as PbRegionRoute, - RouteRequest as PbRouteRequest, RouteResponse as PbRouteResponse, Table as PbTable, - TableId as PbTableId, TableRoute as PbTableRoute, TableRouteValue as PbTableRouteValue, + Partition as PbPartition, Peer as PbPeer, Region as PbRegion, Table as PbTable, + TableRoute as PbTableRoute, }; use serde::ser::SerializeSeq; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use snafu::OptionExt; use store_api::storage::{RegionId, RegionNumber}; -use table::metadata::TableId; use crate::error::{self, Result}; use crate::key::RegionDistribution; use crate::peer::Peer; -use crate::rpc::util; use crate::table_name::TableName; -#[derive(Debug, Clone, Default)] -pub struct RouteRequest { - pub table_ids: Vec, -} - -impl From for PbRouteRequest { - fn from(mut req: RouteRequest) -> Self { - Self { - header: None, - table_ids: req.table_ids.drain(..).map(|id| PbTableId { id }).collect(), - } - } -} - -impl RouteRequest { - #[inline] - pub fn new(table_id: TableId) -> Self { - Self { - table_ids: vec![table_id], - } - } -} - -#[derive(Debug, Clone)] -pub struct RouteResponse { - pub table_routes: Vec, -} - -impl TryFrom for RouteResponse { - type Error = error::Error; - - fn try_from(pb: PbRouteResponse) -> Result { - util::check_response_header(pb.header.as_ref())?; - - let table_routes = pb - .table_routes - .into_iter() - .map(|x| TableRoute::try_from_raw(&pb.peers, x)) - .collect::>>()?; - Ok(Self { table_routes }) - } -} - pub fn region_distribution(region_routes: &[RegionRoute]) -> Result { let mut regions_id_map = RegionDistribution::new(); for route in region_routes.iter() { @@ -199,107 +153,6 @@ impl TableRoute { Ok(Self::new(table, region_routes)) } - - pub fn try_into_raw(self) -> Result<(Vec, PbTableRoute)> { - let mut peers = HashSet::new(); - self.region_routes - .iter() - .filter_map(|x| x.leader_peer.as_ref()) - .for_each(|p| { - let _ = peers.insert(p.clone()); - }); - self.region_routes - .iter() - .flat_map(|x| x.follower_peers.iter()) - .for_each(|p| { - let _ = peers.insert(p.clone()); - }); - let mut peers = peers.into_iter().map(Into::into).collect::>(); - peers.sort_by_key(|x| x.id); - - let find_peer = |peer_id: u64| -> u64 { - peers - .iter() - .enumerate() - .find_map(|(i, x)| { - if x.id == peer_id { - Some(i as u64) - } else { - None - } - }) - .unwrap_or_else(|| { - panic!("Peer {peer_id} must be present when collecting all peers.") - }) - }; - - let mut region_routes = Vec::with_capacity(self.region_routes.len()); - for region_route in self.region_routes.into_iter() { - let leader_peer_index = region_route.leader_peer.map(|x| find_peer(x.id)).context( - error::RouteInfoCorruptedSnafu { - err_msg: "'leader_peer' is empty in region route", - }, - )?; - - let follower_peer_indexes = region_route - .follower_peers - .iter() - .map(|x| find_peer(x.id)) - .collect::>(); - - region_routes.push(PbRegionRoute { - region: Some(region_route.region.into()), - leader_peer_index, - follower_peer_indexes, - }); - } - - let table_route = PbTableRoute { - table: Some(self.table.into()), - region_routes, - }; - Ok((peers, table_route)) - } - - pub fn find_leaders(&self) -> HashSet { - find_leaders(&self.region_routes) - } - - pub fn find_leader_regions(&self, datanode: &Peer) -> Vec { - find_leader_regions(&self.region_routes, datanode) - } - - pub fn find_region_leader(&self, region_number: RegionNumber) -> Option<&Peer> { - self.region_leaders - .get(®ion_number) - .and_then(|x| x.as_ref()) - } -} - -impl TryFrom for TableRoute { - type Error = error::Error; - - fn try_from(pb: PbTableRouteValue) -> Result { - TableRoute::try_from_raw( - &pb.peers, - pb.table_route.context(error::InvalidProtoMsgSnafu { - err_msg: "expected table_route", - })?, - ) - } -} - -impl TryFrom for PbTableRouteValue { - type Error = error::Error; - - fn try_from(table_route: TableRoute) -> Result { - let (peers, table_route) = table_route.try_into_raw()?; - - Ok(PbTableRouteValue { - peers, - table_route: Some(table_route), - }) - } } #[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] @@ -456,12 +309,6 @@ impl From for Partition { #[cfg(test)] mod tests { - use api::v1::meta::{ - Partition as PbPartition, Peer as PbPeer, Region as PbRegion, RegionRoute as PbRegionRoute, - RouteRequest as PbRouteRequest, RouteResponse as PbRouteResponse, Table as PbTable, - TableName as PbTableName, TableRoute as PbTableRoute, - }; - use super::*; #[test] @@ -476,182 +323,4 @@ mod tests { assert_eq!(got, p); } - - #[test] - fn test_route_request_trans() { - let req = RouteRequest { - table_ids: vec![1, 2], - }; - - let into_req: PbRouteRequest = req.into(); - - assert!(into_req.header.is_none()); - assert_eq!(1, into_req.table_ids.get(0).unwrap().id); - assert_eq!(2, into_req.table_ids.get(1).unwrap().id); - } - - #[test] - fn test_route_response_trans() { - let res = PbRouteResponse { - header: None, - peers: vec![ - PbPeer { - id: 1, - addr: "peer1".to_string(), - }, - PbPeer { - id: 2, - addr: "peer2".to_string(), - }, - ], - table_routes: vec![PbTableRoute { - table: Some(PbTable { - id: 1, - table_name: Some(PbTableName { - catalog_name: "c1".to_string(), - schema_name: "s1".to_string(), - table_name: "t1".to_string(), - }), - table_schema: b"schema".to_vec(), - }), - region_routes: vec![PbRegionRoute { - region: Some(PbRegion { - id: 1, - name: "region1".to_string(), - partition: Some(PbPartition { - column_list: vec![b"c1".to_vec(), b"c2".to_vec()], - value_list: vec![b"v1".to_vec(), b"v2".to_vec()], - }), - attrs: Default::default(), - }), - leader_peer_index: 0, - follower_peer_indexes: vec![1], - }], - }], - }; - - let res: RouteResponse = res.try_into().unwrap(); - let mut table_routes = res.table_routes; - assert_eq!(1, table_routes.len()); - let table_route = table_routes.remove(0); - let table = table_route.table; - assert_eq!(1, table.id); - assert_eq!("c1", table.table_name.catalog_name); - assert_eq!("s1", table.table_name.schema_name); - assert_eq!("t1", table.table_name.table_name); - - let mut region_routes = table_route.region_routes; - assert_eq!(1, region_routes.len()); - let region_route = region_routes.remove(0); - let region = region_route.region; - assert_eq!(1, region.id); - assert_eq!("region1", region.name); - let partition = region.partition.unwrap(); - assert_eq!(vec![b"c1".to_vec(), b"c2".to_vec()], partition.column_list); - assert_eq!(vec![b"v1".to_vec(), b"v2".to_vec()], partition.value_list); - - assert_eq!(1, region_route.leader_peer.as_ref().unwrap().id); - assert_eq!("peer1", region_route.leader_peer.as_ref().unwrap().addr); - - assert_eq!(1, region_route.follower_peers.len()); - assert_eq!(2, region_route.follower_peers.get(0).unwrap().id); - assert_eq!("peer2", region_route.follower_peers.get(0).unwrap().addr); - } - - #[test] - fn test_table_route_raw_conversion() { - let raw_peers = vec![ - PbPeer { - id: 1, - addr: "a1".to_string(), - }, - PbPeer { - id: 2, - addr: "a2".to_string(), - }, - PbPeer { - id: 3, - addr: "a3".to_string(), - }, - ]; - - // region distribution: - // region id => leader peer id + [follower peer id] - // 1 => 2 + [1, 3] - // 2 => 1 + [2, 3] - - let raw_table_route = PbTableRoute { - table: Some(PbTable { - id: 1, - table_name: Some(PbTableName { - catalog_name: "c1".to_string(), - schema_name: "s1".to_string(), - table_name: "t1".to_string(), - }), - table_schema: vec![], - }), - region_routes: vec![ - PbRegionRoute { - region: Some(PbRegion { - id: 1, - name: "r1".to_string(), - partition: None, - attrs: HashMap::new(), - }), - leader_peer_index: 1, - follower_peer_indexes: vec![0, 2], - }, - PbRegionRoute { - region: Some(PbRegion { - id: 2, - name: "r2".to_string(), - partition: None, - attrs: HashMap::new(), - }), - leader_peer_index: 0, - follower_peer_indexes: vec![1, 2], - }, - ], - }; - let table_route = TableRoute { - table: Table { - id: 1, - table_name: TableName::new("c1", "s1", "t1"), - table_schema: vec![], - }, - region_routes: vec![ - RegionRoute { - region: Region { - id: 1.into(), - name: "r1".to_string(), - partition: None, - attrs: BTreeMap::new(), - }, - leader_peer: Some(Peer::new(2, "a2")), - follower_peers: vec![Peer::new(1, "a1"), Peer::new(3, "a3")], - }, - RegionRoute { - region: Region { - id: 2.into(), - name: "r2".to_string(), - partition: None, - attrs: BTreeMap::new(), - }, - leader_peer: Some(Peer::new(1, "a1")), - follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")], - }, - ], - region_leaders: HashMap::from([ - (2, Some(Peer::new(1, "a1"))), - (1, Some(Peer::new(2, "a2"))), - ]), - }; - - let from_raw = TableRoute::try_from_raw(&raw_peers, raw_table_route.clone()).unwrap(); - assert_eq!(from_raw, table_route); - - let into_raw = table_route.try_into_raw().unwrap(); - assert_eq!(into_raw.0, raw_peers); - assert_eq!(into_raw.1, raw_table_route); - } } diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index d39f6951f0..641eace2fb 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -17,7 +17,7 @@ mod ddl; mod heartbeat; mod load_balance; mod lock; -mod router; + mod store; use api::v1::meta::Role; @@ -27,7 +27,6 @@ use common_meta::ddl::{DdlTaskExecutor, ExecutorContext}; use common_meta::error::{self as meta_error, Result as MetaResult}; use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use common_meta::rpc::lock::{LockRequest, LockResponse, UnlockRequest}; -use common_meta::rpc::router::{RouteRequest, RouteResponse}; use common_meta::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, @@ -37,7 +36,6 @@ use common_telemetry::info; use ddl::Client as DdlClient; use heartbeat::Client as HeartbeatClient; use lock::Client as LockClient; -use router::Client as RouterClient; use snafu::{OptionExt, ResultExt}; use store::Client as StoreClient; @@ -151,9 +149,6 @@ impl MetaClientBuilder { DEFAULT_ASK_LEADER_MAX_RETRY, )); } - if self.enable_router { - client.router = Some(RouterClient::new(self.id, self.role, mgr.clone())); - } if self.enable_store { client.store = Some(StoreClient::new(self.id, self.role, mgr.clone())); } @@ -179,7 +174,6 @@ pub struct MetaClient { id: Id, channel_manager: ChannelManager, heartbeat: Option, - router: Option, store: Option, lock: Option, ddl: Option, @@ -226,10 +220,6 @@ impl MetaClient { client.start(urls.clone()).await?; info!("Heartbeat client started"); } - if let Some(client) = &mut self.router { - client.start(urls.clone()).await?; - info!("Router client started"); - } if let Some(client) = &mut self.store { client.start(urls.clone()).await?; info!("Store client started"); @@ -262,33 +252,6 @@ impl MetaClient { self.heartbeat_client()?.heartbeat().await } - /// Fetch routing information for tables. The smallest unit is the complete - /// routing information(all regions) of a table. - /// - /// ```text - /// table_1 - /// table_name - /// table_schema - /// regions - /// region_1 - /// leader_peer - /// follower_peer_1, follower_peer_2 - /// region_2 - /// leader_peer - /// follower_peer_1, follower_peer_2, follower_peer_3 - /// region_xxx - /// table_2 - /// ... - /// ``` - /// - pub async fn route(&self, req: RouteRequest) -> Result { - self.router_client()? - .route(req.into()) - .await? - .try_into() - .context(ConvertMetaResponseSnafu) - } - /// Range gets the keys in the range from the key-value store. pub async fn range(&self, req: RangeRequest) -> Result { self.store_client()? @@ -386,13 +349,6 @@ impl MetaClient { }) } - #[inline] - pub fn router_client(&self) -> Result { - self.router.clone().context(error::NotStartedSnafu { - name: "store_client", - }) - } - #[inline] pub fn store_client(&self) -> Result { self.store.clone().context(error::NotStartedSnafu { @@ -489,7 +445,6 @@ mod tests { .enable_heartbeat() .build(); let _ = meta_client.heartbeat_client().unwrap(); - assert!(meta_client.router_client().is_err()); assert!(meta_client.store_client().is_err()); meta_client.start(urls).await.unwrap(); assert!(meta_client.heartbeat_client().unwrap().is_started().await); @@ -498,16 +453,13 @@ mod tests { .enable_router() .build(); assert!(meta_client.heartbeat_client().is_err()); - let _ = meta_client.router_client().unwrap(); assert!(meta_client.store_client().is_err()); meta_client.start(urls).await.unwrap(); - assert!(meta_client.router_client().unwrap().is_started().await); let mut meta_client = MetaClientBuilder::new(0, 0, Role::Datanode) .enable_store() .build(); assert!(meta_client.heartbeat_client().is_err()); - assert!(meta_client.router_client().is_err()); let _ = meta_client.store_client().unwrap(); meta_client.start(urls).await.unwrap(); assert!(meta_client.store_client().unwrap().is_started().await); @@ -520,11 +472,9 @@ mod tests { assert_eq!(1, meta_client.id().0); assert_eq!(2, meta_client.id().1); let _ = meta_client.heartbeat_client().unwrap(); - let _ = meta_client.router_client().unwrap(); let _ = meta_client.store_client().unwrap(); meta_client.start(urls).await.unwrap(); assert!(meta_client.heartbeat_client().unwrap().is_started().await); - assert!(meta_client.router_client().unwrap().is_started().await); assert!(meta_client.store_client().unwrap().is_started().await); } @@ -540,20 +490,6 @@ mod tests { assert!(matches!(res.err(), Some(error::Error::NotStarted { .. }))); } - #[tokio::test] - async fn test_not_start_router_client() { - let urls = &["127.0.0.1:3001", "127.0.0.1:3002"]; - let mut meta_client = MetaClientBuilder::new(0, 0, Role::Datanode) - .enable_heartbeat() - .enable_store() - .build(); - meta_client.start(urls).await.unwrap(); - - let req = RouteRequest::new(1); - let res = meta_client.route(req).await; - assert!(matches!(res.err(), Some(error::Error::NotStarted { .. }))); - } - #[tokio::test] async fn test_not_start_store_client() { let urls = &["127.0.0.1:3001", "127.0.0.1:3002"]; diff --git a/src/meta-client/src/client/router.rs b/src/meta-client/src/client/router.rs deleted file mode 100644 index 5f4ce1f67b..0000000000 --- a/src/meta-client/src/client/router.rs +++ /dev/null @@ -1,173 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashSet; -use std::sync::Arc; - -use api::v1::meta::router_client::RouterClient; -use api::v1::meta::{Role, RouteRequest, RouteResponse}; -use common_grpc::channel_manager::ChannelManager; -use snafu::{ensure, OptionExt, ResultExt}; -use tokio::sync::RwLock; -use tonic::transport::Channel; - -use crate::client::{load_balance as lb, Id}; -use crate::error; -use crate::error::Result; - -#[derive(Clone, Debug)] -pub struct Client { - inner: Arc>, -} - -impl Client { - pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self { - let inner = Arc::new(RwLock::new(Inner { - id, - role, - channel_manager, - peers: vec![], - })); - - Self { inner } - } - - pub async fn start(&mut self, urls: A) -> Result<()> - where - U: AsRef, - A: AsRef<[U]>, - { - let mut inner = self.inner.write().await; - inner.start(urls).await - } - - pub async fn is_started(&self) -> bool { - let inner = self.inner.read().await; - inner.is_started() - } - - pub async fn route(&self, req: RouteRequest) -> Result { - let inner = self.inner.read().await; - inner.route(req).await - } -} - -#[derive(Debug)] -struct Inner { - id: Id, - role: Role, - channel_manager: ChannelManager, - peers: Vec, -} - -impl Inner { - async fn start(&mut self, urls: A) -> Result<()> - where - U: AsRef, - A: AsRef<[U]>, - { - ensure!( - !self.is_started(), - error::IllegalGrpcClientStateSnafu { - err_msg: "Router client already started", - } - ); - - self.peers = urls - .as_ref() - .iter() - .map(|url| url.as_ref().to_string()) - .collect::>() - .drain() - .collect::>(); - - Ok(()) - } - - async fn route(&self, mut req: RouteRequest) -> Result { - let mut client = self.random_client()?; - req.set_header(self.id, self.role); - let res = client.route(req).await.map_err(error::Error::from)?; - - Ok(res.into_inner()) - } - - fn random_client(&self) -> Result> { - let len = self.peers.len(); - let peer = lb::random_get(len, |i| Some(&self.peers[i])).context( - error::IllegalGrpcClientStateSnafu { - err_msg: "Empty peers, router client may not start yet", - }, - )?; - - self.make_client(peer) - } - - fn make_client(&self, addr: impl AsRef) -> Result> { - let channel = self - .channel_manager - .get(addr) - .context(error::CreateChannelSnafu)?; - - Ok(RouterClient::new(channel)) - } - - #[inline] - fn is_started(&self) -> bool { - !self.peers.is_empty() - } -} - -#[cfg(test)] -mod test { - use super::*; - - #[tokio::test] - async fn test_start_client() { - let mut client = Client::new((0, 0), Role::Frontend, ChannelManager::default()); - assert!(!client.is_started().await); - client - .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) - .await - .unwrap(); - assert!(client.is_started().await); - } - - #[tokio::test] - async fn test_already_start() { - let mut client = Client::new((0, 0), Role::Frontend, ChannelManager::default()); - client - .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) - .await - .unwrap(); - assert!(client.is_started().await); - let res = client.start(&["127.0.0.1:1002"]).await; - assert!(res.is_err()); - assert!(matches!( - res.err(), - Some(error::Error::IllegalGrpcClientState { .. }) - )); - } - - #[tokio::test] - async fn test_start_with_duplicate_peers() { - let mut client = Client::new((0, 0), Role::Frontend, ChannelManager::default()); - client - .start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"]) - .await - .unwrap(); - - assert_eq!(1, client.inner.write().await.peers.len()); - } -} diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 6c71f92375..cb2f15af2d 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -18,7 +18,6 @@ use api::v1::meta::cluster_server::ClusterServer; use api::v1::meta::ddl_task_server::DdlTaskServer; use api::v1::meta::heartbeat_server::HeartbeatServer; use api::v1::meta::lock_server::LockServer; -use api::v1::meta::router_server::RouterServer; use api::v1::meta::store_server::StoreServer; use common_base::Plugins; use etcd_client::Client; @@ -152,7 +151,6 @@ pub fn router(meta_srv: MetaSrv) -> Router { tonic::transport::Server::builder() .accept_http1(true) // for admin services .add_service(HeartbeatServer::new(meta_srv.clone())) - .add_service(RouterServer::new(meta_srv.clone())) .add_service(StoreServer::new(meta_srv.clone())) .add_service(ClusterServer::new(meta_srv.clone())) .add_service(LockServer::new(meta_srv.clone())) diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 2387058f92..5090d1e1b6 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -78,7 +78,7 @@ mod test { use crate::handler::node_stat::{RegionStat, Stat}; use crate::metasrv::builder::MetaSrvBuilder; use crate::service::store::kv::KvBackendAdapter; - use crate::{table_routes, test_util}; + use crate::test_util; #[tokio::test] async fn test_handle_region_lease() { @@ -94,11 +94,7 @@ mod test { let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( kv_store.clone(), ))); - table_routes::tests::prepare_table_region_and_info_value( - &table_metadata_manager, - table_name, - ) - .await; + test_util::prepare_table_region_and_info_value(&table_metadata_manager, table_name).await; let req = HeartbeatRequest { duration_since_epoch: 1234, diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index 9bea19c48f..e27196b478 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -34,7 +34,6 @@ pub mod pubsub; pub mod selector; pub mod service; pub mod table_meta_alloc; -pub mod table_routes; pub use crate::error::Result; diff --git a/src/meta-srv/src/metrics.rs b/src/meta-srv/src/metrics.rs index 483fccfff8..e3b2a8d09a 100644 --- a/src/meta-srv/src/metrics.rs +++ b/src/meta-srv/src/metrics.rs @@ -13,7 +13,6 @@ // limitations under the License. pub(crate) const METRIC_META_KV_REQUEST: &str = "meta.kv_request"; -pub(crate) const METRIC_META_ROUTE_REQUEST: &str = "meta.route_request"; pub(crate) const METRIC_META_HEARTBEAT_CONNECTION_NUM: &str = "meta.heartbeat_connection_num"; pub(crate) const METRIC_META_HANDLER_EXECUTE: &str = "meta.handler_execute"; pub const METRIC_META_INACTIVE_REGIONS: &str = "meta.inactive_regions"; diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs index 0ce0b4230f..acc49ab61d 100644 --- a/src/meta-srv/src/mocks.rs +++ b/src/meta-srv/src/mocks.rs @@ -17,7 +17,6 @@ use std::time::Duration; use api::v1::meta::ddl_task_server::DdlTaskServer; use api::v1::meta::heartbeat_server::HeartbeatServer; -use api::v1::meta::router_server::RouterServer; use api::v1::meta::store_server::StoreServer; use client::client_manager::DatanodeClients; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; @@ -85,7 +84,6 @@ pub async fn mock( let _handle = tokio::spawn(async move { tonic::transport::Server::builder() .add_service(HeartbeatServer::new(service.clone())) - .add_service(RouterServer::new(service.clone())) .add_service(StoreServer::new(service.clone())) .add_service(DdlTaskServer::new(service.clone())) .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index 88f3e04345..350c17d6fb 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -411,7 +411,7 @@ mod tests { use crate::service::mailbox::Channel; use crate::service::store::kv::{KvBackendAdapter, KvStoreRef}; use crate::service::store::memory::MemStore; - use crate::table_routes; + use crate::test_util; struct RandomNodeSelector { nodes: Vec, @@ -497,11 +497,7 @@ mod tests { let table_metadata_manager = Arc::new(TableMetadataManager::new( KvBackendAdapter::wrap(kv_store.clone()), )); - table_routes::tests::prepare_table_region_and_info_value( - &table_metadata_manager, - table, - ) - .await; + test_util::prepare_table_region_and_info_value(&table_metadata_manager, table).await; let region_distribution = table_metadata_manager .table_route_manager() .get_region_distribution(1) diff --git a/src/meta-srv/src/service.rs b/src/meta-srv/src/service.rs index 0e68db8eae..733bef295b 100644 --- a/src/meta-srv/src/service.rs +++ b/src/meta-srv/src/service.rs @@ -23,7 +23,6 @@ pub mod ddl; mod heartbeat; pub mod lock; pub mod mailbox; -pub mod router; pub mod store; pub type GrpcResult = std::result::Result, Status>; diff --git a/src/meta-srv/src/service/router.rs b/src/meta-srv/src/service/router.rs deleted file mode 100644 index ba4d653610..0000000000 --- a/src/meta-srv/src/service/router.rs +++ /dev/null @@ -1,100 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use api::v1::meta::{ - router_server, Peer, PeerDict, ResponseHeader, RouteRequest, RouteResponse, TableRoute, - TableRouteValue, -}; -use common_meta::key::table_info::TableInfoValue; -use common_telemetry::timer; -use snafu::ResultExt; -use tonic::{Request, Response}; - -use crate::error::{Result, TableMetadataManagerSnafu}; -use crate::metasrv::{Context, MetaSrv}; -use crate::metrics::METRIC_META_ROUTE_REQUEST; -use crate::service::GrpcResult; -use crate::table_routes::fetch_tables; - -#[async_trait::async_trait] -impl router_server::Router for MetaSrv { - async fn route(&self, req: Request) -> GrpcResult { - let req = req.into_inner(); - let cluster_id = req.header.as_ref().map_or(0, |h| h.cluster_id); - - let _timer = timer!( - METRIC_META_ROUTE_REQUEST, - &[ - ("op", "route".to_string()), - ("cluster_id", cluster_id.to_string()) - ] - ); - - let ctx = self.new_ctx(); - let res = handle_route(req, ctx).await?; - - Ok(Response::new(res)) - } -} - -async fn handle_route(req: RouteRequest, ctx: Context) -> Result { - let RouteRequest { header, table_ids } = req; - let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id); - - let table_ids = table_ids.iter().map(|x| x.id).collect::>(); - let tables = fetch_tables(&ctx, table_ids).await?; - - let (peers, table_routes) = fill_table_routes(tables)?; - - let header = Some(ResponseHeader::success(cluster_id)); - Ok(RouteResponse { - header, - peers, - table_routes, - }) -} - -pub(crate) fn fill_table_routes( - tables: Vec<(TableInfoValue, TableRouteValue)>, -) -> Result<(Vec, Vec)> { - let mut peer_dict = PeerDict::default(); - let mut table_routes = vec![]; - for (tgv, trv) in tables { - let TableRouteValue { - peers, - mut table_route, - } = trv; - if let Some(table_route) = &mut table_route { - for rr in &mut table_route.region_routes { - if let Some(peer) = peers.get(rr.leader_peer_index as usize) { - rr.leader_peer_index = peer_dict.get_or_insert(peer.clone()) as u64; - } - for index in &mut rr.follower_peer_indexes { - if let Some(peer) = peers.get(*index as usize) { - *index = peer_dict.get_or_insert(peer.clone()) as u64; - } - } - } - - if let Some(table) = &mut table_route.table { - table.table_schema = tgv.try_as_raw_value().context(TableMetadataManagerSnafu)?; - } - } - if let Some(table_route) = table_route { - table_routes.push(table_route) - } - } - - Ok((peer_dict.into_peers(), table_routes)) -} diff --git a/src/meta-srv/src/table_routes.rs b/src/meta-srv/src/table_routes.rs deleted file mode 100644 index 593b6c3846..0000000000 --- a/src/meta-srv/src/table_routes.rs +++ /dev/null @@ -1,141 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use api::v1::meta::TableRouteValue; -use common_meta::key::table_info::TableInfoValue; -use common_meta::key::TableMetadataManagerRef; -use common_meta::rpc::router::{Table, TableRoute}; -use snafu::{OptionExt, ResultExt}; -use table::metadata::TableId; - -use crate::error::{self, Result, TableMetadataManagerSnafu, TableRouteNotFoundSnafu}; -use crate::metasrv::Context; - -pub(crate) async fn fetch_table( - table_metadata_manager: &TableMetadataManagerRef, - table_id: TableId, -) -> Result> { - let (table_info, table_route) = table_metadata_manager - .get_full_table_info(table_id) - .await - .context(TableMetadataManagerSnafu)?; - - if let Some(table_info) = table_info { - let table_route = table_route - .context(TableRouteNotFoundSnafu { table_id })? - .into_inner(); - - let table = Table { - id: table_id as u64, - table_name: table_info.table_name(), - table_schema: vec![], - }; - let table_route = TableRoute::new(table, table_route.region_routes); - let table_route_value = table_route - .try_into() - .context(error::TableRouteConversionSnafu)?; - - Ok(Some((table_info.into_inner(), table_route_value))) - } else { - Ok(None) - } -} - -pub(crate) async fn fetch_tables( - ctx: &Context, - table_ids: Vec, -) -> Result> { - let table_metadata_manager = &ctx.table_metadata_manager; - - let mut tables = vec![]; - // Maybe we can optimize the for loop in the future, but in general, - // there won't be many keys, in fact, there is usually just one. - for table_id in table_ids { - if let Some(x) = fetch_table(table_metadata_manager, table_id).await? { - tables.push(x); - } - } - - Ok(tables) -} - -#[cfg(test)] -pub(crate) mod tests { - use chrono::DateTime; - use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; - use common_meta::key::TableMetadataManagerRef; - use common_meta::peer::Peer; - use common_meta::rpc::router::{Region, RegionRoute}; - use datatypes::data_type::ConcreteDataType; - use datatypes::schema::{ColumnSchema, RawSchema}; - use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType}; - use table::requests::TableOptions; - - pub(crate) async fn prepare_table_region_and_info_value( - table_metadata_manager: &TableMetadataManagerRef, - table: &str, - ) { - let table_info = RawTableInfo { - ident: TableIdent::new(1), - name: table.to_string(), - desc: None, - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - meta: RawTableMeta { - schema: RawSchema::new(vec![ColumnSchema::new( - "a", - ConcreteDataType::string_datatype(), - true, - )]), - primary_key_indices: vec![], - value_indices: vec![], - engine: MITO_ENGINE.to_string(), - next_column_id: 1, - region_numbers: vec![1, 2, 3, 4], - options: TableOptions::default(), - created_on: DateTime::default(), - partition_key_indices: vec![], - }, - table_type: TableType::Base, - }; - - let region_route_factory = |region_id: u64, peer: u64| RegionRoute { - region: Region { - id: region_id.into(), - ..Default::default() - }, - leader_peer: Some(Peer { - id: peer, - addr: String::new(), - }), - follower_peers: vec![], - }; - - // Region distribution: - // Datanode => Regions - // 1 => 1, 2 - // 2 => 3 - // 3 => 4 - let region_routes = vec![ - region_route_factory(1, 1), - region_route_factory(2, 1), - region_route_factory(3, 2), - region_route_factory(4, 3), - ]; - table_metadata_manager - .create_table_metadata(table_info, region_routes) - .await - .unwrap(); - } -} diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs index b5317e04b7..1de7ae77cf 100644 --- a/src/meta-srv/src/test_util.rs +++ b/src/meta-srv/src/test_util.rs @@ -14,12 +14,18 @@ use std::sync::Arc; -use common_meta::key::TableMetadataManager; +use chrono::DateTime; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; +use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::sequence::Sequence; use common_meta::state_store::KvStateStore; use common_procedure::local::{LocalManager, ManagerConfig}; +use datatypes::data_type::ConcreteDataType; +use datatypes::schema::{ColumnSchema, RawSchema}; +use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType}; +use table::requests::TableOptions; use crate::cluster::MetaPeerClientBuilder; use crate::handler::{HeartbeatMailbox, Pushers}; @@ -88,3 +94,60 @@ pub(crate) fn create_region_failover_manager() -> Arc { Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(kv_store))), )) } + +pub(crate) async fn prepare_table_region_and_info_value( + table_metadata_manager: &TableMetadataManagerRef, + table: &str, +) { + let table_info = RawTableInfo { + ident: TableIdent::new(1), + name: table.to_string(), + desc: None, + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + meta: RawTableMeta { + schema: RawSchema::new(vec![ColumnSchema::new( + "a", + ConcreteDataType::string_datatype(), + true, + )]), + primary_key_indices: vec![], + value_indices: vec![], + engine: MITO_ENGINE.to_string(), + next_column_id: 1, + region_numbers: vec![1, 2, 3, 4], + options: TableOptions::default(), + created_on: DateTime::default(), + partition_key_indices: vec![], + }, + table_type: TableType::Base, + }; + + let region_route_factory = |region_id: u64, peer: u64| RegionRoute { + region: Region { + id: region_id.into(), + ..Default::default() + }, + leader_peer: Some(Peer { + id: peer, + addr: String::new(), + }), + follower_peers: vec![], + }; + + // Region distribution: + // Datanode => Regions + // 1 => 1, 2 + // 2 => 3 + // 3 => 4 + let region_routes = vec![ + region_route_factory(1, 1), + region_route_factory(2, 1), + region_route_factory(3, 2), + region_route_factory(4, 3), + ]; + table_metadata_manager + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); +} diff --git a/src/partition/src/lib.rs b/src/partition/src/lib.rs index 9dba350441..33d156c7be 100644 --- a/src/partition/src/lib.rs +++ b/src/partition/src/lib.rs @@ -20,7 +20,6 @@ pub mod manager; pub mod metrics; pub mod partition; pub mod range; -pub mod route; pub mod splitter; pub use crate::partition::{PartitionRule, PartitionRuleRef}; diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index 08e38a6401..76e6870775 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -21,6 +21,7 @@ use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; use common_meta::rpc::router::{convert_to_region_map, RegionRoutes}; use common_query::prelude::Expr; +use common_telemetry::timer; use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator}; use datatypes::prelude::Value; use snafu::{ensure, OptionExt, ResultExt}; @@ -29,6 +30,7 @@ use table::metadata::TableId; use crate::columns::RangeColumnsPartitionRule; use crate::error::{FindLeaderSnafu, Result}; +use crate::metrics::METRIC_TABLE_ROUTE_GET; use crate::partition::{PartitionBound, PartitionDef, PartitionExpr}; use crate::range::RangePartitionRule; use crate::splitter::RowSplitter; @@ -66,6 +68,7 @@ impl PartitionRuleManager { /// Find table route of given table name. pub async fn find_table_route(&self, table_id: TableId) -> Result { + let _timer = timer!(METRIC_TABLE_ROUTE_GET); let route = self .table_route_manager .get(table_id) diff --git a/src/partition/src/metrics.rs b/src/partition/src/metrics.rs index 54785dd01f..18b921383e 100644 --- a/src/partition/src/metrics.rs +++ b/src/partition/src/metrics.rs @@ -12,5 +12,4 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub(crate) const METRIC_TABLE_ROUTE_GET_REMOTE: &str = "frontend.table_route.get.remote"; pub(crate) const METRIC_TABLE_ROUTE_GET: &str = "frontend.table_route.get"; diff --git a/src/partition/src/route.rs b/src/partition/src/route.rs deleted file mode 100644 index 151b15ef10..0000000000 --- a/src/partition/src/route.rs +++ /dev/null @@ -1,90 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; -use std::time::Duration; - -use common_meta::rpc::router::{RouteRequest, TableRoute}; -use common_telemetry::timer; -use meta_client::client::MetaClient; -use moka::future::{Cache, CacheBuilder}; -use snafu::{ensure, ResultExt}; -use table::metadata::TableId; - -use crate::error::{self, Result}; -use crate::metrics; - -type TableRouteCache = Cache>; - -pub struct TableRoutes { - meta_client: Arc, - cache: TableRouteCache, -} - -// TODO(hl): maybe periodically refresh table route cache? -impl TableRoutes { - pub fn new(meta_client: Arc) -> Self { - Self { - meta_client, - cache: CacheBuilder::new(1024) - .time_to_live(Duration::from_secs(30 * 60)) - .time_to_idle(Duration::from_secs(5 * 60)) - .build(), - } - } - - pub async fn get_route(&self, table_id: TableId) -> Result> { - let _timer = timer!(metrics::METRIC_TABLE_ROUTE_GET); - - self.cache - .try_get_with_by_ref(&table_id, self.get_from_meta(table_id)) - .await - .map_err(|e| { - error::GetCacheSnafu { - err_msg: format!("{e:?}"), - } - .build() - }) - } - - async fn get_from_meta(&self, table_id: TableId) -> Result> { - let _timer = timer!(metrics::METRIC_TABLE_ROUTE_GET_REMOTE); - - let mut resp = self - .meta_client - .route(RouteRequest { - table_ids: vec![table_id], - }) - .await - .context(error::RequestMetaSnafu)?; - ensure!( - !resp.table_routes.is_empty(), - error::FindTableRoutesSnafu { table_id } - ); - let route = resp.table_routes.swap_remove(0); - Ok(Arc::new(route)) - } - - pub async fn insert_table_route(&self, table_id: TableId, table_route: Arc) { - self.cache.insert(table_id, table_route).await - } - - pub async fn invalidate_table_route(&self, table_id: TableId) { - self.cache.invalidate(&table_id).await - } - - pub fn cache(&self) -> &TableRouteCache { - &self.cache - } -}