From 1225edb065634afbf905685a6493f8f753659044 Mon Sep 17 00:00:00 2001 From: JeremyHi Date: Tue, 23 May 2023 10:07:24 +0800 Subject: [PATCH] refactor: move rpc's commons to common-meta (#1625) --- Cargo.lock | 1 + src/catalog/Cargo.toml | 1 + src/catalog/src/remote/client.rs | 2 +- src/common/meta/src/lib.rs | 3 +- src/{meta-client => common/meta}/src/rpc.rs | 10 ++-- .../meta}/src/rpc/lock.rs | 0 src/common/meta/src/{ => rpc}/router.rs | 2 +- .../meta}/src/rpc/store.rs | 28 +++++------ src/common/meta/src/{ => rpc}/util.rs | 0 src/frontend/src/instance/distributed.rs | 4 +- src/frontend/src/table.rs | 2 +- src/meta-client/examples/lock.rs | 2 +- src/meta-client/examples/meta_client.rs | 10 ++-- src/meta-client/src/client.rs | 46 +++++++++++++------ src/meta-client/src/client/heartbeat.rs | 2 +- src/meta-client/src/lib.rs | 1 - .../region_failover/update_metadata.rs | 2 +- src/partition/src/manager.rs | 2 +- src/partition/src/partition.rs | 2 +- src/partition/src/route.rs | 2 +- 20 files changed, 66 insertions(+), 56 deletions(-) rename src/{meta-client => common/meta}/src/rpc.rs (88%) rename src/{meta-client => common/meta}/src/rpc/lock.rs (100%) rename src/common/meta/src/{ => rpc}/router.rs (99%) rename src/{meta-client => common/meta}/src/rpc/store.rs (97%) rename src/common/meta/src/{ => rpc}/util.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index 339c817b34..9af0abaa4b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1186,6 +1186,7 @@ dependencies = [ "common-catalog", "common-error", "common-grpc", + "common-meta", "common-query", "common-recordbatch", "common-runtime", diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 2c37136c08..5ceb7b2954 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -14,6 +14,7 @@ backoff = { version = "0.4", features = ["tokio"] } common-catalog = { path = "../common/catalog" } common-error = { path = "../common/error" } common-grpc = { path = "../common/grpc" } +common-meta = { path = "../common/meta" } common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } common-runtime = { path = "../common/runtime" } diff --git a/src/catalog/src/remote/client.rs b/src/catalog/src/remote/client.rs index be855a92ac..b981a89bea 100644 --- a/src/catalog/src/remote/client.rs +++ b/src/catalog/src/remote/client.rs @@ -16,9 +16,9 @@ use std::fmt::Debug; use std::sync::Arc; use async_stream::stream; +use common_meta::rpc::store::{CompareAndPutRequest, DeleteRangeRequest, PutRequest, RangeRequest}; use common_telemetry::info; use meta_client::client::MetaClient; -use meta_client::rpc::{CompareAndPutRequest, DeleteRangeRequest, PutRequest, RangeRequest}; use snafu::ResultExt; use crate::error::{Error, MetaSrvSnafu}; diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index 9c8a5db6f3..35ce7a7d58 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -15,9 +15,8 @@ pub mod error; pub mod instruction; pub mod peer; -pub mod router; +pub mod rpc; pub mod table_name; -pub mod util; pub type ClusterId = u64; pub type DatanodeId = u64; diff --git a/src/meta-client/src/rpc.rs b/src/common/meta/src/rpc.rs similarity index 88% rename from src/meta-client/src/rpc.rs rename to src/common/meta/src/rpc.rs index 1a1935b021..8225b9330e 100644 --- a/src/meta-client/src/rpc.rs +++ b/src/common/meta/src/rpc.rs @@ -13,15 +13,11 @@ // limitations under the License. pub mod lock; -mod store; +pub mod router; +pub mod store; +pub mod util; use api::v1::meta::{KeyValue as PbKeyValue, ResponseHeader as PbResponseHeader}; -pub use store::{ - BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, - BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, - DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse, - RangeRequest, RangeResponse, -}; #[derive(Debug, Clone)] pub struct ResponseHeader(PbResponseHeader); diff --git a/src/meta-client/src/rpc/lock.rs b/src/common/meta/src/rpc/lock.rs similarity index 100% rename from src/meta-client/src/rpc/lock.rs rename to src/common/meta/src/rpc/lock.rs diff --git a/src/common/meta/src/router.rs b/src/common/meta/src/rpc/router.rs similarity index 99% rename from src/common/meta/src/router.rs rename to src/common/meta/src/rpc/router.rs index 71332d47f2..3df29d6d4a 100644 --- a/src/common/meta/src/router.rs +++ b/src/common/meta/src/rpc/router.rs @@ -26,8 +26,8 @@ use table::metadata::RawTableInfo; use crate::error::{self, Result}; use crate::peer::Peer; +use crate::rpc::util; use crate::table_name::TableName; -use crate::util; #[derive(Debug, Clone)] pub struct CreateRequest<'a> { diff --git a/src/meta-client/src/rpc/store.rs b/src/common/meta/src/rpc/store.rs similarity index 97% rename from src/meta-client/src/rpc/store.rs rename to src/common/meta/src/rpc/store.rs index a69ec24628..f3e8496d98 100644 --- a/src/meta-client/src/rpc/store.rs +++ b/src/common/meta/src/rpc/store.rs @@ -21,14 +21,12 @@ use api::v1::meta::{ DeleteRangeResponse as PbDeleteRangeResponse, KeyValue as PbKeyValue, MoveValueRequest as PbMoveValueRequest, MoveValueResponse as PbMoveValueResponse, PutRequest as PbPutRequest, PutResponse as PbPutResponse, RangeRequest as PbRangeRequest, - RangeResponse as PbRangeResponse, ResponseHeader as PbResponseHeader, + RangeResponse as PbRangeResponse, }; -use common_meta::util; -use snafu::ResultExt; use crate::error; -use crate::error::{InvalidResponseHeaderSnafu, Result}; -use crate::rpc::{KeyValue, ResponseHeader}; +use crate::error::Result; +use crate::rpc::{util, KeyValue, ResponseHeader}; #[derive(Debug, Clone, Default)] pub struct RangeRequest { @@ -128,7 +126,7 @@ impl TryFrom for RangeResponse { type Error = error::Error; fn try_from(pb: PbRangeResponse) -> Result { - check_response_header(pb.header.as_ref())?; + util::check_response_header(pb.header.as_ref())?; Ok(Self::new(pb)) } @@ -220,7 +218,7 @@ impl TryFrom for PutResponse { type Error = error::Error; fn try_from(pb: PbPutResponse) -> Result { - check_response_header(pb.header.as_ref())?; + util::check_response_header(pb.header.as_ref())?; Ok(Self::new(pb)) } @@ -282,7 +280,7 @@ impl TryFrom for BatchGetResponse { type Error = error::Error; fn try_from(pb: PbBatchGetResponse) -> Result { - check_response_header(pb.header.as_ref())?; + util::check_response_header(pb.header.as_ref())?; Ok(Self(pb)) } @@ -357,7 +355,7 @@ impl TryFrom for BatchPutResponse { type Error = error::Error; fn try_from(pb: PbBatchPutResponse) -> Result { - check_response_header(pb.header.as_ref())?; + util::check_response_header(pb.header.as_ref())?; Ok(Self::new(pb)) } @@ -429,7 +427,7 @@ impl TryFrom for BatchDeleteResponse { type Error = error::Error; fn try_from(pb: PbBatchDeleteResponse) -> Result { - check_response_header(pb.header.as_ref())?; + util::check_response_header(pb.header.as_ref())?; Ok(Self::new(pb)) } @@ -513,7 +511,7 @@ impl TryFrom for CompareAndPutResponse { type Error = error::Error; fn try_from(pb: PbCompareAndPutResponse) -> Result { - check_response_header(pb.header.as_ref())?; + util::check_response_header(pb.header.as_ref())?; Ok(Self::new(pb)) } @@ -633,7 +631,7 @@ impl TryFrom for DeleteRangeResponse { type Error = error::Error; fn try_from(pb: PbDeleteRangeResponse) -> Result { - check_response_header(pb.header.as_ref())?; + util::check_response_header(pb.header.as_ref())?; Ok(Self::new(pb)) } @@ -697,7 +695,7 @@ impl TryFrom for MoveValueResponse { type Error = error::Error; fn try_from(pb: PbMoveValueResponse) -> Result { - check_response_header(pb.header.as_ref())?; + util::check_response_header(pb.header.as_ref())?; Ok(Self::new(pb)) } @@ -720,10 +718,6 @@ impl MoveValueResponse { } } -fn check_response_header(header: Option<&PbResponseHeader>) -> Result<()> { - util::check_response_header(header).context(InvalidResponseHeaderSnafu) -} - #[cfg(test)] mod tests { use api::v1::meta::{ diff --git a/src/common/meta/src/util.rs b/src/common/meta/src/rpc/util.rs similarity index 100% rename from src/common/meta/src/util.rs rename to src/common/meta/src/rpc/util.rs diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 00e03a5951..c9e6512da4 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -30,10 +30,11 @@ use client::Database; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_catalog::format_full_table_name; use common_error::prelude::BoxedError; -use common_meta::router::{ +use common_meta::rpc::router::{ CreateRequest as MetaCreateRequest, DeleteRequest as MetaDeleteRequest, Partition as MetaPartition, RouteRequest, RouteResponse, }; +use common_meta::rpc::store::CompareAndPutRequest; use common_meta::table_name::TableName; use common_query::Output; use common_telemetry::debug; @@ -42,7 +43,6 @@ use datanode::sql::SqlHandler; use datatypes::prelude::ConcreteDataType; use datatypes::schema::RawSchema; use meta_client::client::MetaClient; -use meta_client::rpc::CompareAndPutRequest; use partition::manager::PartitionInfo; use partition::partition::{PartitionBound, PartitionDef}; use query::error::QueryExecutionSnafu; diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 8a467a78ec..2b9660a16e 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -498,7 +498,7 @@ mod test { use catalog::error::Result; use catalog::remote::{KvBackend, ValueIter}; - use common_meta::router::{Region, RegionRoute, Table, TableRoute}; + use common_meta::rpc::router::{Region, RegionRoute, Table, TableRoute}; use datafusion_expr::expr_fn::{and, binary_expr, col, or}; use datafusion_expr::{lit, Operator}; use meta_client::client::MetaClient; diff --git a/src/meta-client/examples/lock.rs b/src/meta-client/examples/lock.rs index 121f3719fc..89a50affe8 100644 --- a/src/meta-client/examples/lock.rs +++ b/src/meta-client/examples/lock.rs @@ -16,8 +16,8 @@ use std::time::Duration; use api::v1::meta::Role; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; +use common_meta::rpc::lock::{LockRequest, UnlockRequest}; use meta_client::client::{MetaClient, MetaClientBuilder}; -use meta_client::rpc::lock::{LockRequest, UnlockRequest}; use tracing::{info, subscriber}; use tracing_subscriber::FmtSubscriber; diff --git a/src/meta-client/examples/meta_client.rs b/src/meta-client/examples/meta_client.rs index c8fbc4b1ac..9e3f7e1a23 100644 --- a/src/meta-client/examples/meta_client.rs +++ b/src/meta-client/examples/meta_client.rs @@ -18,15 +18,15 @@ use std::time::Duration; use api::v1::meta::{HeartbeatRequest, Peer, Role}; use chrono::DateTime; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; -use common_meta::router::{CreateRequest, Partition}; +use common_meta::rpc::router::{CreateRequest, Partition}; +use common_meta::rpc::store::{ + BatchDeleteRequest, BatchGetRequest, BatchPutRequest, CompareAndPutRequest, DeleteRangeRequest, + PutRequest, RangeRequest, +}; use common_meta::table_name::TableName; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, RawSchema}; use meta_client::client::MetaClientBuilder; -use meta_client::rpc::{ - BatchDeleteRequest, BatchGetRequest, BatchPutRequest, CompareAndPutRequest, DeleteRangeRequest, - PutRequest, RangeRequest, -}; use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType}; use table::requests::TableOptions; use tracing::{event, subscriber, Level}; diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index e978d56405..a893be1311 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -20,7 +20,14 @@ mod store; use api::v1::meta::Role; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; -use common_meta::router::{CreateRequest, DeleteRequest, RouteRequest, RouteResponse}; +use common_meta::rpc::lock::{LockRequest, LockResponse, UnlockRequest}; +use common_meta::rpc::router::{CreateRequest, DeleteRequest, RouteRequest, RouteResponse}; +use common_meta::rpc::store::{ + BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, + BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, + DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse, + RangeRequest, RangeResponse, +}; use common_telemetry::info; use heartbeat::Client as HeartbeatClient; use lock::Client as LockClient; @@ -31,13 +38,6 @@ use store::Client as StoreClient; pub use self::heartbeat::{HeartbeatSender, HeartbeatStream}; use crate::error; use crate::error::{ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Result}; -use crate::rpc::lock::{LockRequest, LockResponse, UnlockRequest}; -use crate::rpc::{ - BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, - BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, - DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse, - RangeRequest, RangeResponse, -}; pub type Id = (u64, u64); @@ -251,22 +251,38 @@ impl MetaClient { /// Range gets the keys in the range from the key-value store. pub async fn range(&self, req: RangeRequest) -> Result { - self.store_client()?.range(req.into()).await?.try_into() + self.store_client()? + .range(req.into()) + .await? + .try_into() + .context(ConvertMetaResponseSnafu) } /// Put puts the given key into the key-value store. pub async fn put(&self, req: PutRequest) -> Result { - self.store_client()?.put(req.into()).await?.try_into() + self.store_client()? + .put(req.into()) + .await? + .try_into() + .context(ConvertMetaResponseSnafu) } /// BatchGet atomically get values by the given keys from the key-value store. pub async fn batch_get(&self, req: BatchGetRequest) -> Result { - self.store_client()?.batch_get(req.into()).await?.try_into() + self.store_client()? + .batch_get(req.into()) + .await? + .try_into() + .context(ConvertMetaResponseSnafu) } /// BatchPut atomically puts the given keys into the key-value store. pub async fn batch_put(&self, req: BatchPutRequest) -> Result { - self.store_client()?.batch_put(req.into()).await?.try_into() + self.store_client()? + .batch_put(req.into()) + .await? + .try_into() + .context(ConvertMetaResponseSnafu) } /// BatchDelete atomically deletes the given keys from the key-value store. @@ -275,6 +291,7 @@ impl MetaClient { .batch_delete(req.into()) .await? .try_into() + .context(ConvertMetaResponseSnafu) } /// CompareAndPut atomically puts the value to the given updated @@ -287,6 +304,7 @@ impl MetaClient { .compare_and_put(req.into()) .await? .try_into() + .context(ConvertMetaResponseSnafu) } /// DeleteRange deletes the given range from the key-value store. @@ -295,6 +313,7 @@ impl MetaClient { .delete_range(req.into()) .await? .try_into() + .context(ConvertMetaResponseSnafu) } /// MoveValue atomically renames the key to the given updated key. @@ -303,6 +322,7 @@ impl MetaClient { .move_value(req.into()) .await? .try_into() + .context(ConvertMetaResponseSnafu) } pub async fn lock(&self, req: LockRequest) -> Result { @@ -360,7 +380,7 @@ mod tests { use api::v1::meta::{HeartbeatRequest, Peer}; use chrono::DateTime; - use common_meta::router::Partition; + use common_meta::rpc::router::Partition; use common_meta::table_name::TableName; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, RawSchema}; diff --git a/src/meta-client/src/client/heartbeat.rs b/src/meta-client/src/client/heartbeat.rs index 8e13671ee5..1d8ff1bbbb 100644 --- a/src/meta-client/src/client/heartbeat.rs +++ b/src/meta-client/src/client/heartbeat.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use api::v1::meta::heartbeat_client::HeartbeatClient; use api::v1::meta::{AskLeaderRequest, HeartbeatRequest, HeartbeatResponse, RequestHeader, Role}; use common_grpc::channel_manager::ChannelManager; -use common_meta::util; +use common_meta::rpc::util; use common_telemetry::{debug, info}; use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::{mpsc, RwLock}; diff --git a/src/meta-client/src/lib.rs b/src/meta-client/src/lib.rs index 1754380f2b..f8de47c7f8 100644 --- a/src/meta-client/src/lib.rs +++ b/src/meta-client/src/lib.rs @@ -18,7 +18,6 @@ pub mod client; pub mod error; #[cfg(test)] mod mocks; -pub mod rpc; // Options for meta client in datanode instance. #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/src/meta-srv/src/procedure/region_failover/update_metadata.rs b/src/meta-srv/src/procedure/region_failover/update_metadata.rs index 36570cf5fe..42dce8b427 100644 --- a/src/meta-srv/src/procedure/region_failover/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_failover/update_metadata.rs @@ -16,7 +16,7 @@ use api::v1::meta::{TableName, TableRouteValue}; use async_trait::async_trait; use catalog::helper::TableGlobalKey; use common_meta::peer::Peer; -use common_meta::router::TableRoute; +use common_meta::rpc::router::TableRoute; use common_meta::RegionIdent; use common_telemetry::info; use serde::{Deserialize, Serialize}; diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index 3ceeb61f20..e1f941f67c 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -16,7 +16,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use common_meta::peer::Peer; -use common_meta::router::TableRoute; +use common_meta::rpc::router::TableRoute; use common_meta::table_name::TableName; use common_query::prelude::Expr; use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator}; diff --git a/src/partition/src/partition.rs b/src/partition/src/partition.rs index 583fb87de2..6db2aa5d86 100644 --- a/src/partition/src/partition.rs +++ b/src/partition/src/partition.rs @@ -16,7 +16,7 @@ use std::any::Any; use std::fmt::Debug; use std::sync::Arc; -use common_meta::router::Partition as MetaPartition; +use common_meta::rpc::router::Partition as MetaPartition; use datafusion_expr::Operator; use datatypes::prelude::Value; use serde::{Deserialize, Serialize}; diff --git a/src/partition/src/route.rs b/src/partition/src/route.rs index 5dcbc2e730..851761abd1 100644 --- a/src/partition/src/route.rs +++ b/src/partition/src/route.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use std::time::Duration; -use common_meta::router::{RouteRequest, TableRoute}; +use common_meta::rpc::router::{RouteRequest, TableRoute}; use common_meta::table_name::TableName; use meta_client::client::MetaClient; use moka::future::{Cache, CacheBuilder};