feat: route for insert&select (#425)

* feat: route for insert/select

* chore: remove redundant tests

* chore: add force-quit loop count limit to sequence

* chore: address code review comments

* chore: use ref with TableRouteKey

* chore: minor refactor
Jiachun Feng
2022-11-10 16:13:15 +08:00
committed by GitHub
parent 49403012b5
commit 23f0320ffb
23 changed files with 644 additions and 228 deletions

Cargo.lock (generated)
View File

@@ -2944,6 +2944,7 @@ dependencies = [
"api",
"async-trait",
"common-base",
"common-catalog",
"common-error",
"common-grpc",
"common-runtime",
@@ -2955,6 +2956,7 @@ dependencies = [
"http-body",
"lazy_static",
"parking_lot",
"prost 0.11.0",
"regex",
"serde",
"serde_json",

View File

@@ -63,8 +63,9 @@ message RegionRoute {
}
message Table {
TableName table_name = 1;
bytes table_schema = 2;
uint64 id = 1;
TableName table_name = 2;
bytes table_schema = 3;
}
message Region {
@@ -80,3 +81,9 @@ message Partition {
repeated bytes column_list = 1;
repeated bytes value_list = 2;
}
// This message is only for saving into the store.
message TableRouteValue {
repeated Peer peers = 1;
TableRoute table_route = 2;
}
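Persisted values round-trip through the byte conversions this commit adds for TableRouteValue in the api convert module (next file). A minimal sketch of that round trip, assuming impl_convert_with_bytes! provides the Into<Vec<u8>> and TryFrom<&[u8]> impls the router code below relies on:

use api::v1::meta::TableRouteValue;

fn roundtrip(value: TableRouteValue) {
    // Encode for storage (as put_into_store does in the router service) ...
    let bytes: Vec<u8> = value.clone().into();
    // ... and decode on read (as get_table_route_value does).
    let decoded: TableRouteValue = bytes.as_slice().try_into().expect("valid encoding");
    assert_eq!(value, decoded);
}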

View File

@@ -1,7 +1,11 @@
pub use prost::DecodeError;
use prost::Message;
use crate::v1::codec::{InsertBatch, PhysicalPlanNode, RegionNumber, SelectResult};
use crate::v1::codec::InsertBatch;
use crate::v1::codec::PhysicalPlanNode;
use crate::v1::codec::RegionNumber;
use crate::v1::codec::SelectResult;
use crate::v1::meta::TableRouteValue;
macro_rules! impl_convert_with_bytes {
($data_type: ty) => {
@@ -25,6 +29,7 @@ impl_convert_with_bytes!(InsertBatch);
impl_convert_with_bytes!(SelectResult);
impl_convert_with_bytes!(PhysicalPlanNode);
impl_convert_with_bytes!(RegionNumber);
impl_convert_with_bytes!(TableRouteValue);
#[cfg(test)]
mod tests {

View File

@@ -1,7 +1,47 @@
tonic::include_proto!("greptime.v1.meta");
use std::collections::HashMap;
use std::hash::Hash;
use std::hash::Hasher;
pub const PROTOCOL_VERSION: u64 = 1;
#[derive(Default)]
pub struct PeerDict {
peers: HashMap<Peer, usize>,
index: usize,
}
impl PeerDict {
pub fn get_or_insert(&mut self, peer: Peer) -> usize {
let index = self.peers.entry(peer).or_insert_with(|| {
let v = self.index;
self.index += 1;
v
});
*index
}
pub fn into_peers(self) -> Vec<Peer> {
let mut array = vec![Peer::default(); self.index];
for (p, i) in self.peers {
array[i] = p;
}
array
}
}
#[allow(clippy::derive_hash_xor_eq)]
impl Hash for Peer {
fn hash<H: Hasher>(&self, state: &mut H) {
self.id.hash(state);
self.addr.hash(state);
}
}
impl Eq for Peer {}
impl RequestHeader {
#[inline]
pub fn new((cluster_id, member_id): (u64, u64)) -> Self {
@@ -33,6 +73,21 @@ impl ResponseHeader {
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorCode {
NoActiveDatanodes = 1,
}
impl Error {
#[inline]
pub fn no_active_datanodes() -> Self {
Self {
code: ErrorCode::NoActiveDatanodes as i32,
err_msg: "No active datanodes".to_string(),
}
}
}
macro_rules! gen_set_header {
($req: ty) => {
impl $req {
@@ -52,3 +107,59 @@ gen_set_header!(PutRequest);
gen_set_header!(BatchPutRequest);
gen_set_header!(CompareAndPutRequest);
gen_set_header!(DeleteRangeRequest);
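The body of gen_set_header! is elided between these hunks; a plausible expansion for one request type, assuming the macro simply installs a request header built from the (cluster_id, member_id) pair (hypothetical sketch, the real method name and signature are not shown in this diff):

impl PutRequest {
    // Hypothetical expansion of gen_set_header!(PutRequest); illustrative only.
    #[inline]
    pub fn set_header(&mut self, id: (u64, u64)) {
        self.header = Some(RequestHeader::new(id));
    }
}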
#[cfg(test)]
mod tests {
use std::vec;
use super::*;
#[test]
fn test_peer_dict() {
let mut dict = PeerDict::default();
dict.get_or_insert(Peer {
id: 1,
addr: "111".to_string(),
});
dict.get_or_insert(Peer {
id: 2,
addr: "222".to_string(),
});
dict.get_or_insert(Peer {
id: 1,
addr: "111".to_string(),
});
dict.get_or_insert(Peer {
id: 1,
addr: "111".to_string(),
});
dict.get_or_insert(Peer {
id: 1,
addr: "111".to_string(),
});
dict.get_or_insert(Peer {
id: 1,
addr: "111".to_string(),
});
dict.get_or_insert(Peer {
id: 2,
addr: "222".to_string(),
});
assert_eq!(2, dict.index);
assert_eq!(
vec![
Peer {
id: 1,
addr: "111".to_string(),
},
Peer {
id: 2,
addr: "222".to_string(),
}
],
dict.into_peers()
);
}
}

View File

@@ -372,14 +372,14 @@ mod tests {
#[tokio::test]
async fn test_ask_leader() {
let client = mocks::mock_client_with_noopstore().await;
let client = mocks::mock_client_with_memstore().await;
let res = client.ask_leader().await;
assert!(res.is_ok());
}
#[tokio::test]
async fn test_heartbeat() {
let client = mocks::mock_client_with_noopstore().await;
let client = mocks::mock_client_with_memstore().await;
let (sender, mut receiver) = client.heartbeat().await.unwrap();
// send heartbeats
tokio::spawn(async move {
@@ -428,9 +428,10 @@ mod tests {
}
#[tokio::test]
async fn test_create_route() {
async fn test_route() {
let selector = Arc::new(MockSelector {});
let client = mocks::mock_client_with_selector(selector).await;
let client = mocks::mock_client_with_memorystore_and_selector(selector).await;
let p1 = Partition {
column_list: vec![b"col_1".to_vec(), b"col_2".to_vec()],
value_list: vec![b"k1".to_vec(), b"k2".to_vec()],
@@ -440,29 +441,17 @@ mod tests {
value_list: vec![b"Max1".to_vec(), b"Max2".to_vec()],
};
let table_name = TableName::new("test_catalog", "test_schema", "test_table");
let req = CreateRequest::new(table_name)
let req = CreateRequest::new(table_name.clone())
.add_partition(p1)
.add_partition(p2);
let res = client.create_route(req).await.unwrap();
assert_eq!(1, res.table_routes.len());
let table_routes = res.table_routes;
assert_eq!(1, table_routes.len());
let table_route = table_routes.get(0).unwrap();
let table = &table_route.table;
let table_name = &table.table_name;
assert_eq!("test_catalog", table_name.catalog_name);
assert_eq!("test_schema", table_name.schema_name);
assert_eq!("test_table", table_name.table_name);
let region_routes = &table_route.region_routes;
assert_eq!(2, region_routes.len());
let r0 = region_routes.get(0).unwrap();
let r1 = region_routes.get(1).unwrap();
assert_eq!(0, r0.region.as_ref().unwrap().id);
assert_eq!(1, r1.region.as_ref().unwrap().id);
assert_eq!("peer0", r0.leader_peer.as_ref().unwrap().addr);
assert_eq!("peer1", r1.leader_peer.as_ref().unwrap().addr);
let req = RouteRequest::new().add_table_name(table_name);
let res = client.route(req).await.unwrap();
// table_routes is empty since no TableGlobalValue has been stored by the datanode
assert!(res.table_routes.is_empty());
}
async fn gen_data(client: &MetaClient) {

View File

@@ -5,11 +5,6 @@ use meta_srv::mocks::MockInfo;
use crate::client::MetaClient;
use crate::client::MetaClientBuilder;
pub async fn mock_client_with_noopstore() -> MetaClient {
let mock_info = server_mock::mock_with_noopstore().await;
mock_client_by(mock_info).await
}
pub async fn mock_client_with_memstore() -> MetaClient {
let mock_info = server_mock::mock_with_memstore().await;
mock_client_by(mock_info).await
@@ -21,8 +16,8 @@ pub async fn mock_client_with_etcdstore(addr: &str) -> MetaClient {
mock_client_by(mock_info).await
}
pub async fn mock_client_with_selector(selector: SelectorRef) -> MetaClient {
let mock_info = server_mock::mock_with_selector(selector).await;
pub async fn mock_client_with_memorystore_and_selector(selector: SelectorRef) -> MetaClient {
let mock_info = server_mock::mock_with_memstore_and_selector(selector).await;
mock_client_by(mock_info).await
}

View File

@@ -134,6 +134,7 @@ pub struct TableRoute {
#[derive(Debug, Clone)]
pub struct Table {
pub id: u64,
pub table_name: TableName,
pub table_schema: Vec<u8>,
}
@@ -149,6 +150,7 @@ impl TryFrom<PbTable> for Table {
})?
.into();
Ok(Self {
id: t.id,
table_name,
table_schema: t.table_schema,
})
@@ -296,6 +298,7 @@ mod tests {
],
table_routes: vec![PbTableRoute {
table: Some(PbTable {
id: 1,
table_name: Some(PbTableName {
catalog_name: "c1".to_string(),
schema_name: "s1".to_string(),
@@ -324,6 +327,7 @@ mod tests {
assert_eq!(1, table_routes.len());
let table_route = table_routes.remove(0);
let table = table_route.table;
assert_eq!(1, table.id);
assert_eq!("c1", table.table_name.catalog_name);
assert_eq!("s1", table.table_name.schema_name);
assert_eq!("t1", table.table_name.table_name);

View File

@@ -10,6 +10,7 @@ mock = []
api = { path = "../api" }
async-trait = "0.1"
common-base = { path = "../common/base" }
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }
common-grpc = { path = "../common/grpc" }
common-runtime = { path = "../common/runtime" }
@@ -21,6 +22,7 @@ h2 = "0.3"
http-body = "0.4"
lazy_static = "1.4"
parking_lot = "0.12"
prost = "0.11"
regex = "1.6"
serde = "1.0"
serde_json = "1.0"

View File

@@ -79,6 +79,33 @@ pub enum Error {
err_msg: String,
backtrace: Backtrace,
},
#[snafu(display("Cannot parse catalog value, source: {}", source))]
InvalidCatalogValue {
#[snafu(backtrace)]
source: common_catalog::error::Error,
},
#[snafu(display("Unexcepted sequence value: {}", err_msg))]
UnexceptedSequenceValue {
err_msg: String,
backtrace: Backtrace,
},
#[snafu(display("Failed to decode table route, source: {}", source))]
DecodeTableRoute {
source: prost::DecodeError,
backtrace: Backtrace,
},
#[snafu(display("Table route not found: {}", key))]
TableRouteNotFound { key: String, backtrace: Backtrace },
#[snafu(display("Failed to get sequence: {}", err_msg))]
NextSequence {
err_msg: String,
backtrace: Backtrace,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -106,15 +133,19 @@ impl ErrorExt for Error {
| Error::TcpBind { .. }
| Error::SerializeToJson { .. }
| Error::DeserializeFromJson { .. }
| Error::DecodeTableRoute { .. }
| Error::StartGrpc { .. } => StatusCode::Internal,
Error::EmptyKey { .. }
| Error::EmptyTableName { .. }
| Error::InvalidLeaseKey { .. }
| Error::ParseNum { .. }
| Error::InvalidArguments { .. } => StatusCode::InvalidArguments,
Error::LeaseKeyFromUtf8 { .. } | Error::InvalidTxnResult { .. } => {
StatusCode::Unexpected
}
Error::LeaseKeyFromUtf8 { .. }
| Error::UnexpectedSequenceValue { .. }
| Error::TableRouteNotFound { .. }
| Error::NextSequence { .. }
| Error::InvalidTxnResult { .. } => StatusCode::Unexpected,
Error::InvalidCatalogValue { source, .. } => source.status_code(),
}
}
}

View File

@@ -31,7 +31,7 @@ impl HeartbeatHandler for DatanodeLeaseHandler {
node_addr: peer.addr.clone(),
};
info!("Receive a heartbeat from datanode: {:?}, {:?}", key, value);
info!("Receive a heartbeat: {:?}, {:?}", key, value);
let key = key.try_into()?;
let value = value.try_into()?;
@@ -41,8 +41,7 @@ impl HeartbeatHandler for DatanodeLeaseHandler {
..Default::default()
};
let kv_store = ctx.kv_store();
let _ = kv_store.put(put).await?;
let _ = ctx.kv_store().put(put).await?;
}
Ok(())

View File

@@ -35,11 +35,12 @@ mod tests {
use api::v1::meta::{HeartbeatResponse, RequestHeader};
use super::*;
use crate::{handler::Context, service::store::noop::NoopKvStore};
use crate::handler::Context;
use crate::service::store::memory::MemStore;
#[tokio::test]
async fn test_handle_heartbeat_resp_header() {
let kv_store = Arc::new(NoopKvStore {});
let kv_store = Arc::new(MemStore::new());
let ctx = Context {
server_addr: "0.0.0.0:0000".to_string(),
kv_store,

View File

@@ -1,5 +1,7 @@
use std::str::FromStr;
use api::v1::meta::TableName;
use common_catalog::TableGlobalKey;
use lazy_static::lazy_static;
use regex::Regex;
use serde::Deserialize;
@@ -12,6 +14,8 @@ use crate::error;
use crate::error::Result;
pub(crate) const DN_LEASE_PREFIX: &str = "__meta_dnlease";
pub(crate) const SEQ_PREFIX: &str = "__meta_seq";
pub(crate) const TABLE_ROUTE_PREFIX: &str = "__meta_table_route";
lazy_static! {
static ref DATANODE_KEY_PATTERN: Regex =
@@ -108,6 +112,44 @@ impl TryFrom<LeaseValue> for Vec<u8> {
}
}
pub struct TableRouteKey<'a> {
pub table_id: u64,
pub catalog_name: &'a str,
pub schema_name: &'a str,
pub table_name: &'a str,
}
impl<'a> TableRouteKey<'a> {
pub fn with_table_name(table_id: u64, t: &'a TableName) -> Self {
Self {
table_id,
catalog_name: &t.catalog_name,
schema_name: &t.schema_name,
table_name: &t.table_name,
}
}
pub fn with_table_global_key(table_id: u64, t: &'a TableGlobalKey) -> Self {
Self {
table_id,
catalog_name: &t.catalog_name,
schema_name: &t.schema_name,
table_name: &t.table_name,
}
}
pub fn prefix(&self) -> String {
format!(
"{}-{}-{}-{}",
TABLE_ROUTE_PREFIX, self.catalog_name, self.schema_name, self.table_name
)
}
pub fn key(&self) -> String {
format!("{}-{}", self.prefix(), self.table_id)
}
}
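For illustration, with hypothetical catalog/schema/table names, the key layout is the prefix followed by the table id:

let key = TableRouteKey {
    table_id: 42,
    catalog_name: "greptime",
    schema_name: "public",
    table_name: "metrics",
};
assert_eq!("__meta_table_route-greptime-public-metrics", key.prefix());
assert_eq!("__meta_table_route-greptime-public-metrics-42", key.key());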
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -1,5 +1,4 @@
use api::v1::meta::RangeRequest;
use api::v1::meta::RangeResponse;
use crate::error::Result;
use crate::keys::LeaseKey;
@@ -10,7 +9,7 @@ use crate::util;
pub async fn alive_datanodes<P>(
cluster_id: u64,
kv_store: KvStoreRef,
kv_store: &KvStoreRef,
predicate: P,
) -> Result<Vec<(LeaseKey, LeaseValue)>>
where
@@ -26,7 +25,7 @@ where
let res = kv_store.range(req).await?;
let RangeResponse { kvs, .. } = res;
let kvs = res.kvs;
let mut lease_kvs = vec![];
for kv in kvs {
let lease_key: LeaseKey = kv.key.try_into()?;

View File

@@ -3,11 +3,12 @@ pub mod bootstrap;
pub mod error;
pub mod handler;
mod keys;
pub mod lease;
mod lease;
pub mod metasrv;
#[cfg(feature = "mock")]
pub mod mocks;
pub mod selector;
mod sequence;
pub mod service;
mod util;

View File

@@ -9,8 +9,12 @@ use crate::handler::response_header::ResponseHeaderHandler;
use crate::handler::HeartbeatHandlerGroup;
use crate::selector::lease_based::LeaseBasedSelector;
use crate::selector::Selector;
use crate::sequence::Sequence;
use crate::sequence::SequenceRef;
use crate::service::store::kv::KvStoreRef;
pub const TABLE_ID_SEQ: &str = "table_id";
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct MetaSrvOptions {
pub bind_addr: String,
@@ -24,7 +28,7 @@ impl Default for MetaSrvOptions {
Self {
bind_addr: "0.0.0.0:3002".to_string(),
server_addr: "0.0.0.0:3002".to_string(),
store_addr: "0.0.0.0:2380".to_string(),
store_addr: "0.0.0.0:2379".to_string(),
datanode_lease_secs: 15,
}
}
@@ -42,6 +46,7 @@ pub type SelectorRef = Arc<dyn Selector<Context = Context, Output = Vec<Peer>>>;
pub struct MetaSrv {
options: MetaSrvOptions,
kv_store: KvStoreRef,
table_id_sequence: SequenceRef,
selector: SelectorRef,
handler_group: HeartbeatHandlerGroup,
}
@@ -52,6 +57,7 @@ impl MetaSrv {
kv_store: KvStoreRef,
selector: Option<SelectorRef>,
) -> Self {
let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 10, kv_store.clone()));
let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector {}));
let handler_group = HeartbeatHandlerGroup::default();
handler_group.add_handler(ResponseHeaderHandler).await;
@@ -60,6 +66,7 @@ impl MetaSrv {
Self {
options,
kv_store,
table_id_sequence,
selector,
handler_group,
}
@@ -75,6 +82,11 @@ impl MetaSrv {
self.kv_store.clone()
}
#[inline]
pub fn table_id_sequence(&self) -> SequenceRef {
self.table_id_sequence.clone()
}
#[inline]
pub fn selector(&self) -> SelectorRef {
self.selector.clone()

View File

@@ -13,18 +13,12 @@ use crate::metasrv::SelectorRef;
use crate::service::store::etcd::EtcdStore;
use crate::service::store::kv::KvStoreRef;
use crate::service::store::memory::MemStore;
use crate::service::store::noop::NoopKvStore;
pub struct MockInfo {
pub server_addr: String,
pub channel_manager: ChannelManager,
}
pub async fn mock_with_noopstore() -> MockInfo {
let kv_store = Arc::new(NoopKvStore {});
mock(Default::default(), kv_store, None).await
}
pub async fn mock_with_memstore() -> MockInfo {
let kv_store = Arc::new(MemStore::default());
mock(Default::default(), kv_store, None).await
@@ -35,8 +29,8 @@ pub async fn mock_with_etcdstore(addr: &str) -> MockInfo {
mock(Default::default(), kv_store, None).await
}
pub async fn mock_with_selector(selector: SelectorRef) -> MockInfo {
let kv_store = Arc::new(NoopKvStore {});
pub async fn mock_with_memstore_and_selector(selector: SelectorRef) -> MockInfo {
let kv_store = Arc::new(MemStore::default());
mock(Default::default(), kv_store, Some(selector)).await
}

View File

@@ -21,7 +21,7 @@ impl Selector for LeaseBasedSelector {
let lease_filter = |_: &LeaseKey, v: &LeaseValue| {
time_util::current_time_millis() - v.timestamp_millis < ctx.datanode_lease_secs
};
let mut lease_kvs = lease::alive_datanodes(ns, ctx.kv_store.clone(), lease_filter).await?;
let mut lease_kvs = lease::alive_datanodes(ns, &ctx.kv_store, lease_filter).await?;
// TODO(jiachun): At the moment we just push the most recent to the front;
// a load-based strategy would be better in the future.
lease_kvs.sort_by(|a, b| b.1.timestamp_millis.cmp(&a.1.timestamp_millis));

View File

@@ -0,0 +1,197 @@
use std::ops::Range;
use std::sync::Arc;
use api::v1::meta::CompareAndPutRequest;
use snafu::ensure;
use tokio::sync::Mutex;
use crate::error::{self, Result};
use crate::keys;
use crate::service::store::kv::KvStoreRef;
pub type SequenceRef = Arc<Sequence>;
pub struct Sequence {
inner: Mutex<Inner>,
}
impl Sequence {
pub fn new(name: impl AsRef<str>, step: u64, generator: KvStoreRef) -> Self {
let name = format!("{}-{}", keys::SEQ_PREFIX, name.as_ref());
let step = step.max(1);
Self {
inner: Mutex::new(Inner {
name,
generator,
next: 0,
step,
range: None,
force_quit: 1024,
}),
}
}
pub async fn next(&self) -> Result<u64> {
let mut inner = self.inner.lock().await;
inner.next().await
}
}
struct Inner {
name: String,
generator: KvStoreRef,
// The next available sequence (if it is in the range;
// otherwise it needs to be fetched from the generator again).
next: u64,
// Fetch several sequences at once: [start, start + step).
step: u64,
// The range of available sequences for the local cache.
range: Option<Range<u64>>,
// Used to avoid infinite loops.
force_quit: usize,
}
impl Inner {
/// 1. Returns the `next` value directly if it is in the `range` (local cache).
/// 2. Otherwise fetches (CAS) the next `range` from the `generator`.
/// 3. Jumps back to step 1.
pub async fn next(&mut self) -> Result<u64> {
for _ in 0..self.force_quit {
match &self.range {
Some(range) => {
if range.contains(&self.next) {
let res = Ok(self.next);
self.next += 1;
return res;
}
self.range = None;
}
None => {
let range = self.next_range().await?;
self.next = range.start;
self.range = Some(range);
}
}
}
error::NextSequenceSnafu {
err_msg: format!("{}.next()", &self.name),
}
.fail()
}
pub async fn next_range(&self) -> Result<Range<u64>> {
let key = self.name.as_bytes();
let mut start = self.next;
for _ in 0..self.force_quit {
let expect = if start == 0 {
vec![]
} else {
u64::to_le_bytes(start).to_vec()
};
let value = u64::to_le_bytes(start + self.step);
let req = CompareAndPutRequest {
key: key.to_vec(),
expect,
value: value.to_vec(),
..Default::default()
};
let res = self.generator.compare_and_put(req).await?;
if !res.success {
if let Some(kv) = res.prev_kv {
let value = kv.value;
ensure!(
value.len() == std::mem::size_of::<u64>(),
error::UnexpectedSequenceValueSnafu {
err_msg: format!("key={}, unexpected value={:?}", self.name, value)
}
);
start = u64::from_le_bytes(value.try_into().unwrap());
} else {
start = 0;
}
continue;
}
return Ok(Range {
start,
end: start + self.step,
});
}
error::NextSequenceSnafu {
err_msg: format!("{}.next_range()", &self.name),
}
.fail()
}
}
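Concretely, with step = 10 the CAS traffic looks like this (an illustrative trace of the logic above):

// next() with no cached range -> next_range():
//   CAS(key, expect = [], value = le_bytes(10)) succeeds on a fresh key,
//   so range = [0, 10) and the first ten calls serve 0..=9 from the local cache.
// The 11th call falls outside [0, 10) -> next_range():
//   CAS(key, expect = le_bytes(10), value = le_bytes(20)) -> range = [10, 20).
// If another instance advanced the key first, the CAS fails, `start` is
// reloaded from prev_kv, and the loop retries (at most force_quit times).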
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
use crate::service::store::{kv::KvStore, memory::MemStore};
#[tokio::test]
async fn test_sequence() {
let kv_store = Arc::new(MemStore::new());
let seq = Sequence::new("test_seq", 10, kv_store);
for i in 0..100 {
assert_eq!(i, seq.next().await.unwrap());
}
}
#[tokio::test]
async fn test_sequence_force_quit() {
struct Noop;
#[async_trait::async_trait]
impl KvStore for Noop {
async fn range(
&self,
_: api::v1::meta::RangeRequest,
) -> Result<api::v1::meta::RangeResponse> {
unreachable!()
}
async fn put(
&self,
_: api::v1::meta::PutRequest,
) -> Result<api::v1::meta::PutResponse> {
unreachable!()
}
async fn batch_put(
&self,
_: api::v1::meta::BatchPutRequest,
) -> Result<api::v1::meta::BatchPutResponse> {
unreachable!()
}
async fn compare_and_put(
&self,
_: CompareAndPutRequest,
) -> Result<api::v1::meta::CompareAndPutResponse> {
Ok(api::v1::meta::CompareAndPutResponse::default())
}
async fn delete_range(
&self,
_: api::v1::meta::DeleteRangeRequest,
) -> Result<api::v1::meta::DeleteRangeResponse> {
unreachable!()
}
}
let kv_store = Arc::new(Noop {});
let seq = Sequence::new("test_seq", 10, kv_store);
let next = seq.next().await;
assert!(next.is_err());
}
}

View File

@@ -142,11 +142,11 @@ mod tests {
use super::*;
use crate::metasrv::MetaSrvOptions;
use crate::service::store::noop::NoopKvStore;
use crate::service::store::memory::MemStore;
#[tokio::test]
async fn test_ask_leader() {
let kv_store = Arc::new(NoopKvStore {});
let kv_store = Arc::new(MemStore::new());
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await;
let req = AskLeaderRequest {

View File

@@ -1,5 +1,9 @@
use api::v1::meta::router_server;
use api::v1::meta::CreateRequest;
use api::v1::meta::Error;
use api::v1::meta::PeerDict;
use api::v1::meta::PutRequest;
use api::v1::meta::RangeRequest;
use api::v1::meta::Region;
use api::v1::meta::RegionRoute;
use api::v1::meta::ResponseHeader;
@@ -7,22 +11,31 @@ use api::v1::meta::RouteRequest;
use api::v1::meta::RouteResponse;
use api::v1::meta::Table;
use api::v1::meta::TableRoute;
use api::v1::meta::TableRouteValue;
use common_catalog::TableGlobalKey;
use common_catalog::TableGlobalValue;
use common_telemetry::warn;
use snafu::OptionExt;
use snafu::ResultExt;
use tonic::Request;
use tonic::Response;
use super::store::kv::KvStoreRef;
use super::GrpcResult;
use crate::error;
use crate::error::Result;
use crate::keys::TableRouteKey;
use crate::metasrv::Context;
use crate::metasrv::MetaSrv;
use crate::metasrv::SelectorRef;
use crate::sequence::SequenceRef;
#[async_trait::async_trait]
impl router_server::Router for MetaSrv {
async fn route(&self, req: Request<RouteRequest>) -> GrpcResult<RouteResponse> {
let req = req.into_inner();
let res = handle_route(req).await?;
let ctx = self.new_ctx();
let res = handle_route(req, ctx).await?;
Ok(Response::new(res))
}
@@ -31,20 +44,68 @@ impl router_server::Router for MetaSrv {
let req = req.into_inner();
let ctx = self.new_ctx();
let selector = self.selector();
let res = handle_create(req, ctx, selector).await?;
let table_id_sequence = self.table_id_sequence();
let res = handle_create(req, ctx, selector, table_id_sequence).await?;
Ok(Response::new(res))
}
}
async fn handle_route(_req: RouteRequest) -> Result<RouteResponse> {
todo!()
async fn handle_route(req: RouteRequest, ctx: Context) -> Result<RouteResponse> {
let RouteRequest {
header,
table_names,
} = req;
let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id);
let table_global_keys = table_names.into_iter().map(|t| TableGlobalKey {
catalog_name: t.catalog_name,
schema_name: t.schema_name,
table_name: t.table_name,
});
let tables = fetch_tables(&ctx.kv_store, table_global_keys).await?;
let mut peer_dict = PeerDict::default();
let mut table_routes = vec![];
for (tg, tr) in tables {
let TableRouteValue {
peers,
mut table_route,
} = tr;
if let Some(ref mut table_route) = table_route {
for rr in &mut table_route.region_routes {
if let Some(peer) = peers.get(rr.leader_peer_index as usize) {
rr.leader_peer_index = peer_dict.get_or_insert(peer.clone()) as u64;
}
for index in &mut rr.follower_peer_indexes {
if let Some(peer) = peers.get(*index as usize) {
*index = peer_dict.get_or_insert(peer.clone()) as u64;
}
}
}
if let Some(ref mut table) = table_route.table {
table.table_schema = tg.as_bytes().context(error::InvalidCatalogValueSnafu)?;
}
}
if let Some(table_route) = table_route {
table_routes.push(table_route)
}
}
let peers = peer_dict.into_peers();
let header = Some(ResponseHeader::success(cluster_id));
Ok(RouteResponse {
header,
peers,
table_routes,
})
}
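Each stored TableRouteValue carries its own peers array, so the loop above rewrites every per-table peer index against one deduplicated, response-wide peers list. For example, with hypothetical peers p1..p3:

// Stored:   table A -> peers [p1, p2], table B -> peers [p2, p3]
// Response: peers = [p1, p2, p3]  (built via PeerDict::get_or_insert)
//           A's indexes 0, 1 stay 0, 1;  B's indexes 0, 1 become 1, 2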
async fn handle_create(
req: CreateRequest,
ctx: Context,
selector: SelectorRef,
table_id_sequence: SequenceRef,
) -> Result<RouteResponse> {
let CreateRequest {
header,
@@ -55,22 +116,37 @@ async fn handle_create(
let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id);
let peers = selector.select(cluster_id, &ctx).await?;
if peers.is_empty() {
let header = Some(ResponseHeader::failed(
cluster_id,
Error::no_active_datanodes(),
));
return Ok(RouteResponse {
header,
..Default::default()
});
}
let id = table_id_sequence.next().await?;
let table_route_key = TableRouteKey::with_table_name(id, &table_name)
.key()
.into_bytes();
let table = Table {
id,
table_name: Some(table_name),
..Default::default()
};
let region_num = partitions.len();
let mut region_routes = Vec::with_capacity(region_num);
for i in 0..region_num {
let mut region_routes = Vec::with_capacity(partitions.len());
for (i, partition) in partitions.into_iter().enumerate() {
let region = Region {
id: i as u64,
partition: Some(partition),
..Default::default()
};
let region_route = RegionRoute {
region: Some(region),
leader_peer_index: (i % peers.len()) as u64,
follower_peer_indexes: vec![(i % peers.len()) as u64],
follower_peer_indexes: vec![], // follower_peers is not supported at the moment
};
region_routes.push(region_route);
}
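Leaders are assigned round-robin over the selected peers via i % peers.len(); for example, three partitions over two peers:

// region 0 -> leader_peer_index 0
// region 1 -> leader_peer_index 1
// region 2 -> leader_peer_index 0
// follower_peer_indexes stays empty until follower peers are supported.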
@@ -79,6 +155,13 @@ async fn handle_create(
region_routes,
};
// save table route data into meta store
let table_route_value = TableRouteValue {
peers: peers.clone(),
table_route: Some(table_route.clone()),
};
put_into_store(&ctx.kv_store, table_route_key, table_route_value).await?;
let header = Some(ResponseHeader::success(cluster_id));
Ok(RouteResponse {
header,
@@ -87,112 +170,89 @@ async fn handle_create(
})
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::meta::router_server::Router;
use api::v1::meta::*;
use tonic::IntoRequest;
use super::*;
use crate::metasrv::MetaSrvOptions;
use crate::selector::{Namespace, Selector};
use crate::service::store::noop::NoopKvStore;
#[should_panic]
#[tokio::test]
async fn test_handle_route() {
let kv_store = Arc::new(NoopKvStore {});
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await;
let req = RouteRequest {
header: Some(RequestHeader::new((1, 1))),
table_names: vec![
TableName {
catalog_name: "catalog1".to_string(),
schema_name: "schema1".to_string(),
table_name: "table1".to_string(),
},
TableName {
catalog_name: "catalog1".to_string(),
schema_name: "schema1".to_string(),
table_name: "table2".to_string(),
},
TableName {
catalog_name: "catalog1".to_string(),
schema_name: "schema1".to_string(),
table_name: "table3".to_string(),
},
],
};
let _res = meta_srv.route(req.into_request()).await.unwrap();
}
struct MockSelector;
#[async_trait::async_trait]
impl Selector for MockSelector {
type Context = Context;
type Output = Vec<Peer>;
async fn select(&self, _ns: Namespace, _ctx: &Self::Context) -> Result<Self::Output> {
Ok(vec![
Peer {
id: 0,
addr: "127.0.0.1:3000".to_string(),
},
Peer {
id: 1,
addr: "127.0.0.1:3001".to_string(),
},
])
async fn fetch_tables(
kv_store: &KvStoreRef,
keys: impl Iterator<Item = TableGlobalKey>,
) -> Result<Vec<(TableGlobalValue, TableRouteValue)>> {
let mut tables = vec![];
// Maybe we can optimize this loop in the future; in general there
// won't be many keys, and usually there is just one.
for tk in keys {
let tv = get_table_global_value(kv_store, &tk).await?;
if tv.is_none() {
warn!("Table global value is absent: {}", tk);
continue;
}
let tv = tv.unwrap();
let table_id = tv.id as u64;
let tr_key = TableRouteKey::with_table_global_key(table_id, &tk);
let tr = get_table_route_value(kv_store, &tr_key).await?;
tables.push((tv, tr));
}
#[tokio::test]
async fn test_handle_create() {
let kv_store = Arc::new(NoopKvStore {});
let table_name = TableName {
catalog_name: "test_catalog".to_string(),
schema_name: "test_db".to_string(),
table_name: "table1".to_string(),
};
let p0 = Partition {
column_list: vec![b"col1".to_vec(), b"col2".to_vec()],
value_list: vec![b"v1".to_vec(), b"v2".to_vec()],
};
let p1 = Partition {
column_list: vec![b"col1".to_vec(), b"col2".to_vec()],
value_list: vec![b"v11".to_vec(), b"v22".to_vec()],
};
let req = CreateRequest {
header: Some(RequestHeader::new((1, 1))),
table_name: Some(table_name),
partitions: vec![p0, p1],
};
let ctx = Context {
datanode_lease_secs: 10,
kv_store,
};
let selector = Arc::new(MockSelector {});
let res = handle_create(req, ctx, selector).await.unwrap();
Ok(tables)
}
assert_eq!(
vec![
Peer {
id: 0,
addr: "127.0.0.1:3000".to_string(),
},
Peer {
id: 1,
addr: "127.0.0.1:3001".to_string(),
},
],
res.peers
);
assert_eq!(1, res.table_routes.len());
assert_eq!(2, res.table_routes.get(0).unwrap().region_routes.len());
async fn get_table_route_value(
kv_store: &KvStoreRef,
key: &TableRouteKey<'_>,
) -> Result<TableRouteValue> {
let tr = get_from_store(kv_store, key.key().into_bytes())
.await?
.context(error::TableRouteNotFoundSnafu { key: key.key() })?;
let tr: TableRouteValue = tr
.as_slice()
.try_into()
.context(error::DecodeTableRouteSnafu)?;
Ok(tr)
}
async fn get_table_global_value(
kv_store: &KvStoreRef,
key: &TableGlobalKey,
) -> Result<Option<TableGlobalValue>> {
let tg_key = format!("{}", key).into_bytes();
let tv = get_from_store(kv_store, tg_key).await?;
match tv {
Some(tv) => {
let tv = TableGlobalValue::parse(&String::from_utf8_lossy(&tv))
.context(error::InvalidCatalogValueSnafu)?;
Ok(Some(tv))
}
None => Ok(None),
}
}
async fn put_into_store(
kv_store: &KvStoreRef,
key: impl Into<Vec<u8>>,
value: impl Into<Vec<u8>>,
) -> Result<()> {
let key = key.into();
let value = value.into();
let put_req = PutRequest {
key,
value,
..Default::default()
};
let _ = kv_store.put(put_req).await?;
Ok(())
}
async fn get_from_store(kv_store: &KvStoreRef, key: Vec<u8>) -> Result<Option<Vec<u8>>> {
let req = RangeRequest {
key,
..Default::default()
};
let res = kv_store.range(req).await?;
let mut kvs = res.kvs;
if kvs.is_empty() {
Ok(None)
} else {
Ok(Some(kvs.pop().unwrap().value))
}
}

View File

@@ -1,7 +1,6 @@
pub mod etcd;
pub mod kv;
pub mod memory;
pub mod noop;
use api::v1::meta::store_server;
use api::v1::meta::BatchPutRequest;
@@ -74,11 +73,11 @@ mod tests {
use super::*;
use crate::metasrv::MetaSrvOptions;
use crate::service::store::noop::NoopKvStore;
use crate::service::store::memory::MemStore;
#[tokio::test]
async fn test_range() {
let kv_store = Arc::new(NoopKvStore {});
let kv_store = Arc::new(MemStore::new());
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await;
let req = RangeRequest::default();
let res = meta_srv.range(req.into_request()).await;
@@ -88,7 +87,7 @@ mod tests {
#[tokio::test]
async fn test_put() {
let kv_store = Arc::new(NoopKvStore {});
let kv_store = Arc::new(MemStore::new());
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await;
let req = PutRequest::default();
let res = meta_srv.put(req.into_request()).await;
@@ -98,7 +97,7 @@ mod tests {
#[tokio::test]
async fn test_batch_put() {
let kv_store = Arc::new(NoopKvStore {});
let kv_store = Arc::new(MemStore::new());
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await;
let req = BatchPutRequest::default();
let res = meta_srv.batch_put(req.into_request()).await;
@@ -108,7 +107,7 @@ mod tests {
#[tokio::test]
async fn test_compare_and_put() {
let kv_store = Arc::new(NoopKvStore {});
let kv_store = Arc::new(MemStore::new());
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await;
let req = CompareAndPutRequest::default();
let res = meta_srv.compare_and_put(req.into_request()).await;
@@ -118,7 +117,7 @@ mod tests {
#[tokio::test]
async fn test_delete_range() {
let kv_store = Arc::new(NoopKvStore {});
let kv_store = Arc::new(MemStore::new());
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await;
let req = DeleteRangeRequest::default();
let res = meta_srv.delete_range(req.into_request()).await;

View File

@@ -1,4 +1,4 @@
use std::cmp::Ordering;
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::ops::Range;
use std::sync::Arc;
@@ -146,19 +146,26 @@ impl KvStore for MemStore {
let mut memory = self.inner.write();
let prev_val = memory.get(&key);
let (success, prev_kv) = match memory.entry(key) {
Entry::Vacant(e) => {
let success = expect.is_empty();
if success {
e.insert(value);
}
(success, None)
}
Entry::Occupied(mut e) => {
let key = e.key().clone();
let prev_val = e.get().clone();
let success = prev_val == expect;
if success {
e.insert(value);
}
(success, Some((key, prev_val)))
}
};
let success = prev_val
.map(|v| expect.cmp(v) == Ordering::Equal)
.unwrap_or(false | expect.is_empty());
let prev_kv = prev_val.map(|v| KeyValue {
key: key.clone(),
value: v.clone(),
});
if success {
memory.insert(key, value);
}
let prev_kv = prev_kv.map(|(key, value)| KeyValue { key, value });
let cluster_id = header.map_or(0, |h| h.cluster_id);
let header = Some(ResponseHeader::success(cluster_id));
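The Entry-based rewrite keeps the compare-and-put contract that Sequence relies on: an empty expect succeeds only on a vacant key, and a failed put on an occupied key reports the previous value. A minimal sketch of that contract against MemStore (inside an async test, error handling elided):

let store = MemStore::new();

// Vacant key: an empty `expect` creates the entry.
let req = CompareAndPutRequest {
    key: b"k".to_vec(),
    expect: vec![],
    value: b"1".to_vec(),
    ..Default::default()
};
assert!(store.compare_and_put(req).await.unwrap().success);

// Occupied key with a stale `expect`: fails and returns the current value.
let req = CompareAndPutRequest {
    key: b"k".to_vec(),
    expect: b"0".to_vec(),
    value: b"2".to_vec(),
    ..Default::default()
};
let res = store.compare_and_put(req).await.unwrap();
assert!(!res.success);
assert_eq!(b"1".to_vec(), res.prev_kv.unwrap().value);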

View File

@@ -1,41 +0,0 @@
use api::v1::meta::BatchPutRequest;
use api::v1::meta::BatchPutResponse;
use api::v1::meta::CompareAndPutRequest;
use api::v1::meta::CompareAndPutResponse;
use api::v1::meta::DeleteRangeRequest;
use api::v1::meta::DeleteRangeResponse;
use api::v1::meta::PutRequest;
use api::v1::meta::PutResponse;
use api::v1::meta::RangeRequest;
use api::v1::meta::RangeResponse;
use super::kv::KvStore;
use crate::error::Result;
/// A no-op kv_store, used only for tests
// TODO(jiachun): Add a test feature
#[derive(Clone)]
pub struct NoopKvStore;
#[async_trait::async_trait]
impl KvStore for NoopKvStore {
async fn range(&self, _req: RangeRequest) -> Result<RangeResponse> {
Ok(RangeResponse::default())
}
async fn put(&self, _req: PutRequest) -> Result<PutResponse> {
Ok(PutResponse::default())
}
async fn batch_put(&self, _req: BatchPutRequest) -> Result<BatchPutResponse> {
Ok(BatchPutResponse::default())
}
async fn compare_and_put(&self, _req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
Ok(CompareAndPutResponse::default())
}
async fn delete_range(&self, _req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
Ok(DeleteRangeResponse::default())
}
}