From 91e89371121698e1e0522cc8374d393b94480e65 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 5 Dec 2022 06:38:13 -0500 Subject: [PATCH] no-op: add Timeline::myself member --- pageserver/src/tenant.rs | 10 +-- pageserver/src/tenant/timeline.rs | 107 ++++++++++++++++-------------- 2 files changed, 61 insertions(+), 56 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 0ff5089f66..ce05d8f085 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -480,7 +480,7 @@ impl Tenant { let timeline = UninitializedTimeline { owning_tenant: self, timeline_id, - raw_timeline: Some((Arc::new(dummy_timeline), TimelineUninitMark::dummy())), + raw_timeline: Some((dummy_timeline, TimelineUninitMark::dummy())), }; // Do not start walreceiver here. We do need loaded layer map for reconcile_with_remote // But we shouldnt start walreceiver before we have all the data locally, because working walreceiver @@ -510,7 +510,7 @@ impl Tenant { ) })?; broken_timeline.set_state(TimelineState::Broken); - timelines_accessor.insert(timeline_id, Arc::new(broken_timeline)); + timelines_accessor.insert(timeline_id, broken_timeline); Err(e) } } @@ -1647,7 +1647,7 @@ impl Tenant { new_metadata: TimelineMetadata, ancestor: Option>, remote_client: Option, - ) -> anyhow::Result { + ) -> anyhow::Result> { if let Some(ancestor_timeline_id) = new_metadata.ancestor_timeline() { anyhow::ensure!( ancestor.is_some(), @@ -2209,7 +2209,7 @@ impl Tenant { Ok(UninitializedTimeline { owning_tenant: self, timeline_id: new_timeline_id, - raw_timeline: Some((Arc::new(new_timeline), uninit_mark)), + raw_timeline: Some((new_timeline, uninit_mark)), }) } Err(e) => { @@ -2227,7 +2227,7 @@ impl Tenant { new_metadata: TimelineMetadata, ancestor: Option>, remote_client: Option, - ) -> anyhow::Result { + ) -> anyhow::Result> { let timeline_data = self .create_timeline_data( new_timeline_id, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 4a54c91d25..e891caa6f8 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -17,7 +17,7 @@ use std::fs; use std::ops::{Deref, Range}; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering}; -use std::sync::{Arc, Mutex, MutexGuard, RwLock}; +use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak}; use std::time::{Duration, Instant, SystemTime}; use crate::storage_sync::index::IndexPart; @@ -76,6 +76,8 @@ pub struct Timeline { conf: &'static PageServerConf, tenant_conf: Arc>, + _myself: Weak, + pub tenant_id: TenantId, pub timeline_id: TimelineId, @@ -748,75 +750,78 @@ impl Timeline { walredo_mgr: Arc, remote_client: Option, pg_version: u32, - ) -> Self { + ) -> Arc { let disk_consistent_lsn = metadata.disk_consistent_lsn(); let (state, _) = watch::channel(TimelineState::Suspended); let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0); let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(()))); - let mut result = Timeline { - conf, - tenant_conf, - timeline_id, - tenant_id, - pg_version, - layers: RwLock::new(LayerMap::default()), + Arc::new_cyclic(|myself| { + let mut result = Timeline { + conf, + tenant_conf, + _myself: myself.clone(), + timeline_id, + tenant_id, + pg_version, + layers: RwLock::new(LayerMap::default()), - walredo_mgr, + walredo_mgr, - remote_client: remote_client.map(Arc::new), + remote_client: remote_client.map(Arc::new), - // initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'. - last_record_lsn: SeqWait::new(RecordLsn { - last: disk_consistent_lsn, - prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)), - }), - disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0), + // initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'. + last_record_lsn: SeqWait::new(RecordLsn { + last: disk_consistent_lsn, + prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)), + }), + disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0), - last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0), - last_freeze_ts: RwLock::new(Instant::now()), + last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0), + last_freeze_ts: RwLock::new(Instant::now()), - ancestor_timeline: ancestor, - ancestor_lsn: metadata.ancestor_lsn(), + ancestor_timeline: ancestor, + ancestor_lsn: metadata.ancestor_lsn(), - metrics: TimelineMetrics::new(&tenant_id, &timeline_id), + metrics: TimelineMetrics::new(&tenant_id, &timeline_id), - flush_loop_state: Mutex::new(FlushLoopState::NotStarted), + flush_loop_state: Mutex::new(FlushLoopState::NotStarted), - layer_flush_start_tx, - layer_flush_done_tx, + layer_flush_start_tx, + layer_flush_done_tx, - write_lock: Mutex::new(()), - layer_removal_cs: Default::default(), + write_lock: Mutex::new(()), + layer_removal_cs: Default::default(), - gc_info: RwLock::new(GcInfo { - retain_lsns: Vec::new(), - horizon_cutoff: Lsn(0), - pitr_cutoff: Lsn(0), - }), + gc_info: RwLock::new(GcInfo { + retain_lsns: Vec::new(), + horizon_cutoff: Lsn(0), + pitr_cutoff: Lsn(0), + }), - latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()), - initdb_lsn: metadata.initdb_lsn(), + latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()), + initdb_lsn: metadata.initdb_lsn(), - current_logical_size: if disk_consistent_lsn.is_valid() { - // we're creating timeline data with some layer files existing locally, - // need to recalculate timeline's logical size based on data in the layers. - LogicalSize::deferred_initial(disk_consistent_lsn) - } else { - // we're creating timeline data without any layers existing locally, - // initial logical size is 0. - LogicalSize::empty_initial() - }, - partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))), - repartition_threshold: 0, + current_logical_size: if disk_consistent_lsn.is_valid() { + // we're creating timeline data with some layer files existing locally, + // need to recalculate timeline's logical size based on data in the layers. + LogicalSize::deferred_initial(disk_consistent_lsn) + } else { + // we're creating timeline data without any layers existing locally, + // initial logical size is 0. + LogicalSize::empty_initial() + }, + partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))), + repartition_threshold: 0, - last_received_wal: Mutex::new(None), - rel_size_cache: RwLock::new(HashMap::new()), - state, - }; - result.repartition_threshold = result.get_checkpoint_distance() / 10; - result + last_received_wal: Mutex::new(None), + rel_size_cache: RwLock::new(HashMap::new()), + state, + }; + result.repartition_threshold = result.get_checkpoint_distance() / 10; + result + }) } pub(super) fn maybe_spawn_flush_loop(self: &Arc) {