From b593e51eaea6ec53af22503332c49488ff9bb055 Mon Sep 17 00:00:00 2001 From: Evan Fleming Date: Mon, 9 Dec 2024 13:09:20 -0800 Subject: [PATCH] safekeeper: use arc for global timelines and config (#10051) Hello! I was interested in potentially making some contributions to Neon, and while looking through the issue backlog I found [8200](https://github.com/neondatabase/neon/issues/8200), which seemed like a good first issue to tackle. I see it was assigned a while ago, so apologies if I'm stepping on any toes with this PR. I also apologize for the size of this PR; I'm not sure there is a simple way to reduce it given the footprint of the components being changed. ## Problem This PR attempts to address part of the problem outlined in issue [8200](https://github.com/neondatabase/neon/issues/8200): namely, to remove global static usage of timeline state in favour of `Arc` and to replace wasteful clones of `SafeKeeperConf` with `Arc`. I did not opt to tackle `RemoteStorage` in this PR to minimize the amount of changes, as this PR is already quite large. I also did not opt to introduce a `SafekeeperApp` wrapper struct, to similarly minimize changes, but I can tackle either or both of these omissions in this PR if folks would like. ## Summary of changes - Remove static usage of `GlobalTimelines` in favour of `Arc` - Wrap `SafeKeeperConf` in `Arc` to avoid wasteful clones of the underlying struct ## Some additional thoughts - We currently seem to store `SafeKeeperConf` in `GlobalTimelines` and then expose it through a public `get_global_config` function, which requires locking. This seems needlessly wasteful; based on observed usage, we could remove this public accessor and force consumers to acquire `SafeKeeperConf` through the new `Arc` reference. --- safekeeper/benches/benchutils.rs | 10 +- safekeeper/src/bin/safekeeper.rs | 36 ++++--- safekeeper/src/broker.rs | 37 ++++--- safekeeper/src/copy_timeline.rs | 26 ++--- safekeeper/src/debug_dump.rs | 18 ++-- safekeeper/src/handler.rs | 14 ++- safekeeper/src/http/mod.rs | 8 +- safekeeper/src/http/routes.rs | 74 +++++++++----- safekeeper/src/json_ctrl.rs | 30 +++--- safekeeper/src/metrics.rs | 19 ++-- safekeeper/src/pull_timeline.rs | 12 ++- safekeeper/src/receive_wal.rs | 11 ++- safekeeper/src/send_wal.rs | 6 +- safekeeper/src/timeline.rs | 25 +++-- safekeeper/src/timelines_global_map.rs | 128 ++++++++++++------------- safekeeper/src/wal_service.rs | 22 +++-- 16 files changed, 283 insertions(+), 193 deletions(-) diff --git a/safekeeper/benches/benchutils.rs b/safekeeper/benches/benchutils.rs index 4e8dc58c49..48d796221b 100644 --- a/safekeeper/benches/benchutils.rs +++ b/safekeeper/benches/benchutils.rs @@ -83,14 +83,20 @@ impl Env { node_id: NodeId, ttid: TenantTimelineId, ) -> anyhow::Result> { - let conf = self.make_conf(node_id); + let conf = Arc::new(self.make_conf(node_id)); let timeline_dir = get_timeline_dir(&conf, &ttid); let remote_path = remote_timeline_path(&ttid)?; let safekeeper = self.make_safekeeper(node_id, ttid).await?; let shared_state = SharedState::new(StateSK::Loaded(safekeeper)); - let timeline = Timeline::new(ttid, &timeline_dir, &remote_path, shared_state); + let timeline = Timeline::new( ttid, &timeline_dir, &remote_path, shared_state, conf.clone(), ); timeline.bootstrap( &mut timeline.write_shared_state().await, &conf, diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 4dc7edef37..13f6e34575 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ 
b/safekeeper/src/bin/safekeeper.rs @@ -338,7 +338,7 @@ async fn main() -> anyhow::Result<()> { } }; - let conf = SafeKeeperConf { + let conf = Arc::new(SafeKeeperConf { workdir, my_id: id, listen_pg_addr: args.listen_pg, @@ -368,7 +368,7 @@ async fn main() -> anyhow::Result<()> { control_file_save_interval: args.control_file_save_interval, partial_backup_concurrency: args.partial_backup_concurrency, eviction_min_resident: args.eviction_min_resident, - }; + }); // initialize sentry if SENTRY_DSN is provided let _sentry_guard = init_sentry( @@ -382,7 +382,7 @@ async fn main() -> anyhow::Result<()> { /// complete, e.g. panicked, inner is error produced by task itself. type JoinTaskRes = Result, JoinError>; -async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { +async fn start_safekeeper(conf: Arc) -> Result<()> { // fsync the datadir to make sure we have a consistent state on disk. if !conf.no_sync { let dfd = File::open(&conf.workdir).context("open datadir for syncfs")?; @@ -428,9 +428,11 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { e })?; + let global_timelines = Arc::new(GlobalTimelines::new(conf.clone())); + // Register metrics collector for active timelines. It's important to do this // after daemonizing, otherwise process collector will be upset. - let timeline_collector = safekeeper::metrics::TimelineCollector::new(); + let timeline_collector = safekeeper::metrics::TimelineCollector::new(global_timelines.clone()); metrics::register_internal(Box::new(timeline_collector))?; wal_backup::init_remote_storage(&conf).await; @@ -447,9 +449,8 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { .then(|| Handle::try_current().expect("no runtime in main")); // Load all timelines from disk to memory. - GlobalTimelines::init(conf.clone()).await?; + global_timelines.init().await?; - let conf_ = conf.clone(); // Run everything in current thread rt, if asked. 
if conf.current_thread_runtime { info!("running in current thread runtime"); @@ -459,14 +460,16 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { .as_ref() .unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle()) .spawn(wal_service::task_main( - conf_, + conf.clone(), pg_listener, Scope::SafekeeperData, + global_timelines.clone(), )) // wrap with task name for error reporting .map(|res| ("WAL service main".to_owned(), res)); tasks_handles.push(Box::pin(wal_service_handle)); + let global_timelines_ = global_timelines.clone(); let timeline_housekeeping_handle = current_thread_rt .as_ref() .unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle()) @@ -474,40 +477,45 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { const TOMBSTONE_TTL: Duration = Duration::from_secs(3600 * 24); loop { tokio::time::sleep(TOMBSTONE_TTL).await; - GlobalTimelines::housekeeping(&TOMBSTONE_TTL); + global_timelines_.housekeeping(&TOMBSTONE_TTL); } }) .map(|res| ("Timeline map housekeeping".to_owned(), res)); tasks_handles.push(Box::pin(timeline_housekeeping_handle)); if let Some(pg_listener_tenant_only) = pg_listener_tenant_only { - let conf_ = conf.clone(); let wal_service_handle = current_thread_rt .as_ref() .unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle()) .spawn(wal_service::task_main( - conf_, + conf.clone(), pg_listener_tenant_only, Scope::Tenant, + global_timelines.clone(), )) // wrap with task name for error reporting .map(|res| ("WAL service tenant only main".to_owned(), res)); tasks_handles.push(Box::pin(wal_service_handle)); } - let conf_ = conf.clone(); let http_handle = current_thread_rt .as_ref() .unwrap_or_else(|| HTTP_RUNTIME.handle()) - .spawn(http::task_main(conf_, http_listener)) + .spawn(http::task_main( + conf.clone(), + http_listener, + global_timelines.clone(), + )) .map(|res| ("HTTP service main".to_owned(), res)); tasks_handles.push(Box::pin(http_handle)); - let conf_ = conf.clone(); let broker_task_handle = current_thread_rt .as_ref() .unwrap_or_else(|| BROKER_RUNTIME.handle()) - .spawn(broker::task_main(conf_).instrument(info_span!("broker"))) + .spawn( + broker::task_main(conf.clone(), global_timelines.clone()) + .instrument(info_span!("broker")), + ) .map(|res| ("broker main".to_owned(), res)); tasks_handles.push(Box::pin(broker_task_handle)); diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs index 485816408f..4b091e2c29 100644 --- a/safekeeper/src/broker.rs +++ b/safekeeper/src/broker.rs @@ -39,14 +39,17 @@ const RETRY_INTERVAL_MSEC: u64 = 1000; const PUSH_INTERVAL_MSEC: u64 = 1000; /// Push once in a while data about all active timelines to the broker. -async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { +async fn push_loop( + conf: Arc, + global_timelines: Arc, +) -> anyhow::Result<()> { if conf.disable_periodic_broker_push { info!("broker push_loop is disabled, doing nothing..."); futures::future::pending::<()>().await; // sleep forever return Ok(()); } - let active_timelines_set = GlobalTimelines::get_global_broker_active_set(); + let active_timelines_set = global_timelines.get_global_broker_active_set(); let mut client = storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?; @@ -87,8 +90,13 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { /// Subscribe and fetch all the interesting data from the broker. 
#[instrument(name = "broker_pull", skip_all)] -async fn pull_loop(conf: SafeKeeperConf, stats: Arc) -> Result<()> { - let mut client = storage_broker::connect(conf.broker_endpoint, conf.broker_keepalive_interval)?; +async fn pull_loop( + conf: Arc, + global_timelines: Arc, + stats: Arc, +) -> Result<()> { + let mut client = + storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?; // TODO: subscribe only to local timelines instead of all let request = SubscribeSafekeeperInfoRequest { @@ -113,7 +121,7 @@ async fn pull_loop(conf: SafeKeeperConf, stats: Arc) -> Result<()> .as_ref() .ok_or_else(|| anyhow!("missing tenant_timeline_id"))?; let ttid = parse_proto_ttid(proto_ttid)?; - if let Ok(tli) = GlobalTimelines::get(ttid) { + if let Ok(tli) = global_timelines.get(ttid) { // Note that we also receive *our own* info. That's // important, as it is used as an indication of live // connection to the broker. @@ -135,7 +143,11 @@ async fn pull_loop(conf: SafeKeeperConf, stats: Arc) -> Result<()> /// Process incoming discover requests. This is done in a separate task to avoid /// interfering with the normal pull/push loops. -async fn discover_loop(conf: SafeKeeperConf, stats: Arc) -> Result<()> { +async fn discover_loop( + conf: Arc, + global_timelines: Arc, + stats: Arc, +) -> Result<()> { let mut client = storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?; @@ -171,7 +183,7 @@ async fn discover_loop(conf: SafeKeeperConf, stats: Arc) -> Result< .as_ref() .ok_or_else(|| anyhow!("missing tenant_timeline_id"))?; let ttid = parse_proto_ttid(proto_ttid)?; - if let Ok(tli) = GlobalTimelines::get(ttid) { + if let Ok(tli) = global_timelines.get(ttid) { // we received a discovery request for a timeline we know about discover_counter.inc(); @@ -210,7 +222,10 @@ async fn discover_loop(conf: SafeKeeperConf, stats: Arc) -> Result< bail!("end of stream"); } -pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> { +pub async fn task_main( + conf: Arc, + global_timelines: Arc, +) -> anyhow::Result<()> { info!("started, broker endpoint {:?}", conf.broker_endpoint); let mut ticker = tokio::time::interval(Duration::from_millis(RETRY_INTERVAL_MSEC)); @@ -261,13 +276,13 @@ pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> { }, _ = ticker.tick() => { if push_handle.is_none() { - push_handle = Some(tokio::spawn(push_loop(conf.clone()))); + push_handle = Some(tokio::spawn(push_loop(conf.clone(), global_timelines.clone()))); } if pull_handle.is_none() { - pull_handle = Some(tokio::spawn(pull_loop(conf.clone(), stats.clone()))); + pull_handle = Some(tokio::spawn(pull_loop(conf.clone(), global_timelines.clone(), stats.clone()))); } if discover_handle.is_none() { - discover_handle = Some(tokio::spawn(discover_loop(conf.clone(), stats.clone()))); + discover_handle = Some(tokio::spawn(discover_loop(conf.clone(), global_timelines.clone(), stats.clone()))); } }, _ = &mut stats_task => {} diff --git a/safekeeper/src/copy_timeline.rs b/safekeeper/src/copy_timeline.rs index 07fa98212f..28ef2b1d23 100644 --- a/safekeeper/src/copy_timeline.rs +++ b/safekeeper/src/copy_timeline.rs @@ -1,9 +1,7 @@ -use std::sync::Arc; - use anyhow::{bail, Result}; use camino::Utf8PathBuf; - use postgres_ffi::{MAX_SEND_SIZE, WAL_SEGMENT_SIZE}; +use std::sync::Arc; use tokio::{ fs::OpenOptions, io::{AsyncSeekExt, AsyncWriteExt}, @@ -14,7 +12,7 @@ use utils::{id::TenantTimelineId, lsn::Lsn}; use crate::{ control_file::FileStorage, 
state::TimelinePersistentState, - timeline::{Timeline, TimelineError, WalResidentTimeline}, + timeline::{TimelineError, WalResidentTimeline}, timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline}, wal_backup::copy_s3_segments, wal_storage::{wal_file_paths, WalReader}, @@ -25,16 +23,19 @@ use crate::{ const MAX_BACKUP_LAG: u64 = 10 * WAL_SEGMENT_SIZE as u64; pub struct Request { - pub source: Arc, + pub source_ttid: TenantTimelineId, pub until_lsn: Lsn, pub destination_ttid: TenantTimelineId, } -pub async fn handle_request(request: Request) -> Result<()> { +pub async fn handle_request( + request: Request, + global_timelines: Arc, +) -> Result<()> { // TODO: request.until_lsn MUST be a valid LSN, and we cannot check it :( // if LSN will point to the middle of a WAL record, timeline will be in "broken" state - match GlobalTimelines::get(request.destination_ttid) { + match global_timelines.get(request.destination_ttid) { // timeline already exists. would be good to check that this timeline is the copy // of the source timeline, but it isn't obvious how to do that Ok(_) => return Ok(()), @@ -46,9 +47,10 @@ pub async fn handle_request(request: Request) -> Result<()> { } } - let source_tli = request.source.wal_residence_guard().await?; + let source = global_timelines.get(request.source_ttid)?; + let source_tli = source.wal_residence_guard().await?; - let conf = &GlobalTimelines::get_global_config(); + let conf = &global_timelines.get_global_config(); let ttid = request.destination_ttid; let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?; @@ -127,7 +129,7 @@ pub async fn handle_request(request: Request) -> Result<()> { copy_s3_segments( wal_seg_size, - &request.source.ttid, + &request.source_ttid, &request.destination_ttid, first_segment, first_ondisk_segment, @@ -158,7 +160,9 @@ pub async fn handle_request(request: Request) -> Result<()> { // now we have a ready timeline in a temp directory validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?; - GlobalTimelines::load_temp_timeline(request.destination_ttid, &tli_dir_path, true).await?; + global_timelines + .load_temp_timeline(request.destination_ttid, &tli_dir_path, true) + .await?; Ok(()) } diff --git a/safekeeper/src/debug_dump.rs b/safekeeper/src/debug_dump.rs index a2d0c49768..93011eddec 100644 --- a/safekeeper/src/debug_dump.rs +++ b/safekeeper/src/debug_dump.rs @@ -207,23 +207,23 @@ pub struct FileInfo { } /// Build debug dump response, using the provided [`Args`] filters. -pub async fn build(args: Args) -> Result { +pub async fn build(args: Args, global_timelines: Arc) -> Result { let start_time = Utc::now(); - let timelines_count = GlobalTimelines::timelines_count(); - let config = GlobalTimelines::get_global_config(); + let timelines_count = global_timelines.timelines_count(); + let config = global_timelines.get_global_config(); let ptrs_snapshot = if args.tenant_id.is_some() && args.timeline_id.is_some() { // If both tenant_id and timeline_id are specified, we can just get the // timeline directly, without taking a snapshot of the whole list. let ttid = TenantTimelineId::new(args.tenant_id.unwrap(), args.timeline_id.unwrap()); - if let Ok(tli) = GlobalTimelines::get(ttid) { + if let Ok(tli) = global_timelines.get(ttid) { vec![tli] } else { vec![] } } else { // Otherwise, take a snapshot of the whole list. 
- GlobalTimelines::get_all() + global_timelines.get_all() }; let mut timelines = Vec::new(); @@ -344,12 +344,12 @@ fn get_wal_last_modified(path: &Utf8Path) -> Result>> { /// Converts SafeKeeperConf to Config, filtering out the fields that are not /// supposed to be exposed. -fn build_config(config: SafeKeeperConf) -> Config { +fn build_config(config: Arc) -> Config { Config { id: config.my_id, - workdir: config.workdir.into(), - listen_pg_addr: config.listen_pg_addr, - listen_http_addr: config.listen_http_addr, + workdir: config.workdir.clone().into(), + listen_pg_addr: config.listen_pg_addr.clone(), + listen_http_addr: config.listen_http_addr.clone(), no_sync: config.no_sync, max_offloader_lag_bytes: config.max_offloader_lag_bytes, wal_backup_enabled: config.wal_backup_enabled, diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index 8dd2929a03..2ca6333ba8 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -33,7 +33,7 @@ use utils::{ /// Safekeeper handler of postgres commands pub struct SafekeeperPostgresHandler { - pub conf: SafeKeeperConf, + pub conf: Arc, /// assigned application name pub appname: Option, pub tenant_id: Option, @@ -43,6 +43,7 @@ pub struct SafekeeperPostgresHandler { pub protocol: Option, /// Unique connection id is logged in spans for observability. pub conn_id: ConnectionId, + pub global_timelines: Arc, /// Auth scope allowed on the connections and public key used to check auth tokens. None if auth is not configured. auth: Option<(Scope, Arc)>, claims: Option, @@ -314,10 +315,11 @@ impl postgres_backend::Handler impl SafekeeperPostgresHandler { pub fn new( - conf: SafeKeeperConf, + conf: Arc, conn_id: u32, io_metrics: Option, auth: Option<(Scope, Arc)>, + global_timelines: Arc, ) -> Self { SafekeeperPostgresHandler { conf, @@ -331,6 +333,7 @@ impl SafekeeperPostgresHandler { claims: None, auth, io_metrics, + global_timelines, } } @@ -360,7 +363,7 @@ impl SafekeeperPostgresHandler { pgb: &mut PostgresBackend, ) -> Result<(), QueryError> { // Get timeline, handling "not found" error - let tli = match GlobalTimelines::get(self.ttid) { + let tli = match self.global_timelines.get(self.ttid) { Ok(tli) => Ok(Some(tli)), Err(TimelineError::NotFound(_)) => Ok(None), Err(e) => Err(QueryError::Other(e.into())), @@ -394,7 +397,10 @@ impl SafekeeperPostgresHandler { &mut self, pgb: &mut PostgresBackend, ) -> Result<(), QueryError> { - let tli = GlobalTimelines::get(self.ttid).map_err(|e| QueryError::Other(e.into()))?; + let tli = self + .global_timelines + .get(self.ttid) + .map_err(|e| QueryError::Other(e.into()))?; let lsn = if self.is_walproposer_recovery() { // walproposer should get all local WAL until flush_lsn diff --git a/safekeeper/src/http/mod.rs b/safekeeper/src/http/mod.rs index 52fb13ff5b..7229ccb739 100644 --- a/safekeeper/src/http/mod.rs +++ b/safekeeper/src/http/mod.rs @@ -3,14 +3,16 @@ pub mod routes; pub use routes::make_router; pub use safekeeper_api::models; +use std::sync::Arc; -use crate::SafeKeeperConf; +use crate::{GlobalTimelines, SafeKeeperConf}; pub async fn task_main( - conf: SafeKeeperConf, + conf: Arc, http_listener: std::net::TcpListener, + global_timelines: Arc, ) -> anyhow::Result<()> { - let router = make_router(conf) + let router = make_router(conf, global_timelines) .build() .map_err(|err| anyhow::anyhow!(err))?; let service = utils::http::RouterService::new(router).unwrap(); diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 69b775fd76..71c36f1d46 100644 --- 
a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -66,6 +66,13 @@ fn get_conf(request: &Request) -> &SafeKeeperConf { .as_ref() } +fn get_global_timelines(request: &Request) -> Arc { + request + .data::>() + .expect("unknown state type") + .clone() +} + /// Same as TermLsn, but serializes LSN using display serializer /// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response. #[derive(Debug, Clone, Copy, Serialize, Deserialize)] @@ -123,9 +130,11 @@ async fn tenant_delete_handler(mut request: Request) -> Result) -> Result) -> Result) -> Result, ApiError> { check_permission(&request, None)?; - let res: Vec = GlobalTimelines::get_all() + let global_timelines = get_global_timelines(&request); + let res: Vec = global_timelines + .get_all() .iter() .map(|tli| tli.ttid) .collect(); @@ -182,7 +195,8 @@ async fn timeline_status_handler(request: Request) -> Result) -> Result) -> Result) -> Result, // so create the chan and write to it in another task. @@ -293,19 +311,19 @@ async fn timeline_copy_handler(mut request: Request) -> Result) -> Result) -> Result = parse_query_param(&request, "from_lsn")?; let until_lsn: Option = parse_query_param(&request, "until_lsn")?; @@ -371,7 +392,7 @@ async fn timeline_digest_handler(request: Request) -> Result) -> Result) -> Result) -> Result let dump_term_history = dump_term_history.unwrap_or(true); let dump_wal_last_modified = dump_wal_last_modified.unwrap_or(dump_all); + let global_timelines = get_global_timelines(&request); + let args = debug_dump::Args { dump_all, dump_control_file, @@ -517,7 +543,7 @@ async fn dump_debug_handler(mut request: Request) -> Result timeline_id, }; - let resp = debug_dump::build(args) + let resp = debug_dump::build(args, global_timelines) .await .map_err(ApiError::InternalServerError)?; @@ -570,7 +596,10 @@ async fn dump_debug_handler(mut request: Request) -> Result } /// Safekeeper http router. -pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder { +pub fn make_router( + conf: Arc, + global_timelines: Arc, +) -> RouterBuilder { let mut router = endpoint::make_router(); if conf.http_auth.is_some() { router = router.middleware(auth_middleware(|request| { @@ -592,7 +621,8 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder // located nearby (/safekeeper/src/http/openapi_spec.yaml). 
let auth = conf.http_auth.clone(); router - .data(Arc::new(conf)) + .data(conf) + .data(global_timelines) .data(auth) .get("/metrics", |r| request_span(r, prometheus_metrics_handler)) .get("/profile/cpu", |r| request_span(r, profile_cpu_handler)) diff --git a/safekeeper/src/json_ctrl.rs b/safekeeper/src/json_ctrl.rs index 0573ea81e7..dc4ad3706e 100644 --- a/safekeeper/src/json_ctrl.rs +++ b/safekeeper/src/json_ctrl.rs @@ -11,7 +11,6 @@ use postgres_backend::QueryError; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncWrite}; use tracing::*; -use utils::id::TenantTimelineId; use crate::handler::SafekeeperPostgresHandler; use crate::safekeeper::{AcceptorProposerMessage, AppendResponse, ServerInfo}; @@ -21,7 +20,6 @@ use crate::safekeeper::{ use crate::safekeeper::{Term, TermHistory, TermLsn}; use crate::state::TimelinePersistentState; use crate::timeline::WalResidentTimeline; -use crate::GlobalTimelines; use postgres_backend::PostgresBackend; use postgres_ffi::encode_logical_message; use postgres_ffi::WAL_SEGMENT_SIZE; @@ -70,7 +68,7 @@ pub async fn handle_json_ctrl( info!("JSON_CTRL request: {append_request:?}"); // need to init safekeeper state before AppendRequest - let tli = prepare_safekeeper(spg.ttid, append_request.pg_version).await?; + let tli = prepare_safekeeper(spg, append_request.pg_version).await?; // if send_proposer_elected is true, we need to update local history if append_request.send_proposer_elected { @@ -99,20 +97,22 @@ pub async fn handle_json_ctrl( /// Prepare safekeeper to process append requests without crashes, /// by sending ProposerGreeting with default server.wal_seg_size. async fn prepare_safekeeper( - ttid: TenantTimelineId, + spg: &SafekeeperPostgresHandler, pg_version: u32, ) -> anyhow::Result { - let tli = GlobalTimelines::create( - ttid, - ServerInfo { - pg_version, - wal_seg_size: WAL_SEGMENT_SIZE as u32, - system_id: 0, - }, - Lsn::INVALID, - Lsn::INVALID, - ) - .await?; + let tli = spg + .global_timelines + .create( + spg.ttid, + ServerInfo { + pg_version, + wal_seg_size: WAL_SEGMENT_SIZE as u32, + system_id: 0, + }, + Lsn::INVALID, + Lsn::INVALID, + ) + .await?; tli.wal_residence_guard().await } diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs index bbd2f86898..5883f402c7 100644 --- a/safekeeper/src/metrics.rs +++ b/safekeeper/src/metrics.rs @@ -455,6 +455,7 @@ pub struct FullTimelineInfo { /// Collects metrics for all active timelines. pub struct TimelineCollector { + global_timelines: Arc, descs: Vec, commit_lsn: GenericGaugeVec, backup_lsn: GenericGaugeVec, @@ -478,14 +479,8 @@ pub struct TimelineCollector { active_timelines_count: IntGauge, } -impl Default for TimelineCollector { - fn default() -> Self { - Self::new() - } -} - impl TimelineCollector { - pub fn new() -> TimelineCollector { + pub fn new(global_timelines: Arc) -> TimelineCollector { let mut descs = Vec::new(); let commit_lsn = GenericGaugeVec::new( @@ -676,6 +671,7 @@ impl TimelineCollector { descs.extend(active_timelines_count.desc().into_iter().cloned()); TimelineCollector { + global_timelines, descs, commit_lsn, backup_lsn, @@ -728,17 +724,18 @@ impl Collector for TimelineCollector { self.written_wal_seconds.reset(); self.flushed_wal_seconds.reset(); - let timelines_count = GlobalTimelines::get_all().len(); + let timelines_count = self.global_timelines.get_all().len(); let mut active_timelines_count = 0; // Prometheus Collector is sync, and data is stored under async lock. 
To // bridge the gap with a crutch, collect data in spawned thread with // local tokio runtime. + let global_timelines = self.global_timelines.clone(); let infos = std::thread::spawn(|| { let rt = tokio::runtime::Builder::new_current_thread() .build() .expect("failed to create rt"); - rt.block_on(collect_timeline_metrics()) + rt.block_on(collect_timeline_metrics(global_timelines)) }) .join() .expect("collect_timeline_metrics thread panicked"); @@ -857,9 +854,9 @@ impl Collector for TimelineCollector { } } -async fn collect_timeline_metrics() -> Vec { +async fn collect_timeline_metrics(global_timelines: Arc) -> Vec { let mut res = vec![]; - let active_timelines = GlobalTimelines::get_global_broker_active_set().get_all(); + let active_timelines = global_timelines.get_global_broker_active_set().get_all(); for tli in active_timelines { if let Some(info) = tli.info_for_metrics().await { diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs index c700e18cc7..f58a9dca1d 100644 --- a/safekeeper/src/pull_timeline.rs +++ b/safekeeper/src/pull_timeline.rs @@ -409,8 +409,9 @@ pub struct DebugDumpResponse { pub async fn handle_request( request: Request, sk_auth_token: Option, + global_timelines: Arc, ) -> Result { - let existing_tli = GlobalTimelines::get(TenantTimelineId::new( + let existing_tli = global_timelines.get(TenantTimelineId::new( request.tenant_id, request.timeline_id, )); @@ -453,13 +454,14 @@ pub async fn handle_request( assert!(status.tenant_id == request.tenant_id); assert!(status.timeline_id == request.timeline_id); - pull_timeline(status, safekeeper_host, sk_auth_token).await + pull_timeline(status, safekeeper_host, sk_auth_token, global_timelines).await } async fn pull_timeline( status: TimelineStatus, host: String, sk_auth_token: Option, + global_timelines: Arc, ) -> Result { let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id); info!( @@ -472,7 +474,7 @@ async fn pull_timeline( status.acceptor_state.epoch ); - let conf = &GlobalTimelines::get_global_config(); + let conf = &global_timelines.get_global_config(); let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?; @@ -531,7 +533,9 @@ async fn pull_timeline( assert!(status.commit_lsn <= status.flush_lsn); // Finally, load the timeline. - let _tli = GlobalTimelines::load_temp_timeline(ttid, &tli_dir_path, false).await?; + let _tli = global_timelines + .load_temp_timeline(ttid, &tli_dir_path, false) + .await?; Ok(Response { safekeeper_host: host, diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index bfa1764abf..2a49890d61 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -267,6 +267,7 @@ impl SafekeeperPostgresHandler { pgb_reader: &mut pgb_reader, peer_addr, acceptor_handle: &mut acceptor_handle, + global_timelines: self.global_timelines.clone(), }; // Read first message and create timeline if needed. @@ -331,6 +332,7 @@ struct NetworkReader<'a, IO> { // WalAcceptor is spawned when we learn server info from walproposer and // create timeline; handle is put here. 
acceptor_handle: &'a mut Option>>, + global_timelines: Arc, } impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> { @@ -350,10 +352,11 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> { system_id: greeting.system_id, wal_seg_size: greeting.wal_seg_size, }; - let tli = - GlobalTimelines::create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID) - .await - .context("create timeline")?; + let tli = self + .global_timelines + .create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID) + .await + .context("create timeline")?; tli.wal_residence_guard().await? } _ => { diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 225b7f4c05..0887cf7264 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -10,7 +10,6 @@ use crate::timeline::WalResidentTimeline; use crate::wal_reader_stream::WalReaderStreamBuilder; use crate::wal_service::ConnectionId; use crate::wal_storage::WalReader; -use crate::GlobalTimelines; use anyhow::{bail, Context as AnyhowContext}; use bytes::Bytes; use futures::future::Either; @@ -400,7 +399,10 @@ impl SafekeeperPostgresHandler { start_pos: Lsn, term: Option, ) -> Result<(), QueryError> { - let tli = GlobalTimelines::get(self.ttid).map_err(|e| QueryError::Other(e.into()))?; + let tli = self + .global_timelines + .get(self.ttid) + .map_err(|e| QueryError::Other(e.into()))?; let residence_guard = tli.wal_residence_guard().await?; if let Err(end) = self diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index ef928f7633..94d6ef1061 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -44,8 +44,8 @@ use crate::wal_backup_partial::PartialRemoteSegment; use crate::metrics::{FullTimelineInfo, WalStorageMetrics, MISC_OPERATION_SECONDS}; use crate::wal_storage::{Storage as wal_storage_iface, WalReader}; +use crate::SafeKeeperConf; use crate::{debug_dump, timeline_manager, wal_storage}; -use crate::{GlobalTimelines, SafeKeeperConf}; /// Things safekeeper should know about timeline state on peers. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -467,6 +467,7 @@ pub struct Timeline { walreceivers: Arc, timeline_dir: Utf8PathBuf, manager_ctl: ManagerCtl, + conf: Arc, /// Hold this gate from code that depends on the Timeline's non-shut-down state. While holding /// this gate, you must respect [`Timeline::cancel`] @@ -489,6 +490,7 @@ impl Timeline { timeline_dir: &Utf8Path, remote_path: &RemotePath, shared_state: SharedState, + conf: Arc, ) -> Arc { let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(shared_state.sk.state().commit_lsn); @@ -516,6 +518,7 @@ impl Timeline { gate: Default::default(), cancel: CancellationToken::default(), manager_ctl: ManagerCtl::new(), + conf, broker_active: AtomicBool::new(false), wal_backup_active: AtomicBool::new(false), last_removed_segno: AtomicU64::new(0), @@ -524,11 +527,14 @@ impl Timeline { } /// Load existing timeline from disk. 
- pub fn load_timeline(conf: &SafeKeeperConf, ttid: TenantTimelineId) -> Result> { + pub fn load_timeline( + conf: Arc, + ttid: TenantTimelineId, + ) -> Result> { let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered(); - let shared_state = SharedState::restore(conf, &ttid)?; - let timeline_dir = get_timeline_dir(conf, &ttid); + let shared_state = SharedState::restore(conf.as_ref(), &ttid)?; + let timeline_dir = get_timeline_dir(conf.as_ref(), &ttid); let remote_path = remote_timeline_path(&ttid)?; Ok(Timeline::new( @@ -536,6 +542,7 @@ impl Timeline { &timeline_dir, &remote_path, shared_state, + conf, )) } @@ -604,8 +611,7 @@ impl Timeline { // it is cancelled, so WAL storage won't be opened again. shared_state.sk.close_wal_store(); - let conf = GlobalTimelines::get_global_config(); - if !only_local && conf.is_wal_backup_enabled() { + if !only_local && self.conf.is_wal_backup_enabled() { // Note: we concurrently delete remote storage data from multiple // safekeepers. That's ok, s3 replies 200 if object doesn't exist and we // do some retries anyway. @@ -951,7 +957,7 @@ impl WalResidentTimeline { pub async fn get_walreader(&self, start_lsn: Lsn) -> Result { let (_, persisted_state) = self.get_state().await; - let enable_remote_read = GlobalTimelines::get_global_config().is_wal_backup_enabled(); + let enable_remote_read = self.conf.is_wal_backup_enabled(); WalReader::new( &self.ttid, @@ -1061,7 +1067,6 @@ impl ManagerTimeline { /// Try to switch state Offloaded->Present. pub(crate) async fn switch_to_present(&self) -> anyhow::Result<()> { - let conf = GlobalTimelines::get_global_config(); let mut shared = self.write_shared_state().await; // trying to restore WAL storage @@ -1069,7 +1074,7 @@ impl ManagerTimeline { &self.ttid, &self.timeline_dir, shared.sk.state(), - conf.no_sync, + self.conf.no_sync, )?; // updating control file @@ -1096,7 +1101,7 @@ impl ManagerTimeline { // now we can switch shared.sk to Present, shouldn't fail let prev_sk = std::mem::replace(&mut shared.sk, StateSK::Empty); let cfile_state = prev_sk.take_state(); - shared.sk = StateSK::Loaded(SafeKeeper::new(cfile_state, wal_store, conf.my_id)?); + shared.sk = StateSK::Loaded(SafeKeeper::new(cfile_state, wal_store, self.conf.my_id)?); Ok(()) } diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs index 067945fd5f..e1241ceb9b 100644 --- a/safekeeper/src/timelines_global_map.rs +++ b/safekeeper/src/timelines_global_map.rs @@ -13,7 +13,6 @@ use crate::{control_file, wal_storage, SafeKeeperConf}; use anyhow::{bail, Context, Result}; use camino::Utf8PathBuf; use camino_tempfile::Utf8TempDir; -use once_cell::sync::Lazy; use serde::Serialize; use std::collections::HashMap; use std::str::FromStr; @@ -42,23 +41,16 @@ struct GlobalTimelinesState { // this map is dropped on restart. tombstones: HashMap, - conf: Option, + conf: Arc, broker_active_set: Arc, global_rate_limiter: RateLimiter, } impl GlobalTimelinesState { - /// Get configuration, which must be set once during init. - fn get_conf(&self) -> &SafeKeeperConf { - self.conf - .as_ref() - .expect("GlobalTimelinesState conf is not initialized") - } - /// Get dependencies for a timeline constructor. 
- fn get_dependencies(&self) -> (SafeKeeperConf, Arc, RateLimiter) { + fn get_dependencies(&self) -> (Arc, Arc, RateLimiter) { ( - self.get_conf().clone(), + self.conf.clone(), self.broker_active_set.clone(), self.global_rate_limiter.clone(), ) @@ -82,35 +74,39 @@ impl GlobalTimelinesState { } } -static TIMELINES_STATE: Lazy> = Lazy::new(|| { - Mutex::new(GlobalTimelinesState { - timelines: HashMap::new(), - tombstones: HashMap::new(), - conf: None, - broker_active_set: Arc::new(TimelinesSet::default()), - global_rate_limiter: RateLimiter::new(1, 1), - }) -}); - -/// A zero-sized struct used to manage access to the global timelines map. -pub struct GlobalTimelines; +/// A struct used to manage access to the global timelines map. +pub struct GlobalTimelines { + state: Mutex, +} impl GlobalTimelines { + /// Create a new instance of the global timelines map. + pub fn new(conf: Arc) -> Self { + Self { + state: Mutex::new(GlobalTimelinesState { + timelines: HashMap::new(), + tombstones: HashMap::new(), + conf, + broker_active_set: Arc::new(TimelinesSet::default()), + global_rate_limiter: RateLimiter::new(1, 1), + }), + } + } + /// Inject dependencies needed for the timeline constructors and load all timelines to memory. - pub async fn init(conf: SafeKeeperConf) -> Result<()> { + pub async fn init(&self) -> Result<()> { // clippy isn't smart enough to understand that drop(state) releases the // lock, so use explicit block let tenants_dir = { - let mut state = TIMELINES_STATE.lock().unwrap(); + let mut state = self.state.lock().unwrap(); state.global_rate_limiter = RateLimiter::new( - conf.partial_backup_concurrency, + state.conf.partial_backup_concurrency, DEFAULT_EVICTION_CONCURRENCY, ); - state.conf = Some(conf); // Iterate through all directories and load tenants for all directories // named as a valid tenant_id. - state.get_conf().workdir.clone() + state.conf.workdir.clone() }; let mut tenant_count = 0; for tenants_dir_entry in std::fs::read_dir(&tenants_dir) @@ -122,7 +118,7 @@ impl GlobalTimelines { TenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or("")) { tenant_count += 1; - GlobalTimelines::load_tenant_timelines(tenant_id).await?; + self.load_tenant_timelines(tenant_id).await?; } } Err(e) => error!( @@ -135,7 +131,7 @@ impl GlobalTimelines { info!( "found {} tenants directories, successfully loaded {} timelines", tenant_count, - TIMELINES_STATE.lock().unwrap().timelines.len() + self.state.lock().unwrap().timelines.len() ); Ok(()) } @@ -143,13 +139,13 @@ impl GlobalTimelines { /// Loads all timelines for the given tenant to memory. Returns fs::read_dir /// errors if any. /// - /// It is async, but TIMELINES_STATE lock is sync and there is no important + /// It is async, but self.state lock is sync and there is no important /// reason to make it async (it is always held for a short while), so we /// just lock and unlock it for each timeline -- this function is called /// during init when nothing else is running, so this is fine. 
- async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> { + async fn load_tenant_timelines(&self, tenant_id: TenantId) -> Result<()> { let (conf, broker_active_set, partial_backup_rate_limiter) = { - let state = TIMELINES_STATE.lock().unwrap(); + let state = self.state.lock().unwrap(); state.get_dependencies() }; @@ -163,10 +159,10 @@ impl GlobalTimelines { TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or("")) { let ttid = TenantTimelineId::new(tenant_id, timeline_id); - match Timeline::load_timeline(&conf, ttid) { + match Timeline::load_timeline(conf.clone(), ttid) { Ok(tli) => { let mut shared_state = tli.write_shared_state().await; - TIMELINES_STATE + self.state .lock() .unwrap() .timelines @@ -200,29 +196,30 @@ impl GlobalTimelines { } /// Get the number of timelines in the map. - pub fn timelines_count() -> usize { - TIMELINES_STATE.lock().unwrap().timelines.len() + pub fn timelines_count(&self) -> usize { + self.state.lock().unwrap().timelines.len() } /// Get the global safekeeper config. - pub fn get_global_config() -> SafeKeeperConf { - TIMELINES_STATE.lock().unwrap().get_conf().clone() + pub fn get_global_config(&self) -> Arc { + self.state.lock().unwrap().conf.clone() } - pub fn get_global_broker_active_set() -> Arc { - TIMELINES_STATE.lock().unwrap().broker_active_set.clone() + pub fn get_global_broker_active_set(&self) -> Arc { + self.state.lock().unwrap().broker_active_set.clone() } /// Create a new timeline with the given id. If the timeline already exists, returns /// an existing timeline. pub(crate) async fn create( + &self, ttid: TenantTimelineId, server_info: ServerInfo, commit_lsn: Lsn, local_start_lsn: Lsn, ) -> Result> { let (conf, _, _) = { - let state = TIMELINES_STATE.lock().unwrap(); + let state = self.state.lock().unwrap(); if let Ok(timeline) = state.get(&ttid) { // Timeline already exists, return it. return Ok(timeline); @@ -245,7 +242,7 @@ impl GlobalTimelines { let state = TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn)?; control_file::FileStorage::create_new(&tmp_dir_path, state, conf.no_sync).await?; - let timeline = GlobalTimelines::load_temp_timeline(ttid, &tmp_dir_path, true).await?; + let timeline = self.load_temp_timeline(ttid, &tmp_dir_path, true).await?; Ok(timeline) } @@ -261,13 +258,14 @@ impl GlobalTimelines { /// 2) move the directory and load the timeline /// 3) take lock again and insert the timeline into the global map. pub async fn load_temp_timeline( + &self, ttid: TenantTimelineId, tmp_path: &Utf8PathBuf, check_tombstone: bool, ) -> Result> { // Check for existence and mark that we're creating it. let (conf, broker_active_set, partial_backup_rate_limiter) = { - let mut state = TIMELINES_STATE.lock().unwrap(); + let mut state = self.state.lock().unwrap(); match state.timelines.get(&ttid) { Some(GlobalMapTimeline::CreationInProgress) => { bail!(TimelineError::CreationInProgress(ttid)); @@ -295,10 +293,10 @@ impl GlobalTimelines { }; // Do the actual move and reflect the result in the map. 
- match GlobalTimelines::install_temp_timeline(ttid, tmp_path, &conf).await { + match GlobalTimelines::install_temp_timeline(ttid, tmp_path, conf.clone()).await { Ok(timeline) => { let mut timeline_shared_state = timeline.write_shared_state().await; - let mut state = TIMELINES_STATE.lock().unwrap(); + let mut state = self.state.lock().unwrap(); assert!(matches!( state.timelines.get(&ttid), Some(GlobalMapTimeline::CreationInProgress) @@ -319,7 +317,7 @@ impl GlobalTimelines { } Err(e) => { // Init failed, remove the marker from the map - let mut state = TIMELINES_STATE.lock().unwrap(); + let mut state = self.state.lock().unwrap(); assert!(matches!( state.timelines.get(&ttid), Some(GlobalMapTimeline::CreationInProgress) @@ -334,10 +332,10 @@ impl GlobalTimelines { async fn install_temp_timeline( ttid: TenantTimelineId, tmp_path: &Utf8PathBuf, - conf: &SafeKeeperConf, + conf: Arc, ) -> Result> { - let tenant_path = get_tenant_dir(conf, &ttid.tenant_id); - let timeline_path = get_timeline_dir(conf, &ttid); + let tenant_path = get_tenant_dir(conf.as_ref(), &ttid.tenant_id); + let timeline_path = get_timeline_dir(conf.as_ref(), &ttid); // We must have already checked that timeline doesn't exist in the map, // but there might be existing datadir: if timeline is corrupted it is @@ -382,9 +380,9 @@ impl GlobalTimelines { /// Get a timeline from the global map. If it's not present, it doesn't exist on disk, /// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid, /// i.e. loaded in memory and not cancelled. - pub(crate) fn get(ttid: TenantTimelineId) -> Result, TimelineError> { + pub(crate) fn get(&self, ttid: TenantTimelineId) -> Result, TimelineError> { let tli_res = { - let state = TIMELINES_STATE.lock().unwrap(); + let state = self.state.lock().unwrap(); state.get(&ttid) }; match tli_res { @@ -399,8 +397,8 @@ impl GlobalTimelines { } /// Returns all timelines. This is used for background timeline processes. - pub fn get_all() -> Vec> { - let global_lock = TIMELINES_STATE.lock().unwrap(); + pub fn get_all(&self) -> Vec> { + let global_lock = self.state.lock().unwrap(); global_lock .timelines .values() @@ -419,8 +417,8 @@ impl GlobalTimelines { /// Returns all timelines belonging to a given tenant. Used for deleting all timelines of a tenant, /// and that's why it can return cancelled timelines, to retry deleting them. - fn get_all_for_tenant(tenant_id: TenantId) -> Vec> { - let global_lock = TIMELINES_STATE.lock().unwrap(); + fn get_all_for_tenant(&self, tenant_id: TenantId) -> Vec> { + let global_lock = self.state.lock().unwrap(); global_lock .timelines .values() @@ -435,11 +433,12 @@ impl GlobalTimelines { /// Cancels timeline, then deletes the corresponding data directory. /// If only_local, doesn't remove WAL segments in remote storage. pub(crate) async fn delete( + &self, ttid: &TenantTimelineId, only_local: bool, ) -> Result { let tli_res = { - let state = TIMELINES_STATE.lock().unwrap(); + let state = self.state.lock().unwrap(); if state.tombstones.contains_key(ttid) { // Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do. @@ -472,7 +471,7 @@ impl GlobalTimelines { } Err(_) => { // Timeline is not memory, but it may still exist on disk in broken state. 
- let dir_path = get_timeline_dir(TIMELINES_STATE.lock().unwrap().get_conf(), ttid); + let dir_path = get_timeline_dir(self.state.lock().unwrap().conf.as_ref(), ttid); let dir_existed = delete_dir(dir_path)?; Ok(TimelineDeleteForceResult { @@ -485,7 +484,7 @@ impl GlobalTimelines { // Finalize deletion, by dropping Timeline objects and storing smaller tombstones. The tombstones // are used to prevent still-running computes from re-creating the same timeline when they send data, // and to speed up repeated deletion calls by avoiding re-listing objects. - TIMELINES_STATE.lock().unwrap().delete(*ttid); + self.state.lock().unwrap().delete(*ttid); result } @@ -497,17 +496,18 @@ impl GlobalTimelines { /// /// If only_local, doesn't remove WAL segments in remote storage. pub async fn delete_force_all_for_tenant( + &self, tenant_id: &TenantId, only_local: bool, ) -> Result> { info!("deleting all timelines for tenant {}", tenant_id); - let to_delete = Self::get_all_for_tenant(*tenant_id); + let to_delete = self.get_all_for_tenant(*tenant_id); let mut err = None; let mut deleted = HashMap::new(); for tli in &to_delete { - match Self::delete(&tli.ttid, only_local).await { + match self.delete(&tli.ttid, only_local).await { Ok(result) => { deleted.insert(tli.ttid, result); } @@ -529,15 +529,15 @@ impl GlobalTimelines { // so the directory may be not empty. In this case timelines will have bad state // and timeline background jobs can panic. delete_dir(get_tenant_dir( - TIMELINES_STATE.lock().unwrap().get_conf(), + self.state.lock().unwrap().conf.as_ref(), tenant_id, ))?; Ok(deleted) } - pub fn housekeeping(tombstone_ttl: &Duration) { - let mut state = TIMELINES_STATE.lock().unwrap(); + pub fn housekeeping(&self, tombstone_ttl: &Duration) { + let mut state = self.state.lock().unwrap(); // We keep tombstones long enough to have a good chance of preventing rogue computes from re-creating deleted // timelines. If a compute kept running for longer than this TTL (or across a safekeeper restart) then they diff --git a/safekeeper/src/wal_service.rs b/safekeeper/src/wal_service.rs index 5248d545db..1ff83918a7 100644 --- a/safekeeper/src/wal_service.rs +++ b/safekeeper/src/wal_service.rs @@ -4,6 +4,7 @@ //! use anyhow::{Context, Result}; use postgres_backend::QueryError; +use std::sync::Arc; use std::time::Duration; use tokio::net::TcpStream; use tokio_io_timeout::TimeoutReader; @@ -11,9 +12,9 @@ use tokio_util::sync::CancellationToken; use tracing::*; use utils::{auth::Scope, measured_stream::MeasuredStream}; -use crate::handler::SafekeeperPostgresHandler; use crate::metrics::TrafficMetrics; use crate::SafeKeeperConf; +use crate::{handler::SafekeeperPostgresHandler, GlobalTimelines}; use postgres_backend::{AuthType, PostgresBackend}; /// Accept incoming TCP connections and spawn them into a background thread. @@ -22,9 +23,10 @@ use postgres_backend::{AuthType, PostgresBackend}; /// to any tenant are allowed) or Tenant (only tokens giving access to specific /// tenant are allowed). Doesn't matter if auth is disabled in conf. pub async fn task_main( - conf: SafeKeeperConf, + conf: Arc, pg_listener: std::net::TcpListener, allowed_auth_scope: Scope, + global_timelines: Arc, ) -> anyhow::Result<()> { // Tokio's from_std won't do this for us, per its comment. 
pg_listener.set_nonblocking(true)?; @@ -37,10 +39,10 @@ pub async fn task_main( debug!("accepted connection from {}", peer_addr); let conf = conf.clone(); let conn_id = issue_connection_id(&mut connection_count); - + let global_timelines = global_timelines.clone(); tokio::spawn( async move { - if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope).await { + if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope, global_timelines).await { error!("connection handler exited: {}", err); } } @@ -53,9 +55,10 @@ pub async fn task_main( /// async fn handle_socket( socket: TcpStream, - conf: SafeKeeperConf, + conf: Arc, conn_id: ConnectionId, allowed_auth_scope: Scope, + global_timelines: Arc, ) -> Result<(), QueryError> { socket.set_nodelay(true)?; let peer_addr = socket.peer_addr()?; @@ -96,8 +99,13 @@ async fn handle_socket( Some(_) => AuthType::NeonJWT, }; let auth_pair = auth_key.map(|key| (allowed_auth_scope, key)); - let mut conn_handler = - SafekeeperPostgresHandler::new(conf, conn_id, Some(traffic_metrics.clone()), auth_pair); + let mut conn_handler = SafekeeperPostgresHandler::new( + conf, + conn_id, + Some(traffic_metrics.clone()), + auth_pair, + global_timelines, + ); let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?; // libpq protocol between safekeeper and walproposer / pageserver // We don't use shutdown.
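
A note on the shape of the change, for anyone skimming the patch: the pattern throughout is to build the configuration once, wrap it in `Arc<SafeKeeperConf>`, construct `GlobalTimelines` from it, and hand `Arc` clones to each spawned task, instead of reaching into the old `Lazy<Mutex<...>>` static. The sketch below is a minimal, self-contained illustration of that ownership pattern only; `Conf` and `Timelines` are hypothetical stand-ins rather than the real safekeeper types, and it assumes the `tokio` crate with the `macros` and `rt-multi-thread` features.

```rust
// Hypothetical, simplified stand-ins for SafeKeeperConf and GlobalTimelines;
// the real types live in the safekeeper crate and carry far more state.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

struct Conf {
    my_id: u64,
    workdir: String,
}

struct Timelines {
    // Shared, read-only configuration: cloning the Arc is cheap.
    conf: Arc<Conf>,
    // Instance-owned mutable state instead of a process-wide static.
    timelines: Mutex<HashMap<u64, String>>,
}

impl Timelines {
    fn new(conf: Arc<Conf>) -> Self {
        Self {
            conf,
            timelines: Mutex::new(HashMap::new()),
        }
    }

    fn insert(&self, id: u64, name: String) {
        self.timelines.lock().unwrap().insert(id, name);
    }

    fn count(&self) -> usize {
        self.timelines.lock().unwrap().len()
    }
}

#[tokio::main]
async fn main() {
    // Build the config once, wrap it in Arc, and hand out cheap clones.
    let conf = Arc::new(Conf {
        my_id: 1,
        workdir: "/tmp/sk".to_string(),
    });
    let global_timelines = Arc::new(Timelines::new(conf.clone()));

    // Each task receives its own Arc clone instead of reaching into a static.
    let gt = global_timelines.clone();
    let task = tokio::spawn(async move {
        gt.insert(42, "timeline-42".to_string());
        println!(
            "node {} ({}) has {} timeline(s)",
            gt.conf.my_id,
            gt.conf.workdir,
            gt.count()
        );
    });

    task.await.unwrap();
}
```

Compared with a process-wide static, this keeps construction explicit in `main`, makes the dependency visible in every function signature, and lets tests and benchmarks (like `benchutils::Env` above) build their own independent instances.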