no-op: add Timeline::myself member

This commit is contained in:
Christian Schwarz
2022-12-05 06:38:13 -05:00
committed by Christian Schwarz
parent f637f6e77e
commit 91e8937112
2 changed files with 61 additions and 56 deletions

View File

@@ -480,7 +480,7 @@ impl Tenant {
let timeline = UninitializedTimeline {
owning_tenant: self,
timeline_id,
raw_timeline: Some((Arc::new(dummy_timeline), TimelineUninitMark::dummy())),
raw_timeline: Some((dummy_timeline, TimelineUninitMark::dummy())),
};
// Do not start walreceiver here. We do need loaded layer map for reconcile_with_remote
// But we shouldnt start walreceiver before we have all the data locally, because working walreceiver
@@ -510,7 +510,7 @@ impl Tenant {
)
})?;
broken_timeline.set_state(TimelineState::Broken);
timelines_accessor.insert(timeline_id, Arc::new(broken_timeline));
timelines_accessor.insert(timeline_id, broken_timeline);
Err(e)
}
}
@@ -1647,7 +1647,7 @@ impl Tenant {
new_metadata: TimelineMetadata,
ancestor: Option<Arc<Timeline>>,
remote_client: Option<RemoteTimelineClient>,
) -> anyhow::Result<Timeline> {
) -> anyhow::Result<Arc<Timeline>> {
if let Some(ancestor_timeline_id) = new_metadata.ancestor_timeline() {
anyhow::ensure!(
ancestor.is_some(),
@@ -2209,7 +2209,7 @@ impl Tenant {
Ok(UninitializedTimeline {
owning_tenant: self,
timeline_id: new_timeline_id,
raw_timeline: Some((Arc::new(new_timeline), uninit_mark)),
raw_timeline: Some((new_timeline, uninit_mark)),
})
}
Err(e) => {
@@ -2227,7 +2227,7 @@ impl Tenant {
new_metadata: TimelineMetadata,
ancestor: Option<Arc<Timeline>>,
remote_client: Option<RemoteTimelineClient>,
) -> anyhow::Result<Timeline> {
) -> anyhow::Result<Arc<Timeline>> {
let timeline_data = self
.create_timeline_data(
new_timeline_id,

View File

@@ -17,7 +17,7 @@ use std::fs;
use std::ops::{Deref, Range};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use crate::storage_sync::index::IndexPart;
@@ -76,6 +76,8 @@ pub struct Timeline {
conf: &'static PageServerConf,
tenant_conf: Arc<RwLock<TenantConfOpt>>,
_myself: Weak<Self>,
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
@@ -748,75 +750,78 @@ impl Timeline {
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
remote_client: Option<RemoteTimelineClient>,
pg_version: u32,
) -> Self {
) -> Arc<Self> {
let disk_consistent_lsn = metadata.disk_consistent_lsn();
let (state, _) = watch::channel(TimelineState::Suspended);
let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
let mut result = Timeline {
conf,
tenant_conf,
timeline_id,
tenant_id,
pg_version,
layers: RwLock::new(LayerMap::default()),
Arc::new_cyclic(|myself| {
let mut result = Timeline {
conf,
tenant_conf,
_myself: myself.clone(),
timeline_id,
tenant_id,
pg_version,
layers: RwLock::new(LayerMap::default()),
walredo_mgr,
walredo_mgr,
remote_client: remote_client.map(Arc::new),
remote_client: remote_client.map(Arc::new),
// initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
last_record_lsn: SeqWait::new(RecordLsn {
last: disk_consistent_lsn,
prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
}),
disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0),
// initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
last_record_lsn: SeqWait::new(RecordLsn {
last: disk_consistent_lsn,
prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
}),
disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0),
last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
last_freeze_ts: RwLock::new(Instant::now()),
last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
last_freeze_ts: RwLock::new(Instant::now()),
ancestor_timeline: ancestor,
ancestor_lsn: metadata.ancestor_lsn(),
ancestor_timeline: ancestor,
ancestor_lsn: metadata.ancestor_lsn(),
metrics: TimelineMetrics::new(&tenant_id, &timeline_id),
metrics: TimelineMetrics::new(&tenant_id, &timeline_id),
flush_loop_state: Mutex::new(FlushLoopState::NotStarted),
flush_loop_state: Mutex::new(FlushLoopState::NotStarted),
layer_flush_start_tx,
layer_flush_done_tx,
layer_flush_start_tx,
layer_flush_done_tx,
write_lock: Mutex::new(()),
layer_removal_cs: Default::default(),
write_lock: Mutex::new(()),
layer_removal_cs: Default::default(),
gc_info: RwLock::new(GcInfo {
retain_lsns: Vec::new(),
horizon_cutoff: Lsn(0),
pitr_cutoff: Lsn(0),
}),
gc_info: RwLock::new(GcInfo {
retain_lsns: Vec::new(),
horizon_cutoff: Lsn(0),
pitr_cutoff: Lsn(0),
}),
latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
initdb_lsn: metadata.initdb_lsn(),
latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
initdb_lsn: metadata.initdb_lsn(),
current_logical_size: if disk_consistent_lsn.is_valid() {
// we're creating timeline data with some layer files existing locally,
// need to recalculate timeline's logical size based on data in the layers.
LogicalSize::deferred_initial(disk_consistent_lsn)
} else {
// we're creating timeline data without any layers existing locally,
// initial logical size is 0.
LogicalSize::empty_initial()
},
partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
repartition_threshold: 0,
current_logical_size: if disk_consistent_lsn.is_valid() {
// we're creating timeline data with some layer files existing locally,
// need to recalculate timeline's logical size based on data in the layers.
LogicalSize::deferred_initial(disk_consistent_lsn)
} else {
// we're creating timeline data without any layers existing locally,
// initial logical size is 0.
LogicalSize::empty_initial()
},
partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
repartition_threshold: 0,
last_received_wal: Mutex::new(None),
rel_size_cache: RwLock::new(HashMap::new()),
state,
};
result.repartition_threshold = result.get_checkpoint_distance() / 10;
result
last_received_wal: Mutex::new(None),
rel_size_cache: RwLock::new(HashMap::new()),
state,
};
result.repartition_threshold = result.get_checkpoint_distance() / 10;
result
})
}
pub(super) fn maybe_spawn_flush_loop(self: &Arc<Self>) {