feat: region alive keeper in Datanode (#1780)

Author: LFC
Date: 2023-06-19 14:50:33 +08:00
Committed by: GitHub
Parent: 960b84262b
Commit: 128c6ec98c
29 changed files with 601 additions and 244 deletions

Cargo.lock (generated)

@@ -209,8 +209,8 @@ dependencies = [
"greptime-proto",
"prost",
"snafu",
"tonic 0.9.2",
"tonic-build 0.9.2",
"tonic",
"tonic-build",
]
[[package]]
@@ -382,7 +382,7 @@ dependencies = [
"paste",
"prost",
"tokio",
"tonic 0.9.2",
"tonic",
]
[[package]]
@@ -1538,7 +1538,7 @@ dependencies = [
"substrait 0.7.5",
"tokio",
"tokio-stream",
"tonic 0.9.2",
"tonic",
"tracing",
"tracing-subscriber",
]
@@ -1760,7 +1760,7 @@ dependencies = [
"rand",
"snafu",
"tokio",
"tonic 0.9.2",
"tonic",
"tower",
]
@@ -2005,7 +2005,7 @@ checksum = "c2895653b4d9f1538a83970077cb01dfc77a4810524e51a110944688e916b18e"
dependencies = [
"prost",
"prost-types",
"tonic 0.9.2",
"tonic",
"tracing-core",
]
@@ -2027,7 +2027,7 @@ dependencies = [
"thread_local",
"tokio",
"tokio-stream",
"tonic 0.9.2",
"tonic",
"tracing",
"tracing-core",
"tracing-subscriber",
@@ -2647,7 +2647,7 @@ dependencies = [
"tokio",
"tokio-stream",
"toml",
"tonic 0.9.2",
"tonic",
"tower",
"tower-http",
"url",
@@ -3025,16 +3025,16 @@ dependencies = [
[[package]]
name = "etcd-client"
version = "0.10.4"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4319dc0fb739a6e84cb8678b8cf50c9bcfa4712ae826b33ecf00cc0850550a58"
checksum = "f4b0ea5ef6dc2388a4b1669fa32097249bc03a15417b97cb75e38afb309e4a89"
dependencies = [
"http",
"prost",
"tokio",
"tokio-stream",
"tonic 0.8.3",
"tonic-build 0.8.4",
"tonic",
"tonic-build",
"tower",
"tower-service",
]
@@ -3257,7 +3257,7 @@ dependencies = [
"table",
"tokio",
"toml",
"tonic 0.9.2",
"tonic",
"tower",
"uuid",
]
@@ -4096,13 +4096,13 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=4398d20c56d5f7939cc2960789cb1fa7dd18e6fe#4398d20c56d5f7939cc2960789cb1fa7dd18e6fe"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=aee86f4a68c59873961c9b99ee7ed6a4341bf773#aee86f4a68c59873961c9b99ee7ed6a4341bf773"
dependencies = [
"prost",
"serde",
"serde_json",
"tonic 0.9.2",
"tonic-build 0.9.2",
"tonic",
"tonic-build",
]
[[package]]
@@ -5141,7 +5141,7 @@ dependencies = [
"table",
"tokio",
"tokio-stream",
"tonic 0.9.2",
"tonic",
"tower",
"tracing",
"tracing-subscriber",
@@ -5188,7 +5188,7 @@ dependencies = [
"table",
"tokio",
"tokio-stream",
"tonic 0.9.2",
"tonic",
"tower",
"tracing",
"tracing-subscriber",
@@ -8584,7 +8584,7 @@ dependencies = [
"tokio-rustls 0.24.0",
"tokio-stream",
"tokio-test",
"tonic 0.9.2",
"tonic",
"tonic-reflection",
"tower",
"tower-http",
@@ -9138,8 +9138,8 @@ dependencies = [
"table",
"tokio",
"tokio-util",
"tonic 0.9.2",
"tonic-build 0.9.2",
"tonic",
"tonic-build",
"uuid",
]
@@ -9596,7 +9596,7 @@ dependencies = [
"table",
"tempfile",
"tokio",
"tonic 0.9.2",
"tonic",
"tower",
"uuid",
]
@@ -9971,38 +9971,6 @@ dependencies = [
"winnow",
]
[[package]]
name = "tonic"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb"
dependencies = [
"async-stream",
"async-trait",
"axum",
"base64 0.13.1",
"bytes",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost",
"prost-derive",
"tokio",
"tokio-stream",
"tokio-util",
"tower",
"tower-layer",
"tower-service",
"tracing",
"tracing-futures",
]
[[package]]
name = "tonic"
version = "0.9.2"
@@ -10034,19 +10002,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "tonic-build"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bf5e9b9c0f7e0a7c027dcfaba7b2c60816c7049171f679d99ee2ff65d0de8c4"
dependencies = [
"prettyplease 0.1.25",
"proc-macro2",
"prost-build",
"quote",
"syn 1.0.109",
]
[[package]]
name = "tonic-build"
version = "0.9.2"
@@ -10070,7 +10025,7 @@ dependencies = [
"prost-types",
"tokio",
"tokio-stream",
"tonic 0.9.2",
"tonic",
]
[[package]]


@@ -72,7 +72,7 @@ datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "63e52dde9e44cac4b1f6c6e6b6bf6368ba3bd323" }
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "4398d20c56d5f7939cc2960789cb1fa7dd18e6fe" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "aee86f4a68c59873961c9b99ee7ed6a4341bf773" }
itertools = "0.10"
parquet = "40.0"
paste = "1.0"


@@ -32,7 +32,7 @@ pub mod mock;
// FIXME(LFC): Used in next PR.
#[allow(dead_code)]
mod region_alive_keeper;
pub mod region_alive_keeper;
#[derive(Debug, Clone)]
pub struct Kv(pub Vec<u8>, pub Vec<u8>);


@@ -20,13 +20,14 @@ use std::sync::Arc;
use async_stream::stream;
use async_trait::async_trait;
use common_catalog::consts::{MAX_SYS_TABLE_ID, MITO_ENGINE};
use common_meta::ident::TableIdent;
use common_telemetry::{debug, error, info, warn};
use dashmap::DashMap;
use futures::Stream;
use futures_util::{StreamExt, TryStreamExt};
use metrics::{decrement_gauge, increment_gauge};
use parking_lot::RwLock;
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use table::engine::manager::TableEngineManagerRef;
use table::engine::{EngineContext, TableReference};
use table::requests::{CreateTableRequest, OpenTableRequest};
@@ -43,6 +44,7 @@ use crate::helper::{
build_table_regional_prefix, CatalogKey, CatalogValue, SchemaKey, SchemaValue, TableGlobalKey,
TableGlobalValue, TableRegionalKey, TableRegionalValue, CATALOG_KEY_PREFIX,
};
use crate::remote::region_alive_keeper::RegionAliveKeepers;
use crate::remote::{Kv, KvBackendRef};
use crate::{
handle_system_table_request, CatalogManager, CatalogProvider, CatalogProviderRef,
@@ -57,16 +59,23 @@ pub struct RemoteCatalogManager {
catalogs: Arc<RwLock<DashMap<String, CatalogProviderRef>>>,
engine_manager: TableEngineManagerRef,
system_table_requests: Mutex<Vec<RegisterSystemTableRequest>>,
region_alive_keepers: Arc<RegionAliveKeepers>,
}
impl RemoteCatalogManager {
pub fn new(engine_manager: TableEngineManagerRef, node_id: u64, backend: KvBackendRef) -> Self {
pub fn new(
engine_manager: TableEngineManagerRef,
node_id: u64,
backend: KvBackendRef,
region_alive_keepers: Arc<RegionAliveKeepers>,
) -> Self {
Self {
engine_manager,
node_id,
backend,
catalogs: Default::default(),
system_table_requests: Default::default(),
region_alive_keepers,
}
}
@@ -576,34 +585,44 @@ impl CatalogManager for RemoteCatalogManager {
}
async fn register_table(&self, request: RegisterTableRequest) -> Result<bool> {
let catalog_name = request.catalog;
let schema_name = request.schema;
let catalog = &request.catalog;
let schema = &request.schema;
let table_name = &request.table_name;
let schema_provider = self
.catalog(&catalog_name)
.catalog(catalog)
.await?
.context(CatalogNotFoundSnafu {
catalog_name: &catalog_name,
catalog_name: catalog,
})?
.schema(&schema_name)
.schema(schema)
.await?
.with_context(|| SchemaNotFoundSnafu {
catalog: &catalog_name,
schema: &schema_name,
})?;
if schema_provider.table_exist(&request.table_name).await? {
return TableExistsSnafu {
table: format!("{}.{}.{}", &catalog_name, &schema_name, &request.table_name),
.context(SchemaNotFoundSnafu { catalog, schema })?;
ensure!(
!schema_provider.table_exist(table_name).await?,
TableExistsSnafu {
table: common_catalog::format_full_table_name(catalog, schema, table_name),
}
.fail();
}
);
increment_gauge!(
crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
1.0,
&[crate::metrics::db_label(&catalog_name, &schema_name)],
&[crate::metrics::db_label(catalog, schema)],
);
schema_provider
.register_table(request.table_name, request.table)
.register_table(table_name.to_string(), request.table.clone())
.await?;
let table_ident = TableIdent {
catalog: request.catalog,
schema: request.schema,
table: request.table_name,
table_id: request.table_id,
engine: request.table.table_info().meta.engine.clone(),
};
self.region_alive_keepers
.register_table(table_ident, request.table)
.await?;
Ok(true)
@@ -626,6 +645,21 @@ impl CatalogManager for RemoteCatalogManager {
1.0,
&[crate::metrics::db_label(catalog_name, schema_name)],
);
if let Some(table) = result.as_ref() {
let table_info = table.table_info();
let table_ident = TableIdent {
catalog: request.catalog,
schema: request.schema,
table: request.table_name,
table_id: table_info.ident.table_id,
engine: table_info.meta.engine.clone(),
};
self.region_alive_keepers
.deregister_table(&table_ident)
.await;
}
Ok(result.is_none())
}


@@ -12,15 +12,193 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::instruction::TableIdent;
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use common_meta::ident::TableIdent;
use common_meta::RegionIdent;
use common_telemetry::{debug, error, info, warn};
use snafu::ResultExt;
use store_api::storage::RegionNumber;
use table::engine::manager::TableEngineManagerRef;
use table::engine::{CloseTableResult, EngineContext, TableEngineRef};
use table::requests::CloseTableRequest;
use tokio::sync::mpsc;
use table::TableRef;
use tokio::sync::{mpsc, Mutex};
use tokio::task::JoinHandle;
use tokio::time::{Duration, Instant};
use crate::error::{Result, TableEngineNotFoundSnafu};
/// [RegionAliveKeepers] manages a [RegionAliveKeeper] for every registered table.
pub struct RegionAliveKeepers {
table_engine_manager: TableEngineManagerRef,
keepers: Arc<Mutex<HashMap<TableIdent, Arc<RegionAliveKeeper>>>>,
}
impl RegionAliveKeepers {
pub fn new(table_engine_manager: TableEngineManagerRef) -> Self {
Self {
table_engine_manager,
keepers: Arc::new(Mutex::new(HashMap::new())),
}
}
async fn find_keeper(&self, table_ident: &TableIdent) -> Option<Arc<RegionAliveKeeper>> {
self.keepers.lock().await.get(table_ident).cloned()
}
pub(crate) async fn register_table(
&self,
table_ident: TableIdent,
table: TableRef,
) -> Result<()> {
let keeper = self.find_keeper(&table_ident).await;
if keeper.is_some() {
return Ok(());
}
let table_engine = self
.table_engine_manager
.engine(&table_ident.engine)
.context(TableEngineNotFoundSnafu {
engine_name: &table_ident.engine,
})?;
let keeper = Arc::new(RegionAliveKeeper::new(table_engine, table_ident.clone()));
for r in table.table_info().meta.region_numbers.iter() {
keeper.register_region(*r).await;
}
info!("Register RegionAliveKeeper for table {table_ident}");
self.keepers.lock().await.insert(table_ident, keeper);
Ok(())
}
pub(crate) async fn deregister_table(&self, table_ident: &TableIdent) {
if self.keepers.lock().await.remove(table_ident).is_some() {
info!("Deregister RegionAliveKeeper for table {table_ident}");
}
}
pub async fn register_region(&self, region_ident: &RegionIdent) {
let table_ident = &region_ident.table_ident;
let Some(keeper) = self.find_keeper(table_ident).await else {
// This could be caused by a lagging message; just warn and ignore.
warn!("Alive keeper for region {region_ident} is not found!");
return;
};
keeper.register_region(region_ident.region_number).await
}
pub async fn deregister_region(&self, region_ident: &RegionIdent) {
let table_ident = &region_ident.table_ident;
let Some(keeper) = self.find_keeper(table_ident).await else {
// This could be caused by a lagging message; just warn and ignore.
warn!("Alive keeper for region {region_ident} is not found!");
return;
};
keeper.deregister_region(region_ident.region_number).await
}
pub async fn start(&self, heartbeat_interval_millis: u64) {
for keeper in self.keepers.lock().await.values() {
keeper.start(heartbeat_interval_millis).await;
}
}
}
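For orientation, here is a condensed sketch of how these keepers are exercised, mirroring the unit test at the end of this file (`table_ident` and `table` are assumed to be built the same way as there; the 5000 ms interval is illustrative):

// Sketch only: wire the keepers to a table engine manager, register a table,
// then start the per-region countdown tasks with the heartbeat interval (milliseconds).
let table_engine = Arc::new(MockTableEngine::default());
let table_engine_manager = Arc::new(MemoryTableEngineManager::new(table_engine));
let keepers = RegionAliveKeepers::new(table_engine_manager);
keepers.register_table(table_ident.clone(), table).await.unwrap();
keepers.start(5000).await; // deadlines are then extended by Metasrv through heartbeats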
/// [RegionAliveKeeper] starts a countdown for each region in a table. When the deadline is reached,
/// the region will be closed.
/// The deadline is controlled by Metasrv. It works like a "lease" for regions: a Datanode submits its
/// opened regions to Metasrv in heartbeats. If Metasrv decides a region should keep residing in this
/// Datanode, it "extends" the region's "lease" by sending a new deadline for [RegionAliveKeeper] to
/// count down against.
struct RegionAliveKeeper {
table_engine: TableEngineRef,
table_ident: TableIdent,
countdown_task_handles: Arc<Mutex<HashMap<RegionNumber, Arc<CountdownTaskHandle>>>>,
}
impl RegionAliveKeeper {
fn new(table_engine: TableEngineRef, table_ident: TableIdent) -> Self {
Self {
table_engine,
table_ident,
countdown_task_handles: Arc::new(Mutex::new(HashMap::new())),
}
}
async fn find_handle(&self, region: &RegionNumber) -> Option<Arc<CountdownTaskHandle>> {
self.countdown_task_handles
.lock()
.await
.get(region)
.cloned()
}
async fn register_region(&self, region: RegionNumber) {
if self.find_handle(&region).await.is_some() {
return;
}
let countdown_task_handles = self.countdown_task_handles.clone();
let on_task_finished = async move {
let _ = countdown_task_handles.lock().await.remove(&region);
};
let handle = Arc::new(CountdownTaskHandle::new(
self.table_engine.clone(),
self.table_ident.clone(),
region,
|| on_task_finished,
));
self.countdown_task_handles
.lock()
.await
.insert(region, handle);
info!(
"Register alive countdown for new region {region} in table {}",
self.table_ident
)
}
async fn deregister_region(&self, region: RegionNumber) {
if self
.countdown_task_handles
.lock()
.await
.remove(&region)
.is_some()
{
info!(
"Deregister alive countdown for region {region} in table {}",
self.table_ident
)
}
}
async fn start(&self, heartbeat_interval_millis: u64) {
for handle in self.countdown_task_handles.lock().await.values() {
handle.start(heartbeat_interval_millis).await;
}
info!(
"RegionAliveKeeper for table {} is started!",
self.table_ident
)
}
async fn keep_lived(&self, designated_regions: Vec<RegionNumber>, deadline: Instant) {
for region in designated_regions {
if let Some(handle) = self.find_handle(&region).await {
handle.reset_deadline(deadline).await;
}
// Else this keep-alive was likely triggered by a lagging message; we can safely ignore it.
}
}
}
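To make the lease semantics above concrete, a minimal sketch following the second unit test below (assuming a `keeper: RegionAliveKeeper` built as in that test; the durations are illustrative):

// Sketch only: a keeper does nothing until started with the heartbeat interval.
keeper.register_region(1).await;
keeper.start(1000).await;
// Each heartbeat response carries Metasrv's lease extension, which is applied by
// resetting the countdown deadline of every designated region.
keeper.keep_lived(vec![1], Instant::now() + Duration::from_secs(10)).await;
// If no further extension arrives before the deadline, the countdown task fires
// and the region is closed through the table engine.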
#[derive(Debug)]
enum CountdownCommand {
Start(u64),
@@ -42,14 +220,14 @@ impl CountdownTaskHandle {
/// be invoked if the task is cancelled (by dropping the handle). This is because we want something
/// meaningful to be done when the task finishes, e.g. deregistering the handle from the map,
/// while dropping the handle does not necessarily mean the task has finished.
fn new<F>(
fn new<Fut>(
table_engine: TableEngineRef,
table_ident: TableIdent,
region: RegionNumber,
on_task_finished: F,
on_task_finished: impl FnOnce() -> Fut + Send + 'static,
) -> Self
where
F: FnOnce() + Send + 'static,
Fut: Future<Output = ()> + Send,
{
let (tx, rx) = mpsc::channel(1024);
@@ -60,7 +238,8 @@ impl CountdownTaskHandle {
rx,
};
let handler = common_runtime::spawn_bg(async move {
countdown_task.run(on_task_finished).await;
countdown_task.run().await;
on_task_finished().await;
});
Self { tx, handler }
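To illustrate the changed callback contract (an `FnOnce` returning a future, run after the countdown task finishes but not when the handle is merely dropped), this is the pattern used by the tests later in this file (`table_engine` and `table_ident` as set up there):

// Sketch only: the async cleanup closure is handed to the handle at construction time.
let finished = Arc::new(AtomicBool::new(false));
let finished_clone = finished.clone();
let handle = CountdownTaskHandle::new(
    table_engine.clone(),
    table_ident.clone(),
    1,
    || async move { finished_clone.store(true, Ordering::Relaxed) },
);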
@@ -103,10 +282,7 @@ struct CountdownTask {
}
impl CountdownTask {
async fn run<F>(&mut self, on_task_finished: F)
where
F: FnOnce() + Send + 'static,
{
async fn run(&mut self) {
// 30 years. See `Instant::far_future`.
let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);
@@ -133,16 +309,8 @@ impl CountdownTask {
debug!("Reset deadline to region {region} of table {table_ident} to {deadline:?}");
countdown.set(tokio::time::sleep_until(deadline));
}
// Else we have received a past deadline, it could be the following
// possible reasons:
// 1. the clock drift happened in Metasrv or Datanode;
// 2. some messages are lagged;
// 3. during the period of Datanode startup.
// We can safely ignore case 2 and 3. However, case 1 is catastrophic.
// We must think of a way to resolve it, maybe using logical clock, or
// simply fire an alarm for it? For now, we can tolerate that, because it's
// seconds resolution to deadline, and clock drift is less likely
// to happen in that resolution.
// Else the countdown might not have started yet, or we are still in the startup
// protection period; either way it can be safely ignored.
},
None => {
info!(
@@ -168,8 +336,6 @@ impl CountdownTask {
}
}
}
on_task_finished();
}
async fn close_region(&self) -> CloseTableResult {
@@ -202,12 +368,142 @@ mod test {
use std::sync::Arc;
use datatypes::schema::RawSchema;
use table::engine::manager::MemoryTableEngineManager;
use table::engine::{TableEngine, TableReference};
use table::requests::{CreateTableRequest, TableOptions};
use table::test_util::EmptyTable;
use tokio::sync::oneshot;
use super::*;
use crate::remote::mock::MockTableEngine;
#[tokio::test(flavor = "multi_thread")]
async fn test_region_alive_keepers() {
let table_engine = Arc::new(MockTableEngine::default());
let table_engine_manager = Arc::new(MemoryTableEngineManager::new(table_engine));
let keepers = RegionAliveKeepers::new(table_engine_manager);
let catalog = "my_catalog";
let schema = "my_schema";
let table = "my_table";
let table_ident = TableIdent {
catalog: catalog.to_string(),
schema: schema.to_string(),
table: table.to_string(),
table_id: 1,
engine: "MockTableEngine".to_string(),
};
let table = Arc::new(EmptyTable::new(CreateTableRequest {
id: 1,
catalog_name: catalog.to_string(),
schema_name: schema.to_string(),
table_name: table.to_string(),
desc: None,
schema: RawSchema {
column_schemas: vec![],
timestamp_index: None,
version: 0,
},
region_numbers: vec![1, 2, 3],
primary_key_indices: vec![],
create_if_not_exists: false,
table_options: TableOptions::default(),
engine: "MockTableEngine".to_string(),
}));
keepers
.register_table(table_ident.clone(), table)
.await
.unwrap();
assert!(keepers.keepers.lock().await.contains_key(&table_ident));
keepers
.register_region(&RegionIdent {
cluster_id: 1,
datanode_id: 1,
table_ident: table_ident.clone(),
region_number: 4,
})
.await;
keepers.start(5000).await;
for keeper in keepers.keepers.lock().await.values() {
for handle in keeper.countdown_task_handles.lock().await.values() {
// assert countdown tasks are started
assert!(deadline(&handle.tx).await <= Instant::now() + Duration::from_secs(20));
}
}
keepers
.deregister_region(&RegionIdent {
cluster_id: 1,
datanode_id: 1,
table_ident: table_ident.clone(),
region_number: 1,
})
.await;
let mut regions = keepers
.find_keeper(&table_ident)
.await
.unwrap()
.countdown_task_handles
.lock()
.await
.keys()
.copied()
.collect::<Vec<_>>();
regions.sort();
assert_eq!(regions, vec![2, 3, 4]);
keepers.deregister_table(&table_ident).await;
assert!(keepers.keepers.lock().await.is_empty());
}
#[tokio::test(flavor = "multi_thread")]
async fn test_region_alive_keeper() {
let table_engine = Arc::new(MockTableEngine::default());
let table_ident = TableIdent {
catalog: "my_catalog".to_string(),
schema: "my_schema".to_string(),
table: "my_table".to_string(),
table_id: 1024,
engine: "mito".to_string(),
};
let keeper = RegionAliveKeeper::new(table_engine, table_ident);
let region = 1;
assert!(keeper.find_handle(&region).await.is_none());
keeper.register_region(region).await;
assert!(keeper.find_handle(&region).await.is_some());
let sender = &keeper
.countdown_task_handles
.lock()
.await
.get(&region)
.unwrap()
.tx
.clone();
let ten_seconds_later = || Instant::now() + Duration::from_secs(10);
keeper.keep_lived(vec![1, 2, 3], ten_seconds_later()).await;
assert!(keeper.find_handle(&2).await.is_none());
assert!(keeper.find_handle(&3).await.is_none());
let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 29);
// assert that keep_lived has no effect if the keeper is not started
assert!(deadline(sender).await > far_future);
keeper.start(1000).await;
keeper.keep_lived(vec![1, 2, 3], ten_seconds_later()).await;
// assert keep_lived works if keeper is started
assert!(deadline(sender).await <= ten_seconds_later());
keeper.deregister_region(region).await;
assert!(keeper.find_handle(&region).await.is_none());
}
#[tokio::test(flavor = "multi_thread")]
async fn test_countdown_task_handle() {
let table_engine = Arc::new(MockTableEngine::default());
@@ -220,10 +516,12 @@ mod test {
};
let finished = Arc::new(AtomicBool::new(false));
let finished_clone = finished.clone();
let handle =
CountdownTaskHandle::new(table_engine.clone(), table_ident.clone(), 1, move || {
finished_clone.store(true, Ordering::Relaxed)
});
let handle = CountdownTaskHandle::new(
table_engine.clone(),
table_ident.clone(),
1,
|| async move { finished_clone.store(true, Ordering::Relaxed) },
);
let tx = handle.tx.clone();
// assert countdown task is running
@@ -244,7 +542,7 @@ mod test {
let finished = Arc::new(AtomicBool::new(false));
let finished_clone = finished.clone();
let handle = CountdownTaskHandle::new(table_engine, table_ident, 1, move || {
let handle = CountdownTaskHandle::new(table_engine, table_ident, 1, || async move {
finished_clone.store(true, Ordering::Relaxed)
});
handle.tx.send(CountdownCommand::Start(100)).await.unwrap();
@@ -296,15 +594,9 @@ mod test {
rx,
};
common_runtime::spawn_bg(async move {
task.run(|| ()).await;
task.run().await;
});
async fn deadline(tx: &mpsc::Sender<CountdownCommand>) -> Instant {
let (s, r) = tokio::sync::oneshot::channel();
tx.send(CountdownCommand::Deadline(s)).await.unwrap();
r.await.unwrap()
}
// if countdown task is not started, its deadline is set to far future
assert!(deadline(&tx).await > Instant::now() + Duration::from_secs(86400 * 365 * 29));
@@ -326,4 +618,10 @@ mod test {
tokio::time::sleep(Duration::from_millis(2000)).await;
assert!(!table_engine.table_exists(ctx, &table_ref));
}
async fn deadline(tx: &mpsc::Sender<CountdownCommand>) -> Instant {
let (s, r) = oneshot::channel();
tx.send(CountdownCommand::Deadline(s)).await.unwrap();
r.await.unwrap()
}
}


@@ -22,6 +22,7 @@ mod tests {
use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
use catalog::remote::mock::{MockKvBackend, MockTableEngine};
use catalog::remote::region_alive_keeper::RegionAliveKeepers;
use catalog::remote::{
CachedMetaKvBackend, KvBackend, KvBackendRef, RemoteCatalogManager, RemoteCatalogProvider,
RemoteSchemaProvider,
@@ -138,8 +139,12 @@ mod tests {
table_engine.clone(),
));
let catalog_manager =
RemoteCatalogManager::new(engine_manager.clone(), node_id, cached_backend.clone());
let catalog_manager = RemoteCatalogManager::new(
engine_manager.clone(),
node_id,
cached_backend.clone(),
Arc::new(RegionAliveKeepers::new(engine_manager.clone())),
);
catalog_manager.start().await.unwrap();
(


@@ -52,6 +52,9 @@ pub enum Error {
err_msg: String,
location: Location,
},
#[snafu(display("Invalid protobuf message, err: {}", err_msg))]
InvalidProtoMsg { err_msg: String, location: Location },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -61,7 +64,10 @@ impl ErrorExt for Error {
use Error::*;
match self {
IllegalServerState { .. } => StatusCode::Internal,
SerdeJson { .. } | RouteInfoCorrupted { .. } => StatusCode::Unexpected,
SerdeJson { .. } | RouteInfoCorrupted { .. } | InvalidProtoMsg { .. } => {
StatusCode::Unexpected
}
SendMessage { .. } => StatusCode::Internal,


@@ -0,0 +1,57 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{Display, Formatter};
use api::v1::meta::TableIdent as RawTableIdent;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use crate::error::{Error, InvalidProtoMsgSnafu};
#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct TableIdent {
pub catalog: String,
pub schema: String,
pub table: String,
pub table_id: u32,
pub engine: String,
}
impl Display for TableIdent {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Table(id={}, name='{}.{}.{}', engine='{}')",
self.table_id, self.catalog, self.schema, self.table, self.engine,
)
}
}
impl TryFrom<RawTableIdent> for TableIdent {
type Error = Error;
fn try_from(value: RawTableIdent) -> Result<Self, Self::Error> {
let table_name = value.table_name.context(InvalidProtoMsgSnafu {
err_msg: "'table_name' is missing in TableIdent",
})?;
Ok(Self {
catalog: table_name.catalog_name,
schema: table_name.schema_name,
table: table_name.table_name,
table_id: value.table_id,
engine: value.engine,
})
}
}
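A small usage sketch of the relocated type (field values are illustrative); its `Display` implementation renders the compact form defined above:

let ident = TableIdent {
    catalog: "greptime".to_string(),
    schema: "public".to_string(),
    table: "metrics".to_string(),
    table_id: 1024,
    engine: "mito".to_string(),
};
assert_eq!(
    ident.to_string(),
    "Table(id=1024, name='greptime.public.metrics', engine='mito')"
);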


@@ -16,6 +16,7 @@ use std::fmt::{Display, Formatter};
use serde::{Deserialize, Serialize};
use crate::ident::TableIdent;
use crate::{ClusterId, DatanodeId};
#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
@@ -49,25 +50,6 @@ impl From<RegionIdent> for TableIdent {
}
}
#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct TableIdent {
pub catalog: String,
pub schema: String,
pub table: String,
pub table_id: u32,
pub engine: String,
}
impl Display for TableIdent {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"TableIdent(table_id='{}', table_name='{}.{}.{}', table_engine='{}')",
self.table_id, self.catalog, self.schema, self.table, self.engine,
)
}
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct SimpleReply {
pub result: bool,


@@ -14,6 +14,7 @@
pub mod error;
pub mod heartbeat;
pub mod ident;
pub mod instruction;
pub mod key;
pub mod peer;


@@ -131,6 +131,9 @@ impl HeartbeatTask {
let addr = resolve_addr(&self.server_addr, &self.server_hostname);
info!("Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}.");
// TODO(LFC): Continued in next PR.
// self.region_alive_keepers.start(interval).await;
let meta_client = self.meta_client.clone();
let catalog_manager_clone = self.catalog_manager.clone();


@@ -14,15 +14,14 @@
use std::sync::Arc;
use catalog::remote::region_alive_keeper::RegionAliveKeepers;
use catalog::{CatalogManagerRef, DeregisterTableRequest};
use common_catalog::format_full_table_name;
use common_meta::error::Result as MetaResult;
use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_meta::instruction::{
Instruction, InstructionReply, RegionIdent, SimpleReply, TableIdent,
};
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_telemetry::{error, info, warn};
use snafu::ResultExt;
use store_api::storage::RegionNumber;
@@ -36,6 +35,7 @@ use crate::error::{self, Result};
pub struct CloseRegionHandler {
catalog_manager: CatalogManagerRef,
table_engine_manager: TableEngineManagerRef,
region_alive_keepers: Arc<RegionAliveKeepers>,
}
impl HeartbeatResponseHandler for CloseRegionHandler {
@@ -53,29 +53,26 @@ impl HeartbeatResponseHandler for CloseRegionHandler {
let mailbox = ctx.mailbox.clone();
let self_ref = Arc::new(self.clone());
let RegionIdent {
table_ident:
TableIdent {
engine,
catalog,
schema,
table,
..
},
region_number,
..
} = region_ident;
let region_alive_keepers = self.region_alive_keepers.clone();
common_runtime::spawn_bg(async move {
let table_ident = &region_ident.table_ident;
let table_ref = TableReference::full(
&table_ident.catalog,
&table_ident.schema,
&table_ident.table,
);
let result = self_ref
.close_region_inner(
engine,
&TableReference::full(&catalog, &schema, &table),
vec![region_number],
table_ident.engine.clone(),
&table_ref,
vec![region_ident.region_number],
)
.await;
if matches!(result, Ok(true)) {
region_alive_keepers.deregister_region(&region_ident).await;
}
if let Err(e) = mailbox
.send((meta, CloseRegionHandler::map_result(result)))
.await
@@ -92,10 +89,12 @@ impl CloseRegionHandler {
pub fn new(
catalog_manager: CatalogManagerRef,
table_engine_manager: TableEngineManagerRef,
region_alive_keepers: Arc<RegionAliveKeepers>,
) -> Self {
Self {
catalog_manager,
table_engine_manager,
region_alive_keepers,
}
}


@@ -15,15 +15,14 @@
use std::sync::Arc;
use catalog::error::Error as CatalogError;
use catalog::remote::region_alive_keeper::RegionAliveKeepers;
use catalog::{CatalogManagerRef, RegisterTableRequest};
use common_catalog::format_full_table_name;
use common_meta::error::Result as MetaResult;
use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_meta::instruction::{
Instruction, InstructionReply, RegionIdent, SimpleReply, TableIdent,
};
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_telemetry::{error, warn};
use snafu::ResultExt;
use store_api::storage::RegionNumber;
@@ -37,6 +36,7 @@ use crate::error::{self, Result};
pub struct OpenRegionHandler {
catalog_manager: CatalogManagerRef,
table_engine_manager: TableEngineManagerRef,
region_alive_keepers: Arc<RegionAliveKeepers>,
}
impl HeartbeatResponseHandler for OpenRegionHandler {
@@ -55,9 +55,24 @@ impl HeartbeatResponseHandler for OpenRegionHandler {
let mailbox = ctx.mailbox.clone();
let self_ref = Arc::new(self.clone());
let region_alive_keepers = self.region_alive_keepers.clone();
common_runtime::spawn_bg(async move {
let (engine, request) = OpenRegionHandler::prepare_request(region_ident);
let result = self_ref.open_region_inner(engine, request).await;
let table_ident = &region_ident.table_ident;
let request = OpenTableRequest {
catalog_name: table_ident.catalog.clone(),
schema_name: table_ident.schema.clone(),
table_name: table_ident.table.clone(),
table_id: table_ident.table_id,
region_numbers: vec![region_ident.region_number],
};
let result = self_ref
.open_region_inner(table_ident.engine.clone(), request)
.await;
if matches!(result, Ok(true)) {
region_alive_keepers.register_region(&region_ident).await;
}
if let Err(e) = mailbox
.send((meta, OpenRegionHandler::map_result(result)))
.await
@@ -73,10 +88,12 @@ impl OpenRegionHandler {
pub fn new(
catalog_manager: CatalogManagerRef,
table_engine_manager: TableEngineManagerRef,
region_alive_keepers: Arc<RegionAliveKeepers>,
) -> Self {
Self {
catalog_manager,
table_engine_manager,
region_alive_keepers,
}
}
@@ -97,32 +114,6 @@ impl OpenRegionHandler {
)
}
fn prepare_request(ident: RegionIdent) -> (String, OpenTableRequest) {
let RegionIdent {
table_ident:
TableIdent {
catalog,
schema,
table,
table_id,
engine,
},
region_number,
..
} = ident;
(
engine,
OpenTableRequest {
catalog_name: catalog,
schema_name: schema,
table_name: table,
table_id,
region_numbers: vec![region_number],
},
)
}
/// Returns true if a table or target regions have been opened.
async fn regions_opened(
&self,


@@ -18,7 +18,8 @@ use std::time::Duration;
use std::{fs, path};
use api::v1::meta::Role;
use catalog::remote::CachedMetaKvBackend;
use catalog::remote::region_alive_keeper::RegionAliveKeepers;
use catalog::remote::{CachedMetaKvBackend, RemoteCatalogManager};
use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest};
use common_base::paths::{CLUSTER_DIR, WAL_DIR};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
@@ -56,9 +57,9 @@ use table::Table;
use crate::datanode::{DatanodeOptions, ObjectStoreConfig, ProcedureConfig, WalConfig};
use crate::error::{
self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu,
NewCatalogSnafu, OpenLogStoreSnafu, RecoverProcedureSnafu, Result, ShutdownInstanceSnafu,
StartProcedureManagerSnafu, StopProcedureManagerSnafu,
self, CatalogSnafu, IncorrectInternalStateSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu,
MissingNodeIdSnafu, NewCatalogSnafu, OpenLogStoreSnafu, RecoverProcedureSnafu, Result,
ShutdownInstanceSnafu, StartProcedureManagerSnafu, StopProcedureManagerSnafu,
};
use crate::heartbeat::handler::close_region::CloseRegionHandler;
use crate::heartbeat::handler::open_region::OpenRegionHandler;
@@ -150,7 +151,7 @@ impl Instance {
);
// create remote catalog manager
let (catalog_manager, table_id_provider) = match opts.mode {
let (catalog_manager, table_id_provider, heartbeat_task) = match opts.mode {
Mode::Standalone => {
if opts.enable_memory_catalog {
let catalog = Arc::new(catalog::local::MemoryCatalogManager::default());
@@ -170,6 +171,7 @@ impl Instance {
(
catalog.clone() as CatalogManagerRef,
Some(catalog as TableIdProviderRef),
None,
)
} else {
let catalog = Arc::new(
@@ -181,51 +183,58 @@ impl Instance {
(
catalog.clone() as CatalogManagerRef,
Some(catalog as TableIdProviderRef),
None,
)
}
}
Mode::Distributed => {
let kv_backend = Arc::new(CachedMetaKvBackend::new(
meta_client.as_ref().unwrap().clone(),
));
let meta_client = meta_client.context(IncorrectInternalStateSnafu {
state: "meta client is not provided when creating distributed Datanode",
})?;
let catalog = Arc::new(catalog::remote::RemoteCatalogManager::new(
let kv_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone()));
let region_alive_keepers =
Arc::new(RegionAliveKeepers::new(engine_manager.clone()));
let catalog_manager = Arc::new(RemoteCatalogManager::new(
engine_manager.clone(),
opts.node_id.context(MissingNodeIdSnafu)?,
kv_backend,
region_alive_keepers.clone(),
));
(catalog as CatalogManagerRef, None)
let handlers_executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler::default()),
Arc::new(OpenRegionHandler::new(
catalog_manager.clone(),
engine_manager.clone(),
region_alive_keepers.clone(),
)),
Arc::new(CloseRegionHandler::new(
catalog_manager.clone(),
engine_manager.clone(),
region_alive_keepers,
)),
]);
let heartbeat_task = Some(HeartbeatTask::new(
opts.node_id.context(MissingNodeIdSnafu)?,
opts.rpc_addr.clone(),
opts.rpc_hostname.clone(),
meta_client,
catalog_manager.clone(),
Arc::new(handlers_executor),
));
(catalog_manager as CatalogManagerRef, None, heartbeat_task)
}
};
let factory = QueryEngineFactory::new(catalog_manager.clone(), false);
let query_engine = factory.query_engine();
let handlers_executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler::default()),
Arc::new(OpenRegionHandler::new(
catalog_manager.clone(),
engine_manager.clone(),
)),
Arc::new(CloseRegionHandler::new(
catalog_manager.clone(),
engine_manager.clone(),
)),
]);
let heartbeat_task = match opts.mode {
Mode::Standalone => None,
Mode::Distributed => Some(HeartbeatTask::new(
opts.node_id.context(MissingNodeIdSnafu)?,
opts.rpc_addr.clone(),
opts.rpc_hostname.clone(),
meta_client.as_ref().unwrap().clone(),
catalog_manager.clone(),
Arc::new(handlers_executor),
)),
};
let procedure_manager =
create_procedure_manager(opts.node_id.unwrap_or(0), &opts.procedure, object_store)
.await?;


@@ -19,14 +19,14 @@ use api::v1::greptime_request::Request as GrpcRequest;
use api::v1::meta::HeartbeatResponse;
use api::v1::query_request::Query;
use api::v1::QueryRequest;
use catalog::remote::region_alive_keeper::RegionAliveKeepers;
use catalog::CatalogManagerRef;
use common_meta::heartbeat::handler::{
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor,
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta};
use common_meta::instruction::{
Instruction, InstructionReply, RegionIdent, SimpleReply, TableIdent,
};
use common_meta::ident::TableIdent;
use common_meta::instruction::{Instruction, InstructionReply, RegionIdent, SimpleReply};
use common_query::Output;
use datatypes::prelude::ConcreteDataType;
use servers::query_handler::grpc::GrpcQueryHandler;
@@ -61,7 +61,11 @@ async fn test_close_region_handler() {
} = prepare_handler_test("test_close_region_handler").await;
let executor = Arc::new(HandlerGroupExecutor::new(vec![Arc::new(
CloseRegionHandler::new(catalog_manager_ref.clone(), engine_manager_ref.clone()),
CloseRegionHandler::new(
catalog_manager_ref.clone(),
engine_manager_ref.clone(),
Arc::new(RegionAliveKeepers::new(engine_manager_ref.clone())),
),
)]));
prepare_table(instance.inner()).await;
@@ -127,14 +131,18 @@ async fn test_open_region_handler() {
..
} = prepare_handler_test("test_open_region_handler").await;
let region_alive_keeper = Arc::new(RegionAliveKeepers::new(engine_manager_ref.clone()));
let executor = Arc::new(HandlerGroupExecutor::new(vec![
Arc::new(OpenRegionHandler::new(
catalog_manager_ref.clone(),
engine_manager_ref.clone(),
region_alive_keeper.clone(),
)),
Arc::new(CloseRegionHandler::new(
catalog_manager_ref.clone(),
engine_manager_ref.clone(),
region_alive_keeper,
)),
]));


@@ -18,7 +18,8 @@ use common_meta::error::Result as MetaResult;
use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply, TableIdent};
use common_meta::ident::TableIdent;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::table_name::TableName;
use common_telemetry::{error, info};
use partition::manager::TableRouteCacheInvalidatorRef;


@@ -23,7 +23,8 @@ use common_meta::heartbeat::handler::{
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor,
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta};
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply, TableIdent};
use common_meta::ident::TableIdent;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::table_name::TableName;
use partition::manager::TableRouteCacheInvalidator;
use tokio::sync::mpsc;


@@ -12,7 +12,7 @@ common-error = { path = "../common/error" }
common-grpc = { path = "../common/grpc" }
common-telemetry = { path = "../common/telemetry" }
common-meta = { path = "../common/meta" }
etcd-client = "0.10"
etcd-client = "0.11"
rand.workspace = true
serde.workspace = true
serde_json.workspace = true


@@ -24,7 +24,7 @@ common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
dashmap = "5.4"
derive_builder = "0.12"
etcd-client = "0.10"
etcd-client = "0.11"
futures.workspace = true
h2 = "0.3"
http-body = "0.4"


@@ -130,6 +130,7 @@ impl Pushers {
.push(HeartbeatResponse {
header: Some(pusher.header()),
mailbox_message: Some(mailbox_message),
..Default::default()
})
.await
}
@@ -151,6 +152,7 @@ impl Pushers {
.push(HeartbeatResponse {
header: Some(pusher.header()),
mailbox_message: Some(mailbox_message),
..Default::default()
})
.await?;
}
@@ -232,6 +234,7 @@ impl HeartbeatHandlerGroup {
let res = HeartbeatResponse {
header,
mailbox_message: acc.into_mailbox_message(),
..Default::default()
};
Ok(res)
}


@@ -19,7 +19,7 @@ use std::sync::Arc;
use api::v1::meta::{HeartbeatRequest, Role};
use async_trait::async_trait;
use common_catalog::consts::MITO_ENGINE;
use common_meta::instruction::TableIdent;
use common_meta::ident::TableIdent;
use common_meta::RegionIdent;
use table::engine::table_id;


@@ -246,7 +246,7 @@ impl FailureDetectorContainer {
#[cfg(test)]
mod tests {
use common_catalog::consts::MITO_ENGINE;
use common_meta::instruction::TableIdent;
use common_meta::ident::TableIdent;
use rand::Rng;
use super::*;


@@ -88,6 +88,7 @@ mod tests {
let res = HeartbeatResponse {
header,
mailbox_message: acc.into_mailbox_message(),
..Default::default()
};
assert_eq!(1, res.header.unwrap().cluster_id);
}


@@ -343,7 +343,8 @@ mod tests {
use api::v1::meta::{HeartbeatResponse, MailboxMessage, Peer, RequestHeader};
use catalog::helper::TableGlobalKey;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply, TableIdent};
use common_meta::ident::TableIdent;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::DatanodeId;
use common_procedure::BoxedProcedure;
use rand::prelude::SliceRandom;


@@ -14,7 +14,7 @@
use async_trait::async_trait;
use common_error::prelude::{ErrorExt, StatusCode};
use common_meta::instruction::TableIdent;
use common_meta::ident::TableIdent;
use common_meta::peer::Peer;
use common_meta::RegionIdent;
use common_telemetry::info;


@@ -14,7 +14,8 @@
use api::v1::meta::MailboxMessage;
use async_trait::async_trait;
use common_meta::instruction::{Instruction, TableIdent};
use common_meta::ident::TableIdent;
use common_meta::instruction::Instruction;
use common_meta::RegionIdent;
use common_telemetry::info;
use serde::{Deserialize, Serialize};


@@ -36,6 +36,7 @@ impl EmptyTable {
.primary_key_indices(req.primary_key_indices)
.next_column_id(0)
.options(req.table_options)
.region_numbers(req.region_numbers)
.build();
let table_info = TableInfoBuilder::default()
.catalog_name(req.catalog_name)


@@ -77,7 +77,7 @@ impl MemTable {
.schema(schema)
.primary_key_indices(vec![])
.value_indices(vec![])
.engine("mock".to_string())
.engine("mito".to_string())
.next_column_id(0)
.engine_options(Default::default())
.options(Default::default())


@@ -20,7 +20,7 @@ use api::v1::meta::Peer;
use catalog::helper::TableGlobalKey;
use catalog::remote::{CachedMetaKvBackend, Kv};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
use common_meta::instruction::TableIdent;
use common_meta::ident::TableIdent;
use common_meta::rpc::router::TableRoute;
use common_meta::table_name::TableName;
use common_meta::RegionIdent;