From 339a3e314609ed00e675b455d0fdb98e908394d2 Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Fri, 26 May 2023 14:49:42 +0100 Subject: [PATCH 01/18] GitHub Autocomment: comment commits for branches (#4335) ## Problem GitHub Autocomment script posts a comment only for PRs. It's harder to debug failed tests on main or release branches. ## Summary of changes - Change the GitHub Autocomment script to be able to post a comment to either a PR or a commit of a branch --- .github/workflows/build_and_test.yml | 6 +-- ...-test-report.js => comment-test-report.js} | 37 +++++++++++++++---- 2 files changed, 31 insertions(+), 12 deletions(-) rename scripts/{pr-comment-test-report.js => comment-test-report.js} (85%) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 6d89ce9994..336dea04eb 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -407,9 +407,7 @@ jobs: uses: ./.github/actions/allure-report-generate - uses: actions/github-script@v6 - if: > - !cancelled() && - github.event_name == 'pull_request' + if: ${{ !cancelled() }} with: # Retry script for 5XX server errors: https://github.com/actions/github-script#retries retries: 5 @@ -419,7 +417,7 @@ jobs: reportJsonUrl: "${{ steps.create-allure-report.outputs.report-json-url }}", } - const script = require("./scripts/pr-comment-test-report.js") + const script = require("./scripts/comment-test-report.js") await script({ github, context, diff --git a/scripts/pr-comment-test-report.js b/scripts/comment-test-report.js similarity index 85% rename from scripts/pr-comment-test-report.js rename to scripts/comment-test-report.js index 3a7bba0daa..a7fd5b0bef 100644 --- a/scripts/pr-comment-test-report.js +++ b/scripts/comment-test-report.js @@ -1,5 +1,5 @@ // -// The script parses Allure reports and posts a comment with a summary of the test results to the PR. +// The script parses Allure reports and posts a comment with a summary of the test results to the PR or to the latest commit in the branch. // // The comment is updated on each run with the latest results. // @@ -7,7 +7,7 @@ // - uses: actions/github-script@v6 // with: // script: | -// const script = require("./scripts/pr-comment-test-report.js") +// const script = require("./scripts/comment-test-report.js") // await script({ // github, // context, @@ -35,8 +35,12 @@ class DefaultMap extends Map { module.exports = async ({ github, context, fetch, report }) => { // Marker to find the comment in the subsequent runs const startMarker = `` + // If we run the script in the PR or in the branch (main/release/...) + const isPullRequest = !!context.payload.pull_request + // Latest commit in PR or in the branch + const commitSha = isPullRequest ? context.payload.pull_request.head.sha : context.sha // Let users know that the comment is updated automatically - const autoupdateNotice = `
The comment gets automatically updated with the latest test results
${context.payload.pull_request.head.sha} at ${new Date().toISOString()} :recycle:
` + const autoupdateNotice = `
The comment gets automatically updated with the latest test results
${commitSha} at ${new Date().toISOString()} :recycle:
` // GitHub bot id taken from (https://api.github.com/users/github-actions[bot]) const githubActionsBotId = 41898282 // Commend body itself @@ -166,22 +170,39 @@ module.exports = async ({ github, context, fetch, report }) => { commentBody += autoupdateNotice - const { data: comments } = await github.rest.issues.listComments({ - issue_number: context.payload.number, + let createCommentFn, listCommentsFn, updateCommentFn, issueNumberOrSha + if (isPullRequest) { + createCommentFn = github.rest.issues.createComment + listCommentsFn = github.rest.issues.listComments + updateCommentFn = github.rest.issues.updateComment + issueNumberOrSha = { + issue_number: context.payload.number, + } + } else { + updateCommentFn = github.rest.repos.updateCommitComment + listCommentsFn = github.rest.repos.listCommentsForCommit + createCommentFn = github.rest.repos.createCommitComment + issueNumberOrSha = { + commit_sha: commitSha, + } + } + + const { data: comments } = await listCommentsFn({ + ...issueNumberOrSha, ...ownerRepoParams, }) const comment = comments.find(comment => comment.user.id === githubActionsBotId && comment.body.startsWith(startMarker)) if (comment) { - await github.rest.issues.updateComment({ + await updateCommentFn({ comment_id: comment.id, body: commentBody, ...ownerRepoParams, }) } else { - await github.rest.issues.createComment({ - issue_number: context.payload.number, + await createCommentFn({ body: commentBody, + ...issueNumberOrSha, ...ownerRepoParams, }) } From be177f82dc5c9aa8166a3fdfbc03dbd8105d0c59 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Fri, 26 May 2023 18:37:17 +0300 Subject: [PATCH 02/18] Revert "Allow for higher s3 concurrency (#4292)" (#4356) This reverts commit 024109fbeb533b4574976a5899c27f56891de881 for it failing to be speed up anything, but run into more errors. See: #3698. --- Cargo.lock | 12 ---- libs/remote_storage/Cargo.toml | 2 - libs/remote_storage/src/lib.rs | 2 - libs/remote_storage/src/s3_bucket.rs | 85 +++++++++++++++++++--------- 4 files changed, 59 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 69d161d2b1..d390df94e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2040,17 +2040,6 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" -[[package]] -name = "leaky-bucket" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d615fd0b579225f0d3c8d781af50a73644b571da8b5b50053ef2dcfa60dd51e7" -dependencies = [ - "parking_lot", - "tokio", - "tracing", -] - [[package]] name = "libc" version = "0.2.144" @@ -3233,7 +3222,6 @@ dependencies = [ "aws-smithy-http", "aws-types", "hyper", - "leaky-bucket", "metrics", "once_cell", "pin-project-lite", diff --git a/libs/remote_storage/Cargo.toml b/libs/remote_storage/Cargo.toml index 5da02293a8..0877a38dd9 100644 --- a/libs/remote_storage/Cargo.toml +++ b/libs/remote_storage/Cargo.toml @@ -25,8 +25,6 @@ utils.workspace = true pin-project-lite.workspace = true workspace_hack.workspace = true -leaky-bucket = "1.0" - [dev-dependencies] tempfile.workspace = true test-context.workspace = true diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index f3ae2425f6..e0cc3ca543 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -37,8 +37,6 @@ pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10; /// https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html /// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests /// https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/ -/// -/// IAM ratelimit should never be observed with caching credentials provider. pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100; /// No limits on the client side, which currenltly means 1000 for AWS S3. /// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 631caa6a48..0be8c72fe0 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -21,7 +21,10 @@ use aws_sdk_s3::{ }; use aws_smithy_http::body::SdkBody; use hyper::Body; -use tokio::io; +use tokio::{ + io::{self, AsyncRead}, + sync::Semaphore, +}; use tokio_util::io::ReaderStream; use tracing::debug; @@ -102,8 +105,9 @@ pub struct S3Bucket { prefix_in_bucket: Option, max_keys_per_list_response: Option, // Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded. + // Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold. // The helps to ensure we don't exceed the thresholds. - concurrency_limiter: Arc, + concurrency_limiter: Arc, } #[derive(Default)] @@ -154,24 +158,12 @@ impl S3Bucket { } prefix }); - - let rps = aws_config.concurrency_limit.get(); - let concurrency_limiter = leaky_bucket::RateLimiter::builder() - .max(rps) - .initial(0) - // refill it by rps every second. this means the (rps+1)th request will have to wait for - // 1 second from earliest. - .refill(rps) - .interval(std::time::Duration::from_secs(1)) - .fair(true) - .build(); - Ok(Self { client, bucket_name: aws_config.bucket_name.clone(), max_keys_per_list_response: aws_config.max_keys_per_list_response, prefix_in_bucket, - concurrency_limiter: Arc::new(concurrency_limiter), + concurrency_limiter: Arc::new(Semaphore::new(aws_config.concurrency_limit.get())), }) } @@ -203,10 +195,13 @@ impl S3Bucket { } async fn download_object(&self, request: GetObjectRequest) -> Result { - // while the download could take a long time with `leaky_bucket` we have nothing to release - // once the download is done. this is because with "requests per second" rate limiting on - // s3, there should be no meaning for the long requests. - self.concurrency_limiter.clone().acquire_owned(1).await; + let permit = self + .concurrency_limiter + .clone() + .acquire_owned() + .await + .context("Concurrency limiter semaphore got closed during S3 download") + .map_err(DownloadError::Other)?; metrics::inc_get_object(); @@ -224,9 +219,10 @@ impl S3Bucket { let metadata = object_output.metadata().cloned().map(StorageMetadata); Ok(Download { metadata, - download_stream: Box::pin(io::BufReader::new( + download_stream: Box::pin(io::BufReader::new(RatelimitedAsyncRead::new( + permit, object_output.body.into_async_read(), - )), + ))), }) } Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => { @@ -242,6 +238,32 @@ impl S3Bucket { } } +pin_project_lite::pin_project! { + /// An `AsyncRead` adapter which carries a permit for the lifetime of the value. + struct RatelimitedAsyncRead { + permit: tokio::sync::OwnedSemaphorePermit, + #[pin] + inner: S, + } +} + +impl RatelimitedAsyncRead { + fn new(permit: tokio::sync::OwnedSemaphorePermit, inner: S) -> Self { + RatelimitedAsyncRead { permit, inner } + } +} + +impl AsyncRead for RatelimitedAsyncRead { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut io::ReadBuf<'_>, + ) -> std::task::Poll> { + let this = self.project(); + this.inner.poll_read(cx, buf) + } +} + #[async_trait::async_trait] impl RemoteStorage for S3Bucket { /// See the doc for `RemoteStorage::list_prefixes` @@ -267,7 +289,12 @@ impl RemoteStorage for S3Bucket { let mut continuation_token = None; loop { - self.concurrency_limiter.acquire_one().await; + let _guard = self + .concurrency_limiter + .acquire() + .await + .context("Concurrency limiter semaphore got closed during S3 list") + .map_err(DownloadError::Other)?; metrics::inc_list_objects(); @@ -312,9 +339,11 @@ impl RemoteStorage for S3Bucket { to: &RemotePath, metadata: Option, ) -> anyhow::Result<()> { - // similarly to downloads, the permit does not have live through the upload, but instead we - // are rate limiting requests per second. - self.concurrency_limiter.acquire_one().await; + let _guard = self + .concurrency_limiter + .acquire() + .await + .context("Concurrency limiter semaphore got closed during S3 upload")?; metrics::inc_put_object(); @@ -369,7 +398,11 @@ impl RemoteStorage for S3Bucket { } async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> { - self.concurrency_limiter.acquire_one().await; + let _guard = self + .concurrency_limiter + .acquire() + .await + .context("Concurrency limiter semaphore got closed during S3 delete")?; metrics::inc_delete_object(); From 4e359db4c78e0fd3d3e8f6a69baac4fb5b80b752 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Fri, 26 May 2023 17:15:47 -0400 Subject: [PATCH 03/18] pgserver: spawn_blocking in compaction (#4265) Compaction is usually a compute-heavy process and might affect other futures running on the thread of the compaction. Therefore, we add `block_in_place` as a temporary solution to avoid blocking other futures on the same thread as compaction in the runtime. As we are migrating towards a fully-async-style pageserver, we can revert this change when everything is async and when we move compaction to a separate runtime. --------- Signed-off-by: Alex Chi --- pageserver/src/context.rs | 3 +- pageserver/src/tenant/par_fsync.rs | 52 +++++++++++++---- pageserver/src/tenant/timeline.rs | 89 ++++++++++++++++++------------ 3 files changed, 98 insertions(+), 46 deletions(-) diff --git a/pageserver/src/context.rs b/pageserver/src/context.rs index e826d28e6d..f53b7736ab 100644 --- a/pageserver/src/context.rs +++ b/pageserver/src/context.rs @@ -88,6 +88,7 @@ use crate::task_mgr::TaskKind; // The main structure of this module, see module-level comment. +#[derive(Clone, Debug)] pub struct RequestContext { task_kind: TaskKind, download_behavior: DownloadBehavior, @@ -95,7 +96,7 @@ pub struct RequestContext { /// Desired behavior if the operation requires an on-demand download /// to proceed. -#[derive(Clone, Copy, PartialEq, Eq)] +#[derive(Clone, Copy, PartialEq, Eq, Debug)] pub enum DownloadBehavior { /// Download the layer file. It can take a while. Download, diff --git a/pageserver/src/tenant/par_fsync.rs b/pageserver/src/tenant/par_fsync.rs index 0b0217ab58..3cbcfe8774 100644 --- a/pageserver/src/tenant/par_fsync.rs +++ b/pageserver/src/tenant/par_fsync.rs @@ -19,14 +19,8 @@ fn parallel_worker(paths: &[PathBuf], next_path_idx: &AtomicUsize) -> io::Result Ok(()) } -pub fn par_fsync(paths: &[PathBuf]) -> io::Result<()> { - const PARALLEL_PATH_THRESHOLD: usize = 1; - if paths.len() <= PARALLEL_PATH_THRESHOLD { - for path in paths { - fsync_path(path)?; - } - return Ok(()); - } +fn fsync_in_thread_pool(paths: &[PathBuf]) -> io::Result<()> { + // TODO: remove this function in favor of `par_fsync_async` once we asyncify everything. /// Use at most this number of threads. /// Increasing this limit will @@ -36,11 +30,11 @@ pub fn par_fsync(paths: &[PathBuf]) -> io::Result<()> { let num_threads = paths.len().min(MAX_NUM_THREADS); let next_path_idx = AtomicUsize::new(0); - crossbeam_utils::thread::scope(|s| -> io::Result<()> { + std::thread::scope(|s| -> io::Result<()> { let mut handles = vec![]; // Spawn `num_threads - 1`, as the current thread is also a worker. for _ in 1..num_threads { - handles.push(s.spawn(|_| parallel_worker(paths, &next_path_idx))); + handles.push(s.spawn(|| parallel_worker(paths, &next_path_idx))); } parallel_worker(paths, &next_path_idx)?; @@ -51,5 +45,41 @@ pub fn par_fsync(paths: &[PathBuf]) -> io::Result<()> { Ok(()) }) - .unwrap() +} + +/// Parallel fsync all files. Can be used in non-async context as it is using rayon thread pool. +pub fn par_fsync(paths: &[PathBuf]) -> io::Result<()> { + if paths.len() == 1 { + fsync_path(&paths[0])?; + return Ok(()); + } + + fsync_in_thread_pool(paths) +} + +/// Parallel fsync asynchronously. If number of files are less than PARALLEL_PATH_THRESHOLD, fsync is done in the current +/// execution thread. Otherwise, we will spawn_blocking and run it in tokio. +pub async fn par_fsync_async(paths: &[PathBuf]) -> io::Result<()> { + const MAX_CONCURRENT_FSYNC: usize = 64; + let mut next = paths.iter().peekable(); + let mut js = tokio::task::JoinSet::new(); + loop { + while js.len() < MAX_CONCURRENT_FSYNC && next.peek().is_some() { + let next = next.next().expect("just peeked"); + let next = next.to_owned(); + js.spawn_blocking(move || fsync_path(&next)); + } + + // now the joinset has been filled up, wait for next to complete + if let Some(res) = js.join_next().await { + res??; + } else { + // last item had already completed + assert!( + next.peek().is_none(), + "joinset emptied, we shouldn't have more work" + ); + return Ok(()); + } + } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index b0aca45882..4bfebd93df 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -195,8 +195,9 @@ pub struct Timeline { /// Layer removal lock. /// A lock to ensure that no layer of the timeline is removed concurrently by other tasks. /// This lock is acquired in [`Timeline::gc`], [`Timeline::compact`], - /// and [`Tenant::delete_timeline`]. - pub(super) layer_removal_cs: tokio::sync::Mutex<()>, + /// and [`Tenant::delete_timeline`]. This is an `Arc` lock because we need an owned + /// lock guard in functions that will be spawned to tokio I/O pool (which requires `'static`). + pub(super) layer_removal_cs: Arc>, // Needed to ensure that we can't create a branch at a point that was already garbage collected pub latest_gc_cutoff_lsn: Rcu, @@ -669,7 +670,7 @@ impl Timeline { } /// Outermost timeline compaction operation; downloads needed layers. - pub async fn compact(&self, ctx: &RequestContext) -> anyhow::Result<()> { + pub async fn compact(self: &Arc, ctx: &RequestContext) -> anyhow::Result<()> { const ROUNDS: usize = 2; let last_record_lsn = self.get_last_record_lsn(); @@ -758,7 +759,7 @@ impl Timeline { } /// Compaction which might need to be retried after downloading remote layers. - async fn compact_inner(&self, ctx: &RequestContext) -> Result<(), CompactionError> { + async fn compact_inner(self: &Arc, ctx: &RequestContext) -> Result<(), CompactionError> { // // High level strategy for compaction / image creation: // @@ -793,7 +794,7 @@ impl Timeline { // Below are functions compact_level0() and create_image_layers() // but they are a bit ad hoc and don't quite work like it's explained // above. Rewrite it. - let layer_removal_cs = self.layer_removal_cs.lock().await; + let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await); // Is the timeline being deleted? let state = *self.state.borrow(); if state == TimelineState::Stopping { @@ -827,7 +828,7 @@ impl Timeline { // 3. Compact let timer = self.metrics.compact_time_histo.start_timer(); - self.compact_level0(&layer_removal_cs, target_file_size, ctx) + self.compact_level0(layer_removal_cs.clone(), target_file_size, ctx) .await?; timer.stop_and_record(); } @@ -2168,7 +2169,7 @@ impl Timeline { fn delete_historic_layer( &self, // we cannot remove layers otherwise, since gc and compaction will race - _layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>, + _layer_removal_cs: Arc>, layer: Arc, updates: &mut BatchedUpdates<'_, dyn PersistentLayer>, ) -> anyhow::Result<()> { @@ -2632,7 +2633,7 @@ impl Timeline { /// Layer flusher task's main loop. async fn flush_loop( - &self, + self: &Arc, mut layer_flush_start_rx: tokio::sync::watch::Receiver, ctx: &RequestContext, ) { @@ -2723,7 +2724,7 @@ impl Timeline { /// Flush one frozen in-memory layer to disk, as a new delta layer. #[instrument(skip(self, frozen_layer, ctx), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer.short_id()))] async fn flush_frozen_layer( - &self, + self: &Arc, frozen_layer: Arc, ctx: &RequestContext, ) -> anyhow::Result<()> { @@ -2743,7 +2744,11 @@ impl Timeline { .await? } else { // normal case, write out a L0 delta layer file. - let (delta_path, metadata) = self.create_delta_layer(&frozen_layer)?; + let this = self.clone(); + let frozen_layer = frozen_layer.clone(); + let (delta_path, metadata) = + tokio::task::spawn_blocking(move || this.create_delta_layer(&frozen_layer)) + .await??; HashMap::from([(delta_path, metadata)]) }; @@ -2847,7 +2852,7 @@ impl Timeline { // Write out the given frozen in-memory layer as a new L0 delta file fn create_delta_layer( - &self, + self: &Arc, frozen_layer: &InMemoryLayer, ) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> { // Write it out @@ -2863,10 +2868,13 @@ impl Timeline { // TODO: If we're running inside 'flush_frozen_layers' and there are multiple // files to flush, it might be better to first write them all, and then fsync // them all in parallel. - par_fsync::par_fsync(&[ - new_delta_path.clone(), - self.conf.timeline_path(&self.timeline_id, &self.tenant_id), - ])?; + + // First sync the delta layer. We still use par_fsync here to keep everything consistent. Feel free to replace + // this with a single fsync in future refactors. + par_fsync::par_fsync(&[new_delta_path.clone()]).context("fsync of delta layer")?; + // Then sync the parent directory. + par_fsync::par_fsync(&[self.conf.timeline_path(&self.timeline_id, &self.tenant_id)]) + .context("fsync of timeline dir")?; // Add it to the layer map let l = Arc::new(new_delta); @@ -3090,11 +3098,15 @@ impl Timeline { let all_paths = image_layers .iter() .map(|layer| layer.path()) - .chain(std::iter::once( - self.conf.timeline_path(&self.timeline_id, &self.tenant_id), - )) .collect::>(); - par_fsync::par_fsync(&all_paths).context("fsync of newly created layer files")?; + + par_fsync::par_fsync_async(&all_paths) + .await + .context("fsync of newly created layer files")?; + + par_fsync::par_fsync_async(&[self.conf.timeline_path(&self.timeline_id, &self.tenant_id)]) + .await + .context("fsync of timeline dir")?; let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len()); @@ -3159,9 +3171,9 @@ impl Timeline { /// This method takes the `_layer_removal_cs` guard to highlight it required downloads are /// returned as an error. If the `layer_removal_cs` boundary is changed not to be taken in the /// start of level0 files compaction, the on-demand download should be revisited as well. - async fn compact_level0_phase1( + fn compact_level0_phase1( &self, - _layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>, + _layer_removal_cs: Arc>, target_file_size: u64, ctx: &RequestContext, ) -> Result { @@ -3474,13 +3486,13 @@ impl Timeline { if !new_layers.is_empty() { let mut layer_paths: Vec = new_layers.iter().map(|l| l.path()).collect(); - // also sync the directory - layer_paths.push(self.conf.timeline_path(&self.timeline_id, &self.tenant_id)); - // Fsync all the layer files and directory using multiple threads to // minimize latency. par_fsync::par_fsync(&layer_paths).context("fsync all new layers")?; + par_fsync::par_fsync(&[self.conf.timeline_path(&self.timeline_id, &self.tenant_id)]) + .context("fsync of timeline dir")?; + layer_paths.pop().unwrap(); } @@ -3497,17 +3509,22 @@ impl Timeline { /// as Level 1 files. /// async fn compact_level0( - &self, - layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>, + self: &Arc, + layer_removal_cs: Arc>, target_file_size: u64, ctx: &RequestContext, ) -> Result<(), CompactionError> { + let this = self.clone(); + let ctx_inner = ctx.clone(); + let layer_removal_cs_inner = layer_removal_cs.clone(); let CompactLevel0Phase1Result { new_layers, deltas_to_compact, - } = self - .compact_level0_phase1(layer_removal_cs, target_file_size, ctx) - .await?; + } = tokio::task::spawn_blocking(move || { + this.compact_level0_phase1(layer_removal_cs_inner, target_file_size, &ctx_inner) + }) + .await + .unwrap()?; if new_layers.is_empty() && deltas_to_compact.is_empty() { // nothing to do @@ -3565,7 +3582,7 @@ impl Timeline { let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len()); for l in deltas_to_compact { layer_names_to_delete.push(l.filename()); - self.delete_historic_layer(layer_removal_cs, l, &mut updates)?; + self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates)?; } updates.flush(); drop(layers); @@ -3685,7 +3702,7 @@ impl Timeline { fail_point!("before-timeline-gc"); - let layer_removal_cs = self.layer_removal_cs.lock().await; + let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await); // Is the timeline being deleted? let state = *self.state.borrow(); if state == TimelineState::Stopping { @@ -3705,7 +3722,7 @@ impl Timeline { let res = self .gc_timeline( - &layer_removal_cs, + layer_removal_cs.clone(), horizon_cutoff, pitr_cutoff, retain_lsns, @@ -3724,7 +3741,7 @@ impl Timeline { async fn gc_timeline( &self, - layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>, + layer_removal_cs: Arc>, horizon_cutoff: Lsn, pitr_cutoff: Lsn, retain_lsns: Vec, @@ -3897,7 +3914,11 @@ impl Timeline { { for doomed_layer in layers_to_remove { layer_names_to_delete.push(doomed_layer.filename()); - self.delete_historic_layer(layer_removal_cs, doomed_layer, &mut updates)?; // FIXME: schedule succeeded deletions before returning? + self.delete_historic_layer( + layer_removal_cs.clone(), + doomed_layer, + &mut updates, + )?; // FIXME: schedule succeeded deletions before returning? result.layers_removed += 1; } } From 200a520e6c73ce028a4964e9f9d1c1eb92515415 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Sat, 27 May 2023 01:07:00 +0300 Subject: [PATCH 04/18] Minor refactoring in RequestSpan Require the error type to be ApiError. It implicitly required that anyway, because the function used error::handler, which downcasted the error to an ApiError. If the error was in fact anything else than ApiError, it would just panic. Better to check it at compilation time. Also make the last-resort error handler more forgiving, so that it returns an 500 Internal Server error response, instead of panicking, if a request handler returns some other error than an ApiError. --- libs/utils/src/http/endpoint.rs | 39 +++++++++++++++++++-------------- libs/utils/src/http/error.rs | 21 +++++++++++++----- 2 files changed, 38 insertions(+), 22 deletions(-) diff --git a/libs/utils/src/http/endpoint.rs b/libs/utils/src/http/endpoint.rs index 4bfb5bf994..4a78f16cfb 100644 --- a/libs/utils/src/http/endpoint.rs +++ b/libs/utils/src/http/endpoint.rs @@ -1,5 +1,5 @@ use crate::auth::{Claims, JwtAuth}; -use crate::http::error; +use crate::http::error::{api_error_handler, route_error_handler, ApiError}; use anyhow::{anyhow, Context}; use hyper::header::{HeaderName, AUTHORIZATION}; use hyper::http::HeaderValue; @@ -16,8 +16,6 @@ use std::future::Future; use std::net::TcpListener; use std::str::FromStr; -use super::error::ApiError; - static SERVE_METRICS_COUNT: Lazy = Lazy::new(|| { register_int_counter!( "libmetrics_metric_handler_requests_total", @@ -38,6 +36,8 @@ struct RequestId(String); /// Use this to distinguish between logs of different HTTP requests: every request handler wrapped /// in this type will get request info logged in the wrapping span, including the unique request ID. /// +/// This also handles errors, logging them and converting them to an HTTP error response. +/// /// There could be other ways to implement similar functionality: /// /// * procmacros placed on top of all handler methods @@ -54,21 +54,19 @@ struct RequestId(String); /// tries to achive with its `.instrument` used in the current approach. /// /// If needed, a declarative macro to substitute the |r| ... closure boilerplate could be introduced. -pub struct RequestSpan(pub H) +pub struct RequestSpan(pub H) where - E: Into> + 'static, - R: Future, E>> + Send + 'static, + R: Future, ApiError>> + Send + 'static, H: Fn(Request) -> R + Send + Sync + 'static; -impl RequestSpan +impl RequestSpan where - E: Into> + 'static, - R: Future, E>> + Send + 'static, + R: Future, ApiError>> + Send + 'static, H: Fn(Request) -> R + Send + Sync + 'static, { /// Creates a tracing span around inner request handler and executes the request handler in the contex of that span. /// Use as `|r| RequestSpan(my_handler).handle(r)` instead of `my_handler` as the request handler to get the span enabled. - pub async fn handle(self, request: Request) -> Result, E> { + pub async fn handle(self, request: Request) -> Result, ApiError> { let request_id = request.context::().unwrap_or_default().0; let method = request.method(); let path = request.uri().path(); @@ -83,15 +81,22 @@ where info!("Handling request"); } - // Note that we reuse `error::handler` here and not returning and error at all, - // yet cannot use `!` directly in the method signature due to `routerify::RouterBuilder` limitation. - // Usage of the error handler also means that we expect only the `ApiError` errors to be raised in this call. - // - // Panics are not handled separately, there's a `tracing_panic_hook` from another module to do that globally. + // No special handling for panics here. There's a `tracing_panic_hook` from another + // module to do that globally. let res = (self.0)(request).await; cancellation_guard.disarm(); + // Log the result if needed. + // + // We also convert any errors into an Ok response with HTTP error code here. + // `make_router` sets a last-resort error handler that would do the same, but + // we prefer to do it here, before we exit the request span, so that the error + // is still logged with the span. + // + // (Because we convert errors to Ok response, we never actually return an error, + // and we could declare the function to return the never type (`!`). However, + // using `routerify::RouterBuilder` requires a proper error type.) match res { Ok(response) => { let response_status = response.status(); @@ -102,7 +107,7 @@ where } Ok(response) } - Err(e) => Ok(error::handler(e.into()).await), + Err(err) => Ok(api_error_handler(err)), } } .instrument(request_span) @@ -210,7 +215,7 @@ pub fn make_router() -> RouterBuilder { .get("/metrics", |r| { RequestSpan(prometheus_metrics_handler).handle(r) }) - .err_handler(error::handler) + .err_handler(route_error_handler) } pub fn attach_openapi_ui( diff --git a/libs/utils/src/http/error.rs b/libs/utils/src/http/error.rs index 3c6023eb80..4eff16b6a3 100644 --- a/libs/utils/src/http/error.rs +++ b/libs/utils/src/http/error.rs @@ -83,13 +83,24 @@ impl HttpErrorBody { } } -pub async fn handler(err: routerify::RouteError) -> Response { - let api_error = err - .downcast::() - .expect("handler should always return api error"); +pub async fn route_error_handler(err: routerify::RouteError) -> Response { + match err.downcast::() { + Ok(api_error) => api_error_handler(*api_error), + Err(other_error) => { + // We expect all the request handlers to return an ApiError, so this should + // not be reached. But just in case. + error!("Error processing HTTP request: {other_error:?}"); + HttpErrorBody::response_from_msg_and_status( + other_error.to_string(), + StatusCode::INTERNAL_SERVER_ERROR, + ) + } + } +} +pub fn api_error_handler(api_error: ApiError) -> Response { // Print a stack trace for Internal Server errors - if let ApiError::InternalServerError(_) = api_error.as_ref() { + if let ApiError::InternalServerError(_) = api_error { error!("Error processing HTTP request: {api_error:?}"); } else { error!("Error processing HTTP request: {api_error:#}"); From 2cdf07f12c5d1fc637e0acfdd512ab88f801907d Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Sat, 27 May 2023 01:11:58 +0300 Subject: [PATCH 05/18] Refactor RequestSpan into a function. Previously, you used it like this: |r| RequestSpan(my_handler).handle(r) But I don't see the point of the RequestSpan struct. It's just a wrapper around the handler function. With this commit, the call becomes: |r| request_span(r, my_handler) Which seems a little simpler. At first I thought that the RequestSpan struct would allow "chaining" other kinds of decorators like RequestSpan, so that you could do something like this: |r| CheckPermissions(RequestSpan(my_handler)).handle(r) But it doesn't work like that. If each of those structs wrap a handler *function*, it would actually look like this: |r| CheckPermissions(|r| RequestSpan(my_handler).handle(r))).handle(r) This commit doesn't make that kind of chaining any easier, but seems a little more straightforward anyway. --- libs/utils/src/http/endpoint.rs | 122 +++++++++++++++----------------- pageserver/src/http/routes.rs | 56 +++++++-------- 2 files changed, 84 insertions(+), 94 deletions(-) diff --git a/libs/utils/src/http/endpoint.rs b/libs/utils/src/http/endpoint.rs index 4a78f16cfb..db3642b507 100644 --- a/libs/utils/src/http/endpoint.rs +++ b/libs/utils/src/http/endpoint.rs @@ -33,8 +33,10 @@ struct RequestId(String); /// Adds a tracing info_span! instrumentation around the handler events, /// logs the request start and end events for non-GET requests and non-200 responses. /// +/// Usage: Replace `my_handler` with `|r| request_span(r, my_handler)` +/// /// Use this to distinguish between logs of different HTTP requests: every request handler wrapped -/// in this type will get request info logged in the wrapping span, including the unique request ID. +/// with this will get request info logged in the wrapping span, including the unique request ID. /// /// This also handles errors, logging them and converting them to an HTTP error response. /// @@ -54,65 +56,56 @@ struct RequestId(String); /// tries to achive with its `.instrument` used in the current approach. /// /// If needed, a declarative macro to substitute the |r| ... closure boilerplate could be introduced. -pub struct RequestSpan(pub H) +pub async fn request_span(request: Request, handler: H) -> R::Output where R: Future, ApiError>> + Send + 'static, - H: Fn(Request) -> R + Send + Sync + 'static; - -impl RequestSpan -where - R: Future, ApiError>> + Send + 'static, - H: Fn(Request) -> R + Send + Sync + 'static, + H: FnOnce(Request) -> R + Send + Sync + 'static, { - /// Creates a tracing span around inner request handler and executes the request handler in the contex of that span. - /// Use as `|r| RequestSpan(my_handler).handle(r)` instead of `my_handler` as the request handler to get the span enabled. - pub async fn handle(self, request: Request) -> Result, ApiError> { - let request_id = request.context::().unwrap_or_default().0; - let method = request.method(); - let path = request.uri().path(); - let request_span = info_span!("request", %method, %path, %request_id); + let request_id = request.context::().unwrap_or_default().0; + let method = request.method(); + let path = request.uri().path(); + let request_span = info_span!("request", %method, %path, %request_id); - let log_quietly = method == Method::GET; - async move { - let cancellation_guard = RequestCancelled::warn_when_dropped_without_responding(); - if log_quietly { - debug!("Handling request"); - } else { - info!("Handling request"); - } - - // No special handling for panics here. There's a `tracing_panic_hook` from another - // module to do that globally. - let res = (self.0)(request).await; - - cancellation_guard.disarm(); - - // Log the result if needed. - // - // We also convert any errors into an Ok response with HTTP error code here. - // `make_router` sets a last-resort error handler that would do the same, but - // we prefer to do it here, before we exit the request span, so that the error - // is still logged with the span. - // - // (Because we convert errors to Ok response, we never actually return an error, - // and we could declare the function to return the never type (`!`). However, - // using `routerify::RouterBuilder` requires a proper error type.) - match res { - Ok(response) => { - let response_status = response.status(); - if log_quietly && response_status.is_success() { - debug!("Request handled, status: {response_status}"); - } else { - info!("Request handled, status: {response_status}"); - } - Ok(response) - } - Err(err) => Ok(api_error_handler(err)), - } + let log_quietly = method == Method::GET; + async move { + let cancellation_guard = RequestCancelled::warn_when_dropped_without_responding(); + if log_quietly { + debug!("Handling request"); + } else { + info!("Handling request"); + } + + // No special handling for panics here. There's a `tracing_panic_hook` from another + // module to do that globally. + let res = handler(request).await; + + cancellation_guard.disarm(); + + // Log the result if needed. + // + // We also convert any errors into an Ok response with HTTP error code here. + // `make_router` sets a last-resort error handler that would do the same, but + // we prefer to do it here, before we exit the request span, so that the error + // is still logged with the span. + // + // (Because we convert errors to Ok response, we never actually return an error, + // and we could declare the function to return the never type (`!`). However, + // using `routerify::RouterBuilder` requires a proper error type.) + match res { + Ok(response) => { + let response_status = response.status(); + if log_quietly && response_status.is_success() { + debug!("Request handled, status: {response_status}"); + } else { + info!("Request handled, status: {response_status}"); + } + Ok(response) + } + Err(err) => Ok(api_error_handler(err)), } - .instrument(request_span) - .await } + .instrument(request_span) + .await } /// Drop guard to WARN in case the request was dropped before completion. @@ -212,9 +205,7 @@ pub fn make_router() -> RouterBuilder { .middleware(Middleware::post_with_info( add_request_id_header_to_response, )) - .get("/metrics", |r| { - RequestSpan(prometheus_metrics_handler).handle(r) - }) + .get("/metrics", |r| request_span(r, prometheus_metrics_handler)) .err_handler(route_error_handler) } @@ -225,12 +216,14 @@ pub fn attach_openapi_ui( ui_mount_path: &'static str, ) -> RouterBuilder { router_builder - .get(spec_mount_path, move |r| { - RequestSpan(move |_| async move { Ok(Response::builder().body(Body::from(spec)).unwrap()) }) - .handle(r) - }) - .get(ui_mount_path, move |r| RequestSpan( move |_| async move { - Ok(Response::builder().body(Body::from(format!(r#" + .get(spec_mount_path, + move |r| request_span(r, move |_| async move { + Ok(Response::builder().body(Body::from(spec)).unwrap()) + }) + ) + .get(ui_mount_path, + move |r| request_span(r, move |_| async move { + Ok(Response::builder().body(Body::from(format!(r#" @@ -260,7 +253,8 @@ pub fn attach_openapi_ui( "#, spec_mount_path))).unwrap()) - }).handle(r)) + }) + ) } fn parse_token(header_value: &str) -> Result<&str, ApiError> { diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 30c219f773..279f069be7 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -11,7 +11,7 @@ use storage_broker::BrokerClientChannel; use tenant_size_model::{SizeResult, StorageModel}; use tokio_util::sync::CancellationToken; use tracing::*; -use utils::http::endpoint::RequestSpan; +use utils::http::endpoint::request_span; use utils::http::json::json_request_or_empty_body; use utils::http::request::{get_request_param, must_get_query_param, parse_query_param}; @@ -1179,7 +1179,7 @@ pub fn make_router( #[cfg(not(feature = "testing"))] let handler = cfg_disabled; - move |r| RequestSpan(handler).handle(r) + move |r| request_span(r, handler) }}; } @@ -1194,54 +1194,50 @@ pub fn make_router( ) .context("Failed to initialize router state")?, )) - .get("/v1/status", |r| RequestSpan(status_handler).handle(r)) + .get("/v1/status", |r| request_span(r, status_handler)) .put( "/v1/failpoints", testing_api!("manage failpoints", failpoints_handler), ) - .get("/v1/tenant", |r| RequestSpan(tenant_list_handler).handle(r)) - .post("/v1/tenant", |r| { - RequestSpan(tenant_create_handler).handle(r) - }) - .get("/v1/tenant/:tenant_id", |r| { - RequestSpan(tenant_status).handle(r) - }) + .get("/v1/tenant", |r| request_span(r, tenant_list_handler)) + .post("/v1/tenant", |r| request_span(r, tenant_create_handler)) + .get("/v1/tenant/:tenant_id", |r| request_span(r, tenant_status)) .get("/v1/tenant/:tenant_id/synthetic_size", |r| { - RequestSpan(tenant_size_handler).handle(r) + request_span(r, tenant_size_handler) }) .put("/v1/tenant/config", |r| { - RequestSpan(update_tenant_config_handler).handle(r) + request_span(r, update_tenant_config_handler) }) .get("/v1/tenant/:tenant_id/config", |r| { - RequestSpan(get_tenant_config_handler).handle(r) + request_span(r, get_tenant_config_handler) }) .get("/v1/tenant/:tenant_id/timeline", |r| { - RequestSpan(timeline_list_handler).handle(r) + request_span(r, timeline_list_handler) }) .post("/v1/tenant/:tenant_id/timeline", |r| { - RequestSpan(timeline_create_handler).handle(r) + request_span(r, timeline_create_handler) }) .post("/v1/tenant/:tenant_id/attach", |r| { - RequestSpan(tenant_attach_handler).handle(r) + request_span(r, tenant_attach_handler) }) .post("/v1/tenant/:tenant_id/detach", |r| { - RequestSpan(tenant_detach_handler).handle(r) + request_span(r, tenant_detach_handler) }) .post("/v1/tenant/:tenant_id/load", |r| { - RequestSpan(tenant_load_handler).handle(r) + request_span(r, tenant_load_handler) }) .post("/v1/tenant/:tenant_id/ignore", |r| { - RequestSpan(tenant_ignore_handler).handle(r) + request_span(r, tenant_ignore_handler) }) .get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| { - RequestSpan(timeline_detail_handler).handle(r) + request_span(r, timeline_detail_handler) }) .get( "/v1/tenant/:tenant_id/timeline/:timeline_id/get_lsn_by_timestamp", - |r| RequestSpan(get_lsn_by_timestamp_handler).handle(r), + |r| request_span(r, get_lsn_by_timestamp_handler), ) .put("/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc", |r| { - RequestSpan(timeline_gc_handler).handle(r) + request_span(r, timeline_gc_handler) }) .put( "/v1/tenant/:tenant_id/timeline/:timeline_id/compact", @@ -1253,34 +1249,34 @@ pub fn make_router( ) .post( "/v1/tenant/:tenant_id/timeline/:timeline_id/download_remote_layers", - |r| RequestSpan(timeline_download_remote_layers_handler_post).handle(r), + |r| request_span(r, timeline_download_remote_layers_handler_post), ) .get( "/v1/tenant/:tenant_id/timeline/:timeline_id/download_remote_layers", - |r| RequestSpan(timeline_download_remote_layers_handler_get).handle(r), + |r| request_span(r, timeline_download_remote_layers_handler_get), ) .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| { - RequestSpan(timeline_delete_handler).handle(r) + request_span(r, timeline_delete_handler) }) .get("/v1/tenant/:tenant_id/timeline/:timeline_id/layer", |r| { - RequestSpan(layer_map_info_handler).handle(r) + request_span(r, layer_map_info_handler) }) .get( "/v1/tenant/:tenant_id/timeline/:timeline_id/layer/:layer_file_name", - |r| RequestSpan(layer_download_handler).handle(r), + |r| request_span(r, layer_download_handler), ) .delete( "/v1/tenant/:tenant_id/timeline/:timeline_id/layer/:layer_file_name", - |r| RequestSpan(evict_timeline_layer_handler).handle(r), + |r| request_span(r, evict_timeline_layer_handler), ) .put("/v1/disk_usage_eviction/run", |r| { - RequestSpan(disk_usage_eviction_run).handle(r) + request_span(r, disk_usage_eviction_run) }) .put( "/v1/tenant/:tenant_id/break", testing_api!("set tenant state to broken", handle_tenant_break), ) - .get("/v1/panic", |r| RequestSpan(always_panic_handler).handle(r)) + .get("/v1/panic", |r| request_span(r, always_panic_handler)) .post( "/v1/tracing/event", testing_api!("emit a tracing event", post_tracing_event_handler), From 2d6a022bb819aca9fe9689ac23184b01b070e12f Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Sat, 27 May 2023 15:55:43 +0300 Subject: [PATCH 06/18] Don't allow two timeline_delete operations to run concurrently. (#4313) If the timeline is already being deleted, return an error. We used to notice the duplicate request and error out in persist_index_part_with_deleted_flag(), but it's better to detect it earlier. Add an explicit lock for the deletion. Note: This doesn't do anything about the async cancellation problem (github issue #3478): if the original HTTP request dropped, because the client disconnected, the timeline deletion stops half-way through the operation. That needs to be fixed, too, but that's a separate story. (This is a simpler replacement for PR #4194. I'm also working on the cancellation shielding, see PR #4314.) --- pageserver/src/tenant.rs | 106 +++++++++++--------- pageserver/src/tenant/timeline.rs | 5 + test_runner/regress/test_timeline_delete.py | 2 +- 3 files changed, 67 insertions(+), 46 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 2827830f02..991f5ca1c6 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -1436,7 +1436,11 @@ impl Tenant { Ok(()) } - /// Removes timeline-related in-memory data + /// Shuts down a timeline's tasks, removes its in-memory structures, and deletes its + /// data from disk. + /// + /// This doesn't currently delete all data from S3, but sets a flag in its + /// index_part.json file to mark it as deleted. pub async fn delete_timeline( &self, timeline_id: TimelineId, @@ -1446,7 +1450,11 @@ impl Tenant { // Transition the timeline into TimelineState::Stopping. // This should prevent new operations from starting. - let timeline = { + // + // Also grab the Timeline's delete_lock to prevent another deletion from starting. + let timeline; + let mut delete_lock_guard; + { let mut timelines = self.timelines.lock().unwrap(); // Ensure that there are no child timelines **attached to that pageserver**, @@ -1464,20 +1472,36 @@ impl Tenant { Entry::Vacant(_) => return Err(DeleteTimelineError::NotFound), }; - let timeline = Arc::clone(timeline_entry.get()); + timeline = Arc::clone(timeline_entry.get()); + + // Prevent two tasks from trying to delete the timeline at the same time. + // + // XXX: We should perhaps return an HTTP "202 Accepted" to signal that the caller + // needs to poll until the operation has finished. But for now, we return an + // error, because the control plane knows to retry errors. + delete_lock_guard = timeline.delete_lock.try_lock().map_err(|_| { + DeleteTimelineError::Other(anyhow::anyhow!( + "timeline deletion is already in progress" + )) + })?; + + // If another task finished the deletion just before we acquired the lock, + // return success. + if *delete_lock_guard { + return Ok(()); + } + timeline.set_state(TimelineState::Stopping); drop(timelines); - timeline - }; + } // Now that the Timeline is in Stopping state, request all the related tasks to // shut down. // - // NB: If you call delete_timeline multiple times concurrently, they will - // all go through the motions here. Make sure the code here is idempotent, - // and don't error out if some of the shutdown tasks have already been - // completed! + // NB: If this fails half-way through, and is retried, the retry will go through + // all the same steps again. Make sure the code here is idempotent, and don't + // error out if some of the shutdown tasks have already been completed! // Stop the walreceiver first. debug!("waiting for wal receiver to shutdown"); @@ -1518,6 +1542,10 @@ impl Tenant { // If we (now, or already) marked it successfully as deleted, we can proceed Ok(()) | Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(_)) => (), // Bail out otherwise + // + // AlreadyInProgress shouldn't happen, because the 'delete_lock' prevents + // two tasks from performing the deletion at the same time. The first task + // that starts deletion should run it to completion. Err(e @ PersistIndexPartWithDeletedFlagError::AlreadyInProgress(_)) | Err(e @ PersistIndexPartWithDeletedFlagError::Other(_)) => { return Err(DeleteTimelineError::Other(anyhow::anyhow!(e))); @@ -1528,14 +1556,12 @@ impl Tenant { { // Grab the layer_removal_cs lock, and actually perform the deletion. // - // This lock prevents multiple concurrent delete_timeline calls from - // stepping on each other's toes, while deleting the files. It also - // prevents GC or compaction from running at the same time. + // This lock prevents prevents GC or compaction from running at the same time. + // The GC task doesn't register itself with the timeline it's operating on, + // so it might still be running even though we called `shutdown_tasks`. // // Note that there are still other race conditions between - // GC, compaction and timeline deletion. GC task doesn't - // register itself properly with the timeline it's - // operating on. See + // GC, compaction and timeline deletion. See // https://github.com/neondatabase/neon/issues/2671 // // No timeout here, GC & Compaction should be responsive to the @@ -1597,37 +1623,27 @@ impl Tenant { }); // Remove the timeline from the map. - let mut timelines = self.timelines.lock().unwrap(); - let children_exist = timelines - .iter() - .any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id)); - // XXX this can happen because `branch_timeline` doesn't check `TimelineState::Stopping`. - // We already deleted the layer files, so it's probably best to panic. - // (Ideally, above remove_dir_all is atomic so we don't see this timeline after a restart) - if children_exist { - panic!("Timeline grew children while we removed layer files"); + { + let mut timelines = self.timelines.lock().unwrap(); + + let children_exist = timelines + .iter() + .any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id)); + // XXX this can happen because `branch_timeline` doesn't check `TimelineState::Stopping`. + // We already deleted the layer files, so it's probably best to panic. + // (Ideally, above remove_dir_all is atomic so we don't see this timeline after a restart) + if children_exist { + panic!("Timeline grew children while we removed layer files"); + } + + timelines.remove(&timeline_id).expect( + "timeline that we were deleting was concurrently removed from 'timelines' map", + ); } - let removed_timeline = timelines.remove(&timeline_id); - if removed_timeline.is_none() { - // This can legitimately happen if there's a concurrent call to this function. - // T1 T2 - // lock - // unlock - // lock - // unlock - // remove files - // lock - // remove from map - // unlock - // return - // remove files - // lock - // remove from map observes empty map - // unlock - // return - debug!("concurrent call to this function won the race"); - } - drop(timelines); + + // All done! Mark the deletion as completed and release the delete_lock + *delete_lock_guard = true; + drop(delete_lock_guard); Ok(()) } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 4bfebd93df..9dd5352a54 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -236,6 +236,10 @@ pub struct Timeline { state: watch::Sender, + /// Prevent two tasks from deleting the timeline at the same time. If held, the + /// timeline is being deleted. If 'true', the timeline has already been deleted. + pub delete_lock: tokio::sync::Mutex, + eviction_task_timeline_state: tokio::sync::Mutex, } @@ -1414,6 +1418,7 @@ impl Timeline { eviction_task_timeline_state: tokio::sync::Mutex::new( EvictionTaskTimelineState::default(), ), + delete_lock: tokio::sync::Mutex::new(false), }; result.repartition_threshold = result.get_checkpoint_distance() / 10; result diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py index 7135b621cb..99bf400207 100644 --- a/test_runner/regress/test_timeline_delete.py +++ b/test_runner/regress/test_timeline_delete.py @@ -371,7 +371,7 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload( # make the second call and assert behavior log.info("second call start") - error_msg_re = "another task is already setting the deleted_flag, started at" + error_msg_re = "timeline deletion is already in progress" with pytest.raises(PageserverApiException, match=error_msg_re) as second_call_err: ps_http.timeline_delete(env.initial_tenant, child_timeline_id) assert second_call_err.value.status_code == 500 From ccf653c1f47ab357cd7f43ca959cf3a8f6627ee9 Mon Sep 17 00:00:00 2001 From: Em Sharnoff Date: Sun, 28 May 2023 10:22:45 -0700 Subject: [PATCH 07/18] re-enable file cache integration for VM compute node (#4338) #4155 inadvertently switched to a version of the VM builder that leaves the file cache integration disabled by default. This re-enables the vm-informant's file cache integration. (as a refresher: The vm-informant is the autoscaling component that sits inside the VM and manages postgres / compute_ctl) See also: https://github.com/neondatabase/autoscaling/pull/265 --- .github/workflows/build_and_test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 336dea04eb..e00b98250c 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -797,7 +797,7 @@ jobs: - name: Build vm image run: | - ./vm-builder -src=369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} -dst=369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} + ./vm-builder -enable-file-cache -src=369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} -dst=369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} - name: Pushing vm-compute-node image run: | From f4f300732a433eb4873fc2210421bf5e4446a893 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 29 May 2023 16:52:41 +0200 Subject: [PATCH 08/18] refactor TenantState transitions (#4321) This is preliminary work for/from #4220 (async `Layer::get_value_reconstruct_data`). The motivation is to avoid locking `Tenant::timelines` in places that can't be `async`, because in #4333 we want to convert Tenant::timelines from `std::sync::Mutex` to `tokio::sync::Mutex`. But, the changes here are useful in general because they clean up & document tenant state transitions. That also paves the way for #4350, which is an alternative to #4333 that refactors the pageserver code so that we can keep the `Tenant::timelines` mutex sync. This patch consists of the following core insights and changes: * spawn_load and spawn_attach own the tenant state until they're done * once load()/attach() calls are done ... * if they failed, transition them to Broken directly (we know that there's no background activity because we didn't call activate yet) * if they succeed, call activate. We can make it infallible. How? Later. * set_broken() and set_stopping() are changed to wait for spawn_load() / spawn_attach() to finish. * This sounds scary because it might hinder detach or shutdown, but actually, concurrent attach+detach, or attach+shutdown, or load+shutdown, or attach+shutdown were just racy before this PR. So, with this change, they're not anymore. In the future, we can add a `CancellationToken` stored in Tenant to cancel `load` and `attach` faster, i.e., make `spawn_load` / `spawn_attach` transition them to Broken state sooner. See the doc comments on TenantState for the state transitions that are now possible. It might seem scary, but actually, this patch reduces the possible state transitions. We introduce a new state `TenantState::Activating` to avoid grabbing the `Tenant::timelines` lock inside the `send_modify` closure. These were the humble beginnings of this PR (see Motivation section), and I think it's still the right thing to have this `Activating` state, even if we decide against async `Tenant::timelines` mutex. The reason is that `send_modify` locks internally, and by moving locking of Tenant::timelines out of the closure, the internal locking of `send_modify` becomes a leaf of the lock graph, and so, we eliminate deadlock risk. Co-authored-by: Joonas Koivunen --- libs/pageserver_api/src/models.rs | 125 +++++++- pageserver/src/http/routes.rs | 2 +- pageserver/src/lib.rs | 1 + pageserver/src/tenant.rs | 300 ++++++++++++-------- pageserver/src/tenant/mgr.rs | 116 ++++++-- test_runner/regress/test_broken_timeline.py | 2 +- test_runner/regress/test_remote_storage.py | 2 +- test_runner/regress/test_tenant_detach.py | 4 +- test_runner/regress/test_tenants.py | 17 +- 9 files changed, 417 insertions(+), 152 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 540633d113..0b4457a9a5 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -18,7 +18,29 @@ use crate::reltag::RelTag; use anyhow::bail; use bytes::{BufMut, Bytes, BytesMut}; -/// A state of a tenant in pageserver's memory. +/// The state of a tenant in this pageserver. +/// +/// ```mermaid +/// stateDiagram-v2 +/// +/// [*] --> Loading: spawn_load() +/// [*] --> Attaching: spawn_attach() +/// +/// Loading --> Activating: activate() +/// Attaching --> Activating: activate() +/// Activating --> Active: infallible +/// +/// Loading --> Broken: load() failure +/// Attaching --> Broken: attach() failure +/// +/// Active --> Stopping: set_stopping(), part of shutdown & detach +/// Stopping --> Broken: late error in remove_tenant_from_memory +/// +/// Broken --> [*]: ignore / detach / shutdown +/// Stopping --> [*]: remove_from_memory complete +/// +/// Active --> Broken: cfg(testing)-only tenant break point +/// ``` #[derive( Clone, PartialEq, @@ -26,40 +48,63 @@ use bytes::{BufMut, Bytes, BytesMut}; serde::Serialize, serde::Deserialize, strum_macros::Display, - strum_macros::EnumString, strum_macros::EnumVariantNames, strum_macros::AsRefStr, strum_macros::IntoStaticStr, )] #[serde(tag = "slug", content = "data")] pub enum TenantState { - /// This tenant is being loaded from local disk + /// This tenant is being loaded from local disk. + /// + /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass. Loading, - /// This tenant is being downloaded from cloud storage. + /// This tenant is being attached to the pageserver. + /// + /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass. Attaching, - /// Tenant is fully operational + /// The tenant is transitioning from Loading/Attaching to Active. + /// + /// While in this state, the individual timelines are being activated. + /// + /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass. + Activating(ActivatingFrom), + /// The tenant has finished activating and is open for business. + /// + /// Transitions out of this state are possible through `set_stopping()` and `set_broken()`. Active, - /// A tenant is recognized by pageserver, but it is being detached or the + /// The tenant is recognized by pageserver, but it is being detached or the /// system is being shut down. + /// + /// Transitions out of this state are possible through `set_broken()`. Stopping, - /// A tenant is recognized by the pageserver, but can no longer be used for - /// any operations, because it failed to be activated. + /// The tenant is recognized by the pageserver, but can no longer be used for + /// any operations. + /// + /// If the tenant fails to load or attach, it will transition to this state + /// and it is guaranteed that no background tasks are running in its name. + /// + /// The other way to transition into this state is from `Stopping` state + /// through `set_broken()` called from `remove_tenant_from_memory()`. That happens + /// if the cleanup future executed by `remove_tenant_from_memory()` fails. Broken { reason: String, backtrace: String }, } impl TenantState { pub fn attachment_status(&self) -> TenantAttachmentStatus { use TenantAttachmentStatus::*; + + // Below TenantState::Activating is used as "transient" or "transparent" state for + // attachment_status determining. match self { // The attach procedure writes the marker file before adding the Attaching tenant to the tenants map. // So, technically, we can return Attached here. // However, as soon as Console observes Attached, it will proceed with the Postgres-level health check. // But, our attach task might still be fetching the remote timelines, etc. // So, return `Maybe` while Attaching, making Console wait for the attach task to finish. - Self::Attaching => Maybe, + Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe, // tenant mgr startup distinguishes attaching from loading via marker file. // If it's loading, there is no attach marker file, i.e., attach had finished in the past. - Self::Loading => Attached, + Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached, // We only reach Active after successful load / attach. // So, call atttachment status Attached. Self::Active => Attached, @@ -98,6 +143,15 @@ impl std::fmt::Debug for TenantState { } } +/// The only [`TenantState`] variants we could be `TenantState::Activating` from. +#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub enum ActivatingFrom { + /// Arrived to [`TenantState::Activating`] from [`TenantState::Loading`] + Loading, + /// Arrived to [`TenantState::Activating`] from [`TenantState::Attaching`] + Attaching, +} + /// A state of a timeline in pageserver's memory. #[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub enum TimelineState { @@ -829,4 +883,55 @@ mod tests { err ); } + + #[test] + fn tenantstatus_activating_serde() { + let states = [ + TenantState::Activating(ActivatingFrom::Loading), + TenantState::Activating(ActivatingFrom::Attaching), + ]; + let expected = "[{\"slug\":\"Activating\",\"data\":\"Loading\"},{\"slug\":\"Activating\",\"data\":\"Attaching\"}]"; + + let actual = serde_json::to_string(&states).unwrap(); + + assert_eq!(actual, expected); + + let parsed = serde_json::from_str::>(&actual).unwrap(); + + assert_eq!(states.as_slice(), &parsed); + } + + #[test] + fn tenantstatus_activating_strum() { + // tests added, because we use these for metrics + let examples = [ + (line!(), TenantState::Loading, "Loading"), + (line!(), TenantState::Attaching, "Attaching"), + ( + line!(), + TenantState::Activating(ActivatingFrom::Loading), + "Activating", + ), + ( + line!(), + TenantState::Activating(ActivatingFrom::Attaching), + "Activating", + ), + (line!(), TenantState::Active, "Active"), + (line!(), TenantState::Stopping, "Stopping"), + ( + line!(), + TenantState::Broken { + reason: "Example".into(), + backtrace: "Looooong backtrace".into(), + }, + "Broken", + ), + ]; + + for (line, rendered, expected) in examples { + let actual: &'static str = rendered.into(); + assert_eq!(actual, expected, "example on {line}"); + } + } } diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 279f069be7..61028e23fe 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -859,7 +859,7 @@ async fn handle_tenant_break(r: Request) -> Result, ApiErro .await .map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?; - tenant.set_broken("broken from test".to_owned()); + tenant.set_broken("broken from test".to_owned()).await; json_response(StatusCode::OK, ()) } diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 36578ee4e0..776cf0dac1 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -45,6 +45,7 @@ static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]); pub use crate::metrics::preinitialize_metrics; +#[tracing::instrument] pub async fn shutdown_pageserver(exit_code: i32) { // Shut down the libpq endpoint task. This prevents new connections from // being accepted. diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 991f5ca1c6..4c8101af8d 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -447,6 +447,11 @@ pub enum DeleteTimelineError { Other(#[from] anyhow::Error), } +pub enum SetStoppingError { + AlreadyStopping, + Broken, +} + struct RemoteStartupData { index_part: IndexPart, remote_metadata: TimelineMetadata, @@ -645,16 +650,17 @@ impl Tenant { "attach tenant", false, async move { - let doit = async { - tenant_clone.attach(&ctx).await?; - tenant_clone.activate(broker_client, &ctx)?; - anyhow::Ok(()) - }; - match doit.await { - Ok(_) => {} + match tenant_clone.attach(&ctx).await { + Ok(()) => { + info!("attach finished, activating"); + tenant_clone.activate(broker_client, &ctx); + } Err(e) => { - tenant_clone.set_broken(e.to_string()); - error!("error attaching tenant: {:?}", e); + error!("attach failed, setting tenant state to Broken: {:?}", e); + tenant_clone.state.send_modify(|state| { + assert_eq!(*state, TenantState::Attaching, "the attach task owns the tenant state until activation is complete"); + *state = TenantState::broken_from_reason(e.to_string()); + }); } } Ok(()) @@ -671,6 +677,8 @@ impl Tenant { /// /// Background task that downloads all data for a tenant and brings it to Active state. /// + /// No background tasks are started as part of this routine. + /// async fn attach(self: &Arc, ctx: &RequestContext) -> anyhow::Result<()> { debug_assert_current_span_has_tenant_id(); @@ -920,20 +928,20 @@ impl Tenant { "initial tenant load", false, async move { - let doit = async { - tenant_clone.load(&ctx).await?; - tenant_clone.activate(broker_client, &ctx)?; - anyhow::Ok(()) - }; - match doit.await { - Ok(()) => {} + match tenant_clone.load(&ctx).await { + Ok(()) => { + info!("load finished, activating"); + tenant_clone.activate(broker_client, &ctx); + } Err(err) => { - tenant_clone.set_broken(err.to_string()); - error!("could not load tenant {tenant_id}: {err:?}"); + error!("load failed, setting tenant state to Broken: {err:?}"); + tenant_clone.state.send_modify(|state| { + assert_eq!(*state, TenantState::Loading, "the loading task owns the tenant state until activation is complete"); + *state = TenantState::broken_from_reason(err.to_string()); + }); } } - info!("initial load for tenant {tenant_id} finished!"); - Ok(()) + Ok(()) } .instrument({ let span = tracing::info_span!(parent: None, "load", tenant_id=%tenant_id); @@ -951,6 +959,7 @@ impl Tenant { /// Background task to load in-memory data structures for this tenant, from /// files on disk. Used at pageserver startup. /// + /// No background tasks are started as part of this routine. async fn load(self: &Arc, ctx: &RequestContext) -> anyhow::Result<()> { debug_assert_current_span_has_tenant_id(); @@ -1657,130 +1666,191 @@ impl Tenant { } /// Changes tenant status to active, unless shutdown was already requested. - fn activate( - self: &Arc, - broker_client: BrokerClientChannel, - ctx: &RequestContext, - ) -> anyhow::Result<()> { + fn activate(self: &Arc, broker_client: BrokerClientChannel, ctx: &RequestContext) { debug_assert_current_span_has_tenant_id(); - let mut result = Ok(()); + let mut activating = false; self.state.send_modify(|current_state| { + use pageserver_api::models::ActivatingFrom; match &*current_state { - TenantState::Active => { - // activate() was called on an already Active tenant. Shouldn't happen. - result = Err(anyhow::anyhow!("Tenant is already active")); + TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping => { + panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state); } - TenantState::Broken { reason, .. } => { - // This shouldn't happen either - result = Err(anyhow::anyhow!( - "Could not activate tenant because it is in broken state due to: {reason}", - )); + TenantState::Loading => { + *current_state = TenantState::Activating(ActivatingFrom::Loading); } - TenantState::Stopping => { - // The tenant was detached, or system shutdown was requested, while we were - // loading or attaching the tenant. - info!("Tenant is already in Stopping state, skipping activation"); - } - TenantState::Loading | TenantState::Attaching => { - *current_state = TenantState::Active; - - debug!(tenant_id = %self.tenant_id, "Activating tenant"); - - let timelines_accessor = self.timelines.lock().unwrap(); - let not_broken_timelines = timelines_accessor - .values() - .filter(|timeline| timeline.current_state() != TimelineState::Broken); - - // Spawn gc and compaction loops. The loops will shut themselves - // down when they notice that the tenant is inactive. - tasks::start_background_loops(self); - - let mut activated_timelines = 0; - - for timeline in not_broken_timelines { - timeline.activate(broker_client.clone(), ctx); - activated_timelines += 1; - } - - let elapsed = self.loading_started_at.elapsed(); - let total_timelines = timelines_accessor.len(); - - // log a lot of stuff, because some tenants sometimes suffer from user-visible - // times to activate. see https://github.com/neondatabase/neon/issues/4025 - info!( - since_creation_millis = elapsed.as_millis(), - tenant_id = %self.tenant_id, - activated_timelines, - total_timelines, - post_state = <&'static str>::from(&*current_state), - "activation attempt finished" - ); + TenantState::Attaching => { + *current_state = TenantState::Activating(ActivatingFrom::Attaching); } } + debug!(tenant_id = %self.tenant_id, "Activating tenant"); + activating = true; + // Continue outside the closure. We need to grab timelines.lock() + // and we plan to turn it into a tokio::sync::Mutex in a future patch. }); - result + + if activating { + let timelines_accessor = self.timelines.lock().unwrap(); + let not_broken_timelines = timelines_accessor + .values() + .filter(|timeline| timeline.current_state() != TimelineState::Broken); + + // Spawn gc and compaction loops. The loops will shut themselves + // down when they notice that the tenant is inactive. + tasks::start_background_loops(self); + + let mut activated_timelines = 0; + + for timeline in not_broken_timelines { + timeline.activate(broker_client.clone(), ctx); + activated_timelines += 1; + } + + self.state.send_modify(move |current_state| { + assert!( + matches!(current_state, TenantState::Activating(_)), + "set_stopping and set_broken wait for us to leave Activating state", + ); + *current_state = TenantState::Active; + + let elapsed = self.loading_started_at.elapsed(); + let total_timelines = timelines_accessor.len(); + + // log a lot of stuff, because some tenants sometimes suffer from user-visible + // times to activate. see https://github.com/neondatabase/neon/issues/4025 + info!( + since_creation_millis = elapsed.as_millis(), + tenant_id = %self.tenant_id, + activated_timelines, + total_timelines, + post_state = <&'static str>::from(&*current_state), + "activation attempt finished" + ); + }); + } } - /// Change tenant status to Stopping, to mark that it is being shut down - pub fn set_stopping(&self) { - self.state.send_modify(|current_state| { - match current_state { - TenantState::Active | TenantState::Loading | TenantState::Attaching => { - *current_state = TenantState::Stopping; + /// Change tenant status to Stopping, to mark that it is being shut down. + /// + /// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state. + /// + /// This function is not cancel-safe! + pub async fn set_stopping(&self) -> Result<(), SetStoppingError> { + let mut rx = self.state.subscribe(); - // FIXME: If the tenant is still Loading or Attaching, new timelines - // might be created after this. That's harmless, as the Timelines - // won't be accessible to anyone, when the Tenant is in Stopping - // state. - let timelines_accessor = self.timelines.lock().unwrap(); - let not_broken_timelines = timelines_accessor - .values() - .filter(|timeline| timeline.current_state() != TimelineState::Broken); - for timeline in not_broken_timelines { - timeline.set_state(TimelineState::Stopping); - } - } - TenantState::Broken { reason, .. } => { - info!("Cannot set tenant to Stopping state, it is in Broken state due to: {reason}"); - } - TenantState::Stopping => { - // The tenant was detached, or system shutdown was requested, while we were - // loading or attaching the tenant. - info!("Tenant is already in Stopping state"); - } + // cannot stop before we're done activating, so wait out until we're done activating + rx.wait_for(|state| match state { + TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => { + info!( + "waiting for {} to turn Active|Broken|Stopping", + <&'static str>::from(state) + ); + false + } + TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping {} => true, + }) + .await + .expect("cannot drop self.state while on a &self method"); + + // we now know we're done activating, let's see whether this task is the winner to transition into Stopping + let mut err = None; + let stopping = self.state.send_if_modified(|current_state| match current_state { + TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => { + unreachable!("we ensured above that we're done with activation, and, there is no re-activation") + } + TenantState::Active => { + // FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines + // are created after the transition to Stopping. That's harmless, as the Timelines + // won't be accessible to anyone afterwards, because the Tenant is in Stopping state. + *current_state = TenantState::Stopping; + // Continue stopping outside the closure. We need to grab timelines.lock() + // and we plan to turn it into a tokio::sync::Mutex in a future patch. + true + } + TenantState::Broken { reason, .. } => { + info!( + "Cannot set tenant to Stopping state, it is in Broken state due to: {reason}" + ); + err = Some(SetStoppingError::Broken); + false + } + TenantState::Stopping => { + info!("Tenant is already in Stopping state"); + err = Some(SetStoppingError::AlreadyStopping); + false } }); + match (stopping, err) { + (true, None) => {} // continue + (false, Some(err)) => return Err(err), + (true, Some(_)) => unreachable!( + "send_if_modified closure must error out if not transitioning to Stopping" + ), + (false, None) => unreachable!( + "send_if_modified closure must return true if transitioning to Stopping" + ), + } + + let timelines_accessor = self.timelines.lock().unwrap(); + let not_broken_timelines = timelines_accessor + .values() + .filter(|timeline| timeline.current_state() != TimelineState::Broken); + for timeline in not_broken_timelines { + timeline.set_state(TimelineState::Stopping); + } + Ok(()) } - pub fn set_broken(&self, reason: String) { + /// Method for tenant::mgr to transition us into Broken state in case of a late failure in + /// `remove_tenant_from_memory` + /// + /// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state. + /// + /// In tests, we also use this to set tenants to Broken state on purpose. + pub(crate) async fn set_broken(&self, reason: String) { + let mut rx = self.state.subscribe(); + + // The load & attach routines own the tenant state until it has reached `Active`. + // So, wait until it's done. + rx.wait_for(|state| match state { + TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => { + info!( + "waiting for {} to turn Active|Broken|Stopping", + <&'static str>::from(state) + ); + false + } + TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping {} => true, + }) + .await + .expect("cannot drop self.state while on a &self method"); + + // we now know we're done activating, let's see whether this task is the winner to transition into Broken self.state.send_modify(|current_state| { match *current_state { + TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => { + unreachable!("we ensured above that we're done with activation, and, there is no re-activation") + } TenantState::Active => { - // Broken tenants can currently only used for fatal errors that happen - // while loading or attaching a tenant. A tenant that has already been - // activated should never be marked as broken. We cope with it the best - // we can, but it shouldn't happen. - warn!("Changing Active tenant to Broken state, reason: {}", reason); - *current_state = TenantState::broken_from_reason(reason); + if cfg!(feature = "testing") { + warn!("Changing Active tenant to Broken state, reason: {}", reason); + *current_state = TenantState::broken_from_reason(reason); + } else { + unreachable!("not allowed to call set_broken on Active tenants in non-testing builds") + } } TenantState::Broken { .. } => { - // This shouldn't happen either warn!("Tenant is already in Broken state"); } + // This is the only "expected" path, any other path is a bug. TenantState::Stopping => { - // This shouldn't happen either warn!( "Marking Stopping tenant as Broken state, reason: {}", reason ); *current_state = TenantState::broken_from_reason(reason); } - TenantState::Loading | TenantState::Attaching => { - info!("Setting tenant as Broken state, reason: {}", reason); - *current_state = TenantState::broken_from_reason(reason); - } - } + } }); } @@ -1793,7 +1863,7 @@ impl Tenant { loop { let current_state = receiver.borrow_and_update().clone(); match current_state { - TenantState::Loading | TenantState::Attaching => { + TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => { // in these states, there's a chance that we can reach ::Active receiver.changed().await.map_err( |_e: tokio::sync::watch::error::RecvError| { diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index dbb9577bf0..c0bd81ebfc 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -10,6 +10,7 @@ use tokio::fs; use anyhow::Context; use once_cell::sync::Lazy; use tokio::sync::RwLock; +use tokio::task::JoinSet; use tracing::*; use remote_storage::GenericRemoteStorage; @@ -19,7 +20,9 @@ use crate::config::PageServerConf; use crate::context::{DownloadBehavior, RequestContext}; use crate::task_mgr::{self, TaskKind}; use crate::tenant::config::TenantConfOpt; -use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState}; +use crate::tenant::{ + create_tenant_files, CreateTenantFilesMode, SetStoppingError, Tenant, TenantState, +}; use crate::IGNORED_TENANT_FILE_NAME; use utils::fs_ext::PathExt; @@ -222,6 +225,7 @@ pub fn schedule_local_tenant_processing( /// That could be easily misinterpreted by control plane, the consumer of the /// management API. For example, it could attach the tenant on a different pageserver. /// We would then be in split-brain once this pageserver restarts. +#[instrument] pub async fn shutdown_all_tenants() { // Prevent new tenants from being created. let tenants_to_shut_down = { @@ -244,15 +248,65 @@ pub async fn shutdown_all_tenants() { } }; + // Set tenant (and its timlines) to Stoppping state. + // + // Since we can only transition into Stopping state after activation is complete, + // run it in a JoinSet so all tenants have a chance to stop before we get SIGKILLed. + // + // Transitioning tenants to Stopping state has a couple of non-obvious side effects: + // 1. Lock out any new requests to the tenants. + // 2. Signal cancellation to WAL receivers (we wait on it below). + // 3. Signal cancellation for other tenant background loops. + // 4. ??? + // + // The waiting for the cancellation is not done uniformly. + // We certainly wait for WAL receivers to shut down. + // That is necessary so that no new data comes in before the freeze_and_flush. + // But the tenant background loops are joined-on in our caller. + // It's mesed up. + let mut join_set = JoinSet::new(); let mut tenants_to_freeze_and_flush = Vec::with_capacity(tenants_to_shut_down.len()); - for (_, tenant) in tenants_to_shut_down { - if tenant.is_active() { - // updates tenant state, forbidding new GC and compaction iterations from starting - tenant.set_stopping(); - tenants_to_freeze_and_flush.push(tenant); + for (tenant_id, tenant) in tenants_to_shut_down { + join_set.spawn( + async move { + match tenant.set_stopping().await { + Ok(()) => debug!("tenant successfully stopped"), + Err(SetStoppingError::Broken) => { + info!("tenant is broken, so stopping failed, freeze_and_flush is likely going to make noise as well"); + }, + Err(SetStoppingError::AlreadyStopping) => { + // our task_mgr::shutdown_tasks are going to coalesce on that just fine + } + } + + tenant + } + .instrument(info_span!("set_stopping", %tenant_id)), + ); + } + + let mut panicked = 0; + + while let Some(res) = join_set.join_next().await { + match res { + Err(join_error) if join_error.is_cancelled() => { + unreachable!("we are not cancelling any of the futures"); + } + Err(join_error) if join_error.is_panic() => { + // cannot really do anything, as this panic is likely a bug + panicked += 1; + } + Err(join_error) => { + warn!("unknown kind of JoinError: {join_error}"); + } + Ok(tenant) => tenants_to_freeze_and_flush.push(tenant), } } + if panicked > 0 { + warn!(panicked, "observed panicks while stopping tenants"); + } + // Shut down all existing walreceiver connections and stop accepting the new ones. task_mgr::shutdown_tasks(Some(TaskKind::WalReceiverManager), None, None).await; @@ -264,12 +318,30 @@ pub async fn shutdown_all_tenants() { // should be no more activity in any of the repositories. // // On error, log it but continue with the shutdown for other tenants. + + let mut join_set = tokio::task::JoinSet::new(); + for tenant in tenants_to_freeze_and_flush { let tenant_id = tenant.tenant_id(); - debug!("shutdown tenant {tenant_id}"); - if let Err(err) = tenant.freeze_and_flush().await { - error!("Could not checkpoint tenant {tenant_id} during shutdown: {err:?}"); + join_set.spawn( + async move { + if let Err(err) = tenant.freeze_and_flush().await { + warn!("Could not checkpoint tenant during shutdown: {err:?}"); + } + } + .instrument(info_span!("freeze_and_flush", %tenant_id)), + ); + } + + while let Some(next) = join_set.join_next().await { + match next { + Ok(()) => {} + Err(join_error) if join_error.is_cancelled() => { + unreachable!("no cancelling") + } + Err(join_error) if join_error.is_panic() => { /* reported already */ } + Err(join_error) => warn!("unknown kind of JoinError: {join_error}"), } } } @@ -589,13 +661,23 @@ where { let tenants_accessor = TENANTS.write().await; match tenants_accessor.get(&tenant_id) { - Some(tenant) => match tenant.current_state() { - TenantState::Attaching - | TenantState::Loading - | TenantState::Broken { .. } - | TenantState::Active => tenant.set_stopping(), - TenantState::Stopping => return Err(TenantStateError::IsStopping(tenant_id)), - }, + Some(tenant) => { + let tenant = Arc::clone(tenant); + // don't hold TENANTS lock while set_stopping waits for activation to finish + drop(tenants_accessor); + match tenant.set_stopping().await { + Ok(()) => { + // we won, continue stopping procedure + } + Err(SetStoppingError::Broken) => { + // continue the procedure, let's hope the closure can deal with broken tenants + } + Err(SetStoppingError::AlreadyStopping) => { + // the tenant is already stopping or broken, don't do anything + return Err(TenantStateError::IsStopping(tenant_id)); + } + } + } None => return Err(TenantStateError::NotFound(tenant_id)), } } @@ -620,7 +702,7 @@ where let tenants_accessor = TENANTS.read().await; match tenants_accessor.get(&tenant_id) { Some(tenant) => { - tenant.set_broken(e.to_string()); + tenant.set_broken(e.to_string()).await; } None => { warn!("Tenant {tenant_id} got removed from memory"); diff --git a/test_runner/regress/test_broken_timeline.py b/test_runner/regress/test_broken_timeline.py index fb592bfbc3..0fb3b4f262 100644 --- a/test_runner/regress/test_broken_timeline.py +++ b/test_runner/regress/test_broken_timeline.py @@ -20,7 +20,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder): ".*is not active. Current state: Broken.*", ".*will not become active. Current state: Broken.*", ".*failed to load metadata.*", - ".*could not load tenant.*load local timeline.*", + ".*load failed.*load local timeline.*", ] ) diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index 02f1aac99c..aefc8befeb 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -140,7 +140,7 @@ def test_remote_storage_backup_and_restore( # This is before the failures injected by test_remote_failures, so it's a permanent error. pageserver_http.configure_failpoints(("storage-sync-list-remote-timelines", "return")) env.pageserver.allowed_errors.append( - ".*error attaching tenant: storage-sync-list-remote-timelines", + ".*attach failed.*: storage-sync-list-remote-timelines", ) # Attach it. This HTTP request will succeed and launch a # background task to load the tenant. In that background task, diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py index 82664cff94..f5e0e34bc9 100644 --- a/test_runner/regress/test_tenant_detach.py +++ b/test_runner/regress/test_tenant_detach.py @@ -647,7 +647,9 @@ def test_ignored_tenant_stays_broken_without_metadata( metadata_removed = True assert metadata_removed, f"Failed to find metadata file in {tenant_timeline_dir}" - env.pageserver.allowed_errors.append(".*could not load tenant .*?: failed to load metadata.*") + env.pageserver.allowed_errors.append( + f".*{tenant_id}.*: load failed.*: failed to load metadata.*" + ) # now, load it from the local files and expect it to be broken due to inability to load tenant files into memory pageserver_http.tenant_load(tenant_id=tenant_id) diff --git a/test_runner/regress/test_tenants.py b/test_runner/regress/test_tenants.py index 6599fa7ba5..59b7b574cd 100644 --- a/test_runner/regress/test_tenants.py +++ b/test_runner/regress/test_tenants.py @@ -22,6 +22,7 @@ from fixtures.neon_fixtures import ( available_remote_storages, ) from fixtures.types import Lsn, TenantId, TimelineId +from fixtures.utils import wait_until from prometheus_client.samples import Sample @@ -308,9 +309,7 @@ def test_pageserver_with_empty_tenants( env.pageserver.allowed_errors.append( ".*marking .* as locally complete, while it doesnt exist in remote index.*" ) - env.pageserver.allowed_errors.append( - ".*could not load tenant.*Failed to list timelines directory.*" - ) + env.pageserver.allowed_errors.append(".*load failed.*Failed to list timelines directory.*") client = env.pageserver.http_client() @@ -341,9 +340,15 @@ def test_pageserver_with_empty_tenants( env.pageserver.start() client = env.pageserver.http_client() - tenants = client.tenant_list() - assert len(tenants) == 2 + def not_loading(): + tenants = client.tenant_list() + assert len(tenants) == 2 + assert all(t["state"]["slug"] != "Loading" for t in tenants) + + wait_until(10, 0.2, not_loading) + + tenants = client.tenant_list() [broken_tenant] = [t for t in tenants if t["id"] == str(tenant_without_timelines_dir)] assert ( @@ -355,7 +360,7 @@ def test_pageserver_with_empty_tenants( broken_tenant_status["state"]["slug"] == "Broken" ), f"Tenant {tenant_without_timelines_dir} without timelines dir should be broken" - assert env.pageserver.log_contains(".*Setting tenant as Broken state, reason:.*") + assert env.pageserver.log_contains(".*load failed, setting tenant state to Broken:.*") [loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines)] assert ( From cb834957446b39bf67e0fbacaeef669f572c9ca4 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Mon, 29 May 2023 21:48:38 +0300 Subject: [PATCH 09/18] try: startup speedup (#4366) Startup can take a long time. We suspect it's the initial logical size calculations. Long term solution is to not block the tokio executors but do most of I/O in spawn_blocking. See: #4025, #4183 Short-term solution to above: - Delay global background tasks until initial tenant loads complete - Just limit how many init logical size calculations can we have at the same time to `cores / 2` This PR is for trying in staging. --- pageserver/src/bin/pageserver.rs | 33 ++++++++++++++++++++++ pageserver/src/disk_usage_eviction_task.rs | 5 ++++ pageserver/src/tenant.rs | 4 +++ pageserver/src/tenant/mgr.rs | 18 +++++++++--- pageserver/src/tenant/timeline.rs | 17 +++++++++++ 5 files changed, 73 insertions(+), 4 deletions(-) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index d9d3d9d662..cbc97e7228 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -335,13 +335,36 @@ fn start_pageserver( // Set up remote storage client let remote_storage = create_remote_storage_client(conf)?; + // All tenant load operations carry this while they are ongoing; it will be dropped once those + // operations finish either successfully or in some other manner. However, the initial load + // will be then done, and we can start the global background tasks. + let (init_done_tx, init_done_rx) = tokio::sync::mpsc::channel::<()>(1); + let init_done_rx = Arc::new(tokio::sync::Mutex::new(init_done_rx)); + // Scan the local 'tenants/' directory and start loading the tenants + let init_started_at = std::time::Instant::now(); BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr( conf, broker_client.clone(), remote_storage.clone(), + init_done_tx, ))?; + BACKGROUND_RUNTIME.spawn({ + let init_done_rx = init_done_rx.clone(); + async move { + let init_done = async move { init_done_rx.lock().await.recv().await }; + init_done.await; + + let elapsed = init_started_at.elapsed(); + + tracing::info!( + elapsed_millis = elapsed.as_millis(), + "Initial load completed." + ); + } + }); + // shared state between the disk-usage backed eviction background task and the http endpoint // that allows triggering disk-usage based eviction manually. note that the http endpoint // is still accessible even if background task is not configured as long as remote storage has @@ -353,6 +376,7 @@ fn start_pageserver( conf, remote_storage.clone(), disk_usage_eviction_state.clone(), + init_done_rx.clone(), )?; } @@ -390,6 +414,7 @@ fn start_pageserver( ); if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint { + let init_done_rx = init_done_rx; let metrics_ctx = RequestContext::todo_child( TaskKind::MetricsCollection, // This task itself shouldn't download anything. @@ -405,6 +430,14 @@ fn start_pageserver( "consumption metrics collection", true, async move { + // first wait for initial load to complete before first iteration. + // + // this is because we only process active tenants and timelines, and the + // Timeline::get_current_logical_size will spawn the logical size calculation, + // which will not be rate-limited. + let init_done = async move { init_done_rx.lock().await.recv().await }; + init_done.await; + pageserver::consumption_metrics::collect_metrics( metric_collection_endpoint, conf.metric_collection_interval, diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index f4a0f3f18e..0358969199 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -82,6 +82,7 @@ pub fn launch_disk_usage_global_eviction_task( conf: &'static PageServerConf, storage: GenericRemoteStorage, state: Arc, + init_done_rx: Arc>>, ) -> anyhow::Result<()> { let Some(task_config) = &conf.disk_usage_based_eviction else { info!("disk usage based eviction task not configured"); @@ -98,6 +99,10 @@ pub fn launch_disk_usage_global_eviction_task( "disk usage based eviction", false, async move { + // wait until initial load is complete, because we cannot evict from loading tenants. + let init_done = async move { init_done_rx.lock().await.recv().await }; + init_done.await; + disk_usage_eviction_task( &state, task_config, diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 4c8101af8d..d6eb824107 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -895,6 +895,7 @@ impl Tenant { tenant_id: TenantId, broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, + init_done_tx: Option>, ctx: &RequestContext, ) -> Arc { let tenant_conf = match Self::load_tenant_config(conf, tenant_id) { @@ -928,6 +929,9 @@ impl Tenant { "initial tenant load", false, async move { + // keep the sender alive as long as we have the initial load ongoing; it will be + // None for loads spawned after init_tenant_mgr. + let _init_done_tx = init_done_tx; match tenant_clone.load(&ctx).await { Ok(()) => { info!("load finished, activating"); diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index c0bd81ebfc..d74a025bbb 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -66,6 +66,7 @@ pub async fn init_tenant_mgr( conf: &'static PageServerConf, broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, + init_done_tx: tokio::sync::mpsc::Sender<()>, ) -> anyhow::Result<()> { // Scan local filesystem for attached tenants let tenants_dir = conf.tenants_path(); @@ -122,6 +123,7 @@ pub async fn init_tenant_mgr( &tenant_dir_path, broker_client.clone(), remote_storage.clone(), + Some(init_done_tx.clone()), &ctx, ) { Ok(tenant) => { @@ -157,6 +159,7 @@ pub fn schedule_local_tenant_processing( tenant_path: &Path, broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, + init_done_tx: Option>, ctx: &RequestContext, ) -> anyhow::Result> { anyhow::ensure!( @@ -210,7 +213,14 @@ pub fn schedule_local_tenant_processing( } else { info!("tenant {tenant_id} is assumed to be loadable, starting load operation"); // Start loading the tenant into memory. It will initially be in Loading state. - Tenant::spawn_load(conf, tenant_id, broker_client, remote_storage, ctx) + Tenant::spawn_load( + conf, + tenant_id, + broker_client, + remote_storage, + init_done_tx, + ctx, + ) }; Ok(tenant) } @@ -363,7 +373,7 @@ pub async fn create_tenant( // See https://github.com/neondatabase/neon/issues/4233 let created_tenant = - schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, ctx)?; + schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, None, ctx)?; // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. // See https://github.com/neondatabase/neon/issues/4233 @@ -509,7 +519,7 @@ pub async fn load_tenant( .with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?; } - let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, ctx) + let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, None, ctx) .with_context(|| { format!("Failed to schedule tenant processing in path {tenant_path:?}") })?; @@ -582,7 +592,7 @@ pub async fn attach_tenant( .context("check for attach marker file existence")?; anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file"); - let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), ctx)?; + let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), None, ctx)?; // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. // See https://github.com/neondatabase/neon/issues/4233 diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 9dd5352a54..0569bd45e0 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1910,6 +1910,23 @@ impl Timeline { // no cancellation here, because nothing really waits for this to complete compared // to spawn_ondemand_logical_size_calculation. let cancel = CancellationToken::new(); + + /// Ugly, but necessary until `spawn_blocking` is used for blocking I/O, otherwise + /// we could lock up all worker threads. + static GLOBAL_INITIAL_LOGICAL_SIZES_AT_ONCE: once_cell::sync::Lazy> = once_cell::sync::Lazy::new(|| { + let cores = std::thread::available_parallelism(); + // half rationale: we have other blocking work which will start later: + // consumption metrics and per timeline eviction task. we however need to + // be fast to accept page reads, so perhaps this is a suitable middle ground? + let max_blocked_threads = cores.map(|count| count.get() / 2); + let max_blocked_threads = max_blocked_threads.unwrap_or(1); + let max_blocked_threads = std::cmp::max(1, max_blocked_threads); + tracing::info!("using max {max_blocked_threads} threads for initial logical size"); + Arc::new(tokio::sync::Semaphore::new(max_blocked_threads)) + }); + + let _permit = GLOBAL_INITIAL_LOGICAL_SIZES_AT_ONCE.clone().acquire_owned().await.expect("global semaphore is never closed"); + let calculated_size = match self_clone .logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx, cancel) .await From db1435536779b37a37da57774f07a233975bdd25 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Tue, 30 May 2023 10:40:37 +0300 Subject: [PATCH 10/18] revert: static global init logical size limiter (#4368) added in #4366. revert for testing without it; it may have unintenteded side-effects, and it's very difficult to understand the results from the 10k load testing environments. earlier results: https://github.com/neondatabase/neon/pull/4366#issuecomment-1567491064 --- pageserver/src/tenant/timeline.rs | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 0569bd45e0..5c889e804c 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1911,22 +1911,6 @@ impl Timeline { // to spawn_ondemand_logical_size_calculation. let cancel = CancellationToken::new(); - /// Ugly, but necessary until `spawn_blocking` is used for blocking I/O, otherwise - /// we could lock up all worker threads. - static GLOBAL_INITIAL_LOGICAL_SIZES_AT_ONCE: once_cell::sync::Lazy> = once_cell::sync::Lazy::new(|| { - let cores = std::thread::available_parallelism(); - // half rationale: we have other blocking work which will start later: - // consumption metrics and per timeline eviction task. we however need to - // be fast to accept page reads, so perhaps this is a suitable middle ground? - let max_blocked_threads = cores.map(|count| count.get() / 2); - let max_blocked_threads = max_blocked_threads.unwrap_or(1); - let max_blocked_threads = std::cmp::max(1, max_blocked_threads); - tracing::info!("using max {max_blocked_threads} threads for initial logical size"); - Arc::new(tokio::sync::Semaphore::new(max_blocked_threads)) - }); - - let _permit = GLOBAL_INITIAL_LOGICAL_SIZES_AT_ONCE.clone().acquire_owned().await.expect("global semaphore is never closed"); - let calculated_size = match self_clone .logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx, cancel) .await From daa79b150f8d81430d9d3aa92c9cdc8c5568e377 Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Tue, 30 May 2023 14:05:41 +0100 Subject: [PATCH 11/18] Code Coverage: store lcov report (#4358) ## Problem In the future, we want to compare code coverage on a PR with coverage on the main branch. Currently, we store only code coverage HTML reports, I suggest we start storing reports in "lcov info" format that we can use/parse in the future. Currently, the file size is ~7Mb (it's a text-based format and could be compressed into a ~400Kb archive) - More about "lcov info" format: https://manpages.ubuntu.com/manpages/jammy/man1/geninfo.1.html#files - Part of https://github.com/neondatabase/neon/issues/3543 ## Summary of changes - Change `scripts/coverage` to output lcov coverage to `report/lcov.info` file instead of stdout (we already upload the whole `report/` directory to S3) --- .github/workflows/build_and_test.yml | 11 ++++++++--- scripts/coverage | 21 +++++++++++++++------ 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index e00b98250c..b732095f8f 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -492,19 +492,24 @@ jobs: env: COMMIT_URL: ${{ github.server_url }}/${{ github.repository }}/commit/${{ github.event.pull_request.head.sha || github.sha }} run: | - scripts/coverage \ - --dir=/tmp/coverage report \ + scripts/coverage --dir=/tmp/coverage \ + report \ --input-objects=/tmp/coverage/binaries.list \ --commit-url=${COMMIT_URL} \ --format=github + scripts/coverage --dir=/tmp/coverage \ + report \ + --input-objects=/tmp/coverage/binaries.list \ + --format=lcov + - name: Upload coverage report id: upload-coverage-report env: BUCKET: neon-github-public-dev COMMIT_SHA: ${{ github.event.pull_request.head.sha || github.sha }} run: | - aws s3 cp --only-show-errors --recursive /tmp/coverage/report s3://neon-github-public-dev/code-coverage/${COMMIT_SHA} + aws s3 cp --only-show-errors --recursive /tmp/coverage/report s3://${BUCKET}/code-coverage/${COMMIT_SHA} REPORT_URL=https://${BUCKET}.s3.amazonaws.com/code-coverage/${COMMIT_SHA}/index.html echo "report-url=${REPORT_URL}" >> $GITHUB_OUTPUT diff --git a/scripts/coverage b/scripts/coverage index 1dc92e57cc..52a69c93b9 100755 --- a/scripts/coverage +++ b/scripts/coverage @@ -156,7 +156,9 @@ class LLVM: profdata: Path, objects: List[str], sources: List[str], - demangler: Optional[Path] = None) -> None: + demangler: Optional[Path] = None, + output_file: Optional[Path] = None, + ) -> None: cwd = self.cargo.cwd objects = list(intersperse('-object', objects)) @@ -180,14 +182,18 @@ class LLVM: *objects, *sources, ] - subprocess.check_call(cmd, cwd=cwd) + if output_file is not None: + with output_file.open('w') as outfile: + subprocess.check_call(cmd, cwd=cwd, stdout=outfile) + else: + subprocess.check_call(cmd, cwd=cwd) def cov_report(self, **kwargs) -> None: self._cov(subcommand='report', **kwargs) - def cov_export(self, *, kind: str, **kwargs) -> None: + def cov_export(self, *, kind: str, output_file: Optional[Path], **kwargs) -> None: extras = (f'-format={kind}', ) - self._cov(subcommand='export', *extras, **kwargs) + self._cov(subcommand='export', *extras, output_file=output_file, **kwargs) def cov_show(self, *, kind: str, output_dir: Optional[Path] = None, **kwargs) -> None: extras = [f'-format={kind}'] @@ -283,9 +289,12 @@ class TextReport(Report): self.llvm.cov_show(kind='text', **self._common_kwargs()) +@dataclass class LcovReport(Report): + output_file: Path + def generate(self) -> None: - self.llvm.cov_export(kind='lcov', **self._common_kwargs()) + self.llvm.cov_export(kind='lcov', output_file=self.output_file, **self._common_kwargs()) @dataclass @@ -475,7 +484,7 @@ class State: 'text': lambda: TextReport(**params), 'lcov': - lambda: LcovReport(**params), + lambda: LcovReport(**params, output_file=self.report_dir / 'lcov.info'), 'summary': lambda: SummaryReport(**params), 'github': From 210be6b6aba377592aa9aaefcea51c6427d22dac Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Tue, 30 May 2023 16:08:02 +0300 Subject: [PATCH 12/18] Replace broker duration logs with metrics (#4370) I've added logs for broker push duration after every iteration in https://github.com/neondatabase/neon/pull/4142. This log has not found any real issues, so we can replace it with metrics, to slightly reduce log volume. LogQL query found that pushes longer that 500ms happened only 90 times for the last month. https://neonprod.grafana.net/goto/KTNj9UwVg?orgId=1 `{unit="safekeeper.service"} |= "timeline updates to broker in" | regexp "to broker in (?P.*)" | duration > 500ms` --- safekeeper/src/broker.rs | 12 ++++++++++-- safekeeper/src/metrics.rs | 19 +++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs index 5e25d22ec1..48c56ee58f 100644 --- a/safekeeper/src/broker.rs +++ b/safekeeper/src/broker.rs @@ -19,8 +19,10 @@ use tokio::task::JoinHandle; use tokio::{runtime, time::sleep}; use tracing::*; +use crate::metrics::BROKER_ITERATION_TIMELINES; use crate::metrics::BROKER_PULLED_UPDATES; use crate::metrics::BROKER_PUSHED_UPDATES; +use crate::metrics::BROKER_PUSH_ALL_UPDATES_SECONDS; use crate::GlobalTimelines; use crate::SafeKeeperConf; @@ -61,8 +63,14 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { BROKER_PUSHED_UPDATES.inc(); } let elapsed = now.elapsed(); - // Log duration every second. Should be about 10MB of logs per day. - info!("pushed {} timeline updates to broker in {:?}", active_tlis.len(), elapsed); + + BROKER_PUSH_ALL_UPDATES_SECONDS.observe(elapsed.as_secs_f64()); + BROKER_ITERATION_TIMELINES.observe(active_tlis.len() as f64); + + if elapsed > push_interval / 2 { + info!("broker push is too long, pushed {} timeline updates to broker in {:?}", active_tlis.len(), elapsed); + } + sleep(push_interval).await; } }; diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs index 189af2b044..235a88501d 100644 --- a/safekeeper/src/metrics.rs +++ b/safekeeper/src/metrics.rs @@ -125,6 +125,25 @@ pub static BACKUP_ERRORS: Lazy = Lazy::new(|| { ) .expect("Failed to register safekeeper_backup_errors_total counter") }); +pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy = Lazy::new(|| { + register_histogram!( + "safekeeper_broker_push_update_seconds", + "Seconds to push all timeline updates to the broker", + DISK_WRITE_SECONDS_BUCKETS.to_vec() + ) + .expect("Failed to register safekeeper_broker_push_update_seconds histogram vec") +}); +pub const TIMELINES_COUNT_BUCKETS: &[f64] = &[ + 1.0, 10.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0, +]; +pub static BROKER_ITERATION_TIMELINES: Lazy = Lazy::new(|| { + register_histogram!( + "safekeeper_broker_iteration_timelines", + "Count of timelines pushed to the broker in a single iteration", + TIMELINES_COUNT_BUCKETS.to_vec() + ) + .expect("Failed to register safekeeper_broker_iteration_timelines histogram vec") +}); pub const LABEL_UNKNOWN: &str = "unknown"; From f4db85de404f2bba8d8cede7440a1fa654d53ce6 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Tue, 30 May 2023 16:25:07 +0300 Subject: [PATCH 13/18] Continued startup speedup (#4372) Startup continues to be slow, work towards to alleviate it. Summary of changes: - pretty the functional improvements from #4366 into `utils::completion::{Completion, Barrier}` - extend "initial load completion" usage up to tenant background tasks - previously only global background tasks - spawn_blocking the tenant load directory traversal - demote some logging - remove some unwraps - propagate some spans to `spawn_blocking` Runtime effects should be major speedup to loading, but after that, the `BACKGROUND_RUNTIME` will be blocked for a long time (minutes). Possible follow-ups: - complete initial tenant sizes before allowing background tasks to block the `BACKGROUND_RUNTIME` --- libs/utils/src/completion.rs | 33 ++++ libs/utils/src/lib.rs | 3 + pageserver/src/bin/pageserver.rs | 11 +- pageserver/src/disk_usage_eviction_task.rs | 6 +- pageserver/src/tenant.rs | 207 +++++++++++---------- pageserver/src/tenant/mgr.rs | 9 +- pageserver/src/tenant/tasks.rs | 7 +- pageserver/src/tenant/timeline.rs | 19 +- test_runner/regress/test_tenants.py | 2 +- 9 files changed, 181 insertions(+), 116 deletions(-) create mode 100644 libs/utils/src/completion.rs diff --git a/libs/utils/src/completion.rs b/libs/utils/src/completion.rs new file mode 100644 index 0000000000..2cdaee548e --- /dev/null +++ b/libs/utils/src/completion.rs @@ -0,0 +1,33 @@ +use std::sync::Arc; + +use tokio::sync::{mpsc, Mutex}; + +/// While a reference is kept around, the associated [`Barrier::wait`] will wait. +/// +/// Can be cloned, moved and kept around in futures as "guard objects". +#[derive(Clone)] +pub struct Completion(mpsc::Sender<()>); + +/// Barrier will wait until all clones of [`Completion`] have been dropped. +#[derive(Clone)] +pub struct Barrier(Arc>>); + +impl Barrier { + pub async fn wait(self) { + self.0.lock().await.recv().await; + } + + pub async fn maybe_wait(barrier: Option) { + if let Some(b) = barrier { + b.wait().await + } + } +} + +/// Create new Guard and Barrier pair. +pub fn channel() -> (Completion, Barrier) { + let (tx, rx) = mpsc::channel::<()>(1); + let rx = Mutex::new(rx); + let rx = Arc::new(rx); + (Completion(tx), Barrier(rx)) +} diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 4e4f79ab6b..69d3a1b9f2 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -60,6 +60,9 @@ pub mod tracing_span_assert; pub mod rate_limit; +/// Simple once-barrier and a guard which keeps barrier awaiting. +pub mod completion; + mod failpoint_macro_helpers { /// use with fail::cfg("$name", "return(2000)") diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index cbc97e7228..a2cebffc83 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -338,8 +338,7 @@ fn start_pageserver( // All tenant load operations carry this while they are ongoing; it will be dropped once those // operations finish either successfully or in some other manner. However, the initial load // will be then done, and we can start the global background tasks. - let (init_done_tx, init_done_rx) = tokio::sync::mpsc::channel::<()>(1); - let init_done_rx = Arc::new(tokio::sync::Mutex::new(init_done_rx)); + let (init_done_tx, init_done_rx) = utils::completion::channel(); // Scan the local 'tenants/' directory and start loading the tenants let init_started_at = std::time::Instant::now(); @@ -347,14 +346,13 @@ fn start_pageserver( conf, broker_client.clone(), remote_storage.clone(), - init_done_tx, + (init_done_tx, init_done_rx.clone()), ))?; BACKGROUND_RUNTIME.spawn({ let init_done_rx = init_done_rx.clone(); async move { - let init_done = async move { init_done_rx.lock().await.recv().await }; - init_done.await; + init_done_rx.wait().await; let elapsed = init_started_at.elapsed(); @@ -435,8 +433,7 @@ fn start_pageserver( // this is because we only process active tenants and timelines, and the // Timeline::get_current_logical_size will spawn the logical size calculation, // which will not be rate-limited. - let init_done = async move { init_done_rx.lock().await.recv().await }; - init_done.await; + init_done_rx.wait().await; pageserver::consumption_metrics::collect_metrics( metric_collection_endpoint, diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index 0358969199..1a8886935c 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -54,6 +54,7 @@ use serde::{Deserialize, Serialize}; use tokio::time::Instant; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, instrument, warn, Instrument}; +use utils::completion; use utils::serde_percent::Percent; use crate::{ @@ -82,7 +83,7 @@ pub fn launch_disk_usage_global_eviction_task( conf: &'static PageServerConf, storage: GenericRemoteStorage, state: Arc, - init_done_rx: Arc>>, + init_done: completion::Barrier, ) -> anyhow::Result<()> { let Some(task_config) = &conf.disk_usage_based_eviction else { info!("disk usage based eviction task not configured"); @@ -100,8 +101,7 @@ pub fn launch_disk_usage_global_eviction_task( false, async move { // wait until initial load is complete, because we cannot evict from loading tenants. - let init_done = async move { init_done_rx.lock().await.recv().await }; - init_done.await; + init_done.wait().await; disk_usage_eviction_task( &state, diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index d6eb824107..ff975db601 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -20,6 +20,7 @@ use storage_broker::BrokerClientChannel; use tokio::sync::watch; use tokio::task::JoinSet; use tracing::*; +use utils::completion; use utils::crashsafe::path_with_suffix_extension; use std::cmp::min; @@ -653,7 +654,7 @@ impl Tenant { match tenant_clone.attach(&ctx).await { Ok(()) => { info!("attach finished, activating"); - tenant_clone.activate(broker_client, &ctx); + tenant_clone.activate(broker_client, None, &ctx); } Err(e) => { error!("attach failed, setting tenant state to Broken: {:?}", e); @@ -889,15 +890,17 @@ impl Tenant { /// If the loading fails for some reason, the Tenant will go into Broken /// state. /// - #[instrument(skip(conf, remote_storage, ctx), fields(tenant_id=%tenant_id))] + #[instrument(skip_all, fields(tenant_id=%tenant_id))] pub fn spawn_load( conf: &'static PageServerConf, tenant_id: TenantId, broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, - init_done_tx: Option>, + init_done: Option<(completion::Completion, completion::Barrier)>, ctx: &RequestContext, ) -> Arc { + debug_assert_current_span_has_tenant_id(); + let tenant_conf = match Self::load_tenant_config(conf, tenant_id) { Ok(conf) => conf, Err(e) => { @@ -931,11 +934,15 @@ impl Tenant { async move { // keep the sender alive as long as we have the initial load ongoing; it will be // None for loads spawned after init_tenant_mgr. - let _init_done_tx = init_done_tx; + let (_tx, rx) = if let Some((tx, rx)) = init_done { + (Some(tx), Some(rx)) + } else { + (None, None) + }; match tenant_clone.load(&ctx).await { Ok(()) => { - info!("load finished, activating"); - tenant_clone.activate(broker_client, &ctx); + debug!("load finished, activating"); + tenant_clone.activate(broker_client, rx.as_ref(), &ctx); } Err(err) => { error!("load failed, setting tenant state to Broken: {err:?}"); @@ -954,8 +961,6 @@ impl Tenant { }), ); - info!("spawned load into background"); - tenant } @@ -967,7 +972,7 @@ impl Tenant { async fn load(self: &Arc, ctx: &RequestContext) -> anyhow::Result<()> { debug_assert_current_span_has_tenant_id(); - info!("loading tenant task"); + debug!("loading tenant task"); utils::failpoint_sleep_millis_async!("before-loading-tenant"); @@ -977,102 +982,109 @@ impl Tenant { // // Scan the directory, peek into the metadata file of each timeline, and // collect a list of timelines and their ancestors. - let mut timelines_to_load: HashMap = HashMap::new(); - let timelines_dir = self.conf.timelines_path(&self.tenant_id); - for entry in std::fs::read_dir(&timelines_dir).with_context(|| { - format!( - "Failed to list timelines directory for tenant {}", - self.tenant_id - ) - })? { - let entry = entry.with_context(|| { - format!("cannot read timeline dir entry for {}", self.tenant_id) - })?; - let timeline_dir = entry.path(); + let tenant_id = self.tenant_id; + let conf = self.conf; + let span = info_span!("blocking"); - if crate::is_temporary(&timeline_dir) { - info!( - "Found temporary timeline directory, removing: {}", - timeline_dir.display() - ); - if let Err(e) = std::fs::remove_dir_all(&timeline_dir) { - error!( - "Failed to remove temporary directory '{}': {:?}", - timeline_dir.display(), - e + let sorted_timelines: Vec<(_, _)> = tokio::task::spawn_blocking(move || { + let _g = span.entered(); + let mut timelines_to_load: HashMap = HashMap::new(); + let timelines_dir = conf.timelines_path(&tenant_id); + + for entry in + std::fs::read_dir(&timelines_dir).context("list timelines directory for tenant")? + { + let entry = entry.context("read timeline dir entry")?; + let timeline_dir = entry.path(); + + if crate::is_temporary(&timeline_dir) { + info!( + "Found temporary timeline directory, removing: {}", + timeline_dir.display() ); - } - } else if is_uninit_mark(&timeline_dir) { - let timeline_uninit_mark_file = &timeline_dir; - info!( - "Found an uninit mark file {}, removing the timeline and its uninit mark", - timeline_uninit_mark_file.display() - ); - let timeline_id = timeline_uninit_mark_file - .file_stem() - .and_then(OsStr::to_str) - .unwrap_or_default() - .parse::() - .with_context(|| { - format!( + if let Err(e) = std::fs::remove_dir_all(&timeline_dir) { + error!( + "Failed to remove temporary directory '{}': {:?}", + timeline_dir.display(), + e + ); + } + } else if is_uninit_mark(&timeline_dir) { + let timeline_uninit_mark_file = &timeline_dir; + info!( + "Found an uninit mark file {}, removing the timeline and its uninit mark", + timeline_uninit_mark_file.display() + ); + let timeline_id = timeline_uninit_mark_file + .file_stem() + .and_then(OsStr::to_str) + .unwrap_or_default() + .parse::() + .with_context(|| { + format!( "Could not parse timeline id out of the timeline uninit mark name {}", timeline_uninit_mark_file.display() ) - })?; - let timeline_dir = self.conf.timeline_path(&timeline_id, &self.tenant_id); - if let Err(e) = - remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file) - { - error!("Failed to clean up uninit marked timeline: {e:?}"); - } - } else { - let timeline_id = timeline_dir - .file_name() - .and_then(OsStr::to_str) - .unwrap_or_default() - .parse::() - .with_context(|| { - format!( - "Could not parse timeline id out of the timeline dir name {}", - timeline_dir.display() - ) - })?; - let timeline_uninit_mark_file = self - .conf - .timeline_uninit_mark_file_path(self.tenant_id, timeline_id); - if timeline_uninit_mark_file.exists() { - info!( - "Found an uninit mark file for timeline {}/{}, removing the timeline and its uninit mark", - self.tenant_id, timeline_id - ); + })?; + let timeline_dir = conf.timeline_path(&timeline_id, &tenant_id); if let Err(e) = - remove_timeline_and_uninit_mark(&timeline_dir, &timeline_uninit_mark_file) + remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file) { error!("Failed to clean up uninit marked timeline: {e:?}"); } - continue; - } - - let file_name = entry.file_name(); - if let Ok(timeline_id) = - file_name.to_str().unwrap_or_default().parse::() - { - let metadata = load_metadata(self.conf, timeline_id, self.tenant_id) - .context("failed to load metadata")?; - timelines_to_load.insert(timeline_id, metadata); } else { - // A file or directory that doesn't look like a timeline ID - warn!( - "unexpected file or directory in timelines directory: {}", - file_name.to_string_lossy() - ); + let timeline_id = timeline_dir + .file_name() + .and_then(OsStr::to_str) + .unwrap_or_default() + .parse::() + .with_context(|| { + format!( + "Could not parse timeline id out of the timeline dir name {}", + timeline_dir.display() + ) + })?; + let timeline_uninit_mark_file = + conf.timeline_uninit_mark_file_path(tenant_id, timeline_id); + if timeline_uninit_mark_file.exists() { + info!( + %timeline_id, + "Found an uninit mark file, removing the timeline and its uninit mark", + ); + if let Err(e) = remove_timeline_and_uninit_mark( + &timeline_dir, + &timeline_uninit_mark_file, + ) { + error!("Failed to clean up uninit marked timeline: {e:?}"); + } + continue; + } + + let file_name = entry.file_name(); + if let Ok(timeline_id) = + file_name.to_str().unwrap_or_default().parse::() + { + let metadata = load_metadata(conf, timeline_id, tenant_id) + .context("failed to load metadata")?; + timelines_to_load.insert(timeline_id, metadata); + } else { + // A file or directory that doesn't look like a timeline ID + warn!( + "unexpected file or directory in timelines directory: {}", + file_name.to_string_lossy() + ); + } } } - } - // Sort the array of timeline IDs into tree-order, so that parent comes before - // all its children. - let sorted_timelines = tree_sort_timelines(timelines_to_load)?; + // Sort the array of timeline IDs into tree-order, so that parent comes before + // all its children. + tree_sort_timelines(timelines_to_load) + }) + .await + .context("load spawn_blocking") + .and_then(|res| res)?; + // FIXME original collect_timeline_files contained one more check: // 1. "Timeline has no ancestor and no layer files" @@ -1082,7 +1094,7 @@ impl Tenant { .with_context(|| format!("load local timeline {timeline_id}"))?; } - info!("Done"); + trace!("Done"); Ok(()) } @@ -1670,7 +1682,12 @@ impl Tenant { } /// Changes tenant status to active, unless shutdown was already requested. - fn activate(self: &Arc, broker_client: BrokerClientChannel, ctx: &RequestContext) { + fn activate( + self: &Arc, + broker_client: BrokerClientChannel, + init_done: Option<&completion::Barrier>, + ctx: &RequestContext, + ) { debug_assert_current_span_has_tenant_id(); let mut activating = false; @@ -1701,7 +1718,7 @@ impl Tenant { // Spawn gc and compaction loops. The loops will shut themselves // down when they notice that the tenant is inactive. - tasks::start_background_loops(self); + tasks::start_background_loops(self, init_done); let mut activated_timelines = 0; diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index d74a025bbb..d3cd914037 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -25,6 +25,7 @@ use crate::tenant::{ }; use crate::IGNORED_TENANT_FILE_NAME; +use utils::completion; use utils::fs_ext::PathExt; use utils::id::{TenantId, TimelineId}; @@ -66,7 +67,7 @@ pub async fn init_tenant_mgr( conf: &'static PageServerConf, broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, - init_done_tx: tokio::sync::mpsc::Sender<()>, + init_done: (completion::Completion, completion::Barrier), ) -> anyhow::Result<()> { // Scan local filesystem for attached tenants let tenants_dir = conf.tenants_path(); @@ -123,7 +124,7 @@ pub async fn init_tenant_mgr( &tenant_dir_path, broker_client.clone(), remote_storage.clone(), - Some(init_done_tx.clone()), + Some(init_done.clone()), &ctx, ) { Ok(tenant) => { @@ -159,7 +160,7 @@ pub fn schedule_local_tenant_processing( tenant_path: &Path, broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, - init_done_tx: Option>, + init_done: Option<(completion::Completion, completion::Barrier)>, ctx: &RequestContext, ) -> anyhow::Result> { anyhow::ensure!( @@ -218,7 +219,7 @@ pub fn schedule_local_tenant_processing( tenant_id, broker_client, remote_storage, - init_done_tx, + init_done, ctx, ) }; diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index b3c8a4a3bb..02aed11114 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -12,8 +12,9 @@ use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME}; use crate::tenant::{Tenant, TenantState}; use tokio_util::sync::CancellationToken; use tracing::*; +use utils::completion; -pub fn start_background_loops(tenant: &Arc) { +pub fn start_background_loops(tenant: &Arc, init_done: Option<&completion::Barrier>) { let tenant_id = tenant.tenant_id; task_mgr::spawn( BACKGROUND_RUNTIME.handle(), @@ -24,7 +25,9 @@ pub fn start_background_loops(tenant: &Arc) { false, { let tenant = Arc::clone(tenant); + let init_done = init_done.cloned(); async move { + completion::Barrier::maybe_wait(init_done).await; compaction_loop(tenant) .instrument(info_span!("compaction_loop", tenant_id = %tenant_id)) .await; @@ -41,7 +44,9 @@ pub fn start_background_loops(tenant: &Arc) { false, { let tenant = Arc::clone(tenant); + let init_done = init_done.cloned(); async move { + completion::Barrier::maybe_wait(init_done).await; gc_loop(tenant) .instrument(info_span!("gc_loop", tenant_id = %tenant_id)) .await; diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 5c889e804c..ee7b002450 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2728,7 +2728,7 @@ impl Timeline { } /// Flush one frozen in-memory layer to disk, as a new delta layer. - #[instrument(skip(self, frozen_layer, ctx), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer.short_id()))] + #[instrument(skip_all, fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer.short_id()))] async fn flush_frozen_layer( self: &Arc, frozen_layer: Arc, @@ -2752,9 +2752,14 @@ impl Timeline { // normal case, write out a L0 delta layer file. let this = self.clone(); let frozen_layer = frozen_layer.clone(); - let (delta_path, metadata) = - tokio::task::spawn_blocking(move || this.create_delta_layer(&frozen_layer)) - .await??; + let span = tracing::info_span!("blocking"); + let (delta_path, metadata) = tokio::task::spawn_blocking(move || { + let _g = span.entered(); + this.create_delta_layer(&frozen_layer) + }) + .await + .context("create_delta_layer spawn_blocking") + .and_then(|res| res)?; HashMap::from([(delta_path, metadata)]) }; @@ -3523,14 +3528,18 @@ impl Timeline { let this = self.clone(); let ctx_inner = ctx.clone(); let layer_removal_cs_inner = layer_removal_cs.clone(); + let span = tracing::info_span!("blocking"); let CompactLevel0Phase1Result { new_layers, deltas_to_compact, } = tokio::task::spawn_blocking(move || { + let _g = span.entered(); this.compact_level0_phase1(layer_removal_cs_inner, target_file_size, &ctx_inner) }) .await - .unwrap()?; + .context("compact_level0_phase1 spawn_blocking") + .map_err(CompactionError::Other) + .and_then(|res| res)?; if new_layers.is_empty() && deltas_to_compact.is_empty() { // nothing to do diff --git a/test_runner/regress/test_tenants.py b/test_runner/regress/test_tenants.py index 59b7b574cd..15712b9e55 100644 --- a/test_runner/regress/test_tenants.py +++ b/test_runner/regress/test_tenants.py @@ -309,7 +309,7 @@ def test_pageserver_with_empty_tenants( env.pageserver.allowed_errors.append( ".*marking .* as locally complete, while it doesnt exist in remote index.*" ) - env.pageserver.allowed_errors.append(".*load failed.*Failed to list timelines directory.*") + env.pageserver.allowed_errors.append(".*load failed.*list timelines directory.*") client = env.pageserver.http_client() From b190c3e6c3771005ac761e71a9635092c3addc93 Mon Sep 17 00:00:00 2001 From: Dmitry Rodionov Date: Tue, 30 May 2023 20:11:44 +0300 Subject: [PATCH 14/18] reduce flakiness by allowing Compaction failed, retrying in X queue is in state Stopped. (#4379) resolves https://github.com/neondatabase/neon/issues/4374 by adding the error to allowed_errors --- test_runner/fixtures/neon_fixtures.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 6b97c33ae4..1007cb11b5 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1621,6 +1621,8 @@ class NeonPageserver(PgProtocol): ".*Compaction failed, retrying in [^:]+: Cannot run compaction iteration on inactive tenant", # these can happen anytime we do compactions from background task and shutdown pageserver r".*ERROR.*ancestor timeline \S+ is being stopped", + # this is expected given our collaborative shutdown approach for the UploadQueue + ".*Compaction failed, retrying in .*: queue is in state Stopped.*", ] def start( From b6447462dc72b8634cf122c76d3e155c2f6b5d60 Mon Sep 17 00:00:00 2001 From: bojanserafimov Date: Wed, 31 May 2023 12:23:00 -0400 Subject: [PATCH 15/18] Fix layer map correctness bug (#4342) --- .../layer_map/historic_layer_coverage.rs | 29 ++++++++++++++++ .../src/tenant/layer_map/layer_coverage.rs | 33 ++++++++++++------- 2 files changed, 50 insertions(+), 12 deletions(-) diff --git a/pageserver/src/tenant/layer_map/historic_layer_coverage.rs b/pageserver/src/tenant/layer_map/historic_layer_coverage.rs index b63c361314..49dcbc63c2 100644 --- a/pageserver/src/tenant/layer_map/historic_layer_coverage.rs +++ b/pageserver/src/tenant/layer_map/historic_layer_coverage.rs @@ -204,6 +204,35 @@ fn test_off_by_one() { assert_eq!(version.image_coverage.query(5), None); } +/// White-box regression test, checking for incorrect removal of node at key.end +#[test] +fn test_regression() { + let mut map = HistoricLayerCoverage::::new(); + map.insert( + LayerKey { + key: 0..5, + lsn: 0..5, + is_image: false, + }, + "Layer 1".to_string(), + ); + map.insert( + LayerKey { + key: 0..5, + lsn: 1..2, + is_image: false, + }, + "Layer 2".to_string(), + ); + + // If an insertion operation improperly deletes the endpoint of a previous layer + // (which is more likely to happen with layers that collide on key.end), we will + // end up with an infinite layer, covering the entire keyspace. Here we assert + // that there's no layer at key 100 because we didn't insert any layer there. + let version = map.get_version(100).unwrap(); + assert_eq!(version.delta_coverage.query(100), None); +} + /// Cover edge cases where layers begin or end on the same key #[test] fn test_key_collision() { diff --git a/pageserver/src/tenant/layer_map/layer_coverage.rs b/pageserver/src/tenant/layer_map/layer_coverage.rs index 4e3b4516dc..9d9d1d6ccf 100644 --- a/pageserver/src/tenant/layer_map/layer_coverage.rs +++ b/pageserver/src/tenant/layer_map/layer_coverage.rs @@ -10,19 +10,22 @@ use rpds::RedBlackTreeMapSync; /// - iterate the latest layers in a key range /// - insert layers in non-decreasing lsn.start order /// -/// The struct is parameterized over Value for easier -/// testing, but in practice it's some sort of layer. +/// For a detailed explanation and justification of this approach, see: +/// https://neon.tech/blog/persistent-structures-in-neons-wal-indexing +/// +/// NOTE The struct is parameterized over Value for easier +/// testing, but in practice it's some sort of layer. pub struct LayerCoverage { /// For every change in coverage (as we sweep the key space) /// we store (lsn.end, value). /// - /// We use an immutable/persistent tree so that we can keep historic - /// versions of this coverage without cloning the whole thing and - /// incurring quadratic memory cost. See HistoricLayerCoverage. + /// NOTE We use an immutable/persistent tree so that we can keep historic + /// versions of this coverage without cloning the whole thing and + /// incurring quadratic memory cost. See HistoricLayerCoverage. /// - /// We use the Sync version of the map because we want Self to - /// be Sync. Using nonsync might be faster, if we can work with - /// that. + /// NOTE We use the Sync version of the map because we want Self to + /// be Sync. Using nonsync might be faster, if we can work with + /// that. nodes: RedBlackTreeMapSync>, } @@ -41,6 +44,13 @@ impl LayerCoverage { /// Helper function to subdivide the key range without changing any values /// + /// This operation has no semantic effect by itself. It only helps us pin in + /// place the part of the coverage we don't want to change when inserting. + /// + /// As an analogy, think of a polygon. If you add a vertex along one of the + /// segments, the polygon is still the same, but it behaves differently when + /// we move or delete one of the other points. + /// /// Complexity: O(log N) fn add_node(&mut self, key: i128) { let value = match self.nodes.range(..=key).last() { @@ -74,7 +84,7 @@ impl LayerCoverage { let mut to_update = Vec::new(); let mut to_remove = Vec::new(); let mut prev_covered = false; - for (k, node) in self.nodes.range(key.clone()) { + for (k, node) in self.nodes.range(key) { let needs_cover = match node { None => true, Some((h, _)) => h < &lsn.end, @@ -87,9 +97,8 @@ impl LayerCoverage { } prev_covered = needs_cover; } - if !prev_covered { - to_remove.push(key.end); - } + // TODO check if the nodes inserted at key.start and key.end are safe + // to remove. It's fine to keep them but they could be redundant. for k in to_update { self.nodes.insert_mut(k, Some((lsn.end, value.clone()))); } From 952d6e43a21d3b1ff7e5c602007fe06725c2e1bc Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 31 May 2023 21:37:20 +0300 Subject: [PATCH 16/18] =?UTF-8?q?Add=20pageserver=20parameter=20forced=5Fi?= =?UTF-8?q?mage=5Fcreation=5Flimit=20which=20can=20be=20used=E2=80=A6=20(#?= =?UTF-8?q?4353)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This parameter can be use to restrict number of image layers generated because of GC request (wanted image layers). Been set to zero it completely eliminates creation of such image layers. So it allows to avoid extra storage consumption after merging #3673 ## Problem PR #3673 forces generation of missed image layers. So i short term is cause cause increase (in worst case up to two times) size of storage. It was intended (by me) that GC period is comparable with PiTR interval. But looks like it is not the case now - GC is performed much more frequently. It may cause the problem with space exhaustion: GC forces new image creation while large PiTR still prevent GC from collecting old layers. ## Summary of changes Add new pageserver parameter` forced_image_creation_limit` which restrict number of created image layers which are requested by GC. ## Checklist before requesting a review - [ ] I have performed a self-review of my code. - [ ] If it is a core feature, I have added thorough tests. - [ ] Do we need to implement analytics? if so did you add the relevant metrics to the dashboard? - [ ] If this PR requires public announcement, mark it with /release-notes label and add several sentences in this section. ## Checklist before merging - [ ] Do not forget to reformat commit message to not include the above checklist --- control_plane/src/pageserver.rs | 10 ++++++++++ libs/pageserver_api/src/models.rs | 2 ++ pageserver/src/config.rs | 10 +++++++++- pageserver/src/tenant.rs | 1 + pageserver/src/tenant/config.rs | 8 ++++++++ pageserver/src/tenant/timeline.rs | 10 +++++++++- test_runner/regress/test_attach_tenant_config.py | 1 + 7 files changed, 40 insertions(+), 2 deletions(-) diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 149cfd00cb..400df60f0e 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -369,6 +369,11 @@ impl PageServerNode { evictions_low_residence_duration_metric_threshold: settings .remove("evictions_low_residence_duration_metric_threshold") .map(|x| x.to_string()), + gc_feedback: settings + .remove("gc_feedback") + .map(|x| x.parse::()) + .transpose() + .context("Failed to parse 'gc_feedback' as bool")?, }; // If tenant ID was not specified, generate one @@ -463,6 +468,11 @@ impl PageServerNode { evictions_low_residence_duration_metric_threshold: settings .remove("evictions_low_residence_duration_metric_threshold") .map(|x| x.to_string()), + gc_feedback: settings + .remove("gc_feedback") + .map(|x| x.parse::()) + .transpose() + .context("Failed to parse 'gc_feedback' as bool")?, } }; diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 0b4457a9a5..162bf6b294 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -223,6 +223,7 @@ pub struct TenantConfig { pub eviction_policy: Option, pub min_resident_size_override: Option, pub evictions_low_residence_duration_metric_threshold: Option, + pub gc_feedback: Option, } #[serde_as] @@ -281,6 +282,7 @@ impl TenantConfigRequest { eviction_policy: None, min_resident_size_override: None, evictions_low_residence_duration_metric_threshold: None, + gc_feedback: None, }; TenantConfigRequest { tenant_id, config } } diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 88a7f15b21..02763c9b7d 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -108,7 +108,7 @@ pub mod defaults { #min_resident_size_override = .. # in bytes #evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}' - +#gc_feedback = false # [remote_storage] "### @@ -828,6 +828,14 @@ impl PageServerConf { )?); } + if let Some(gc_feedback) = item.get("gc_feedback") { + t_conf.gc_feedback = Some( + gc_feedback + .as_bool() + .with_context(|| "configure option gc_feedback is not a bool".to_string())?, + ); + } + Ok(t_conf) } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index ff975db601..af6a70c4f2 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3209,6 +3209,7 @@ pub mod harness { evictions_low_residence_duration_metric_threshold: Some( tenant_conf.evictions_low_residence_duration_metric_threshold, ), + gc_feedback: Some(tenant_conf.gc_feedback), } } } diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs index 50de316bc4..80d153661a 100644 --- a/pageserver/src/tenant/config.rs +++ b/pageserver/src/tenant/config.rs @@ -99,6 +99,7 @@ pub struct TenantConf { // See the corresponding metric's help string. #[serde(with = "humantime_serde")] pub evictions_low_residence_duration_metric_threshold: Duration, + pub gc_feedback: bool, } /// Same as TenantConf, but this struct preserves the information about @@ -175,6 +176,10 @@ pub struct TenantConfOpt { #[serde(with = "humantime_serde")] #[serde(default)] pub evictions_low_residence_duration_metric_threshold: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub gc_feedback: Option, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] @@ -242,6 +247,7 @@ impl TenantConfOpt { evictions_low_residence_duration_metric_threshold: self .evictions_low_residence_duration_metric_threshold .unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold), + gc_feedback: self.gc_feedback.unwrap_or(global_conf.gc_feedback), } } } @@ -278,6 +284,7 @@ impl Default for TenantConf { DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD, ) .expect("cannot parse default evictions_low_residence_duration_metric_threshold"), + gc_feedback: false, } } } @@ -372,6 +379,7 @@ impl TryFrom<&'_ models::TenantConfig> for TenantConfOpt { ))?, ); } + tenant_conf.gc_feedback = request_data.gc_feedback; Ok(tenant_conf) } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index ee7b002450..36c4b0bcd4 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1291,6 +1291,13 @@ impl Timeline { .unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold) } + fn get_gc_feedback(&self) -> bool { + let tenant_conf = self.tenant_conf.read().unwrap(); + tenant_conf + .gc_feedback + .unwrap_or(self.conf.default_tenant_conf.gc_feedback) + } + pub(super) fn tenant_conf_updated(&self) { // NB: Most tenant conf options are read by background loops, so, // changes will automatically be picked up. @@ -3124,6 +3131,7 @@ impl Timeline { let mut layers = self.layers.write().unwrap(); let mut updates = layers.batch_update(); let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id); + for l in image_layers { let path = l.filename(); let metadata = timeline_path @@ -3896,7 +3904,7 @@ impl Timeline { // delta layers. Image layers can form "stairs" preventing old image from been deleted. // But image layers are in any case less sparse than delta layers. Also we need some // protection from replacing recent image layers with new one after each GC iteration. - if l.is_incremental() && !LayerMap::is_l0(&*l) { + if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&*l) { wanted_image_layers.add_range(l.get_key_range()); } result.layers_not_updated += 1; diff --git a/test_runner/regress/test_attach_tenant_config.py b/test_runner/regress/test_attach_tenant_config.py index eb2ba3e9ed..6261ec28db 100644 --- a/test_runner/regress/test_attach_tenant_config.py +++ b/test_runner/regress/test_attach_tenant_config.py @@ -158,6 +158,7 @@ def test_fully_custom_config(positive_env: NeonEnv): "threshold": "23h", }, "evictions_low_residence_duration_metric_threshold": "2days", + "gc_feedback": True, "gc_horizon": 23 * (1024 * 1024), "gc_period": "2h 13m", "image_creation_threshold": 7, From 330083638fef3db1c2162d62b6bdbd47f890a18b Mon Sep 17 00:00:00 2001 From: bojanserafimov Date: Wed, 31 May 2023 22:04:46 -0400 Subject: [PATCH 17/18] Fix stale and misleading comment in LayerMap (#4297) --- pageserver/src/tenant/layer_map/layer_coverage.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pageserver/src/tenant/layer_map/layer_coverage.rs b/pageserver/src/tenant/layer_map/layer_coverage.rs index 9d9d1d6ccf..47aace97a5 100644 --- a/pageserver/src/tenant/layer_map/layer_coverage.rs +++ b/pageserver/src/tenant/layer_map/layer_coverage.rs @@ -1,8 +1,8 @@ use std::ops::Range; -// TODO the `im` crate has 20x more downloads and also has -// persistent/immutable BTree. It also runs a bit faster but -// results are not the same on some tests. +// NOTE the `im` crate has 20x more downloads and also has +// persistent/immutable BTree. But it's bugged so rpds is a +// better choice https://github.com/neondatabase/neon/issues/3395 use rpds::RedBlackTreeMapSync; /// Data structure that can efficiently: From 36fee50f4d376f0f665a2f005b0ab4b122bba323 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Thu, 1 Jun 2023 20:12:07 +0300 Subject: [PATCH 18/18] compute_ctl: enable tracing panic hook (#4375) compute_ctl can panic, but `tracing` is used for logging. panic stderr output can interleave with messages from normal logging. The fix is to use the established way (pageserver, safekeeper, storage_broker) of using `tracing` to report panics. --- compute_tools/src/logger.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/compute_tools/src/logger.rs b/compute_tools/src/logger.rs index 1b5cf647b0..f6fc882968 100644 --- a/compute_tools/src/logger.rs +++ b/compute_tools/src/logger.rs @@ -33,5 +33,7 @@ pub fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result<()> { .init(); tracing::info!("logging and tracing started"); + utils::logging::replace_panic_hook_with_tracing_panic_hook().forget(); + Ok(()) }