diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 2862d9d402..8621d772d5 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -182,6 +182,11 @@ pub const KAFKA_TOPIC_KEY_PREFIX: &str = "__topic_name/kafka"; pub const LEGACY_TOPIC_KEY_PREFIX: &str = "__created_wal_topics/kafka"; pub const TOPIC_REGION_PREFIX: &str = "__topic_region"; +/// The election key. +pub const ELECTION_KEY: &str = "__metasrv_election"; +/// The root key of metasrv election candidates. +pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/"; + /// The keys with these prefixes will be loaded into the cache when the leader starts. pub const CACHE_KEY_PREFIXES: [&str; 5] = [ TABLE_NAME_KEY_PREFIX, diff --git a/src/common/meta/src/snapshot.rs b/src/common/meta/src/snapshot.rs index 6cd94768a9..e0942dde5f 100644 --- a/src/common/meta/src/snapshot.rs +++ b/src/common/meta/src/snapshot.rs @@ -21,7 +21,7 @@ use std::time::Instant; use common_telemetry::info; use file::{Metadata, MetadataContent}; -use futures::TryStreamExt; +use futures::{future, TryStreamExt}; use object_store::ObjectStore; use snafu::{OptionExt, ResultExt}; use strum::Display; @@ -30,6 +30,7 @@ use crate::error::{ Error, InvalidFileExtensionSnafu, InvalidFileNameSnafu, InvalidFilePathSnafu, ReadObjectSnafu, Result, WriteObjectSnafu, }; +use crate::key::{CANDIDATES_ROOT, ELECTION_KEY}; use crate::kv_backend::KvBackendRef; use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; use crate::rpc::store::{BatchPutRequest, RangeRequest}; @@ -162,6 +163,11 @@ pub struct MetadataSnapshotManager { /// The maximum size of the request to put metadata, use 1MiB by default. const MAX_REQUEST_SIZE: usize = 1024 * 1024; +/// Returns true if the key is an internal key. +fn is_internal_key(kv: &FileKeyValue) -> bool { + kv.key == ELECTION_KEY.as_bytes() || kv.key == CANDIDATES_ROOT.as_bytes() +} + impl MetadataSnapshotManager { pub fn new(kv_backend: KvBackendRef, object_store: ObjectStore) -> Self { Self { @@ -250,7 +256,10 @@ impl MetadataSnapshotManager { }) }) .into_stream(); - let keyvalues = stream.try_collect::>().await?; + let keyvalues = stream + .try_filter(|f| future::ready(!is_internal_key(f))) + .try_collect::>() + .await?; let num_keyvalues = keyvalues.len(); let document = Document::new( Metadata::new(), diff --git a/src/meta-srv/src/election.rs b/src/meta-srv/src/election.rs index 8163e2b9ad..f8d4a1e2bd 100644 --- a/src/meta-srv/src/election.rs +++ b/src/meta-srv/src/election.rs @@ -27,9 +27,6 @@ use tokio::sync::broadcast::{self, Receiver, Sender}; use crate::error::Result; use crate::metasrv::MetasrvNodeInfo; -pub const ELECTION_KEY: &str = "__metasrv_election"; -pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/"; - pub(crate) const CANDIDATE_LEASE_SECS: u64 = 600; const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2; diff --git a/src/meta-srv/src/election/etcd.rs b/src/meta-srv/src/election/etcd.rs index bf4bfa049d..936f9548ac 100644 --- a/src/meta-srv/src/election/etcd.rs +++ b/src/meta-srv/src/election/etcd.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use std::time::Duration; use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS}; +use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY}; use common_telemetry::{error, info, warn}; use etcd_client::{ Client, GetOptions, LeaderKey as EtcdLeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions, @@ -28,7 +29,7 @@ use tokio::time::{timeout, MissedTickBehavior}; use crate::election::{ listen_leader_change, send_leader_change_and_set_flags, Election, LeaderChangeMessage, - LeaderKey, CANDIDATES_ROOT, CANDIDATE_LEASE_SECS, ELECTION_KEY, KEEP_ALIVE_INTERVAL_SECS, + LeaderKey, CANDIDATE_LEASE_SECS, KEEP_ALIVE_INTERVAL_SECS, }; use crate::error; use crate::error::Result; diff --git a/src/meta-srv/src/election/rds/mysql.rs b/src/meta-srv/src/election/rds/mysql.rs index 27b348a83a..e36e1bfacd 100644 --- a/src/meta-srv/src/election/rds/mysql.rs +++ b/src/meta-srv/src/election/rds/mysql.rs @@ -16,6 +16,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; +use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY}; use common_telemetry::{error, warn}; use common_time::Timestamp; use snafu::{ensure, OptionExt, ResultExt}; @@ -29,7 +30,6 @@ use tokio::time::MissedTickBehavior; use crate::election::rds::{parse_value_and_expire_time, Lease, RdsLeaderKey, LEASE_SEP}; use crate::election::{ listen_leader_change, send_leader_change_and_set_flags, Election, LeaderChangeMessage, - CANDIDATES_ROOT, ELECTION_KEY, }; use crate::error::{ AcquireMySqlClientSnafu, DecodeSqlValueSnafu, DeserializeFromJsonSnafu, diff --git a/src/meta-srv/src/election/rds/postgres.rs b/src/meta-srv/src/election/rds/postgres.rs index b8c4ff718e..7caa3a249b 100644 --- a/src/meta-srv/src/election/rds/postgres.rs +++ b/src/meta-srv/src/election/rds/postgres.rs @@ -16,6 +16,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; +use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY}; use common_telemetry::{error, warn}; use common_time::Timestamp; use deadpool_postgres::{Manager, Pool}; @@ -28,7 +29,6 @@ use tokio_postgres::Row; use crate::election::rds::{parse_value_and_expire_time, Lease, RdsLeaderKey, LEASE_SEP}; use crate::election::{ listen_leader_change, send_leader_change_and_set_flags, Election, LeaderChangeMessage, - CANDIDATES_ROOT, ELECTION_KEY, }; use crate::error::{ DeserializeFromJsonSnafu, GetPostgresClientSnafu, NoLeaderSnafu, PostgresExecutionSnafu,