mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 02:12:56 +00:00
Use single map to manage timeline data
This commit is contained in:
committed by
Kirill Bulatov
parent
5522fbab25
commit
631cbf5b1b
@@ -4,6 +4,7 @@
|
||||
use crate::config::PageServerConf;
|
||||
use crate::http::models::TenantInfo;
|
||||
use crate::layered_repository::{load_metadata, Repository, Timeline};
|
||||
use crate::repository::RepositoryTimeline;
|
||||
use crate::storage_sync::index::{RemoteIndex, RemoteTimelineIndex};
|
||||
use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData};
|
||||
use crate::tenant_config::TenantConfOpt;
|
||||
@@ -94,12 +95,6 @@ struct Tenant {
|
||||
state: TenantState,
|
||||
/// Contains in-memory state, including the timelines that might not yet be flushed to disk or loaded from disk.
|
||||
repo: Arc<Repository>,
|
||||
/// Timelines, located locally in the pageserver's datadir.
|
||||
/// Timelines can be removed entirely by the `detach` operation only.
|
||||
///
|
||||
/// Local timelines have more metadata that's loaded into memory,
|
||||
/// that is located in the `repo.timelines` field, [`crate::layered_repository::LayeredTimelineEntry`].
|
||||
local_timelines: HashMap<ZTimelineId, Arc<Timeline>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
|
||||
@@ -288,7 +283,6 @@ pub fn create_tenant_repository(
|
||||
v.insert(Tenant {
|
||||
state: TenantState::Idle,
|
||||
repo,
|
||||
local_timelines: HashMap::new(),
|
||||
});
|
||||
Ok(Some(tenant_id))
|
||||
}
|
||||
@@ -379,20 +373,11 @@ pub fn get_local_timeline_with_load(
|
||||
tenant_id: ZTenantId,
|
||||
timeline_id: ZTimelineId,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let mut m = tenants_state::write_tenants();
|
||||
let tenant = m
|
||||
.get_mut(&tenant_id)
|
||||
.with_context(|| format!("Tenant {tenant_id} not found"))?;
|
||||
|
||||
if let Some(page_tline) = tenant.local_timelines.get(&timeline_id) {
|
||||
Ok(Arc::clone(page_tline))
|
||||
} else {
|
||||
let page_tline = load_local_timeline(&tenant.repo, timeline_id)
|
||||
.with_context(|| format!("Failed to load local timeline for tenant {tenant_id}"))?;
|
||||
tenant
|
||||
.local_timelines
|
||||
.insert(timeline_id, Arc::clone(&page_tline));
|
||||
Ok(page_tline)
|
||||
let repository = get_repository_for_tenant(tenant_id)?;
|
||||
match repository.get_timeline(timeline_id) {
|
||||
Some(RepositoryTimeline::Loaded(loaded_timeline)) => Ok(loaded_timeline),
|
||||
_ => load_local_timeline(&repository, timeline_id)
|
||||
.with_context(|| format!("Failed to load local timeline for tenant {tenant_id}")),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -419,10 +404,7 @@ pub fn delete_timeline(tenant_id: ZTenantId, timeline_id: ZTimelineId) -> anyhow
|
||||
thread_mgr::shutdown_threads(None, None, Some(timeline_id));
|
||||
debug!("thread shutdown completed");
|
||||
match tenants_state::write_tenants().get_mut(&tenant_id) {
|
||||
Some(tenant) => {
|
||||
tenant.repo.delete_timeline(timeline_id)?;
|
||||
tenant.local_timelines.remove(&timeline_id);
|
||||
}
|
||||
Some(tenant) => tenant.repo.delete_timeline(timeline_id)?,
|
||||
None => anyhow::bail!("Tenant {tenant_id} not found in local tenant state"),
|
||||
}
|
||||
|
||||
@@ -434,37 +416,31 @@ pub fn detach_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> any
|
||||
// shutdown the tenant and timeline threads (gc, compaction, page service threads)
|
||||
thread_mgr::shutdown_threads(None, Some(tenant_id), None);
|
||||
|
||||
// FIXME should we protect somehow from starting new threads/walreceivers when tenant is in stopping state?
|
||||
// send stop signal to wal receiver and collect join handles while holding the lock
|
||||
let walreceiver_join_handles = {
|
||||
let tenants = tenants_state::write_tenants();
|
||||
let tenant = tenants.get(&tenant_id).context("tenant not found")?;
|
||||
let mut walreceiver_join_handles = Vec::with_capacity(tenant.local_timelines.len());
|
||||
for timeline_id in tenant.local_timelines.keys() {
|
||||
let mut walreceiver_join_handles = Vec::new();
|
||||
let removed_tenant = {
|
||||
let mut tenants_accessor = tenants_state::write_tenants();
|
||||
tenants_accessor.remove(&tenant_id)
|
||||
};
|
||||
if let Some(tenant) = removed_tenant {
|
||||
for (timeline_id, _) in tenant.repo.list_timelines() {
|
||||
let (sender, receiver) = std::sync::mpsc::channel::<()>();
|
||||
tenants_state::try_send_timeline_update(LocalTimelineUpdate::Detach {
|
||||
id: ZTenantTimelineId::new(tenant_id, *timeline_id),
|
||||
id: ZTenantTimelineId::new(tenant_id, timeline_id),
|
||||
join_confirmation_sender: sender,
|
||||
});
|
||||
walreceiver_join_handles.push((*timeline_id, receiver));
|
||||
walreceiver_join_handles.push((timeline_id, receiver));
|
||||
}
|
||||
// drop the tenants lock
|
||||
walreceiver_join_handles
|
||||
};
|
||||
}
|
||||
|
||||
// wait for wal receivers to stop without holding the lock, because walreceiver
|
||||
// will attempt to change tenant state which is protected by the same global tenants lock.
|
||||
// TODO do we need a timeout here? how to handle it?
|
||||
// recv_timeout is broken: https://github.com/rust-lang/rust/issues/94518#issuecomment-1057440631
|
||||
// need to use crossbeam-channel
|
||||
for (timeline_id, join_handle) in walreceiver_join_handles {
|
||||
info!("waiting for wal receiver to shutdown timeline_id {timeline_id}");
|
||||
join_handle.recv().context("failed to join walreceiver")?;
|
||||
info!("wal receiver shutdown confirmed timeline_id {timeline_id}");
|
||||
}
|
||||
|
||||
tenants_state::write_tenants().remove(&tenant_id);
|
||||
|
||||
// If removal fails there will be no way to successfully retry detach,
|
||||
// because the tenant no longer exists in the in-memory map. And it needs to be removed from it
|
||||
// before we remove files because it contains references to repository
|
||||
@@ -590,34 +566,18 @@ fn attach_downloaded_tenant(
|
||||
repo: &Repository,
|
||||
downloaded_timelines: HashSet<ZTimelineId>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut registration_queue = Vec::with_capacity(downloaded_timelines.len());
|
||||
|
||||
// first need to register the in-mem representations, to avoid missing ancestors during the local disk data registration
|
||||
for timeline_id in downloaded_timelines {
|
||||
// first, register timeline metadata
|
||||
repo.attach_timeline(timeline_id).with_context(|| {
|
||||
format!("Failed to load timeline {timeline_id} into in-memory repository")
|
||||
})?;
|
||||
registration_queue.push(timeline_id);
|
||||
}
|
||||
|
||||
for timeline_id in registration_queue {
|
||||
let tenant_id = repo.tenant_id();
|
||||
match tenants_state::write_tenants().get_mut(&tenant_id) {
|
||||
Some(tenant) => match tenant.local_timelines.entry(timeline_id) {
|
||||
Entry::Occupied(_) => {
|
||||
anyhow::bail!("Local timeline {timeline_id} already registered")
|
||||
}
|
||||
Entry::Vacant(v) => {
|
||||
v.insert(load_local_timeline(repo, timeline_id).with_context(|| {
|
||||
format!("Failed to register add local timeline for tenant {tenant_id}")
|
||||
})?);
|
||||
}
|
||||
},
|
||||
None => anyhow::bail!(
|
||||
"Tenant {} not found in local tenant state",
|
||||
repo.tenant_id()
|
||||
),
|
||||
}
|
||||
// and then load its layers in memory
|
||||
let _ = load_local_timeline(repo, timeline_id).with_context(|| {
|
||||
format!(
|
||||
"Failed to register add local timeline for tenant {}",
|
||||
repo.tenant_id(),
|
||||
)
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -647,7 +607,6 @@ fn load_local_repo(
|
||||
Tenant {
|
||||
state: TenantState::Idle,
|
||||
repo,
|
||||
local_timelines: HashMap::new(),
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
Reference in New Issue
Block a user