refactor: move rpc's commons to common-meta (#1625)

This commit is contained in:
JeremyHi
2023-05-23 10:07:24 +08:00
committed by GitHub
parent 8e7ec4626b
commit 1225edb065
20 changed files with 66 additions and 56 deletions

1
Cargo.lock generated
View File

@@ -1186,6 +1186,7 @@ dependencies = [
"common-catalog",
"common-error",
"common-grpc",
"common-meta",
"common-query",
"common-recordbatch",
"common-runtime",

View File

@@ -14,6 +14,7 @@ backoff = { version = "0.4", features = ["tokio"] }
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }
common-grpc = { path = "../common/grpc" }
common-meta = { path = "../common/meta" }
common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-runtime = { path = "../common/runtime" }

View File

@@ -16,9 +16,9 @@ use std::fmt::Debug;
use std::sync::Arc;
use async_stream::stream;
use common_meta::rpc::store::{CompareAndPutRequest, DeleteRangeRequest, PutRequest, RangeRequest};
use common_telemetry::info;
use meta_client::client::MetaClient;
use meta_client::rpc::{CompareAndPutRequest, DeleteRangeRequest, PutRequest, RangeRequest};
use snafu::ResultExt;
use crate::error::{Error, MetaSrvSnafu};

View File

@@ -15,9 +15,8 @@
pub mod error;
pub mod instruction;
pub mod peer;
pub mod router;
pub mod rpc;
pub mod table_name;
pub mod util;
pub type ClusterId = u64;
pub type DatanodeId = u64;

View File

@@ -13,15 +13,11 @@
// limitations under the License.
pub mod lock;
mod store;
pub mod router;
pub mod store;
pub mod util;
use api::v1::meta::{KeyValue as PbKeyValue, ResponseHeader as PbResponseHeader};
pub use store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse,
RangeRequest, RangeResponse,
};
#[derive(Debug, Clone)]
pub struct ResponseHeader(PbResponseHeader);

View File

@@ -26,8 +26,8 @@ use table::metadata::RawTableInfo;
use crate::error::{self, Result};
use crate::peer::Peer;
use crate::rpc::util;
use crate::table_name::TableName;
use crate::util;
#[derive(Debug, Clone)]
pub struct CreateRequest<'a> {

View File

