feat: drop distributed Mito2 table (#2260)

* feat: drop distributed Mito2 table

* rebase develop

* fix: resolve PR comments

* fix: resolve PR comments
This commit is contained in:
LFC
2023-09-01 11:47:32 +08:00
committed by Ruihang Xia
parent fa542f6e93
commit ff3881f0e1
6 changed files with 349 additions and 184 deletions

View File

@@ -56,6 +56,7 @@ pub enum StatusCode {
TableColumnNotFound = 4002,
TableColumnExists = 4003,
DatabaseNotFound = 4004,
RegionNotFound = 4005,
// ====== End of catalog related status code =======
// ====== Begin of storage related status code =====
@@ -113,6 +114,7 @@ impl StatusCode {
| StatusCode::EngineExecuteQuery
| StatusCode::TableAlreadyExists
| StatusCode::TableNotFound
| StatusCode::RegionNotFound
| StatusCode::TableColumnNotFound
| StatusCode::TableColumnExists
| StatusCode::DatabaseNotFound
@@ -145,6 +147,7 @@ impl StatusCode {
| StatusCode::InvalidSyntax
| StatusCode::TableAlreadyExists
| StatusCode::TableNotFound
| StatusCode::RegionNotFound
| StatusCode::TableColumnNotFound
| StatusCode::TableColumnExists
| StatusCode::DatabaseNotFound
@@ -173,6 +176,7 @@ impl StatusCode {
v if v == StatusCode::EngineExecuteQuery as u32 => Some(StatusCode::EngineExecuteQuery),
v if v == StatusCode::TableAlreadyExists as u32 => Some(StatusCode::TableAlreadyExists),
v if v == StatusCode::TableNotFound as u32 => Some(StatusCode::TableNotFound),
v if v == StatusCode::RegionNotFound as u32 => Some(StatusCode::RegionNotFound),
v if v == StatusCode::TableColumnNotFound as u32 => {
Some(StatusCode::TableColumnNotFound)
}

View File

@@ -20,3 +20,4 @@ pub(crate) const METRIC_META_HEARTBEAT_CONNECTION_NUM: &str = "meta.heartbeat_co
pub(crate) const METRIC_META_HANDLER_EXECUTE: &str = "meta.handler_execute";
pub(crate) const METRIC_META_PROCEDURE_CREATE_TABLE: &str = "meta.procedure.create_table";
pub(crate) const METRIC_META_PROCEDURE_DROP_TABLE: &str = "meta.procedure.drop_table";

View File

@@ -187,7 +187,7 @@ impl CreateTableProcedure {
let request_template = self.create_region_request_template()?;
let leaders = find_leaders(region_routes);
let mut create_table_tasks = Vec::with_capacity(leaders.len());
let mut create_region_tasks = Vec::with_capacity(leaders.len());
for datanode in leaders {
let clients = self.context.datanode_clients.clone();
@@ -198,16 +198,16 @@ impl CreateTableProcedure {
.map(|region_number| {
let region_id = RegionId::new(self.table_id(), *region_number);
let mut create_table_request = request_template.clone();
create_table_request.region_id = region_id.as_u64();
create_table_request.catalog = catalog.to_string();
create_table_request.schema = schema.to_string();
let mut create_region_request = request_template.clone();
create_region_request.region_id = region_id.as_u64();
create_region_request.catalog = catalog.to_string();
create_region_request.schema = schema.to_string();
PbRegionRequest::Create(create_table_request)
PbRegionRequest::Create(create_region_request)
})
.collect::<Vec<_>>();
create_table_tasks.push(common_runtime::spawn_bg(async move {
create_region_tasks.push(async move {
for request in requests {
let client = clients.get_client(&datanode).await;
let requester = RegionRequester::new(client);
@@ -217,13 +217,12 @@ impl CreateTableProcedure {
}
}
Ok(())
}));
});
}
join_all(create_table_tasks)
join_all(create_region_tasks)
.await
.into_iter()
.map(|e| e.context(error::JoinSnafu).flatten())
.collect::<Result<Vec<_>>>()?;
self.creator.data.state = CreateTableState::CreateMetadata;
@@ -372,34 +371,11 @@ mod test {
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
use api::v1::region::region_server::RegionServer;
use api::v1::region::RegionResponse;
use api::v1::{
ColumnDataType, ColumnDef as PbColumnDef, CreateTableExpr, ResponseHeader,
Status as PbStatus,
};
use chrono::DateTime;
use client::client_manager::DatanodeClients;
use client::Client;
use common_grpc::channel_manager::ChannelManager;
use common_meta::key::TableMetadataManager;
use common_meta::peer::Peer;
use common_runtime::{Builder as RuntimeBuilder, Runtime};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use servers::grpc::region_server::{RegionServerHandler, RegionServerRequestHandler};
use table::metadata::{RawTableMeta, TableIdent, TableType};
use table::requests::TableOptions;
use tokio::sync::mpsc;
use tonic::transport::Server;
use tower::service_fn;
use api::v1::{ColumnDataType, ColumnDef as PbColumnDef, CreateTableExpr};
use super::*;
use crate::handler::{HeartbeatMailbox, Pushers};
use crate::sequence::Sequence;
use crate::service::store::kv::KvBackendAdapter;
use crate::service::store::memory::MemStore;
use crate::test_util::new_region_route;
use crate::procedure::utils::mock::EchoRegionServer;
use crate::procedure::utils::test_data;
fn create_table_procedure() -> CreateTableProcedure {
let create_table_expr = CreateTableExpr {
@@ -442,80 +418,11 @@ mod test {
engine: MITO2_ENGINE.to_string(),
};
let raw_table_info = RawTableInfo {
ident: TableIdent::new(42),
name: "my_table".to_string(),
desc: Some("blabla".to_string()),
catalog_name: "my_catalog".to_string(),
schema_name: "my_schema".to_string(),
meta: RawTableMeta {
schema: RawSchema {
column_schemas: vec![
ColumnSchema::new(
"ts".to_string(),
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
ColumnSchema::new(
"my_tag1".to_string(),
ConcreteDataType::string_datatype(),
true,
),
ColumnSchema::new(
"my_tag2".to_string(),
ConcreteDataType::string_datatype(),
true,
),
ColumnSchema::new(
"my_field_column".to_string(),
ConcreteDataType::int32_datatype(),
true,
),
],
timestamp_index: Some(0),
version: 0,
},
primary_key_indices: vec![1, 2],
value_indices: vec![2],
engine: MITO2_ENGINE.to_string(),
next_column_id: 3,
region_numbers: vec![1, 2, 3],
engine_options: HashMap::new(),
options: TableOptions::default(),
created_on: DateTime::default(),
partition_key_indices: vec![],
},
table_type: TableType::Base,
};
let peers = vec![
Peer::new(1, "127.0.0.1:4001"),
Peer::new(2, "127.0.0.1:4002"),
Peer::new(3, "127.0.0.1:4003"),
];
let region_routes = vec![
new_region_route(1, &peers, 3),
new_region_route(2, &peers, 2),
new_region_route(3, &peers, 1),
];
let kv_store = Arc::new(MemStore::new());
let mailbox_sequence = Sequence::new("test_heartbeat_mailbox", 0, 100, kv_store.clone());
let mailbox = HeartbeatMailbox::create(Pushers::default(), mailbox_sequence);
CreateTableProcedure::new(
1,
CreateTableTask::new(create_table_expr, vec![], raw_table_info),
region_routes,
DdlContext {
datanode_clients: Arc::new(DatanodeClients::default()),
mailbox,
server_addr: "127.0.0.1:4321".to_string(),
table_metadata_manager: Arc::new(TableMetadataManager::new(
KvBackendAdapter::wrap(kv_store),
)),
},
CreateTableTask::new(create_table_expr, vec![], test_data::new_table_info()),
test_data::new_region_routes(),
test_data::new_ddl_context(),
)
}
@@ -571,79 +478,11 @@ mod test {
assert_eq!(template, expected);
}
#[derive(Clone)]
struct TestingRegionServerHandler {
runtime: Arc<Runtime>,
create_region_notifier: mpsc::Sender<RegionId>,
}
impl TestingRegionServerHandler {
fn new(create_region_notifier: mpsc::Sender<RegionId>) -> Self {
Self {
runtime: Arc::new(RuntimeBuilder::default().worker_threads(2).build().unwrap()),
create_region_notifier,
}
}
fn new_client(&self, datanode: &Peer) -> Client {
let (client, server) = tokio::io::duplex(1024);
let handler =
RegionServerRequestHandler::new(Arc::new(self.clone()), self.runtime.clone());
tokio::spawn(async move {
Server::builder()
.add_service(RegionServer::new(handler))
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(
server,
)]))
.await
});
let channel_manager = ChannelManager::new();
let mut client = Some(client);
channel_manager
.reset_with_connector(
datanode.addr.clone(),
service_fn(move |_| {
let client = client.take().unwrap();
async move { Ok::<_, std::io::Error>(client) }
}),
)
.unwrap();
Client::with_manager_and_urls(channel_manager, vec![datanode.addr.clone()])
}
}
#[async_trait]
impl RegionServerHandler for TestingRegionServerHandler {
async fn handle(&self, request: PbRegionRequest) -> servers::error::Result<RegionResponse> {
let PbRegionRequest::Create(request) = request else {
unreachable!()
};
let region_id = request.region_id.into();
self.create_region_notifier.send(region_id).await.unwrap();
Ok(RegionResponse {
header: Some(ResponseHeader {
status: Some(PbStatus {
status_code: 0,
err_msg: "".to_string(),
}),
}),
affected_rows: 0,
})
}
}
#[tokio::test]
async fn test_on_datanode_create_regions() {
let mut procedure = create_table_procedure();
let (tx, mut rx) = mpsc::channel(10);
let region_server = TestingRegionServerHandler::new(tx);
let (region_server, mut rx) = EchoRegionServer::new();
let datanodes = find_leaders(&procedure.creator.data.region_routes);
for peer in datanodes {
@@ -664,7 +503,9 @@ mod test {
let expected_created_regions = expected_created_regions.clone();
let mut max_recv = expected_created_regions.lock().unwrap().len();
async move {
while let Some(region_id) = rx.recv().await {
while let Some(PbRegionRequest::Create(request)) = rx.recv().await {
let region_id = RegionId::from_u64(request.region_id);
expected_created_regions.lock().unwrap().remove(&region_id);
max_recv -= 1;

View File

@@ -13,10 +13,12 @@
// limitations under the License.
use api::v1::meta::MailboxMessage;
use api::v1::region::{region_request, DropRequest as PbDropRegionRequest};
use api::v1::DropTableExpr;
use async_trait::async_trait;
use client::region::RegionRequester;
use client::Database;
use common_catalog::consts::MITO_ENGINE;
use common_catalog::consts::MITO2_ENGINE;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_meta::ident::TableIdent;
@@ -25,7 +27,7 @@ use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::table_route::TableRouteValue;
use common_meta::rpc::ddl::DropTableTask;
use common_meta::rpc::router::{find_leaders, RegionRoute};
use common_meta::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
use common_meta::table_name::TableName;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
@@ -35,12 +37,15 @@ use common_telemetry::{debug, info};
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use store_api::storage::RegionId;
use strum::AsRefStr;
use table::engine::TableReference;
use table::metadata::{RawTableInfo, TableId};
use super::utils::handle_retry_error;
use crate::ddl::DdlContext;
use crate::error::{self, Result, TableMetadataManagerSnafu};
use crate::metrics;
use crate::procedure::utils::handle_request_datanode_error;
use crate::service::mailbox::BroadcastChannel;
pub struct DropTableProcedure {
@@ -118,14 +123,14 @@ impl DropTableProcedure {
/// Broadcasts invalidate table cache instruction.
async fn on_broadcast(&mut self) -> Result<Status> {
let table_name = self.data.table_name();
let engine = &self.data.table_info().meta.engine;
let table_ident = TableIdent {
catalog: table_name.catalog_name,
schema: table_name.schema_name,
table: table_name.table_name,
table_id: self.data.task.table_id,
// TODO(weny): retrieves the engine from the upper.
engine: MITO_ENGINE.to_string(),
engine: engine.to_string(),
};
let instruction = Instruction::InvalidateTableCache(table_ident);
@@ -145,11 +150,60 @@ impl DropTableProcedure {
.broadcast(&BroadcastChannel::Frontend, msg)
.await?;
self.data.state = DropTableState::DatanodeDropTable;
self.data.state = if engine == MITO2_ENGINE {
DropTableState::DatanodeDropRegions
} else {
DropTableState::DatanodeDropTable
};
Ok(Status::executing(true))
}
async fn on_datanode_drop_regions(&self) -> Result<Status> {
let table_id = self.data.table_id();
let region_routes = &self.data.region_routes();
let leaders = find_leaders(region_routes);
let mut drop_region_tasks = Vec::with_capacity(leaders.len());
for datanode in leaders {
let clients = self.context.datanode_clients.clone();
let regions = find_leader_regions(region_routes, &datanode);
let region_ids = regions
.iter()
.map(|region_number| RegionId::new(table_id, *region_number))
.collect::<Vec<_>>();
drop_region_tasks.push(async move {
for region_id in region_ids {
debug!("Dropping region {region_id} on Datanode {datanode:?}");
let request = region_request::Body::Drop(PbDropRegionRequest {
region_id: region_id.as_u64(),
});
let client = clients.get_client(&datanode).await;
let requester = RegionRequester::new(client);
if let Err(err) = requester.handle(request).await {
if err.status_code() != StatusCode::RegionNotFound {
return Err(handle_request_datanode_error(datanode)(err));
}
}
}
Ok(())
});
}
join_all(drop_region_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
Ok(Status::Done)
}
/// Executes drop table instruction on datanode.
async fn on_datanode_drop_table(&mut self) -> Result<Status> {
let region_routes = &self.data.region_routes();
@@ -202,11 +256,19 @@ impl Procedure for DropTableProcedure {
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.data.state;
let _timer = common_telemetry::timer!(
metrics::METRIC_META_PROCEDURE_DROP_TABLE,
&[("step", state.as_ref().to_string())]
);
match self.data.state {
DropTableState::Prepare => self.on_prepare().await,
DropTableState::RemoveMetadata => self.on_remove_metadata().await,
DropTableState::InvalidateTableCache => self.on_broadcast().await,
DropTableState::DatanodeDropTable => self.on_datanode_drop_table().await,
DropTableState::DatanodeDropRegions => self.on_datanode_drop_regions().await,
}
.map_err(handle_retry_error)
}
@@ -273,7 +335,7 @@ impl DropTableData {
}
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, AsRefStr)]
enum DropTableState {
/// Prepares to drop the table
Prepare,
@@ -283,4 +345,74 @@ enum DropTableState {
InvalidateTableCache,
/// Datanode drops the table
DatanodeDropTable,
/// Drop regions on Datanode
DatanodeDropRegions,
}
#[cfg(test)]
mod test {
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use super::*;
use crate::procedure::utils::mock::EchoRegionServer;
use crate::procedure::utils::test_data;
#[tokio::test]
async fn test_on_datanode_drop_regions() {
let drop_table_task = DropTableTask {
catalog: "my_catalog".to_string(),
schema: "my_schema".to_string(),
table: "my_table".to_string(),
table_id: 42,
};
let procedure = DropTableProcedure::new(
1,
drop_table_task,
TableRouteValue::new(test_data::new_region_routes()),
TableInfoValue::new(test_data::new_table_info()),
test_data::new_ddl_context(),
);
let (region_server, mut rx) = EchoRegionServer::new();
let datanodes = find_leaders(&procedure.data.table_route_value.region_routes);
for peer in datanodes {
let client = region_server.new_client(&peer);
procedure
.context
.datanode_clients
.insert_client(peer, client)
.await;
}
let expected_dropped_regions = Arc::new(Mutex::new(HashSet::from([
RegionId::new(42, 1),
RegionId::new(42, 2),
RegionId::new(42, 3),
])));
let handle = tokio::spawn({
let expected_dropped_regions = expected_dropped_regions.clone();
let mut max_recv = expected_dropped_regions.lock().unwrap().len();
async move {
while let Some(region_request::Body::Drop(request)) = rx.recv().await {
let region_id = RegionId::from_u64(request.region_id);
expected_dropped_regions.lock().unwrap().remove(&region_id);
max_recv -= 1;
if max_recv == 0 {
break;
}
}
}
});
let status = procedure.on_datanode_drop_regions().await.unwrap();
assert!(matches!(status, Status::Done));
handle.await.unwrap();
assert!(expected_dropped_regions.lock().unwrap().is_empty());
}
}

View File

@@ -42,3 +42,189 @@ pub fn handle_retry_error(e: Error) -> ProcedureError {
ProcedureError::external(e)
}
}
#[cfg(test)]
pub mod mock {
use std::io::Error;
use std::sync::Arc;
use api::v1::region::region_server::RegionServer;
use api::v1::region::{region_request, RegionResponse};
use api::v1::{ResponseHeader, Status as PbStatus};
use async_trait::async_trait;
use client::Client;
use common_grpc::channel_manager::ChannelManager;
use common_meta::peer::Peer;
use common_runtime::{Builder as RuntimeBuilder, Runtime};
use servers::grpc::region_server::{RegionServerHandler, RegionServerRequestHandler};
use tokio::sync::mpsc;
use tonic::transport::Server;
use tower::service_fn;
/// An mock implementation of region server that simply echoes the request.
#[derive(Clone)]
pub struct EchoRegionServer {
runtime: Arc<Runtime>,
received_requests: mpsc::Sender<region_request::Body>,
}
impl EchoRegionServer {
pub fn new() -> (Self, mpsc::Receiver<region_request::Body>) {
let (tx, rx) = mpsc::channel(10);
(
Self {
runtime: Arc::new(RuntimeBuilder::default().worker_threads(2).build().unwrap()),
received_requests: tx,
},
rx,
)
}
pub fn new_client(&self, datanode: &Peer) -> Client {
let (client, server) = tokio::io::duplex(1024);
let handler =
RegionServerRequestHandler::new(Arc::new(self.clone()), self.runtime.clone());
tokio::spawn(async move {
Server::builder()
.add_service(RegionServer::new(handler))
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, Error>(server)]))
.await
});
let channel_manager = ChannelManager::new();
let mut client = Some(client);
channel_manager
.reset_with_connector(
datanode.addr.clone(),
service_fn(move |_| {
let client = client.take().unwrap();
async move { Ok::<_, Error>(client) }
}),
)
.unwrap();
Client::with_manager_and_urls(channel_manager, vec![datanode.addr.clone()])
}
}
#[async_trait]
impl RegionServerHandler for EchoRegionServer {
async fn handle(
&self,
request: region_request::Body,
) -> servers::error::Result<RegionResponse> {
self.received_requests.send(request).await.unwrap();
Ok(RegionResponse {
header: Some(ResponseHeader {
status: Some(PbStatus {
status_code: 0,
err_msg: "".to_string(),
}),
}),
affected_rows: 0,
})
}
}
}
#[cfg(test)]
pub mod test_data {
use std::collections::HashMap;
use std::sync::Arc;
use chrono::DateTime;
use client::client_manager::DatanodeClients;
use common_catalog::consts::MITO2_ENGINE;
use common_meta::key::TableMetadataManager;
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoute;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
use table::requests::TableOptions;
use crate::ddl::DdlContext;
use crate::handler::{HeartbeatMailbox, Pushers};
use crate::sequence::Sequence;
use crate::service::store::kv::KvBackendAdapter;
use crate::service::store::memory::MemStore;
use crate::test_util::new_region_route;
pub fn new_region_routes() -> Vec<RegionRoute> {
let peers = vec![
Peer::new(1, "127.0.0.1:4001"),
Peer::new(2, "127.0.0.1:4002"),
Peer::new(3, "127.0.0.1:4003"),
];
vec![
new_region_route(1, &peers, 3),
new_region_route(2, &peers, 2),
new_region_route(3, &peers, 1),
]
}
pub fn new_table_info() -> RawTableInfo {
RawTableInfo {
ident: TableIdent::new(42),
name: "my_table".to_string(),
desc: Some("blabla".to_string()),
catalog_name: "my_catalog".to_string(),
schema_name: "my_schema".to_string(),
meta: RawTableMeta {
schema: RawSchema {
column_schemas: vec![
ColumnSchema::new(
"ts".to_string(),
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
ColumnSchema::new(
"my_tag1".to_string(),
ConcreteDataType::string_datatype(),
true,
),
ColumnSchema::new(
"my_tag2".to_string(),
ConcreteDataType::string_datatype(),
true,
),
ColumnSchema::new(
"my_field_column".to_string(),
ConcreteDataType::int32_datatype(),
true,
),
],
timestamp_index: Some(0),
version: 0,
},
primary_key_indices: vec![1, 2],
value_indices: vec![2],
engine: MITO2_ENGINE.to_string(),
next_column_id: 3,
region_numbers: vec![1, 2, 3],
engine_options: HashMap::new(),
options: TableOptions::default(),
created_on: DateTime::default(),
partition_key_indices: vec![],
},
table_type: TableType::Base,
}
}
pub(crate) fn new_ddl_context() -> DdlContext {
let kv_store = Arc::new(MemStore::new());
let mailbox_sequence = Sequence::new("test_heartbeat_mailbox", 0, 100, kv_store.clone());
let mailbox = HeartbeatMailbox::create(Pushers::default(), mailbox_sequence);
let kv_backend = KvBackendAdapter::wrap(kv_store);
DdlContext {
datanode_clients: Arc::new(DatanodeClients::default()),
mailbox,
server_addr: "127.0.0.1:4321".to_string(),
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
}
}
}

View File

@@ -467,6 +467,7 @@ pub fn status_to_tonic_code(status_code: StatusCode) -> Code {
StatusCode::Cancelled => Code::Cancelled,
StatusCode::TableAlreadyExists | StatusCode::TableColumnExists => Code::AlreadyExists,
StatusCode::TableNotFound
| StatusCode::RegionNotFound
| StatusCode::TableColumnNotFound
| StatusCode::DatabaseNotFound
| StatusCode::UserNotFound => Code::NotFound,