Yet another apporach of copying logical timeline size during branch creation (#2139)

* Yet another apporach of copying logical timeline size during branch creation

* Fix unit tests

* Update pageserver/src/layered_repository.rs

Co-authored-by: Thang Pham <thang@neon.tech>

* Update pageserver/src/layered_repository.rs

Co-authored-by: Thang Pham <thang@neon.tech>

* Update pageserver/src/layered_repository.rs

Co-authored-by: Thang Pham <thang@neon.tech>

Co-authored-by: Thang Pham <thang@neon.tech>
This commit is contained in:
Konstantin Knizhnik
2022-07-26 09:11:10 +03:00
committed by GitHub
parent 45680f9a2d
commit 28243d68e6
3 changed files with 52 additions and 6 deletions

View File

@@ -259,6 +259,7 @@ impl Repository for LayeredRepository {
self.tenant_id, self.tenant_id,
Arc::clone(&self.walredo_mgr), Arc::clone(&self.walredo_mgr),
self.upload_layers, self.upload_layers,
None,
); );
timeline.layers.write().unwrap().next_open_layer_at = Some(initdb_lsn); timeline.layers.write().unwrap().next_open_layer_at = Some(initdb_lsn);
@@ -323,6 +324,20 @@ impl Repository for LayeredRepository {
)); ));
} }
} }
// Copy logical size from source timeline if we are branching on the last position.
let init_logical_size =
if let Ok(src_pgdir) = tenant_mgr::get_local_timeline_with_load(self.tenant_id, src) {
let logical_size = src_pgdir.get_current_logical_size();
// Check LSN after getting logical size to exclude race condition
// when ancestor timeline is concurrently updated
if src_timeline.get_last_record_lsn() == start_lsn {
Some(logical_size)
} else {
None
}
} else {
None
};
// Determine prev-LSN for the new timeline. We can only determine it if // Determine prev-LSN for the new timeline. We can only determine it if
// the timeline was branched at the current end of the source timeline. // the timeline was branched at the current end of the source timeline.
@@ -353,7 +368,14 @@ impl Repository for LayeredRepository {
); );
crashsafe_dir::create_dir_all(self.conf.timeline_path(&dst, &self.tenant_id))?; crashsafe_dir::create_dir_all(self.conf.timeline_path(&dst, &self.tenant_id))?;
Self::save_metadata(self.conf, dst, self.tenant_id, &metadata, true)?; Self::save_metadata(self.conf, dst, self.tenant_id, &metadata, true)?;
timelines.insert(dst, LayeredTimelineEntry::Unloaded { id: dst, metadata }); timelines.insert(
dst,
LayeredTimelineEntry::Unloaded {
id: dst,
metadata,
init_logical_size,
},
);
info!("branched timeline {} from {} at {}", dst, src, start_lsn); info!("branched timeline {} from {} at {}", dst, src, start_lsn);
@@ -489,7 +511,7 @@ impl Repository for LayeredRepository {
// we need to get metadata of a timeline, another option is to pass it along with Downloaded status // we need to get metadata of a timeline, another option is to pass it along with Downloaded status
let metadata = load_metadata(self.conf, timeline_id, self.tenant_id).context("failed to load local metadata")?; let metadata = load_metadata(self.conf, timeline_id, self.tenant_id).context("failed to load local metadata")?;
// finally we make newly downloaded timeline visible to repository // finally we make newly downloaded timeline visible to repository
entry.insert(LayeredTimelineEntry::Unloaded { id: timeline_id, metadata, }) entry.insert(LayeredTimelineEntry::Unloaded { id: timeline_id, metadata, init_logical_size: None })
}, },
}; };
Ok(()) Ok(())
@@ -506,6 +528,7 @@ enum LayeredTimelineEntry {
Unloaded { Unloaded {
id: ZTimelineId, id: ZTimelineId,
metadata: TimelineMetadata, metadata: TimelineMetadata,
init_logical_size: Option<usize>,
}, },
} }
@@ -673,13 +696,18 @@ impl LayeredRepository {
timelineid: ZTimelineId, timelineid: ZTimelineId,
timelines: &mut HashMap<ZTimelineId, LayeredTimelineEntry>, timelines: &mut HashMap<ZTimelineId, LayeredTimelineEntry>,
) -> anyhow::Result<Option<Arc<LayeredTimeline>>> { ) -> anyhow::Result<Option<Arc<LayeredTimeline>>> {
let logical_size: Option<usize>;
match timelines.get(&timelineid) { match timelines.get(&timelineid) {
Some(entry) => match entry { Some(entry) => match entry {
LayeredTimelineEntry::Loaded(local_timeline) => { LayeredTimelineEntry::Loaded(local_timeline) => {
debug!("timeline {} found loaded into memory", &timelineid); debug!("timeline {} found loaded into memory", &timelineid);
return Ok(Some(Arc::clone(local_timeline))); return Ok(Some(Arc::clone(local_timeline)));
} }
LayeredTimelineEntry::Unloaded { .. } => {} LayeredTimelineEntry::Unloaded {
init_logical_size, ..
} => {
logical_size = *init_logical_size;
}
}, },
None => { None => {
debug!("timeline {} not found", &timelineid); debug!("timeline {} not found", &timelineid);
@@ -690,7 +718,7 @@ impl LayeredRepository {
"timeline {} found on a local disk, but not loaded into the memory, loading", "timeline {} found on a local disk, but not loaded into the memory, loading",
&timelineid &timelineid
); );
let timeline = self.load_local_timeline(timelineid, timelines)?; let timeline = self.load_local_timeline(timelineid, timelines, logical_size)?;
let was_loaded = timelines.insert( let was_loaded = timelines.insert(
timelineid, timelineid,
LayeredTimelineEntry::Loaded(Arc::clone(&timeline)), LayeredTimelineEntry::Loaded(Arc::clone(&timeline)),
@@ -707,6 +735,7 @@ impl LayeredRepository {
&self, &self,
timeline_id: ZTimelineId, timeline_id: ZTimelineId,
timelines: &mut HashMap<ZTimelineId, LayeredTimelineEntry>, timelines: &mut HashMap<ZTimelineId, LayeredTimelineEntry>,
init_logical_size: Option<usize>,
) -> anyhow::Result<Arc<LayeredTimeline>> { ) -> anyhow::Result<Arc<LayeredTimeline>> {
let metadata = load_metadata(self.conf, timeline_id, self.tenant_id) let metadata = load_metadata(self.conf, timeline_id, self.tenant_id)
.context("failed to load metadata")?; .context("failed to load metadata")?;
@@ -733,6 +762,7 @@ impl LayeredRepository {
self.tenant_id, self.tenant_id,
Arc::clone(&self.walredo_mgr), Arc::clone(&self.walredo_mgr),
self.upload_layers, self.upload_layers,
init_logical_size,
); );
timeline timeline
.load_layer_map(disk_consistent_lsn) .load_layer_map(disk_consistent_lsn)
@@ -1099,6 +1129,10 @@ pub struct LayeredTimeline {
// It can be unified with latest_gc_cutoff_lsn under some "first_valid_lsn", // It can be unified with latest_gc_cutoff_lsn under some "first_valid_lsn",
// though lets keep them both for better error visibility. // though lets keep them both for better error visibility.
initdb_lsn: Lsn, initdb_lsn: Lsn,
// Initial logical size of timeline (if known).
// Logical size can be copied from ancestor timeline when new branch is create at last LSN
pub init_logical_size: Option<usize>,
} }
/// ///
@@ -1299,6 +1333,7 @@ impl LayeredTimeline {
tenant_id: ZTenantId, tenant_id: ZTenantId,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>, walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
upload_layers: bool, upload_layers: bool,
init_logical_size: Option<usize>,
) -> LayeredTimeline { ) -> LayeredTimeline {
let reconstruct_time_histo = RECONSTRUCT_TIME let reconstruct_time_histo = RECONSTRUCT_TIME
.get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()]) .get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()])
@@ -1377,6 +1412,7 @@ impl LayeredTimeline {
latest_gc_cutoff_lsn: RwLock::new(metadata.latest_gc_cutoff_lsn()), latest_gc_cutoff_lsn: RwLock::new(metadata.latest_gc_cutoff_lsn()),
initdb_lsn: metadata.initdb_lsn(), initdb_lsn: metadata.initdb_lsn(),
init_logical_size,
} }
} }

