feat: add ddl client (#1856)

* feat: add ddl client

* chore: apply suggestions from CR

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2023-07-04 20:32:02 +09:00
committed by GitHub
parent 884731a2c8
commit 000df8cf1e
5 changed files with 179 additions and 7 deletions

2
Cargo.lock generated
View File

@@ -4108,7 +4108,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=7aeaeaba1e0ca6a5c736b6ab2eb63144ae3d284b#7aeaeaba1e0ca6a5c736b6ab2eb63144ae3d284b"
source = "git+https://github.com/WenyXu/greptime-proto.git?rev=aab7d9a35900f995f9328c8588781e4d75253cba#aab7d9a35900f995f9328c8588781e4d75253cba"
dependencies = [
"prost",
"serde",

View File

@@ -73,7 +73,7 @@ datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "63e52dde9e44cac4b1f6c6e6b6bf6368ba3bd323" }
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7aeaeaba1e0ca6a5c736b6ab2eb63144ae3d284b" }
greptime-proto = { git = "https://github.com/WenyXu/greptime-proto.git", rev = "aab7d9a35900f995f9328c8588781e4d75253cba" }
itertools = "0.10"
parquet = "40.0"
paste = "1.0"

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod ddl;
mod heartbeat;
mod load_balance;
mod lock;
@@ -29,6 +30,7 @@ use common_meta::rpc::store::{
RangeRequest, RangeResponse,
};
use common_telemetry::info;
use ddl::Client as DdlClient;
use heartbeat::Client as HeartbeatClient;
use lock::Client as LockClient;
use router::Client as RouterClient;
@@ -49,6 +51,7 @@ pub struct MetaClientBuilder {
enable_router: bool,
enable_store: bool,
enable_lock: bool,
enable_ddl: bool,
channel_manager: Option<ChannelManager>,
}
@@ -89,6 +92,13 @@ impl MetaClientBuilder {
}
}
pub fn enable_ddl(self) -> Self {
Self {
enable_ddl: true,
..self
}
}
pub fn channel_manager(self, channel_manager: ChannelManager) -> Self {
Self {
channel_manager: Some(channel_manager),
@@ -119,7 +129,10 @@ impl MetaClientBuilder {
client.store = Some(StoreClient::new(self.id, self.role, mgr.clone()));
}
if self.enable_lock {
client.lock = Some(LockClient::new(self.id, self.role, mgr));
client.lock = Some(LockClient::new(self.id, self.role, mgr.clone()));
}
if self.enable_ddl {
client.ddl = Some(DdlClient::new(self.id, self.role, mgr));
}
client
@@ -134,6 +147,7 @@ pub struct MetaClient {
router: Option<RouterClient>,
store: Option<StoreClient>,
lock: Option<LockClient>,
ddl: Option<DdlClient>,
}
impl MetaClient {

View File

@@ -0,0 +1,145 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use api::v1::meta::ddl_task_client::DdlTaskClient;
use api::v1::meta::{ErrorCode, Role, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_grpc::channel_manager::ChannelManager;
use snafu::{ensure, ResultExt};
use tokio::sync::RwLock;
use tonic::transport::Channel;
use crate::client::heartbeat::Inner as HeartbeatInner;
use crate::client::Id;
use crate::error;
use crate::error::Result;
#[derive(Clone, Debug)]
// TODO(weny): removes this in following PRs.
#[allow(unused)]
pub struct Client {
inner: Arc<RwLock<Inner>>,
}
// TODO(weny): removes this in following PRs.
#[allow(dead_code)]
impl Client {
pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
let inner = Arc::new(RwLock::new(Inner {
id,
role,
channel_manager: channel_manager.clone(),
heartbeat_inner: HeartbeatInner::new(id, role, channel_manager),
}));
Self { inner }
}
pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
let mut inner = self.inner.write().await;
inner.start(urls).await
}
pub async fn is_started(&self) -> bool {
let inner = self.inner.read().await;
inner.is_started()
}
pub async fn submit_ddl_task(
&self,
req: SubmitDdlTaskRequest,
) -> Result<SubmitDdlTaskResponse> {
let mut inner = self.inner.write().await;
inner.submit_ddl_task(req).await
}
}
#[derive(Debug)]
// TODO(weny): removes this in following PRs.
#[allow(unused)]
struct Inner {
id: Id,
role: Role,
channel_manager: ChannelManager,
heartbeat_inner: HeartbeatInner,
}
impl Inner {
async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
ensure!(
!self.is_started(),
error::IllegalGrpcClientStateSnafu {
err_msg: "Router client already started",
}
);
self.heartbeat_inner.start(urls).await?;
Ok(())
}
fn make_client(&self, addr: impl AsRef<str>) -> Result<DdlTaskClient<Channel>> {
let channel = self
.channel_manager
.get(addr)
.context(error::CreateChannelSnafu)?;
Ok(DdlTaskClient::new(channel))
}
#[inline]
fn is_started(&self) -> bool {
self.heartbeat_inner.is_started()
}
pub async fn submit_ddl_task(
&mut self,
mut req: SubmitDdlTaskRequest,
) -> Result<SubmitDdlTaskResponse> {
req.set_header(self.id, self.role);
loop {
if let Some(leader) = &self.heartbeat_inner.get_leader() {
let mut client = self.make_client(leader)?;
let res = client
.submit_ddl_task(req.clone())
.await
.context(error::TonicStatusSnafu)?;
let res = res.into_inner();
if let Some(header) = res.header.as_ref() {
if let Some(err) = header.error.as_ref() {
if err.code == ErrorCode::NotLeader as i32 {
self.heartbeat_inner.ask_leader().await?;
continue;
}
}
}
return Ok(res);
} else if let Err(err) = self.heartbeat_inner.ask_leader().await {
return Err(err);
}
}
}
}

View File

@@ -133,7 +133,7 @@ impl Client {
}
#[derive(Debug)]
struct Inner {
pub(crate) struct Inner {
id: Id,
role: Role,
channel_manager: ChannelManager,
@@ -142,7 +142,16 @@ struct Inner {
}
impl Inner {
async fn start<U, A>(&mut self, urls: A) -> Result<()>
pub(crate) fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
Self {
id,
role,
channel_manager,
peers: HashSet::new(),
leader: None,
}
}
pub(crate) async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
@@ -163,7 +172,11 @@ impl Inner {
Ok(())
}
async fn ask_leader(&mut self) -> Result<()> {
pub(crate) fn get_leader(&self) -> Option<String> {
self.leader.clone()
}
pub(crate) async fn ask_leader(&mut self) -> Result<()> {
ensure!(
self.is_started(),
error::IllegalGrpcClientStateSnafu {
@@ -242,7 +255,7 @@ impl Inner {
}
#[inline]
fn is_started(&self) -> bool {
pub(crate) fn is_started(&self) -> bool {
!self.peers.is_empty()
}
}