mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 05:50:38 +00:00
Merge branch 'main' into releases/2023-11-17
This commit is contained in:
@@ -479,13 +479,6 @@ fn cli() -> clap::Command {
|
||||
)
|
||||
.value_name("FILECACHE_CONNSTR"),
|
||||
)
|
||||
.arg(
|
||||
// DEPRECATED, NO LONGER DOES ANYTHING.
|
||||
// See https://github.com/neondatabase/cloud/issues/7516
|
||||
Arg::new("file-cache-on-disk")
|
||||
.long("file-cache-on-disk")
|
||||
.action(clap::ArgAction::SetTrue),
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -638,7 +638,7 @@ const STORAGE_IO_TIME_BUCKETS: &[f64] = &[
|
||||
///
|
||||
/// Operations:
|
||||
/// - open ([`std::fs::OpenOptions::open`])
|
||||
/// - close (dropping [`std::fs::File`])
|
||||
/// - close (dropping [`crate::virtual_file::VirtualFile`])
|
||||
/// - close-by-replace (close by replacement algorithm)
|
||||
/// - read (`read_at`)
|
||||
/// - write (`write_at`)
|
||||
|
||||
@@ -291,6 +291,16 @@ impl From<harness::TestRedoManager> for WalRedoManager {
|
||||
}
|
||||
|
||||
impl WalRedoManager {
|
||||
pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
|
||||
match self {
|
||||
Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout),
|
||||
#[cfg(test)]
|
||||
Self::Test(_) => {
|
||||
// Not applicable to test redo manager
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn request_redo(
|
||||
&self,
|
||||
key: crate::repository::Key,
|
||||
@@ -1649,22 +1659,16 @@ impl Tenant {
|
||||
/// This function is periodically called by compactor task.
|
||||
/// Also it can be explicitly requested per timeline through page server
|
||||
/// api's 'compact' command.
|
||||
pub async fn compaction_iteration(
|
||||
async fn compaction_iteration(
|
||||
&self,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
// Don't start doing work during shutdown
|
||||
if let TenantState::Stopping { .. } = self.current_state() {
|
||||
) -> anyhow::Result<(), timeline::CompactionError> {
|
||||
// Don't start doing work during shutdown, or when broken, we do not need those in the logs
|
||||
if !self.is_active() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// We should only be called once the tenant has activated.
|
||||
anyhow::ensure!(
|
||||
self.is_active(),
|
||||
"Cannot run compaction iteration on inactive tenant"
|
||||
);
|
||||
|
||||
{
|
||||
let conf = self.tenant_conf.read().unwrap();
|
||||
if !conf.location.may_delete_layers_hint() || !conf.location.may_upload_layers_hint() {
|
||||
|
||||
@@ -289,7 +289,9 @@ impl DeltaLayer {
|
||||
async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
|
||||
let path = self.path();
|
||||
|
||||
let loaded = DeltaLayerInner::load(&path, None, ctx).await?;
|
||||
let loaded = DeltaLayerInner::load(&path, None, ctx)
|
||||
.await
|
||||
.and_then(|res| res)?;
|
||||
|
||||
// not production code
|
||||
let actual_filename = path.file_name().unwrap().to_owned();
|
||||
@@ -610,18 +612,28 @@ impl Drop for DeltaLayerWriter {
|
||||
}
|
||||
|
||||
impl DeltaLayerInner {
|
||||
/// Returns nested result following Result<Result<_, OpErr>, Critical>:
|
||||
/// - inner has the success or transient failure
|
||||
/// - outer has the permanent failure
|
||||
pub(super) async fn load(
|
||||
path: &Utf8Path,
|
||||
summary: Option<Summary>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
let file = VirtualFile::open(path)
|
||||
.await
|
||||
.with_context(|| format!("Failed to open file '{path}'"))?;
|
||||
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
|
||||
let file = match VirtualFile::open(path).await {
|
||||
Ok(file) => file,
|
||||
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
|
||||
};
|
||||
let file = FileBlockReader::new(file);
|
||||
|
||||
let summary_blk = file.read_blk(0, ctx).await?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
|
||||
let summary_blk = match file.read_blk(0, ctx).await {
|
||||
Ok(blk) => blk,
|
||||
Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
|
||||
};
|
||||
|
||||
// TODO: this should be an assertion instead; see ImageLayerInner::load
|
||||
let actual_summary =
|
||||
Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
|
||||
|
||||
if let Some(mut expected_summary) = summary {
|
||||
// production code path
|
||||
@@ -636,11 +648,11 @@ impl DeltaLayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(DeltaLayerInner {
|
||||
Ok(Ok(DeltaLayerInner {
|
||||
file,
|
||||
index_start_blk: actual_summary.index_start_blk,
|
||||
index_root_blk: actual_summary.index_root_blk,
|
||||
})
|
||||
}))
|
||||
}
|
||||
|
||||
pub(super) async fn get_value_reconstruct_data(
|
||||
|
||||
@@ -249,7 +249,9 @@ impl ImageLayer {
|
||||
async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
|
||||
let path = self.path();
|
||||
|
||||
let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, ctx).await?;
|
||||
let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, ctx)
|
||||
.await
|
||||
.and_then(|res| res)?;
|
||||
|
||||
// not production code
|
||||
let actual_filename = path.file_name().unwrap().to_owned();
|
||||
@@ -295,18 +297,31 @@ impl ImageLayer {
|
||||
}
|
||||
|
||||
impl ImageLayerInner {
|
||||
/// Returns nested result following Result<Result<_, OpErr>, Critical>:
|
||||
/// - inner has the success or transient failure
|
||||
/// - outer has the permanent failure
|
||||
pub(super) async fn load(
|
||||
path: &Utf8Path,
|
||||
lsn: Lsn,
|
||||
summary: Option<Summary>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
let file = VirtualFile::open(path)
|
||||
.await
|
||||
.with_context(|| format!("Failed to open file '{}'", path))?;
|
||||
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
|
||||
let file = match VirtualFile::open(path).await {
|
||||
Ok(file) => file,
|
||||
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
|
||||
};
|
||||
let file = FileBlockReader::new(file);
|
||||
let summary_blk = file.read_blk(0, ctx).await?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
|
||||
let summary_blk = match file.read_blk(0, ctx).await {
|
||||
Ok(blk) => blk,
|
||||
Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
|
||||
};
|
||||
|
||||
// length is the only way how this could fail, so it's not actually likely at all unless
|
||||
// read_blk returns wrong sized block.
|
||||
//
|
||||
// TODO: confirm and make this into assertion
|
||||
let actual_summary =
|
||||
Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
|
||||
|
||||
if let Some(mut expected_summary) = summary {
|
||||
// production code path
|
||||
@@ -322,12 +337,12 @@ impl ImageLayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(ImageLayerInner {
|
||||
Ok(Ok(ImageLayerInner {
|
||||
index_start_blk: actual_summary.index_start_blk,
|
||||
index_root_blk: actual_summary.index_root_blk,
|
||||
lsn,
|
||||
file,
|
||||
})
|
||||
}))
|
||||
}
|
||||
|
||||
pub(super) async fn get_value_reconstruct_data(
|
||||
|
||||
@@ -868,6 +868,9 @@ impl LayerInner {
|
||||
}
|
||||
Ok((Err(e), _permit)) => {
|
||||
// FIXME: this should be with the spawned task and be cancellation sensitive
|
||||
//
|
||||
// while we should not need this, this backoff has turned out to be useful with
|
||||
// a bug of unexpectedly deleted remote layer file (#5787).
|
||||
let consecutive_failures =
|
||||
self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
|
||||
tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
|
||||
@@ -1196,7 +1199,7 @@ impl DownloadedLayer {
|
||||
));
|
||||
delta_layer::DeltaLayerInner::load(&owner.path, summary, ctx)
|
||||
.await
|
||||
.map(LayerKind::Delta)
|
||||
.map(|res| res.map(LayerKind::Delta))
|
||||
} else {
|
||||
let lsn = owner.desc.image_layer_lsn();
|
||||
let summary = Some(image_layer::Summary::expected(
|
||||
@@ -1207,23 +1210,32 @@ impl DownloadedLayer {
|
||||
));
|
||||
image_layer::ImageLayerInner::load(&owner.path, lsn, summary, ctx)
|
||||
.await
|
||||
.map(LayerKind::Image)
|
||||
}
|
||||
// this will be a permanent failure
|
||||
.context("load layer");
|
||||
.map(|res| res.map(LayerKind::Image))
|
||||
};
|
||||
|
||||
if let Err(e) = res.as_ref() {
|
||||
LAYER_IMPL_METRICS.inc_permanent_loading_failures();
|
||||
// TODO(#5815): we are not logging all errors, so temporarily log them here as well
|
||||
tracing::error!("layer loading failed permanently: {e:#}");
|
||||
match res {
|
||||
Ok(Ok(layer)) => Ok(Ok(layer)),
|
||||
Ok(Err(transient)) => Err(transient),
|
||||
Err(permanent) => {
|
||||
LAYER_IMPL_METRICS.inc_permanent_loading_failures();
|
||||
// TODO(#5815): we are not logging all errors, so temporarily log them **once**
|
||||
// here as well
|
||||
let permanent = permanent.context("load layer");
|
||||
tracing::error!("layer loading failed permanently: {permanent:#}");
|
||||
Ok(Err(permanent))
|
||||
}
|
||||
}
|
||||
res
|
||||
};
|
||||
self.kind.get_or_init(init).await.as_ref().map_err(|e| {
|
||||
// errors are not clonabled, cannot but stringify
|
||||
// test_broken_timeline matches this string
|
||||
anyhow::anyhow!("layer loading failed: {e:#}")
|
||||
})
|
||||
self.kind
|
||||
.get_or_try_init(init)
|
||||
// return transient errors using `?`
|
||||
.await?
|
||||
.as_ref()
|
||||
.map_err(|e| {
|
||||
// errors are not clonabled, cannot but stringify
|
||||
// test_broken_timeline matches this string
|
||||
anyhow::anyhow!("layer loading failed: {e:#}")
|
||||
})
|
||||
}
|
||||
|
||||
async fn get_value_reconstruct_data(
|
||||
|
||||
@@ -180,7 +180,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
// Run compaction
|
||||
if let Err(e) = tenant.compaction_iteration(&cancel, &ctx).await {
|
||||
let wait_duration = backoff::exponential_backoff_duration_seconds(
|
||||
error_run_count,
|
||||
error_run_count + 1,
|
||||
1.0,
|
||||
MAX_BACKOFF_SECS,
|
||||
);
|
||||
@@ -198,6 +198,10 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
|
||||
warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Compaction);
|
||||
|
||||
// Perhaps we did no work and the walredo process has been idle for some time:
|
||||
// give it a chance to shut down to avoid leaving walredo process running indefinitely.
|
||||
tenant.walredo_mgr.maybe_quiesce(period * 10);
|
||||
|
||||
// Sleep
|
||||
if tokio::time::timeout(sleep_duration, cancel.cancelled())
|
||||
.await
|
||||
@@ -261,7 +265,7 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
.await;
|
||||
if let Err(e) = res {
|
||||
let wait_duration = backoff::exponential_backoff_duration_seconds(
|
||||
error_run_count,
|
||||
error_run_count + 1,
|
||||
1.0,
|
||||
MAX_BACKOFF_SECS,
|
||||
);
|
||||
|
||||
@@ -91,6 +91,7 @@ struct ProcessOutput {
|
||||
pub struct PostgresRedoManager {
|
||||
tenant_id: TenantId,
|
||||
conf: &'static PageServerConf,
|
||||
last_redo_at: std::sync::Mutex<Option<Instant>>,
|
||||
redo_process: RwLock<Option<Arc<WalRedoProcess>>>,
|
||||
}
|
||||
|
||||
@@ -187,10 +188,26 @@ impl PostgresRedoManager {
|
||||
PostgresRedoManager {
|
||||
tenant_id,
|
||||
conf,
|
||||
last_redo_at: std::sync::Mutex::default(),
|
||||
redo_process: RwLock::new(None),
|
||||
}
|
||||
}
|
||||
|
||||
/// This type doesn't have its own background task to check for idleness: we
|
||||
/// rely on our owner calling this function periodically in its own housekeeping
|
||||
/// loops.
|
||||
pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
|
||||
if let Ok(g) = self.last_redo_at.try_lock() {
|
||||
if let Some(last_redo_at) = *g {
|
||||
if last_redo_at.elapsed() >= idle_timeout {
|
||||
drop(g);
|
||||
let mut guard = self.redo_process.write().unwrap();
|
||||
*guard = None;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Process one request for WAL redo using wal-redo postgres
|
||||
///
|
||||
@@ -205,6 +222,8 @@ impl PostgresRedoManager {
|
||||
wal_redo_timeout: Duration,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<Bytes> {
|
||||
*(self.last_redo_at.lock().unwrap()) = Some(Instant::now());
|
||||
|
||||
let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
|
||||
const MAX_RETRY_ATTEMPTS: u32 = 1;
|
||||
let mut n_attempts = 0u32;
|
||||
@@ -348,12 +367,13 @@ impl PostgresRedoManager {
|
||||
self.apply_record_neon(key, &mut page, *record_lsn, record)?;
|
||||
}
|
||||
// Success!
|
||||
let end_time = Instant::now();
|
||||
let duration = end_time.duration_since(start_time);
|
||||
let duration = start_time.elapsed();
|
||||
// FIXME: using the same metric here creates a bimodal distribution by default, and because
|
||||
// there could be multiple batch sizes this would be N+1 modal.
|
||||
WAL_REDO_TIME.observe(duration.as_secs_f64());
|
||||
|
||||
debug!(
|
||||
"neon applied {} WAL records in {} ms to reconstruct page image at LSN {}",
|
||||
"neon applied {} WAL records in {} us to reconstruct page image at LSN {}",
|
||||
records.len(),
|
||||
duration.as_micros(),
|
||||
lsn
|
||||
|
||||
Reference in New Issue
Block a user