@@ -489,7 +489,9 @@ impl PageServerHandler {
         // Create empty timeline
         info!("creating new timeline");
         let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
-        let timeline = tenant.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)?;
+        let timeline = tenant
+            .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
+            .await?;

         // TODO mark timeline as not ready until it reaches end_lsn.
         // We might have some wal to import as well, and we should prevent compute
@@ -1241,7 +1241,7 @@ impl Tenant {
     /// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to set up the
     /// minimum amount of keys required to get a writable timeline.
     /// (Without it, `put` might fail due to `repartition` failing.)
-    pub fn create_empty_timeline(
+    pub async fn create_empty_timeline(
         &self,
         new_timeline_id: TimelineId,
         initdb_lsn: Lsn,
@@ -1253,9 +1253,11 @@ impl Tenant {
             "Cannot create empty timelines on inactive tenant"
         );

-        let timelines = self.timelines.lock().unwrap();
-        let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id, &timelines)?;
-        drop(timelines);
+        let timeline_uninit_mark = {
+            let timelines: MutexGuard<'_, HashMap<TimelineId, Arc<Timeline>>> =
+                self.timelines.lock().unwrap();
+            self.create_timeline_uninit_mark(new_timeline_id, &timelines)?
+        };

         let new_metadata = TimelineMetadata::new(
             // Initialize disk_consistent LSN to 0. The caller must import some data to
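
The tighter guard scoping above matters because `create_empty_timeline` is now `async`: a `std::sync::MutexGuard` is not `Send`, so holding it across a later `.await` would make the function's future non-Send. A minimal sketch of the pattern, with hypothetical names (not part of this patch):

    // The guard lives only inside the inner block, so no await point below
    // can observe it and the future stays Send (e.g. spawnable on tokio).
    async fn scoped_guard(lock: &std::sync::Mutex<u32>) -> u32 {
        let snapshot = {
            let guard = lock.lock().unwrap();
            *guard
        }; // guard dropped here
        tokio::task::yield_now().await; // no guard held across this await
        snapshot
    }
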
@@ -1275,6 +1277,7 @@ impl Tenant {
             initdb_lsn,
             None,
         )
+        .await
     }

     /// Helper for unit tests to create an empty timeline.
@@ -1290,7 +1293,9 @@ impl Tenant {
         pg_version: u32,
         ctx: &RequestContext,
     ) -> anyhow::Result<Arc<Timeline>> {
-        let uninit_tl = self.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)?;
+        let uninit_tl = self
+            .create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
+            .await?;
         let tline = uninit_tl.raw_timeline().expect("we just created it");
         assert_eq!(tline.get_last_record_lsn(), Lsn(0));

@@ -2750,13 +2755,15 @@ impl Tenant {
             src_timeline.pg_version,
         );

-        let uninitialized_timeline = self.prepare_new_timeline(
-            dst_id,
-            &metadata,
-            timeline_uninit_mark,
-            start_lsn + 1,
-            Some(Arc::clone(src_timeline)),
-        )?;
+        let uninitialized_timeline = self
+            .prepare_new_timeline(
+                dst_id,
+                &metadata,
+                timeline_uninit_mark,
+                start_lsn + 1,
+                Some(Arc::clone(src_timeline)),
+            )
+            .await?;

         let new_timeline = uninitialized_timeline.finish_creation()?;

@@ -2834,13 +2841,15 @@ impl Tenant {
             pgdata_lsn,
             pg_version,
         );
-        let raw_timeline = self.prepare_new_timeline(
-            timeline_id,
-            &new_metadata,
-            timeline_uninit_mark,
-            pgdata_lsn,
-            None,
-        )?;
+        let raw_timeline = self
+            .prepare_new_timeline(
+                timeline_id,
+                &new_metadata,
+                timeline_uninit_mark,
+                pgdata_lsn,
+                None,
+            )
+            .await?;

         let tenant_id = raw_timeline.owning_tenant.tenant_id;
         let unfinished_timeline = raw_timeline.raw_timeline()?;
@@ -2893,7 +2902,7 @@ impl Tenant {
     /// at 'disk_consistent_lsn'. After any initial data has been imported, call
     /// `finish_creation` to insert the Timeline into the timelines map and to remove the
     /// uninit mark file.
-    fn prepare_new_timeline(
+    async fn prepare_new_timeline(
         &self,
         new_timeline_id: TimelineId,
         new_metadata: &TimelineMetadata,
@@ -2920,7 +2929,7 @@ impl Tenant {
             .create_timeline_struct(new_timeline_id, new_metadata, ancestor, remote_client, None)
             .context("Failed to create timeline data structure")?;

-        timeline_struct.init_empty_layer_map(start_lsn);
+        timeline_struct.init_empty_layer_map(start_lsn).await?;

         if let Err(e) =
             self.create_timeline_files(&uninit_mark.timeline_path, new_timeline_id, new_metadata)
@@ -3617,7 +3626,10 @@ mod tests {
             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
             .await?;

-        match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) {
+        match tenant
+            .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
+            .await
+        {
             Ok(_) => panic!("duplicate timeline creation should fail"),
             Err(e) => assert_eq!(
                 e.to_string(),
@@ -4417,8 +4429,9 @@ mod tests {
             .await;

         let initdb_lsn = Lsn(0x20);
-        let utline =
-            tenant.create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)?;
+        let utline = tenant
+            .create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)
+            .await?;
         let tline = utline.raw_timeline().unwrap();

         // Spawn flush loop now so that we can set the `expect_initdb_optimization`
@@ -63,18 +63,6 @@ impl LayerMapMgr {
         self.layer_map.store(Arc::new(new_state));
         Ok(())
     }
-
-    /// Update the layer map.
-    pub fn update_sync<O>(&self, operation: O) -> Result<()>
-    where
-        O: FnOnce(LayerMap) -> Result<LayerMap>,
-    {
-        let state_lock = self.state_lock.blocking_lock();
-        let state = self.clone_for_write(&state_lock);
-        let new_state = operation(state)?;
-        self.layer_map.store(Arc::new(new_state));
-        Ok(())
-    }
 }

 #[cfg(test)]
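
With `update_sync` removed, the call sites later in this diff go through an async `update` whose closure returns a future. A self-contained sketch of the assumed counterpart, with a toy `Mgr` holding `Vec<u32>` state standing in for `LayerMapMgr`/`LayerMap` (the real signature lives elsewhere in this change):

    use std::future::Future;
    use std::sync::Arc;

    struct Mgr {
        state: tokio::sync::Mutex<Arc<Vec<u32>>>,
    }

    impl Mgr {
        // Async analogue of the removed `update_sync`: take the async lock
        // instead of `blocking_lock()`, clone the current state for writing,
        // run the async operation, and store the result back.
        async fn update<O, F>(&self, operation: O) -> anyhow::Result<()>
        where
            O: FnOnce(Vec<u32>) -> F,
            F: Future<Output = anyhow::Result<Vec<u32>>>,
        {
            let mut state_lock = self.state.lock().await;
            let state = (**state_lock).clone(); // clone_for_write analogue
            let new_state = operation(state).await?;
            *state_lock = Arc::new(new_state);
            Ok(())
        }
    }
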
@@ -174,14 +174,9 @@ impl LayerAccessStats {
     /// Create an empty stats object and record a [`LayerLoad`] event with the given residence status.
     ///
     /// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
-    pub(crate) fn for_loading_layer(
-        status: LayerResidenceStatus,
-    ) -> Self {
+    pub(crate) fn for_loading_layer(status: LayerResidenceStatus) -> Self {
         let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
-        new.record_residence_event(
-            status,
-            LayerResidenceEventReason::LayerLoad,
-        );
+        new.record_residence_event(status, LayerResidenceEventReason::LayerLoad);
         new
     }

@@ -199,10 +194,7 @@ impl LayerAccessStats {
             inner.clone()
         };
         let new = LayerAccessStats(Mutex::new(clone));
-        new.record_residence_event(
-            new_status,
-            LayerResidenceEventReason::ResidenceChange,
-        );
+        new.record_residence_event(new_status, LayerResidenceEventReason::ResidenceChange);
         new
     }

@@ -1601,13 +1601,13 @@ impl Timeline {
     ///
     /// Initialize with an empty layer map. Used when creating a new timeline.
     ///
-    pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) {
+    pub(super) async fn init_empty_layer_map(&self, start_lsn: Lsn) -> anyhow::Result<()> {
         self.layer_mgr
-            .update_sync(|mut layers| {
+            .update(|mut layers| async move {
                 layers.next_open_layer_at = Some(Lsn(start_lsn.0));
                 Ok(layers)
             })
-            .unwrap();
+            .await
     }

     ///
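
Note the closure body above is `async move`: the returned future must own its copy of `start_lsn` rather than borrow it through the closure, since the future outlives the closure call itself. A toy illustration of the shape (hypothetical names, not from this patch):

    async fn demo(start: u32) -> anyhow::Result<()> {
        let op = |mut v: Vec<u32>| async move {
            // `start` is captured by the future itself; without `move` the
            // async block would borrow it and could not escape the closure.
            v.push(start);
            Ok::<_, anyhow::Error>(v)
        };
        let _updated = op(Vec::new()).await?;
        Ok(())
    }
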
@@ -1752,125 +1752,132 @@ impl Timeline {
         // We're holding a layer map lock for a while but this
         // method is only called during init so it's fine.
         let guard = self.lcache.layer_in_use_write().await;
+        let mut layers_to_remove = vec![];
+        let mut layers_to_insert = vec![];

-        let operation = |mut layer_map: LayerMap| {
-            let mut updates = layer_map.batch_update();
-            for remote_layer_name in &index_part.timeline_layers {
-                let local_layer = local_only_layers.remove(remote_layer_name);
+        for remote_layer_name in &index_part.timeline_layers {
+            let local_layer = local_only_layers.remove(remote_layer_name);

-                let remote_layer_metadata = index_part
-                    .layer_metadata
-                    .get(remote_layer_name)
-                    .map(LayerFileMetadata::from)
-                    .with_context(|| {
-                        format!(
-                            "No remote layer metadata found for layer {}",
-                            remote_layer_name.file_name()
-                        )
-                    })?;
+            let remote_layer_metadata = index_part
+                .layer_metadata
+                .get(remote_layer_name)
+                .map(LayerFileMetadata::from)
+                .with_context(|| {
+                    format!(
+                        "No remote layer metadata found for layer {}",
+                        remote_layer_name.file_name()
+                    )
+                })?;

-                // Is the local layer's size different from the size stored in the
-                // remote index file?
-                // If so, rename_to_backup those files & replace their local layer with
-                // a RemoteLayer in the layer map so that we re-download them on-demand.
-                if let Some(local_layer) = local_layer {
-                    let local_layer_path = local_layer
-                        .local_path()
-                        .expect("caller must ensure that local_layers only contains local layers");
-                    ensure!(
-                        local_layer_path.exists(),
-                        "every layer from local_layers must exist on disk: {}",
-                        local_layer_path.display()
-                    );
-
-                    let remote_size = remote_layer_metadata.file_size();
-                    let metadata = local_layer_path.metadata().with_context(|| {
-                        format!(
-                            "get file size of local layer {}",
-                            local_layer_path.display()
-                        )
-                    })?;
-                    let local_size = metadata.len();
-                    if local_size != remote_size {
-                        warn!("removing local file {local_layer_path:?} because it has unexpected length {local_size}; length in remote index is {remote_size}");
-                        if let Err(err) = rename_to_backup(&local_layer_path) {
-                            assert!(local_layer_path.exists(), "we would leave the local_layer without a file if this does not hold: {}", local_layer_path.display());
-                            anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}");
-                        } else {
-                            self.metrics.resident_physical_size_gauge.sub(local_size);
-                            updates.remove_historic(local_layer.layer_desc().clone());
-                            self.lcache.remove_local_when_init(local_layer);
-                            // fall-through to adding the remote layer
-                        }
-                    } else {
-                        debug!(
-                            "layer is present locally and file size matches remote, using it: {}",
-                            local_layer_path.display()
-                        );
-                        continue;
-                    }
-                }
-
-                info!(
-                    "remote layer does not exist locally, creating remote layer: {}",
-                    remote_layer_name.file_name()
-                );
+            // Is the local layer's size different from the size stored in the
+            // remote index file?
+            // If so, rename_to_backup those files & replace their local layer with
+            // a RemoteLayer in the layer map so that we re-download them on-demand.
+            if let Some(local_layer) = local_layer {
+                let local_layer_path = local_layer
+                    .local_path()
+                    .expect("caller must ensure that local_layers only contains local layers");
+                ensure!(
+                    local_layer_path.exists(),
+                    "every layer from local_layers must exist on disk: {}",
+                    local_layer_path.display()
+                );

-                match remote_layer_name {
-                    LayerFileName::Image(imgfilename) => {
-                        if imgfilename.lsn > up_to_date_disk_consistent_lsn {
-                            warn!(
-                                "found future image layer {} on timeline {} remote_consistent_lsn is {}",
-                                imgfilename, self.timeline_id, up_to_date_disk_consistent_lsn
-                            );
-                            continue;
-                        }
-
-                        let remote_layer = RemoteLayer::new_img(
-                            self.tenant_id,
-                            self.timeline_id,
-                            imgfilename,
-                            &remote_layer_metadata,
-                            LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted),
-                        );
-                        let remote_layer = Arc::new(remote_layer);
-
-                        updates.insert_historic(remote_layer.layer_desc().clone());
-                        self.lcache.populate_remote_when_init(remote_layer);
-                    }
-                    LayerFileName::Delta(deltafilename) => {
-                        // Create a RemoteLayer for the delta file.
-                        // The end-LSN is exclusive, while disk_consistent_lsn is
-                        // inclusive. For example, if disk_consistent_lsn is 100, it is
-                        // OK for a delta layer to have end LSN 101, but if the end LSN
-                        // is 102, then it might not have been fully flushed to disk
-                        // before crash.
-                        if deltafilename.lsn_range.end > up_to_date_disk_consistent_lsn + 1 {
-                            warn!(
-                                "found future delta layer {} on timeline {} remote_consistent_lsn is {}",
-                                deltafilename, self.timeline_id, up_to_date_disk_consistent_lsn
-                            );
-                            continue;
-                        }
-                        let remote_layer = RemoteLayer::new_delta(
-                            self.tenant_id,
-                            self.timeline_id,
-                            deltafilename,
-                            &remote_layer_metadata,
-                            LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted),
-                        );
-                        let remote_layer = Arc::new(remote_layer);
-                        updates.insert_historic(remote_layer.layer_desc().clone());
-                        self.lcache.populate_remote_when_init(remote_layer);
-                    }
-                }
-            }
-            updates.flush();
-            Ok(layer_map)
-        };
+                let remote_size = remote_layer_metadata.file_size();
+                let metadata = local_layer_path.metadata().with_context(|| {
+                    format!(
+                        "get file size of local layer {}",
+                        local_layer_path.display()
+                    )
+                })?;
+                let local_size = metadata.len();
+                if local_size != remote_size {
+                    warn!("removing local file {local_layer_path:?} because it has unexpected length {local_size}; length in remote index is {remote_size}");
+                    if let Err(err) = rename_to_backup(&local_layer_path) {
+                        assert!(local_layer_path.exists(), "we would leave the local_layer without a file if this does not hold: {}", local_layer_path.display());
+                        anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}");
+                    } else {
+                        self.metrics.resident_physical_size_gauge.sub(local_size);
+                        layers_to_remove.push(local_layer.layer_desc().clone());
+                        self.lcache.remove_local_when_init(local_layer);
+                        // fall-through to adding the remote layer
+                    }
+                } else {
+                    debug!(
+                        "layer is present locally and file size matches remote, using it: {}",
+                        local_layer_path.display()
+                    );
+                    continue;
+                }
+            }
+
+            info!(
+                "remote layer does not exist locally, creating remote layer: {}",
+                remote_layer_name.file_name()
+            );
+
+            match remote_layer_name {
+                LayerFileName::Image(imgfilename) => {
+                    if imgfilename.lsn > up_to_date_disk_consistent_lsn {
+                        warn!(
+                            "found future image layer {} on timeline {} remote_consistent_lsn is {}",
+                            imgfilename, self.timeline_id, up_to_date_disk_consistent_lsn
+                        );
+                        continue;
+                    }
+
+                    let remote_layer = RemoteLayer::new_img(
+                        self.tenant_id,
+                        self.timeline_id,
+                        imgfilename,
+                        &remote_layer_metadata,
+                        LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted),
+                    );
+                    let remote_layer = Arc::new(remote_layer);
+                    layers_to_insert.push(remote_layer.layer_desc().clone());
+                    self.lcache.populate_remote_when_init(remote_layer);
+                }
+                LayerFileName::Delta(deltafilename) => {
+                    // Create a RemoteLayer for the delta file.
+                    // The end-LSN is exclusive, while disk_consistent_lsn is
+                    // inclusive. For example, if disk_consistent_lsn is 100, it is
+                    // OK for a delta layer to have end LSN 101, but if the end LSN
+                    // is 102, then it might not have been fully flushed to disk
+                    // before crash.
+                    if deltafilename.lsn_range.end > up_to_date_disk_consistent_lsn + 1 {
+                        warn!(
+                            "found future delta layer {} on timeline {} remote_consistent_lsn is {}",
+                            deltafilename, self.timeline_id, up_to_date_disk_consistent_lsn
+                        );
+                        continue;
+                    }
+                    let remote_layer = RemoteLayer::new_delta(
+                        self.tenant_id,
+                        self.timeline_id,
+                        deltafilename,
+                        &remote_layer_metadata,
+                        LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted),
+                    );
+                    let remote_layer = Arc::new(remote_layer);
+                    layers_to_insert.push(remote_layer.layer_desc().clone());
+                    self.lcache.populate_remote_when_init(remote_layer);
+                }
+            }
+        }
+
+        let operation = |mut layer_map: LayerMap| async {
+            let mut updates = layer_map.batch_update();
+            for layer in layers_to_remove {
+                updates.remove_historic(layer);
+            }
+            for layer in layers_to_insert {
+                updates.insert_historic(layer);
+            }
+            updates.flush();
+            Ok(layer_map)
+        };

-        self.layer_mgr.update_sync(operation)?;
+        self.layer_mgr.update(operation).await?;

         Ok(local_only_layers)
     }
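
The restructuring above is the heart of this hunk: filesystem checks and layer construction now happen in a plain loop that only collects descriptors into `layers_to_remove` / `layers_to_insert`, and the async `update` then applies them in one short critical section. The same shape in miniature, reusing the toy `Mgr` sketched earlier (illustrative only, not the real LayerMap API):

    async fn reconcile(mgr: &Mgr, to_remove: Vec<u32>, to_insert: Vec<u32>) -> anyhow::Result<()> {
        // Heavy work (I/O, metadata checks) happens before this call, so the
        // closure only edits the state and the lock is held briefly.
        mgr.update(|mut state| async move {
            state.retain(|x| !to_remove.contains(x));
            state.extend(to_insert);
            Ok(state)
        })
        .await
    }
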
@@ -2731,11 +2738,13 @@ impl Timeline {
             InMemoryLayer::create(self.conf, self.timeline_id, self.tenant_id, start_lsn)?;
         let layer_rc = Arc::new(new_layer);

-        self.layer_mgr.update_sync(|mut layers| {
-            layers.open_layer = Some(Arc::clone(&layer_rc));
-            layers.next_open_layer_at = None;
-            Ok(layers)
-        })?;
+        self.layer_mgr
+            .update(|mut layers| async {
+                layers.open_layer = Some(Arc::clone(&layer_rc));
+                layers.next_open_layer_at = None;
+                Ok(layers)
+            })
+            .await?;

         layer = layer_rc;
     }
@@ -2944,7 +2953,7 @@ impl Timeline {
     {
         let _guard = self.lcache.layer_in_use_write().await;
         self.layer_mgr
-            .update_sync(|mut layers| {
+            .update(|mut layers| async {
                 let l = layers.frozen_layers.pop_front();

                 // Only one thread may call this function at a time (for this
@@ -2954,6 +2963,7 @@ impl Timeline {

                 Ok(layers)
             })
+            .await
             .unwrap();
         // release lock on 'layers'
     }
@@ -3085,17 +3095,19 @@ impl Timeline {
         // Add it to the layer map
         let l = Arc::new(new_delta);
         let guard = self.lcache.layer_in_use_write().await;
-        self.layer_mgr.update_sync(|mut layers| {
-            let mut batch_updates = layers.batch_update();
-            l.access_stats().record_residence_event(
-                LayerResidenceStatus::Resident,
-                LayerResidenceEventReason::LayerCreate,
-            );
-            batch_updates.insert_historic(l.layer_desc().clone());
-            self.lcache.create_new_layer(l);
-            batch_updates.flush();
-            Ok(layers)
-        })?;
+        self.layer_mgr
+            .update(|mut layers| async {
+                let mut batch_updates = layers.batch_update();
+                l.access_stats().record_residence_event(
+                    LayerResidenceStatus::Resident,
+                    LayerResidenceEventReason::LayerCreate,
+                );
+                batch_updates.insert_historic(l.layer_desc().clone());
+                self.lcache.create_new_layer(l);
+                batch_updates.flush();
+                Ok(layers)
+            })
+            .await?;

         // update the timeline's physical size
         self.metrics.resident_physical_size_gauge.add(sz);
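
Each of these call sites keeps the same bracket inside the closure: obtain a staging handle via `batch_update()`, queue insertions/removals through it, and `flush()` before returning the map. A toy analogue of such a staging handle (assumed behavior, not the real LayerMap API):

    struct BatchUpdate<'a> {
        pending: Vec<u32>,
        target: &'a mut Vec<u32>,
    }

    impl<'a> BatchUpdate<'a> {
        fn insert(&mut self, x: u32) {
            self.pending.push(x); // staged, not yet applied
        }
        fn flush(self) {
            // apply the whole batch at once, so readers never observe a
            // half-applied set of changes
            self.target.extend(self.pending);
        }
    }
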
@@ -3324,35 +3336,37 @@ impl Timeline {
         let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());

         let guard = self.lcache.layer_in_use_write().await;
-        self.layer_mgr.update_sync(|mut layers| {
-            let mut updates = layers.batch_update();
-            let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
+        self.layer_mgr
+            .update(|mut layers| async {
+                let mut updates = layers.batch_update();
+                let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);

-            for l in image_layers {
-                let path = l.filename();
-                let metadata = timeline_path
-                    .join(path.file_name())
-                    .metadata()
-                    .with_context(|| {
-                        format!("reading metadata of layer file {}", path.file_name())
-                    })?;
+                for l in image_layers {
+                    let path = l.filename();
+                    let metadata = timeline_path
+                        .join(path.file_name())
+                        .metadata()
+                        .with_context(|| {
+                            format!("reading metadata of layer file {}", path.file_name())
+                        })?;

-                layer_paths_to_upload.insert(path, LayerFileMetadata::new(metadata.len()));
+                    layer_paths_to_upload.insert(path, LayerFileMetadata::new(metadata.len()));

-                self.metrics
-                    .resident_physical_size_gauge
-                    .add(metadata.len());
-                let l = Arc::new(l);
-                l.access_stats().record_residence_event(
-                    LayerResidenceStatus::Resident,
-                    LayerResidenceEventReason::LayerCreate,
-                );
-                updates.insert_historic(l.layer_desc().clone());
-                self.lcache.create_new_layer(l);
-            }
-            updates.flush();
-            Ok(layers)
-        })?;
+                    self.metrics
+                        .resident_physical_size_gauge
+                        .add(metadata.len());
+                    let l = Arc::new(l);
+                    l.access_stats().record_residence_event(
+                        LayerResidenceStatus::Resident,
+                        LayerResidenceEventReason::LayerCreate,
+                    );
+                    updates.insert_historic(l.layer_desc().clone());
+                    self.lcache.create_new_layer(l);
+                }
+                updates.flush();
+                Ok(layers)
+            })
+            .await?;
         drop(guard);
         timer.stop_and_record();

@@ -3767,50 +3781,52 @@ impl Timeline {

         let guard = self.lcache.layer_in_use_write().await;
         let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len());
-        self.layer_mgr.update_sync(|mut layers| {
-            let mut updates = layers.batch_update();
-            let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
-            for l in new_layers {
-                let new_delta_path = l.path();
+        self.layer_mgr
+            .update(|mut layers| async {
+                let mut updates = layers.batch_update();
+                let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
+                for l in new_layers {
+                    let new_delta_path = l.path();

-                let metadata = new_delta_path.metadata().with_context(|| {
-                    format!(
-                        "read file metadata for new created layer {}",
-                        new_delta_path.display()
-                    )
-                })?;
+                    let metadata = new_delta_path.metadata().with_context(|| {
+                        format!(
+                            "read file metadata for new created layer {}",
+                            new_delta_path.display()
+                        )
+                    })?;

-                if let Some(remote_client) = &self.remote_client {
-                    remote_client.schedule_layer_file_upload(
-                        &l.filename(),
-                        &LayerFileMetadata::new(metadata.len()),
-                    )?;
-                }
+                    if let Some(remote_client) = &self.remote_client {
+                        remote_client.schedule_layer_file_upload(
+                            &l.filename(),
+                            &LayerFileMetadata::new(metadata.len()),
+                        )?;
+                    }

-                // update the timeline's physical size
-                self.metrics
-                    .resident_physical_size_gauge
-                    .add(metadata.len());
-
-                new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
-                let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
-                x.access_stats().record_residence_event(
-                    LayerResidenceStatus::Resident,
-                    LayerResidenceEventReason::LayerCreate,
-                );
-                updates.insert_historic(x.layer_desc().clone());
-                self.lcache.create_new_layer(x);
-            }
+                    // update the timeline's physical size
+                    self.metrics
+                        .resident_physical_size_gauge
+                        .add(metadata.len());
+
+                    new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
+                    let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
+                    x.access_stats().record_residence_event(
+                        LayerResidenceStatus::Resident,
+                        LayerResidenceEventReason::LayerCreate,
+                    );
+                    updates.insert_historic(x.layer_desc().clone());
+                    self.lcache.create_new_layer(x);
+                }

-            // Now that we have reshuffled the data to set of new delta layers, we can
-            // delete the old ones
-            for l in deltas_to_compact {
-                layer_names_to_delete.push(l.filename());
-                self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates)?;
-            }
-            updates.flush();
-            Ok(layers)
-        })?;
+                // Now that we have reshuffled the data to set of new delta layers, we can
+                // delete the old ones
+                for l in deltas_to_compact {
+                    layer_names_to_delete.push(l.filename());
+                    self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates)?;
+                }
+                updates.flush();
+                Ok(layers)
+            })
+            .await?;
         drop(guard);

         // Also schedule the deletions in remote storage
@@ -4127,40 +4143,42 @@ impl Timeline {
             .unwrap()
             .replace((new_gc_cutoff, wanted_image_layers.to_keyspace()));

-        self.layer_mgr.update_sync(|mut layers| {
-            let mut updates = layers.batch_update();
-            if !layers_to_remove.is_empty() {
-                // Persist the new GC cutoff value in the metadata file, before
-                // we actually remove anything.
-                self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())?;
+        self.layer_mgr
+            .update(|mut layers| async move {
+                let mut updates = layers.batch_update();
+                if !layers_to_remove.is_empty() {
+                    // Persist the new GC cutoff value in the metadata file, before
+                    // we actually remove anything.
+                    self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())?;

-                // Actually delete the layers from disk and remove them from the map.
-                // (couldn't do this in the loop above, because you cannot modify a collection
-                // while iterating it. BTreeMap::retain() would be another option)
-                let mut layer_names_to_delete = Vec::with_capacity(layers_to_remove.len());
-                {
-                    for doomed_layer in layers_to_remove {
-                        layer_names_to_delete.push(doomed_layer.filename());
-                        self.delete_historic_layer(
-                            layer_removal_cs.clone(),
-                            doomed_layer,
-                            &mut updates,
-                        )?; // FIXME: schedule succeeded deletions before returning?
-                        result.layers_removed += 1;
-                    }
-                }
+                    // Actually delete the layers from disk and remove them from the map.
+                    // (couldn't do this in the loop above, because you cannot modify a collection
+                    // while iterating it. BTreeMap::retain() would be another option)
+                    let mut layer_names_to_delete = Vec::with_capacity(layers_to_remove.len());
+                    {
+                        for doomed_layer in layers_to_remove {
+                            layer_names_to_delete.push(doomed_layer.filename());
+                            self.delete_historic_layer(
+                                layer_removal_cs.clone(),
+                                doomed_layer,
+                                &mut updates,
+                            )?; // FIXME: schedule succeeded deletions before returning?
+                            result.layers_removed += 1;
+                        }
+                    }

-                if result.layers_removed != 0 {
-                    fail_point!("after-timeline-gc-removed-layers");
-                }
+                    if result.layers_removed != 0 {
+                        fail_point!("after-timeline-gc-removed-layers");
+                    }

-                if let Some(remote_client) = &self.remote_client {
-                    remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
-                }
-            }
-            updates.flush();
-            Ok(layers)
-        })?;
+                    if let Some(remote_client) = &self.remote_client {
+                        remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
+                    }
+                }
+                updates.flush();
+                Ok(layers)
+            })
+            .await?;

         info!(
             "GC completed removing {} layers, cutoff {}",