mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-31 12:00:42 +00:00
refactor: Layer initialization
This commit is contained in:
@@ -64,10 +64,6 @@ impl Layer {
|
||||
file_name: LayerFileName,
|
||||
metadata: LayerFileMetadata,
|
||||
) -> Self {
|
||||
let path = conf
|
||||
.timeline_path(&timeline.tenant_id, &timeline.timeline_id)
|
||||
.join(file_name.file_name());
|
||||
|
||||
let desc = PersistentLayerDesc::from_filename(
|
||||
timeline.tenant_id,
|
||||
timeline.timeline_id,
|
||||
@@ -77,23 +73,17 @@ impl Layer {
|
||||
|
||||
let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted);
|
||||
|
||||
let outer = Arc::new(LayerInner {
|
||||
let owner = Layer(Arc::new(LayerInner::new(
|
||||
conf,
|
||||
path,
|
||||
desc,
|
||||
timeline: Arc::downgrade(timeline),
|
||||
timeline,
|
||||
access_stats,
|
||||
inner: heavier_once_cell::OnceCell::default(),
|
||||
wanted_garbage_collected: AtomicBool::default(),
|
||||
wanted_evicted: AtomicBool::default(),
|
||||
version: AtomicUsize::default(),
|
||||
have_remote_client: timeline.remote_client.is_some(),
|
||||
status: tokio::sync::broadcast::channel(1).0,
|
||||
});
|
||||
desc,
|
||||
None,
|
||||
)));
|
||||
|
||||
debug_assert!(outer.needs_download_blocking().unwrap().is_some());
|
||||
debug_assert!(owner.0.needs_download_blocking().unwrap().is_some());
|
||||
|
||||
Layer(outer)
|
||||
owner
|
||||
}
|
||||
|
||||
pub(crate) fn for_resident(
|
||||
@@ -102,10 +92,6 @@ impl Layer {
|
||||
file_name: LayerFileName,
|
||||
metadata: LayerFileMetadata,
|
||||
) -> ResidentLayer {
|
||||
let path = conf
|
||||
.timeline_path(&timeline.tenant_id, &timeline.timeline_id)
|
||||
.join(file_name.file_name());
|
||||
|
||||
let desc = PersistentLayerDesc::from_filename(
|
||||
timeline.tenant_id,
|
||||
timeline.timeline_id,
|
||||
@@ -117,35 +103,21 @@ impl Layer {
|
||||
|
||||
let mut resident = None;
|
||||
|
||||
let outer = Arc::new_cyclic(|owner| {
|
||||
let owner = Layer(Arc::new_cyclic(|owner| {
|
||||
let inner = Arc::new(DownloadedLayer {
|
||||
owner: owner.clone(),
|
||||
kind: tokio::sync::OnceCell::default(),
|
||||
});
|
||||
resident = Some(inner.clone());
|
||||
LayerInner {
|
||||
conf,
|
||||
path,
|
||||
desc,
|
||||
timeline: Arc::downgrade(timeline),
|
||||
have_remote_client: timeline.remote_client.is_some(),
|
||||
access_stats,
|
||||
wanted_garbage_collected: AtomicBool::new(false),
|
||||
wanted_evicted: AtomicBool::new(false),
|
||||
inner: heavier_once_cell::OnceCell::new(ResidentOrWantedEvicted::Resident(inner)),
|
||||
version: AtomicUsize::new(0),
|
||||
status: tokio::sync::broadcast::channel(1).0,
|
||||
}
|
||||
});
|
||||
|
||||
debug_assert!(outer.needs_download_blocking().unwrap().is_none());
|
||||
LayerInner::new(conf, timeline, access_stats, desc, Some(inner))
|
||||
}));
|
||||
|
||||
debug_assert!(owner.0.needs_download_blocking().unwrap().is_none());
|
||||
|
||||
let downloaded = resident.expect("just initialized");
|
||||
|
||||
ResidentLayer {
|
||||
downloaded,
|
||||
owner: Layer(outer),
|
||||
}
|
||||
ResidentLayer { downloaded, owner }
|
||||
}
|
||||
|
||||
pub(crate) fn for_written(
|
||||
@@ -153,52 +125,27 @@ impl Layer {
|
||||
timeline: &Arc<Timeline>,
|
||||
desc: PersistentLayerDesc,
|
||||
) -> anyhow::Result<ResidentLayer> {
|
||||
let path = conf
|
||||
.timeline_path(&desc.tenant_id, &desc.timeline_id)
|
||||
.join(desc.filename().to_string());
|
||||
|
||||
let mut resident = None;
|
||||
|
||||
let outer = Arc::new_cyclic(|owner| {
|
||||
let owner = Layer(Arc::new_cyclic(|owner| {
|
||||
let inner = Arc::new(DownloadedLayer {
|
||||
owner: owner.clone(),
|
||||
kind: tokio::sync::OnceCell::default(),
|
||||
});
|
||||
resident = Some(inner.clone());
|
||||
LayerInner {
|
||||
conf,
|
||||
path,
|
||||
desc,
|
||||
timeline: Arc::downgrade(timeline),
|
||||
have_remote_client: timeline.remote_client.is_some(),
|
||||
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
|
||||
wanted_garbage_collected: AtomicBool::new(false),
|
||||
wanted_evicted: AtomicBool::new(false),
|
||||
inner: heavier_once_cell::OnceCell::new(ResidentOrWantedEvicted::Resident(inner)),
|
||||
version: AtomicUsize::new(0),
|
||||
status: tokio::sync::broadcast::channel(1).0,
|
||||
}
|
||||
});
|
||||
let access_stats = LayerAccessStats::empty_will_record_residence_event_later();
|
||||
access_stats.record_residence_event(
|
||||
LayerResidenceStatus::Resident,
|
||||
LayerResidenceEventReason::LayerCreate,
|
||||
);
|
||||
LayerInner::new(conf, timeline, access_stats, desc, Some(inner))
|
||||
}));
|
||||
|
||||
// FIXME: ugly, but if we don't do this check here, any error will pop up at read time
|
||||
// but we cannot check it because DeltaLayerWriter and ImageLayerWriter create the
|
||||
// instances *before* renaming the file to final destination
|
||||
// anyhow::ensure!(
|
||||
// outer.needs_download_blocking()?.is_none(),
|
||||
// "should not need downloading if it was just written"
|
||||
// );
|
||||
let downloaded = resident.expect("just initialized");
|
||||
|
||||
// FIXME: because we can now do garbage collection on drop, should we mark these files as
|
||||
// garbage collected until they get really get added to LayerMap? consider that files are
|
||||
// written out to disk, fsynced, renamed by `{Delta,Image}LayerWriter`, then waiting for
|
||||
// remaining files to be generated (compaction, create_image_layers) before being added to
|
||||
// LayerMap. We could panic or just error out during that time, even for unrelated reasons,
|
||||
// but the files would be left.
|
||||
// FIXME: should we handle the rename?
|
||||
|
||||
Ok(ResidentLayer {
|
||||
downloaded: resident.expect("just wrote Some"),
|
||||
owner: Layer(outer),
|
||||
})
|
||||
Ok(ResidentLayer { downloaded, owner })
|
||||
}
|
||||
|
||||
pub(crate) async fn evict_and_wait(
|
||||
@@ -449,6 +396,36 @@ impl Drop for LayerInner {
|
||||
}
|
||||
|
||||
impl LayerInner {
|
||||
fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline: &Arc<Timeline>,
|
||||
access_stats: LayerAccessStats,
|
||||
desc: PersistentLayerDesc,
|
||||
downloaded: Option<Arc<DownloadedLayer>>,
|
||||
) -> Self {
|
||||
let path = conf
|
||||
.timeline_path(&timeline.tenant_id, &timeline.timeline_id)
|
||||
.join(desc.filename().to_string());
|
||||
|
||||
LayerInner {
|
||||
conf,
|
||||
path,
|
||||
desc,
|
||||
timeline: Arc::downgrade(timeline),
|
||||
have_remote_client: timeline.remote_client.is_some(),
|
||||
access_stats,
|
||||
wanted_garbage_collected: AtomicBool::new(false),
|
||||
wanted_evicted: AtomicBool::new(false),
|
||||
inner: if let Some(inner) = downloaded {
|
||||
heavier_once_cell::OnceCell::new(ResidentOrWantedEvicted::Resident(inner))
|
||||
} else {
|
||||
heavier_once_cell::OnceCell::default()
|
||||
},
|
||||
version: AtomicUsize::new(0),
|
||||
status: tokio::sync::broadcast::channel(1).0,
|
||||
}
|
||||
}
|
||||
|
||||
fn garbage_collect(&self) {
|
||||
self.wanted_garbage_collected.store(true, Ordering::Release);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user