feat: impl BatchDelete (#1253)

* chore: impl `BatchDelete`

* chore: add `batch_delete` to meta-client

* fix: auth param length check

* fix: auth param length check

* chore: rebase develop

* chore: use `filter_map`

Co-authored-by: LFC <bayinamine@gmail.com>

* chore: update error msg

Co-authored-by: LFC <bayinamine@gmail.com>

* fix: pre-allocate vec length

---------

Co-authored-by: LFC <bayinamine@gmail.com>
This commit is contained in:
shuiyisong
2023-03-28 14:06:13 +08:00
committed by GitHub
parent ed1cb73ffc
commit 995a28a27d
13 changed files with 346 additions and 31 deletions

2
Cargo.lock generated
View File

@@ -3115,7 +3115,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ff8c55bed0fa6e20577aa95af5b87f9c9e39b1e7#ff8c55bed0fa6e20577aa95af5b87f9c9e39b1e7"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=d3861c34f7920238869d0d4e50dc1e6b189d2a6b#d3861c34f7920238869d0d4e50dc1e6b189d2a6b"
dependencies = [
"prost",
"tonic",

View File

@@ -10,7 +10,7 @@ common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-time = { path = "../common/time" }
datatypes = { path = "../datatypes" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ff8c55bed0fa6e20577aa95af5b87f9c9e39b1e7" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "d3861c34f7920238869d0d4e50dc1e6b189d2a6b" }
prost.workspace = true
snafu = { version = "0.7", features = ["backtraces"] }
tonic.workspace = true

View File

@@ -22,8 +22,8 @@ use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use meta_client::client::MetaClientBuilder;
use meta_client::rpc::{
BatchPutRequest, CompareAndPutRequest, CreateRequest, DeleteRangeRequest, Partition,
PutRequest, RangeRequest, TableName,
BatchDeleteRequest, BatchGetRequest, BatchPutRequest, CompareAndPutRequest, CreateRequest,
DeleteRangeRequest, Partition, PutRequest, RangeRequest, TableName,
};
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
use table::requests::TableOptions;
@@ -146,6 +146,30 @@ async fn run() {
// get none
let res = meta_client.range(range).await.unwrap();
event!(Level::INFO, "get range result: {:#?}", res);
// batch delete
// put two
let batch_put = BatchPutRequest::new()
.add_kv(b"batch_put1".to_vec(), b"batch_put_v1".to_vec())
.add_kv(b"batch_put2".to_vec(), b"batch_put_v2".to_vec())
.with_prev_kv();
let res = meta_client.batch_put(batch_put).await.unwrap();
event!(Level::INFO, "batch put result: {:#?}", res);
// delete one
let batch_delete = BatchDeleteRequest::new()
.add_key(b"batch_put1".to_vec())
.with_prev_kv();
let res = meta_client.batch_delete(batch_delete).await.unwrap();
event!(Level::INFO, "batch delete result: {:#?}", res);
// get other one
let batch_get = BatchGetRequest::new()
.add_key(b"batch_put1".to_vec())
.add_key(b"batch_put2".to_vec());
let res = meta_client.batch_get(batch_get).await.unwrap();
event!(Level::INFO, "batch get result: {:#?}", res);
}
fn new_table_info() -> RawTableInfo {

View File

@@ -32,10 +32,10 @@ use crate::error::Result;
use crate::rpc::lock::{LockRequest, LockResponse, UnlockRequest};
use crate::rpc::router::DeleteRequest;
use crate::rpc::{
BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest,
CompareAndPutResponse, CreateRequest, DeleteRangeRequest, DeleteRangeResponse,
MoveValueRequest, MoveValueResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
RouteRequest, RouteResponse,
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, CreateRequest,
DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest,
PutResponse, RangeRequest, RangeResponse, RouteRequest, RouteResponse,
};
pub type Id = (u64, u64);
@@ -256,6 +256,14 @@ impl MetaClient {
self.store_client()?.batch_put(req.into()).await?.try_into()
}
/// BatchDelete atomically deletes the given keys from the key-value store.
pub async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
self.store_client()?
.batch_delete(req.into())
.await?
.try_into()
}
/// CompareAndPut atomically puts the value to the given updated
/// value if the current value == the expected value.
pub async fn compare_and_put(

View File

@@ -17,9 +17,10 @@ use std::sync::Arc;
use api::v1::meta::store_client::StoreClient;
use api::v1::meta::{
BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest,
CompareAndPutResponse, DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest,
MoveValueResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse,
RangeRequest, RangeResponse,
};
use common_grpc::channel_manager::ChannelManager;
use snafu::{ensure, OptionExt, ResultExt};
@@ -80,6 +81,11 @@ impl Client {
inner.batch_put(req).await
}
pub async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
let inner = self.inner.read().await;
inner.batch_delete(req).await
}
pub async fn compare_and_put(
&self,
req: CompareAndPutRequest,
@@ -169,6 +175,17 @@ impl Inner {
Ok(res.into_inner())
}
async fn batch_delete(&self, mut req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
let mut client = self.random_client()?;
req.set_header(self.id);
let res = client
.batch_delete(req)
.await
.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
}
async fn compare_and_put(
&self,
mut req: CompareAndPutRequest,

View File

@@ -28,9 +28,10 @@ pub use router::{
};
use serde::{Deserialize, Serialize};
pub use store::{
BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest,
CompareAndPutResponse, DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest,
MoveValueResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse,
RangeRequest, RangeResponse,
};
#[derive(Debug, Clone)]

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use api::v1::meta::{
BatchDeleteRequest as PbBatchDeleteRequest, BatchDeleteResponse as PbBatchDeleteResponse,
BatchGetRequest as PbBatchGetRequest, BatchGetResponse as PbBatchGetResponse,
BatchPutRequest as PbBatchPutRequest, BatchPutResponse as PbBatchPutResponse,
CompareAndPutRequest as PbCompareAndPutRequest,
@@ -377,6 +378,78 @@ impl BatchPutResponse {
}
}
#[derive(Debug, Clone, Default)]
pub struct BatchDeleteRequest {
pub keys: Vec<Vec<u8>>,
/// If prev_kv is set, gets the previous key-value pairs before deleting it.
/// The previous key-value pairs will be returned in the batch delete response.
pub prev_kv: bool,
}
impl From<BatchDeleteRequest> for PbBatchDeleteRequest {
fn from(req: BatchDeleteRequest) -> Self {
Self {
header: None,
keys: req.keys,
prev_kv: req.prev_kv,
}
}
}
impl BatchDeleteRequest {
#[inline]
pub fn new() -> Self {
Self {
keys: vec![],
prev_kv: false,
}
}
#[inline]
pub fn add_key(mut self, key: impl Into<Vec<u8>>) -> Self {
self.keys.push(key.into());
self
}
/// If prev_kv is set, gets the previous key-value pair before deleting it.
/// The previous key-value pair will be returned in the batch delete response.
#[inline]
pub fn with_prev_kv(mut self) -> Self {
self.prev_kv = true;
self
}
}
#[derive(Debug, Clone)]
pub struct BatchDeleteResponse(PbBatchDeleteResponse);
impl TryFrom<PbBatchDeleteResponse> for BatchDeleteResponse {
type Error = error::Error;
fn try_from(pb: PbBatchDeleteResponse) -> Result<Self> {
util::check_response_header(pb.header.as_ref())?;
Ok(Self::new(pb))
}
}
impl BatchDeleteResponse {
#[inline]
pub fn new(res: PbBatchDeleteResponse) -> Self {
Self(res)
}
#[inline]
pub fn take_header(&mut self) -> Option<ResponseHeader> {
self.0.header.take().map(ResponseHeader::new)
}
#[inline]
pub fn take_prev_kvs(&mut self) -> Vec<KeyValue> {
self.0.prev_kvs.drain(..).map(KeyValue::new).collect()
}
}
#[derive(Debug, Clone, Default)]
pub struct CompareAndPutRequest {
/// key is the key, in bytes, to put into the key-value store.
@@ -832,6 +905,39 @@ mod tests {
assert_eq!(b"v1".to_vec(), kvs[0].value().to_vec());
}
#[test]
fn test_batch_delete_request_trans() {
let req = BatchDeleteRequest::new()
.add_key(b"test_key1".to_vec())
.add_key(b"test_key2".to_vec())
.add_key(b"test_key3".to_vec())
.with_prev_kv();
let into_req: PbBatchDeleteRequest = req.into();
assert!(into_req.header.is_none());
assert_eq!(&b"test_key1".to_vec(), into_req.keys.get(0).unwrap());
assert_eq!(&b"test_key2".to_vec(), into_req.keys.get(1).unwrap());
assert_eq!(&b"test_key3".to_vec(), into_req.keys.get(2).unwrap());
assert!(into_req.prev_kv);
}
#[test]
fn test_batch_delete_response_trans() {
let pb_res = PbBatchDeleteResponse {
header: None,
prev_kvs: vec![PbKeyValue {
key: b"k1".to_vec(),
value: b"v1".to_vec(),
}],
};
let mut res = BatchDeleteResponse::new(pb_res);
assert!(res.take_header().is_none());
let kvs = res.take_prev_kvs();
assert_eq!(b"k1".to_vec(), kvs[0].key().to_vec());
assert_eq!(b"v1".to_vec(), kvs[0].value().to_vec());
}
#[test]
fn test_compare_and_put_request_trans() {
let (key, expect, value) = (

View File

@@ -150,7 +150,9 @@ impl Inner {
mod tests {
use std::sync::Arc;
use api::v1::meta::{BatchGetRequest, BatchGetResponse};
use api::v1::meta::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse,
};
use super::*;
use crate::service::store::kv::KvStore;
@@ -218,6 +220,10 @@ mod tests {
) -> Result<api::v1::meta::MoveValueResponse> {
unreachable!()
}
async fn batch_delete(&self, _: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
unreachable!()
}
}
let kv_store = Arc::new(Noop {});

View File

@@ -18,9 +18,10 @@ pub mod kv;
pub mod memory;
use api::v1::meta::{
store_server, BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse,
CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, DeleteRangeResponse,
MoveValueRequest, MoveValueResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
store_server, BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse,
BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse,
DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest,
PutResponse, RangeRequest, RangeResponse,
};
use tonic::{Request, Response};
@@ -57,6 +58,15 @@ impl store_server::Store for MetaSrv {
Ok(Response::new(res))
}
async fn batch_delete(
&self,
req: Request<BatchDeleteRequest>,
) -> GrpcResult<BatchDeleteResponse> {
let req = req.into_inner();
let res = self.kv_store().batch_delete(req).await?;
Ok(Response::new(res))
}
async fn compare_and_put(
&self,
req: Request<CompareAndPutRequest>,
@@ -144,6 +154,18 @@ mod tests {
assert!(res.is_ok());
}
#[tokio::test]
async fn test_batch_delete() {
let kv_store = Arc::new(MemStore::new());
let meta_srv = MetaSrvBuilder::new().kv_store(kv_store).build().await;
let req = BatchDeleteRequest::default();
let res = meta_srv.batch_delete(req.into_request()).await;
assert!(res.is_ok());
}
#[tokio::test]
async fn test_compare_and_put() {
let kv_store = Arc::new(MemStore::new());

View File

@@ -15,9 +15,10 @@
use std::sync::Arc;
use api::v1::meta::{
BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest,
CompareAndPutResponse, DeleteRangeRequest, DeleteRangeResponse, KeyValue, MoveValueRequest,
MoveValueResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, ResponseHeader,
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, KeyValue, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse,
RangeRequest, RangeResponse, ResponseHeader,
};
use common_error::prelude::*;
use common_telemetry::warn;
@@ -168,6 +169,44 @@ impl KvStore for EtcdStore {
Ok(BatchPutResponse { header, prev_kvs })
}
async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
let BatchDelete {
cluster_id,
keys,
options,
} = req.try_into()?;
let mut prev_kvs = Vec::with_capacity(keys.len());
let delete_ops = keys
.into_iter()
.map(|k| TxnOp::delete(k, options.clone()))
.collect::<Vec<_>>();
let txn = Txn::new().and_then(delete_ops);
let txn_res = self
.client
.kv_client()
.txn(txn)
.await
.context(error::EtcdFailedSnafu)?;
for op_res in txn_res.op_responses() {
match op_res {
TxnOpResponse::Delete(delete_res) => {
delete_res.prev_kvs().iter().for_each(|kv| {
prev_kvs.push(KvPair::to_kv(kv));
});
}
_ => unreachable!(), // never get here
}
}
let header = Some(ResponseHeader::success(cluster_id));
Ok(BatchDeleteResponse { header, prev_kvs })
}
async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
let CompareAndPut {
cluster_id,
@@ -406,7 +445,7 @@ impl TryFrom<BatchGetRequest> for BatchGet {
fn try_from(req: BatchGetRequest) -> Result<Self> {
let BatchGetRequest { header, keys } = req;
let options = GetOptions::default().with_keys_only();
let options = GetOptions::default();
Ok(BatchGet {
cluster_id: header.map_or(0, |h| h.cluster_id),
@@ -445,6 +484,35 @@ impl TryFrom<BatchPutRequest> for BatchPut {
}
}
struct BatchDelete {
cluster_id: u64,
keys: Vec<Vec<u8>>,
options: Option<DeleteOptions>,
}
impl TryFrom<BatchDeleteRequest> for BatchDelete {
type Error = error::Error;
fn try_from(req: BatchDeleteRequest) -> Result<Self> {
let BatchDeleteRequest {
header,
keys,
prev_kv,
} = req;
let mut options = DeleteOptions::default();
if prev_kv {
options = options.with_prev_key();
}
Ok(BatchDelete {
cluster_id: header.map_or(0, |h| h.cluster_id),
keys,
options: Some(options),
})
}
}
struct CompareAndPut {
cluster_id: u64,
key: Vec<u8>,
@@ -628,6 +696,23 @@ mod tests {
assert!(batch_put.options.is_some());
}
#[test]
fn test_parse_batch_delete() {
let req = BatchDeleteRequest {
keys: vec![b"k1".to_vec(), b"k2".to_vec(), b"k3".to_vec()],
prev_kv: true,
..Default::default()
};
let batch_delete: BatchDelete = req.try_into().unwrap();
assert_eq!(batch_delete.keys.len(), 3);
assert_eq!(b"k1".to_vec(), batch_delete.keys.get(0).unwrap().to_vec());
assert_eq!(b"k2".to_vec(), batch_delete.keys.get(1).unwrap().to_vec());
assert_eq!(b"k3".to_vec(), batch_delete.keys.get(2).unwrap().to_vec());
assert!(batch_delete.options.is_some());
}
#[test]
fn test_parse_compare_and_put() {
let req = CompareAndPutRequest {

View File

@@ -15,9 +15,10 @@
use std::sync::Arc;
use api::v1::meta::{
BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest,
CompareAndPutResponse, DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest,
MoveValueResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse,
RangeRequest, RangeResponse,
};
use crate::error::Result;
@@ -35,6 +36,8 @@ pub trait KvStore: Send + Sync {
async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse>;
async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse>;
async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result<CompareAndPutResponse>;
async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse>;

View File

@@ -17,9 +17,10 @@ use std::collections::BTreeMap;
use std::ops::Range;
use api::v1::meta::{
BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest,
CompareAndPutResponse, DeleteRangeRequest, DeleteRangeResponse, KeyValue, MoveValueRequest,
MoveValueResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, ResponseHeader,
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, KeyValue, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse,
RangeRequest, RangeResponse, ResponseHeader,
};
use parking_lot::RwLock;
@@ -163,6 +164,29 @@ impl KvStore for MemStore {
Ok(BatchPutResponse { header, prev_kvs })
}
async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
let BatchDeleteRequest {
header,
keys,
prev_kv,
} = req;
let mut memory = self.inner.write();
let prev_kvs = if prev_kv {
keys.into_iter()
.filter_map(|key| memory.remove(&key).map(|value| KeyValue { key, value }))
.collect()
} else {
for key in keys.into_iter() {
memory.remove(&key);
}
vec![]
};
let cluster_id = header.map_or(0, |h| h.cluster_id);
let header = Some(ResponseHeader::success(cluster_id));
Ok(BatchDeleteResponse { header, prev_kvs })
}
async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
let CompareAndPutRequest {
header,

View File

@@ -26,8 +26,9 @@ use sha1::Sha1;
use snafu::{ensure, OptionExt, ResultExt};
use crate::auth::{
Error, HashedPassword, Identity, InvalidConfigSnafu, IoSnafu, Password, Result, Salt,
UnsupportedPasswordTypeSnafu, UserNotFoundSnafu, UserPasswordMismatchSnafu, UserProvider,
Error, HashedPassword, Identity, IllegalParamSnafu, InvalidConfigSnafu, IoSnafu, Password,
Result, Salt, UnsupportedPasswordTypeSnafu, UserNotFoundSnafu, UserPasswordMismatchSnafu,
UserProvider,
};
pub const STATIC_USER_PROVIDER: &str = "static_user_provider";
@@ -106,12 +107,24 @@ impl UserProvider for StaticUserProvider {
) -> Result<UserInfo> {
match input_id {
Identity::UserId(username, _) => {
ensure!(
!username.is_empty(),
IllegalParamSnafu {
msg: "blank username"
}
);
let save_pwd = self.users.get(username).context(UserNotFoundSnafu {
username: username.to_string(),
})?;
match input_pwd {
Password::PlainText(pwd) => {
ensure!(
!pwd.is_empty(),
IllegalParamSnafu {
msg: "blank password"
}
);
return if save_pwd == pwd.as_bytes() {
Ok(UserInfo::new(username))
} else {
@@ -119,9 +132,15 @@ impl UserProvider for StaticUserProvider {
username: username.to_string(),
}
.fail()
}
};
}
Password::MysqlNativePassword(auth_data, salt) => {
ensure!(
auth_data.len() == 20,
IllegalParamSnafu {
msg: "Illegal MySQL native password format, length != 20"
}
);
auth_mysql(auth_data, salt, username, save_pwd)
.map(|_| UserInfo::new(username))
}