From 14d495ae141fbde57146e8e5afad696565e1cbbf Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 16 Jun 2023 13:23:55 +0200 Subject: [PATCH 1/8] create_delta_layer: improve misleading TODO comment (#4488) Context: https://github.com/neondatabase/neon/pull/4441#discussion_r1228086608 --- pageserver/src/tenant/timeline.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index d42fdf5e55..13705d8b85 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3015,16 +3015,19 @@ impl Timeline { // Sync it to disk. // // We must also fsync the timeline dir to ensure the directory entries for - // new layer files are durable + // new layer files are durable. + // + // NB: timeline dir must be synced _after_ the file contents are durable. + // So, two separate fsyncs are required, they mustn't be batched. // // TODO: If we're running inside 'flush_frozen_layers' and there are multiple - // files to flush, it might be better to first write them all, and then fsync - // them all in parallel. - - // First sync the delta layer. We still use par_fsync here to keep everything consistent. Feel free to replace - // this with a single fsync in future refactors. + // files to flush, the fsync overhead can be reduces as follows: + // 1. write them all to temporary file names + // 2. fsync them + // 3. rename to the final name + // 4. fsync the parent directory. + // Note that (1),(2),(3) today happen inside write_to_disk(). par_fsync::par_fsync(&[new_delta_path.clone()]).context("fsync of delta layer")?; - // Then sync the parent directory. par_fsync::par_fsync(&[self_clone .conf .timeline_path(&self_clone.timeline_id, &self_clone.tenant_id)]) From 190c3ba6109710f53f70d36c1cdae5891b007f7a Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Fri, 16 Jun 2023 14:17:37 +0100 Subject: [PATCH 2/8] Add tags for releases (#4524) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Problem It's not a trivial task to find corresponding changes for a particular release (for example, for 3371 — 🤷) Ref: https://neondb.slack.com/archives/C04BLQ4LW7K/p1686761537607649?thread_ts=1686736854.174559&cid=C04BLQ4LW7K ## Summary of changes - Tag releases - Add a manual trigger for the release workflow --- .github/workflows/build_and_test.yml | 14 ++++++++++++++ .github/workflows/release.yml | 1 + 2 files changed, 15 insertions(+) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 471dc68df9..5f82ab7aca 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -914,6 +914,20 @@ jobs: exit 1 fi + - name: Create tag "release-${{ needs.tag.outputs.build-tag }}" + if: github.ref_name == 'release' + uses: actions/github-script@v6 + with: + # Retry script for 5XX server errors: https://github.com/actions/github-script#retries + retries: 5 + script: | + github.rest.git.createRef({ + owner: context.repo.owner, + repo: context.repo.repo, + ref: "refs/tags/release-${{ needs.tag.outputs.build-tag }}", + sha: context.sha, + }) + promote-compatibility-data: runs-on: [ self-hosted, gen3, small ] container: diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 4bce9cdd1e..595ee05514 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -3,6 +3,7 @@ name: Create Release Branch on: schedule: - cron: '0 10 * * 2' + workflow_dispatch: jobs: create_release_branch: From 78082d0b9fdfdb8a4c328ad1b2082f840dc0968f Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 16 Jun 2023 16:54:41 +0200 Subject: [PATCH 3/8] create_delta_layer: avoid needless `stat` (#4489) We already do it inside `frozen_layer.write_to_disk()`. Context: https://github.com/neondatabase/neon/pull/4441#discussion_r1228083959 --- pageserver/src/tenant/timeline.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 13705d8b85..ef7474cb8b 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3003,7 +3003,7 @@ impl Timeline { frozen_layer: &Arc, ) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> { let span = tracing::info_span!("blocking"); - let (new_delta, sz): (DeltaLayer, _) = tokio::task::spawn_blocking({ + let new_delta: DeltaLayer = tokio::task::spawn_blocking({ let _g = span.entered(); let self_clone = Arc::clone(self); let frozen_layer = Arc::clone(frozen_layer); @@ -3027,20 +3027,19 @@ impl Timeline { // 3. rename to the final name // 4. fsync the parent directory. // Note that (1),(2),(3) today happen inside write_to_disk(). - par_fsync::par_fsync(&[new_delta_path.clone()]).context("fsync of delta layer")?; + par_fsync::par_fsync(&[new_delta_path]).context("fsync of delta layer")?; par_fsync::par_fsync(&[self_clone .conf .timeline_path(&self_clone.timeline_id, &self_clone.tenant_id)]) .context("fsync of timeline dir")?; - let sz = new_delta_path.metadata()?.len(); - - anyhow::Ok((new_delta, sz)) + anyhow::Ok(new_delta) } }) .await .context("spawn_blocking")??; let new_delta_name = new_delta.filename(); + let sz = new_delta.desc.file_size; // Add it to the layer map let l = Arc::new(new_delta); @@ -3054,9 +3053,8 @@ impl Timeline { batch_updates.insert_historic(l.layer_desc().clone(), l); batch_updates.flush(); - // update the timeline's physical size - self.metrics.resident_physical_size_gauge.add(sz); // update metrics + self.metrics.resident_physical_size_gauge.add(sz); self.metrics.num_persistent_files_created.inc_by(1); self.metrics.persistent_bytes_written.inc_by(sz); From 1b947fc8aff5b3cad6f8a057372860491ec58ab6 Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Fri, 16 Jun 2023 18:08:11 +0100 Subject: [PATCH 4/8] test_runner: workaround rerunfailures and timeout incompatibility (#4469) ## Problem `pytest-timeout` and `pytest-rerunfailures` are incompatible (or rather not fully compatible). Timeouts aren't set for reruns. Ref https://github.com/pytest-dev/pytest-rerunfailures/issues/99 ## Summary of changes - Dynamically make timeouts `func_only` for tests that we're going to retry. It applies timeouts for reruns as well. --- test_runner/fixtures/flaky.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/test_runner/fixtures/flaky.py b/test_runner/fixtures/flaky.py index 9d7f8ead9a..d13f3318b0 100644 --- a/test_runner/fixtures/flaky.py +++ b/test_runner/fixtures/flaky.py @@ -1,6 +1,6 @@ import json from pathlib import Path -from typing import List +from typing import Any, List, MutableMapping, cast import pytest from _pytest.config import Config @@ -56,3 +56,15 @@ def pytest_collection_modifyitems(config: Config, items: List[pytest.Item]): # Rerun 3 times = 1 original run + 2 reruns log.info(f"Marking {item.nodeid} as flaky. It will be rerun up to 3 times") item.add_marker(pytest.mark.flaky(reruns=2)) + + # pytest-rerunfailures is not compatible with pytest-timeout (timeout is not set for reruns), + # we can workaround it by setting `timeout_func_only` to True[1]. + # Unfortunately, setting `timeout_func_only = True` globally in pytest.ini is broken[2], + # but we still can do it using pytest marker. + # + # - [1] https://github.com/pytest-dev/pytest-rerunfailures/issues/99 + # - [2] https://github.com/pytest-dev/pytest-timeout/issues/142 + timeout_marker = item.get_closest_marker("timeout") + if timeout_marker is not None: + kwargs = cast(MutableMapping[str, Any], timeout_marker.kwargs) + kwargs["func_only"] = True From 3b06a5bc54a2ef7b5ec8f3fee24556547310586f Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Mon, 19 Jun 2023 14:04:16 +0400 Subject: [PATCH 5/8] Raise pageserver walreceiver timeouts. I observe sporadic reconnections with ~10k idle computes. It looks like a separate issue, probably walreceiver runtime gets blocked somewhere, but in any case 2-3 seconds is too small. --- pageserver/src/tenant/config.rs | 4 ++-- test_runner/regress/test_wal_receiver.py | 9 ++++++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs index 80d153661a..ffe2c5eab6 100644 --- a/pageserver/src/tenant/config.rs +++ b/pageserver/src/tenant/config.rs @@ -38,8 +38,8 @@ pub mod defaults { pub const DEFAULT_GC_PERIOD: &str = "1 hr"; pub const DEFAULT_IMAGE_CREATION_THRESHOLD: usize = 3; pub const DEFAULT_PITR_INTERVAL: &str = "7 days"; - pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds"; - pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "3 seconds"; + pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "10 seconds"; + pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds"; pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024; pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour"; } diff --git a/test_runner/regress/test_wal_receiver.py b/test_runner/regress/test_wal_receiver.py index 515d47c079..7ac6e6332c 100644 --- a/test_runner/regress/test_wal_receiver.py +++ b/test_runner/regress/test_wal_receiver.py @@ -1,3 +1,5 @@ +import time + from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder from fixtures.types import Lsn, TenantId @@ -40,7 +42,10 @@ def test_pageserver_lsn_wait_error_start(neon_env_builder: NeonEnvBuilder): # Kills one of the safekeepers and ensures that only the active ones are printed in the state. def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuilder): # Trigger WAL wait timeout faster - neon_env_builder.pageserver_config_override = "wait_lsn_timeout = '1s'" + neon_env_builder.pageserver_config_override = """ + wait_lsn_timeout = "1s" + tenant_config={walreceiver_connect_timeout = "2s", lagging_wal_timeout = "2s"} + """ # Have notable SK ids to ensure we check logs for their presence, not some other random numbers neon_env_builder.safekeepers_id_start = 12345 neon_env_builder.num_safekeepers = 3 @@ -70,6 +75,8 @@ def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuil stopped_safekeeper_id = stopped_safekeeper.id log.info(f"Stopping safekeeper {stopped_safekeeper.id}") stopped_safekeeper.stop() + # sleep until stopped safekeeper is removed from candidates + time.sleep(2) # Spend some more time inserting, to ensure SKs report updated statuses and walreceiver in PS have time to update its connection stats. insert_test_elements(env, tenant_id, start=elements_to_insert + 1, count=elements_to_insert) From 557abc18f36506c001c62c22ffa13ee11610d6c7 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Mon, 19 Jun 2023 13:27:14 +0400 Subject: [PATCH 6/8] Fix test_s3_wal_replay assertion flakiness. Supposedly fixes https://github.com/neondatabase/neon/issues/4277 --- test_runner/regress/test_wal_acceptor.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index a837501678..994858edf7 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -575,11 +575,21 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re pg_version = sk.http_client().timeline_status(tenant_id, timeline_id).pg_version + # Terminate first all safekeepers to prevent communication unexpectantly + # advancing peer_horizon_lsn. for sk in env.safekeepers: cli = sk.http_client() cli.timeline_delete_force(tenant_id, timeline_id) # restart safekeeper to clear its in-memory state - sk.stop().start() + sk.stop() + # wait all potenital in flight pushes to broker arrive before starting + # safekeepers (even without sleep, it is very unlikely they are not + # delivered yet). + time.sleep(1) + + for sk in env.safekeepers: + sk.start() + cli = sk.http_client() cli.timeline_create(tenant_id, timeline_id, pg_version, last_lsn) f_partial_path = ( Path(sk.data_dir()) / str(tenant_id) / str(timeline_id) / f_partial_saved.name From 036fda392ff9e864aeb2d5a9528d85a8c388d590 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 19 Jun 2023 16:25:57 +0200 Subject: [PATCH 7/8] log timings for compact_level0_phase1 (#4527) The data will help decide whether it's ok to keep holding Timeline::layers in shared mode until after we've calculated the holes. Other timings are to understand the general breakdown of timings in that function. Context: https://github.com/neondatabase/neon/issues/4492 --- pageserver/src/tenant/timeline.rs | 161 ++++++++++++++++++++++++++++++ 1 file changed, 161 insertions(+) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index ef7474cb8b..de786da322 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -15,6 +15,7 @@ use pageserver_api::models::{ TimelineState, }; use remote_storage::GenericRemoteStorage; +use serde_with::serde_as; use storage_broker::BrokerClientChannel; use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError}; use tokio_util::sync::CancellationToken; @@ -3333,6 +3334,130 @@ impl From for CompactionError { } } +#[serde_as] +#[derive(serde::Serialize)] +struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration); + +#[derive(Default)] +enum DurationRecorder { + #[default] + NotStarted, + Recorded(RecordedDuration, tokio::time::Instant), +} + +impl DurationRecorder { + pub fn till_now(&self) -> DurationRecorder { + match self { + DurationRecorder::NotStarted => { + panic!("must only call on recorded measurements") + } + DurationRecorder::Recorded(_, ended) => { + let now = tokio::time::Instant::now(); + DurationRecorder::Recorded(RecordedDuration(now - *ended), now) + } + } + } + pub fn into_recorded(self) -> Option { + match self { + DurationRecorder::NotStarted => None, + DurationRecorder::Recorded(recorded, _) => Some(recorded), + } + } +} + +#[derive(Default)] +struct CompactLevel0Phase1StatsBuilder { + version: Option, + tenant_id: Option, + timeline_id: Option, + first_read_lock_acquisition_micros: DurationRecorder, + get_level0_deltas_plus_drop_lock_micros: DurationRecorder, + level0_deltas_count: Option, + time_spent_between_locks: DurationRecorder, + second_read_lock_acquisition_micros: DurationRecorder, + second_read_lock_held_micros: DurationRecorder, + sort_holes_micros: DurationRecorder, + write_layer_files_micros: DurationRecorder, + new_deltas_count: Option, + new_deltas_size: Option, +} + +#[serde_as] +#[derive(serde::Serialize)] +struct CompactLevel0Phase1Stats { + version: u64, + #[serde_as(as = "serde_with::DisplayFromStr")] + tenant_id: TenantId, + #[serde_as(as = "serde_with::DisplayFromStr")] + timeline_id: TimelineId, + first_read_lock_acquisition_micros: RecordedDuration, + get_level0_deltas_plus_drop_lock_micros: RecordedDuration, + level0_deltas_count: usize, + time_spent_between_locks: RecordedDuration, + second_read_lock_acquisition_micros: RecordedDuration, + second_read_lock_held_micros: RecordedDuration, + sort_holes_micros: RecordedDuration, + write_layer_files_micros: RecordedDuration, + new_deltas_count: usize, + new_deltas_size: u64, +} + +impl TryFrom for CompactLevel0Phase1Stats { + type Error = anyhow::Error; + + fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result { + let CompactLevel0Phase1StatsBuilder { + version, + tenant_id, + timeline_id, + first_read_lock_acquisition_micros, + get_level0_deltas_plus_drop_lock_micros, + level0_deltas_count, + time_spent_between_locks, + second_read_lock_acquisition_micros, + second_read_lock_held_micros, + sort_holes_micros, + write_layer_files_micros, + new_deltas_count, + new_deltas_size, + } = value; + Ok(CompactLevel0Phase1Stats { + version: version.ok_or_else(|| anyhow::anyhow!("version not set"))?, + tenant_id: tenant_id.ok_or_else(|| anyhow::anyhow!("tenant_id not set"))?, + timeline_id: timeline_id.ok_or_else(|| anyhow::anyhow!("timeline_id not set"))?, + first_read_lock_acquisition_micros: first_read_lock_acquisition_micros + .into_recorded() + .ok_or_else(|| anyhow::anyhow!("first_read_lock_acquisition_micros not set"))?, + get_level0_deltas_plus_drop_lock_micros: get_level0_deltas_plus_drop_lock_micros + .into_recorded() + .ok_or_else(|| { + anyhow::anyhow!("get_level0_deltas_plus_drop_lock_micros not set") + })?, + level0_deltas_count: level0_deltas_count + .ok_or_else(|| anyhow::anyhow!("level0_deltas_count not set"))?, + time_spent_between_locks: time_spent_between_locks + .into_recorded() + .ok_or_else(|| anyhow::anyhow!("time_spent_between_locks not set"))?, + second_read_lock_acquisition_micros: second_read_lock_acquisition_micros + .into_recorded() + .ok_or_else(|| anyhow::anyhow!("second_read_lock_acquisition_micros not set"))?, + second_read_lock_held_micros: second_read_lock_held_micros + .into_recorded() + .ok_or_else(|| anyhow::anyhow!("second_read_lock_held_micros not set"))?, + sort_holes_micros: sort_holes_micros + .into_recorded() + .ok_or_else(|| anyhow::anyhow!("sort_holes_micros not set"))?, + write_layer_files_micros: write_layer_files_micros + .into_recorded() + .ok_or_else(|| anyhow::anyhow!("write_layer_files_micros not set"))?, + new_deltas_count: new_deltas_count + .ok_or_else(|| anyhow::anyhow!("new_deltas_count not set"))?, + new_deltas_size: new_deltas_size + .ok_or_else(|| anyhow::anyhow!("new_deltas_size not set"))?, + }) + } +} + impl Timeline { /// Level0 files first phase of compaction, explained in the [`compact_inner`] comment. /// @@ -3345,9 +3470,23 @@ impl Timeline { target_file_size: u64, ctx: &RequestContext, ) -> Result { + let mut stats = CompactLevel0Phase1StatsBuilder { + version: Some(1), + tenant_id: Some(self.tenant_id), + timeline_id: Some(self.timeline_id), + ..Default::default() + }; + + let begin = tokio::time::Instant::now(); let layers = self.layers.read().await; + let now = tokio::time::Instant::now(); + stats.first_read_lock_acquisition_micros = + DurationRecorder::Recorded(RecordedDuration(now - begin), now); let mut level0_deltas = layers.get_level0_deltas()?; drop(layers); + stats.level0_deltas_count = Some(level0_deltas.len()); + stats.get_level0_deltas_plus_drop_lock_micros = + stats.first_read_lock_acquisition_micros.till_now(); // Only compact if enough layers have accumulated. let threshold = self.get_compaction_threshold(); @@ -3468,7 +3607,9 @@ impl Timeline { // Determine N largest holes where N is number of compacted layers. let max_holes = deltas_to_compact.len(); let last_record_lsn = self.get_last_record_lsn(); + stats.time_spent_between_locks = stats.get_level0_deltas_plus_drop_lock_micros.till_now(); let layers = self.layers.read().await; // Is'n it better to hold original layers lock till here? + stats.second_read_lock_acquisition_micros = stats.time_spent_between_locks.till_now(); let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128; let min_hole_coverage_size = 3; // TODO: something more flexible? @@ -3502,9 +3643,11 @@ impl Timeline { prev = Some(next_key.next()); } drop(layers); + stats.second_read_lock_held_micros = stats.second_read_lock_acquisition_micros.till_now(); let mut holes = heap.into_vec(); holes.sort_unstable_by_key(|hole| hole.key_range.start); let mut next_hole = 0; // index of next hole in holes vector + stats.sort_holes_micros = stats.second_read_lock_held_micros.till_now(); // Merge the contents of all the input delta layers into a new set // of delta layers, based on the current partitioning. @@ -3664,8 +3807,26 @@ impl Timeline { layer_paths.pop().unwrap(); } + stats.write_layer_files_micros = stats.sort_holes_micros.till_now(); + stats.new_deltas_count = Some(new_layers.len()); + stats.new_deltas_size = Some(new_layers.iter().map(|l| l.desc.file_size).sum()); + drop(all_keys_iter); // So that deltas_to_compact is no longer borrowed + match TryInto::::try_into(stats) + .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string")) + { + Ok(stats_json) => { + info!( + stats_json = stats_json.as_str(), + "compact_level0_phase1 stats available" + ) + } + Err(e) => { + warn!("compact_level0_phase1 stats failed to serialize: {:#}", e); + } + } + Ok(CompactLevel0Phase1Result { new_layers, deltas_to_compact, From 2023e22ed3620b050473b5171c064df5bc1ba7aa Mon Sep 17 00:00:00 2001 From: Alek Westover Date: Mon, 19 Jun 2023 13:14:20 -0400 Subject: [PATCH 8/8] Add `RelationError` error type to pageserver rather than string parsing error messages (#4508) --- pageserver/src/import_datadir.rs | 12 +++--- pageserver/src/pgdatadir_mapping.rs | 58 ++++++++++++++++++----------- pageserver/src/walingest.rs | 7 +++- 3 files changed, 48 insertions(+), 29 deletions(-) diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 9ad0124a80..5bff5337bd 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -148,17 +148,17 @@ async fn import_rel( // because there is no guarantee about the order in which we are processing segments. // ignore "relation already exists" error // - // FIXME: use proper error type for this, instead of parsing the error message. - // Or better yet, keep track of which relations we've already created + // FIXME: Keep track of which relations we've already created? // https://github.com/neondatabase/neon/issues/3309 if let Err(e) = modification .put_rel_creation(rel, nblocks as u32, ctx) .await { - if e.to_string().contains("already exists") { - debug!("relation {} already exists. we must be extending it", rel); - } else { - return Err(e); + match e { + RelationError::AlreadyExists => { + debug!("Relation {} already exist. We must be extending it.", rel) + } + _ => return Err(e.into()), } } diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 86c84ec82f..998c199ba6 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -43,6 +43,16 @@ pub enum CalculateLogicalSizeError { Other(#[from] anyhow::Error), } +#[derive(Debug, thiserror::Error)] +pub enum RelationError { + #[error("Relation Already Exists")] + AlreadyExists, + #[error("invalid relnode")] + InvalidRelnode, + #[error(transparent)] + Other(#[from] anyhow::Error), +} + /// /// This impl provides all the functionality to store PostgreSQL relations, SLRUs, /// and other special kinds of files, in a versioned key-value store. The @@ -101,9 +111,9 @@ impl Timeline { ctx: &RequestContext, ) -> Result { if tag.relnode == 0 { - return Err(PageReconstructError::Other(anyhow::anyhow!( - "invalid relnode" - ))); + return Err(PageReconstructError::Other( + RelationError::InvalidRelnode.into(), + )); } let nblocks = self.get_rel_size(tag, lsn, latest, ctx).await?; @@ -148,9 +158,9 @@ impl Timeline { ctx: &RequestContext, ) -> Result { if tag.relnode == 0 { - return Err(PageReconstructError::Other(anyhow::anyhow!( - "invalid relnode" - ))); + return Err(PageReconstructError::Other( + RelationError::InvalidRelnode.into(), + )); } if let Some(nblocks) = self.get_cached_rel_size(&tag, lsn) { @@ -193,9 +203,9 @@ impl Timeline { ctx: &RequestContext, ) -> Result { if tag.relnode == 0 { - return Err(PageReconstructError::Other(anyhow::anyhow!( - "invalid relnode" - ))); + return Err(PageReconstructError::Other( + RelationError::InvalidRelnode.into(), + )); } // first try to lookup relation in cache @@ -724,7 +734,7 @@ impl<'a> DatadirModification<'a> { blknum: BlockNumber, rec: NeonWalRecord, ) -> anyhow::Result<()> { - anyhow::ensure!(rel.relnode != 0, "invalid relnode"); + anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode); self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec)); Ok(()) } @@ -751,7 +761,7 @@ impl<'a> DatadirModification<'a> { blknum: BlockNumber, img: Bytes, ) -> anyhow::Result<()> { - anyhow::ensure!(rel.relnode != 0, "invalid relnode"); + anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode); self.put(rel_block_to_key(rel, blknum), Value::Image(img)); Ok(()) } @@ -875,32 +885,38 @@ impl<'a> DatadirModification<'a> { rel: RelTag, nblocks: BlockNumber, ctx: &RequestContext, - ) -> anyhow::Result<()> { - anyhow::ensure!(rel.relnode != 0, "invalid relnode"); + ) -> Result<(), RelationError> { + if rel.relnode == 0 { + return Err(RelationError::AlreadyExists); + } // It's possible that this is the first rel for this db in this // tablespace. Create the reldir entry for it if so. - let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?; + let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?) + .context("deserialize db")?; let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode); let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() { // Didn't exist. Update dbdir dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false); - let buf = DbDirectory::ser(&dbdir)?; + let buf = DbDirectory::ser(&dbdir).context("serialize db")?; self.put(DBDIR_KEY, Value::Image(buf.into())); // and create the RelDirectory RelDirectory::default() } else { // reldir already exists, fetch it - RelDirectory::des(&self.get(rel_dir_key, ctx).await?)? + RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?) + .context("deserialize db")? }; // Add the new relation to the rel directory entry, and write it back if !rel_dir.rels.insert((rel.relnode, rel.forknum)) { - anyhow::bail!("rel {rel} already exists"); + return Err(RelationError::AlreadyExists); } self.put( rel_dir_key, - Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)), + Value::Image(Bytes::from( + RelDirectory::ser(&rel_dir).context("serialize")?, + )), ); // Put size @@ -925,7 +941,7 @@ impl<'a> DatadirModification<'a> { nblocks: BlockNumber, ctx: &RequestContext, ) -> anyhow::Result<()> { - anyhow::ensure!(rel.relnode != 0, "invalid relnode"); + anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode); let last_lsn = self.tline.get_last_record_lsn(); if self.tline.get_rel_exists(rel, last_lsn, true, ctx).await? { let size_key = rel_size_to_key(rel); @@ -956,7 +972,7 @@ impl<'a> DatadirModification<'a> { nblocks: BlockNumber, ctx: &RequestContext, ) -> anyhow::Result<()> { - anyhow::ensure!(rel.relnode != 0, "invalid relnode"); + anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode); // Put size let size_key = rel_size_to_key(rel); @@ -977,7 +993,7 @@ impl<'a> DatadirModification<'a> { /// Drop a relation. pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> { - anyhow::ensure!(rel.relnode != 0, "invalid relnode"); + anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode); // Remove it from the directory entry let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode); diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 68cf2a4645..8d4c1842bd 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -25,7 +25,7 @@ use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes; use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment; use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn}; -use anyhow::Result; +use anyhow::{Context, Result}; use bytes::{Buf, Bytes, BytesMut}; use tracing::*; @@ -1082,7 +1082,10 @@ impl<'a> WalIngest<'a> { .await? { // create it with 0 size initially, the logic below will extend it - modification.put_rel_creation(rel, 0, ctx).await?; + modification + .put_rel_creation(rel, 0, ctx) + .await + .context("Relation Error")?; 0 } else { self.timeline.get_rel_size(rel, last_lsn, true, ctx).await?