mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 22:12:56 +00:00
Move walreceiver timeline registration into layered_repository
This commit is contained in:
committed by
Kirill Bulatov
parent
631cbf5b1b
commit
32be8739b9
@@ -13,6 +13,7 @@
|
||||
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use tracing::*;
|
||||
use utils::zid::ZTenantTimelineId;
|
||||
|
||||
use std::cmp::min;
|
||||
use std::collections::hash_map::Entry;
|
||||
@@ -32,6 +33,7 @@ use crate::storage_sync::index::RemoteIndex;
|
||||
use crate::tenant_config::{TenantConf, TenantConfOpt};
|
||||
|
||||
use crate::repository::{GcResult, RepositoryTimeline};
|
||||
use crate::tenant_mgr::LocalTimelineUpdate;
|
||||
use crate::thread_mgr;
|
||||
use crate::walredo::WalRedoManager;
|
||||
use crate::CheckpointConfig;
|
||||
@@ -125,8 +127,11 @@ impl Repository {
|
||||
/// Get Timeline handle for given zenith timeline ID.
|
||||
/// This function is idempotent. It doesn't change internal state in any way.
|
||||
pub fn get_timeline(&self, timelineid: ZTimelineId) -> Option<RepositoryTimeline<Timeline>> {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
self.get_timeline_internal(timelineid, &timelines)
|
||||
self.timelines
|
||||
.lock()
|
||||
.unwrap()
|
||||
.get(&timelineid)
|
||||
.cloned()
|
||||
.map(RepositoryTimeline::from)
|
||||
}
|
||||
|
||||
@@ -198,6 +203,11 @@ impl Repository {
|
||||
let timeline = Arc::new(timeline);
|
||||
vacant_timeline_entry.insert(LayeredTimelineEntry::Loaded(Arc::clone(&timeline)));
|
||||
|
||||
crate::tenant_mgr::try_send_timeline_update(LocalTimelineUpdate::Attach {
|
||||
id: ZTenantTimelineId::new(self.tenant_id(), timeline_id),
|
||||
datadir: Arc::clone(&timeline),
|
||||
});
|
||||
|
||||
Ok(timeline)
|
||||
}
|
||||
|
||||
@@ -540,45 +550,34 @@ impl Repository {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Implementation of the public `get_timeline` function.
|
||||
// Differences from the public:
|
||||
// * interface in that the caller must already hold the mutex on the 'timelines' hashmap.
|
||||
fn get_timeline_internal(
|
||||
&self,
|
||||
timelineid: ZTimelineId,
|
||||
timelines: &HashMap<ZTimelineId, LayeredTimelineEntry>,
|
||||
) -> Option<LayeredTimelineEntry> {
|
||||
timelines.get(&timelineid).cloned()
|
||||
}
|
||||
|
||||
// Implementation of the public `get_timeline_load` function.
|
||||
// Differences from the public:
|
||||
// * interface in that the caller must already hold the mutex on the 'timelines' hashmap.
|
||||
fn get_timeline_load_internal(
|
||||
&self,
|
||||
timelineid: ZTimelineId,
|
||||
timeline_id: ZTimelineId,
|
||||
timelines: &mut HashMap<ZTimelineId, LayeredTimelineEntry>,
|
||||
) -> anyhow::Result<Option<Arc<Timeline>>> {
|
||||
match timelines.get(&timelineid) {
|
||||
match timelines.get(&timeline_id) {
|
||||
Some(entry) => match entry {
|
||||
LayeredTimelineEntry::Loaded(local_timeline) => {
|
||||
debug!("timeline {} found loaded into memory", &timelineid);
|
||||
debug!("timeline {timeline_id} found loaded into memory");
|
||||
return Ok(Some(Arc::clone(local_timeline)));
|
||||
}
|
||||
LayeredTimelineEntry::Unloaded { .. } => {}
|
||||
},
|
||||
None => {
|
||||
debug!("timeline {} not found", &timelineid);
|
||||
debug!("timeline {timeline_id} not found");
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
debug!(
|
||||
"timeline {} found on a local disk, but not loaded into the memory, loading",
|
||||
&timelineid
|
||||
&timeline_id
|
||||
);
|
||||
let timeline = self.load_local_timeline(timelineid, timelines)?;
|
||||
let timeline = self.load_local_timeline(timeline_id, timelines)?;
|
||||
let was_loaded = timelines.insert(
|
||||
timelineid,
|
||||
timeline_id,
|
||||
LayeredTimelineEntry::Loaded(Arc::clone(&timeline)),
|
||||
);
|
||||
ensure!(
|
||||
@@ -586,6 +585,10 @@ impl Repository {
|
||||
|| matches!(was_loaded, Some(LayeredTimelineEntry::Unloaded { .. })),
|
||||
"assertion failure, inserted wrong timeline in an incorrect state"
|
||||
);
|
||||
crate::tenant_mgr::try_send_timeline_update(LocalTimelineUpdate::Attach {
|
||||
id: ZTenantTimelineId::new(self.tenant_id(), timeline_id),
|
||||
datadir: Arc::clone(&timeline),
|
||||
});
|
||||
Ok(Some(timeline))
|
||||
}
|
||||
|
||||
|
||||
@@ -21,6 +21,7 @@ use tokio::sync::mpsc;
|
||||
use tracing::*;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
pub use tenants_state::try_send_timeline_update;
|
||||
use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
|
||||
|
||||
mod tenants_state {
|
||||
@@ -68,7 +69,7 @@ mod tenants_state {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(super) fn try_send_timeline_update(update: LocalTimelineUpdate) {
|
||||
pub fn try_send_timeline_update(update: LocalTimelineUpdate) {
|
||||
match TIMELINE_UPDATE_SENDER
|
||||
.read()
|
||||
.expect("Failed to read() timeline_update_sender lock, it got poisoned")
|
||||
@@ -466,12 +467,6 @@ fn load_local_timeline(
|
||||
format!("Inmem timeline {timeline_id} not found in tenant's repository")
|
||||
})?;
|
||||
inmem_timeline.init_logical_size()?;
|
||||
|
||||
tenants_state::try_send_timeline_update(LocalTimelineUpdate::Attach {
|
||||
id: ZTenantTimelineId::new(repo.tenant_id(), timeline_id),
|
||||
datadir: Arc::clone(&inmem_timeline),
|
||||
});
|
||||
|
||||
Ok(inmem_timeline)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user