diff --git a/Cargo.lock b/Cargo.lock
index cc28f66fab..b5ce49cf0c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2944,6 +2944,7 @@ dependencies = [
  "api",
  "async-trait",
  "common-base",
+ "common-catalog",
  "common-error",
  "common-grpc",
  "common-runtime",
@@ -2955,6 +2956,7 @@ dependencies = [
  "http-body",
  "lazy_static",
  "parking_lot",
+ "prost 0.11.0",
  "regex",
  "serde",
  "serde_json",
diff --git a/src/api/greptime/v1/meta/route.proto b/src/api/greptime/v1/meta/route.proto
index 33976bf91d..61a636b26e 100644
--- a/src/api/greptime/v1/meta/route.proto
+++ b/src/api/greptime/v1/meta/route.proto
@@ -63,8 +63,9 @@ message RegionRoute {
 }
 
 message Table {
-  TableName table_name = 1;
-  bytes table_schema = 2;
+  uint64 id = 1;
+  TableName table_name = 2;
+  bytes table_schema = 3;
 }
 
 message Region {
@@ -80,3 +81,9 @@ message Partition {
   repeated bytes column_list = 1;
   repeated bytes value_list = 2;
 }
+
+// This message is only for saving into the store.
+message TableRouteValue {
+  repeated Peer peers = 1;
+  TableRoute table_route = 2;
+}
diff --git a/src/api/src/serde.rs b/src/api/src/serde.rs
index 2e65330b95..d97ed88dff 100644
--- a/src/api/src/serde.rs
+++ b/src/api/src/serde.rs
@@ -1,7 +1,11 @@
 pub use prost::DecodeError;
 use prost::Message;
 
-use crate::v1::codec::{InsertBatch, PhysicalPlanNode, RegionNumber, SelectResult};
+use crate::v1::codec::InsertBatch;
+use crate::v1::codec::PhysicalPlanNode;
+use crate::v1::codec::RegionNumber;
+use crate::v1::codec::SelectResult;
+use crate::v1::meta::TableRouteValue;
 
 macro_rules! impl_convert_with_bytes {
     ($data_type: ty) => {
@@ -25,6 +29,7 @@ impl_convert_with_bytes!(InsertBatch);
 impl_convert_with_bytes!(SelectResult);
 impl_convert_with_bytes!(PhysicalPlanNode);
 impl_convert_with_bytes!(RegionNumber);
+impl_convert_with_bytes!(TableRouteValue);
 
 #[cfg(test)]
 mod tests {
diff --git a/src/api/src/v1/meta.rs b/src/api/src/v1/meta.rs
index 6ba7f6e54d..5c2d0046ad 100644
--- a/src/api/src/v1/meta.rs
+++ b/src/api/src/v1/meta.rs
@@ -1,7 +1,47 @@
 tonic::include_proto!("greptime.v1.meta");
 
+use std::collections::HashMap;
+use std::hash::Hash;
+use std::hash::Hasher;
+
 pub const PROTOCOL_VERSION: u64 = 1;
 
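+/// Assigns a dense, zero-based index to each distinct `Peer`, so that region
+/// routes can refer to shared peers by index rather than by value.
+///
+/// An illustrative sketch (`peer_a`/`peer_b` are hypothetical):
+///
+/// ```ignore
+/// let mut dict = PeerDict::default();
+/// assert_eq!(0, dict.get_or_insert(peer_a.clone()));
+/// assert_eq!(1, dict.get_or_insert(peer_b));
+/// assert_eq!(0, dict.get_or_insert(peer_a)); // duplicates keep their index
+/// let peers = dict.into_peers();             // [peer_a, peer_b]
+/// ```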
+#[derive(Default)]
+pub struct PeerDict {
+    peers: HashMap<Peer, usize>,
+    index: usize,
+}
+
+impl PeerDict {
+    pub fn get_or_insert(&mut self, peer: Peer) -> usize {
+        let index = self.peers.entry(peer).or_insert_with(|| {
+            let v = self.index;
+            self.index += 1;
+            v
+        });
+
+        *index
+    }
+
+    pub fn into_peers(self) -> Vec<Peer> {
+        let mut array = vec![Peer::default(); self.index];
+        for (p, i) in self.peers {
+            array[i] = p;
+        }
+        array
+    }
+}
+
+#[allow(clippy::derive_hash_xor_eq)]
+impl Hash for Peer {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.id.hash(state);
+        self.addr.hash(state);
+    }
+}
+
+impl Eq for Peer {}
+
 impl RequestHeader {
     #[inline]
     pub fn new((cluster_id, member_id): (u64, u64)) -> Self {
@@ -33,6 +73,21 @@ impl ResponseHeader {
     }
 }
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ErrorCode {
+    NoActiveDatanodes = 1,
+}
+
+impl Error {
+    #[inline]
+    pub fn no_active_datanodes() -> Self {
+        Self {
+            code: ErrorCode::NoActiveDatanodes as i32,
+            err_msg: "No active datanodes".to_string(),
+        }
+    }
+}
+
 macro_rules! gen_set_header {
     ($req: ty) => {
         impl $req {
@@ -52,3 +107,59 @@ gen_set_header!(PutRequest);
 gen_set_header!(BatchPutRequest);
 gen_set_header!(CompareAndPutRequest);
 gen_set_header!(DeleteRangeRequest);
+
+#[cfg(test)]
+mod tests {
+    use std::vec;
+
+    use super::*;
+
+    #[test]
+    fn test_peer_dict() {
+        let mut dict = PeerDict::default();
+
+        dict.get_or_insert(Peer {
+            id: 1,
+            addr: "111".to_string(),
+        });
+        dict.get_or_insert(Peer {
+            id: 2,
+            addr: "222".to_string(),
+        });
+        dict.get_or_insert(Peer {
+            id: 1,
+            addr: "111".to_string(),
+        });
+        dict.get_or_insert(Peer {
+            id: 1,
+            addr: "111".to_string(),
+        });
+        dict.get_or_insert(Peer {
+            id: 1,
+            addr: "111".to_string(),
+        });
+        dict.get_or_insert(Peer {
+            id: 1,
+            addr: "111".to_string(),
+        });
+        dict.get_or_insert(Peer {
+            id: 2,
+            addr: "222".to_string(),
+        });
+
+        assert_eq!(2, dict.index);
+        assert_eq!(
+            vec![
+                Peer {
+                    id: 1,
+                    addr: "111".to_string(),
+                },
+                Peer {
+                    id: 2,
+                    addr: "222".to_string(),
+                }
+            ],
+            dict.into_peers()
+        );
+    }
+}
diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs
index 0d1e3cbc19..72cc4251ad 100644
--- a/src/meta-client/src/client.rs
+++ b/src/meta-client/src/client.rs
@@ -372,14 +372,14 @@ mod tests {
 
     #[tokio::test]
     async fn test_ask_leader() {
-        let client = mocks::mock_client_with_noopstore().await;
+        let client = mocks::mock_client_with_memstore().await;
         let res = client.ask_leader().await;
         assert!(res.is_ok());
     }
 
     #[tokio::test]
     async fn test_heartbeat() {
-        let client = mocks::mock_client_with_noopstore().await;
+        let client = mocks::mock_client_with_memstore().await;
         let (sender, mut receiver) = client.heartbeat().await.unwrap();
         // send heartbeats
         tokio::spawn(async move {
@@ -428,9 +428,10 @@ mod tests {
 
     #[tokio::test]
-    async fn test_create_route() {
+    async fn test_route() {
         let selector = Arc::new(MockSelector {});
-        let client = mocks::mock_client_with_selector(selector).await;
+        let client = mocks::mock_client_with_memorystore_and_selector(selector).await;
+
         let p1 = Partition {
             column_list: vec![b"col_1".to_vec(), b"col_2".to_vec()],
             value_list: vec![b"k1".to_vec(), b"k2".to_vec()],
         };
@@ -440,29 +441,17 @@ mod tests {
             value_list: vec![b"Max1".to_vec(), b"Max2".to_vec()],
         };
 
         let table_name = TableName::new("test_catalog", "test_schema", "test_table");
-        let req = CreateRequest::new(table_name)
+        let req = CreateRequest::new(table_name.clone())
             .add_partition(p1)
             .add_partition(p2);
         let res = client.create_route(req).await.unwrap();
+        assert_eq!(1, res.table_routes.len());
 
-        let table_routes = res.table_routes;
-        assert_eq!(1, table_routes.len());
-        let table_route = table_routes.get(0).unwrap();
-        let table = &table_route.table;
-        let table_name = &table.table_name;
-        assert_eq!("test_catalog", table_name.catalog_name);
-        assert_eq!("test_schema", table_name.schema_name);
-        assert_eq!("test_table", table_name.table_name);
-
-        let region_routes = &table_route.region_routes;
-        assert_eq!(2, region_routes.len());
-        let r0 = region_routes.get(0).unwrap();
-        let r1 = region_routes.get(1).unwrap();
-        assert_eq!(0, r0.region.as_ref().unwrap().id);
-        assert_eq!(1, r1.region.as_ref().unwrap().id);
-        assert_eq!("peer0", r0.leader_peer.as_ref().unwrap().addr);
-        assert_eq!("peer1", r1.leader_peer.as_ref().unwrap().addr);
+        let req = RouteRequest::new().add_table_name(table_name);
+        let res = client.route(req).await.unwrap();
+        // empty table_routes since no TableGlobalValue is stored by the datanode
+        assert!(res.table_routes.is_empty());
     }
 
     async fn gen_data(client: &MetaClient) {
diff --git a/src/meta-client/src/mocks.rs b/src/meta-client/src/mocks.rs
index 7ec0f2f271..11982241fa 100644
--- a/src/meta-client/src/mocks.rs
+++ b/src/meta-client/src/mocks.rs
@@ -5,11 +5,6 @@ use meta_srv::mocks::MockInfo;
 use crate::client::MetaClient;
 use crate::client::MetaClientBuilder;
 
-pub async fn mock_client_with_noopstore() -> MetaClient {
-    let mock_info = server_mock::mock_with_noopstore().await;
-    mock_client_by(mock_info).await
-}
-
 pub async fn mock_client_with_memstore() -> MetaClient {
     let mock_info = server_mock::mock_with_memstore().await;
     mock_client_by(mock_info).await
@@ -21,8 +16,8 @@ pub async fn mock_client_with_etcdstore(addr: &str) -> MetaClient {
     mock_client_by(mock_info).await
 }
 
-pub async fn mock_client_with_selector(selector: SelectorRef) -> MetaClient {
-    let mock_info = server_mock::mock_with_selector(selector).await;
+pub async fn mock_client_with_memorystore_and_selector(selector: SelectorRef) -> MetaClient {
+    let mock_info = server_mock::mock_with_memstore_and_selector(selector).await;
     mock_client_by(mock_info).await
 }
diff --git a/src/meta-client/src/rpc/router.rs b/src/meta-client/src/rpc/router.rs
index 024aa9da92..6366e7d5a3 100644
--- a/src/meta-client/src/rpc/router.rs
+++ b/src/meta-client/src/rpc/router.rs
@@ -134,6 +134,7 @@ pub struct TableRoute {
 
 #[derive(Debug, Clone)]
 pub struct Table {
+    pub id: u64,
     pub table_name: TableName,
     pub table_schema: Vec<u8>,
 }
@@ -149,6 +150,7 @@ impl TryFrom<PbTable> for Table {
             })?
             .into();
         Ok(Self {
+            id: t.id,
             table_name,
             table_schema: t.table_schema,
         })
@@ -296,6 +298,7 @@ mod tests {
             ],
             table_routes: vec![PbTableRoute {
                 table: Some(PbTable {
+                    id: 1,
                     table_name: Some(PbTableName {
                         catalog_name: "c1".to_string(),
                         schema_name: "s1".to_string(),
@@ -324,6 +327,7 @@ mod tests {
         assert_eq!(1, table_routes.len());
         let table_route = table_routes.remove(0);
         let table = table_route.table;
+        assert_eq!(1, table.id);
         assert_eq!("c1", table.table_name.catalog_name);
         assert_eq!("s1", table.table_name.schema_name);
         assert_eq!("t1", table.table_name.table_name);
diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml
index cab35cf317..af8a87c861 100644
--- a/src/meta-srv/Cargo.toml
+++ b/src/meta-srv/Cargo.toml
@@ -10,6 +10,7 @@ mock = []
 api = { path = "../api" }
 async-trait = "0.1"
 common-base = { path = "../common/base" }
+common-catalog = { path = "../common/catalog" }
 common-error = { path = "../common/error" }
 common-grpc = { path = "../common/grpc" }
 common-runtime = { path = "../common/runtime" }
@@ -21,6 +22,7 @@ h2 = "0.3"
 http-body = "0.4"
 lazy_static = "1.4"
 parking_lot = "0.12"
+prost = "0.11"
 regex = "1.6"
 serde = "1.0"
 serde_json = "1.0"
diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs
index 25d2dd49ee..fcbab4ed58 100644
--- a/src/meta-srv/src/error.rs
+++ b/src/meta-srv/src/error.rs
@@ -79,6 +79,33 @@ pub enum Error {
         err_msg: String,
         backtrace: Backtrace,
     },
+
+    #[snafu(display("Cannot parse catalog value, source: {}", source))]
+    InvalidCatalogValue {
+        #[snafu(backtrace)]
+        source: common_catalog::error::Error,
+    },
+
+    #[snafu(display("Unexpected sequence value: {}", err_msg))]
+    UnexpectedSequenceValue {
+        err_msg: String,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Failed to decode table route, source: {}", source))]
+    DecodeTableRoute {
+        source: prost::DecodeError,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Table route not found: {}", key))]
+    TableRouteNotFound { key: String, backtrace: Backtrace },
+
+    #[snafu(display("Failed to get sequence: {}", err_msg))]
+    NextSequence {
+        err_msg: String,
+        backtrace: Backtrace,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -106,15 +133,19 @@ impl ErrorExt for Error {
             | Error::TcpBind { .. }
             | Error::SerializeToJson { .. }
             | Error::DeserializeFromJson { .. }
+            | Error::DecodeTableRoute { .. }
             | Error::StartGrpc { .. } => StatusCode::Internal,
             Error::EmptyKey { .. }
             | Error::EmptyTableName { .. }
            | Error::InvalidLeaseKey { .. }
            | Error::ParseNum { .. }
            | Error::InvalidArguments { .. } => StatusCode::InvalidArguments,
-            Error::LeaseKeyFromUtf8 { .. } | Error::InvalidTxnResult { .. } => {
-                StatusCode::Unexpected
-            }
+            Error::LeaseKeyFromUtf8 { .. }
+            | Error::UnexpectedSequenceValue { .. }
+            | Error::TableRouteNotFound { .. }
+            | Error::NextSequence { .. }
+            | Error::InvalidTxnResult { .. } => StatusCode::Unexpected,
+            Error::InvalidCatalogValue { source, .. } => source.status_code(),
         }
     }
 }
diff --git a/src/meta-srv/src/handler/datanode_lease.rs b/src/meta-srv/src/handler/datanode_lease.rs
index ea1ae6d4ee..45c88f0278 100644
--- a/src/meta-srv/src/handler/datanode_lease.rs
+++ b/src/meta-srv/src/handler/datanode_lease.rs
@@ -31,7 +31,7 @@ impl HeartbeatHandler for DatanodeLeaseHandler {
                 node_addr: peer.addr.clone(),
             };
 
-            info!("Receive a heartbeat from datanode: {:?}, {:?}", key, value);
+            info!("Receive a heartbeat: {:?}, {:?}", key, value);
 
             let key = key.try_into()?;
             let value = value.try_into()?;
@@ -41,8 +41,7 @@ impl HeartbeatHandler for DatanodeLeaseHandler {
                 ..Default::default()
             };
 
-            let kv_store = ctx.kv_store();
-            let _ = kv_store.put(put).await?;
+            let _ = ctx.kv_store().put(put).await?;
         }
 
         Ok(())
diff --git a/src/meta-srv/src/handler/response_header.rs b/src/meta-srv/src/handler/response_header.rs
index 7692bed55f..76fa7b2508 100644
--- a/src/meta-srv/src/handler/response_header.rs
+++ b/src/meta-srv/src/handler/response_header.rs
@@ -35,11 +35,12 @@ mod tests {
     use api::v1::meta::{HeartbeatResponse, RequestHeader};
 
     use super::*;
-    use crate::{handler::Context, service::store::noop::NoopKvStore};
+    use crate::handler::Context;
+    use crate::service::store::memory::MemStore;
 
     #[tokio::test]
     async fn test_handle_heartbeat_resp_header() {
-        let kv_store = Arc::new(NoopKvStore {});
+        let kv_store = Arc::new(MemStore::new());
         let ctx = Context {
             server_addr: "0.0.0.0:0000".to_string(),
             kv_store,
diff --git a/src/meta-srv/src/keys.rs b/src/meta-srv/src/keys.rs
index c6ecec4b2f..ff25d55ba8 100644
--- a/src/meta-srv/src/keys.rs
+++ b/src/meta-srv/src/keys.rs
@@ -1,5 +1,7 @@
 use std::str::FromStr;
 
+use api::v1::meta::TableName;
+use common_catalog::TableGlobalKey;
 use lazy_static::lazy_static;
 use regex::Regex;
 use serde::Deserialize;
@@ -12,6 +14,8 @@ use crate::error;
 use crate::error::Result;
 
 pub(crate) const DN_LEASE_PREFIX: &str = "__meta_dnlease";
+pub(crate) const SEQ_PREFIX: &str = "__meta_seq";
+pub(crate) const TABLE_ROUTE_PREFIX: &str = "__meta_table_route";
 
 lazy_static! {
     static ref DATANODE_KEY_PATTERN: Regex =
@@ -108,6 +112,44 @@ impl TryFrom<LeaseValue> for Vec<u8> {
     }
 }
 
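+/// Storage key of a `TableRouteValue` in the meta store.
+///
+/// Illustrative layout (the catalog/schema/table names are placeholders):
+///   prefix() -> "__meta_table_route-{catalog}-{schema}-{table}"
+///   key()    -> "__meta_table_route-{catalog}-{schema}-{table}-{table_id}"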
+pub struct TableRouteKey<'a> {
+    pub table_id: u64,
+    pub catalog_name: &'a str,
+    pub schema_name: &'a str,
+    pub table_name: &'a str,
+}
+
+impl<'a> TableRouteKey<'a> {
+    pub fn with_table_name(table_id: u64, t: &'a TableName) -> Self {
+        Self {
+            table_id,
+            catalog_name: &t.catalog_name,
+            schema_name: &t.schema_name,
+            table_name: &t.table_name,
+        }
+    }
+
+    pub fn with_table_global_key(table_id: u64, t: &'a TableGlobalKey) -> Self {
+        Self {
+            table_id,
+            catalog_name: &t.catalog_name,
+            schema_name: &t.schema_name,
+            table_name: &t.table_name,
+        }
+    }
+
+    pub fn prefix(&self) -> String {
+        format!(
+            "{}-{}-{}-{}",
+            TABLE_ROUTE_PREFIX, self.catalog_name, self.schema_name, self.table_name
+        )
+    }
+
+    pub fn key(&self) -> String {
+        format!("{}-{}", self.prefix(), self.table_id)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/src/meta-srv/src/lease.rs b/src/meta-srv/src/lease.rs
index d57f258243..1e90e201d2 100644
--- a/src/meta-srv/src/lease.rs
+++ b/src/meta-srv/src/lease.rs
@@ -1,5 +1,4 @@
 use api::v1::meta::RangeRequest;
-use api::v1::meta::RangeResponse;
 
 use crate::error::Result;
 use crate::keys::LeaseKey;
@@ -10,7 +9,7 @@ use crate::util;
 
 pub async fn alive_datanodes<P>(
     cluster_id: u64,
-    kv_store: KvStoreRef,
+    kv_store: &KvStoreRef,
     predicate: P,
 ) -> Result<Vec<(LeaseKey, LeaseValue)>>
 where
@@ -26,7 +25,7 @@ where
 
     let res = kv_store.range(req).await?;
 
-    let RangeResponse { kvs, .. } = res;
+    let kvs = res.kvs;
     let mut lease_kvs = vec![];
     for kv in kvs {
         let lease_key: LeaseKey = kv.key.try_into()?;
diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs
index f67ce21989..b2e256348d 100644
--- a/src/meta-srv/src/lib.rs
+++ b/src/meta-srv/src/lib.rs
@@ -3,11 +3,12 @@
 pub mod bootstrap;
 pub mod error;
 pub mod handler;
 mod keys;
-pub mod lease;
+mod lease;
 pub mod metasrv;
 #[cfg(feature = "mock")]
 pub mod mocks;
 pub mod selector;
+mod sequence;
 pub mod service;
 mod util;
diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs
index c7ab3d5454..c7d886a61f 100644
--- a/src/meta-srv/src/metasrv.rs
+++ b/src/meta-srv/src/metasrv.rs
@@ -9,8 +9,12 @@ use crate::handler::response_header::ResponseHeaderHandler;
 use crate::handler::HeartbeatHandlerGroup;
 use crate::selector::lease_based::LeaseBasedSelector;
 use crate::selector::Selector;
+use crate::sequence::Sequence;
+use crate::sequence::SequenceRef;
 use crate::service::store::kv::KvStoreRef;
 
+pub const TABLE_ID_SEQ: &str = "table_id";
+
 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
 pub struct MetaSrvOptions {
     pub bind_addr: String,
@@ -24,7 +28,7 @@ impl Default for MetaSrvOptions {
         Self {
             bind_addr: "0.0.0.0:3002".to_string(),
             server_addr: "0.0.0.0:3002".to_string(),
-            store_addr: "0.0.0.0:2380".to_string(),
+            store_addr: "0.0.0.0:2379".to_string(),
             datanode_lease_secs: 15,
         }
     }
@@ -42,6 +46,7 @@ pub type SelectorRef = Arc<dyn Selector<Context = Context, Output = Vec<Peer>>>;
 pub struct MetaSrv {
     options: MetaSrvOptions,
     kv_store: KvStoreRef,
+    table_id_sequence: SequenceRef,
     selector: SelectorRef,
     handler_group: HeartbeatHandlerGroup,
 }
@@ -52,6 +57,7 @@ impl MetaSrv {
         kv_store: KvStoreRef,
         selector: Option<SelectorRef>,
     ) -> Self {
+        let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 10, kv_store.clone()));
         let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector {}));
         let handler_group = HeartbeatHandlerGroup::default();
         handler_group.add_handler(ResponseHeaderHandler).await;
@@ -60,6 +66,7 @@ impl MetaSrv {
         Self {
             options,
             kv_store,
+            table_id_sequence,
             selector,
             handler_group,
         }
@@ -75,6 +82,11 @@ impl MetaSrv {
         self.kv_store.clone()
     }
 
+    #[inline]
+    pub fn table_id_sequence(&self) -> SequenceRef {
+        self.table_id_sequence.clone()
+    }
+
     #[inline]
     pub fn selector(&self) -> SelectorRef {
         self.selector.clone()
diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs
index cc44d210cf..95101250e7 100644
--- a/src/meta-srv/src/mocks.rs
+++ b/src/meta-srv/src/mocks.rs
@@ -13,18 +13,12 @@ use crate::metasrv::SelectorRef;
 use crate::service::store::etcd::EtcdStore;
 use crate::service::store::kv::KvStoreRef;
 use crate::service::store::memory::MemStore;
-use crate::service::store::noop::NoopKvStore;
 
 pub struct MockInfo {
     pub server_addr: String,
     pub channel_manager: ChannelManager,
 }
 
-pub async fn mock_with_noopstore() -> MockInfo {
-    let kv_store = Arc::new(NoopKvStore {});
-    mock(Default::default(), kv_store, None).await
-}
-
 pub async fn mock_with_memstore() -> MockInfo {
     let kv_store = Arc::new(MemStore::default());
     mock(Default::default(), kv_store, None).await
@@ -35,8 +29,8 @@ pub async fn mock_with_etcdstore(addr: &str) -> MockInfo {
     mock(Default::default(), kv_store, None).await
 }
 
-pub async fn mock_with_selector(selector: SelectorRef) -> MockInfo {
-    let kv_store = Arc::new(NoopKvStore {});
+pub async fn mock_with_memstore_and_selector(selector: SelectorRef) -> MockInfo {
+    let kv_store = Arc::new(MemStore::default());
     mock(Default::default(), kv_store, Some(selector)).await
 }
diff --git a/src/meta-srv/src/selector/lease_based.rs b/src/meta-srv/src/selector/lease_based.rs
index 2065654c81..c59e2586f8 100644
--- a/src/meta-srv/src/selector/lease_based.rs
+++ b/src/meta-srv/src/selector/lease_based.rs
@@ -21,7 +21,7 @@ impl Selector for LeaseBasedSelector {
         let lease_filter = |_: &LeaseKey, v: &LeaseValue| {
             time_util::current_time_millis() - v.timestamp_millis < ctx.datanode_lease_secs
         };
-        let mut lease_kvs = lease::alive_datanodes(ns, ctx.kv_store.clone(), lease_filter).await?;
+        let mut lease_kvs = lease::alive_datanodes(ns, &ctx.kv_store, lease_filter).await?;
         // TODO(jiachun): At the moment we are just pushing the latest to the forefront,
         // and it is better to use load-based strategies in the future.
         lease_kvs.sort_by(|a, b| b.1.timestamp_millis.cmp(&a.1.timestamp_millis));
diff --git a/src/meta-srv/src/sequence.rs b/src/meta-srv/src/sequence.rs
new file mode 100644
index 0000000000..44e89ce4f9
--- /dev/null
+++ b/src/meta-srv/src/sequence.rs
@@ -0,0 +1,197 @@
+use std::ops::Range;
+use std::sync::Arc;
+
+use api::v1::meta::CompareAndPutRequest;
+use snafu::ensure;
+use tokio::sync::Mutex;
+
+use crate::error::{self, Result};
+use crate::keys;
+use crate::service::store::kv::KvStoreRef;
+
+pub type SequenceRef = Arc<Sequence>;
+
+pub struct Sequence {
+    inner: Mutex<Inner>,
+}
+
+impl Sequence {
+    pub fn new(name: impl AsRef<str>, step: u64, generator: KvStoreRef) -> Self {
+        let name = format!("{}-{}", keys::SEQ_PREFIX, name.as_ref());
+        let step = step.max(1);
+        Self {
+            inner: Mutex::new(Inner {
+                name,
+                generator,
+                next: 0,
+                step,
+                range: None,
+                force_quit: 1024,
+            }),
+        }
+    }
+
+    pub async fn next(&self) -> Result<u64> {
+        let mut inner = self.inner.lock().await;
+        inner.next().await
+    }
+}
+
+struct Inner {
+    name: String,
+    generator: KvStoreRef,
+    // The next available sequence (valid only while it is inside `range`;
+    // otherwise a new range needs to be fetched from the generator).
+    next: u64,
+    // Fetch several sequences at once: [start, start + step).
+    step: u64,
+    // The range of sequences available in the local cache.
+    range: Option<Range<u64>>,
+    // Used to avoid infinite retry loops.
+    force_quit: usize,
+}
+
+impl Inner {
+    /// 1. returns the `next` value directly if it is in the `range` (local cache)
+    /// 2. fetches (CAS) the next `range` from the `generator`
+    /// 3. jumps back to step 1
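+    ///
+    /// An illustrative sketch of the observable behavior, assuming a fresh
+    /// store and `step = 10`:
+    ///
+    /// ```ignore
+    /// let seq = Sequence::new("test_seq", 10, kv_store);
+    /// assert_eq!(0, seq.next().await?); // CAS reserves the range [0, 10)
+    /// assert_eq!(1, seq.next().await?); // served from the cached range
+    /// ```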
+    pub async fn next(&mut self) -> Result<u64> {
+        for _ in 0..self.force_quit {
+            match &self.range {
+                Some(range) => {
+                    if range.contains(&self.next) {
+                        let res = Ok(self.next);
+                        self.next += 1;
+                        return res;
+                    }
+                    self.range = None;
+                }
+                None => {
+                    let range = self.next_range().await?;
+                    self.next = range.start;
+                    self.range = Some(range);
+                }
+            }
+        }
+
+        error::NextSequenceSnafu {
+            err_msg: format!("{}.next()", &self.name),
+        }
+        .fail()
+    }
+
+    pub async fn next_range(&self) -> Result<Range<u64>> {
+        let key = self.name.as_bytes();
+        let mut start = self.next;
+        for _ in 0..self.force_quit {
+            let expect = if start == 0 {
+                vec![]
+            } else {
+                u64::to_le_bytes(start).to_vec()
+            };
+            let value = u64::to_le_bytes(start + self.step);
+
+            let req = CompareAndPutRequest {
+                key: key.to_vec(),
+                expect,
+                value: value.to_vec(),
+                ..Default::default()
+            };
+
+            let res = self.generator.compare_and_put(req).await?;
+
+            if !res.success {
+                if let Some(kv) = res.prev_kv {
+                    let value = kv.value;
+                    ensure!(
+                        value.len() == std::mem::size_of::<u64>(),
+                        error::UnexpectedSequenceValueSnafu {
+                            err_msg: format!("key={}, unexpected value={:?}", self.name, value)
+                        }
+                    );
+                    start = u64::from_le_bytes(value.try_into().unwrap());
+                } else {
+                    start = 0;
+                }
+                continue;
+            }
+
+            return Ok(Range {
+                start,
+                end: start + self.step,
+            });
+        }
+
+        error::NextSequenceSnafu {
+            err_msg: format!("{}.next_range()", &self.name),
+        }
+        .fail()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use super::*;
+    use crate::service::store::{kv::KvStore, memory::MemStore};
+
+    #[tokio::test]
+    async fn test_sequence() {
+        let kv_store = Arc::new(MemStore::new());
+        let seq = Sequence::new("test_seq", 10, kv_store);
+
+        for i in 0..100 {
+            assert_eq!(i, seq.next().await.unwrap());
+        }
+    }
+
+    #[tokio::test]
+    async fn test_sequence_force_quit() {
+        struct Noop;
+
+        #[async_trait::async_trait]
+        impl KvStore for Noop {
+            async fn range(
+                &self,
+                _: api::v1::meta::RangeRequest,
+            ) -> Result<api::v1::meta::RangeResponse> {
+                unreachable!()
+            }
+
+            async fn put(
+                &self,
+                _: api::v1::meta::PutRequest,
+            ) -> Result<api::v1::meta::PutResponse> {
+                unreachable!()
+            }
+
+            async fn batch_put(
+                &self,
+                _: api::v1::meta::BatchPutRequest,
+            ) -> Result<api::v1::meta::BatchPutResponse> {
+                unreachable!()
+            }
+
+            async fn compare_and_put(
+                &self,
+                _: CompareAndPutRequest,
+            ) -> Result<api::v1::meta::CompareAndPutResponse> {
+                Ok(api::v1::meta::CompareAndPutResponse::default())
+            }
+
+            async fn delete_range(
+                &self,
+                _: api::v1::meta::DeleteRangeRequest,
+            ) -> Result<api::v1::meta::DeleteRangeResponse> {
+                unreachable!()
+            }
+        }
+
+        let kv_store = Arc::new(Noop {});
+        let seq = Sequence::new("test_seq", 10, kv_store);
+
+        let next = seq.next().await;
+        assert!(next.is_err());
+    }
+}
diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs
index 462ef69519..82f91ec7d2 100644
--- a/src/meta-srv/src/service/heartbeat.rs
+++ b/src/meta-srv/src/service/heartbeat.rs
@@ -142,11 +142,11 @@ mod tests {
 
     use super::*;
     use crate::metasrv::MetaSrvOptions;
-    use crate::service::store::noop::NoopKvStore;
+    use crate::service::store::memory::MemStore;
 
     #[tokio::test]
     async fn test_ask_leader() {
-        let kv_store = Arc::new(NoopKvStore {});
+        let kv_store = Arc::new(MemStore::new());
         let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await;
 
         let req = AskLeaderRequest {
diff --git a/src/meta-srv/src/service/router.rs b/src/meta-srv/src/service/router.rs
index abc7e4a03d..1eb861151c 100644
--- a/src/meta-srv/src/service/router.rs
+++ b/src/meta-srv/src/service/router.rs
@@ -1,5 +1,9 @@
 use api::v1::meta::router_server;
 use api::v1::meta::CreateRequest;
+use api::v1::meta::Error;
+use api::v1::meta::PeerDict;
+use api::v1::meta::PutRequest;
+use api::v1::meta::RangeRequest;
 use api::v1::meta::Region;
 use api::v1::meta::RegionRoute;
 use api::v1::meta::ResponseHeader;
@@ -7,22 +11,31 @@ use api::v1::meta::RouteRequest;
 use api::v1::meta::RouteResponse;
 use api::v1::meta::Table;
 use api::v1::meta::TableRoute;
+use api::v1::meta::TableRouteValue;
+use common_catalog::TableGlobalKey;
+use common_catalog::TableGlobalValue;
+use common_telemetry::warn;
 use snafu::OptionExt;
+use snafu::ResultExt;
 use tonic::Request;
 use tonic::Response;
 
+use super::store::kv::KvStoreRef;
 use super::GrpcResult;
 use crate::error;
 use crate::error::Result;
+use crate::keys::TableRouteKey;
 use crate::metasrv::Context;
 use crate::metasrv::MetaSrv;
 use crate::metasrv::SelectorRef;
+use crate::sequence::SequenceRef;
 
 #[async_trait::async_trait]
 impl router_server::Router for MetaSrv {
     async fn route(&self, req: Request<RouteRequest>) -> GrpcResult<RouteResponse> {
         let req = req.into_inner();
-        let res = handle_route(req).await?;
+        let ctx = self.new_ctx();
+        let res = handle_route(req, ctx).await?;
 
         Ok(Response::new(res))
     }
@@ -31,20 +44,68 @@ impl router_server::Router for MetaSrv {
         let req = req.into_inner();
         let ctx = self.new_ctx();
         let selector = self.selector();
-        let res = handle_create(req, ctx, selector).await?;
+        let table_id_sequence = self.table_id_sequence();
+        let res = handle_create(req, ctx, selector, table_id_sequence).await?;
 
         Ok(Response::new(res))
     }
 }
 
-async fn handle_route(_req: RouteRequest) -> Result<RouteResponse> {
-    todo!()
+async fn handle_route(req: RouteRequest, ctx: Context) -> Result<RouteResponse> {
+    let RouteRequest {
+        header,
+        table_names,
+    } = req;
+    let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id);
+    let table_global_keys = table_names.into_iter().map(|t| TableGlobalKey {
+        catalog_name: t.catalog_name,
+        schema_name: t.schema_name,
+        table_name: t.table_name,
+    });
+    let tables = fetch_tables(&ctx.kv_store, table_global_keys).await?;
+
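+    // Each stored TableRouteValue carries its own peer list; merge those lists
+    // into a single response-level `peers` array via PeerDict, rewriting every
+    // region's leader/follower indexes to point into the merged array.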
+    let mut peer_dict = PeerDict::default();
+    let mut table_routes = vec![];
+    for (tg, tr) in tables {
+        let TableRouteValue {
+            peers,
+            mut table_route,
+        } = tr;
+        if let Some(ref mut table_route) = table_route {
+            for rr in &mut table_route.region_routes {
+                if let Some(peer) = peers.get(rr.leader_peer_index as usize) {
+                    rr.leader_peer_index = peer_dict.get_or_insert(peer.clone()) as u64;
+                }
+                for index in &mut rr.follower_peer_indexes {
+                    if let Some(peer) = peers.get(*index as usize) {
+                        *index = peer_dict.get_or_insert(peer.clone()) as u64;
+                    }
+                }
+            }
+
+            if let Some(ref mut table) = table_route.table {
+                table.table_schema = tg.as_bytes().context(error::InvalidCatalogValueSnafu)?;
+            }
+        }
+        if let Some(table_route) = table_route {
+            table_routes.push(table_route)
+        }
+    }
+    let peers = peer_dict.into_peers();
+
+    let header = Some(ResponseHeader::success(cluster_id));
+    Ok(RouteResponse {
+        header,
+        peers,
+        table_routes,
+    })
 }
 
 async fn handle_create(
     req: CreateRequest,
     ctx: Context,
     selector: SelectorRef,
+    table_id_sequence: SequenceRef,
 ) -> Result<RouteResponse> {
     let CreateRequest {
         header,
@@ -55,22 +116,37 @@ async fn handle_create(
     let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id);
 
     let peers = selector.select(cluster_id, &ctx).await?;
+    if peers.is_empty() {
+        let header = Some(ResponseHeader::failed(
+            cluster_id,
+            Error::no_active_datanodes(),
+        ));
+        return Ok(RouteResponse {
+            header,
+            ..Default::default()
+        });
+    }
+
+    let id = table_id_sequence.next().await?;
+    let table_route_key = TableRouteKey::with_table_name(id, &table_name)
+        .key()
+        .into_bytes();
     let table = Table {
+        id,
         table_name: Some(table_name),
         ..Default::default()
     };
-    let region_num = partitions.len();
-    let mut region_routes = Vec::with_capacity(region_num);
-    for i in 0..region_num {
+    let mut region_routes = Vec::with_capacity(partitions.len());
+    for (i, partition) in partitions.into_iter().enumerate() {
         let region = Region {
             id: i as u64,
+            partition: Some(partition),
             ..Default::default()
         };
 
         let region_route = RegionRoute {
             region: Some(region),
             leader_peer_index: (i % peers.len()) as u64,
-            follower_peer_indexes: vec![(i % peers.len()) as u64],
+            follower_peer_indexes: vec![], // follower_peers is not supported at the moment
         };
 
         region_routes.push(region_route);
     }
@@ -79,6 +155,13 @@ async fn handle_create(
         region_routes,
     };
 
+    // save the table route data into the meta store
+    let table_route_value = TableRouteValue {
+        peers: peers.clone(),
+        table_route: Some(table_route.clone()),
+    };
+    put_into_store(&ctx.kv_store, table_route_key, table_route_value).await?;
+
     let header = Some(ResponseHeader::success(cluster_id));
     Ok(RouteResponse {
         header,
@@ -87,112 +170,89 @@ async fn handle_create(
-#[cfg(test)]
-mod tests {
-    use std::sync::Arc;
-
-    use api::v1::meta::router_server::Router;
-    use api::v1::meta::*;
-    use tonic::IntoRequest;
-
-    use super::*;
-    use crate::metasrv::MetaSrvOptions;
-    use crate::selector::{Namespace, Selector};
-    use crate::service::store::noop::NoopKvStore;
-
-    #[should_panic]
-    #[tokio::test]
-    async fn test_handle_route() {
-        let kv_store = Arc::new(NoopKvStore {});
-        let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await;
-
-        let req = RouteRequest {
-            header: Some(RequestHeader::new((1, 1))),
-            table_names: vec![
-                TableName {
-                    catalog_name: "catalog1".to_string(),
-                    schema_name: "schema1".to_string(),
-                    table_name: "table1".to_string(),
-                },
-                TableName {
-                    catalog_name: "catalog1".to_string(),
-                    schema_name: "schema1".to_string(),
-                    table_name: "table2".to_string(),
-                },
-                TableName {
-                    catalog_name: "catalog1".to_string(),
-                    schema_name: "schema1".to_string(),
-                    table_name: "table3".to_string(),
-                },
-            ],
-        };
-
-        let _res = meta_srv.route(req.into_request()).await.unwrap();
-    }
-
-    struct MockSelector;
-
-    #[async_trait::async_trait]
-    impl Selector for MockSelector {
-        type Context = Context;
-        type Output = Vec<Peer>;
-
-        async fn select(&self, _ns: Namespace, _ctx: &Self::Context) -> Result<Self::Output> {
-            Ok(vec![
-                Peer {
-                    id: 0,
-                    addr: "127.0.0.1:3000".to_string(),
-                },
-                Peer {
-                    id: 1,
-                    addr: "127.0.0.1:3001".to_string(),
-                },
-            ])
+async fn fetch_tables(
+    kv_store: &KvStoreRef,
+    keys: impl Iterator<Item = TableGlobalKey>,
+) -> Result<Vec<(TableGlobalValue, TableRouteValue)>> {
+    let mut tables = vec![];
+    // Maybe we can optimize this loop in the future; in general there are
+    // only a few keys, and usually just one.
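+    // Two lookups per key: the TableGlobalKey resolves to a TableGlobalValue
+    // (which carries the table id), and the id-qualified TableRouteKey then
+    // resolves to the TableRouteValue saved by handle_create.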
+    for tk in keys {
+        let tv = get_table_global_value(kv_store, &tk).await?;
+        if tv.is_none() {
+            warn!("Table global value is absent: {}", tk);
+            continue;
         }
+        let tv = tv.unwrap();
+
+        let table_id = tv.id as u64;
+        let tr_key = TableRouteKey::with_table_global_key(table_id, &tk);
+        let tr = get_table_route_value(kv_store, &tr_key).await?;
+
+        tables.push((tv, tr));
     }
 
-    #[tokio::test]
-    async fn test_handle_create() {
-        let kv_store = Arc::new(NoopKvStore {});
-        let table_name = TableName {
-            catalog_name: "test_catalog".to_string(),
-            schema_name: "test_db".to_string(),
-            table_name: "table1".to_string(),
-        };
-        let p0 = Partition {
-            column_list: vec![b"col1".to_vec(), b"col2".to_vec()],
-            value_list: vec![b"v1".to_vec(), b"v2".to_vec()],
-        };
-        let p1 = Partition {
-            column_list: vec![b"col1".to_vec(), b"col2".to_vec()],
-            value_list: vec![b"v11".to_vec(), b"v22".to_vec()],
-        };
-        let req = CreateRequest {
-            header: Some(RequestHeader::new((1, 1))),
-            table_name: Some(table_name),
-            partitions: vec![p0, p1],
-        };
-        let ctx = Context {
-            datanode_lease_secs: 10,
-            kv_store,
-        };
-        let selector = Arc::new(MockSelector {});
-        let res = handle_create(req, ctx, selector).await.unwrap();
+    Ok(tables)
+}
 
-        assert_eq!(
-            vec![
-                Peer {
-                    id: 0,
-                    addr: "127.0.0.1:3000".to_string(),
-                },
-                Peer {
-                    id: 1,
-                    addr: "127.0.0.1:3001".to_string(),
-                },
-            ],
-            res.peers
-        );
-        assert_eq!(1, res.table_routes.len());
-        assert_eq!(2, res.table_routes.get(0).unwrap().region_routes.len());
+async fn get_table_route_value(
+    kv_store: &KvStoreRef,
+    key: &TableRouteKey<'_>,
+) -> Result<TableRouteValue> {
+    let tr = get_from_store(kv_store, key.key().into_bytes())
+        .await?
+        .context(error::TableRouteNotFoundSnafu { key: key.key() })?;
+    let tr: TableRouteValue = tr
+        .as_slice()
+        .try_into()
+        .context(error::DecodeTableRouteSnafu)?;
+
+    Ok(tr)
+}
+
+async fn get_table_global_value(
+    kv_store: &KvStoreRef,
+    key: &TableGlobalKey,
+) -> Result<Option<TableGlobalValue>> {
+    let tg_key = format!("{}", key).into_bytes();
+    let tv = get_from_store(kv_store, tg_key).await?;
+    match tv {
+        Some(tv) => {
+            let tv = TableGlobalValue::parse(&String::from_utf8_lossy(&tv))
+                .context(error::InvalidCatalogValueSnafu)?;
+            Ok(Some(tv))
+        }
+        None => Ok(None),
+    }
+}
+
+async fn put_into_store(
+    kv_store: &KvStoreRef,
+    key: impl Into<Vec<u8>>,
+    value: impl Into<Vec<u8>>,
+) -> Result<()> {
+    let key = key.into();
+    let value = value.into();
+    let put_req = PutRequest {
+        key,
+        value,
+        ..Default::default()
+    };
+    let _ = kv_store.put(put_req).await?;
+
+    Ok(())
+}
+
+async fn get_from_store(kv_store: &KvStoreRef, key: Vec<u8>) -> Result<Option<Vec<u8>>> {
+    let req = RangeRequest {
+        key,
+        ..Default::default()
+    };
+    let res = kv_store.range(req).await?;
+    let mut kvs = res.kvs;
+    if kvs.is_empty() {
+        Ok(None)
+    } else {
+        Ok(Some(kvs.pop().unwrap().value))
     }
 }
diff --git a/src/meta-srv/src/service/store.rs b/src/meta-srv/src/service/store.rs
index 70c56194a7..290df1a6ca 100644
--- a/src/meta-srv/src/service/store.rs
+++ b/src/meta-srv/src/service/store.rs
@@ -1,7 +1,6 @@
 pub mod etcd;
 pub mod kv;
 pub mod memory;
-pub mod noop;
 
 use api::v1::meta::store_server;
 use api::v1::meta::BatchPutRequest;
@@ -74,11 +73,11 @@ mod tests {
 
     use super::*;
     use crate::metasrv::MetaSrvOptions;
-    use crate::service::store::noop::NoopKvStore;
+    use crate::service::store::memory::MemStore;
 
     #[tokio::test]
     async fn test_range() {
-        let kv_store = Arc::new(NoopKvStore {});
+        let kv_store = Arc::new(MemStore::new());
         let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await;
         let req = RangeRequest::default();
         let res = meta_srv.range(req.into_request()).await;
@@ -88,7 +87,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_put() {
-        let kv_store = Arc::new(NoopKvStore {});
+        let kv_store = Arc::new(MemStore::new());
         let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await;
         let req = PutRequest::default();
         let res = meta_srv.put(req.into_request()).await;
@@ -98,7 +97,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_batch_put() {
-        let kv_store = Arc::new(NoopKvStore {});
+        let kv_store = Arc::new(MemStore::new());
         let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await;
         let req = BatchPutRequest::default();
         let res = meta_srv.batch_put(req.into_request()).await;
@@ -108,7 +107,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_compare_and_put() {
-        let kv_store = Arc::new(NoopKvStore {});
+        let kv_store = Arc::new(MemStore::new());
        let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await;
         let req = CompareAndPutRequest::default();
         let res = meta_srv.compare_and_put(req.into_request()).await;
@@ -118,7 +117,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_delete_range() {
-        let kv_store = Arc::new(NoopKvStore {});
+        let kv_store = Arc::new(MemStore::new());
         let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await;
         let req = DeleteRangeRequest::default();
         let res = meta_srv.delete_range(req.into_request()).await;
diff --git a/src/meta-srv/src/service/store/memory.rs b/src/meta-srv/src/service/store/memory.rs
index 8721789105..294abc32e0 100644
--- a/src/meta-srv/src/service/store/memory.rs
+++ b/src/meta-srv/src/service/store/memory.rs
@@ -1,4 +1,4 @@
-use std::cmp::Ordering;
+use std::collections::btree_map::Entry;
 use std::collections::BTreeMap;
 use std::ops::Range;
 use std::sync::Arc;
@@ -146,19 +146,26 @@ impl KvStore for MemStore {
 
         let mut memory = self.inner.write();
 
-        let prev_val = memory.get(&key);
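+        // CAS semantics: a vacant key matches only an empty `expect`, while an
+        // occupied key must match `expect` byte-for-byte; `prev_kv` is returned
+        // only when the key already existed.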
+        let (success, prev_kv) = match memory.entry(key) {
+            Entry::Vacant(e) => {
+                let success = expect.is_empty();
+                if success {
+                    e.insert(value);
+                }
+                (success, None)
+            }
+            Entry::Occupied(mut e) => {
+                let key = e.key().clone();
+                let prev_val = e.get().clone();
+                let success = prev_val == expect;
+                if success {
+                    e.insert(value);
+                }
+                (success, Some((key, prev_val)))
+            }
+        };
 
-        let success = prev_val
-            .map(|v| expect.cmp(v) == Ordering::Equal)
-            .unwrap_or(false | expect.is_empty());
-        let prev_kv = prev_val.map(|v| KeyValue {
-            key: key.clone(),
-            value: v.clone(),
-        });
-
-        if success {
-            memory.insert(key, value);
-        }
+        let prev_kv = prev_kv.map(|(key, value)| KeyValue { key, value });
 
         let cluster_id = header.map_or(0, |h| h.cluster_id);
         let header = Some(ResponseHeader::success(cluster_id));
diff --git a/src/meta-srv/src/service/store/noop.rs b/src/meta-srv/src/service/store/noop.rs
deleted file mode 100644
index e113ebfdcb..0000000000
--- a/src/meta-srv/src/service/store/noop.rs
+++ /dev/null
@@ -1,41 +0,0 @@
-use api::v1::meta::BatchPutRequest;
-use api::v1::meta::BatchPutResponse;
-use api::v1::meta::CompareAndPutRequest;
-use api::v1::meta::CompareAndPutResponse;
-use api::v1::meta::DeleteRangeRequest;
-use api::v1::meta::DeleteRangeResponse;
-use api::v1::meta::PutRequest;
-use api::v1::meta::PutResponse;
-use api::v1::meta::RangeRequest;
-use api::v1::meta::RangeResponse;
-
-use super::kv::KvStore;
-use crate::error::Result;
-
-/// A noop kv_store which only for test
-// TODO(jiachun): Add a test feature
-#[derive(Clone)]
-pub struct NoopKvStore;
-
-#[async_trait::async_trait]
-impl KvStore for NoopKvStore {
-    async fn range(&self, _req: RangeRequest) -> Result<RangeResponse> {
-        Ok(RangeResponse::default())
-    }
-
-    async fn put(&self, _req: PutRequest) -> Result<PutResponse> {
-        Ok(PutResponse::default())
-    }
-
-    async fn batch_put(&self, _req: BatchPutRequest) -> Result<BatchPutResponse> {
-        Ok(BatchPutResponse::default())
-    }
-
-    async fn compare_and_put(&self, _req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
-        Ok(CompareAndPutResponse::default())
-    }
-
-    async fn delete_range(&self, _req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
-        Ok(DeleteRangeResponse::default())
-    }
-}