mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-26 01:40:36 +00:00
feat: table_routes util (#1849)
This commit is contained in:
@@ -15,9 +15,9 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use api::v1::meta::{
|
||||
router_server, BatchPutRequest, CreateRequest, DeleteRequest, Error, KeyValue,
|
||||
MoveValueRequest, Peer, PeerDict, Region, RegionRoute, ResponseHeader, RouteRequest,
|
||||
RouteResponse, Table, TableRoute, TableRouteValue,
|
||||
router_server, BatchPutRequest, CreateRequest, DeleteRequest, Error, KeyValue, Peer, PeerDict,
|
||||
Region, RegionRoute, ResponseHeader, RouteRequest, RouteResponse, Table, TableRoute,
|
||||
TableRouteValue,
|
||||
};
|
||||
use catalog::helper::{TableGlobalKey, TableGlobalValue};
|
||||
use common_meta::key::TableRouteKey;
|
||||
@@ -36,7 +36,10 @@ use crate::sequence::SequenceRef;
|
||||
use crate::service::store::ext::KvStoreExt;
|
||||
use crate::service::store::kv::KvStoreRef;
|
||||
use crate::service::GrpcResult;
|
||||
use crate::table_routes::{get_table_global_value, get_table_route_value};
|
||||
use crate::table_routes::{
|
||||
get_table_global_value, get_table_route_value, remove_table_global_value,
|
||||
remove_table_route_value,
|
||||
};
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl router_server::Router for MetaSrv {
|
||||
@@ -412,51 +415,3 @@ fn table_route_key(table_id: u64, t: &TableGlobalKey) -> TableRouteKey<'_> {
|
||||
table_name: &t.table_name,
|
||||
}
|
||||
}
|
||||
|
||||
async fn remove_table_route_value(
|
||||
kv_store: &KvStoreRef,
|
||||
key: &TableRouteKey<'_>,
|
||||
) -> Result<(Vec<u8>, TableRouteValue)> {
|
||||
let from_key = key.key().into_bytes();
|
||||
let to_key = key.removed_key().into_bytes();
|
||||
let v = move_value(kv_store, from_key, to_key)
|
||||
.await?
|
||||
.context(error::TableRouteNotFoundSnafu { key: key.key() })?;
|
||||
let trv: TableRouteValue =
|
||||
v.1.as_slice()
|
||||
.try_into()
|
||||
.context(error::DecodeTableRouteSnafu)?;
|
||||
|
||||
Ok((v.0, trv))
|
||||
}
|
||||
|
||||
async fn remove_table_global_value(
|
||||
kv_store: &KvStoreRef,
|
||||
key: &TableGlobalKey,
|
||||
) -> Result<(Vec<u8>, TableGlobalValue)> {
|
||||
let key = key.to_string();
|
||||
let removed_key = crate::keys::to_removed_key(&key);
|
||||
let kv = move_value(kv_store, key.as_bytes(), removed_key)
|
||||
.await?
|
||||
.context(error::TableNotFoundSnafu { name: key })?;
|
||||
let value: TableGlobalValue =
|
||||
TableGlobalValue::from_bytes(&kv.1).context(error::InvalidCatalogValueSnafu)?;
|
||||
Ok((kv.0, value))
|
||||
}
|
||||
|
||||
async fn move_value(
|
||||
kv_store: &KvStoreRef,
|
||||
from_key: impl Into<Vec<u8>>,
|
||||
to_key: impl Into<Vec<u8>>,
|
||||
) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
|
||||
let from_key = from_key.into();
|
||||
let to_key = to_key.into();
|
||||
let move_req = MoveValueRequest {
|
||||
from_key,
|
||||
to_key,
|
||||
..Default::default()
|
||||
};
|
||||
let res = kv_store.move_value(move_req).await?;
|
||||
|
||||
Ok(res.kv.map(|kv| (kv.key, kv.value)))
|
||||
}
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use api::v1::meta::{PutRequest, TableRouteValue};
|
||||
use api::v1::meta::{MoveValueRequest, PutRequest, TableRouteValue};
|
||||
use catalog::helper::{TableGlobalKey, TableGlobalValue};
|
||||
use common_meta::key::TableRouteKey;
|
||||
use common_meta::rpc::store::{BatchGetRequest, BatchGetResponse};
|
||||
@@ -22,7 +22,7 @@ use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use crate::error::{
|
||||
ConvertProtoDataSnafu, DecodeTableRouteSnafu, InvalidCatalogValueSnafu, Result,
|
||||
TableRouteNotFoundSnafu,
|
||||
TableNotFoundSnafu, TableRouteNotFoundSnafu,
|
||||
};
|
||||
use crate::service::store::ext::KvStoreExt;
|
||||
use crate::service::store::kv::KvStoreRef;
|
||||
@@ -80,6 +80,20 @@ pub(crate) async fn put_table_global_value(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn remove_table_global_value(
|
||||
kv_store: &KvStoreRef,
|
||||
key: &TableGlobalKey,
|
||||
) -> Result<(Vec<u8>, TableGlobalValue)> {
|
||||
let key = key.to_string();
|
||||
let removed_key = crate::keys::to_removed_key(&key);
|
||||
let kv = move_value(kv_store, key.as_bytes(), removed_key)
|
||||
.await?
|
||||
.context(TableNotFoundSnafu { name: key })?;
|
||||
let value: TableGlobalValue =
|
||||
TableGlobalValue::from_bytes(&kv.1).context(InvalidCatalogValueSnafu)?;
|
||||
Ok((kv.0, value))
|
||||
}
|
||||
|
||||
pub(crate) async fn get_table_route_value(
|
||||
kv_store: &KvStoreRef,
|
||||
key: &TableRouteKey<'_>,
|
||||
@@ -109,6 +123,37 @@ pub(crate) async fn put_table_route_value(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn remove_table_route_value(
|
||||
kv_store: &KvStoreRef,
|
||||
key: &TableRouteKey<'_>,
|
||||
) -> Result<(Vec<u8>, TableRouteValue)> {
|
||||
let from_key = key.key().into_bytes();
|
||||
let to_key = key.removed_key().into_bytes();
|
||||
let v = move_value(kv_store, from_key, to_key)
|
||||
.await?
|
||||
.context(TableRouteNotFoundSnafu { key: key.key() })?;
|
||||
let trv: TableRouteValue = v.1.as_slice().try_into().context(DecodeTableRouteSnafu)?;
|
||||
|
||||
Ok((v.0, trv))
|
||||
}
|
||||
|
||||
async fn move_value(
|
||||
kv_store: &KvStoreRef,
|
||||
from_key: impl Into<Vec<u8>>,
|
||||
to_key: impl Into<Vec<u8>>,
|
||||
) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
|
||||
let from_key = from_key.into();
|
||||
let to_key = to_key.into();
|
||||
let move_req = MoveValueRequest {
|
||||
from_key,
|
||||
to_key,
|
||||
..Default::default()
|
||||
};
|
||||
let res = kv_store.move_value(move_req).await?;
|
||||
|
||||
Ok(res.kv.map(|kv| (kv.key, kv.value)))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) mod tests {
|
||||
use std::collections::HashMap;
|
||||
|
||||
Reference in New Issue
Block a user