mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 05:22:56 +00:00
Properly shutdown storage sync loop
This commit is contained in:
committed by
Kirill Bulatov
parent
ce8d6ae958
commit
23cf2fa984
@@ -195,9 +195,10 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
|
||||
}
|
||||
|
||||
let signals = signals::install_shutdown_handlers()?;
|
||||
let (async_shutdown_tx, async_shutdown_rx) = tokio::sync::watch::channel(());
|
||||
let mut threads = Vec::new();
|
||||
|
||||
let sync_startup = remote_storage::start_local_timeline_sync(conf)
|
||||
let sync_startup = remote_storage::start_local_timeline_sync(conf, async_shutdown_rx)
|
||||
.context("Failed to set up local files sync with external storage")?;
|
||||
|
||||
if let Some(handle) = sync_startup.sync_loop_handle {
|
||||
@@ -255,6 +256,7 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
|
||||
signal.name()
|
||||
);
|
||||
|
||||
async_shutdown_tx.send(())?;
|
||||
postgres_backend::set_pgbackend_shutdown_requested();
|
||||
tenant_mgr::shutdown_all_tenants()?;
|
||||
endpoint::shutdown();
|
||||
|
||||
@@ -45,14 +45,16 @@ impl BranchInfo {
|
||||
repo: &Arc<dyn Repository>,
|
||||
include_non_incremental_logical_size: bool,
|
||||
) -> Result<Self> {
|
||||
let name = path
|
||||
.as_ref()
|
||||
.file_name()
|
||||
.unwrap()
|
||||
.to_str()
|
||||
.unwrap()
|
||||
.to_string();
|
||||
let timeline_id = std::fs::read_to_string(path)?.parse::<ZTimelineId>()?;
|
||||
let path = path.as_ref();
|
||||
let name = path.file_name().unwrap().to_string_lossy().to_string();
|
||||
let timeline_id = std::fs::read_to_string(path)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to read branch file contents at path '{}'",
|
||||
path.display()
|
||||
)
|
||||
})?
|
||||
.parse::<ZTimelineId>()?;
|
||||
|
||||
let timeline = match repo.get_timeline(timeline_id)? {
|
||||
RepositoryTimeline::Local(local_entry) => local_entry,
|
||||
|
||||
@@ -93,7 +93,7 @@ use std::{
|
||||
};
|
||||
|
||||
use anyhow::{bail, Context};
|
||||
use tokio::io;
|
||||
use tokio::{io, sync};
|
||||
use tracing::{error, info};
|
||||
use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
||||
|
||||
@@ -135,6 +135,7 @@ pub struct SyncStartupData {
|
||||
/// Along with that, scans tenant files local and remote (if the sync gets enabled) to check the initial timeline states.
|
||||
pub fn start_local_timeline_sync(
|
||||
config: &'static PageServerConf,
|
||||
shutdown_hook: sync::watch::Receiver<()>,
|
||||
) -> anyhow::Result<SyncStartupData> {
|
||||
let local_timeline_files = local_tenant_timeline_files(config)
|
||||
.context("Failed to collect local tenant timeline files")?;
|
||||
@@ -142,6 +143,7 @@ pub fn start_local_timeline_sync(
|
||||
match &config.remote_storage_config {
|
||||
Some(storage_config) => match &storage_config.storage {
|
||||
RemoteStorageKind::LocalFs(root) => storage_sync::spawn_storage_sync_thread(
|
||||
shutdown_hook,
|
||||
config,
|
||||
local_timeline_files,
|
||||
LocalFs::new(root.clone(), &config.workdir)?,
|
||||
@@ -149,6 +151,7 @@ pub fn start_local_timeline_sync(
|
||||
storage_config.max_sync_errors,
|
||||
),
|
||||
RemoteStorageKind::AwsS3(s3_config) => storage_sync::spawn_storage_sync_thread(
|
||||
shutdown_hook,
|
||||
config,
|
||||
local_timeline_files,
|
||||
S3::new(s3_config, &config.workdir)?,
|
||||
|
||||
@@ -86,10 +86,15 @@ use std::{
|
||||
use anyhow::{bail, Context};
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use lazy_static::lazy_static;
|
||||
use tokio::{fs, sync::RwLock};
|
||||
use tokio::{
|
||||
sync::mpsc::{self, UnboundedReceiver},
|
||||
time::Instant,
|
||||
fs,
|
||||
runtime::Runtime,
|
||||
sync::{
|
||||
mpsc::{self, UnboundedReceiver},
|
||||
watch::Receiver,
|
||||
RwLock,
|
||||
},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tracing::*;
|
||||
|
||||
@@ -346,6 +351,7 @@ pub(super) fn spawn_storage_sync_thread<
|
||||
P: std::fmt::Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
|
||||
>(
|
||||
shutdown_hook: Receiver<()>,
|
||||
conf: &'static PageServerConf,
|
||||
local_timeline_files: HashMap<TimelineSyncId, (TimelineMetadata, Vec<PathBuf>)>,
|
||||
storage: S,
|
||||
@@ -384,6 +390,7 @@ pub(super) fn spawn_storage_sync_thread<
|
||||
.spawn(move || {
|
||||
storage_sync_loop(
|
||||
runtime,
|
||||
shutdown_hook,
|
||||
conf,
|
||||
receiver,
|
||||
remote_index,
|
||||
@@ -399,11 +406,18 @@ pub(super) fn spawn_storage_sync_thread<
|
||||
})
|
||||
}
|
||||
|
||||
enum LoopStep {
|
||||
NewStates(HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncState>>),
|
||||
Shutdown,
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn storage_sync_loop<
|
||||
P: std::fmt::Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
|
||||
>(
|
||||
runtime: tokio::runtime::Runtime,
|
||||
runtime: Runtime,
|
||||
mut shutdown_hook: Receiver<()>,
|
||||
conf: &'static PageServerConf,
|
||||
mut receiver: UnboundedReceiver<SyncTask>,
|
||||
index: RemoteTimelineIndex,
|
||||
@@ -413,19 +427,28 @@ fn storage_sync_loop<
|
||||
) -> anyhow::Result<()> {
|
||||
let remote_assets = Arc::new((storage, RwLock::new(index)));
|
||||
while !crate::tenant_mgr::shutdown_requested() {
|
||||
let new_timeline_states = runtime.block_on(
|
||||
loop_step(
|
||||
conf,
|
||||
&mut receiver,
|
||||
Arc::clone(&remote_assets),
|
||||
max_concurrent_sync,
|
||||
max_sync_errors,
|
||||
)
|
||||
.instrument(debug_span!("storage_sync_loop_step")),
|
||||
);
|
||||
// Batch timeline download registration to ensure that the external registration code won't block any running tasks before.
|
||||
set_timeline_states(conf, new_timeline_states);
|
||||
debug!("Sync loop step completed");
|
||||
let loop_step = runtime.block_on(async {
|
||||
tokio::select! {
|
||||
new_timeline_states = loop_step(
|
||||
conf,
|
||||
&mut receiver,
|
||||
Arc::clone(&remote_assets),
|
||||
max_concurrent_sync,
|
||||
max_sync_errors,
|
||||
)
|
||||
.instrument(debug_span!("storage_sync_loop_step")) => LoopStep::NewStates(new_timeline_states),
|
||||
_ = shutdown_hook.changed() => LoopStep::Shutdown,
|
||||
}
|
||||
});
|
||||
|
||||
match loop_step {
|
||||
LoopStep::NewStates(new_timeline_states) => {
|
||||
// Batch timeline download registration to ensure that the external registration code won't block any running tasks before.
|
||||
set_timeline_states(conf, new_timeline_states);
|
||||
debug!("Sync loop step completed");
|
||||
}
|
||||
LoopStep::Shutdown => {}
|
||||
}
|
||||
}
|
||||
|
||||
debug!("Shutdown requested, stopping");
|
||||
@@ -539,7 +562,7 @@ async fn process_task<
|
||||
"Waiting {} seconds before starting the task",
|
||||
seconds_to_wait
|
||||
);
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs_f64(seconds_to_wait)).await;
|
||||
tokio::time::sleep(Duration::from_secs_f64(seconds_to_wait)).await;
|
||||
}
|
||||
|
||||
let sync_start = Instant::now();
|
||||
|
||||
@@ -202,7 +202,7 @@ async fn try_download_archive<
|
||||
archive_to_download.disk_consistent_lsn(),
|
||||
local_metadata.disk_consistent_lsn()
|
||||
),
|
||||
Err(e) => warn!("Failed to read local metadata file, assuing it's safe to override its with the download. Read: {:#}", e),
|
||||
Err(e) => warn!("Failed to read local metadata file, assuming it's safe to override its with the download. Read: {:#}", e),
|
||||
}
|
||||
compression::uncompress_file_stream_with_index(
|
||||
conf.timeline_path(&timeline_id, &tenant_id),
|
||||
|
||||
@@ -198,7 +198,7 @@ pub fn get_repository_for_tenant(tenantid: ZTenantId) -> Result<Arc<dyn Reposito
|
||||
|
||||
match &tenant.repo {
|
||||
Some(repo) => Ok(Arc::clone(repo)),
|
||||
None => anyhow::bail!("Repository for tenant {} is not yet valid", tenantid),
|
||||
None => bail!("Repository for tenant {} is not yet valid", tenantid),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user