mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-14 17:23:09 +00:00
fix: LoadBase Selector cannot follow the region distribution rules (#2259)
* fix: LoadBase Selector cannot follow the region distribution rules * chore: apply suggestions from CR
This commit is contained in:
@@ -13,18 +13,53 @@
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::meta::Peer;
|
||||
use common_meta::key::table_name::TableNameKey;
|
||||
use common_meta::key::TableMetadataManager;
|
||||
use common_meta::rpc::router::find_leaders;
|
||||
use common_telemetry::warn;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::error::{self, Result};
|
||||
use crate::keys::{LeaseKey, LeaseValue, StatKey};
|
||||
use crate::lease;
|
||||
use crate::metasrv::SelectorContext;
|
||||
use crate::selector::{Namespace, Selector};
|
||||
use crate::service::store::kv::KvBackendAdapter;
|
||||
|
||||
const MAX_REGION_NUMBER: u64 = u64::MAX;
|
||||
|
||||
pub struct LoadBasedSelector;
|
||||
|
||||
async fn get_leader_peer_ids(
|
||||
table_metadata_manager: &TableMetadataManager,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
table: &str,
|
||||
) -> Result<Vec<u64>> {
|
||||
let table_name = table_metadata_manager
|
||||
.table_name_manager()
|
||||
.get(TableNameKey::new(catalog, schema, table))
|
||||
.await
|
||||
.context(error::TableMetadataManagerSnafu)?;
|
||||
|
||||
Ok(if let Some(table_name) = table_name {
|
||||
table_metadata_manager
|
||||
.table_route_manager()
|
||||
.get(table_name.table_id())
|
||||
.await
|
||||
.context(error::TableMetadataManagerSnafu)?
|
||||
.map(|route| {
|
||||
find_leaders(&route.region_routes)
|
||||
.into_iter()
|
||||
.map(|peer| peer.id)
|
||||
.collect()
|
||||
})
|
||||
.unwrap_or_default()
|
||||
} else {
|
||||
Vec::new()
|
||||
})
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Selector for LoadBasedSelector {
|
||||
type Context = SelectorContext;
|
||||
@@ -41,11 +76,22 @@ impl Selector for LoadBasedSelector {
|
||||
let stat_keys: Vec<StatKey> = lease_kvs.keys().map(|k| k.into()).collect();
|
||||
let stat_kvs = ctx.meta_peer_client.get_dn_stat_kvs(stat_keys).await?;
|
||||
|
||||
let leader_peer_ids = if let (Some(catalog), Some(schema), Some(table)) =
|
||||
(&ctx.catalog, &ctx.schema, &ctx.table)
|
||||
{
|
||||
let table_metadata_manager =
|
||||
TableMetadataManager::new(KvBackendAdapter::wrap(ctx.kv_store.clone()));
|
||||
|
||||
get_leader_peer_ids(&table_metadata_manager, catalog, schema, table).await?
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
|
||||
let mut tuples: Vec<(LeaseKey, LeaseValue, u64)> = lease_kvs
|
||||
.into_iter()
|
||||
.filter(|(lease_k, _)| !leader_peer_ids.contains(&lease_k.node_id))
|
||||
.map(|(lease_k, lease_v)| {
|
||||
let stat_key: StatKey = (&lease_k).into();
|
||||
|
||||
let region_num = match stat_kvs
|
||||
.get(&stat_key)
|
||||
.and_then(|stat_val| stat_val.region_num())
|
||||
|
||||
Reference in New Issue
Block a user