refactor: remove router rpc (#2646)

This commit is contained in:
Weny Xu
2023-10-31 13:54:56 +09:00
committed by GitHub
parent 36c0742c45
commit 88eb69530a
19 changed files with 76 additions and 926 deletions

2
Cargo.lock generated
View File

@@ -3635,7 +3635,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=1d7dc6f18b310355a4c16fb453d3ca7ed09f048d#1d7dc6f18b310355a4c16fb453d3ca7ed09f048d"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=20cdc57c3f320345b122eea43bc549a19d342e51#20cdc57c3f320345b122eea43bc549a19d342e51"
dependencies = [
"prost 0.12.1",
"serde",

View File

@@ -79,7 +79,7 @@ derive_builder = "0.12"
etcd-client = "0.11"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1d7dc6f18b310355a4c16fb453d3ca7ed09f048d" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "20cdc57c3f320345b122eea43bc549a19d342e51" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"

View File

@@ -15,65 +15,19 @@
use std::collections::{BTreeMap, HashMap, HashSet};
use api::v1::meta::{
Partition as PbPartition, Peer as PbPeer, Region as PbRegion, RegionRoute as PbRegionRoute,
RouteRequest as PbRouteRequest, RouteResponse as PbRouteResponse, Table as PbTable,
TableId as PbTableId, TableRoute as PbTableRoute, TableRouteValue as PbTableRouteValue,
Partition as PbPartition, Peer as PbPeer, Region as PbRegion, Table as PbTable,
TableRoute as PbTableRoute,
};
use serde::ser::SerializeSeq;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use snafu::OptionExt;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
use crate::error::{self, Result};
use crate::key::RegionDistribution;
use crate::peer::Peer;
use crate::rpc::util;
use crate::table_name::TableName;
#[derive(Debug, Clone, Default)]
pub struct RouteRequest {
pub table_ids: Vec<TableId>,
}
impl From<RouteRequest> for PbRouteRequest {
fn from(mut req: RouteRequest) -> Self {
Self {
header: None,
table_ids: req.table_ids.drain(..).map(|id| PbTableId { id }).collect(),
}
}
}
impl RouteRequest {
#[inline]
pub fn new(table_id: TableId) -> Self {
Self {
table_ids: vec![table_id],
}
}
}
#[derive(Debug, Clone)]
pub struct RouteResponse {
pub table_routes: Vec<TableRoute>,
}
impl TryFrom<PbRouteResponse> for RouteResponse {
type Error = error::Error;
fn try_from(pb: PbRouteResponse) -> Result<Self> {
util::check_response_header(pb.header.as_ref())?;
let table_routes = pb
.table_routes
.into_iter()
.map(|x| TableRoute::try_from_raw(&pb.peers, x))
.collect::<Result<Vec<_>>>()?;
Ok(Self { table_routes })
}
}
pub fn region_distribution(region_routes: &[RegionRoute]) -> Result<RegionDistribution> {
let mut regions_id_map = RegionDistribution::new();
for route in region_routes.iter() {
@@ -199,107 +153,6 @@ impl TableRoute {
Ok(Self::new(table, region_routes))
}
pub fn try_into_raw(self) -> Result<(Vec<PbPeer>, PbTableRoute)> {
let mut peers = HashSet::new();
self.region_routes
.iter()
.filter_map(|x| x.leader_peer.as_ref())
.for_each(|p| {
let _ = peers.insert(p.clone());
});
self.region_routes
.iter()
.flat_map(|x| x.follower_peers.iter())
.for_each(|p| {
let _ = peers.insert(p.clone());
});
let mut peers = peers.into_iter().map(Into::into).collect::<Vec<PbPeer>>();
peers.sort_by_key(|x| x.id);
let find_peer = |peer_id: u64| -> u64 {
peers
.iter()
.enumerate()
.find_map(|(i, x)| {
if x.id == peer_id {
Some(i as u64)
} else {
None
}
})
.unwrap_or_else(|| {
panic!("Peer {peer_id} must be present when collecting all peers.")
})
};
let mut region_routes = Vec::with_capacity(self.region_routes.len());
for region_route in self.region_routes.into_iter() {
let leader_peer_index = region_route.leader_peer.map(|x| find_peer(x.id)).context(
error::RouteInfoCorruptedSnafu {
err_msg: "'leader_peer' is empty in region route",
},
)?;
let follower_peer_indexes = region_route
.follower_peers
.iter()
.map(|x| find_peer(x.id))
.collect::<Vec<_>>();
region_routes.push(PbRegionRoute {
region: Some(region_route.region.into()),
leader_peer_index,
follower_peer_indexes,
});
}
let table_route = PbTableRoute {
table: Some(self.table.into()),
region_routes,
};
Ok((peers, table_route))
}
pub fn find_leaders(&self) -> HashSet<Peer> {
find_leaders(&self.region_routes)
}
pub fn find_leader_regions(&self, datanode: &Peer) -> Vec<RegionNumber> {
find_leader_regions(&self.region_routes, datanode)
}
pub fn find_region_leader(&self, region_number: RegionNumber) -> Option<&Peer> {
self.region_leaders
.get(&region_number)
.and_then(|x| x.as_ref())
}
}
impl TryFrom<PbTableRouteValue> for TableRoute {
type Error = error::Error;
fn try_from(pb: PbTableRouteValue) -> Result<Self> {
TableRoute::try_from_raw(
&pb.peers,
pb.table_route.context(error::InvalidProtoMsgSnafu {
err_msg: "expected table_route",
})?,
)
}
}
impl TryFrom<TableRoute> for PbTableRouteValue {
type Error = error::Error;
fn try_from(table_route: TableRoute) -> Result<Self> {
let (peers, table_route) = table_route.try_into_raw()?;
Ok(PbTableRouteValue {
peers,
table_route: Some(table_route),
})
}
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
@@ -456,12 +309,6 @@ impl From<PbPartition> for Partition {
#[cfg(test)]
mod tests {
use api::v1::meta::{
Partition as PbPartition, Peer as PbPeer, Region as PbRegion, RegionRoute as PbRegionRoute,
RouteRequest as PbRouteRequest, RouteResponse as PbRouteResponse, Table as PbTable,
TableName as PbTableName, TableRoute as PbTableRoute,
};
use super::*;
#[test]
@@ -476,182 +323,4 @@ mod tests {
assert_eq!(got, p);
}
#[test]
fn test_route_request_trans() {
let req = RouteRequest {
table_ids: vec![1, 2],
};
let into_req: PbRouteRequest = req.into();
assert!(into_req.header.is_none());
assert_eq!(1, into_req.table_ids.get(0).unwrap().id);
assert_eq!(2, into_req.table_ids.get(1).unwrap().id);
}
#[test]
fn test_route_response_trans() {
let res = PbRouteResponse {
header: None,
peers: vec![
PbPeer {
id: 1,
addr: "peer1".to_string(),
},
PbPeer {
id: 2,
addr: "peer2".to_string(),
},
],
table_routes: vec![PbTableRoute {
table: Some(PbTable {
id: 1,
table_name: Some(PbTableName {
catalog_name: "c1".to_string(),
schema_name: "s1".to_string(),
table_name: "t1".to_string(),
}),
table_schema: b"schema".to_vec(),
}),
region_routes: vec![PbRegionRoute {
region: Some(PbRegion {
id: 1,
name: "region1".to_string(),
partition: Some(PbPartition {
column_list: vec![b"c1".to_vec(), b"c2".to_vec()],
value_list: vec![b"v1".to_vec(), b"v2".to_vec()],
}),
attrs: Default::default(),
}),
leader_peer_index: 0,
follower_peer_indexes: vec![1],
}],
}],
};
let res: RouteResponse = res.try_into().unwrap();
let mut table_routes = res.table_routes;
assert_eq!(1, table_routes.len());
let table_route = table_routes.remove(0);
let table = table_route.table;
assert_eq!(1, table.id);
assert_eq!("c1", table.table_name.catalog_name);
assert_eq!("s1", table.table_name.schema_name);
assert_eq!("t1", table.table_name.table_name);
let mut region_routes = table_route.region_routes;
assert_eq!(1, region_routes.len());
let region_route = region_routes.remove(0);
let region = region_route.region;
assert_eq!(1, region.id);
assert_eq!("region1", region.name);
let partition = region.partition.unwrap();
assert_eq!(vec![b"c1".to_vec(), b"c2".to_vec()], partition.column_list);
assert_eq!(vec![b"v1".to_vec(), b"v2".to_vec()], partition.value_list);
assert_eq!(1, region_route.leader_peer.as_ref().unwrap().id);
assert_eq!("peer1", region_route.leader_peer.as_ref().unwrap().addr);
assert_eq!(1, region_route.follower_peers.len());
assert_eq!(2, region_route.follower_peers.get(0).unwrap().id);
assert_eq!("peer2", region_route.follower_peers.get(0).unwrap().addr);
}
#[test]
fn test_table_route_raw_conversion() {
let raw_peers = vec![
PbPeer {
id: 1,
addr: "a1".to_string(),
},
PbPeer {
id: 2,
addr: "a2".to_string(),
},
PbPeer {
id: 3,
addr: "a3".to_string(),
},
];
// region distribution:
// region id => leader peer id + [follower peer id]
// 1 => 2 + [1, 3]
// 2 => 1 + [2, 3]
let raw_table_route = PbTableRoute {
table: Some(PbTable {
id: 1,
table_name: Some(PbTableName {
catalog_name: "c1".to_string(),
schema_name: "s1".to_string(),
table_name: "t1".to_string(),
}),
table_schema: vec![],
}),
region_routes: vec![
PbRegionRoute {
region: Some(PbRegion {
id: 1,
name: "r1".to_string(),
partition: None,
attrs: HashMap::new(),
}),
leader_peer_index: 1,
follower_peer_indexes: vec![0, 2],
},
PbRegionRoute {
region: Some(PbRegion {
id: 2,
name: "r2".to_string(),
partition: None,
attrs: HashMap::new(),
}),
leader_peer_index: 0,
follower_peer_indexes: vec![1, 2],
},
],
};
let table_route = TableRoute {
table: Table {
id: 1,
table_name: TableName::new("c1", "s1", "t1"),
table_schema: vec![],
},
region_routes: vec![
RegionRoute {
region: Region {
id: 1.into(),
name: "r1".to_string(),
partition: None,
attrs: BTreeMap::new(),
},
leader_peer: Some(Peer::new(2, "a2")),
follower_peers: vec![Peer::new(1, "a1"), Peer::new(3, "a3")],
},
RegionRoute {
region: Region {
id: 2.into(),
name: "r2".to_string(),
partition: None,
attrs: BTreeMap::new(),
},
leader_peer: Some(Peer::new(1, "a1")),
follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
},
],
region_leaders: HashMap::from([
(2, Some(Peer::new(1, "a1"))),
(1, Some(Peer::new(2, "a2"))),
]),
};
let from_raw = TableRoute::try_from_raw(&raw_peers, raw_table_route.clone()).unwrap();
assert_eq!(from_raw, table_route);
let into_raw = table_route.try_into_raw().unwrap();
assert_eq!(into_raw.0, raw_peers);
assert_eq!(into_raw.1, raw_table_route);
}
}

View File

@@ -17,7 +17,7 @@ mod ddl;
mod heartbeat;
mod load_balance;
mod lock;
mod router;
mod store;
use api::v1::meta::Role;
@@ -27,7 +27,6 @@ use common_meta::ddl::{DdlTaskExecutor, ExecutorContext};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::lock::{LockRequest, LockResponse, UnlockRequest};
use common_meta::rpc::router::{RouteRequest, RouteResponse};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
@@ -37,7 +36,6 @@ use common_telemetry::info;
use ddl::Client as DdlClient;
use heartbeat::Client as HeartbeatClient;
use lock::Client as LockClient;
use router::Client as RouterClient;
use snafu::{OptionExt, ResultExt};
use store::Client as StoreClient;
@@ -151,9 +149,6 @@ impl MetaClientBuilder {
DEFAULT_ASK_LEADER_MAX_RETRY,
));
}
if self.enable_router {
client.router = Some(RouterClient::new(self.id, self.role, mgr.clone()));
}
if self.enable_store {
client.store = Some(StoreClient::new(self.id, self.role, mgr.clone()));
}
@@ -179,7 +174,6 @@ pub struct MetaClient {
id: Id,
channel_manager: ChannelManager,
heartbeat: Option<HeartbeatClient>,
router: Option<RouterClient>,
store: Option<StoreClient>,
lock: Option<LockClient>,
ddl: Option<DdlClient>,
@@ -226,10 +220,6 @@ impl MetaClient {
client.start(urls.clone()).await?;
info!("Heartbeat client started");
}
if let Some(client) = &mut self.router {
client.start(urls.clone()).await?;
info!("Router client started");
}
if let Some(client) = &mut self.store {
client.start(urls.clone()).await?;
info!("Store client started");
@@ -262,33 +252,6 @@ impl MetaClient {
self.heartbeat_client()?.heartbeat().await
}
/// Fetch routing information for tables. The smallest unit is the complete
/// routing information(all regions) of a table.
///
/// ```text
/// table_1
/// table_name
/// table_schema
/// regions
/// region_1
/// leader_peer
/// follower_peer_1, follower_peer_2
/// region_2
/// leader_peer
/// follower_peer_1, follower_peer_2, follower_peer_3
/// region_xxx
/// table_2
/// ...
/// ```
///
pub async fn route(&self, req: RouteRequest) -> Result<RouteResponse> {
self.router_client()?
.route(req.into())
.await?
.try_into()
.context(ConvertMetaResponseSnafu)
}
/// Range gets the keys in the range from the key-value store.
pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
self.store_client()?
@@ -386,13 +349,6 @@ impl MetaClient {
})
}
#[inline]
pub fn router_client(&self) -> Result<RouterClient> {
self.router.clone().context(error::NotStartedSnafu {
name: "store_client",
})
}
#[inline]
pub fn store_client(&self) -> Result<StoreClient> {
self.store.clone().context(error::NotStartedSnafu {
@@ -489,7 +445,6 @@ mod tests {
.enable_heartbeat()
.build();
let _ = meta_client.heartbeat_client().unwrap();
assert!(meta_client.router_client().is_err());
assert!(meta_client.store_client().is_err());
meta_client.start(urls).await.unwrap();
assert!(meta_client.heartbeat_client().unwrap().is_started().await);
@@ -498,16 +453,13 @@ mod tests {
.enable_router()
.build();
assert!(meta_client.heartbeat_client().is_err());
let _ = meta_client.router_client().unwrap();
assert!(meta_client.store_client().is_err());
meta_client.start(urls).await.unwrap();
assert!(meta_client.router_client().unwrap().is_started().await);
let mut meta_client = MetaClientBuilder::new(0, 0, Role::Datanode)
.enable_store()
.build();
assert!(meta_client.heartbeat_client().is_err());
assert!(meta_client.router_client().is_err());
let _ = meta_client.store_client().unwrap();
meta_client.start(urls).await.unwrap();
assert!(meta_client.store_client().unwrap().is_started().await);
@@ -520,11 +472,9 @@ mod tests {
assert_eq!(1, meta_client.id().0);
assert_eq!(2, meta_client.id().1);
let _ = meta_client.heartbeat_client().unwrap();
let _ = meta_client.router_client().unwrap();
let _ = meta_client.store_client().unwrap();
meta_client.start(urls).await.unwrap();
assert!(meta_client.heartbeat_client().unwrap().is_started().await);
assert!(meta_client.router_client().unwrap().is_started().await);
assert!(meta_client.store_client().unwrap().is_started().await);
}
@@ -540,20 +490,6 @@ mod tests {
assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
}
#[tokio::test]
async fn test_not_start_router_client() {
let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
let mut meta_client = MetaClientBuilder::new(0, 0, Role::Datanode)
.enable_heartbeat()
.enable_store()
.build();
meta_client.start(urls).await.unwrap();
let req = RouteRequest::new(1);
let res = meta_client.route(req).await;
assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
}
#[tokio::test]
async fn test_not_start_store_client() {
let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];

View File

@@ -1,173 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::sync::Arc;
use api::v1::meta::router_client::RouterClient;
use api::v1::meta::{Role, RouteRequest, RouteResponse};
use common_grpc::channel_manager::ChannelManager;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::RwLock;
use tonic::transport::Channel;
use crate::client::{load_balance as lb, Id};
use crate::error;
use crate::error::Result;
#[derive(Clone, Debug)]
pub struct Client {
inner: Arc<RwLock<Inner>>,
}
impl Client {
pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
let inner = Arc::new(RwLock::new(Inner {
id,
role,
channel_manager,
peers: vec![],
}));
Self { inner }
}
pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
let mut inner = self.inner.write().await;
inner.start(urls).await
}
pub async fn is_started(&self) -> bool {
let inner = self.inner.read().await;
inner.is_started()
}
pub async fn route(&self, req: RouteRequest) -> Result<RouteResponse> {
let inner = self.inner.read().await;
inner.route(req).await
}
}
#[derive(Debug)]
struct Inner {
id: Id,
role: Role,
channel_manager: ChannelManager,
peers: Vec<String>,
}
impl Inner {
async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
ensure!(
!self.is_started(),
error::IllegalGrpcClientStateSnafu {
err_msg: "Router client already started",
}
);
self.peers = urls
.as_ref()
.iter()
.map(|url| url.as_ref().to_string())
.collect::<HashSet<_>>()
.drain()
.collect::<Vec<_>>();
Ok(())
}
async fn route(&self, mut req: RouteRequest) -> Result<RouteResponse> {
let mut client = self.random_client()?;
req.set_header(self.id, self.role);
let res = client.route(req).await.map_err(error::Error::from)?;
Ok(res.into_inner())
}
fn random_client(&self) -> Result<RouterClient<Channel>> {
let len = self.peers.len();
let peer = lb::random_get(len, |i| Some(&self.peers[i])).context(
error::IllegalGrpcClientStateSnafu {
err_msg: "Empty peers, router client may not start yet",
},
)?;
self.make_client(peer)
}
fn make_client(&self, addr: impl AsRef<str>) -> Result<RouterClient<Channel>> {
let channel = self
.channel_manager
.get(addr)
.context(error::CreateChannelSnafu)?;
Ok(RouterClient::new(channel))
}
#[inline]
fn is_started(&self) -> bool {
!self.peers.is_empty()
}
}
#[cfg(test)]
mod test {
use super::*;
#[tokio::test]
async fn test_start_client() {
let mut client = Client::new((0, 0), Role::Frontend, ChannelManager::default());
assert!(!client.is_started().await);
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
.unwrap();
assert!(client.is_started().await);
}
#[tokio::test]
async fn test_already_start() {
let mut client = Client::new((0, 0), Role::Frontend, ChannelManager::default());
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
.unwrap();
assert!(client.is_started().await);
let res = client.start(&["127.0.0.1:1002"]).await;
assert!(res.is_err());
assert!(matches!(
res.err(),
Some(error::Error::IllegalGrpcClientState { .. })
));
}
#[tokio::test]
async fn test_start_with_duplicate_peers() {
let mut client = Client::new((0, 0), Role::Frontend, ChannelManager::default());
client
.start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"])
.await
.unwrap();
assert_eq!(1, client.inner.write().await.peers.len());
}
}

View File

@@ -18,7 +18,6 @@ use api::v1::meta::cluster_server::ClusterServer;
use api::v1::meta::ddl_task_server::DdlTaskServer;
use api::v1::meta::heartbeat_server::HeartbeatServer;
use api::v1::meta::lock_server::LockServer;
use api::v1::meta::router_server::RouterServer;
use api::v1::meta::store_server::StoreServer;
use common_base::Plugins;
use etcd_client::Client;
@@ -152,7 +151,6 @@ pub fn router(meta_srv: MetaSrv) -> Router {
tonic::transport::Server::builder()
.accept_http1(true) // for admin services
.add_service(HeartbeatServer::new(meta_srv.clone()))
.add_service(RouterServer::new(meta_srv.clone()))
.add_service(StoreServer::new(meta_srv.clone()))
.add_service(ClusterServer::new(meta_srv.clone()))
.add_service(LockServer::new(meta_srv.clone()))

View File

@@ -78,7 +78,7 @@ mod test {
use crate::handler::node_stat::{RegionStat, Stat};
use crate::metasrv::builder::MetaSrvBuilder;
use crate::service::store::kv::KvBackendAdapter;
use crate::{table_routes, test_util};
use crate::test_util;
#[tokio::test]
async fn test_handle_region_lease() {
@@ -94,11 +94,7 @@ mod test {
let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(
kv_store.clone(),
)));
table_routes::tests::prepare_table_region_and_info_value(
&table_metadata_manager,
table_name,
)
.await;
test_util::prepare_table_region_and_info_value(&table_metadata_manager, table_name).await;
let req = HeartbeatRequest {
duration_since_epoch: 1234,

View File

@@ -34,7 +34,6 @@ pub mod pubsub;
pub mod selector;
pub mod service;
pub mod table_meta_alloc;
pub mod table_routes;
pub use crate::error::Result;

View File

@@ -13,7 +13,6 @@
// limitations under the License.
pub(crate) const METRIC_META_KV_REQUEST: &str = "meta.kv_request";
pub(crate) const METRIC_META_ROUTE_REQUEST: &str = "meta.route_request";
pub(crate) const METRIC_META_HEARTBEAT_CONNECTION_NUM: &str = "meta.heartbeat_connection_num";
pub(crate) const METRIC_META_HANDLER_EXECUTE: &str = "meta.handler_execute";
pub const METRIC_META_INACTIVE_REGIONS: &str = "meta.inactive_regions";

View File

@@ -17,7 +17,6 @@ use std::time::Duration;
use api::v1::meta::ddl_task_server::DdlTaskServer;
use api::v1::meta::heartbeat_server::HeartbeatServer;
use api::v1::meta::router_server::RouterServer;
use api::v1::meta::store_server::StoreServer;
use client::client_manager::DatanodeClients;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
@@ -85,7 +84,6 @@ pub async fn mock(
let _handle = tokio::spawn(async move {
tonic::transport::Server::builder()
.add_service(HeartbeatServer::new(service.clone()))
.add_service(RouterServer::new(service.clone()))
.add_service(StoreServer::new(service.clone()))
.add_service(DdlTaskServer::new(service.clone()))
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))

View File

@@ -411,7 +411,7 @@ mod tests {
use crate::service::mailbox::Channel;
use crate::service::store::kv::{KvBackendAdapter, KvStoreRef};
use crate::service::store::memory::MemStore;
use crate::table_routes;
use crate::test_util;
struct RandomNodeSelector {
nodes: Vec<Peer>,
@@ -497,11 +497,7 @@ mod tests {
let table_metadata_manager = Arc::new(TableMetadataManager::new(
KvBackendAdapter::wrap(kv_store.clone()),
));
table_routes::tests::prepare_table_region_and_info_value(
&table_metadata_manager,
table,
)
.await;
test_util::prepare_table_region_and_info_value(&table_metadata_manager, table).await;
let region_distribution = table_metadata_manager
.table_route_manager()
.get_region_distribution(1)

View File

@@ -23,7 +23,6 @@ pub mod ddl;
mod heartbeat;
pub mod lock;
pub mod mailbox;
pub mod router;
pub mod store;
pub type GrpcResult<T> = std::result::Result<Response<T>, Status>;

View File

@@ -1,100 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{
router_server, Peer, PeerDict, ResponseHeader, RouteRequest, RouteResponse, TableRoute,
TableRouteValue,
};
use common_meta::key::table_info::TableInfoValue;
use common_telemetry::timer;
use snafu::ResultExt;
use tonic::{Request, Response};
use crate::error::{Result, TableMetadataManagerSnafu};
use crate::metasrv::{Context, MetaSrv};
use crate::metrics::METRIC_META_ROUTE_REQUEST;
use crate::service::GrpcResult;
use crate::table_routes::fetch_tables;
#[async_trait::async_trait]
impl router_server::Router for MetaSrv {
async fn route(&self, req: Request<RouteRequest>) -> GrpcResult<RouteResponse> {
let req = req.into_inner();
let cluster_id = req.header.as_ref().map_or(0, |h| h.cluster_id);
let _timer = timer!(
METRIC_META_ROUTE_REQUEST,
&[
("op", "route".to_string()),
("cluster_id", cluster_id.to_string())
]
);
let ctx = self.new_ctx();
let res = handle_route(req, ctx).await?;
Ok(Response::new(res))
}
}
async fn handle_route(req: RouteRequest, ctx: Context) -> Result<RouteResponse> {
let RouteRequest { header, table_ids } = req;
let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id);
let table_ids = table_ids.iter().map(|x| x.id).collect::<Vec<_>>();
let tables = fetch_tables(&ctx, table_ids).await?;
let (peers, table_routes) = fill_table_routes(tables)?;
let header = Some(ResponseHeader::success(cluster_id));
Ok(RouteResponse {
header,
peers,
table_routes,
})
}
pub(crate) fn fill_table_routes(
tables: Vec<(TableInfoValue, TableRouteValue)>,
) -> Result<(Vec<Peer>, Vec<TableRoute>)> {
let mut peer_dict = PeerDict::default();
let mut table_routes = vec![];
for (tgv, trv) in tables {
let TableRouteValue {
peers,
mut table_route,
} = trv;
if let Some(table_route) = &mut table_route {
for rr in &mut table_route.region_routes {
if let Some(peer) = peers.get(rr.leader_peer_index as usize) {
rr.leader_peer_index = peer_dict.get_or_insert(peer.clone()) as u64;
}
for index in &mut rr.follower_peer_indexes {
if let Some(peer) = peers.get(*index as usize) {
*index = peer_dict.get_or_insert(peer.clone()) as u64;
}
}
}
if let Some(table) = &mut table_route.table {
table.table_schema = tgv.try_as_raw_value().context(TableMetadataManagerSnafu)?;
}
}
if let Some(table_route) = table_route {
table_routes.push(table_route)
}
}
Ok((peer_dict.into_peers(), table_routes))
}

View File

@@ -1,141 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::TableRouteValue;
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::TableMetadataManagerRef;
use common_meta::rpc::router::{Table, TableRoute};
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
use crate::error::{self, Result, TableMetadataManagerSnafu, TableRouteNotFoundSnafu};
use crate::metasrv::Context;
pub(crate) async fn fetch_table(
table_metadata_manager: &TableMetadataManagerRef,
table_id: TableId,
) -> Result<Option<(TableInfoValue, TableRouteValue)>> {
let (table_info, table_route) = table_metadata_manager
.get_full_table_info(table_id)
.await
.context(TableMetadataManagerSnafu)?;
if let Some(table_info) = table_info {
let table_route = table_route
.context(TableRouteNotFoundSnafu { table_id })?
.into_inner();
let table = Table {
id: table_id as u64,
table_name: table_info.table_name(),
table_schema: vec![],
};
let table_route = TableRoute::new(table, table_route.region_routes);
let table_route_value = table_route
.try_into()
.context(error::TableRouteConversionSnafu)?;
Ok(Some((table_info.into_inner(), table_route_value)))
} else {
Ok(None)
}
}
pub(crate) async fn fetch_tables(
ctx: &Context,
table_ids: Vec<TableId>,
) -> Result<Vec<(TableInfoValue, TableRouteValue)>> {
let table_metadata_manager = &ctx.table_metadata_manager;
let mut tables = vec![];
// Maybe we can optimize the for loop in the future, but in general,
// there won't be many keys, in fact, there is usually just one.
for table_id in table_ids {
if let Some(x) = fetch_table(table_metadata_manager, table_id).await? {
tables.push(x);
}
}
Ok(tables)
}
#[cfg(test)]
pub(crate) mod tests {
use chrono::DateTime;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
use common_meta::key::TableMetadataManagerRef;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
use table::requests::TableOptions;
pub(crate) async fn prepare_table_region_and_info_value(
table_metadata_manager: &TableMetadataManagerRef,
table: &str,
) {
let table_info = RawTableInfo {
ident: TableIdent::new(1),
name: table.to_string(),
desc: None,
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
meta: RawTableMeta {
schema: RawSchema::new(vec![ColumnSchema::new(
"a",
ConcreteDataType::string_datatype(),
true,
)]),
primary_key_indices: vec![],
value_indices: vec![],
engine: MITO_ENGINE.to_string(),
next_column_id: 1,
region_numbers: vec![1, 2, 3, 4],
options: TableOptions::default(),
created_on: DateTime::default(),
partition_key_indices: vec![],
},
table_type: TableType::Base,
};
let region_route_factory = |region_id: u64, peer: u64| RegionRoute {
region: Region {
id: region_id.into(),
..Default::default()
},
leader_peer: Some(Peer {
id: peer,
addr: String::new(),
}),
follower_peers: vec![],
};
// Region distribution:
// Datanode => Regions
// 1 => 1, 2
// 2 => 3
// 3 => 4
let region_routes = vec![
region_route_factory(1, 1),
region_route_factory(2, 1),
region_route_factory(3, 2),
region_route_factory(4, 3),
];
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.await
.unwrap();
}
}

View File

@@ -14,12 +14,18 @@
use std::sync::Arc;
use common_meta::key::TableMetadataManager;
use chrono::DateTime;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::sequence::Sequence;
use common_meta::state_store::KvStateStore;
use common_procedure::local::{LocalManager, ManagerConfig};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
use table::requests::TableOptions;
use crate::cluster::MetaPeerClientBuilder;
use crate::handler::{HeartbeatMailbox, Pushers};
@@ -88,3 +94,60 @@ pub(crate) fn create_region_failover_manager() -> Arc<RegionFailoverManager> {
Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(kv_store))),
))
}
pub(crate) async fn prepare_table_region_and_info_value(
table_metadata_manager: &TableMetadataManagerRef,
table: &str,
) {
let table_info = RawTableInfo {
ident: TableIdent::new(1),
name: table.to_string(),
desc: None,
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
meta: RawTableMeta {
schema: RawSchema::new(vec![ColumnSchema::new(
"a",
ConcreteDataType::string_datatype(),
true,
)]),
primary_key_indices: vec![],
value_indices: vec![],
engine: MITO_ENGINE.to_string(),
next_column_id: 1,
region_numbers: vec![1, 2, 3, 4],
options: TableOptions::default(),
created_on: DateTime::default(),
partition_key_indices: vec![],
},
table_type: TableType::Base,
};
let region_route_factory = |region_id: u64, peer: u64| RegionRoute {
region: Region {
id: region_id.into(),
..Default::default()
},
leader_peer: Some(Peer {
id: peer,
addr: String::new(),
}),
follower_peers: vec![],
};
// Region distribution:
// Datanode => Regions
// 1 => 1, 2
// 2 => 3
// 3 => 4
let region_routes = vec![
region_route_factory(1, 1),
region_route_factory(2, 1),
region_route_factory(3, 2),
region_route_factory(4, 3),
];
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.await
.unwrap();
}

View File

@@ -20,7 +20,6 @@ pub mod manager;
pub mod metrics;
pub mod partition;
pub mod range;
pub mod route;
pub mod splitter;
pub use crate::partition::{PartitionRule, PartitionRuleRef};

View File

@@ -21,6 +21,7 @@ use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_meta::rpc::router::{convert_to_region_map, RegionRoutes};
use common_query::prelude::Expr;
use common_telemetry::timer;
use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator};
use datatypes::prelude::Value;
use snafu::{ensure, OptionExt, ResultExt};
@@ -29,6 +30,7 @@ use table::metadata::TableId;
use crate::columns::RangeColumnsPartitionRule;
use crate::error::{FindLeaderSnafu, Result};
use crate::metrics::METRIC_TABLE_ROUTE_GET;
use crate::partition::{PartitionBound, PartitionDef, PartitionExpr};
use crate::range::RangePartitionRule;
use crate::splitter::RowSplitter;
@@ -66,6 +68,7 @@ impl PartitionRuleManager {
/// Find table route of given table name.
pub async fn find_table_route(&self, table_id: TableId) -> Result<RegionRoutes> {
let _timer = timer!(METRIC_TABLE_ROUTE_GET);
let route = self
.table_route_manager
.get(table_id)

View File

@@ -12,5 +12,4 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) const METRIC_TABLE_ROUTE_GET_REMOTE: &str = "frontend.table_route.get.remote";
pub(crate) const METRIC_TABLE_ROUTE_GET: &str = "frontend.table_route.get";

View File

@@ -1,90 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use common_meta::rpc::router::{RouteRequest, TableRoute};
use common_telemetry::timer;
use meta_client::client::MetaClient;
use moka::future::{Cache, CacheBuilder};
use snafu::{ensure, ResultExt};
use table::metadata::TableId;
use crate::error::{self, Result};
use crate::metrics;
type TableRouteCache = Cache<TableId, Arc<TableRoute>>;
pub struct TableRoutes {
meta_client: Arc<MetaClient>,
cache: TableRouteCache,
}
// TODO(hl): maybe periodically refresh table route cache?
impl TableRoutes {
pub fn new(meta_client: Arc<MetaClient>) -> Self {
Self {
meta_client,
cache: CacheBuilder::new(1024)
.time_to_live(Duration::from_secs(30 * 60))
.time_to_idle(Duration::from_secs(5 * 60))
.build(),
}
}
pub async fn get_route(&self, table_id: TableId) -> Result<Arc<TableRoute>> {
let _timer = timer!(metrics::METRIC_TABLE_ROUTE_GET);
self.cache
.try_get_with_by_ref(&table_id, self.get_from_meta(table_id))
.await
.map_err(|e| {
error::GetCacheSnafu {
err_msg: format!("{e:?}"),
}
.build()
})
}
async fn get_from_meta(&self, table_id: TableId) -> Result<Arc<TableRoute>> {
let _timer = timer!(metrics::METRIC_TABLE_ROUTE_GET_REMOTE);
let mut resp = self
.meta_client
.route(RouteRequest {
table_ids: vec![table_id],
})
.await
.context(error::RequestMetaSnafu)?;
ensure!(
!resp.table_routes.is_empty(),
error::FindTableRoutesSnafu { table_id }
);
let route = resp.table_routes.swap_remove(0);
Ok(Arc::new(route))
}
pub async fn insert_table_route(&self, table_id: TableId, table_route: Arc<TableRoute>) {
self.cache.insert(table_id, table_route).await
}
pub async fn invalidate_table_route(&self, table_id: TableId) {
self.cache.invalidate(&table_id).await
}
pub fn cache(&self) -> &TableRouteCache {
&self.cache
}
}