View File

@@ -76,6 +76,12 @@ impl<R: Repository> DatadirTimeline<R> {
Ok(()) Ok(())
} }
/// Set timeline logical size.
pub fn set_logical_size(&self, size: usize) {
self.current_logical_size
.store(size as isize, Ordering::SeqCst);
}
/// Start ingesting a WAL record, or other atomic modification of /// Start ingesting a WAL record, or other atomic modification of
/// the timeline. /// the timeline.
/// ///

View File

@@ -494,12 +494,16 @@ fn load_local_timeline(
format!("Inmem timeline {timeline_id} not found in tenant's repository") format!("Inmem timeline {timeline_id} not found in tenant's repository")
})?; })?;
let repartition_distance = repo.get_checkpoint_distance() / 10; let repartition_distance = repo.get_checkpoint_distance() / 10;
let init_logical_size = inmem_timeline.init_logical_size;
let page_tline = Arc::new(DatadirTimelineImpl::new( let page_tline = Arc::new(DatadirTimelineImpl::new(
inmem_timeline, inmem_timeline,
repartition_distance, repartition_distance,
)); ));
page_tline.init_logical_size()?; if let Some(logical_size) = init_logical_size {
page_tline.set_logical_size(logical_size);
} else {
page_tline.init_logical_size()?;
}
tenants_state::try_send_timeline_update(LocalTimelineUpdate::Attach { tenants_state::try_send_timeline_update(LocalTimelineUpdate::Attach {
id: ZTenantTimelineId::new(repo.tenant_id(), timeline_id), id: ZTenantTimelineId::new(repo.tenant_id(), timeline_id),
datadir: Arc::clone(&page_tline), datadir: Arc::clone(&page_tline),