mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-29 11:00:38 +00:00
Merge remote-tracking branch 'origin/main' into problame/configurable-one-runtime
This commit is contained in:
@@ -18,6 +18,7 @@ use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
|
||||
use pageserver::task_mgr::WALRECEIVER_RUNTIME;
|
||||
use pageserver::tenant::{secondary, TenantSharedResources};
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use tokio::signal::unix::SignalKind;
|
||||
use tokio::time::Instant;
|
||||
use tracing::*;
|
||||
|
||||
@@ -671,46 +672,37 @@ fn start_pageserver(
|
||||
let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard());
|
||||
|
||||
// All started up! Now just sit and wait for shutdown signal.
|
||||
{
|
||||
use signal_hook::consts::*;
|
||||
let signal_handler = BACKGROUND_RUNTIME.spawn_blocking(move || {
|
||||
let mut signals =
|
||||
signal_hook::iterator::Signals::new([SIGINT, SIGTERM, SIGQUIT]).unwrap();
|
||||
return signals
|
||||
.forever()
|
||||
.next()
|
||||
.expect("forever() never returns None unless explicitly closed");
|
||||
});
|
||||
let signal = BACKGROUND_RUNTIME
|
||||
// NB: in `NEON_PAGESERVER_USE_ONE_RUNTIME=current_thread`, this
|
||||
// is where the executor is actually driven. In multi-threaded runtime
|
||||
// modes, the executor threads are spawned internally, so, async execution
|
||||
// is driven even before we reach here.
|
||||
.block_on(signal_handler)
|
||||
.expect("join error");
|
||||
match signal {
|
||||
SIGQUIT => {
|
||||
info!("Got signal {signal}. Terminating in immediate shutdown mode",);
|
||||
std::process::exit(111);
|
||||
}
|
||||
SIGINT | SIGTERM => {
|
||||
info!("Got signal {signal}. Terminating gracefully in fast shutdown mode",);
|
||||
|
||||
// This cancels the `shutdown_pageserver` cancellation tree.
|
||||
// Right now that tree doesn't reach very far, and `task_mgr` is used instead.
|
||||
// The plan is to change that over time.
|
||||
shutdown_pageserver.take();
|
||||
let bg_remote_storage = remote_storage.clone();
|
||||
let bg_deletion_queue = deletion_queue.clone();
|
||||
BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(
|
||||
&tenant_manager,
|
||||
bg_remote_storage.map(|_| bg_deletion_queue),
|
||||
0,
|
||||
));
|
||||
unreachable!()
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
{
|
||||
BACKGROUND_RUNTIME.block_on(async move {
|
||||
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt()).unwrap();
|
||||
let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate()).unwrap();
|
||||
let mut sigquit = tokio::signal::unix::signal(SignalKind::quit()).unwrap();
|
||||
let signal = tokio::select! {
|
||||
_ = sigquit.recv() => {
|
||||
info!("Got signal SIGQUIT. Terminating in immediate shutdown mode",);
|
||||
std::process::exit(111);
|
||||
}
|
||||
_ = sigint.recv() => { "SIGINT" },
|
||||
_ = sigterm.recv() => { "SIGTERM" },
|
||||
};
|
||||
|
||||
info!("Got signal {signal}. Terminating gracefully in fast shutdown mode",);
|
||||
|
||||
// This cancels the `shutdown_pageserver` cancellation tree.
|
||||
// Right now that tree doesn't reach very far, and `task_mgr` is used instead.
|
||||
// The plan is to change that over time.
|
||||
shutdown_pageserver.take();
|
||||
let bg_remote_storage = remote_storage.clone();
|
||||
let bg_deletion_queue = deletion_queue.clone();
|
||||
pageserver::shutdown_pageserver(
|
||||
&tenant_manager,
|
||||
bg_remote_storage.map(|_| bg_deletion_queue),
|
||||
0,
|
||||
)
|
||||
.await;
|
||||
unreachable!()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ use pageserver_api::{
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use url::Url;
|
||||
use utils::{backoff, generation::Generation, id::NodeId};
|
||||
use utils::{backoff, failpoint_support, generation::Generation, id::NodeId};
|
||||
|
||||
use crate::{
|
||||
config::{NodeMetadata, PageServerConf},
|
||||
@@ -210,7 +210,10 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
|
||||
.collect(),
|
||||
};
|
||||
|
||||
fail::fail_point!("control-plane-client-validate");
|
||||
failpoint_support::sleep_millis_async!("control-plane-client-validate-sleep", &self.cancel);
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(RetryForeverError::ShuttingDown);
|
||||
}
|
||||
|
||||
let response: ValidateResponse = self.retry_http_forever(&re_attach_path, request).await?;
|
||||
|
||||
|
||||
@@ -1629,7 +1629,7 @@ components:
|
||||
type: integer
|
||||
format: int64
|
||||
minimum: 0
|
||||
description: The amount of disk space currently utilized by layer files.
|
||||
description: The amount of disk space currently used.
|
||||
free_space_bytes:
|
||||
type: integer
|
||||
format: int64
|
||||
|
||||
@@ -8,6 +8,7 @@ use anyhow::{bail, ensure, Context, Result};
|
||||
use bytes::Bytes;
|
||||
use camino::Utf8Path;
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::key::rel_block_to_key;
|
||||
use tokio::io::{AsyncRead, AsyncReadExt};
|
||||
use tokio_tar::Archive;
|
||||
use tracing::*;
|
||||
@@ -170,7 +171,10 @@ async fn import_rel(
|
||||
let r = reader.read_exact(&mut buf).await;
|
||||
match r {
|
||||
Ok(_) => {
|
||||
modification.put_rel_page_image(rel, blknum, Bytes::copy_from_slice(&buf))?;
|
||||
let key = rel_block_to_key(rel, blknum);
|
||||
if modification.tline.get_shard_identity().is_key_local(&key) {
|
||||
modification.put_rel_page_image(rel, blknum, Bytes::copy_from_slice(&buf))?;
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: UnexpectedEof is expected
|
||||
|
||||
@@ -593,14 +593,14 @@ impl RemoteTimelineClient {
|
||||
upload_queue: &mut UploadQueueInitialized,
|
||||
metadata: TimelineMetadata,
|
||||
) {
|
||||
let disk_consistent_lsn = upload_queue.latest_metadata.disk_consistent_lsn();
|
||||
|
||||
info!(
|
||||
"scheduling metadata upload with {} files ({} changed)",
|
||||
"scheduling metadata upload up to consistent LSN {disk_consistent_lsn} with {} files ({} changed)",
|
||||
upload_queue.latest_files.len(),
|
||||
upload_queue.latest_files_changes_since_metadata_upload_scheduled,
|
||||
);
|
||||
|
||||
let disk_consistent_lsn = upload_queue.latest_metadata.disk_consistent_lsn();
|
||||
|
||||
let index_part = IndexPart::new(
|
||||
upload_queue.latest_files.clone(),
|
||||
disk_consistent_lsn,
|
||||
|
||||
@@ -15,11 +15,23 @@ pub(crate) fn regenerate(tenants_path: &Path) -> anyhow::Result<PageserverUtiliz
|
||||
.map_err(std::io::Error::from)
|
||||
.context("statvfs tenants directory")?;
|
||||
|
||||
let blocksz = statvfs.block_size();
|
||||
// https://unix.stackexchange.com/a/703650
|
||||
let blocksz = if statvfs.fragment_size() > 0 {
|
||||
statvfs.fragment_size()
|
||||
} else {
|
||||
statvfs.block_size()
|
||||
};
|
||||
|
||||
#[cfg_attr(not(target_os = "macos"), allow(clippy::unnecessary_cast))]
|
||||
let free = statvfs.blocks_available() as u64 * blocksz;
|
||||
let used = crate::metrics::RESIDENT_PHYSICAL_SIZE_GLOBAL.get();
|
||||
|
||||
#[cfg_attr(not(target_os = "macos"), allow(clippy::unnecessary_cast))]
|
||||
let used = statvfs
|
||||
.blocks()
|
||||
// use blocks_free instead of available here to match df in case someone compares
|
||||
.saturating_sub(statvfs.blocks_free()) as u64
|
||||
* blocksz;
|
||||
|
||||
let captured_at = std::time::SystemTime::now();
|
||||
|
||||
let doc = PageserverUtilization {
|
||||
|
||||
Reference in New Issue
Block a user