fix: split write metadata request (#3311)

* feat: add txn_helper

* fix: split the create metadata requests to avoid exceeding the txn limit

* fix: add license header

* chore: some improve
This commit is contained in:
JeremyHi
2024-02-19 15:33:09 +08:00
committed by GitHub
parent aa569f7d6b
commit 6668d6b042
9 changed files with 130 additions and 102 deletions

1
Cargo.lock generated
View File

@@ -1930,6 +1930,7 @@ dependencies = [
"hex",
"humantime-serde",
"hyper",
"itertools 0.10.5",
"lazy_static",
"prometheus",
"prost 0.12.3",

View File

@@ -32,6 +32,7 @@ futures.workspace = true
futures-util.workspace = true
hex = { version = "0.4" }
humantime-serde.workspace = true
itertools.workspace = true
lazy_static.workspace = true
prometheus.workspace = true
prost.workspace = true

View File

@@ -23,6 +23,7 @@ use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_telemetry::info;
use common_telemetry::tracing_context::TracingContext;
use futures_util::future::join_all;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use store_api::storage::{RegionId, RegionNumber};
@@ -159,7 +160,20 @@ impl CreateLogicalTablesProcedure {
let num_tables = tables_data.len();
if num_tables > 0 {
manager.create_logic_tables_metadata(tables_data).await?;
let chunk_size = manager.max_logical_tables_per_batch();
if num_tables > chunk_size {
let chunks = tables_data
.into_iter()
.chunks(chunk_size)
.into_iter()
.map(|chunk| chunk.collect::<Vec<_>>())
.collect::<Vec<_>>();
for chunk in chunks {
manager.create_logical_tables_metadata(chunk).await?;
}
} else {
manager.create_logical_tables_metadata(tables_data).await?;
}
}
let table_ids = self.creator.data.real_table_ids();

View File

@@ -56,6 +56,7 @@ pub mod table_region;
pub mod table_route;
#[cfg(any(test, feature = "testing"))]
pub mod test_utils;
mod txn_helper;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
@@ -277,7 +278,7 @@ impl<T: Serialize + DeserializeOwned + TableMetaValue> DeserializedValueWithByte
}
/// Returns original `bytes`
pub fn into_bytes(&self) -> Vec<u8> {
pub fn get_raw_bytes(&self) -> Vec<u8> {
self.bytes.to_vec()
}
@@ -457,8 +458,14 @@ impl TableMetadataManager {
Ok(())
}
pub fn max_logical_tables_per_batch(&self) -> usize {
// The batch size is max_txn_size / 3 because the size of the `tables_data`
// is 3 times the size of the `tables_data`.
self.kv_backend.max_txn_size() / 3
}
/// Creates metadata for multiple logical tables and return an error if different metadata exists.
pub async fn create_logic_tables_metadata(
pub async fn create_logical_tables_metadata(
&self,
tables_data: Vec<(RawTableInfo, TableRouteValue)>,
) -> Result<()> {
@@ -1003,13 +1010,13 @@ mod tests {
let tables_data = vec![(table_info.clone(), table_route_value.clone())];
// creates metadata.
table_metadata_manager
.create_logic_tables_metadata(tables_data.clone())
.create_logical_tables_metadata(tables_data.clone())
.await
.unwrap();
// if metadata was already created, it should be ok.
assert!(table_metadata_manager
.create_logic_tables_metadata(tables_data)
.create_logical_tables_metadata(tables_data)
.await
.is_ok());
@@ -1019,7 +1026,7 @@ mod tests {
let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
// if remote metadata was exists, it should return an error.
assert!(table_metadata_manager
.create_logic_tables_metadata(modified_tables_data)
.create_logical_tables_metadata(modified_tables_data)
.await
.is_err());

View File

@@ -18,10 +18,10 @@ use serde::{Deserialize, Serialize};
use table::metadata::{RawTableInfo, TableId};
use table::table_reference::TableReference;
use super::{DeserializedValueWithBytes, TableMetaValue, TABLE_INFO_KEY_PREFIX};
use super::{txn_helper, DeserializedValueWithBytes, TableMetaValue, TABLE_INFO_KEY_PREFIX};
use crate::error::Result;
use crate::key::{to_removed_key, TableMetaKey};
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse};
use crate::kv_backend::txn::{Txn, TxnOp, TxnOpResponse};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::BatchGetRequest;
use crate::table_name::TableName;
@@ -112,7 +112,7 @@ impl TableInfoManager {
let raw_key = key.as_raw_key();
let txn = Txn::new().and_then(vec![TxnOp::Get(raw_key.clone())]);
(txn, Self::build_decode_fn(raw_key))
(txn, txn_helper::build_txn_response_decoder_fn(raw_key))
}
/// Builds a create table info transaction, it expected the `__table_info/{table_id}` wasn't occupied.
@@ -127,18 +127,12 @@ impl TableInfoManager {
let key = TableInfoKey::new(table_id);
let raw_key = key.as_raw_key();
let txn = Txn::new()
.when(vec![Compare::with_not_exist_value(
raw_key.clone(),
CompareOp::Equal,
)])
.and_then(vec![TxnOp::Put(
raw_key.clone(),
table_info_value.try_as_raw_value()?,
)])
.or_else(vec![TxnOp::Get(raw_key.clone())]);
let txn = txn_helper::build_put_if_absent_txn(
raw_key.clone(),
table_info_value.try_as_raw_value()?,
);
Ok((txn, Self::build_decode_fn(raw_key)))
Ok((txn, txn_helper::build_txn_response_decoder_fn(raw_key)))
}
/// Builds a update table info transaction, it expected the remote value equals the `current_current_table_info_value`.
@@ -154,21 +148,12 @@ impl TableInfoManager {
)> {
let key = TableInfoKey::new(table_id);
let raw_key = key.as_raw_key();
let raw_value = current_table_info_value.into_bytes();
let raw_value = current_table_info_value.get_raw_bytes();
let new_raw_value: Vec<u8> = new_table_info_value.try_as_raw_value()?;
let txn = Txn::new()
.when(vec![Compare::with_value(
raw_key.clone(),
CompareOp::Equal,
raw_value,
)])
.and_then(vec![TxnOp::Put(
raw_key.clone(),
new_table_info_value.try_as_raw_value()?,
)])
.or_else(vec![TxnOp::Get(raw_key.clone())]);
let txn = txn_helper::build_compare_and_put_txn(raw_key.clone(), raw_value, new_raw_value);
Ok((txn, Self::build_decode_fn(raw_key)))
Ok((txn, txn_helper::build_txn_response_decoder_fn(raw_key)))
}
/// Builds a delete table info transaction.
@@ -179,7 +164,7 @@ impl TableInfoManager {
) -> Result<Txn> {
let key = TableInfoKey::new(table_id);
let raw_key = key.as_raw_key();
let raw_value = table_info_value.into_bytes();
let raw_value = table_info_value.get_raw_bytes();
let removed_key = to_removed_key(&String::from_utf8_lossy(&raw_key));
let txn = Txn::new().and_then(vec![
@@ -190,26 +175,6 @@ impl TableInfoManager {
Ok(txn)
}
fn build_decode_fn(
raw_key: Vec<u8>,
) -> impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<TableInfoValue>>>
{
move |kvs: &Vec<TxnOpResponse>| {
kvs.iter()
.filter_map(|resp| {
if let TxnOpResponse::ResponseGet(r) = resp {
Some(r)
} else {
None
}
})
.flat_map(|r| &r.kvs)
.find(|kv| kv.key == raw_key)
.map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value))
.transpose()
}
}
#[cfg(test)]
pub async fn get_removed(
&self,

View File

@@ -20,13 +20,13 @@ use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
use super::{DeserializedValueWithBytes, TableMetaValue};
use super::{txn_helper, DeserializedValueWithBytes, TableMetaValue};
use crate::error::{
MetadataCorruptionSnafu, Result, SerdeJsonSnafu, TableRouteNotFoundSnafu,
UnexpectedLogicalRouteTableSnafu,
};
use crate::key::{to_removed_key, RegionDistribution, TableMetaKey, TABLE_ROUTE_PREFIX};
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse};
use crate::kv_backend::txn::{Txn, TxnOp, TxnOpResponse};
use crate::kv_backend::KvBackendRef;
use crate::rpc::router::{region_distribution, RegionRoute};
use crate::rpc::store::BatchGetRequest;
@@ -232,7 +232,7 @@ impl TableRouteManager {
let raw_key = key.as_raw_key();
let txn = Txn::new().and_then(vec![TxnOp::Get(raw_key.clone())]);
(txn, Self::build_decode_fn(raw_key))
(txn, txn_helper::build_txn_response_decoder_fn(raw_key))
}
/// Builds a create table route transaction. it expected the `__table_route/{table_id}` wasn't occupied.
@@ -247,18 +247,12 @@ impl TableRouteManager {
let key = TableRouteKey::new(table_id);
let raw_key = key.as_raw_key();
let txn = Txn::new()
.when(vec![Compare::with_not_exist_value(
raw_key.clone(),
CompareOp::Equal,
)])
.and_then(vec![TxnOp::Put(
raw_key.clone(),
table_route_value.try_as_raw_value()?,
)])
.or_else(vec![TxnOp::Get(raw_key.clone())]);
let txn = txn_helper::build_put_if_absent_txn(
raw_key.clone(),
table_route_value.try_as_raw_value()?,
);
Ok((txn, Self::build_decode_fn(raw_key)))
Ok((txn, txn_helper::build_txn_response_decoder_fn(raw_key)))
}
/// Builds a update table route transaction, it expected the remote value equals the `current_table_route_value`.
@@ -274,19 +268,12 @@ impl TableRouteManager {
)> {
let key = TableRouteKey::new(table_id);
let raw_key = key.as_raw_key();
let raw_value = current_table_route_value.into_bytes();
let raw_value = current_table_route_value.get_raw_bytes();
let new_raw_value: Vec<u8> = new_table_route_value.try_as_raw_value()?;
let txn = Txn::new()
.when(vec![Compare::with_value(
raw_key.clone(),
CompareOp::Equal,
raw_value,
)])
.and_then(vec![TxnOp::Put(raw_key.clone(), new_raw_value)])
.or_else(vec![TxnOp::Get(raw_key.clone())]);
let txn = txn_helper::build_compare_and_put_txn(raw_key.clone(), raw_value, new_raw_value);
Ok((txn, Self::build_decode_fn(raw_key)))
Ok((txn, txn_helper::build_txn_response_decoder_fn(raw_key)))
}
/// Builds a delete table route transaction, it expected the remote value equals the `table_route_value`.
@@ -297,7 +284,7 @@ impl TableRouteManager {
) -> Result<Txn> {
let key = TableRouteKey::new(table_id);
let raw_key = key.as_raw_key();
let raw_value = table_route_value.into_bytes();
let raw_value = table_route_value.get_raw_bytes();
let removed_key = to_removed_key(&String::from_utf8_lossy(&raw_key));
let txn = Txn::new().and_then(vec![
@@ -308,27 +295,6 @@ impl TableRouteManager {
Ok(txn)
}
fn build_decode_fn(
raw_key: Vec<u8>,
) -> impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>>
{
move |response: &Vec<TxnOpResponse>| {
response
.iter()
.filter_map(|resp| {
if let TxnOpResponse::ResponseGet(r) = resp {
Some(r)
} else {
None
}
})
.flat_map(|r| &r.kvs)
.find(|kv| kv.key == raw_key)
.map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value))
.transpose()
}
}
pub async fn get(
&self,
table_id: TableId,

View File

@@ -0,0 +1,64 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use serde::de::DeserializeOwned;
use serde::Serialize;
use crate::error::Result;
use crate::key::{DeserializedValueWithBytes, TableMetaValue};
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse};
pub(crate) fn build_txn_response_decoder_fn<T>(
raw_key: Vec<u8>,
) -> impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<T>>>
where
T: Serialize + DeserializeOwned + TableMetaValue,
{
move |txn_res: &Vec<TxnOpResponse>| {
txn_res
.iter()
.filter_map(|resp| {
if let TxnOpResponse::ResponseGet(r) = resp {
Some(r)
} else {
None
}
})
.flat_map(|r| &r.kvs)
.find(|kv| kv.key == raw_key)
.map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value))
.transpose()
}
}
pub(crate) fn build_put_if_absent_txn(key: Vec<u8>, value: Vec<u8>) -> Txn {
Txn::new()
.when(vec![Compare::with_not_exist_value(
key.clone(),
CompareOp::Equal,
)])
.and_then(vec![TxnOp::Put(key.clone(), value)])
.or_else(vec![TxnOp::Get(key)])
}
pub(crate) fn build_compare_and_put_txn(key: Vec<u8>, old_value: Vec<u8>, value: Vec<u8>) -> Txn {
Txn::new()
.when(vec![Compare::with_value(
key.clone(),
CompareOp::Equal,
old_value,
)])
.and_then(vec![TxnOp::Put(key.clone(), value)])
.or_else(vec![TxnOp::Get(key)])
}

View File

@@ -66,7 +66,8 @@ impl EtcdStore {
}
async fn do_multi_txn(&self, txn_ops: Vec<TxnOp>) -> Result<Vec<TxnResponse>> {
if txn_ops.len() < MAX_TXN_SIZE {
let max_txn_size = self.max_txn_size();
if txn_ops.len() < max_txn_size {
// fast path
let _timer = METRIC_META_TXN_REQUEST
.with_label_values(&["etcd", "txn"])
@@ -82,7 +83,7 @@ impl EtcdStore {
}
let txns = txn_ops
.chunks(MAX_TXN_SIZE)
.chunks(max_txn_size)
.map(|part| async move {
let _timer = METRIC_META_TXN_REQUEST
.with_label_values(&["etcd", "txn"])
@@ -319,6 +320,10 @@ impl TxnService for EtcdStore {
.context(error::EtcdFailedSnafu)?;
txn_res.try_into()
}
fn max_txn_size(&self) -> usize {
MAX_TXN_SIZE
}
}
struct Get {

View File

@@ -25,6 +25,11 @@ pub trait TxnService: Sync + Send {
async fn txn(&self, _txn: Txn) -> Result<TxnResponse, Self::Error> {
unimplemented!("txn is not implemented")
}
/// Maximum number of operations permitted in a transaction.
fn max_txn_size(&self) -> usize {
usize::MAX
}
}
#[derive(Debug, Clone, PartialEq)]