mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-28 00:42:56 +00:00
feat: ignore internal keys in metadata snapshots (#6606)
feat: ignore dumpping internal keys Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
@@ -182,6 +182,11 @@ pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka";
|
||||
pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka";
|
||||
pub const TOPIC_REGION_PREFIX: &str = "__topic_region";
|
||||
|
||||
/// The election key.
|
||||
pub const ELECTION_KEY: &str = "__metasrv_election";
|
||||
/// The root key of metasrv election candidates.
|
||||
pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";
|
||||
|
||||
/// The keys with these prefixes will be loaded into the cache when the leader starts.
|
||||
pub const CACHE_KEY_PREFIXES: [&str; 5] = [
|
||||
TABLE_NAME_KEY_PREFIX,
|
||||
|
||||
@@ -21,7 +21,7 @@ use std::time::Instant;
|
||||
|
||||
use common_telemetry::info;
|
||||
use file::{Metadata, MetadataContent};
|
||||
use futures::TryStreamExt;
|
||||
use futures::{future, TryStreamExt};
|
||||
use object_store::ObjectStore;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use strum::Display;
|
||||
@@ -30,6 +30,7 @@ use crate::error::{
|
||||
Error, InvalidFileExtensionSnafu, InvalidFileNameSnafu, InvalidFilePathSnafu, ReadObjectSnafu,
|
||||
Result, WriteObjectSnafu,
|
||||
};
|
||||
use crate::key::{CANDIDATES_ROOT, ELECTION_KEY};
|
||||
use crate::kv_backend::KvBackendRef;
|
||||
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
|
||||
use crate::rpc::store::{BatchPutRequest, RangeRequest};
|
||||
@@ -162,6 +163,11 @@ pub struct MetadataSnapshotManager {
|
||||
/// The maximum size of the request to put metadata, use 1MiB by default.
|
||||
const MAX_REQUEST_SIZE: usize = 1024 * 1024;
|
||||
|
||||
/// Returns true if the key is an internal key.
|
||||
fn is_internal_key(kv: &FileKeyValue) -> bool {
|
||||
kv.key == ELECTION_KEY.as_bytes() || kv.key == CANDIDATES_ROOT.as_bytes()
|
||||
}
|
||||
|
||||
impl MetadataSnapshotManager {
|
||||
pub fn new(kv_backend: KvBackendRef, object_store: ObjectStore) -> Self {
|
||||
Self {
|
||||
@@ -250,7 +256,10 @@ impl MetadataSnapshotManager {
|
||||
})
|
||||
})
|
||||
.into_stream();
|
||||
let keyvalues = stream.try_collect::<Vec<_>>().await?;
|
||||
let keyvalues = stream
|
||||
.try_filter(|f| future::ready(!is_internal_key(f)))
|
||||
.try_collect::<Vec<_>>()
|
||||
.await?;
|
||||
let num_keyvalues = keyvalues.len();
|
||||
let document = Document::new(
|
||||
Metadata::new(),
|
||||
|
||||
@@ -27,9 +27,6 @@ use tokio::sync::broadcast::{self, Receiver, Sender};
|
||||
use crate::error::Result;
|
||||
use crate::metasrv::MetasrvNodeInfo;
|
||||
|
||||
pub const ELECTION_KEY: &str = "__metasrv_election";
|
||||
pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";
|
||||
|
||||
pub(crate) const CANDIDATE_LEASE_SECS: u64 = 600;
|
||||
const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2;
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
|
||||
use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
|
||||
use common_telemetry::{error, info, warn};
|
||||
use etcd_client::{
|
||||
Client, GetOptions, LeaderKey as EtcdLeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions,
|
||||
@@ -28,7 +29,7 @@ use tokio::time::{timeout, MissedTickBehavior};
|
||||
|
||||
use crate::election::{
|
||||
listen_leader_change, send_leader_change_and_set_flags, Election, LeaderChangeMessage,
|
||||
LeaderKey, CANDIDATES_ROOT, CANDIDATE_LEASE_SECS, ELECTION_KEY, KEEP_ALIVE_INTERVAL_SECS,
|
||||
LeaderKey, CANDIDATE_LEASE_SECS, KEEP_ALIVE_INTERVAL_SECS,
|
||||
};
|
||||
use crate::error;
|
||||
use crate::error::Result;
|
||||
|
||||
@@ -16,6 +16,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
|
||||
use common_telemetry::{error, warn};
|
||||
use common_time::Timestamp;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
@@ -29,7 +30,6 @@ use tokio::time::MissedTickBehavior;
|
||||
use crate::election::rds::{parse_value_and_expire_time, Lease, RdsLeaderKey, LEASE_SEP};
|
||||
use crate::election::{
|
||||
listen_leader_change, send_leader_change_and_set_flags, Election, LeaderChangeMessage,
|
||||
CANDIDATES_ROOT, ELECTION_KEY,
|
||||
};
|
||||
use crate::error::{
|
||||
AcquireMySqlClientSnafu, DecodeSqlValueSnafu, DeserializeFromJsonSnafu,
|
||||
|
||||
@@ -16,6 +16,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
|
||||
use common_telemetry::{error, warn};
|
||||
use common_time::Timestamp;
|
||||
use deadpool_postgres::{Manager, Pool};
|
||||
@@ -28,7 +29,6 @@ use tokio_postgres::Row;
|
||||
use crate::election::rds::{parse_value_and_expire_time, Lease, RdsLeaderKey, LEASE_SEP};
|
||||
use crate::election::{
|
||||
listen_leader_change, send_leader_change_and_set_flags, Election, LeaderChangeMessage,
|
||||
CANDIDATES_ROOT, ELECTION_KEY,
|
||||
};
|
||||
use crate::error::{
|
||||
DeserializeFromJsonSnafu, GetPostgresClientSnafu, NoLeaderSnafu, PostgresExecutionSnafu,
|
||||
|
||||
Reference in New Issue
Block a user