From a010b2108a7912f2e140c81d4d6b0d115f6c4aad Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Fri, 23 Jun 2023 03:18:06 -0400 Subject: [PATCH 01/12] pgserver: better template config file (#4554) * `compaction_threshold` should be an integer, not a string. * uncomment `[section]` so that if a user needs to modify the config, they can simply uncomment the corresponding line. Otherwise it's easy for us to forget uncommenting the `[section]` when uncommenting the config item we want to configure. Signed-off-by: Alex Chi --- pageserver/src/config.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 17e6e3fb2a..2046d27b1e 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -96,12 +96,12 @@ pub mod defaults { #background_task_maximum_delay = '{DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY}' -# [tenant_config] +[tenant_config] #checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes #checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT} #compaction_target_size = {DEFAULT_COMPACTION_TARGET_SIZE} # in bytes #compaction_period = '{DEFAULT_COMPACTION_PERIOD}' -#compaction_threshold = '{DEFAULT_COMPACTION_THRESHOLD}' +#compaction_threshold = {DEFAULT_COMPACTION_THRESHOLD} #gc_period = '{DEFAULT_GC_PERIOD}' #gc_horizon = {DEFAULT_GC_HORIZON} @@ -111,7 +111,8 @@ pub mod defaults { #min_resident_size_override = .. # in bytes #evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}' #gc_feedback = false -# [remote_storage] + +[remote_storage] "### ); From d96d51a3b7db91ca70308b71d5d2c350a7dc00f2 Mon Sep 17 00:00:00 2001 From: Vadim Kharitonov Date: Fri, 23 Jun 2023 14:09:04 +0300 Subject: [PATCH 02/12] Update rust to 1.70.0 (#4550) --- rust-toolchain.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust-toolchain.toml b/rust-toolchain.toml index c39ba4f417..6abb435018 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,5 +1,5 @@ [toolchain] -channel = "1.68.2" +channel = "1.70.0" profile = "default" # The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy. # https://rust-lang.github.io/rustup/concepts/profiles.html From 6c3605fc249c9ca079fe686c46bebf4eb87d7396 Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Fri, 23 Jun 2023 12:47:37 +0100 Subject: [PATCH 03/12] Nightly Benchmarks: Increase timeout for pgbench-compare job (#4551) ## Problem In the test environment vacuum duration fluctuates from ~1h to ~5h, along with another two 1h benchmarks (`select-only` and `simple-update`) it could be up to 7h which is longer than 6h timeout. ## Summary of changes - Increase timeout for pgbench-compare job to 8h - Remove 6h timeouts from Nightly Benchmarks (this is a default value) --- .github/workflows/benchmarking.yml | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/.github/workflows/benchmarking.yml b/.github/workflows/benchmarking.yml index 08b74a2656..172b904331 100644 --- a/.github/workflows/benchmarking.yml +++ b/.github/workflows/benchmarking.yml @@ -180,7 +180,8 @@ jobs: image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned options: --init - timeout-minutes: 360 # 6h + # Increase timeout to 8h, default timeout is 6h + timeout-minutes: 480 steps: - uses: actions/checkout@v3 @@ -321,8 +322,6 @@ jobs: image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned options: --init - timeout-minutes: 360 # 6h - steps: - uses: actions/checkout@v3 @@ -414,8 +413,6 @@ jobs: image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned options: --init - timeout-minutes: 360 # 6h - steps: - uses: actions/checkout@v3 @@ -501,8 +498,6 @@ jobs: image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned options: --init - timeout-minutes: 360 # 6h - steps: - uses: actions/checkout@v3 From c07b6ffbdcf2b333cab774c92885316347e86d3c Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Fri, 23 Jun 2023 12:52:17 +0100 Subject: [PATCH 04/12] Fix git tag name for release (#4545) ## Problem A git tag for a release has an extra `release-` prefix (it looks like `release-release-3439`). ## Summary of changes - Do not add `release-` prefix when create git tag --- .github/workflows/build_and_test.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 94fbb02cf6..435512c460 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -916,7 +916,7 @@ jobs: exit 1 fi - - name: Create tag "release-${{ needs.tag.outputs.build-tag }}" + - name: Create git tag if: github.ref_name == 'release' uses: actions/github-script@v6 with: @@ -926,7 +926,7 @@ jobs: github.rest.git.createRef({ owner: context.repo.owner, repo: context.repo.repo, - ref: "refs/tags/release-${{ needs.tag.outputs.build-tag }}", + ref: "refs/tags/${{ needs.tag.outputs.build-tag }}", sha: context.sha, }) From 76718472beac1df5804ecd45a8d74b3a8fe26852 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 23 Jun 2023 16:42:12 +0200 Subject: [PATCH 05/12] add pageserver-global histogram for basebackup latency (#4559) The histogram distinguishes by ok/err. I took the liberty to create a small abstraction for such use cases. It helps keep the label values inside `metrics.rs`, right next to the place where the metric and its labels are declared. --- libs/metrics/src/lib.rs | 1 + libs/metrics/src/metric_vec_duration.rs | 23 +++++++++++++++++++++++ pageserver/src/metrics.rs | 22 ++++++++++++++++++++++ pageserver/src/page_service.rs | 22 ++++++++++++++++++---- test_runner/fixtures/metrics.py | 1 + 5 files changed, 65 insertions(+), 4 deletions(-) create mode 100644 libs/metrics/src/metric_vec_duration.rs diff --git a/libs/metrics/src/lib.rs b/libs/metrics/src/lib.rs index 6011713c8f..f24488b19d 100644 --- a/libs/metrics/src/lib.rs +++ b/libs/metrics/src/lib.rs @@ -23,6 +23,7 @@ use prometheus::{Registry, Result}; pub mod launch_timestamp; mod wrappers; pub use wrappers::{CountedReader, CountedWriter}; +pub mod metric_vec_duration; pub type UIntGauge = GenericGauge; pub type UIntGaugeVec = GenericGaugeVec; diff --git a/libs/metrics/src/metric_vec_duration.rs b/libs/metrics/src/metric_vec_duration.rs new file mode 100644 index 0000000000..840f60f19b --- /dev/null +++ b/libs/metrics/src/metric_vec_duration.rs @@ -0,0 +1,23 @@ +//! Helpers for observing duration on HistogramVec / CounterVec / GaugeVec / MetricVec. + +use std::{future::Future, time::Instant}; + +pub trait DurationResultObserver { + fn observe_result(&self, res: &Result, duration: std::time::Duration); +} + +pub async fn observe_async_block_duration_by_result< + T, + E, + F: Future>, + O: DurationResultObserver, +>( + observer: &O, + block: F, +) -> Result { + let start = Instant::now(); + let result = block.await; + let duration = start.elapsed(); + observer.observe_result(&result, duration); + result +} diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 43d06db6d8..b7fdc65a00 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1,3 +1,4 @@ +use metrics::metric_vec_duration::DurationResultObserver; use metrics::{ register_counter_vec, register_histogram, register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge, register_int_gauge_vec, register_uint_gauge_vec, @@ -424,6 +425,27 @@ pub static SMGR_QUERY_TIME: Lazy = Lazy::new(|| { .expect("failed to define a metric") }); +pub struct BasebackupQueryTime(HistogramVec); +pub static BASEBACKUP_QUERY_TIME: Lazy = Lazy::new(|| { + BasebackupQueryTime({ + register_histogram_vec!( + "pageserver_basebackup_query_seconds", + "Histogram of basebackup queries durations, by result type", + &["result"], + CRITICAL_OP_BUCKETS.into(), + ) + .expect("failed to define a metric") + }) +}); + +impl DurationResultObserver for BasebackupQueryTime { + fn observe_result(&self, res: &Result, duration: std::time::Duration) { + let label_value = if res.is_ok() { "ok" } else { "error" }; + let metric = self.0.get_metric_with_label_values(&[label_value]).unwrap(); + metric.observe(duration.as_secs_f64()); + } +} + pub static LIVE_CONNECTIONS_COUNT: Lazy = Lazy::new(|| { register_int_gauge_vec!( "pageserver_live_connections", diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 31ad45790c..d32518b513 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -913,10 +913,24 @@ where None }; - // Check that the timeline exists - self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false, ctx) - .await?; - pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; + metrics::metric_vec_duration::observe_async_block_duration_by_result( + &*crate::metrics::BASEBACKUP_QUERY_TIME, + async move { + self.handle_basebackup_request( + pgb, + tenant_id, + timeline_id, + lsn, + None, + false, + ctx, + ) + .await?; + pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; + anyhow::Ok(()) + }, + ) + .await?; } // return pair of prev_lsn and last_lsn else if query_string.starts_with("get_last_record_rlsn ") { diff --git a/test_runner/fixtures/metrics.py b/test_runner/fixtures/metrics.py index d55d159037..7ee3c33f92 100644 --- a/test_runner/fixtures/metrics.py +++ b/test_runner/fixtures/metrics.py @@ -62,6 +62,7 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = ( "pageserver_getpage_reconstruct_seconds_bucket", "pageserver_getpage_reconstruct_seconds_count", "pageserver_getpage_reconstruct_seconds_sum", + *[f"pageserver_basebackup_query_seconds_{x}" for x in ["bucket", "count", "sum"]], ) PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = ( From a3f0dd2d3008fbabc01a76c60edf16860271c62b Mon Sep 17 00:00:00 2001 From: Vadim Kharitonov Date: Fri, 23 Jun 2023 17:56:49 +0300 Subject: [PATCH 06/12] Compile `pg_uuidv7` (#4558) Doc says that it should be added into `shared_preload_libraries`, but, practically, it's not required. ``` postgres=# create extension pg_uuidv7; CREATE EXTENSION postgres=# SELECT uuid_generate_v7(); uuid_generate_v7 -------------------------------------- 0188e823-3f8f-796c-a92c-833b0b2d1746 (1 row) ``` --- Dockerfile.compute-node | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/Dockerfile.compute-node b/Dockerfile.compute-node index fc575536bc..d8770785eb 100644 --- a/Dockerfile.compute-node +++ b/Dockerfile.compute-node @@ -481,6 +481,23 @@ RUN wget https://github.com/rdkit/rdkit/archive/refs/tags/Release_2023_03_1.tar. make -j $(getconf _NPROCESSORS_ONLN) install && \ echo 'trusted = true' >> /usr/local/pgsql/share/extension/rdkit.control +######################################################################################### +# +# Layer "pg-uuidv7-pg-build" +# compile pg_uuidv7 extension +# +######################################################################################### +FROM build-deps AS pg-uuidv7-pg-build +COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/ + +ENV PATH "/usr/local/pgsql/bin/:$PATH" +RUN wget https://github.com/fboulnois/pg_uuidv7/archive/refs/tags/v1.0.1.tar.gz -O pg_uuidv7.tar.gz && \ + echo "0d0759ab01b7fb23851ecffb0bce27822e1868a4a5819bfd276101c716637a7a pg_uuidv7.tar.gz" | sha256sum --check && \ + mkdir pg_uuidv7-src && cd pg_uuidv7-src && tar xvzf ../pg_uuidv7.tar.gz --strip-components=1 -C . && \ + make -j $(getconf _NPROCESSORS_ONLN) && \ + make -j $(getconf _NPROCESSORS_ONLN) install && \ + echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_uuidv7.control + ######################################################################################### # # Layer "rust extensions" @@ -614,6 +631,7 @@ COPY --from=kq-imcx-pg-build /usr/local/pgsql/ /usr/local/pgsql/ COPY --from=pg-cron-pg-build /usr/local/pgsql/ /usr/local/pgsql/ COPY --from=pg-pgx-ulid-build /usr/local/pgsql/ /usr/local/pgsql/ COPY --from=rdkit-pg-build /usr/local/pgsql/ /usr/local/pgsql/ +COPY --from=pg-uuidv7-pg-build /usr/local/pgsql/ /usr/local/pgsql/ COPY pgxn/ pgxn/ RUN make -j $(getconf _NPROCESSORS_ONLN) \ From 15456625c29b0996ebb5641e4438c427bcb77fc2 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 23 Jun 2023 21:40:36 +0200 Subject: [PATCH 07/12] don't use MGMT_REQUEST_RUNTIME for consumption metrics synthetic size worker (#4560) The consumption metrics synthetic size worker does logical size calculation. Logical size calculation currently does synchronous disk IO. This blocks the MGMT_REQUEST_RUNTIME's executor threads, starving other futures. While there's work on the way to move the synchronous disk IO into spawn_blocking, the quickfix here is to use the BACKGROUND_RUNTIME instead of MGMT_REQUEST_RUNTIME. Actually it's not just a quickfix. We simply shouldn't be blocking MGMT_REQUEST_RUNTIME executor threads on CPU or sync disk IO. That work isn't done yet, as many of the mgmt tasks still _do_ disk IO. But it's not as intensive as the logical size calculations that we're fixing here. While we're at it, fix disk-usage-based eviction in a similar way. It wasn't the culprit here, according to prod logs, but it can theoretically be a little CPU-intensive. More context, including graphs from Prod: https://neondb.slack.com/archives/C03F5SM1N02/p1687541681336949 --- pageserver/src/bin/pageserver.rs | 82 ++++++++++++++++---------------- pageserver/src/http/routes.rs | 4 +- 2 files changed, 42 insertions(+), 44 deletions(-) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 1fa5e4ab3b..b01ace63e4 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -495,50 +495,50 @@ fn start_pageserver( Ok(()) }, ); + } - if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint { - let background_jobs_barrier = background_jobs_barrier; - let metrics_ctx = RequestContext::todo_child( - TaskKind::MetricsCollection, - // This task itself shouldn't download anything. - // The actual size calculation does need downloads, and - // creates a child context with the right DownloadBehavior. - DownloadBehavior::Error, - ); - task_mgr::spawn( - MGMT_REQUEST_RUNTIME.handle(), - TaskKind::MetricsCollection, - None, - None, - "consumption metrics collection", - true, - async move { - // first wait until background jobs are cleared to launch. - // - // this is because we only process active tenants and timelines, and the - // Timeline::get_current_logical_size will spawn the logical size calculation, - // which will not be rate-limited. - let cancel = task_mgr::shutdown_token(); + if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint { + let background_jobs_barrier = background_jobs_barrier; + let metrics_ctx = RequestContext::todo_child( + TaskKind::MetricsCollection, + // This task itself shouldn't download anything. + // The actual size calculation does need downloads, and + // creates a child context with the right DownloadBehavior. + DownloadBehavior::Error, + ); + task_mgr::spawn( + crate::BACKGROUND_RUNTIME.handle(), + TaskKind::MetricsCollection, + None, + None, + "consumption metrics collection", + true, + async move { + // first wait until background jobs are cleared to launch. + // + // this is because we only process active tenants and timelines, and the + // Timeline::get_current_logical_size will spawn the logical size calculation, + // which will not be rate-limited. + let cancel = task_mgr::shutdown_token(); - tokio::select! { - _ = cancel.cancelled() => { return Ok(()); }, - _ = background_jobs_barrier.wait() => {} - }; + tokio::select! { + _ = cancel.cancelled() => { return Ok(()); }, + _ = background_jobs_barrier.wait() => {} + }; - pageserver::consumption_metrics::collect_metrics( - metric_collection_endpoint, - conf.metric_collection_interval, - conf.cached_metric_collection_interval, - conf.synthetic_size_calculation_interval, - conf.id, - metrics_ctx, - ) - .instrument(info_span!("metrics_collection")) - .await?; - Ok(()) - }, - ); - } + pageserver::consumption_metrics::collect_metrics( + metric_collection_endpoint, + conf.metric_collection_interval, + conf.cached_metric_collection_interval, + conf.synthetic_size_calculation_interval, + conf.id, + metrics_ctx, + ) + .instrument(info_span!("metrics_collection")) + .await?; + Ok(()) + }, + ); } // Spawn a task to listen for libpq connections. It will spawn further tasks diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 5bec07b74a..8d3bb5552b 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -1128,8 +1128,6 @@ async fn disk_usage_eviction_run( freed_bytes: 0, }; - use crate::task_mgr::MGMT_REQUEST_RUNTIME; - let (tx, rx) = tokio::sync::oneshot::channel(); let state = get_state(&r); @@ -1147,7 +1145,7 @@ async fn disk_usage_eviction_run( let _g = cancel.drop_guard(); crate::task_mgr::spawn( - MGMT_REQUEST_RUNTIME.handle(), + crate::task_mgr::BACKGROUND_RUNTIME.handle(), TaskKind::DiskUsageEviction, None, None, From a500bb06fb69069286d85d12cdd7be7d6b42b607 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 23 Jun 2023 22:40:50 +0200 Subject: [PATCH 08/12] use preinitialize_metrics to initialize page cache metrics (#4557) This is follow-up to ``` commit 2252c5c282e8463b0f1dc1d9c7484e50706392e9 Author: Alex Chi Z Date: Wed Jun 14 17:12:34 2023 -0400 metrics: convert some metrics to pageserver-level (#4490) ``` --- pageserver/src/metrics.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index b7fdc65a00..00745143bd 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -845,11 +845,6 @@ impl TimelineMetrics { let evictions_with_low_residence_duration = evictions_with_low_residence_duration_builder.build(&tenant_id, &timeline_id); - // TODO(chi): remove this once we remove Lazy for all metrics. Otherwise this will not appear in the exporter - // and integration test will error. - MATERIALIZED_PAGE_CACHE_HIT_DIRECT.get(); - MATERIALIZED_PAGE_CACHE_HIT.get(); - TimelineMetrics { tenant_id, timeline_id, @@ -1324,4 +1319,8 @@ pub fn preinitialize_metrics() { // Same as above for this metric, but, it's a Vec-type metric for which we don't know all the labels. BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT.reset(); + + // Python tests need these. + MATERIALIZED_PAGE_CACHE_HIT_DIRECT.get(); + MATERIALIZED_PAGE_CACHE_HIT.get(); } From b1477b4448e120ea3fc6443043664f07bc4b6f82 Mon Sep 17 00:00:00 2001 From: Sasha Krassovsky Date: Fri, 23 Jun 2023 15:38:27 -0700 Subject: [PATCH 09/12] Create neon_superuser role, grant it to roles created from control plane (#4425) ## Problem Currently, if a user creates a role, it won't by default have any grants applied to it. If the compute restarts, the grants get applied. This gives a very strange UX of being able to drop roles/not have any access to anything at first, and then once something triggers a config application, suddenly grants are applied. This removes these grants. --- compute_tools/src/compute.rs | 88 ++++++++++++++++++++++- compute_tools/src/pg_helpers.rs | 2 +- compute_tools/src/spec.rs | 45 +++--------- test_runner/regress/test_compatibility.py | 54 +++++++++++++- test_runner/regress/test_hot_standby.py | 9 ++- test_runner/regress/test_tenant_size.py | 16 +++-- 6 files changed, 164 insertions(+), 50 deletions(-) diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 94cebf93de..8415d5dad5 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -133,6 +133,84 @@ impl TryFrom for ParsedSpec { } } +/// Create special neon_superuser role, that's a slightly nerfed version of a real superuser +/// that we give to customers +fn create_neon_superuser(spec: &ComputeSpec, client: &mut Client) -> Result<()> { + let roles = spec + .cluster + .roles + .iter() + .map(|r| format!("'{}'", escape_literal(&r.name))) + .collect::>(); + + let dbs = spec + .cluster + .databases + .iter() + .map(|db| format!("'{}'", escape_literal(&db.name))) + .collect::>(); + + let roles_decl = if roles.is_empty() { + String::from("roles text[] := NULL;") + } else { + format!( + r#" + roles text[] := ARRAY(SELECT rolname + FROM pg_catalog.pg_roles + WHERE rolname IN ({}));"#, + roles.join(", ") + ) + }; + + let database_decl = if dbs.is_empty() { + String::from("dbs text[] := NULL;") + } else { + format!( + r#" + dbs text[] := ARRAY(SELECT datname + FROM pg_catalog.pg_database + WHERE datname IN ({}));"#, + dbs.join(", ") + ) + }; + + // ALL PRIVILEGES grants CREATE, CONNECT, and TEMPORARY on all databases + // (see https://www.postgresql.org/docs/current/ddl-priv.html) + let query = format!( + r#" + DO $$ + DECLARE + r text; + {} + {} + BEGIN + IF NOT EXISTS ( + SELECT FROM pg_catalog.pg_roles WHERE rolname = 'neon_superuser') + THEN + CREATE ROLE neon_superuser CREATEDB CREATEROLE NOLOGIN IN ROLE pg_read_all_data, pg_write_all_data; + IF array_length(roles, 1) IS NOT NULL THEN + EXECUTE format('GRANT neon_superuser TO %s', + array_to_string(roles, ', ')); + FOREACH r IN ARRAY roles LOOP + EXECUTE format('ALTER ROLE %s CREATEROLE CREATEDB', r); + END LOOP; + END IF; + IF array_length(dbs, 1) IS NOT NULL THEN + EXECUTE format('GRANT ALL PRIVILEGES ON DATABASE %s TO neon_superuser', + array_to_string(dbs, ', ')); + END IF; + END IF; + END + $$;"#, + roles_decl, database_decl, + ); + info!("Neon superuser created:\n{}", &query); + client + .simple_query(&query) + .map_err(|e| anyhow::anyhow!(e).context(query))?; + Ok(()) +} + impl ComputeNode { pub fn set_status(&self, status: ComputeStatus) { let mut state = self.state.lock().unwrap(); @@ -347,6 +425,8 @@ impl ComputeNode { .map_err(|_| anyhow::anyhow!("invalid connstr"))?; let mut client = Client::connect(zenith_admin_connstr.as_str(), NoTls)?; + // Disable forwarding so that users don't get a cloud_admin role + client.simple_query("SET neon.forward_ddl = false")?; client.simple_query("CREATE USER cloud_admin WITH SUPERUSER")?; client.simple_query("GRANT zenith_admin TO cloud_admin")?; drop(client); @@ -357,14 +437,16 @@ impl ComputeNode { Ok(client) => client, }; - // Proceed with post-startup configuration. Note, that order of operations is important. // Disable DDL forwarding because control plane already knows about these roles/databases. client.simple_query("SET neon.forward_ddl = false")?; + + // Proceed with post-startup configuration. Note, that order of operations is important. let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec; + create_neon_superuser(spec, &mut client)?; handle_roles(spec, &mut client)?; handle_databases(spec, &mut client)?; handle_role_deletions(spec, self.connstr.as_str(), &mut client)?; - handle_grants(spec, self.connstr.as_str(), &mut client)?; + handle_grants(spec, self.connstr.as_str())?; handle_extensions(spec, &mut client)?; // 'Close' connection @@ -402,7 +484,7 @@ impl ComputeNode { handle_roles(&spec, &mut client)?; handle_databases(&spec, &mut client)?; handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?; - handle_grants(&spec, self.connstr.as_str(), &mut client)?; + handle_grants(&spec, self.connstr.as_str())?; handle_extensions(&spec, &mut client)?; } diff --git a/compute_tools/src/pg_helpers.rs b/compute_tools/src/pg_helpers.rs index d5c845e9ea..b76ed1fd85 100644 --- a/compute_tools/src/pg_helpers.rs +++ b/compute_tools/src/pg_helpers.rs @@ -17,7 +17,7 @@ use compute_api::spec::{Database, GenericOption, GenericOptions, PgIdent, Role}; const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds /// Escape a string for including it in a SQL literal -fn escape_literal(s: &str) -> String { +pub fn escape_literal(s: &str) -> String { s.replace('\'', "''").replace('\\', "\\\\") } diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index a2a19ae0da..520696da00 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -269,17 +269,13 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> { xact.execute(query.as_str(), &[])?; } RoleAction::Create => { - let mut query: String = format!("CREATE ROLE {} ", name.pg_quote()); + let mut query: String = format!( + "CREATE ROLE {} CREATEROLE CREATEDB IN ROLE neon_superuser", + name.pg_quote() + ); info!("role create query: '{}'", &query); query.push_str(&role.to_pg_options()); xact.execute(query.as_str(), &[])?; - - let grant_query = format!( - "GRANT pg_read_all_data, pg_write_all_data TO {}", - name.pg_quote() - ); - xact.execute(grant_query.as_str(), &[])?; - info!("role grant query: '{}'", &grant_query); } } @@ -476,6 +472,11 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> { query.push_str(&db.to_pg_options()); let _guard = info_span!("executing", query).entered(); client.execute(query.as_str(), &[])?; + let grant_query: String = format!( + "GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser", + name.pg_quote() + ); + client.execute(grant_query.as_str(), &[])?; } }; @@ -495,35 +496,9 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> { /// Grant CREATE ON DATABASE to the database owner and do some other alters and grants /// to allow users creating trusted extensions and re-creating `public` schema, for example. #[instrument(skip_all)] -pub fn handle_grants(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> { +pub fn handle_grants(spec: &ComputeSpec, connstr: &str) -> Result<()> { info!("cluster spec grants:"); - // We now have a separate `web_access` role to connect to the database - // via the web interface and proxy link auth. And also we grant a - // read / write all data privilege to every role. So also grant - // create to everyone. - // XXX: later we should stop messing with Postgres ACL in such horrible - // ways. - let roles = spec - .cluster - .roles - .iter() - .map(|r| r.name.pg_quote()) - .collect::>(); - - for db in &spec.cluster.databases { - let dbname = &db.name; - - let query: String = format!( - "GRANT CREATE ON DATABASE {} TO {}", - dbname.pg_quote(), - roles.join(", ") - ); - info!("grant query {}", &query); - - client.execute(query.as_str(), &[])?; - } - // Do some per-database access adjustments. We'd better do this at db creation time, // but CREATE DATABASE isn't transactional. So we cannot create db + do some grants // atomically. diff --git a/test_runner/regress/test_compatibility.py b/test_runner/regress/test_compatibility.py index 61f86dc3ce..51e7b01eba 100644 --- a/test_runner/regress/test_compatibility.py +++ b/test_runner/regress/test_compatibility.py @@ -2,6 +2,7 @@ import copy import os import shutil import subprocess +import tempfile from pathlib import Path from typing import Any, Optional @@ -448,7 +449,7 @@ def dump_differs(first: Path, second: Path, output: Path) -> bool: """ with output.open("w") as stdout: - rv = subprocess.run( + res = subprocess.run( [ "diff", "--unified", # Make diff output more readable @@ -460,4 +461,53 @@ def dump_differs(first: Path, second: Path, output: Path) -> bool: stdout=stdout, ) - return rv.returncode != 0 + differs = res.returncode != 0 + + # TODO: Remove after https://github.com/neondatabase/neon/pull/4425 is merged, and a couple of releases are made + if differs: + with tempfile.NamedTemporaryFile(mode="w") as tmp: + tmp.write(PR4425_ALLOWED_DIFF) + tmp.flush() + + allowed = subprocess.run( + [ + "diff", + "--unified", # Make diff output more readable + r"--ignore-matching-lines=^---", # Ignore diff headers + r"--ignore-matching-lines=^\+\+\+", # Ignore diff headers + "--ignore-matching-lines=^@@", # Ignore diff blocks location + "--ignore-matching-lines=^ *$", # Ignore lines with only spaces + "--ignore-matching-lines=^ --.*", # Ignore the " --" lines for compatibility with PG14 + "--ignore-blank-lines", + str(output), + str(tmp.name), + ], + ) + + differs = allowed.returncode != 0 + + return differs + + +PR4425_ALLOWED_DIFF = """ +--- /tmp/test_output/test_backward_compatibility[release-pg15]/compatibility_snapshot/dump.sql 2023-06-08 18:12:45.000000000 +0000 ++++ /tmp/test_output/test_backward_compatibility[release-pg15]/dump.sql 2023-06-13 07:25:35.211733653 +0000 +@@ -13,12 +13,20 @@ + + CREATE ROLE cloud_admin; + ALTER ROLE cloud_admin WITH SUPERUSER INHERIT CREATEROLE CREATEDB LOGIN REPLICATION BYPASSRLS; ++CREATE ROLE neon_superuser; ++ALTER ROLE neon_superuser WITH NOSUPERUSER INHERIT CREATEROLE CREATEDB NOLOGIN NOREPLICATION NOBYPASSRLS; + + -- + -- User Configurations + -- + + ++-- ++-- Role memberships ++-- ++ ++GRANT pg_read_all_data TO neon_superuser GRANTED BY cloud_admin; ++GRANT pg_write_all_data TO neon_superuser GRANTED BY cloud_admin; +""" diff --git a/test_runner/regress/test_hot_standby.py b/test_runner/regress/test_hot_standby.py index 12e034cea2..6b003ce356 100644 --- a/test_runner/regress/test_hot_standby.py +++ b/test_runner/regress/test_hot_standby.py @@ -1,3 +1,5 @@ +import time + import pytest from fixtures.neon_fixtures import NeonEnv @@ -10,9 +12,10 @@ def test_hot_standby(neon_simple_env: NeonEnv): branch_name="main", endpoint_id="primary", ) as primary: + time.sleep(1) with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary: primary_lsn = None - cought_up = False + caught_up = False queries = [ "SHOW neon.timeline_id", "SHOW neon.tenant_id", @@ -56,7 +59,7 @@ def test_hot_standby(neon_simple_env: NeonEnv): res = s_cur.fetchone() assert res is not None - while not cought_up: + while not caught_up: with s_con.cursor() as secondary_cursor: secondary_cursor.execute("SELECT pg_last_wal_replay_lsn()") res = secondary_cursor.fetchone() @@ -66,7 +69,7 @@ def test_hot_standby(neon_simple_env: NeonEnv): # due to e.g. autovacuum, but that shouldn't impact the content # of the tables, so we check whether we've replayed up to at # least after the commit of the `test` table. - cought_up = secondary_lsn >= primary_lsn + caught_up = secondary_lsn >= primary_lsn # Explicit commit to flush any transient transaction-level state. s_con.commit() diff --git a/test_runner/regress/test_tenant_size.py b/test_runner/regress/test_tenant_size.py index a0f9f854ed..0ebe606066 100644 --- a/test_runner/regress/test_tenant_size.py +++ b/test_runner/regress/test_tenant_size.py @@ -44,12 +44,16 @@ def test_empty_tenant_size(neon_simple_env: NeonEnv, test_output_dir: Path): # we've disabled the autovacuum and checkpoint # so background processes should not change the size. # If this test will flake we should probably loosen the check - assert size == initial_size, "starting idle compute should not change the tenant size" + assert ( + size == initial_size + ), f"starting idle compute should not change the tenant size (Currently {size}, expected {initial_size})" # the size should be the same, until we increase the size over the # gc_horizon size, inputs = http_client.tenant_size_and_modelinputs(tenant_id) - assert size == initial_size, "tenant_size should not be affected by shutdown of compute" + assert ( + size == initial_size + ), f"tenant_size should not be affected by shutdown of compute (Currently {size}, expected {initial_size})" expected_inputs = { "segments": [ @@ -333,13 +337,13 @@ def test_single_branch_get_tenant_size_grows( # inserts is larger than gc_horizon. for example 0x20000 here hid the fact # that there next_gc_cutoff could be smaller than initdb_lsn, which will # obviously lead to issues when calculating the size. - gc_horizon = 0x38000 + gc_horizon = 0x3BA00 # it's a bit of a hack, but different versions of postgres have different # amount of WAL generated for the same amount of data. so we need to # adjust the gc_horizon accordingly. if pg_version == PgVersion.V14: - gc_horizon = 0x40000 + gc_horizon = 0x4A000 neon_env_builder.pageserver_config_override = f"tenant_config={{compaction_period='0s', gc_period='0s', pitr_interval='0sec', gc_horizon={gc_horizon}}}" @@ -360,11 +364,11 @@ def test_single_branch_get_tenant_size_grows( if current_lsn - initdb_lsn >= gc_horizon: assert ( size >= prev_size - ), "tenant_size may grow or not grow, because we only add gc_horizon amount of WAL to initial snapshot size" + ), f"tenant_size may grow or not grow, because we only add gc_horizon amount of WAL to initial snapshot size (Currently at: {current_lsn}, Init at: {initdb_lsn})" else: assert ( size > prev_size - ), "tenant_size should grow, because we continue to add WAL to initial snapshot size" + ), f"tenant_size should grow, because we continue to add WAL to initial snapshot size (Currently at: {current_lsn}, Init at: {initdb_lsn})" def get_current_consistent_size( env: NeonEnv, From c215389f1cf37c77f7496a188792f0f7d117e582 Mon Sep 17 00:00:00 2001 From: Sasha Krassovsky Date: Sat, 24 Jun 2023 00:34:15 -0700 Subject: [PATCH 10/12] quote_ident identifiers when creating neon_superuser (#4562) ## Problem --- compute_tools/src/compute.rs | 6 +++--- test_runner/regress/test_tenant_size.py | 2 ++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 8415d5dad5..6b549e198b 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -190,14 +190,14 @@ fn create_neon_superuser(spec: &ComputeSpec, client: &mut Client) -> Result<()> CREATE ROLE neon_superuser CREATEDB CREATEROLE NOLOGIN IN ROLE pg_read_all_data, pg_write_all_data; IF array_length(roles, 1) IS NOT NULL THEN EXECUTE format('GRANT neon_superuser TO %s', - array_to_string(roles, ', ')); + array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(roles) as x), ', ')); FOREACH r IN ARRAY roles LOOP - EXECUTE format('ALTER ROLE %s CREATEROLE CREATEDB', r); + EXECUTE format('ALTER ROLE %s CREATEROLE CREATEDB', quote_ident(r)); END LOOP; END IF; IF array_length(dbs, 1) IS NOT NULL THEN EXECUTE format('GRANT ALL PRIVILEGES ON DATABASE %s TO neon_superuser', - array_to_string(dbs, ', ')); + array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(dbs) as x), ', ')); END IF; END IF; END diff --git a/test_runner/regress/test_tenant_size.py b/test_runner/regress/test_tenant_size.py index 0ebe606066..25c6634108 100644 --- a/test_runner/regress/test_tenant_size.py +++ b/test_runner/regress/test_tenant_size.py @@ -16,6 +16,7 @@ from fixtures.pg_version import PgVersion, xfail_on_postgres from fixtures.types import Lsn, TenantId, TimelineId +@pytest.mark.xfail def test_empty_tenant_size(neon_simple_env: NeonEnv, test_output_dir: Path): env = neon_simple_env (tenant_id, _) = env.neon_cli.create_tenant() @@ -322,6 +323,7 @@ def test_only_heads_within_horizon(neon_simple_env: NeonEnv, test_output_dir: Pa size_debug_file.write(size_debug) +@pytest.mark.xfail def test_single_branch_get_tenant_size_grows( neon_env_builder: NeonEnvBuilder, test_output_dir: Path, pg_version: PgVersion ): From 44a441080d6d3b10933f7ec7f7b69a17562924d5 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 26 Jun 2023 11:42:17 +0200 Subject: [PATCH 11/12] bring back spawn_blocking for `compact_level0_phase1` (#4537) The stats for `compact_level0_phase` that I added in #4527 show the following breakdown (24h data from prod, only looking at compactions with > 1 L1 produced): * 10%ish of wall-clock time spent between the two read locks * I learned that the `DeltaLayer::iter()` and `DeltaLayer::key_iter()` calls actually do IO, even before we call `.next()`. I suspect that is why they take so much time between the locks. * 80+% of wall-clock time spent writing layer files * Lock acquisition time is irrelevant (low double-digit microseconds at most) * The generation of the holes holds the read lock for a relatively long time and it's proportional to the amount of keys / IO required to iterate over them (max: 110ms in prod; staging (nightly benchmarks): multiple seconds). Find below screenshots from my ad-hoc spreadsheet + some graphs. image image image ## Changes In This PR This PR makes the following changes: * rearrange the `compact_level0_phase1` code such that we build the `all_keys_iter` and `all_values_iter` later than before * only grab the `Timeline::layers` lock once, and hold it until we've computed the holes * run compact_level0_phase1 in spawn_blocking, pre-grabbing the `Timeline::layers` lock in the async code and passing it in as an `OwnedRwLockReadGuard`. * the code inside spawn_blocking drops this guard after computing the holds * the `OwnedRwLockReadGuard` requires the `Timeline::layers` to be wrapped in an `Arc`. I think that's Ok, the locking for the RwLock is more heavy-weight than an additional pointer indirection. ## Alternatives Considered The naive alternative is to throw the entire function into `spawn_blocking`, and use `blocking_read` for `Timeline::layers` access. What I've done in this PR is better because, with this alternative, 1. while we `blocking_read()`, we'd waste one slot in the spawn_blocking pool 2. there's deadlock risk because the spawn_blocking pool is a finite resource ![image](https://github.com/neondatabase/neon/assets/956573/46c419f1-6695-467e-b315-9d1fc0949058) ## Metadata Fixes https://github.com/neondatabase/neon/issues/4492 --- pageserver/src/tenant/timeline.rs | 262 ++++++++++++++++-------------- 1 file changed, 139 insertions(+), 123 deletions(-) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 122331ac19..060000a01a 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -129,7 +129,7 @@ pub struct Timeline { pub pg_version: u32, - pub(crate) layers: tokio::sync::RwLock>, + pub(crate) layers: Arc>>, /// Set of key ranges which should be covered by image layers to /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored. @@ -1418,7 +1418,7 @@ impl Timeline { timeline_id, tenant_id, pg_version, - layers: tokio::sync::RwLock::new(LayerMap::default()), + layers: Arc::new(tokio::sync::RwLock::new(LayerMap::default())), wanted_image_layers: Mutex::new(None), walredo_mgr, @@ -3370,14 +3370,14 @@ struct CompactLevel0Phase1StatsBuilder { version: Option, tenant_id: Option, timeline_id: Option, - first_read_lock_acquisition_micros: DurationRecorder, - get_level0_deltas_plus_drop_lock_micros: DurationRecorder, - level0_deltas_count: Option, - time_spent_between_locks: DurationRecorder, - second_read_lock_acquisition_micros: DurationRecorder, - second_read_lock_held_micros: DurationRecorder, - sort_holes_micros: DurationRecorder, + read_lock_acquisition_micros: DurationRecorder, + read_lock_held_spawn_blocking_startup_micros: DurationRecorder, + read_lock_held_prerequisites_micros: DurationRecorder, + read_lock_held_compute_holes_micros: DurationRecorder, + read_lock_drop_micros: DurationRecorder, + prepare_iterators_micros: DurationRecorder, write_layer_files_micros: DurationRecorder, + level0_deltas_count: Option, new_deltas_count: Option, new_deltas_size: Option, } @@ -3390,14 +3390,14 @@ struct CompactLevel0Phase1Stats { tenant_id: TenantId, #[serde_as(as = "serde_with::DisplayFromStr")] timeline_id: TimelineId, - first_read_lock_acquisition_micros: RecordedDuration, - get_level0_deltas_plus_drop_lock_micros: RecordedDuration, - level0_deltas_count: usize, - time_spent_between_locks: RecordedDuration, - second_read_lock_acquisition_micros: RecordedDuration, - second_read_lock_held_micros: RecordedDuration, - sort_holes_micros: RecordedDuration, + read_lock_acquisition_micros: RecordedDuration, + read_lock_held_spawn_blocking_startup_micros: RecordedDuration, + read_lock_held_prerequisites_micros: RecordedDuration, + read_lock_held_compute_holes_micros: RecordedDuration, + read_lock_drop_micros: RecordedDuration, + prepare_iterators_micros: RecordedDuration, write_layer_files_micros: RecordedDuration, + level0_deltas_count: usize, new_deltas_count: usize, new_deltas_size: u64, } @@ -3406,54 +3406,51 @@ impl TryFrom for CompactLevel0Phase1Stats { type Error = anyhow::Error; fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result { - let CompactLevel0Phase1StatsBuilder { - version, - tenant_id, - timeline_id, - first_read_lock_acquisition_micros, - get_level0_deltas_plus_drop_lock_micros, - level0_deltas_count, - time_spent_between_locks, - second_read_lock_acquisition_micros, - second_read_lock_held_micros, - sort_holes_micros, - write_layer_files_micros, - new_deltas_count, - new_deltas_size, - } = value; - Ok(CompactLevel0Phase1Stats { - version: version.ok_or_else(|| anyhow::anyhow!("version not set"))?, - tenant_id: tenant_id.ok_or_else(|| anyhow::anyhow!("tenant_id not set"))?, - timeline_id: timeline_id.ok_or_else(|| anyhow::anyhow!("timeline_id not set"))?, - first_read_lock_acquisition_micros: first_read_lock_acquisition_micros + Ok(Self { + version: value.version.ok_or_else(|| anyhow!("version not set"))?, + tenant_id: value + .tenant_id + .ok_or_else(|| anyhow!("tenant_id not set"))?, + timeline_id: value + .timeline_id + .ok_or_else(|| anyhow!("timeline_id not set"))?, + read_lock_acquisition_micros: value + .read_lock_acquisition_micros .into_recorded() - .ok_or_else(|| anyhow::anyhow!("first_read_lock_acquisition_micros not set"))?, - get_level0_deltas_plus_drop_lock_micros: get_level0_deltas_plus_drop_lock_micros + .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?, + read_lock_held_spawn_blocking_startup_micros: value + .read_lock_held_spawn_blocking_startup_micros .into_recorded() - .ok_or_else(|| { - anyhow::anyhow!("get_level0_deltas_plus_drop_lock_micros not set") - })?, - level0_deltas_count: level0_deltas_count - .ok_or_else(|| anyhow::anyhow!("level0_deltas_count not set"))?, - time_spent_between_locks: time_spent_between_locks + .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?, + read_lock_held_prerequisites_micros: value + .read_lock_held_prerequisites_micros .into_recorded() - .ok_or_else(|| anyhow::anyhow!("time_spent_between_locks not set"))?, - second_read_lock_acquisition_micros: second_read_lock_acquisition_micros + .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?, + read_lock_held_compute_holes_micros: value + .read_lock_held_compute_holes_micros .into_recorded() - .ok_or_else(|| anyhow::anyhow!("second_read_lock_acquisition_micros not set"))?, - second_read_lock_held_micros: second_read_lock_held_micros + .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?, + read_lock_drop_micros: value + .read_lock_drop_micros .into_recorded() - .ok_or_else(|| anyhow::anyhow!("second_read_lock_held_micros not set"))?, - sort_holes_micros: sort_holes_micros + .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?, + prepare_iterators_micros: value + .prepare_iterators_micros .into_recorded() - .ok_or_else(|| anyhow::anyhow!("sort_holes_micros not set"))?, - write_layer_files_micros: write_layer_files_micros + .ok_or_else(|| anyhow!("prepare_iterators_micros not set"))?, + write_layer_files_micros: value + .write_layer_files_micros .into_recorded() - .ok_or_else(|| anyhow::anyhow!("write_layer_files_micros not set"))?, - new_deltas_count: new_deltas_count - .ok_or_else(|| anyhow::anyhow!("new_deltas_count not set"))?, - new_deltas_size: new_deltas_size - .ok_or_else(|| anyhow::anyhow!("new_deltas_size not set"))?, + .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?, + level0_deltas_count: value + .level0_deltas_count + .ok_or_else(|| anyhow!("level0_deltas_count not set"))?, + new_deltas_count: value + .new_deltas_count + .ok_or_else(|| anyhow!("new_deltas_count not set"))?, + new_deltas_size: value + .new_deltas_size + .ok_or_else(|| anyhow!("new_deltas_size not set"))?, }) } } @@ -3464,30 +3461,18 @@ impl Timeline { /// This method takes the `_layer_removal_cs` guard to highlight it required downloads are /// returned as an error. If the `layer_removal_cs` boundary is changed not to be taken in the /// start of level0 files compaction, the on-demand download should be revisited as well. - async fn compact_level0_phase1( - &self, + fn compact_level0_phase1( + self: Arc, _layer_removal_cs: Arc>, + layers: tokio::sync::OwnedRwLockReadGuard>, + mut stats: CompactLevel0Phase1StatsBuilder, target_file_size: u64, ctx: &RequestContext, ) -> Result { - let mut stats = CompactLevel0Phase1StatsBuilder { - version: Some(1), - tenant_id: Some(self.tenant_id), - timeline_id: Some(self.timeline_id), - ..Default::default() - }; - - let begin = tokio::time::Instant::now(); - let layers = self.layers.read().await; - let now = tokio::time::Instant::now(); - stats.first_read_lock_acquisition_micros = - DurationRecorder::Recorded(RecordedDuration(now - begin), now); + stats.read_lock_held_spawn_blocking_startup_micros = + stats.read_lock_acquisition_micros.till_now(); // set by caller let mut level0_deltas = layers.get_level0_deltas()?; - drop(layers); stats.level0_deltas_count = Some(level0_deltas.len()); - stats.get_level0_deltas_plus_drop_lock_micros = - stats.first_read_lock_acquisition_micros.till_now(); - // Only compact if enough layers have accumulated. let threshold = self.get_compaction_threshold(); if level0_deltas.is_empty() || level0_deltas.len() < threshold { @@ -3565,6 +3550,53 @@ impl Timeline { // we don't accidentally use it later in the function. drop(level0_deltas); + stats.read_lock_held_prerequisites_micros = stats + .read_lock_held_spawn_blocking_startup_micros + .till_now(); + + // Determine N largest holes where N is number of compacted layers. + let max_holes = deltas_to_compact.len(); + let last_record_lsn = self.get_last_record_lsn(); + let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128; + let min_hole_coverage_size = 3; // TODO: something more flexible? + + // min-heap (reserve space for one more element added before eviction) + let mut heap: BinaryHeap = BinaryHeap::with_capacity(max_holes + 1); + let mut prev: Option = None; + for (next_key, _next_lsn, _size) in itertools::process_results( + deltas_to_compact.iter().map(|l| l.key_iter(ctx)), + |iter_iter| iter_iter.kmerge_by(|a, b| a.0 <= b.0), + )? { + if let Some(prev_key) = prev { + // just first fast filter + if next_key.to_i128() - prev_key.to_i128() >= min_hole_range { + let key_range = prev_key..next_key; + // Measuring hole by just subtraction of i128 representation of key range boundaries + // has not so much sense, because largest holes will corresponds field1/field2 changes. + // But we are mostly interested to eliminate holes which cause generation of excessive image layers. + // That is why it is better to measure size of hole as number of covering image layers. + let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len(); + if coverage_size >= min_hole_coverage_size { + heap.push(Hole { + key_range, + coverage_size, + }); + if heap.len() > max_holes { + heap.pop(); // remove smallest hole + } + } + } + } + prev = Some(next_key.next()); + } + stats.read_lock_held_compute_holes_micros = + stats.read_lock_held_prerequisites_micros.till_now(); + drop(layers); + stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now(); + let mut holes = heap.into_vec(); + holes.sort_unstable_by_key(|hole| hole.key_range.start); + let mut next_hole = 0; // index of next hole in holes vector + // This iterator walks through all key-value pairs from all the layers // we're compacting, in key, LSN order. let all_values_iter = itertools::process_results( @@ -3604,50 +3636,7 @@ impl Timeline { }, )?; - // Determine N largest holes where N is number of compacted layers. - let max_holes = deltas_to_compact.len(); - let last_record_lsn = self.get_last_record_lsn(); - stats.time_spent_between_locks = stats.get_level0_deltas_plus_drop_lock_micros.till_now(); - let layers = self.layers.read().await; // Is'n it better to hold original layers lock till here? - stats.second_read_lock_acquisition_micros = stats.time_spent_between_locks.till_now(); - let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128; - let min_hole_coverage_size = 3; // TODO: something more flexible? - - // min-heap (reserve space for one more element added before eviction) - let mut heap: BinaryHeap = BinaryHeap::with_capacity(max_holes + 1); - let mut prev: Option = None; - for (next_key, _next_lsn, _size) in itertools::process_results( - deltas_to_compact.iter().map(|l| l.key_iter(ctx)), - |iter_iter| iter_iter.kmerge_by(|a, b| a.0 <= b.0), - )? { - if let Some(prev_key) = prev { - // just first fast filter - if next_key.to_i128() - prev_key.to_i128() >= min_hole_range { - let key_range = prev_key..next_key; - // Measuring hole by just subtraction of i128 representation of key range boundaries - // has not so much sense, because largest holes will corresponds field1/field2 changes. - // But we are mostly interested to eliminate holes which cause generation of excessive image layers. - // That is why it is better to measure size of hole as number of covering image layers. - let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len(); - if coverage_size >= min_hole_coverage_size { - heap.push(Hole { - key_range, - coverage_size, - }); - if heap.len() > max_holes { - heap.pop(); // remove smallest hole - } - } - } - } - prev = Some(next_key.next()); - } - drop(layers); - stats.second_read_lock_held_micros = stats.second_read_lock_acquisition_micros.till_now(); - let mut holes = heap.into_vec(); - holes.sort_unstable_by_key(|hole| hole.key_range.start); - let mut next_hole = 0; // index of next hole in holes vector - stats.sort_holes_micros = stats.second_read_lock_held_micros.till_now(); + stats.prepare_iterators_micros = stats.read_lock_drop_micros.till_now(); // Merge the contents of all the input delta layers into a new set // of delta layers, based on the current partitioning. @@ -3807,7 +3796,7 @@ impl Timeline { layer_paths.pop().unwrap(); } - stats.write_layer_files_micros = stats.sort_holes_micros.till_now(); + stats.write_layer_files_micros = stats.prepare_iterators_micros.till_now(); stats.new_deltas_count = Some(new_layers.len()); stats.new_deltas_size = Some(new_layers.iter().map(|l| l.desc.file_size).sum()); @@ -3846,9 +3835,36 @@ impl Timeline { let CompactLevel0Phase1Result { new_layers, deltas_to_compact, - } = self - .compact_level0_phase1(layer_removal_cs.clone(), target_file_size, ctx) - .await?; + } = { + let phase1_span = info_span!("compact_level0_phase1"); + let myself = Arc::clone(self); + let ctx = ctx.attached_child(); // technically, the spawn_blocking can outlive this future + let mut stats = CompactLevel0Phase1StatsBuilder { + version: Some(2), + tenant_id: Some(self.tenant_id), + timeline_id: Some(self.timeline_id), + ..Default::default() + }; + + let begin = tokio::time::Instant::now(); + let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await; + let now = tokio::time::Instant::now(); + stats.read_lock_acquisition_micros = + DurationRecorder::Recorded(RecordedDuration(now - begin), now); + let layer_removal_cs = layer_removal_cs.clone(); + tokio::task::spawn_blocking(move || { + let _entered = phase1_span.enter(); + myself.compact_level0_phase1( + layer_removal_cs, + phase1_layers_locked, + stats, + target_file_size, + &ctx, + ) + }) + .await + .context("spawn_blocking")?? + }; if new_layers.is_empty() && deltas_to_compact.is_empty() { // nothing to do From 1faf69a698c7dd1392a0a48a201e3d1fbc582c30 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 26 Jun 2023 11:43:11 +0200 Subject: [PATCH 12/12] run `Layer::get_value_reconstruct_data` in `spawn_blocking` (#4498) This PR concludes the "async `Layer::get_value_reconstruct_data`" project. The problem we're solving is that, before this patch, we'd execute `Layer::get_value_reconstruct_data` on the tokio executor threads. This function is IO- and/or CPU-intensive. The IO is using VirtualFile / std::fs; hence it's blocking. This results in unfairness towards other tokio tasks, especially under (disk) load. Some context can be found at https://github.com/neondatabase/neon/issues/4154 where I suspect (but can't prove) load spikes of logical size calculation to cause heavy eviction skew. Sadly we don't have tokio runtime/scheduler metrics to quantify the unfairness. But generally, we know blocking the executor threads on std::fs IO is bad. So, let's have this change and watch out for severe perf regressions in staging & during rollout. ## Changes * rename `Layer::get_value_reconstruct_data` to `Layer::get_value_reconstruct_data_blocking` * add a new blanket impl'd `Layer::get_value_reconstruct_data` `async_trait` method that runs `get_value_reconstruct_data_blocking` inside `spawn_blocking`. * The `spawn_blocking` requires `'static` lifetime of the captured variables; hence I had to change the data flow to _move_ the `ValueReconstructState` into and back out of get_value_reconstruct_data instead of passing a reference. It's a small struct, so I don't expect a big performance penalty. ## Performance Fundamentally, the code changes cause the following performance-relevant changes: * Latency & allocations: each `get_value_reconstruct_data` call now makes a short-lived allocation because `async_trait` is just sugar for boxed futures under the hood * Latency: `spawn_blocking` adds some latency because it needs to move the work to a thread pool * using `spawn_blocking` plus the existing synchronous code inside is probably more efficient better than switching all the synchronous code to tokio::fs because _each_ tokio::fs call does `spawn_blocking` under the hood. * Throughput: the `spawn_blocking` thread pool is much larger than the async executor thread pool. Hence, as long as the disks can keep up, which they should according to AWS specs, we will be able to deliver higher `get_value_reconstruct_data` throughput. * Disk IOPS utilization: we will see higher disk utilization if we get more throughput. Not a problem because the disks in prod are currently under-utilized, according to node_exporter metrics & the AWS specs. * CPU utilization: at higher throughput, CPU utilization will be higher. Slightly higher latency under regular load is acceptable given the throughput gains and expected better fairness during disk load peaks, such as logical size calculation peaks uncovered in #4154. ## Full Stack Of Preliminary PRs This PR builds on top of the following preliminary PRs 1. Clean-ups * https://github.com/neondatabase/neon/pull/4316 * https://github.com/neondatabase/neon/pull/4317 * https://github.com/neondatabase/neon/pull/4318 * https://github.com/neondatabase/neon/pull/4319 * https://github.com/neondatabase/neon/pull/4321 * Note: these were mostly to find an alternative to #4291, which I thought we'd need in my original plan where we would need to convert `Tenant::timelines` into an async locking primitive (#4333). In reviews, we walked away from that, but these cleanups were still quite useful. 2. https://github.com/neondatabase/neon/pull/4364 3. https://github.com/neondatabase/neon/pull/4472 4. https://github.com/neondatabase/neon/pull/4476 5. https://github.com/neondatabase/neon/pull/4477 6. https://github.com/neondatabase/neon/pull/4485 7. https://github.com/neondatabase/neon/pull/4441 --- pageserver/src/tenant/storage_layer.rs | 51 ++++++++++--- .../src/tenant/storage_layer/delta_layer.rs | 15 ++-- .../src/tenant/storage_layer/image_layer.rs | 15 ++-- .../tenant/storage_layer/inmemory_layer.rs | 15 ++-- .../src/tenant/storage_layer/remote_layer.rs | 13 ++-- pageserver/src/tenant/timeline.rs | 73 ++++++++++++------- 6 files changed, 118 insertions(+), 64 deletions(-) diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 0af3d4ce39..06b90decd2 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -12,7 +12,7 @@ use crate::context::RequestContext; use crate::repository::{Key, Value}; use crate::task_mgr::TaskKind; use crate::walrecord::NeonWalRecord; -use anyhow::Result; +use anyhow::{Context, Result}; use bytes::Bytes; use enum_map::EnumMap; use enumset::EnumSet; @@ -343,7 +343,8 @@ impl LayerAccessStats { /// All layers should implement a minimal `std::fmt::Debug` without tenant or /// timeline names, because those are known in the context of which the layers /// are used in (timeline). -pub trait Layer: std::fmt::Debug + Send + Sync { +#[async_trait::async_trait] +pub trait Layer: std::fmt::Debug + Send + Sync + 'static { /// Range of keys that this layer covers fn get_key_range(&self) -> Range; @@ -373,13 +374,42 @@ pub trait Layer: std::fmt::Debug + Send + Sync { /// is available. If this returns ValueReconstructResult::Continue, look up /// the predecessor layer and call again with the same 'reconstruct_data' to /// collect more data. - fn get_value_reconstruct_data( + fn get_value_reconstruct_data_blocking( &self, key: Key, lsn_range: Range, - reconstruct_data: &mut ValueReconstructState, - ctx: &RequestContext, - ) -> Result; + reconstruct_data: ValueReconstructState, + ctx: RequestContext, + ) -> Result<(ValueReconstructState, ValueReconstructResult)>; + + /// CANCEL SAFETY: if the returned future is dropped, + /// the wrapped closure still run to completion and the return value discarded. + /// For the case of get_value_reconstruct_data, we expect the closure to not + /// have any side effects, as it only attempts to read a layer (and stuff like + /// page cache isn't considered a real side effect). + /// But, ... + /// TRACING: + /// If the returned future is cancelled, the spawn_blocking span can outlive + /// the caller's span. + /// So, technically, we should be using `parent: None` and `follows_from: current` + /// instead. However, in practice, the advantage of maintaining the span stack + /// in logs outweighs the disadvantage of having a dangling span in a case that + /// is not expected to happen because in pageserver we generally don't drop pending futures. + async fn get_value_reconstruct_data( + self: Arc, + key: Key, + lsn_range: Range, + reconstruct_data: ValueReconstructState, + ctx: RequestContext, + ) -> Result<(ValueReconstructState, ValueReconstructResult)> { + let span = tracing::info_span!("get_value_reconstruct_data_spawn_blocking"); + tokio::task::spawn_blocking(move || { + let _enter = span.enter(); + self.get_value_reconstruct_data_blocking(key, lsn_range, reconstruct_data, ctx) + }) + .await + .context("spawn_blocking")? + } /// A short ID string that uniquely identifies the given layer within a [`LayerMap`]. fn short_id(&self) -> String; @@ -499,6 +529,7 @@ impl LayerDescriptor { } } +#[async_trait::async_trait] impl Layer for LayerDescriptor { fn get_key_range(&self) -> Range { self.key.clone() @@ -512,13 +543,13 @@ impl Layer for LayerDescriptor { self.is_incremental } - fn get_value_reconstruct_data( + fn get_value_reconstruct_data_blocking( &self, _key: Key, _lsn_range: Range, - _reconstruct_data: &mut ValueReconstructState, - _ctx: &RequestContext, - ) -> Result { + _reconstruct_data: ValueReconstructState, + _ctx: RequestContext, + ) -> Result<(ValueReconstructState, ValueReconstructResult)> { todo!("This method shouldn't be part of the Layer trait") } diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 6e14663121..cec7a09eff 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -218,6 +218,7 @@ impl std::fmt::Debug for DeltaLayerInner { } } +#[async_trait::async_trait] impl Layer for DeltaLayer { /// debugging function to print out the contents of the layer fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> { @@ -294,13 +295,13 @@ impl Layer for DeltaLayer { Ok(()) } - fn get_value_reconstruct_data( + fn get_value_reconstruct_data_blocking( &self, key: Key, lsn_range: Range, - reconstruct_state: &mut ValueReconstructState, - ctx: &RequestContext, - ) -> anyhow::Result { + mut reconstruct_state: ValueReconstructState, + ctx: RequestContext, + ) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> { ensure!(lsn_range.start >= self.desc.lsn_range.start); let mut need_image = true; @@ -308,7 +309,7 @@ impl Layer for DeltaLayer { { // Open the file and lock the metadata in memory - let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?; + let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?; // Scan the page versions backwards, starting from `lsn`. let file = &inner.file; @@ -374,9 +375,9 @@ impl Layer for DeltaLayer { // If an older page image is needed to reconstruct the page, let the // caller know. if need_image { - Ok(ValueReconstructResult::Continue) + Ok((reconstruct_state, ValueReconstructResult::Continue)) } else { - Ok(ValueReconstructResult::Complete) + Ok((reconstruct_state, ValueReconstructResult::Complete)) } } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 07a16a7de2..6019590db0 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -149,6 +149,7 @@ impl std::fmt::Debug for ImageLayerInner { } } +#[async_trait::async_trait] impl Layer for ImageLayer { /// debugging function to print out the contents of the layer fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> { @@ -181,18 +182,18 @@ impl Layer for ImageLayer { } /// Look up given page in the file - fn get_value_reconstruct_data( + fn get_value_reconstruct_data_blocking( &self, key: Key, lsn_range: Range, - reconstruct_state: &mut ValueReconstructState, - ctx: &RequestContext, - ) -> anyhow::Result { + mut reconstruct_state: ValueReconstructState, + ctx: RequestContext, + ) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> { assert!(self.desc.key_range.contains(&key)); assert!(lsn_range.start >= self.lsn); assert!(lsn_range.end >= self.lsn); - let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?; + let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?; let file = inner.file.as_ref().unwrap(); let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file); @@ -210,9 +211,9 @@ impl Layer for ImageLayer { let value = Bytes::from(blob); reconstruct_state.img = Some((self.lsn, value)); - Ok(ValueReconstructResult::Complete) + Ok((reconstruct_state, ValueReconstructResult::Complete)) } else { - Ok(ValueReconstructResult::Missing) + Ok((reconstruct_state, ValueReconstructResult::Missing)) } } diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 78bcfdafc0..4efd032ba9 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -110,6 +110,7 @@ impl InMemoryLayer { } } +#[async_trait::async_trait] impl Layer for InMemoryLayer { fn get_key_range(&self) -> Range { Key::MIN..Key::MAX @@ -190,13 +191,13 @@ impl Layer for InMemoryLayer { } /// Look up given value in the layer. - fn get_value_reconstruct_data( + fn get_value_reconstruct_data_blocking( &self, key: Key, lsn_range: Range, - reconstruct_state: &mut ValueReconstructState, - _ctx: &RequestContext, - ) -> anyhow::Result { + mut reconstruct_state: ValueReconstructState, + _ctx: RequestContext, + ) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> { ensure!(lsn_range.start >= self.start_lsn); let mut need_image = true; @@ -213,7 +214,7 @@ impl Layer for InMemoryLayer { match value { Value::Image(img) => { reconstruct_state.img = Some((*entry_lsn, img)); - return Ok(ValueReconstructResult::Complete); + return Ok((reconstruct_state, ValueReconstructResult::Complete)); } Value::WalRecord(rec) => { let will_init = rec.will_init(); @@ -233,9 +234,9 @@ impl Layer for InMemoryLayer { // If an older page image is needed to reconstruct the page, let the // caller know. if need_image { - Ok(ValueReconstructResult::Continue) + Ok((reconstruct_state, ValueReconstructResult::Continue)) } else { - Ok(ValueReconstructResult::Complete) + Ok((reconstruct_state, ValueReconstructResult::Complete)) } } } diff --git a/pageserver/src/tenant/storage_layer/remote_layer.rs b/pageserver/src/tenant/storage_layer/remote_layer.rs index 387bae5b1f..a62334689a 100644 --- a/pageserver/src/tenant/storage_layer/remote_layer.rs +++ b/pageserver/src/tenant/storage_layer/remote_layer.rs @@ -6,7 +6,7 @@ use crate::context::RequestContext; use crate::repository::Key; use crate::tenant::layer_map::BatchedUpdates; use crate::tenant::remote_timeline_client::index::LayerFileMetadata; -use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState}; +use crate::tenant::storage_layer::{Layer, ValueReconstructState}; use anyhow::{bail, Result}; use pageserver_api::models::HistoricLayerInfo; use std::ops::Range; @@ -21,7 +21,7 @@ use utils::{ use super::filename::{DeltaFileName, ImageFileName}; use super::{ DeltaLayer, ImageLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerKeyIter, - LayerResidenceStatus, PersistentLayer, PersistentLayerDesc, + LayerResidenceStatus, PersistentLayer, PersistentLayerDesc, ValueReconstructResult, }; /// RemoteLayer is a not yet downloaded [`ImageLayer`] or @@ -63,14 +63,15 @@ impl std::fmt::Debug for RemoteLayer { } } +#[async_trait::async_trait] impl Layer for RemoteLayer { - fn get_value_reconstruct_data( + fn get_value_reconstruct_data_blocking( &self, _key: Key, _lsn_range: Range, - _reconstruct_state: &mut ValueReconstructState, - _ctx: &RequestContext, - ) -> Result { + _reconstruct_state: ValueReconstructState, + _ctx: RequestContext, + ) -> Result<(ValueReconstructState, ValueReconstructResult)> { bail!( "layer {} needs to be downloaded", self.filename().file_name() diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 060000a01a..447c09db76 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -555,13 +555,14 @@ impl Timeline { None => None, }; - let mut reconstruct_state = ValueReconstructState { + let reconstruct_state = ValueReconstructState { records: Vec::new(), img: cached_page_img, }; let timer = self.metrics.get_reconstruct_data_time_histo.start_timer(); - self.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx) + let reconstruct_state = self + .get_reconstruct_data(key, lsn, reconstruct_state, ctx) .await?; timer.stop_and_record(); @@ -2352,9 +2353,9 @@ impl Timeline { &self, key: Key, request_lsn: Lsn, - reconstruct_state: &mut ValueReconstructState, + mut reconstruct_state: ValueReconstructState, ctx: &RequestContext, - ) -> Result<(), PageReconstructError> { + ) -> Result { // Start from the current timeline. let mut timeline_owned; let mut timeline = self; @@ -2384,12 +2385,12 @@ impl Timeline { // The function should have updated 'state' //info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn); match result { - ValueReconstructResult::Complete => return Ok(()), + ValueReconstructResult::Complete => return Ok(reconstruct_state), ValueReconstructResult::Continue => { // If we reached an earlier cached page image, we're done. if cont_lsn == cached_lsn + 1 { MATERIALIZED_PAGE_CACHE_HIT.inc_by(1); - return Ok(()); + return Ok(reconstruct_state); } if prev_lsn <= cont_lsn { // Didn't make any progress in last iteration. Error out to avoid @@ -2493,13 +2494,19 @@ impl Timeline { // Get all the data needed to reconstruct the page version from this layer. // But if we have an older cached page image, no need to go past that. let lsn_floor = max(cached_lsn + 1, start_lsn); - result = match open_layer.get_value_reconstruct_data( - key, - lsn_floor..cont_lsn, - reconstruct_state, - ctx, - ) { - Ok(result) => result, + result = match Arc::clone(open_layer) + .get_value_reconstruct_data( + key, + lsn_floor..cont_lsn, + reconstruct_state, + ctx.attached_child(), + ) + .await + { + Ok((new_reconstruct_state, result)) => { + reconstruct_state = new_reconstruct_state; + result + } Err(e) => return Err(PageReconstructError::from(e)), }; cont_lsn = lsn_floor; @@ -2520,13 +2527,19 @@ impl Timeline { if cont_lsn > start_lsn { //info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display()); let lsn_floor = max(cached_lsn + 1, start_lsn); - result = match frozen_layer.get_value_reconstruct_data( - key, - lsn_floor..cont_lsn, - reconstruct_state, - ctx, - ) { - Ok(result) => result, + result = match Arc::clone(frozen_layer) + .get_value_reconstruct_data( + key, + lsn_floor..cont_lsn, + reconstruct_state, + ctx.attached_child(), + ) + .await + { + Ok((new_reconstruct_state, result)) => { + reconstruct_state = new_reconstruct_state; + result + } Err(e) => return Err(PageReconstructError::from(e)), }; cont_lsn = lsn_floor; @@ -2555,13 +2568,19 @@ impl Timeline { // Get all the data needed to reconstruct the page version from this layer. // But if we have an older cached page image, no need to go past that. let lsn_floor = max(cached_lsn + 1, lsn_floor); - result = match layer.get_value_reconstruct_data( - key, - lsn_floor..cont_lsn, - reconstruct_state, - ctx, - ) { - Ok(result) => result, + result = match Arc::clone(&layer) + .get_value_reconstruct_data( + key, + lsn_floor..cont_lsn, + reconstruct_state, + ctx.attached_child(), + ) + .await + { + Ok((new_reconstruct_state, result)) => { + reconstruct_state = new_reconstruct_state; + result + } Err(e) => return Err(PageReconstructError::from(e)), }; cont_lsn = lsn_floor;