mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-04 12:22:55 +00:00
refactor: avoid unnecessary cloning (#3734)
refactor: using `TxnOpGetResponseSet`
This commit is contained in:
@@ -91,7 +91,7 @@ use crate::ddl::utils::region_storage_path;
|
||||
use crate::error::{self, Result, SerdeJsonSnafu};
|
||||
use crate::key::table_route::TableRouteKey;
|
||||
use crate::key::txn_helper::TxnOpGetResponseSet;
|
||||
use crate::kv_backend::txn::{Txn, TxnOp, TxnOpResponse};
|
||||
use crate::kv_backend::txn::{Txn, TxnOp};
|
||||
use crate::kv_backend::KvBackendRef;
|
||||
use crate::rpc::router::{region_distribution, RegionRoute, RegionStatus};
|
||||
use crate::rpc::store::BatchDeleteRequest;
|
||||
@@ -466,17 +466,18 @@ impl TableMetadataManager {
|
||||
txn = txn.merge(create_datanode_table_txn);
|
||||
}
|
||||
|
||||
let r = self.kv_backend.txn(txn).await?;
|
||||
let mut r = self.kv_backend.txn(txn).await?;
|
||||
|
||||
// Checks whether metadata was already created.
|
||||
if !r.succeeded {
|
||||
let remote_table_info = on_create_table_info_failure(&r.responses)?
|
||||
let mut set = TxnOpGetResponseSet::from(&mut r.responses);
|
||||
let remote_table_info = on_create_table_info_failure(&mut set)?
|
||||
.context(error::UnexpectedSnafu {
|
||||
err_msg: "Reads the empty table info during the create table metadata",
|
||||
})?
|
||||
.into_inner();
|
||||
|
||||
let remote_table_route = on_create_table_route_failure(&r.responses)?
|
||||
let remote_table_route = on_create_table_route_failure(&mut set)?
|
||||
.context(error::UnexpectedSnafu {
|
||||
err_msg: "Reads the empty table route during the create table metadata",
|
||||
})?
|
||||
@@ -505,8 +506,8 @@ impl TableMetadataManager {
|
||||
let mut txns = Vec::with_capacity(3 * len);
|
||||
struct OnFailure<F1, R1, F2, R2>
|
||||
where
|
||||
F1: FnOnce(&Vec<TxnOpResponse>) -> R1,
|
||||
F2: FnOnce(&Vec<TxnOpResponse>) -> R2,
|
||||
F1: FnOnce(&mut TxnOpGetResponseSet) -> R1,
|
||||
F2: FnOnce(&mut TxnOpGetResponseSet) -> R2,
|
||||
{
|
||||
table_info_value: TableInfoValue,
|
||||
on_create_table_info_failure: F1,
|
||||
@@ -551,18 +552,19 @@ impl TableMetadataManager {
|
||||
}
|
||||
|
||||
let txn = Txn::merge_all(txns);
|
||||
let r = self.kv_backend.txn(txn).await?;
|
||||
let mut r = self.kv_backend.txn(txn).await?;
|
||||
|
||||
// Checks whether metadata was already created.
|
||||
if !r.succeeded {
|
||||
let mut set = TxnOpGetResponseSet::from(&mut r.responses);
|
||||
for on_failure in on_failures {
|
||||
let remote_table_info = (on_failure.on_create_table_info_failure)(&r.responses)?
|
||||
let remote_table_info = (on_failure.on_create_table_info_failure)(&mut set)?
|
||||
.context(error::UnexpectedSnafu {
|
||||
err_msg: "Reads the empty table info during the create table metadata",
|
||||
})?
|
||||
.into_inner();
|
||||
|
||||
let remote_table_route = (on_failure.on_create_table_route_failure)(&r.responses)?
|
||||
let remote_table_route = (on_failure.on_create_table_route_failure)(&mut set)?
|
||||
.context(error::UnexpectedSnafu {
|
||||
err_msg: "Reads the empty table route during the create table metadata",
|
||||
})?
|
||||
@@ -708,11 +710,12 @@ impl TableMetadataManager {
|
||||
|
||||
let txn = Txn::merge_all(vec![update_table_name_txn, update_table_info_txn]);
|
||||
|
||||
let r = self.kv_backend.txn(txn).await?;
|
||||
let mut r = self.kv_backend.txn(txn).await?;
|
||||
|
||||
// Checks whether metadata was already updated.
|
||||
if !r.succeeded {
|
||||
let remote_table_info = on_update_table_info_failure(&r.responses)?
|
||||
let mut set = TxnOpGetResponseSet::from(&mut r.responses);
|
||||
let remote_table_info = on_update_table_info_failure(&mut set)?
|
||||
.context(error::UnexpectedSnafu {
|
||||
err_msg: "Reads the empty table info during the rename table metadata",
|
||||
})?
|
||||
@@ -740,11 +743,12 @@ impl TableMetadataManager {
|
||||
.table_info_manager()
|
||||
.build_update_txn(table_id, current_table_info_value, &new_table_info_value)?;
|
||||
|
||||
let r = self.kv_backend.txn(update_table_info_txn).await?;
|
||||
let mut r = self.kv_backend.txn(update_table_info_txn).await?;
|
||||
|
||||
// Checks whether metadata was already updated.
|
||||
if !r.succeeded {
|
||||
let remote_table_info = on_update_table_info_failure(&r.responses)?
|
||||
let mut set = TxnOpGetResponseSet::from(&mut r.responses);
|
||||
let remote_table_info = on_update_table_info_failure(&mut set)?
|
||||
.context(error::UnexpectedSnafu {
|
||||
err_msg: "Reads the empty table info during the updating table info",
|
||||
})?
|
||||
@@ -768,7 +772,7 @@ impl TableMetadataManager {
|
||||
let mut txns = Vec::with_capacity(len);
|
||||
struct OnFailure<F, R>
|
||||
where
|
||||
F: FnOnce(&Vec<TxnOpResponse>) -> R,
|
||||
F: FnOnce(&mut TxnOpGetResponseSet) -> R,
|
||||
{
|
||||
table_info_value: TableInfoValue,
|
||||
on_update_table_info_failure: F,
|
||||
@@ -796,11 +800,12 @@ impl TableMetadataManager {
|
||||
}
|
||||
|
||||
let txn = Txn::merge_all(txns);
|
||||
let r = self.kv_backend.txn(txn).await?;
|
||||
let mut r = self.kv_backend.txn(txn).await?;
|
||||
|
||||
if !r.succeeded {
|
||||
let mut set = TxnOpGetResponseSet::from(&mut r.responses);
|
||||
for on_failure in on_failures {
|
||||
let remote_table_info = (on_failure.on_update_table_info_failure)(&r.responses)?
|
||||
let remote_table_info = (on_failure.on_update_table_info_failure)(&mut set)?
|
||||
.context(error::UnexpectedSnafu {
|
||||
err_msg: "Reads the empty table info during the updating table info",
|
||||
})?
|
||||
@@ -847,11 +852,12 @@ impl TableMetadataManager {
|
||||
|
||||
let txn = Txn::merge_all(vec![update_datanode_table_txn, update_table_route_txn]);
|
||||
|
||||
let r = self.kv_backend.txn(txn).await?;
|
||||
let mut r = self.kv_backend.txn(txn).await?;
|
||||
|
||||
// Checks whether metadata was already updated.
|
||||
if !r.succeeded {
|
||||
let remote_table_route = on_update_table_route_failure(&r.responses)?
|
||||
let mut set = TxnOpGetResponseSet::from(&mut r.responses);
|
||||
let remote_table_route = on_update_table_route_failure(&mut set)?
|
||||
.context(error::UnexpectedSnafu {
|
||||
err_msg: "Reads the empty table route during the updating table route",
|
||||
})?
|
||||
@@ -898,11 +904,12 @@ impl TableMetadataManager {
|
||||
.table_route_storage()
|
||||
.build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
|
||||
|
||||
let r = self.kv_backend.txn(update_table_route_txn).await?;
|
||||
let mut r = self.kv_backend.txn(update_table_route_txn).await?;
|
||||
|
||||
// Checks whether metadata was already updated.
|
||||
if !r.succeeded {
|
||||
let remote_table_route = on_update_table_route_failure(&r.responses)?
|
||||
let mut set = TxnOpGetResponseSet::from(&mut r.responses);
|
||||
let remote_table_route = on_update_table_route_failure(&mut set)?
|
||||
.context(error::UnexpectedSnafu {
|
||||
err_msg: "Reads the empty table route during the updating leader region status",
|
||||
})?
|
||||
|
||||
@@ -18,11 +18,12 @@ use serde::{Deserialize, Serialize};
|
||||
use table::metadata::{RawTableInfo, TableId};
|
||||
use table::table_reference::TableReference;
|
||||
|
||||
use super::txn_helper::TxnOpGetResponseSet;
|
||||
use crate::error::Result;
|
||||
use crate::key::{
|
||||
txn_helper, DeserializedValueWithBytes, TableMetaKey, TableMetaValue, TABLE_INFO_KEY_PREFIX,
|
||||
};
|
||||
use crate::kv_backend::txn::{Txn, TxnOpResponse};
|
||||
use crate::kv_backend::txn::Txn;
|
||||
use crate::kv_backend::KvBackendRef;
|
||||
use crate::rpc::store::BatchGetRequest;
|
||||
use crate::table_name::TableName;
|
||||
@@ -109,7 +110,9 @@ impl TableInfoManager {
|
||||
table_info_value: &TableInfoValue,
|
||||
) -> Result<(
|
||||
Txn,
|
||||
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<TableInfoValue>>>,
|
||||
impl FnOnce(
|
||||
&mut TxnOpGetResponseSet,
|
||||
) -> Result<Option<DeserializedValueWithBytes<TableInfoValue>>>,
|
||||
)> {
|
||||
let key = TableInfoKey::new(table_id);
|
||||
let raw_key = key.as_raw_key();
|
||||
@@ -119,7 +122,10 @@ impl TableInfoManager {
|
||||
table_info_value.try_as_raw_value()?,
|
||||
);
|
||||
|
||||
Ok((txn, txn_helper::build_txn_response_decoder_fn(raw_key)))
|
||||
Ok((
|
||||
txn,
|
||||
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
|
||||
))
|
||||
}
|
||||
|
||||
/// Builds a update table info transaction, it expected the remote value equals the `current_current_table_info_value`.
|
||||
@@ -131,7 +137,9 @@ impl TableInfoManager {
|
||||
new_table_info_value: &TableInfoValue,
|
||||
) -> Result<(
|
||||
Txn,
|
||||
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<TableInfoValue>>>,
|
||||
impl FnOnce(
|
||||
&mut TxnOpGetResponseSet,
|
||||
) -> Result<Option<DeserializedValueWithBytes<TableInfoValue>>>,
|
||||
)> {
|
||||
let key = TableInfoKey::new(table_id);
|
||||
let raw_key = key.as_raw_key();
|
||||
@@ -140,7 +148,10 @@ impl TableInfoManager {
|
||||
|
||||
let txn = txn_helper::build_compare_and_put_txn(raw_key.clone(), raw_value, new_raw_value);
|
||||
|
||||
Ok((txn, txn_helper::build_txn_response_decoder_fn(raw_key)))
|
||||
Ok((
|
||||
txn,
|
||||
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
|
||||
))
|
||||
}
|
||||
|
||||
pub async fn get(
|
||||
|
||||
@@ -20,13 +20,16 @@ use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::storage::{RegionId, RegionNumber};
|
||||
use table::metadata::TableId;
|
||||
|
||||
use super::{txn_helper, DeserializedValueWithBytes, TableMetaValue};
|
||||
use crate::error::{
|
||||
self, MetadataCorruptionSnafu, Result, SerdeJsonSnafu, TableRouteNotFoundSnafu,
|
||||
UnexpectedLogicalRouteTableSnafu,
|
||||
};
|
||||
use crate::key::{RegionDistribution, TableMetaKey, TABLE_ROUTE_PREFIX};
|
||||
use crate::kv_backend::txn::{Txn, TxnOpResponse};
|
||||
use crate::key::txn_helper::TxnOpGetResponseSet;
|
||||
use crate::key::{
|
||||
txn_helper, DeserializedValueWithBytes, RegionDistribution, TableMetaKey, TableMetaValue,
|
||||
TABLE_ROUTE_PREFIX,
|
||||
};
|
||||
use crate::kv_backend::txn::Txn;
|
||||
use crate::kv_backend::KvBackendRef;
|
||||
use crate::rpc::router::{region_distribution, RegionRoute};
|
||||
use crate::rpc::store::BatchGetRequest;
|
||||
@@ -454,7 +457,9 @@ impl TableRouteStorage {
|
||||
table_route_value: &TableRouteValue,
|
||||
) -> Result<(
|
||||
Txn,
|
||||
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>>,
|
||||
impl FnOnce(
|
||||
&mut TxnOpGetResponseSet,
|
||||
) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>>,
|
||||
)> {
|
||||
let key = TableRouteKey::new(table_id);
|
||||
let raw_key = key.as_raw_key();
|
||||
@@ -464,7 +469,10 @@ impl TableRouteStorage {
|
||||
table_route_value.try_as_raw_value()?,
|
||||
);
|
||||
|
||||
Ok((txn, txn_helper::build_txn_response_decoder_fn(raw_key)))
|
||||
Ok((
|
||||
txn,
|
||||
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
|
||||
))
|
||||
}
|
||||
|
||||
/// Builds a update table route transaction,
|
||||
@@ -477,7 +485,9 @@ impl TableRouteStorage {
|
||||
new_table_route_value: &TableRouteValue,
|
||||
) -> Result<(
|
||||
Txn,
|
||||
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>>,
|
||||
impl FnOnce(
|
||||
&mut TxnOpGetResponseSet,
|
||||
) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>>,
|
||||
)> {
|
||||
let key = TableRouteKey::new(table_id);
|
||||
let raw_key = key.as_raw_key();
|
||||
@@ -486,7 +496,10 @@ impl TableRouteStorage {
|
||||
|
||||
let txn = txn_helper::build_compare_and_put_txn(raw_key.clone(), raw_value, new_raw_value);
|
||||
|
||||
Ok((txn, txn_helper::build_txn_response_decoder_fn(raw_key)))
|
||||
Ok((
|
||||
txn,
|
||||
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
|
||||
))
|
||||
}
|
||||
|
||||
/// Returns the [`TableRouteValue`].
|
||||
|
||||
@@ -21,7 +21,7 @@ use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse};
|
||||
use crate::rpc::KeyValue;
|
||||
|
||||
/// The response set of [TxnOpResponse::ResponseGet]
|
||||
pub(crate) struct TxnOpGetResponseSet(Vec<KeyValue>);
|
||||
pub struct TxnOpGetResponseSet(Vec<KeyValue>);
|
||||
|
||||
impl TxnOpGetResponseSet {
|
||||
/// Returns a filter to consume a [KeyValue] where the key equals `key`.
|
||||
@@ -68,30 +68,6 @@ impl From<&mut Vec<TxnOpResponse>> for TxnOpGetResponseSet {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(weny): using `TxnOpGetResponseSet`.
|
||||
pub(crate) fn build_txn_response_decoder_fn<T>(
|
||||
raw_key: Vec<u8>,
|
||||
) -> impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<T>>>
|
||||
where
|
||||
T: Serialize + DeserializeOwned + TableMetaValue,
|
||||
{
|
||||
move |txn_res: &Vec<TxnOpResponse>| {
|
||||
txn_res
|
||||
.iter()
|
||||
.filter_map(|resp| {
|
||||
if let TxnOpResponse::ResponseGet(r) = resp {
|
||||
Some(r)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.flat_map(|r| &r.kvs)
|
||||
.find(|kv| kv.key == raw_key)
|
||||
.map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value))
|
||||
.transpose()
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn build_put_if_absent_txn(key: Vec<u8>, value: Vec<u8>) -> Txn {
|
||||
Txn::new()
|
||||
.when(vec![Compare::with_not_exist_value(
|
||||
|
||||
Reference in New Issue
Block a user