feat: router impl (#363)

* feat: heartbeat lease & route api

* feat: batch put & CAS

* chore: demo & UT

* chore: address code review comments

* chore: datanode selector

* chore: rename with_key_range to with_range

* chore: UT
Jiachun Feng
2022-11-01 11:45:05 +08:00
committed by GitHub
parent 518b665f1e
commit dacfd12b8f
31 changed files with 1487 additions and 285 deletions

4
Cargo.lock generated
View File

@@ -2718,11 +2718,15 @@ dependencies = [
"common-error",
"common-runtime",
"common-telemetry",
"common-time",
"etcd-client",
"futures",
"h2",
"http-body",
"lazy_static",
"regex",
"serde",
"serde_json",
"snafu",
"tokio",
"tokio-stream",

View File

@@ -1,3 +1,4 @@
bind_addr = '127.0.0.1:3002'
server_addr = '0.0.0.0:3002'
store_addr = '127.0.0.1:2380'
datanode_lease_secs = 30

View File

@@ -11,6 +11,13 @@ service Store {
// Put puts the given key into the key-value store.
rpc Put(PutRequest) returns (PutResponse);
// BatchPut atomically puts the given key-value pairs into the key-value store.
rpc BatchPut(BatchPutRequest) returns (BatchPutResponse);
// CompareAndPut atomically updates the key to the given value only if the
// current value equals the expected value.
rpc CompareAndPut(CompareAndPutRequest) returns (CompareAndPutResponse);
// DeleteRange deletes the given range from the key-value store.
rpc DeleteRange(DeleteRangeRequest) returns (DeleteRangeResponse);
}
@@ -64,6 +71,42 @@ message PutResponse {
KeyValue prev_kv = 2;
}
message BatchPutRequest {
RequestHeader header = 1;
repeated KeyValue kvs = 2;
// If prev_kv is set, gets the previous key-value pairs before changing them.
// The previous key-value pairs will be returned in the batch put response.
bool prev_kv = 3;
}
message BatchPutResponse {
ResponseHeader header = 1;
// If prev_kv is set in the request, the previous key-value pairs will be
// returned.
repeated KeyValue prev_kvs = 2;
}
message CompareAndPutRequest {
RequestHeader header = 1;
// key is the key, in bytes, to put into the key-value store.
bytes key = 2;
// expect is the previous value, in bytes
bytes expect = 3;
// value is the value, in bytes, to associate with the key in the
// key-value store.
bytes value = 4;
}
message CompareAndPutResponse {
ResponseHeader header = 1;
bool success = 2;
KeyValue prev_kv = 3;
}
message DeleteRangeRequest {
RequestHeader header = 1;

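Aside: the new CompareAndPut RPC only applies the put when the stored value equals `expect`, which makes it the building block for optimistic read-modify-write. Below is a minimal sketch of such a retry loop written against the client API added later in this commit; the key and values are hypothetical, a started `meta_client` is assumed, and it assumes the server fills `prev_kv` with the current value when the comparison fails (the proto allows this but does not promise it).

```rust
use meta_client::rpc::CompareAndPutRequest;

// Hypothetical optimistic update: replace `old` with `new` under key `counter`,
// retrying when another writer got there first.
let mut expect = b"old".to_vec();
loop {
    let req = CompareAndPutRequest::new()
        .with_key(b"counter".to_vec())
        .with_expect(expect.clone())
        .with_value(b"new".to_vec());
    let mut res = meta_client.compare_and_put(req).await?;
    if res.is_success() {
        break; // the stored value matched `expect`, so the put was applied
    }
    // Assumption: on failure the server returns the current value in prev_kv.
    expect = res
        .take_prev_kv()
        .map(|mut kv| kv.take_value())
        .unwrap_or_default();
}
```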
View File

@@ -2,18 +2,34 @@ tonic::include_proto!("greptime.v1.meta");
pub const PROTOCOL_VERSION: u64 = 1;
pub const fn request_header((cluster_id, member_id): (u64, u64)) -> Option<RequestHeader> {
Some(RequestHeader::new((cluster_id, member_id)))
}
impl RequestHeader {
#[inline]
pub const fn new((cluster_id, member_id): (u64, u64)) -> Self {
Self {
pub fn new((cluster_id, member_id): (u64, u64)) -> Option<Self> {
Some(Self {
protocol_version: PROTOCOL_VERSION,
cluster_id,
member_id,
}
})
}
}
impl ResponseHeader {
#[inline]
pub fn success(cluster_id: u64) -> Option<Self> {
Some(Self {
protocol_version: PROTOCOL_VERSION,
cluster_id,
..Default::default()
})
}
#[inline]
pub fn failed(cluster_id: u64, error: Error) -> Option<Self> {
Some(Self {
protocol_version: PROTOCOL_VERSION,
cluster_id,
error: Some(error),
})
}
}

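Aside: with this change `RequestHeader::new` returns `Option<Self>`, so it can be assigned straight into the optional `header` field of a request, and `ResponseHeader::success`/`failed` give the server symmetric helpers. A minimal sketch of both, assuming `Error` here is the generated protobuf error message (which derives `Default` like the other prost types):

```rust
use api::v1::meta::{Error, HeartbeatRequest, RequestHeader, ResponseHeader};

// Client side: the constructor already yields Option<RequestHeader>.
let req = HeartbeatRequest {
    header: RequestHeader::new((1, 42)), // (cluster_id, member_id)
    ..Default::default()
};
assert_eq!(1, req.header.as_ref().unwrap().cluster_id);

// Server side: a success header carries no error ...
assert!(ResponseHeader::success(1).unwrap().error.is_none());
// ... while a failed header wraps one.
let failed = ResponseHeader::failed(1, Error::default());
assert!(failed.unwrap().error.is_some());
```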
View File

@@ -117,5 +117,6 @@ mod tests {
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
assert_eq!("0.0.0.0:3002".to_string(), options.server_addr);
assert_eq!("127.0.0.1:2380".to_string(), options.store_addr);
assert_eq!(30, options.datanode_lease_secs);
}
}

View File

@@ -5,6 +5,8 @@ use api::v1::meta::Peer;
use common_grpc::channel_manager::ChannelConfig;
use common_grpc::channel_manager::ChannelManager;
use meta_client::client::MetaClientBuilder;
use meta_client::rpc::BatchPutRequest;
use meta_client::rpc::CompareAndPutRequest;
use meta_client::rpc::CreateRequest;
use meta_client::rpc::DeleteRangeRequest;
use meta_client::rpc::Partition;
@@ -90,10 +92,40 @@ async fn run() {
event!(Level::INFO, "put result: {:#?}", res);
// get
let range = RangeRequest::new().with_key(b"key2".to_vec());
let range = RangeRequest::new().with_key(b"key1".to_vec());
let res = meta_client.range(range.clone()).await.unwrap();
event!(Level::INFO, "get range result: {:#?}", res);
// get prefix
let range2 = RangeRequest::new().with_prefix(b"key1".to_vec());
let res = meta_client.range(range2.clone()).await.unwrap();
event!(Level::INFO, "get prefix result: {:#?}", res);
// batch put
let batch_put = BatchPutRequest::new()
.add_kv(b"batch_put1".to_vec(), b"batch_put_v1".to_vec())
.add_kv(b"batch_put2".to_vec(), b"batch_put_v2".to_vec())
.with_prev_kv();
let res = meta_client.batch_put(batch_put).await.unwrap();
event!(Level::INFO, "batch put result: {:#?}", res);
// cas
let cas = CompareAndPutRequest::new()
.with_key(b"batch_put1".to_vec())
.with_expect(b"batch_put_v_fail".to_vec())
.with_value(b"batch_put_v111".to_vec());
let res = meta_client.compare_and_put(cas).await.unwrap();
event!(Level::INFO, "cas 0 result: {:#?}", res);
let cas = CompareAndPutRequest::new()
.with_key(b"batch_put1".to_vec())
.with_expect(b"batch_put_v1".to_vec())
.with_value(b"batch_put_v111".to_vec());
let res = meta_client.compare_and_put(cas).await.unwrap();
event!(Level::INFO, "cas 1 result: {:#?}", res);
// delete
let delete_range = DeleteRangeRequest::new().with_key(b"key1".to_vec());
let res = meta_client.delete_range(delete_range).await.unwrap();

View File

@@ -15,6 +15,10 @@ use self::heartbeat::HeartbeatSender;
use self::heartbeat::HeartbeatStream;
use crate::error;
use crate::error::Result;
use crate::rpc::BatchPutRequest;
use crate::rpc::BatchPutResponse;
use crate::rpc::CompareAndPutRequest;
use crate::rpc::CompareAndPutResponse;
use crate::rpc::CreateRequest;
use crate::rpc::DeleteRangeRequest;
use crate::rpc::DeleteRangeResponse;
@@ -217,8 +221,8 @@ impl MetaClient {
name: "store_client",
})?
.range(req.into())
.await
.map(Into::into)
.await?
.try_into()
}
/// Put puts the given key into the key-value store.
@@ -228,8 +232,34 @@ impl MetaClient {
name: "store_client",
})?
.put(req.into())
.await
.map(Into::into)
.await?
.try_into()
}
/// BatchPut atomically puts the given key-value pairs into the key-value store.
pub async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
self.store_client()
.context(error::NotStartedSnafu {
name: "store_client",
})?
.batch_put(req.into())
.await?
.try_into()
}
/// CompareAndPut atomically updates the key to the given value only if the
/// current value equals the expected value.
pub async fn compare_and_put(
&self,
req: CompareAndPutRequest,
) -> Result<CompareAndPutResponse> {
self.store_client()
.context(error::NotStartedSnafu {
name: "store_client",
})?
.compare_and_put(req.into())
.await?
.try_into()
}
/// DeleteRange deletes the given range from the key-value store.
@@ -239,8 +269,8 @@ impl MetaClient {
name: "store_client",
})?
.delete_range(req.into())
.await
.map(Into::into)
.await?
.try_into()
}
#[inline]
@@ -318,49 +348,38 @@ mod tests {
#[tokio::test]
async fn test_not_start_heartbeat_client() {
let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
let mut meta_client = MetaClientBuilder::new(0, 0)
.enable_router()
.enable_store()
.build();
meta_client.start(urls).await.unwrap();
let res = meta_client.ask_leader().await;
assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
}
#[tokio::test]
async fn test_not_start_router_client() {
let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
let mut meta_client = MetaClientBuilder::new(0, 0)
.enable_heartbeat()
.enable_store()
.build();
meta_client.start(urls).await.unwrap();
let req = CreateRequest::new(TableName::new("c", "s", "t"));
let res = meta_client.create_route(req).await;
assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
}
#[tokio::test]
async fn test_not_start_store_client() {
let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
let mut meta_client = MetaClientBuilder::new(0, 0)
.enable_heartbeat()
.enable_router()
.build();
meta_client.start(urls).await.unwrap();
let res = meta_client.put(PutRequest::default()).await;
assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
}

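Aside: the switch from `.map(Into::into)` to `.await?.try_into()` means every store response now has its `ResponseHeader` checked before it is handed back, so an error reported by the meta server surfaces as a typed client error instead of being silently ignored. A hedged sketch of what that looks like at a call site, assuming a started `meta_client` and that the `error` module is reachable by the caller:

```rust
use meta_client::error::Error;
use meta_client::rpc::RangeRequest;

let req = RangeRequest::new().with_prefix(b"key".to_vec());
match meta_client.range(req).await {
    Ok(res) => println!("range ok: {:?}", res),
    // New in this commit: a header-level error from the server becomes
    // Error::IllegalServerState rather than being dropped.
    Err(Error::IllegalServerState { code, err_msg, .. }) => {
        eprintln!("server reported error {}: {}", code, err_msg)
    }
    Err(other) => eprintln!("transport or client error: {:?}", other),
}
```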
View File

@@ -2,10 +2,10 @@ use std::collections::HashSet;
use std::sync::Arc;
use api::v1::meta::heartbeat_client::HeartbeatClient;
use api::v1::meta::request_header;
use api::v1::meta::AskLeaderRequest;
use api::v1::meta::HeartbeatRequest;
use api::v1::meta::HeartbeatResponse;
use api::v1::meta::RequestHeader;
use common_grpc::channel_manager::ChannelManager;
use common_telemetry::debug;
use common_telemetry::info;
@@ -40,7 +40,7 @@ impl HeartbeatSender {
#[inline]
pub async fn send(&self, mut req: HeartbeatRequest) -> Result<()> {
req.header = request_header(self.id);
req.header = RequestHeader::new(self.id);
self.sender.send(req).await.map_err(|e| {
error::SendHeartbeatSnafu {
err_msg: e.to_string(),
@@ -154,7 +154,7 @@ impl Inner {
}
);
let header = request_header(self.id);
let header = RequestHeader::new(self.id);
let mut leader = None;
for addr in &self.peers {
let req = AskLeaderRequest {
@@ -182,9 +182,8 @@ impl Inner {
let mut leader = self.make_client(leader)?;
let (sender, receiver) = mpsc::channel::<HeartbeatRequest>(128);
let header = request_header(self.id);
let handshake = HeartbeatRequest {
header,
header: RequestHeader::new(self.id),
..Default::default()
};
sender.send(handshake).await.map_err(|e| {
@@ -236,14 +235,11 @@ mod test {
#[tokio::test]
async fn test_start_client() {
let mut client = Client::new((0, 0), ChannelManager::default());
assert!(!client.is_started().await);
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
.unwrap();
assert!(client.is_started().await);
}
@@ -254,13 +250,9 @@ mod test {
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
.unwrap();
assert!(client.is_started().await);
let res = client.start(&["127.0.0.1:1002"]).await;
assert!(res.is_err());
assert!(matches!(
res.err(),
Some(error::Error::IllegalGrpcClientState { .. })
@@ -274,48 +266,18 @@ mod test {
.start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"])
.await
.unwrap();
assert_eq!(1, client.inner.write().await.peers.len());
}
#[tokio::test]
async fn test_ask_leader_unavailable() {
let mut client = Client::new((0, 0), ChannelManager::default());
client.start(&["unavailable_peer"]).await.unwrap();
let res = client.ask_leader().await;
assert!(res.is_err());
let err = res.err().unwrap();
assert!(matches!(err, error::Error::AskLeader { .. }));
}
#[tokio::test]
async fn test_heartbeat_unavailable() {
let mut client = Client::new((0, 0), ChannelManager::default());
client.start(&["unavailable_peer"]).await.unwrap();
client.inner.write().await.leader = Some("unavailable".to_string());
let res = client.heartbeat().await;
assert!(res.is_err());
let err = res.err().unwrap();
assert!(matches!(err, error::Error::TonicStatus { .. }));
}
#[tokio::test]
async fn test_heartbeat_stream() {
let (sender, mut receiver) = mpsc::channel::<HeartbeatRequest>(100);
let sender = HeartbeatSender::new((8, 8), sender);
tokio::spawn(async move {
for _ in 0..10 {
sender.send(HeartbeatRequest::default()).await.unwrap();
}
});
while let Some(req) = receiver.recv().await {
let header = req.header.unwrap();
assert_eq!(8, header.cluster_id);

View File

@@ -1,9 +1,9 @@
use std::collections::HashSet;
use std::sync::Arc;
use api::v1::meta::request_header;
use api::v1::meta::router_client::RouterClient;
use api::v1::meta::CreateRequest;
use api::v1::meta::RequestHeader;
use api::v1::meta::RouteRequest;
use api::v1::meta::RouteResponse;
use common_grpc::channel_manager::ChannelManager;
@@ -92,7 +92,7 @@ impl Inner {
async fn route(&self, mut req: RouteRequest) -> Result<RouteResponse> {
let mut client = self.random_client()?;
req.header = request_header(self.id);
req.header = RequestHeader::new(self.id);
let res = client.route(req).await.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
@@ -100,7 +100,7 @@ impl Inner {
async fn create(&self, mut req: CreateRequest) -> Result<RouteResponse> {
let mut client = self.random_client()?;
req.header = request_header(self.id);
req.header = RequestHeader::new(self.id);
let res = client.create(req).await.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
@@ -139,14 +139,11 @@ mod test {
#[tokio::test]
async fn test_start_client() {
let mut client = Client::new((0, 0), ChannelManager::default());
assert!(!client.is_started().await);
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
.unwrap();
assert!(client.is_started().await);
}
@@ -157,13 +154,9 @@ mod test {
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
.unwrap();
assert!(client.is_started().await);
let res = client.start(&["127.0.0.1:1002"]).await;
assert!(res.is_err());
assert!(matches!(
res.err(),
Some(error::Error::IllegalGrpcClientState { .. })
@@ -180,42 +173,4 @@ mod test {
assert_eq!(1, client.inner.write().await.peers.len());
}
#[tokio::test]
async fn test_create_unavailable() {
let mut client = Client::new((0, 0), ChannelManager::default());
client.start(&["unavailable_peer"]).await.unwrap();
let req = CreateRequest {
header: request_header((0, 0)),
..Default::default()
};
let res = client.create(req).await;
assert!(res.is_err());
let err = res.err().unwrap();
assert!(
matches!(err, error::Error::TonicStatus { source, .. } if source.code() == tonic::Code::Unavailable)
);
}
#[tokio::test]
async fn test_route_unavailable() {
let mut client = Client::new((0, 0), ChannelManager::default());
client.start(&["unavailable_peer"]).await.unwrap();
let req = RouteRequest {
header: request_header((0, 0)),
..Default::default()
};
let res = client.route(req).await;
assert!(res.is_err());
let err = res.err().unwrap();
assert!(
matches!(err, error::Error::TonicStatus { source, .. } if source.code() == tonic::Code::Unavailable)
);
}
}

View File

@@ -1,14 +1,18 @@
use std::collections::HashSet;
use std::sync::Arc;
use api::v1::meta::request_header;
use api::v1::meta::store_client::StoreClient;
use api::v1::meta::BatchPutRequest;
use api::v1::meta::BatchPutResponse;
use api::v1::meta::CompareAndPutRequest;
use api::v1::meta::CompareAndPutResponse;
use api::v1::meta::DeleteRangeRequest;
use api::v1::meta::DeleteRangeResponse;
use api::v1::meta::PutRequest;
use api::v1::meta::PutResponse;
use api::v1::meta::RangeRequest;
use api::v1::meta::RangeResponse;
use api::v1::meta::RequestHeader;
use common_grpc::channel_manager::ChannelManager;
use snafu::ensure;
use snafu::OptionExt;
@@ -61,6 +65,19 @@ impl Client {
inner.put(req).await
}
pub async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
let inner = self.inner.read().await;
inner.batch_put(req).await
}
pub async fn compare_and_put(
&self,
req: CompareAndPutRequest,
) -> Result<CompareAndPutResponse> {
let inner = self.inner.read().await;
inner.compare_and_put(req).await
}
pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
let inner = self.inner.read().await;
inner.delete_range(req).await
@@ -100,7 +117,7 @@ impl Inner {
async fn range(&self, mut req: RangeRequest) -> Result<RangeResponse> {
let mut client = self.random_client()?;
req.header = request_header(self.id);
req.header = RequestHeader::new(self.id);
let res = client.range(req).await.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
@@ -108,15 +125,40 @@ impl Inner {
async fn put(&self, mut req: PutRequest) -> Result<PutResponse> {
let mut client = self.random_client()?;
req.header = request_header(self.id);
req.header = RequestHeader::new(self.id);
let res = client.put(req).await.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
}
async fn batch_put(&self, mut req: BatchPutRequest) -> Result<BatchPutResponse> {
let mut client = self.random_client()?;
req.header = RequestHeader::new(self.id);
let res = client
.batch_put(req)
.await
.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
}
async fn compare_and_put(
&self,
mut req: CompareAndPutRequest,
) -> Result<CompareAndPutResponse> {
let mut client = self.random_client()?;
req.header = RequestHeader::new(self.id);
let res = client
.compare_and_put(req)
.await
.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
}
async fn delete_range(&self, mut req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
let mut client = self.random_client()?;
req.header = request_header(self.id);
req.header = RequestHeader::new(self.id);
let res = client
.delete_range(req)
.await
@@ -158,14 +200,11 @@ mod test {
#[tokio::test]
async fn test_start_client() {
let mut client = Client::new((0, 0), ChannelManager::default());
assert!(!client.is_started().await);
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
.unwrap();
assert!(client.is_started().await);
}
@@ -176,13 +215,9 @@ mod test {
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
.unwrap();
assert!(client.is_started().await);
let res = client.start(&["127.0.0.1:1002"]).await;
assert!(res.is_err());
assert!(matches!(
res.err(),
Some(error::Error::IllegalGrpcClientState { .. })
@@ -196,66 +231,6 @@ mod test {
.start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"])
.await
.unwrap();
assert_eq!(1, client.inner.write().await.peers.len());
}
#[tokio::test]
async fn test_range_unavailable() {
let mut client = Client::new((0, 0), ChannelManager::default());
client.start(&["unknow_peer"]).await.unwrap();
let req = RangeRequest {
key: b"key1".to_vec(),
..Default::default()
};
let res = client.range(req).await;
assert!(res.is_err());
let err = res.err().unwrap();
assert!(
matches!(err, error::Error::TonicStatus { source, .. } if source.code() == tonic::Code::Unavailable)
);
}
#[tokio::test]
async fn test_put_unavailable() {
let mut client = Client::new((0, 0), ChannelManager::default());
client.start(&["unavailable_peer"]).await.unwrap();
let req = PutRequest {
key: b"key1".to_vec(),
value: b"value1".to_vec(),
prev_kv: true,
..Default::default()
};
let res = client.put(req).await;
assert!(res.is_err());
let err = res.err().unwrap();
assert!(
matches!(err, error::Error::TonicStatus { source, .. } if source.code() == tonic::Code::Unavailable)
);
}
#[tokio::test]
async fn test_delete_range_unavailable() {
let mut client = Client::new((0, 0), ChannelManager::default());
client.start(&["unavailable_peer"]).await.unwrap();
let req = DeleteRangeRequest {
key: b"key1".to_vec(),
..Default::default()
};
let res = client.delete_range(req).await;
assert!(res.is_err());
let err = res.err().unwrap();
assert!(
matches!(err, error::Error::TonicStatus { source, .. } if source.code() == tonic::Code::Unavailable)
);
}
}

View File

@@ -51,6 +51,13 @@ pub enum Error {
err_msg: String,
backtrace: Backtrace,
},
#[snafu(display("Illegal state from server, code: {}, error: {}", code, err_msg))]
IllegalServerState {
code: i32,
err_msg: String,
backtrace: Backtrace,
},
}
#[allow(dead_code)]
@@ -75,7 +82,8 @@ impl ErrorExt for Error {
| Error::NotStarted { .. }
| Error::SendHeartbeat { .. }
| Error::CreateHeartbeatStream { .. }
| Error::CreateChannel { .. } => StatusCode::Internal,
| Error::CreateChannel { .. }
| Error::IllegalServerState { .. } => StatusCode::Internal,
Error::RouteInfoCorrupted { .. } => StatusCode::Unexpected,
}
}
@@ -96,12 +104,10 @@ mod tests {
fn throw_tonic_error() -> StdResult<tonic::transport::Error> {
tonic::transport::Endpoint::new("http//http").map(|_| ())
}
let e = throw_tonic_error()
.context(ConnectFailedSnafu { url: "" })
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
@@ -112,7 +118,6 @@ mod tests {
.context(IllegalGrpcClientStateSnafu { err_msg: "" })
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
@@ -122,12 +127,10 @@ mod tests {
fn throw_tonic_status_error() -> StdResult<tonic::Status> {
Err(tonic::Status::new(tonic::Code::Aborted, ""))
}
let e = throw_tonic_status_error()
.context(TonicStatusSnafu)
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
@@ -135,7 +138,6 @@ mod tests {
#[test]
fn test_ask_leader_error() {
let e = throw_none_option().context(AskLeaderSnafu).err().unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
@@ -143,7 +145,6 @@ mod tests {
#[test]
fn test_no_leader_error() {
let e = throw_none_option().context(NoLeaderSnafu).err().unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
@@ -155,12 +156,20 @@ mod tests {
.map(|_| ())
.context(common_grpc::error::CreateChannelSnafu)
}
let e = throw_common_grpc_error()
.context(CreateChannelSnafu)
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
#[test]
fn test_not_started_error() {
let e = throw_none_option()
.context(NotStartedSnafu { name: "" })
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
@@ -171,7 +180,6 @@ mod tests {
.context(SendHeartbeatSnafu { err_msg: "" })
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
@@ -197,4 +205,18 @@ mod tests {
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Unexpected);
}
#[test]
fn test_illegal_server_state_error() {
let e = throw_none_option()
.context(IllegalServerStateSnafu {
code: 1,
err_msg: "",
})
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
}

View File

@@ -1,5 +1,6 @@
mod router;
mod store;
mod util;
use api::v1::meta::KeyValue as PbKeyValue;
use api::v1::meta::Peer as PbPeer;
@@ -12,6 +13,10 @@ pub use router::RouteRequest;
pub use router::RouteResponse;
pub use router::Table;
pub use router::TableRoute;
pub use store::BatchPutRequest;
pub use store::BatchPutResponse;
pub use store::CompareAndPutRequest;
pub use store::CompareAndPutResponse;
pub use store::DeleteRangeRequest;
pub use store::DeleteRangeResponse;
pub use store::PutRequest;

View File

@@ -8,6 +8,7 @@ use api::v1::meta::RouteResponse as PbRouteResponse;
use api::v1::meta::Table as PbTable;
use snafu::OptionExt;
use super::util;
use super::Peer;
use super::TableName;
use crate::error;
@@ -83,6 +84,8 @@ impl TryFrom<PbRouteResponse> for RouteResponse {
type Error = error::Error;
fn try_from(pb: PbRouteResponse) -> Result<Self> {
util::check_response_header(pb.header.as_ref())?;
let peers: Vec<Peer> = pb.peers.into_iter().map(Into::into).collect();
let get_peer = |index: u64| peers.get(index as usize).map(ToOwned::to_owned);
let mut table_routes = Vec::with_capacity(pb.table_routes.len());

View File

@@ -1,12 +1,20 @@
use api::v1::meta::BatchPutRequest as PbBatchPutRequest;
use api::v1::meta::BatchPutResponse as PbBatchPutResponse;
use api::v1::meta::CompareAndPutRequest as PbCompareAndPutRequest;
use api::v1::meta::CompareAndPutResponse as PbCompareAndPutResponse;
use api::v1::meta::DeleteRangeRequest as PbDeleteRangeRequest;
use api::v1::meta::DeleteRangeResponse as PbDeleteRangeResponse;
use api::v1::meta::KeyValue as PbKeyValue;
use api::v1::meta::PutRequest as PbPutRequest;
use api::v1::meta::PutResponse as PbPutResponse;
use api::v1::meta::RangeRequest as PbRangeRequest;
use api::v1::meta::RangeResponse as PbRangeResponse;
use super::util;
use super::KeyValue;
use super::ResponseHeader;
use crate::error;
use crate::error::Result;
#[derive(Debug, Clone, Default)]
pub struct RangeRequest {
@@ -58,6 +66,9 @@ impl RangeRequest {
self
}
/// key is the first key for the range. If range_end is not given, the
/// request only looks up key.
///
/// range_end is the upper bound on the requested range [key, range_end).
/// If range_end is '\0', the range is all keys >= key.
/// If range_end is key plus one (e.g., "aa"+1 == "ab", "a\xff"+1 == "b"),
@@ -65,11 +76,21 @@ impl RangeRequest {
/// If both key and range_end are '\0', then the range request returns all
/// keys.
#[inline]
pub fn with_range_end(mut self, range_end: impl Into<Vec<u8>>) -> Self {
pub fn with_range(mut self, key: impl Into<Vec<u8>>, range_end: impl Into<Vec<u8>>) -> Self {
self.key = key.into();
self.range_end = range_end.into();
self
}
/// Gets all keys prefixed with key.
/// range_end is the key plus one (e.g., "aa"+1 == "ab", "a\xff"+1 == "b"),
#[inline]
pub fn with_prefix(mut self, key: impl Into<Vec<u8>>) -> Self {
self.key = key.into();
self.range_end = util::get_prefix_end_key(&self.key);
self
}
/// limit is a limit on the number of keys returned for the request. When
/// limit is set to 0, it is treated as no limit.
#[inline]
@@ -89,9 +110,13 @@ impl RangeRequest {
#[derive(Debug, Clone)]
pub struct RangeResponse(PbRangeResponse);
impl From<PbRangeResponse> for RangeResponse {
fn from(res: PbRangeResponse) -> Self {
Self::new(res)
impl TryFrom<PbRangeResponse> for RangeResponse {
type Error = error::Error;
fn try_from(pb: PbRangeResponse) -> Result<Self> {
util::check_response_header(pb.header.as_ref())?;
Ok(Self::new(pb))
}
}
@@ -177,9 +202,13 @@ impl PutRequest {
#[derive(Debug, Clone)]
pub struct PutResponse(PbPutResponse);
impl From<PbPutResponse> for PutResponse {
fn from(res: PbPutResponse) -> Self {
Self::new(res)
impl TryFrom<PbPutResponse> for PutResponse {
type Error = error::Error;
fn try_from(pb: PbPutResponse) -> Result<Self> {
util::check_response_header(pb.header.as_ref())?;
Ok(Self::new(pb))
}
}
@@ -200,6 +229,170 @@ impl PutResponse {
}
}
#[derive(Debug, Clone, Default)]
pub struct BatchPutRequest {
pub kvs: Vec<PbKeyValue>,
/// If prev_kv is set, gets the previous key-value pairs before changing them.
/// The previous key-value pairs will be returned in the batch put response.
pub prev_kv: bool,
}
impl From<BatchPutRequest> for PbBatchPutRequest {
fn from(req: BatchPutRequest) -> Self {
Self {
header: None,
kvs: req.kvs,
prev_kv: req.prev_kv,
}
}
}
impl BatchPutRequest {
#[inline]
pub fn new() -> Self {
Self {
kvs: vec![],
prev_kv: false,
}
}
#[inline]
pub fn add_kv(mut self, key: impl Into<Vec<u8>>, value: impl Into<Vec<u8>>) -> Self {
self.kvs.push(PbKeyValue {
key: key.into(),
value: value.into(),
});
self
}
/// If prev_kv is set, gets the previous key-value pairs before changing them.
/// The previous key-value pairs will be returned in the batch put response.
#[inline]
pub fn with_prev_kv(mut self) -> Self {
self.prev_kv = true;
self
}
}
#[derive(Debug, Clone)]
pub struct BatchPutResponse(PbBatchPutResponse);
impl TryFrom<PbBatchPutResponse> for BatchPutResponse {
type Error = error::Error;
fn try_from(pb: PbBatchPutResponse) -> Result<Self> {
util::check_response_header(pb.header.as_ref())?;
Ok(Self::new(pb))
}
}
impl BatchPutResponse {
#[inline]
pub fn new(res: PbBatchPutResponse) -> Self {
Self(res)
}
#[inline]
pub fn take_header(&mut self) -> Option<ResponseHeader> {
self.0.header.take().map(ResponseHeader::new)
}
#[inline]
pub fn take_prev_kvs(&mut self) -> Vec<KeyValue> {
self.0.prev_kvs.drain(..).map(KeyValue::new).collect()
}
}
#[derive(Debug, Clone, Default)]
pub struct CompareAndPutRequest {
/// key is the key, in bytes, to put into the key-value store.
pub key: Vec<u8>,
pub expect: Vec<u8>,
/// value is the value, in bytes, to associate with the key in the
/// key-value store.
pub value: Vec<u8>,
}
impl From<CompareAndPutRequest> for PbCompareAndPutRequest {
fn from(req: CompareAndPutRequest) -> Self {
Self {
header: None,
key: req.key,
expect: req.expect,
value: req.value,
}
}
}
impl CompareAndPutRequest {
#[inline]
pub fn new() -> Self {
Self {
key: vec![],
expect: vec![],
value: vec![],
}
}
/// key is the key, in bytes, to put into the key-value store.
#[inline]
pub fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
self.key = key.into();
self
}
/// expect is the previous value, in bytes
#[inline]
pub fn with_expect(mut self, expect: impl Into<Vec<u8>>) -> Self {
self.expect = expect.into();
self
}
/// value is the value, in bytes, to associate with the key in the
/// key-value store.
#[inline]
pub fn with_value(mut self, value: impl Into<Vec<u8>>) -> Self {
self.value = value.into();
self
}
}
#[derive(Debug, Clone)]
pub struct CompareAndPutResponse(PbCompareAndPutResponse);
impl TryFrom<PbCompareAndPutResponse> for CompareAndPutResponse {
type Error = error::Error;
fn try_from(pb: PbCompareAndPutResponse) -> Result<Self> {
util::check_response_header(pb.header.as_ref())?;
Ok(Self::new(pb))
}
}
impl CompareAndPutResponse {
#[inline]
pub fn new(res: PbCompareAndPutResponse) -> Self {
Self(res)
}
#[inline]
pub fn take_header(&mut self) -> Option<ResponseHeader> {
self.0.header.take().map(ResponseHeader::new)
}
#[inline]
pub fn is_success(&self) -> bool {
self.0.success
}
#[inline]
pub fn take_prev_kv(&mut self) -> Option<KeyValue> {
self.0.prev_kv.take().map(KeyValue::new)
}
}
#[derive(Debug, Clone, Default)]
pub struct DeleteRangeRequest {
/// key is the first key to delete in the range.
@@ -242,13 +435,16 @@ impl DeleteRangeRequest {
}
}
/// key is the first key to delete in the range.
/// key is the first key to delete in the range. If range_end is not given,
/// the range is defined to contain only the key argument.
#[inline]
pub fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
self.key = key.into();
self
}
/// key is the first key to delete in the range.
///
/// range_end is the key following the last key to delete for the range
/// [key, range_end).
/// If range_end is not given, the range is defined to contain only the key
@@ -258,11 +454,21 @@ impl DeleteRangeRequest {
/// If range_end is '\0', the range is all keys greater than or equal to the
/// key argument.
#[inline]
pub fn with_range_end(mut self, range_end: impl Into<Vec<u8>>) -> Self {
pub fn with_range(mut self, key: impl Into<Vec<u8>>, range_end: impl Into<Vec<u8>>) -> Self {
self.key = key.into();
self.range_end = range_end.into();
self
}
/// Deletes all keys prefixed with key.
/// range_end is the key plus one (e.g., "aa"+1 == "ab", "a\xff"+1 == "b").
#[inline]
pub fn with_prefix(mut self, key: impl Into<Vec<u8>>) -> Self {
self.key = key.into();
self.range_end = util::get_prefix_end_key(&self.key);
self
}
/// If prev_kv is set, gets the previous key-value pairs before deleting it.
/// The previous key-value pairs will be returned in the delete response.
#[inline]
@@ -275,9 +481,13 @@ impl DeleteRangeRequest {
#[derive(Debug, Clone)]
pub struct DeleteRangeResponse(PbDeleteRangeResponse);
impl From<PbDeleteRangeResponse> for DeleteRangeResponse {
fn from(res: PbDeleteRangeResponse) -> Self {
Self::new(res)
impl TryFrom<PbDeleteRangeResponse> for DeleteRangeResponse {
type Error = error::Error;
fn try_from(pb: PbDeleteRangeResponse) -> Result<Self> {
util::check_response_header(pb.header.as_ref())?;
Ok(Self::new(pb))
}
}
@@ -304,6 +514,10 @@ impl DeleteRangeResponse {
#[cfg(test)]
mod tests {
use api::v1::meta::BatchPutRequest as PbBatchPutRequest;
use api::v1::meta::BatchPutResponse as PbBatchPutResponse;
use api::v1::meta::CompareAndPutRequest as PbCompareAndPutRequest;
use api::v1::meta::CompareAndPutResponse as PbCompareAndPutResponse;
use api::v1::meta::DeleteRangeRequest as PbDeleteRangeRequest;
use api::v1::meta::DeleteRangeResponse as PbDeleteRangeResponse;
use api::v1::meta::KeyValue as PbKeyValue;
@@ -319,8 +533,7 @@ mod tests {
let (key, range_end, limit) = (b"test_key1".to_vec(), b"test_range_end1".to_vec(), 1);
let req = RangeRequest::new()
.with_key(key.clone())
.with_range_end(range_end.clone())
.with_range(key.clone(), range_end.clone())
.with_limit(limit)
.with_keys_only();
@@ -332,6 +545,23 @@ mod tests {
assert!(into_req.keys_only);
}
#[test]
fn test_prefix_request_trans() {
let (key, limit) = (b"test_key1".to_vec(), 1);
let req = RangeRequest::new()
.with_prefix(key.clone())
.with_limit(limit)
.with_keys_only();
let into_req: PbRangeRequest = req.into();
assert!(into_req.header.is_none());
assert_eq!(key, into_req.key);
assert_eq!(b"test_key2".to_vec(), into_req.range_end);
assert_eq!(limit, into_req.limit);
assert!(into_req.keys_only);
}
#[test]
fn test_range_response_trans() {
let pb_res = PbRangeResponse {
@@ -400,13 +630,88 @@ mod tests {
assert_eq!(b"v1".to_vec(), kv.take_value());
}
#[test]
fn test_batch_put_request_trans() {
let req = BatchPutRequest::new()
.add_kv(b"test_key1".to_vec(), b"test_value1".to_vec())
.add_kv(b"test_key2".to_vec(), b"test_value2".to_vec())
.add_kv(b"test_key3".to_vec(), b"test_value3".to_vec())
.with_prev_kv();
let into_req: PbBatchPutRequest = req.into();
assert!(into_req.header.is_none());
assert_eq!(b"test_key1".to_vec(), into_req.kvs.get(0).unwrap().key);
assert_eq!(b"test_key2".to_vec(), into_req.kvs.get(1).unwrap().key);
assert_eq!(b"test_key3".to_vec(), into_req.kvs.get(2).unwrap().key);
assert_eq!(b"test_value1".to_vec(), into_req.kvs.get(0).unwrap().value);
assert_eq!(b"test_value2".to_vec(), into_req.kvs.get(1).unwrap().value);
assert_eq!(b"test_value3".to_vec(), into_req.kvs.get(2).unwrap().value);
assert!(into_req.prev_kv);
}
#[test]
fn test_batch_put_response_trans() {
let pb_res = PbBatchPutResponse {
header: None,
prev_kvs: vec![PbKeyValue {
key: b"k1".to_vec(),
value: b"v1".to_vec(),
}],
};
let mut res = BatchPutResponse::new(pb_res);
assert!(res.take_header().is_none());
let kvs = res.take_prev_kvs();
assert_eq!(b"k1".to_vec(), kvs[0].key().to_vec());
assert_eq!(b"v1".to_vec(), kvs[0].value().to_vec());
}
#[test]
fn test_compare_and_put_request_trans() {
let (key, expect, value) = (
b"test_key1".to_vec(),
b"test_expect1".to_vec(),
b"test_value1".to_vec(),
);
let req = CompareAndPutRequest::new()
.with_key(key.clone())
.with_expect(expect.clone())
.with_value(value.clone());
let into_req: PbCompareAndPutRequest = req.into();
assert!(into_req.header.is_none());
assert_eq!(key, into_req.key);
assert_eq!(expect, into_req.expect);
assert_eq!(value, into_req.value);
}
#[test]
fn test_compare_and_put_response_trans() {
let pb_res = PbCompareAndPutResponse {
header: None,
success: true,
prev_kv: Some(PbKeyValue {
key: b"k1".to_vec(),
value: b"v1".to_vec(),
}),
};
let mut res = CompareAndPutResponse::new(pb_res);
assert!(res.take_header().is_none());
let mut kv = res.take_prev_kv().unwrap();
assert_eq!(b"k1".to_vec(), kv.key().to_vec());
assert_eq!(b"k1".to_vec(), kv.take_key());
assert_eq!(b"v1".to_vec(), kv.value().to_vec());
assert_eq!(b"v1".to_vec(), kv.take_value());
}
#[test]
fn test_delete_range_request_trans() {
let (key, range_end) = (b"test_key1".to_vec(), b"test_range_end1".to_vec());
let req = DeleteRangeRequest::new()
.with_key(key.clone())
.with_range_end(range_end.clone())
.with_range(key.clone(), range_end.clone())
.with_prev_kv();
let into_req: PbDeleteRangeRequest = req.into();
@@ -416,6 +721,21 @@ mod tests {
assert!(into_req.prev_kv);
}
#[test]
fn test_delete_prefix_request_trans() {
let key = b"test_key1".to_vec();
let req = DeleteRangeRequest::new()
.with_prefix(key.clone())
.with_prev_kv();
let into_req: PbDeleteRangeRequest = req.into();
assert!(into_req.header.is_none());
assert_eq!(key, into_req.key);
assert_eq!(b"test_key2".to_vec(), into_req.range_end);
assert!(into_req.prev_kv);
}
#[test]
fn test_delete_range_response_trans() {
let pb_res = PbDeleteRangeResponse {
@@ -433,7 +753,7 @@ mod tests {
],
};
let mut res: DeleteRangeResponse = pb_res.into();
let mut res: DeleteRangeResponse = pb_res.try_into().unwrap();
assert!(res.take_header().is_none());
assert_eq!(2, res.deleted());
let mut kvs = res.take_prev_kvs();

View File

@@ -0,0 +1,32 @@
use api::v1::meta::ResponseHeader;
use crate::error;
use crate::error::Result;
#[inline]
pub(crate) fn check_response_header(header: Option<&ResponseHeader>) -> Result<()> {
if let Some(header) = header {
if let Some(ref error) = header.error {
let code = error.code;
let err_msg = &error.err_msg;
return error::IllegalServerStateSnafu { code, err_msg }.fail();
}
}
Ok(())
}
/// Get prefix end key of `key`.
#[inline]
pub fn get_prefix_end_key(key: &[u8]) -> Vec<u8> {
for (i, v) in key.iter().enumerate().rev() {
if *v < 0xFF {
let mut end = Vec::from(&key[..=i]);
end[i] = *v + 1;
return end;
}
}
// The next prefix does not exist (e.g., 0xffff).
vec![0]
}

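Aside: both `with_prefix` helpers delegate to `get_prefix_end_key`, which increments the last byte below 0xFF and truncates everything after it; when every byte is already 0xFF the sentinel `\0` ("range to the end") is returned. A few worked cases matching the doc comments above; since the `util` module is private to `rpc`, these asserts are an in-crate sketch:

```rust
// Expected results of get_prefix_end_key (exercised from inside the rpc module).
assert_eq!(b"test_key2".to_vec(), get_prefix_end_key(b"test_key1"));
assert_eq!(b"ab".to_vec(), get_prefix_end_key(b"aa"));
assert_eq!(b"b".to_vec(), get_prefix_end_key(b"a\xff"));
// No next prefix exists, so fall back to "\0", i.e. "all keys >= key".
assert_eq!(vec![0u8], get_prefix_end_key(&[0xff, 0xff]));
```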
View File

@@ -10,11 +10,15 @@ common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
etcd-client = "0.10"
futures = "0.3"
h2 = "0.3"
http-body = "0.4"
lazy_static = "1.4"
regex = "1.6"
serde = "1.0"
serde_json = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
tokio = { version = "1.0", features = ["full"] }
tokio-stream = { version = "0.1", features = ["net"] }

View File

@@ -37,6 +37,48 @@ pub enum Error {
#[snafu(display("Empty table name"))]
EmptyTableName { backtrace: Backtrace },
#[snafu(display("Invalid datanode lease key: {}", key))]
InvalidLeaseKey { key: String, backtrace: Backtrace },
#[snafu(display("Failed to parse datanode lease key from utf8: {}", source))]
LeaseKeyFromUtf8 {
source: std::string::FromUtf8Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to serialize to json: {}", input))]
SerializeToJson {
input: String,
source: serde_json::error::Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to deserialize from json: {}", input))]
DeserializeFromJson {
input: String,
source: serde_json::error::Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to parse number: {}, source: {}", err_msg, source))]
ParseNum {
err_msg: String,
source: std::num::ParseIntError,
backtrace: Backtrace,
},
#[snafu(display("Invalid arguments: {}", err_msg))]
InvalidArguments {
err_msg: String,
backtrace: Backtrace,
},
#[snafu(display("Invalid result with a txn response: {}", err_msg))]
InvalidTxnResult {
err_msg: String,
backtrace: Backtrace,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -62,8 +104,17 @@ impl ErrorExt for Error {
| Error::EtcdFailed { .. }
| Error::ConnectEtcd { .. }
| Error::TcpBind { .. }
| Error::SerializeToJson { .. }
| Error::DeserializeFromJson { .. }
| Error::StartGrpc { .. } => StatusCode::Internal,
Error::EmptyKey { .. } | Error::EmptyTableName { .. } => StatusCode::InvalidArguments,
Error::EmptyKey { .. }
| Error::EmptyTableName { .. }
| Error::InvalidLeaseKey { .. }
| Error::ParseNum { .. }
| Error::InvalidArguments { .. } => StatusCode::InvalidArguments,
Error::LeaseKeyFromUtf8 { .. } | Error::InvalidTxnResult { .. } => {
StatusCode::Unexpected
}
}
}
}
@@ -106,10 +157,13 @@ mod tests {
Err(etcd_client::Error::InvalidArgs("".to_string()))
}
fn throw_serde_json_error() -> StdResult<serde_json::error::Error> {
serde_json::from_str("invalid json")
}
#[test]
fn test_stream_node_error() {
let e = throw_none_option().context(StreamNoneSnafu).err().unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
@@ -117,7 +171,6 @@ mod tests {
#[test]
fn test_empty_key_error() {
let e = throw_none_option().context(EmptyKeySnafu).err().unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::InvalidArguments);
}
@@ -128,7 +181,6 @@ mod tests {
.context(EtcdFailedSnafu)
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
@@ -139,7 +191,6 @@ mod tests {
.context(ConnectEtcdSnafu)
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
@@ -153,7 +204,6 @@ mod tests {
.context(TcpBindSnafu { addr: "127.0.0.1" })
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
@@ -163,9 +213,7 @@ mod tests {
fn throw_tonic_error() -> StdResult<tonic::transport::Error> {
tonic::transport::Endpoint::new("http//http").map(|_| ())
}
let e = throw_tonic_error().context(StartGrpcSnafu).err().unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
@@ -176,8 +224,84 @@ mod tests {
.context(EmptyTableNameSnafu)
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::InvalidArguments);
}
#[test]
fn test_invalid_lease_key_error() {
let e = throw_none_option()
.context(InvalidLeaseKeySnafu { key: "test" })
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::InvalidArguments);
}
#[test]
fn test_lease_key_fromutf8_test() {
fn throw_fromutf8_error() -> StdResult<std::string::FromUtf8Error> {
let sparkle_heart = vec![0, 159, 146, 150];
String::from_utf8(sparkle_heart).map(|_| ())
}
let e = throw_fromutf8_error()
.context(LeaseKeyFromUtf8Snafu)
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Unexpected);
}
#[test]
fn test_serialize_to_json_error() {
let e = throw_serde_json_error()
.context(SerializeToJsonSnafu { input: "" })
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
#[test]
fn test_deserialize_from_json_error() {
let e = throw_serde_json_error()
.context(DeserializeFromJsonSnafu { input: "" })
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
#[test]
fn test_parse_num_error() {
fn throw_parse_int_error() -> StdResult<std::num::ParseIntError> {
"invalid num".parse::<i64>().map(|_| ())
}
let e = throw_parse_int_error()
.context(ParseNumSnafu { err_msg: "" })
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::InvalidArguments);
}
#[test]
fn test_invalid_arguments_error() {
let e = throw_none_option()
.context(InvalidArgumentsSnafu { err_msg: "test" })
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::InvalidArguments);
}
#[test]
fn test_invalid_txn_error() {
let e = throw_none_option()
.context(InvalidTxnResultSnafu { err_msg: "test" })
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Unexpected);
}
}

View File

@@ -1,3 +1,4 @@
pub(crate) mod datanode_lease;
pub(crate) mod response_header;
use std::collections::BTreeMap;
@@ -18,11 +19,29 @@ pub trait HeartbeatHandler: Send + Sync {
async fn handle(
&self,
req: &HeartbeatRequest,
ctx: &mut HeartbeatAccumulator,
store: KvStoreRef,
ctx: &Context,
acc: &mut HeartbeatAccumulator,
) -> Result<()>;
}
#[derive(Clone)]
pub struct Context {
pub server_addr: String, // also server_id
pub kv_store: KvStoreRef,
}
impl Context {
#[inline]
pub fn server_addr(&self) -> &str {
&self.server_addr
}
#[inline]
pub fn kv_store(&self) -> KvStoreRef {
self.kv_store.clone()
}
}
#[derive(Debug, Default)]
pub struct HeartbeatAccumulator {
pub header: Option<ResponseHeader>,
@@ -45,22 +64,13 @@ pub enum Instruction {}
pub type Pusher = Sender<std::result::Result<HeartbeatResponse, tonic::Status>>;
#[derive(Clone)]
#[derive(Clone, Default)]
pub struct HeartbeatHandlers {
kv_store: KvStoreRef,
handlers: Arc<RwLock<Vec<Box<dyn HeartbeatHandler>>>>,
pushers: Arc<RwLock<BTreeMap<String, Pusher>>>,
}
impl HeartbeatHandlers {
pub fn new(kv_store: KvStoreRef) -> Self {
Self {
kv_store,
handlers: Arc::new(RwLock::new(Default::default())),
pushers: Arc::new(RwLock::new(Default::default())),
}
}
pub async fn add_handler(&self, handler: impl HeartbeatHandler + 'static) {
let mut handlers = self.handlers.write().await;
handlers.push(Box::new(handler));
@@ -80,11 +90,11 @@ impl HeartbeatHandlers {
pushers.remove(key)
}
pub async fn handle(&self, req: HeartbeatRequest) -> Result<HeartbeatResponse> {
pub async fn handle(&self, req: HeartbeatRequest, ctx: Context) -> Result<HeartbeatResponse> {
let mut acc = HeartbeatAccumulator::default();
let handlers = self.handlers.read().await;
for h in handlers.iter() {
h.handle(&req, &mut acc, self.kv_store.clone()).await?;
h.handle(&req, &ctx, &mut acc).await?;
}
let header = std::mem::take(&mut acc.header);
let res = HeartbeatResponse {

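Aside: handlers now receive a shared `Context` (the advertised `server_addr` plus the `KvStoreRef`) and write into the accumulator, instead of each taking a raw kv store. A minimal in-crate sketch of a custom handler against the new signature; `LogHandler` is hypothetical and not part of this commit, and `Result` is the crate's `error::Result`:

```rust
use api::v1::meta::HeartbeatRequest;
use common_telemetry::info;

use crate::error::Result;

// Hypothetical handler: logs who sent the heartbeat and which server saw it.
struct LogHandler;

#[async_trait::async_trait]
impl HeartbeatHandler for LogHandler {
    async fn handle(
        &self,
        req: &HeartbeatRequest,
        ctx: &Context,
        _acc: &mut HeartbeatAccumulator,
    ) -> Result<()> {
        info!("{} received a heartbeat from {:?}", ctx.server_addr(), req.peer);
        Ok(())
    }
}

// Registration follows metasrv.rs:
//   let handlers = HeartbeatHandlers::default();
//   handlers.add_handler(LogHandler).await;
```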
View File

@@ -0,0 +1,50 @@
use api::v1::meta::HeartbeatRequest;
use api::v1::meta::PutRequest;
use common_telemetry::info;
use common_time::util as time_util;
use super::Context;
use super::HeartbeatAccumulator;
use super::HeartbeatHandler;
use crate::error::Result;
use crate::keys::LeaseKey;
use crate::keys::LeaseValue;
pub struct DatanodeLeaseHandler;
#[async_trait::async_trait]
impl HeartbeatHandler for DatanodeLeaseHandler {
async fn handle(
&self,
req: &HeartbeatRequest,
ctx: &Context,
_acc: &mut HeartbeatAccumulator,
) -> Result<()> {
let HeartbeatRequest { header, peer, .. } = req;
if let Some(ref peer) = peer {
let key = LeaseKey {
cluster_id: header.as_ref().map_or(0, |h| h.cluster_id),
node_id: peer.id,
};
let value = LeaseValue {
timestamp_millis: time_util::current_time_millis(),
node_addr: peer.addr.clone(),
};
info!("Receive a heartbeat from datanode: {:?}, {:?}", key, value);
let key = key.try_into()?;
let value = value.try_into()?;
let put = PutRequest {
key,
value,
..Default::default()
};
let kv_store = ctx.kv_store();
let _ = kv_store.put(put).await?;
}
Ok(())
}
}

View File

@@ -2,10 +2,10 @@ use api::v1::meta::HeartbeatRequest;
use api::v1::meta::ResponseHeader;
use api::v1::meta::PROTOCOL_VERSION;
use super::Context;
use super::HeartbeatAccumulator;
use super::HeartbeatHandler;
use crate::error::Result;
use crate::service::store::kv::KvStoreRef;
pub struct ResponseHeaderHandler;
@@ -14,8 +14,8 @@ impl HeartbeatHandler for ResponseHeaderHandler {
async fn handle(
&self,
req: &HeartbeatRequest,
_ctx: &Context,
acc: &mut HeartbeatAccumulator,
_store: KvStoreRef,
) -> Result<()> {
let HeartbeatRequest { header, .. } = req;
let res_header = ResponseHeader {
@@ -32,25 +32,27 @@ impl HeartbeatHandler for ResponseHeaderHandler {
mod tests {
use std::sync::Arc;
use api::v1::meta::{request_header, HeartbeatResponse};
use api::v1::meta::{HeartbeatResponse, RequestHeader};
use super::*;
use crate::service::store::noop::NoopKvStore;
use crate::{handler::Context, service::store::noop::NoopKvStore};
#[tokio::test]
async fn test_handle_heartbeat_resp_header() {
let kv_store = Arc::new(NoopKvStore {});
let ctx = Context {
server_addr: "0.0.0.0:0000".to_string(),
kv_store,
};
let req = HeartbeatRequest {
header: request_header((1, 2)),
header: RequestHeader::new((1, 2)),
..Default::default()
};
let mut acc = HeartbeatAccumulator::default();
let response_handler = ResponseHeaderHandler {};
response_handler
.handle(&req, &mut acc, kv_store)
.await
.unwrap();
response_handler.handle(&req, &ctx, &mut acc).await.unwrap();
let header = std::mem::take(&mut acc.header);
let res = HeartbeatResponse {
header,

140
src/meta-srv/src/keys.rs Normal file
View File

@@ -0,0 +1,140 @@
use std::str::FromStr;
use lazy_static::lazy_static;
use regex::Regex;
use serde::Deserialize;
use serde::Serialize;
use snafu::ensure;
use snafu::OptionExt;
use snafu::ResultExt;
use crate::error;
use crate::error::Result;
pub(crate) const DN_LEASE_PREFIX: &str = "__meta_dnlease";
lazy_static! {
static ref DATANODE_KEY_PATTERN: Regex =
Regex::new(&format!("^{}-([0-9]+)-([0-9]+)$", DN_LEASE_PREFIX)).unwrap();
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct LeaseKey {
pub cluster_id: u64,
pub node_id: u64,
}
impl FromStr for LeaseKey {
type Err = error::Error;
fn from_str(key: &str) -> Result<Self> {
let caps = DATANODE_KEY_PATTERN
.captures(key)
.context(error::InvalidLeaseKeySnafu { key })?;
ensure!(caps.len() == 3, error::InvalidLeaseKeySnafu { key });
let cluster_id = caps[1].to_string();
let node_id = caps[2].to_string();
let cluster_id = cluster_id.parse::<u64>().context(error::ParseNumSnafu {
err_msg: format!("invalid cluster_id: {}", cluster_id),
})?;
let node_id = node_id.parse::<u64>().context(error::ParseNumSnafu {
err_msg: format!("invalid node_id: {}", node_id),
})?;
Ok(Self {
cluster_id,
node_id,
})
}
}
impl TryFrom<Vec<u8>> for LeaseKey {
type Error = error::Error;
fn try_from(bytes: Vec<u8>) -> Result<Self> {
String::from_utf8(bytes)
.context(error::LeaseKeyFromUtf8Snafu {})
.map(|x| x.parse())?
}
}
impl TryFrom<LeaseKey> for Vec<u8> {
type Error = error::Error;
fn try_from(dn_key: LeaseKey) -> Result<Self> {
Ok(format!(
"{}-{}-{}",
DN_LEASE_PREFIX, dn_key.cluster_id, dn_key.node_id
)
.into_bytes())
}
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct LeaseValue {
// last activity
pub timestamp_millis: i64,
pub node_addr: String,
}
impl FromStr for LeaseValue {
type Err = error::Error;
fn from_str(value: &str) -> Result<Self> {
serde_json::from_str(value).context(error::DeserializeFromJsonSnafu { input: value })
}
}
impl TryFrom<Vec<u8>> for LeaseValue {
type Error = error::Error;
fn try_from(bytes: Vec<u8>) -> Result<Self> {
String::from_utf8(bytes)
.context(error::LeaseKeyFromUtf8Snafu {})
.map(|x| x.parse())?
}
}
impl TryFrom<LeaseValue> for Vec<u8> {
type Error = error::Error;
fn try_from(dn_value: LeaseValue) -> Result<Self> {
Ok(serde_json::to_string(&dn_value)
.context(error::SerializeToJsonSnafu {
input: format!("{:?}", dn_value),
})?
.into_bytes())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_datanode_lease_key() {
let key = LeaseKey {
cluster_id: 0,
node_id: 1,
};
let key_bytes: Vec<u8> = key.clone().try_into().unwrap();
let new_key: LeaseKey = key_bytes.try_into().unwrap();
assert_eq!(new_key, key);
}
#[test]
fn test_datanode_lease_value() {
let value = LeaseValue {
timestamp_millis: 111,
node_addr: "0.0.0.0:3002".to_string(),
};
let value_bytes: Vec<u8> = value.clone().try_into().unwrap();
let new_value: LeaseValue = value_bytes.try_into().unwrap();
assert_eq!(new_value, value);
}
}

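Aside: lease records are stored as plain strings, so keys follow `__meta_dnlease-<cluster_id>-<node_id>` (enforced by `DATANODE_KEY_PATTERN`) and values are the JSON form of `LeaseValue`. A sketch of the expected byte representations behind the round-trip tests above; `keys` is declared as a private module in lib.rs, so this only compiles inside the crate:

```rust
// In-crate sketch of the serialized forms.
let key = LeaseKey { cluster_id: 0, node_id: 1 };
let key_bytes: Vec<u8> = key.try_into().unwrap();
assert_eq!(b"__meta_dnlease-0-1".to_vec(), key_bytes);

let value = LeaseValue {
    timestamp_millis: 111,
    node_addr: "0.0.0.0:3002".to_string(),
};
let value_bytes: Vec<u8> = value.try_into().unwrap();
// serde_json keeps the struct's field declaration order.
assert_eq!(
    br#"{"timestamp_millis":111,"node_addr":"0.0.0.0:3002"}"#.to_vec(),
    value_bytes
);
```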
46
src/meta-srv/src/lease.rs Normal file
View File

@@ -0,0 +1,46 @@
use api::v1::meta::RangeRequest;
use api::v1::meta::RangeResponse;
use crate::error::Result;
use crate::keys::LeaseKey;
use crate::keys::LeaseValue;
use crate::keys::DN_LEASE_PREFIX;
use crate::service::store::kv::KvStoreRef;
use crate::util;
pub async fn alive_datanodes<P>(
cluster_id: u64,
kv_store: KvStoreRef,
predicate: P,
) -> Result<Vec<(LeaseKey, LeaseValue)>>
where
P: Fn(&LeaseKey, &LeaseValue) -> bool,
{
let key = get_lease_prefix(cluster_id);
let range_end = util::get_prefix_end_key(&key);
let req = RangeRequest {
key,
range_end,
..Default::default()
};
let res = kv_store.range(req).await?;
let RangeResponse { kvs, .. } = res;
let mut lease_kvs = vec![];
for kv in kvs {
let lease_key: LeaseKey = kv.key.try_into()?;
let lease_value: LeaseValue = kv.value.try_into()?;
if !predicate(&lease_key, &lease_value) {
continue;
}
lease_kvs.push((lease_key, lease_value));
}
Ok(lease_kvs)
}
#[inline]
pub fn get_lease_prefix(cluster_id: u64) -> Vec<u8> {
format!("{}-{}", DN_LEASE_PREFIX, cluster_id).into_bytes()
}

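Aside: `alive_datanodes` does a prefix range over `__meta_dnlease-<cluster_id>` and lets the caller decide what "alive" means through a predicate. A hedged in-crate usage sketch mirroring the filter used by the router's `LeaseBasedSelector` below, assuming a `kv_store: KvStoreRef` is in scope; note the seconds-to-milliseconds conversion, since `timestamp_millis` is in milliseconds:

```rust
use common_time::util as time_util;

use crate::keys::{LeaseKey, LeaseValue};
use crate::lease;

// Collect the datanodes of cluster 0 whose last heartbeat is within 30 seconds.
let lease_secs: i64 = 30;
let lease_filter = |_: &LeaseKey, v: &LeaseValue| {
    time_util::current_time_millis() - v.timestamp_millis < lease_secs * 1000
};
let alive = lease::alive_datanodes(0, kv_store.clone(), lease_filter).await?;
for (key, value) in alive {
    println!("datanode {} is alive at {}", key.node_id, value.node_addr);
}
```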
View File

@@ -1,7 +1,10 @@
pub mod bootstrap;
pub mod error;
pub mod handler;
mod keys;
pub mod lease;
pub mod metasrv;
pub mod service;
mod util;
pub use crate::error::Result;

View File

@@ -1,6 +1,7 @@
use serde::Deserialize;
use serde::Serialize;
use crate::handler::datanode_lease::DatanodeLeaseHandler;
use crate::handler::response_header::ResponseHeaderHandler;
use crate::handler::HeartbeatHandlers;
use crate::service::store::kv::KvStoreRef;
@@ -10,6 +11,7 @@ pub struct MetaSrvOptions {
pub bind_addr: String,
pub server_addr: String,
pub store_addr: String,
pub datanode_lease_secs: i64,
}
impl Default for MetaSrvOptions {
@@ -18,6 +20,7 @@ impl Default for MetaSrvOptions {
bind_addr: "0.0.0.0:3002".to_string(),
server_addr: "0.0.0.0:3002".to_string(),
store_addr: "0.0.0.0:2380".to_string(),
datanode_lease_secs: 15,
}
}
}
@@ -31,8 +34,10 @@ pub struct MetaSrv {
impl MetaSrv {
pub async fn new(options: MetaSrvOptions, kv_store: KvStoreRef) -> Self {
let heartbeat_handlers = HeartbeatHandlers::new(kv_store.clone());
let heartbeat_handlers = HeartbeatHandlers::default();
heartbeat_handlers.add_handler(ResponseHeaderHandler).await;
heartbeat_handlers.add_handler(DatanodeLeaseHandler).await;
Self {
options,
kv_store,

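Aside: with the handlers now wired up inside `MetaSrv::new`, standing up a meta server is mostly options plus a kv store; `datanode_lease_secs` defaults to 15 and the sample config above raises it to 30. A minimal sketch, with the kv store construction elided:

```rust
use meta_srv::metasrv::{MetaSrv, MetaSrvOptions};

let options = MetaSrvOptions {
    datanode_lease_secs: 30, // the sample config value; the built-in default is 15
    ..Default::default()
};
// `kv_store` would be an etcd-backed KvStoreRef in a real deployment.
let meta_srv = MetaSrv::new(options, kv_store).await;
```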
View File

@@ -23,6 +23,7 @@ use super::GrpcResult;
use super::GrpcStream;
use crate::error;
use crate::error::Result;
use crate::handler::Context;
use crate::metasrv::MetaSrv;
static PUSHER_ID: AtomicU64 = AtomicU64::new(0);
@@ -38,6 +39,10 @@ impl heartbeat_server::Heartbeat for MetaSrv {
let mut in_stream = req.into_inner();
let (tx, rx) = mpsc::channel(128);
let handlers = self.heartbeat_handlers();
let ctx = Context {
server_addr: self.options().server_addr.clone(),
kv_store: self.kv_store(),
};
common_runtime::spawn_bg(async move {
let mut pusher_key = None;
while let Some(msg) = in_stream.next().await {
@@ -56,9 +61,14 @@ impl heartbeat_server::Heartbeat for MetaSrv {
}
}
tx.send(handlers.handle(req).await.map_err(|e| e.into()))
.await
.expect("working rx");
tx.send(
handlers
.handle(req, ctx.clone())
.await
.map_err(|e| e.into()),
)
.await
.expect("working rx");
}
Err(err) => {
if let Some(io_err) = error::match_for_io_error(&err) {
@@ -140,7 +150,7 @@ mod tests {
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await;
let req = AskLeaderRequest {
header: request_header((1, 1)),
header: RequestHeader::new((1, 1)),
};
let res = meta_srv.ask_leader(req.into_request()).await.unwrap();

View File

@@ -1,8 +1,14 @@
use api::v1::meta::router_server;
use api::v1::meta::CreateRequest;
use api::v1::meta::Peer;
use api::v1::meta::Region;
use api::v1::meta::RegionRoute;
use api::v1::meta::ResponseHeader;
use api::v1::meta::RouteRequest;
use api::v1::meta::RouteResponse;
use api::v1::meta::Table;
use api::v1::meta::TableRoute;
use common_time::util as time_util;
use snafu::OptionExt;
use tonic::Request;
use tonic::Response;
@@ -11,44 +17,120 @@ use super::store::kv::KvStoreRef;
use super::GrpcResult;
use crate::error;
use crate::error::Result;
use crate::keys::LeaseKey;
use crate::keys::LeaseValue;
use crate::lease;
use crate::metasrv::MetaSrv;
#[derive(Clone)]
struct Context {
pub datanode_lease_secs: i64,
pub kv_store: KvStoreRef,
}
#[async_trait::async_trait]
impl router_server::Router for MetaSrv {
async fn route(&self, req: Request<RouteRequest>) -> GrpcResult<RouteResponse> {
let req = req.into_inner();
let kv_store = self.kv_store();
let res = handle_route(req, kv_store).await?;
let ctx = Context {
datanode_lease_secs: self.options().datanode_lease_secs,
kv_store: self.kv_store(),
};
let res = handle_route(req, ctx).await?;
Ok(Response::new(res))
}
async fn create(&self, req: Request<CreateRequest>) -> GrpcResult<RouteResponse> {
let req = req.into_inner();
let kv_store = self.kv_store();
let res = handle_create(req, kv_store).await?;
let ctx = Context {
datanode_lease_secs: self.options().datanode_lease_secs,
kv_store: self.kv_store(),
};
let res = handle_create(req, ctx, LeaseBasedSelector::default()).await?;
Ok(Response::new(res))
}
}
async fn handle_route(_req: RouteRequest, _kv_store: KvStoreRef) -> Result<RouteResponse> {
#[async_trait::async_trait]
trait DatanodeSelector {
async fn select(&self, id: u64, ctx: &Context) -> Result<Vec<Peer>>;
}
#[derive(Default)]
struct LeaseBasedSelector;
#[async_trait::async_trait]
impl DatanodeSelector for LeaseBasedSelector {
async fn select(&self, id: u64, ctx: &Context) -> Result<Vec<Peer>> {
// filter out the datanodes whose lease has expired
let lease_filter = |_: &LeaseKey, v: &LeaseValue| {
time_util::current_time_millis() - v.timestamp_millis < ctx.datanode_lease_secs * 1000
};
let mut lease_kvs = lease::alive_datanodes(id, ctx.kv_store.clone(), lease_filter).await?;
// TODO(jiachun): At the moment we are just pushing the latest to the forefront,
// and it is better to use load-based strategies in the future.
lease_kvs.sort_by(|a, b| b.1.timestamp_millis.cmp(&a.1.timestamp_millis));
let peers = lease_kvs
.into_iter()
.map(|(k, v)| Peer {
id: k.node_id,
addr: v.node_addr,
})
.collect::<Vec<_>>();
Ok(peers)
}
}
async fn handle_route(_req: RouteRequest, _ctx: Context) -> Result<RouteResponse> {
todo!()
}
async fn handle_create(req: CreateRequest, _kv_store: KvStoreRef) -> Result<RouteResponse> {
let CreateRequest { table_name, .. } = req;
let _table_name = table_name.context(error::EmptyTableNameSnafu)?;
async fn handle_create(
req: CreateRequest,
ctx: Context,
selector: impl DatanodeSelector,
) -> Result<RouteResponse> {
let CreateRequest {
header,
table_name,
partitions,
} = req;
let table_name = table_name.context(error::EmptyTableNameSnafu)?;
let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id);
// TODO(jiachun):
let peers = vec![Peer {
id: 0,
addr: "127.0.0.1:3000".to_string(),
}];
let peers = selector.select(cluster_id, &ctx).await?;
let table = Table {
table_name: Some(table_name),
..Default::default()
};
let region_num = partitions.len();
let mut region_routes = Vec::with_capacity(region_num);
for i in 0..region_num {
let region = Region {
id: i as u64,
..Default::default()
};
let region_route = RegionRoute {
region: Some(region),
leader_peer_index: (i % peers.len()) as u64,
follower_peer_indexes: vec![(i % peers.len()) as u64],
};
region_routes.push(region_route);
}
let table_route = TableRoute {
table: Some(table),
region_routes,
};
Ok(RouteResponse {
header: ResponseHeader::success(cluster_id),
peers,
..Default::default()
table_routes: vec![table_route],
})
}
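// Worked example (illustrative only) of the round-robin leader assignment in
// `handle_create` above: with two peers, region i gets leader index i % 2, so the
// third region wraps back to index 0. The module name is hypothetical.
#[cfg(test)]
mod round_robin_sketch {
    #[test]
    fn leader_indexes_wrap_around_peers() {
        let peers_len: usize = 2;
        let leaders: Vec<u64> = (0..3usize).map(|i| (i % peers_len) as u64).collect();
        assert_eq!(vec![0u64, 1, 0], leaders);
    }
}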
@@ -71,7 +153,7 @@ mod tests {
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await;
let req = RouteRequest {
header: request_header((1, 1)),
header: RequestHeader::new((1, 1)),
..Default::default()
};
let req = req
@@ -82,32 +164,60 @@ mod tests {
let _res = meta_srv.route(req.into_request()).await.unwrap();
}
struct MockSelector;
#[async_trait::async_trait]
impl DatanodeSelector for MockSelector {
async fn select(&self, _: u64, _: &Context) -> Result<Vec<Peer>> {
Ok(vec![
Peer {
id: 0,
addr: "127.0.0.1:3000".to_string(),
},
Peer {
id: 1,
addr: "127.0.0.1:3001".to_string(),
},
])
}
}
#[tokio::test]
async fn test_handle_create() {
let kv_store = Arc::new(NoopKvStore {});
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await;
let table_name = TableName::new("test_catalog", "test_db", "table1");
let req = CreateRequest {
header: request_header((1, 1)),
header: RequestHeader::new((1, 1)),
table_name: Some(table_name),
..Default::default()
};
let p0 = Partition::new()
.column_list(vec![b"col1".to_vec(), b"col2".to_vec()])
.value_list(vec![b"v1".to_vec(), b"v2".to_vec()]);
let p1 = Partition::new()
.column_list(vec![b"col1".to_vec(), b"col2".to_vec()])
.value_list(vec![b"v11".to_vec(), b"v22".to_vec()]);
let req = req.add_partition(p0).add_partition(p1);
let ctx = Context {
datanode_lease_secs: 10,
kv_store,
};
let res = handle_create(req, ctx, MockSelector {}).await.unwrap();
let res = meta_srv.create(req.into_request()).await.unwrap();
for r in res.into_inner().peers {
assert_eq!("127.0.0.1:3000", r.addr);
}
assert_eq!(
vec![
Peer {
id: 0,
addr: "127.0.0.1:3000".to_string(),
},
Peer {
id: 1,
addr: "127.0.0.1:3001".to_string(),
},
],
res.peers
);
assert_eq!(1, res.table_routes.len());
assert_eq!(2, res.table_routes.get(0).unwrap().region_routes.len());
}
}

View File

@@ -1,8 +1,13 @@
pub mod etcd;
pub mod kv;
#[cfg(test)]
pub(crate) mod noop;
use api::v1::meta::store_server;
use api::v1::meta::BatchPutRequest;
use api::v1::meta::BatchPutResponse;
use api::v1::meta::CompareAndPutRequest;
use api::v1::meta::CompareAndPutResponse;
use api::v1::meta::DeleteRangeRequest;
use api::v1::meta::DeleteRangeResponse;
use api::v1::meta::PutRequest;
@@ -31,6 +36,23 @@ impl store_server::Store for MetaSrv {
Ok(Response::new(res))
}
async fn batch_put(&self, req: Request<BatchPutRequest>) -> GrpcResult<BatchPutResponse> {
let req = req.into_inner();
let res = self.kv_store().batch_put(req).await?;
Ok(Response::new(res))
}
async fn compare_and_put(
&self,
req: Request<CompareAndPutRequest>,
) -> GrpcResult<CompareAndPutResponse> {
let req = req.into_inner();
let res = self.kv_store().compare_and_put(req).await?;
Ok(Response::new(res))
}
async fn delete_range(
&self,
req: Request<DeleteRangeRequest>,
@@ -74,6 +96,26 @@ mod tests {
assert!(res.is_ok());
}
#[tokio::test]
async fn test_batch_put() {
let kv_store = Arc::new(NoopKvStore {});
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await;
let req = BatchPutRequest::default();
let res = meta_srv.batch_put(req.into_request()).await;
assert!(res.is_ok());
}
#[tokio::test]
async fn test_compare_and_put() {
let kv_store = Arc::new(NoopKvStore {});
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await;
let req = CompareAndPutRequest::default();
let res = meta_srv.compare_and_put(req.into_request()).await;
assert!(res.is_ok());
}
#[tokio::test]
async fn test_delete_range() {
let kv_store = Arc::new(NoopKvStore {});

View File

@@ -1,5 +1,9 @@
use std::sync::Arc;
use api::v1::meta::BatchPutRequest;
use api::v1::meta::BatchPutResponse;
use api::v1::meta::CompareAndPutRequest;
use api::v1::meta::CompareAndPutResponse;
use api::v1::meta::DeleteRangeRequest;
use api::v1::meta::DeleteRangeResponse;
use api::v1::meta::KeyValue;
@@ -7,11 +11,17 @@ use api::v1::meta::PutRequest;
use api::v1::meta::PutResponse;
use api::v1::meta::RangeRequest;
use api::v1::meta::RangeResponse;
use api::v1::meta::ResponseHeader;
use common_error::prelude::*;
use etcd_client::Client;
use etcd_client::Compare;
use etcd_client::CompareOp;
use etcd_client::DeleteOptions;
use etcd_client::GetOptions;
use etcd_client::PutOptions;
use etcd_client::Txn;
use etcd_client::TxnOp;
use etcd_client::TxnOpResponse;
use super::kv::KvStore;
use super::kv::KvStoreRef;
@@ -40,7 +50,11 @@ impl EtcdStore {
#[async_trait::async_trait]
impl KvStore for EtcdStore {
async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
let Get { key, options } = req.try_into()?;
let Get {
cluster_id,
key,
options,
} = req.try_into()?;
let res = self
.client
@@ -56,14 +70,15 @@ impl KvStore for EtcdStore {
.collect::<Vec<_>>();
Ok(RangeResponse {
header: ResponseHeader::success(cluster_id),
kvs,
more: res.more(),
..Default::default()
})
}
async fn put(&self, req: PutRequest) -> Result<PutResponse> {
let Put {
cluster_id,
key,
value,
options,
@@ -79,13 +94,111 @@ impl KvStore for EtcdStore {
let prev_kv = res.prev_key().map(|kv| KvPair::new(kv).into());
Ok(PutResponse {
header: ResponseHeader::success(cluster_id),
prev_kv,
})
}
async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
let BatchPut {
cluster_id,
kvs,
options,
} = req.try_into()?;
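// Submit all puts as a single etcd transaction so the batch is applied atomically.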
let put_ops = kvs
.into_iter()
.map(|kv| TxnOp::put(kv.key, kv.value, options.clone()))
.collect::<Vec<_>>();
let txn = Txn::new().and_then(put_ops);
let txn_res = self
.client
.kv_client()
.txn(txn)
.await
.context(error::EtcdFailedSnafu)?;
let mut prev_kvs = vec![];
for op_res in txn_res.op_responses() {
match op_res {
TxnOpResponse::Put(put_res) => {
if let Some(prev_kv) = put_res.prev_key() {
prev_kvs.push(KvPair::new(prev_kv).into());
}
}
_ => unreachable!(), // the transaction only contains put operations
}
}
Ok(BatchPutResponse {
header: ResponseHeader::success(cluster_id),
prev_kvs,
})
}
async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
let CompareAndPut {
cluster_id,
key,
expect,
value,
options,
} = req.try_into()?;
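// One etcd transaction: if the stored value of `key` equals `expect`, write `value`;
// otherwise read the current value so the caller can see what it lost the race to.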
let txn = Txn::new()
.when(vec![Compare::value(
key.clone(),
CompareOp::Equal,
expect.clone(),
)])
.and_then(vec![TxnOp::put(key.clone(), value, options)])
.or_else(vec![TxnOp::get(key.clone(), None)]);
let txn_res = self
.client
.kv_client()
.txn(txn)
.await
.context(error::EtcdFailedSnafu)?;
let success = txn_res.succeeded();
let prev_kv;
let op_res = txn_res
.op_responses()
.pop()
.context(error::InvalidTxnResultSnafu {
err_msg: "empty response",
})?;
if success {
prev_kv = Some(KeyValue { key, value: expect });
} else {
match op_res {
TxnOpResponse::Get(get_res) => {
ensure!(
get_res.count() == 1,
error::InvalidTxnResultSnafu {
err_msg: format!("expect 1 response, actual {}", get_res.count())
}
);
prev_kv = Some(KeyValue::from(KvPair::new(&get_res.kvs()[0])));
}
_ => unreachable!(), // the failure branch only contains a single get operation
}
}
Ok(CompareAndPutResponse {
header: ResponseHeader::success(cluster_id),
success,
prev_kv,
..Default::default()
})
}
async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
let Delete { key, options } = req.try_into()?;
let Delete {
cluster_id,
key,
options,
} = req.try_into()?;
let res = self
.client
@@ -101,14 +214,15 @@ impl KvStore for EtcdStore {
.collect::<Vec<_>>();
Ok(DeleteRangeResponse {
header: ResponseHeader::success(cluster_id),
deleted: res.deleted(),
prev_kvs,
..Default::default()
})
}
}
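// Sketch of how a caller might drive the CAS primitive above (illustrative only,
// not part of this change): retry an optimistic update until it succeeds or the
// key disappears. `cas_update` and the retry bound are hypothetical.
#[allow(dead_code)]
async fn cas_update(
    kv_store: &KvStoreRef,
    key: Vec<u8>,
    mut expect: Vec<u8>,
    value: Vec<u8>,
) -> Result<bool> {
    for _ in 0..3 {
        let req = CompareAndPutRequest {
            key: key.clone(),
            expect: expect.clone(),
            value: value.clone(),
            ..Default::default()
        };
        let res = kv_store.compare_and_put(req).await?;
        if res.success {
            return Ok(true);
        }
        match res.prev_kv {
            // Another writer got there first; retry against the value it left.
            Some(kv) => expect = kv.value,
            // The key no longer exists, so this CAS cannot succeed.
            None => return Ok(false),
        }
    }
    Ok(false)
}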
struct Get {
cluster_id: u64,
key: Vec<u8>,
options: Option<GetOptions>,
}
@@ -118,11 +232,11 @@ impl TryFrom<RangeRequest> for Get {
fn try_from(req: RangeRequest) -> Result<Self> {
let RangeRequest {
header,
key,
range_end,
limit,
keys_only,
..
} = req;
ensure!(!key.is_empty(), error::EmptyKeySnafu);
@@ -139,6 +253,7 @@ impl TryFrom<RangeRequest> for Get {
}
Ok(Get {
cluster_id: header.map_or(0, |h| h.cluster_id),
key,
options: Some(options),
})
@@ -146,6 +261,7 @@ impl TryFrom<RangeRequest> for Get {
}
struct Put {
cluster_id: u64,
key: Vec<u8>,
value: Vec<u8>,
options: Option<PutOptions>,
@@ -156,10 +272,10 @@ impl TryFrom<PutRequest> for Put {
fn try_from(req: PutRequest) -> Result<Self> {
let PutRequest {
header,
key,
value,
prev_kv,
..
} = req;
let mut options = PutOptions::default();
@@ -168,6 +284,7 @@ impl TryFrom<PutRequest> for Put {
}
Ok(Put {
cluster_id: header.map_or(0, |h| h.cluster_id),
key,
value,
options: Some(options),
@@ -175,7 +292,66 @@ impl TryFrom<PutRequest> for Put {
}
}
struct BatchPut {
cluster_id: u64,
kvs: Vec<KeyValue>,
options: Option<PutOptions>,
}
impl TryFrom<BatchPutRequest> for BatchPut {
type Error = error::Error;
fn try_from(req: BatchPutRequest) -> Result<Self> {
let BatchPutRequest {
header,
kvs,
prev_kv,
} = req;
let mut options = PutOptions::default();
if prev_kv {
options = options.with_prev_key();
}
Ok(BatchPut {
cluster_id: header.map_or(0, |h| h.cluster_id),
kvs,
options: Some(options),
})
}
}
struct CompareAndPut {
cluster_id: u64,
key: Vec<u8>,
expect: Vec<u8>,
value: Vec<u8>,
options: Option<PutOptions>,
}
impl TryFrom<CompareAndPutRequest> for CompareAndPut {
type Error = error::Error;
fn try_from(req: CompareAndPutRequest) -> Result<Self> {
let CompareAndPutRequest {
header,
key,
expect,
value,
} = req;
Ok(CompareAndPut {
cluster_id: header.map_or(0, |h| h.cluster_id),
key,
expect,
value,
options: Some(PutOptions::default().with_prev_key()),
})
}
}
struct Delete {
cluster_id: u64,
key: Vec<u8>,
options: Option<DeleteOptions>,
}
@@ -185,10 +361,10 @@ impl TryFrom<DeleteRangeRequest> for Delete {
fn try_from(req: DeleteRangeRequest) -> Result<Self> {
let DeleteRangeRequest {
header,
key,
range_end,
prev_kv,
..
} = req;
ensure!(!key.is_empty(), error::EmptyKeySnafu);
@@ -202,6 +378,7 @@ impl TryFrom<DeleteRangeRequest> for Delete {
}
Ok(Delete {
cluster_id: header.map_or(0, |h| h.cluster_id),
key,
options: Some(options),
})
@@ -213,7 +390,7 @@ struct KvPair<'a>(&'a etcd_client::KeyValue);
impl<'a> KvPair<'a> {
/// Creates a `KvPair` from an etcd `KeyValue`.
#[inline]
const fn new(kv: &'a etcd_client::KeyValue) -> Self {
fn new(kv: &'a etcd_client::KeyValue) -> Self {
Self(kv)
}
}
@@ -263,6 +440,41 @@ mod tests {
assert!(put.options.is_some());
}
#[test]
fn test_parse_batch_put() {
let req = BatchPutRequest {
kvs: vec![KeyValue {
key: b"test_key".to_vec(),
value: b"test_value".to_vec(),
}],
prev_kv: true,
..Default::default()
};
let batch_put: BatchPut = req.try_into().unwrap();
assert_eq!(b"test_key".to_vec(), batch_put.kvs.get(0).unwrap().key);
assert_eq!(b"test_value".to_vec(), batch_put.kvs.get(0).unwrap().value);
assert!(batch_put.options.is_some());
}
#[test]
fn test_parse_compare_and_put() {
let req = CompareAndPutRequest {
key: b"test_key".to_vec(),
expect: b"test_expect".to_vec(),
value: b"test_value".to_vec(),
..Default::default()
};
let compare_and_put: CompareAndPut = req.try_into().unwrap();
assert_eq!(b"test_key".to_vec(), compare_and_put.key);
assert_eq!(b"test_expect".to_vec(), compare_and_put.expect);
assert_eq!(b"test_value".to_vec(), compare_and_put.value);
assert!(compare_and_put.options.is_some());
}
#[test]
fn test_parse_delete() {
let req = DeleteRangeRequest {

View File

@@ -1,5 +1,9 @@
use std::sync::Arc;
use api::v1::meta::BatchPutRequest;
use api::v1::meta::BatchPutResponse;
use api::v1::meta::CompareAndPutRequest;
use api::v1::meta::CompareAndPutResponse;
use api::v1::meta::DeleteRangeRequest;
use api::v1::meta::DeleteRangeResponse;
use api::v1::meta::PutRequest;
@@ -17,5 +21,9 @@ pub trait KvStore: Send + Sync {
async fn put(&self, req: PutRequest) -> Result<PutResponse>;
async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse>;
async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result<CompareAndPutResponse>;
async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse>;
}

View File

@@ -1,3 +1,7 @@
use api::v1::meta::BatchPutRequest;
use api::v1::meta::BatchPutResponse;
use api::v1::meta::CompareAndPutRequest;
use api::v1::meta::CompareAndPutResponse;
use api::v1::meta::DeleteRangeRequest;
use api::v1::meta::DeleteRangeResponse;
use api::v1::meta::PutRequest;
@@ -23,6 +27,14 @@ impl KvStore for NoopKvStore {
Ok(PutResponse::default())
}
async fn batch_put(&self, _req: BatchPutRequest) -> Result<BatchPutResponse> {
Ok(BatchPutResponse::default())
}
async fn compare_and_put(&self, _req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
Ok(CompareAndPutResponse::default())
}
async fn delete_range(&self, _req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
Ok(DeleteRangeResponse::default())
}

34
src/meta-srv/src/util.rs Normal file
View File

@@ -0,0 +1,34 @@
/// Gets the end key of the range that covers all keys prefixed by `key`.
#[inline]
pub fn get_prefix_end_key(key: &[u8]) -> Vec<u8> {
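    // Scan from the last byte backwards for the first byte that can be incremented
    // (i.e. is less than 0xFF); truncate the key after it and bump that byte by one.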
for (i, v) in key.iter().enumerate().rev() {
if *v < 0xFF {
let mut end = Vec::from(&key[..=i]);
end[i] = *v + 1;
return end;
}
}
// No next prefix exists (the key consists entirely of 0xFF bytes, e.g. 0xffff).
vec![0]
}
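// Sketch (illustrative only, not part of this change): a prefix scan pairs the
// prefix itself as `key` with `get_prefix_end_key(prefix)` as `range_end`, so the
// range request covers exactly the keys sharing that prefix. The helper name is
// hypothetical.
#[allow(dead_code)]
fn prefix_range_request(prefix: &[u8]) -> api::v1::meta::RangeRequest {
    api::v1::meta::RangeRequest {
        key: prefix.to_vec(),
        range_end: get_prefix_end_key(prefix),
        ..Default::default()
    }
}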
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_get_prefix() {
let key = b"testa";
assert_eq!(b"testb".to_vec(), get_prefix_end_key(key));
let key = vec![0, 0, 26];
assert_eq!(vec![0, 0, 27], get_prefix_end_key(&key));
let key = vec![0, 0, 255];
assert_eq!(vec![0, 1], get_prefix_end_key(&key));
let key = vec![0, 255, 255];
assert_eq!(vec![1], get_prefix_end_key(&key));
}
}