Mirror of https://github.com/neondatabase/neon.git
Ensure timeline logical size is initialized once
Committed by Kirill Bulatov
parent 32be8739b9
commit 7779308985
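The change in a nutshell: the Timeline keeps its logical size in an AtomicI64, and this commit pairs it with an AtomicBool guard so that init_logical_size can return early once a size has been published, instead of repeating a potentially slow recomputation. A minimal, self-contained sketch of that pattern (the SizeTracker type and the compute closure are hypothetical, not the pageserver code itself):

use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};

// Hypothetical, stripped-down stand-in for the pageserver Timeline:
// just the two fields involved in the logical-size bookkeeping.
struct SizeTracker {
    current_logical_size: AtomicI64,
    logical_size_initialized: AtomicBool,
}

impl SizeTracker {
    fn new() -> Self {
        Self {
            current_logical_size: AtomicI64::new(0),
            logical_size_initialized: AtomicBool::new(false),
        }
    }

    // Mirrors the shape of `init_logical_size`: skip the expensive
    // recomputation once a size has already been published.
    fn init_logical_size(&self, compute: impl FnOnce() -> u64) {
        if self.logical_size_initialized.load(Ordering::Acquire) {
            return;
        }
        let size = compute(); // potentially slow full recomputation
        self.set_current_logical_size(size);
    }

    // Mirrors `set_current_logical_size`: storing a value also marks
    // the size as initialized.
    fn set_current_logical_size(&self, new_size: u64) {
        self.current_logical_size
            .store(new_size as i64, Ordering::SeqCst);
        self.logical_size_initialized.store(true, Ordering::SeqCst);
    }
}

fn main() {
    let tracker = SizeTracker::new();
    tracker.init_logical_size(|| 42);
    // A second call returns early instead of recomputing.
    tracker.init_logical_size(|| unreachable!("already initialized"));
    assert_eq!(tracker.current_logical_size.load(Ordering::SeqCst), 42);
}

Note that the flag does not make initialization mutually exclusive; two callers racing past the load could both compute. It only prevents repeat work after a value has been stored.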
@@ -205,7 +205,7 @@ impl Repository {

         crate::tenant_mgr::try_send_timeline_update(LocalTimelineUpdate::Attach {
             id: ZTenantTimelineId::new(self.tenant_id(), timeline_id),
-            datadir: Arc::clone(&timeline),
+            timeline: Arc::clone(&timeline),
         });

         Ok(timeline)
@@ -572,8 +572,7 @@ impl Repository {
             }
         };
         debug!(
-            "timeline {} found on a local disk, but not loaded into the memory, loading",
-            &timeline_id
+            "timeline {timeline_id} found on a local disk, but not loaded into the memory, loading"
         );
         let timeline = self.load_local_timeline(timeline_id, timelines)?;
         let was_loaded = timelines.insert(
@@ -585,10 +584,6 @@ impl Repository {
                 || matches!(was_loaded, Some(LayeredTimelineEntry::Unloaded { .. })),
             "assertion failure, inserted wrong timeline in an incorrect state"
         );
-        crate::tenant_mgr::try_send_timeline_update(LocalTimelineUpdate::Attach {
-            id: ZTenantTimelineId::new(self.tenant_id(), timeline_id),
-            datadir: Arc::clone(&timeline),
-        });
         Ok(Some(timeline))
     }

@@ -627,7 +622,14 @@ impl Repository {
             .load_layer_map(disk_consistent_lsn)
             .context("failed to load layermap")?;

-        Ok(Arc::new(timeline))
+        let timeline = Arc::new(timeline);
+
+        crate::tenant_mgr::try_send_timeline_update(LocalTimelineUpdate::Attach {
+            id: ZTenantTimelineId::new(self.tenant_id(), timeline_id),
+            timeline: Arc::clone(&timeline),
+        });
+
+        Ok(timeline)
     }

     pub fn new(
@@ -412,6 +412,11 @@ pub struct Timeline {
     /// and `set_current_logical_size` functions to modify this, they will
     /// also keep the prometheus metric in sync.
     current_logical_size: AtomicI64,
+    // TODO we don't have a good API to ensure on a compilation level
+    // that the timeline passes all initialization.
+    // Hence we ensure that we init at least once for every timeline
+    // and keep this flag to avoid potentially long recomputes.
+    logical_size_initialized: AtomicBool,

     /// Information about the last processed message by the WAL receiver,
     /// or None if WAL receiver has not received anything for this timeline
@@ -731,6 +736,7 @@ impl Timeline {
             initdb_lsn: metadata.initdb_lsn(),

             current_logical_size: AtomicI64::new(0),
+            logical_size_initialized: AtomicBool::new(false),
             partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
             repartition_threshold: 0,

@@ -835,6 +841,10 @@ impl Timeline {
     ///
     /// This can be a slow operation.
     pub fn init_logical_size(&self) -> Result<()> {
+        if self.logical_size_initialized.load(AtomicOrdering::Acquire) {
+            return Ok(());
+        }
+
         // Try a fast-path first:
         // Copy logical size from ancestor timeline if there has been no changes on this
         // branch, and no changes on the ancestor branch since the branch point.
@@ -907,6 +917,8 @@ impl Timeline {
     fn set_current_logical_size(&self, new_size: u64) {
         self.current_logical_size
             .store(new_size as i64, AtomicOrdering::SeqCst);
+        self.logical_size_initialized
+            .store(true, AtomicOrdering::SeqCst);

         // Also set the value in the prometheus gauge. Same race condition
         // here as in `update_current_logical_size`.
@@ -172,15 +172,15 @@ pub enum LocalTimelineUpdate {
     },
     Attach {
         id: ZTenantTimelineId,
-        datadir: Arc<Timeline>,
+        timeline: Arc<Timeline>,
     },
 }

 impl std::fmt::Debug for LocalTimelineUpdate {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match self {
-            Self::Detach { id, .. } => f.debug_tuple("Remove").field(id).finish(),
-            Self::Attach { id, .. } => f.debug_tuple("Add").field(id).finish(),
+            Self::Detach { id, .. } => f.debug_tuple("Detach").field(id).finish(),
+            Self::Attach { id, .. } => f.debug_tuple("Attach").field(id).finish(),
         }
     }
 }
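Relatedly, the Debug labels are renamed because the string passed to debug_tuple is exactly what shows up in log output. A small sketch with a hypothetical enum mirroring the shape of LocalTimelineUpdate:

use std::fmt;

// Hypothetical minimal enum mirroring the shape of LocalTimelineUpdate.
enum Update {
    Detach { id: u32 },
    Attach { id: u32 },
}

impl fmt::Debug for Update {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Detach { id } => f.debug_tuple("Detach").field(id).finish(),
            Self::Attach { id } => f.debug_tuple("Attach").field(id).finish(),
        }
    }
}

fn main() {
    // Prints `Attach(7)`; with the old labels it would have printed `Add(7)`,
    // which does not match any variant name.
    println!("{:?}", Update::Attach { id: 7 });
}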
@@ -376,7 +376,10 @@ pub fn get_local_timeline_with_load(
 ) -> anyhow::Result<Arc<Timeline>> {
     let repository = get_repository_for_tenant(tenant_id)?;
     match repository.get_timeline(timeline_id) {
-        Some(RepositoryTimeline::Loaded(loaded_timeline)) => Ok(loaded_timeline),
+        Some(RepositoryTimeline::Loaded(loaded_timeline)) => {
+            loaded_timeline.init_logical_size()?;
+            Ok(loaded_timeline)
+        }
         _ => load_local_timeline(&repository, timeline_id)
             .with_context(|| format!("Failed to load local timeline for tenant {tenant_id}")),
     }
@@ -435,13 +438,17 @@ pub fn detach_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> any

     // wait for wal receivers to stop without holding the lock, because walreceiver
     // will attempt to change tenant state which is protected by the same global tenants lock.
     // TODO do we need a timeout here? how to handle it?
     // recv_timeout is broken: https://github.com/rust-lang/rust/issues/94518#issuecomment-1057440631
     // need to use crossbeam-channel
     for (timeline_id, join_handle) in walreceiver_join_handles {
         info!("waiting for wal receiver to shutdown timeline_id {timeline_id}");
         join_handle.recv().context("failed to join walreceiver")?;
         info!("wal receiver shutdown confirmed timeline_id {timeline_id}");
     }

     tenants_state::write_tenants().remove(&tenant_id);

     // If removal fails there will be no way to successfully retry detach,
     // because tenant no longer exists in in memory map. And it needs to be removed from it
     // before we remove files because it contains references to repository
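The comments above point at the std recv_timeout issue and simply block on recv(). A minimal sketch of that shutdown-confirmation handshake with std::sync::mpsc (hypothetical names, not the walreceiver code):

use std::sync::mpsc;
use std::thread;

fn main() {
    // Hypothetical stand-in for one wal receiver task: it sends on the
    // channel as its last action to confirm it has shut down.
    let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>();

    let worker = thread::spawn(move || {
        // ... tear down the task's state here ...
        shutdown_tx.send(()).expect("main thread dropped the receiver");
    });

    // Like `join_handle.recv()` in `detach_tenant`: block until the task
    // confirms shutdown, without holding the global tenants lock.
    shutdown_rx.recv().expect("failed to join walreceiver");
    worker.join().expect("worker panicked");
}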
@@ -561,12 +568,15 @@ fn attach_downloaded_tenant(
     repo: &Repository,
     downloaded_timelines: HashSet<ZTimelineId>,
 ) -> anyhow::Result<()> {
-    for timeline_id in downloaded_timelines {
-        // first, register timeline metadata
+    // first, register timeline metadata to ensure ancestors will be found later during layer load
+    for &timeline_id in &downloaded_timelines {
         repo.attach_timeline(timeline_id).with_context(|| {
             format!("Failed to load timeline {timeline_id} into in-memory repository")
         })?;
-        // and then load its layers in memory
+    }
+
+    // and then load its layers in memory
+    for timeline_id in downloaded_timelines {
         let _ = load_local_timeline(repo, timeline_id).with_context(|| {
             format!(
                 "Failed to register add local timeline for tenant {}",
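The split into two loops follows from the comment about ancestors: every timeline's metadata must be registered before any timeline loads its layers, because a child can be iterated before its ancestor. A toy sketch of the failure mode a single pass would hit (the Repo type here is hypothetical, not the repository API):

use std::collections::HashMap;

// Hypothetical toy model of the two-pass attach; not the repository API.
#[derive(Default)]
struct Repo {
    // timeline id -> optional ancestor timeline id
    registered: HashMap<u32, Option<u32>>,
}

impl Repo {
    fn register(&mut self, id: u32, ancestor: Option<u32>) {
        self.registered.insert(id, ancestor);
    }

    fn load(&self, id: u32) -> Result<(), String> {
        match self.registered.get(&id) {
            Some(Some(ancestor)) if !self.registered.contains_key(ancestor) => {
                Err(format!("ancestor {ancestor} of timeline {id} is not registered"))
            }
            Some(_) => Ok(()),
            None => Err(format!("timeline {id} is not registered")),
        }
    }
}

fn main() {
    // Timeline 2 branches off timeline 1, but is iterated first: a single
    // register-then-load pass would fail on it, two passes do not.
    let downloaded = [(2u32, Some(1u32)), (1, None)];

    let mut repo = Repo::default();
    for &(id, ancestor) in &downloaded {
        repo.register(id, ancestor); // first pass: metadata only
    }
    for &(id, _) in &downloaded {
        repo.load(id).expect("ancestor already registered"); // second pass
    }
}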
@@ -269,7 +269,7 @@ async fn wal_receiver_main_thread_loop_step<'a>(
             }
         }
         // Timeline got attached, retrieve all necessary information to start its broker loop and maintain this loop endlessly.
-        LocalTimelineUpdate::Attach { id, datadir } => {
+        LocalTimelineUpdate::Attach { id, timeline } => {
            let timeline_connection_managers = local_timeline_wal_receivers
                .entry(id.tenant_id)
                .or_default();
@@ -305,7 +305,7 @@ async fn wal_receiver_main_thread_loop_step<'a>(
                 id,
                 broker_prefix.to_owned(),
                 etcd_client.clone(),
-                datadir,
+                timeline,
                 wal_connect_timeout,
                 lagging_wal_timeout,
                 max_lsn_wal_lag,