@@ -21,14 +21,12 @@ use api::v1::meta::{
DeleteRangeResponse as PbDeleteRangeResponse, KeyValue as PbKeyValue,
MoveValueRequest as PbMoveValueRequest, MoveValueResponse as PbMoveValueResponse,
PutRequest as PbPutRequest, PutResponse as PbPutResponse, RangeRequest as PbRangeRequest,
RangeResponse as PbRangeResponse, ResponseHeader as PbResponseHeader,
RangeResponse as PbRangeResponse,
};
use common_meta::util;
use snafu::ResultExt;
use crate::error;
use crate::error::{InvalidResponseHeaderSnafu, Result};
use crate::rpc::{KeyValue, ResponseHeader};
use crate::error::Result;
use crate::rpc::{util, KeyValue, ResponseHeader};
#[derive(Debug, Clone, Default)]
pub struct RangeRequest {
@@ -128,7 +126,7 @@ impl TryFrom<PbRangeResponse> for RangeResponse {
type Error = error::Error;
fn try_from(pb: PbRangeResponse) -> Result<Self> {
check_response_header(pb.header.as_ref())?;
util::check_response_header(pb.header.as_ref())?;
Ok(Self::new(pb))
}
@@ -220,7 +218,7 @@ impl TryFrom<PbPutResponse> for PutResponse {
type Error = error::Error;
fn try_from(pb: PbPutResponse) -> Result<Self> {
check_response_header(pb.header.as_ref())?;
util::check_response_header(pb.header.as_ref())?;
Ok(Self::new(pb))
}
@@ -282,7 +280,7 @@ impl TryFrom<PbBatchGetResponse> for BatchGetResponse {
type Error = error::Error;
fn try_from(pb: PbBatchGetResponse) -> Result<Self> {
check_response_header(pb.header.as_ref())?;
util::check_response_header(pb.header.as_ref())?;
Ok(Self(pb))
}
@@ -357,7 +355,7 @@ impl TryFrom<PbBatchPutResponse> for BatchPutResponse {
type Error = error::Error;
fn try_from(pb: PbBatchPutResponse) -> Result<Self> {
check_response_header(pb.header.as_ref())?;
util::check_response_header(pb.header.as_ref())?;
Ok(Self::new(pb))
}
@@ -429,7 +427,7 @@ impl TryFrom<PbBatchDeleteResponse> for BatchDeleteResponse {
type Error = error::Error;
fn try_from(pb: PbBatchDeleteResponse) -> Result<Self> {
check_response_header(pb.header.as_ref())?;
util::check_response_header(pb.header.as_ref())?;
Ok(Self::new(pb))
}
@@ -513,7 +511,7 @@ impl TryFrom<PbCompareAndPutResponse> for CompareAndPutResponse {
type Error = error::Error;
fn try_from(pb: PbCompareAndPutResponse) -> Result<Self> {
check_response_header(pb.header.as_ref())?;
util::check_response_header(pb.header.as_ref())?;
Ok(Self::new(pb))
}
@@ -633,7 +631,7 @@ impl TryFrom<PbDeleteRangeResponse> for DeleteRangeResponse {
type Error = error::Error;
fn try_from(pb: PbDeleteRangeResponse) -> Result<Self> {
check_response_header(pb.header.as_ref())?;
util::check_response_header(pb.header.as_ref())?;
Ok(Self::new(pb))
}
@@ -697,7 +695,7 @@ impl TryFrom<PbMoveValueResponse> for MoveValueResponse {
type Error = error::Error;
fn try_from(pb: PbMoveValueResponse) -> Result<Self> {
check_response_header(pb.header.as_ref())?;
util::check_response_header(pb.header.as_ref())?;
Ok(Self::new(pb))
}
@@ -720,10 +718,6 @@ impl MoveValueResponse {
}
}
fn check_response_header(header: Option<&PbResponseHeader>) -> Result<()> {
util::check_response_header(header).context(InvalidResponseHeaderSnafu)
}
#[cfg(test)]
mod tests {
use api::v1::meta::{

View File

@@ -30,10 +30,11 @@ use client::Database;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::format_full_table_name;
use common_error::prelude::BoxedError;
use common_meta::router::{
use common_meta::rpc::router::{
CreateRequest as MetaCreateRequest, DeleteRequest as MetaDeleteRequest,
Partition as MetaPartition, RouteRequest, RouteResponse,
};
use common_meta::rpc::store::CompareAndPutRequest;
use common_meta::table_name::TableName;
use common_query::Output;
use common_telemetry::debug;
@@ -42,7 +43,6 @@ use datanode::sql::SqlHandler;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::RawSchema;
use meta_client::client::MetaClient;
use meta_client::rpc::CompareAndPutRequest;
use partition::manager::PartitionInfo;
use partition::partition::{PartitionBound, PartitionDef};
use query::error::QueryExecutionSnafu;

View File

@@ -498,7 +498,7 @@ mod test {
use catalog::error::Result;
use catalog::remote::{KvBackend, ValueIter};
use common_meta::router::{Region, RegionRoute, Table, TableRoute};
use common_meta::rpc::router::{Region, RegionRoute, Table, TableRoute};
use datafusion_expr::expr_fn::{and, binary_expr, col, or};
use datafusion_expr::{lit, Operator};
use meta_client::client::MetaClient;

View File

@@ -16,8 +16,8 @@ use std::time::Duration;
use api::v1::meta::Role;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::rpc::lock::{LockRequest, UnlockRequest};
use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::rpc::lock::{LockRequest, UnlockRequest};
use tracing::{info, subscriber};
use tracing_subscriber::FmtSubscriber;

View File

@@ -18,15 +18,15 @@ use std::time::Duration;
use api::v1::meta::{HeartbeatRequest, Peer, Role};
use chrono::DateTime;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::router::{CreateRequest, Partition};
use common_meta::rpc::router::{CreateRequest, Partition};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchGetRequest, BatchPutRequest, CompareAndPutRequest, DeleteRangeRequest,
PutRequest, RangeRequest,
};
use common_meta::table_name::TableName;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use meta_client::client::MetaClientBuilder;
use meta_client::rpc::{
BatchDeleteRequest, BatchGetRequest, BatchPutRequest, CompareAndPutRequest, DeleteRangeRequest,
PutRequest, RangeRequest,
};
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
use table::requests::TableOptions;
use tracing::{event, subscriber, Level};

View File

@@ -20,7 +20,14 @@ mod store;
use api::v1::meta::Role;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::router::{CreateRequest, DeleteRequest, RouteRequest, RouteResponse};
use common_meta::rpc::lock::{LockRequest, LockResponse, UnlockRequest};
use common_meta::rpc::router::{CreateRequest, DeleteRequest, RouteRequest, RouteResponse};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse,
RangeRequest, RangeResponse,
};
use common_telemetry::info;
use heartbeat::Client as HeartbeatClient;
use lock::Client as LockClient;
@@ -31,13 +38,6 @@ use store::Client as StoreClient;
pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
use crate::error;
use crate::error::{ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Result};
use crate::rpc::lock::{LockRequest, LockResponse, UnlockRequest};
use crate::rpc::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse,
RangeRequest, RangeResponse,
};
pub type Id = (u64, u64);
@@ -251,22 +251,38 @@ impl MetaClient {
/// Range gets the keys in the range from the key-value store.
pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
self.store_client()?.range(req.into()).await?.try_into()
self.store_client()?
.range(req.into())
.await?
.try_into()
.context(ConvertMetaResponseSnafu)
}
/// Put puts the given key into the key-value store.
pub async fn put(&self, req: PutRequest) -> Result<PutResponse> {
self.store_client()?.put(req.into()).await?.try_into()
self.store_client()?
.put(req.into())
.await?
.try_into()
.context(ConvertMetaResponseSnafu)
}
/// BatchGet atomically get values by the given keys from the key-value store.
pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
self.store_client()?.batch_get(req.into()).await?.try_into()
self.store_client()?
.batch_get(req.into())
.await?
.try_into()
.context(ConvertMetaResponseSnafu)
}
/// BatchPut atomically puts the given keys into the key-value store.
pub async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
self.store_client()?.batch_put(req.into()).await?.try_into()
self.store_client()?
.batch_put(req.into())
.await?
.try_into()
.context(ConvertMetaResponseSnafu)
}
/// BatchDelete atomically deletes the given keys from the key-value store.
@@ -275,6 +291,7 @@ impl MetaClient {
.batch_delete(req.into())
.await?
.try_into()
.context(ConvertMetaResponseSnafu)
}
/// CompareAndPut atomically puts the value to the given updated
@@ -287,6 +304,7 @@ impl MetaClient {
.compare_and_put(req.into())
.await?
.try_into()
.context(ConvertMetaResponseSnafu)
}
/// DeleteRange deletes the given range from the key-value store.
@@ -295,6 +313,7 @@ impl MetaClient {
.delete_range(req.into())
.await?
.try_into()
.context(ConvertMetaResponseSnafu)
}
/// MoveValue atomically renames the key to the given updated key.
@@ -303,6 +322,7 @@ impl MetaClient {
.move_value(req.into())
.await?
.try_into()
.context(ConvertMetaResponseSnafu)
}
pub async fn lock(&self, req: LockRequest) -> Result<LockResponse> {
@@ -360,7 +380,7 @@ mod tests {
use api::v1::meta::{HeartbeatRequest, Peer};
use chrono::DateTime;
use common_meta::router::Partition;
use common_meta::rpc::router::Partition;
use common_meta::table_name::TableName;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};

View File

@@ -18,7 +18,7 @@ use std::sync::Arc;
use api::v1::meta::heartbeat_client::HeartbeatClient;
use api::v1::meta::{AskLeaderRequest, HeartbeatRequest, HeartbeatResponse, RequestHeader, Role};
use common_grpc::channel_manager::ChannelManager;
use common_meta::util;
use common_meta::rpc::util;
use common_telemetry::{debug, info};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::{mpsc, RwLock};

View File

@@ -18,7 +18,6 @@ pub mod client;
pub mod error;
#[cfg(test)]
mod mocks;
pub mod rpc;
// Options for meta client in datanode instance.
#[derive(Clone, Debug, Serialize, Deserialize)]

View File

@@ -16,7 +16,7 @@ use api::v1::meta::{TableName, TableRouteValue};
use async_trait::async_trait;
use catalog::helper::TableGlobalKey;
use common_meta::peer::Peer;
use common_meta::router::TableRoute;
use common_meta::rpc::router::TableRoute;
use common_meta::RegionIdent;
use common_telemetry::info;
use serde::{Deserialize, Serialize};

View File

@@ -16,7 +16,7 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use common_meta::peer::Peer;
use common_meta::router::TableRoute;
use common_meta::rpc::router::TableRoute;
use common_meta::table_name::TableName;
use common_query::prelude::Expr;
use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator};

View File

@@ -16,7 +16,7 @@ use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;
use common_meta::router::Partition as MetaPartition;
use common_meta::rpc::router::Partition as MetaPartition;
use datafusion_expr::Operator;
use datatypes::prelude::Value;
use serde::{Deserialize, Serialize};

View File

@@ -15,7 +15,7 @@
use std::sync::Arc;
use std::time::Duration;
use common_meta::router::{RouteRequest, TableRoute};
use common_meta::rpc::router::{RouteRequest, TableRoute};
use common_meta::table_name::TableName;
use meta_client::client::MetaClient;
use moka::future::{Cache, CacheBuilder};