From a0e61145c8f7b50fefc89b68c54f5012a4313170 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 23 Nov 2023 14:33:41 +0100 Subject: [PATCH] fix: cleanup of layers from the future can race with their re-creation (#5890) fixes https://github.com/neondatabase/neon/issues/5878 obsoletes https://github.com/neondatabase/neon/issues/5879 Before this PR, it could happen that `load_layer_map` schedules removal of the future image layer. Then a later compaction run could re-create the same image layer, scheduling a PUT. Due to lack of an upload queue barrier, the PUT and DELETE could be re-ordered. The result was IndexPart referencing a non-existent object. ## Summary of changes * Add support to `pagectl` / Python tests to decode `IndexPart` * Rust * new `pagectl` Subcommand * `IndexPart::{from,to}_s3_bytes()` methods to internalize knowledge about encoding of `IndexPart` * Python * new `NeonCli` subclass * Add regression test * Rust * Ability to force repartitioning; required to ensure image layer creation at last_record_lsn * Python * The regression test. * Fix the issue * Insert an `UploadOp::Barrier` after scheduling the deletions. --- Cargo.lock | 2 + pageserver/ctl/Cargo.toml | 2 + pageserver/ctl/src/index_part.rs | 38 +++ pageserver/ctl/src/main.rs | 7 + pageserver/src/deletion_queue.rs | 1 + pageserver/src/http/routes.rs | 15 +- pageserver/src/tenant.rs | 31 ++- .../src/tenant/remote_timeline_client.rs | 33 ++- .../tenant/remote_timeline_client/index.rs | 18 +- .../tenant/remote_timeline_client/upload.rs | 5 +- pageserver/src/tenant/timeline.rs | 45 +++- test_runner/fixtures/neon_fixtures.py | 16 ++ test_runner/fixtures/pageserver/http.py | 20 +- test_runner/fixtures/pageserver/types.py | 146 ++++++++++++ test_runner/fixtures/remote_storage.py | 6 + test_runner/fixtures/types.py | 29 +++ test_runner/fixtures/utils.py | 35 ++- .../regress/test_layers_from_future.py | 222 ++++++++++++++++++ 18 files changed, 615 insertions(+), 56 deletions(-) create mode 100644 pageserver/ctl/src/index_part.rs create mode 100644 test_runner/fixtures/pageserver/types.py create mode 100644 test_runner/regress/test_layers_from_future.py diff --git a/Cargo.lock b/Cargo.lock index 57c57182e1..62da3cf070 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2905,6 +2905,8 @@ dependencies = [ "git-version", "pageserver", "postgres_ffi", + "serde", + "serde_json", "svg_fmt", "tokio", "utils", diff --git a/pageserver/ctl/Cargo.toml b/pageserver/ctl/Cargo.toml index ff0c530125..c5cd451e8d 100644 --- a/pageserver/ctl/Cargo.toml +++ b/pageserver/ctl/Cargo.toml @@ -18,3 +18,5 @@ tokio.workspace = true utils.workspace = true svg_fmt.workspace = true workspace_hack.workspace = true +serde.workspace = true +serde_json.workspace = true diff --git a/pageserver/ctl/src/index_part.rs b/pageserver/ctl/src/index_part.rs new file mode 100644 index 0000000000..20e5572914 --- /dev/null +++ b/pageserver/ctl/src/index_part.rs @@ -0,0 +1,38 @@ +use std::collections::HashMap; + +use anyhow::Context; +use camino::Utf8PathBuf; +use pageserver::tenant::remote_timeline_client::index::IndexLayerMetadata; +use pageserver::tenant::storage_layer::LayerFileName; +use pageserver::tenant::{metadata::TimelineMetadata, IndexPart}; +use utils::lsn::Lsn; + +#[derive(clap::Subcommand)] +pub(crate) enum IndexPartCmd { + Dump { path: Utf8PathBuf }, +} + +pub(crate) async fn main(cmd: &IndexPartCmd) -> anyhow::Result<()> { + match cmd { + IndexPartCmd::Dump { path } => { + let bytes = tokio::fs::read(path).await.context("read file")?; + let des: IndexPart 
= IndexPart::from_s3_bytes(&bytes).context("deserialize")?; + #[derive(serde::Serialize)] + struct Output<'a> { + layer_metadata: &'a HashMap, + disk_consistent_lsn: Lsn, + timeline_metadata: &'a TimelineMetadata, + } + + let output = Output { + layer_metadata: &des.layer_metadata, + disk_consistent_lsn: des.get_disk_consistent_lsn(), + timeline_metadata: &des.metadata, + }; + + let output = serde_json::to_string_pretty(&output).context("serialize output")?; + println!("{output}"); + Ok(()) + } + } +} diff --git a/pageserver/ctl/src/main.rs b/pageserver/ctl/src/main.rs index c4d6e8d883..fb42d6d2f1 100644 --- a/pageserver/ctl/src/main.rs +++ b/pageserver/ctl/src/main.rs @@ -5,11 +5,13 @@ //! Separate, `metadata` subcommand allows to print and update pageserver's metadata file. mod draw_timeline_dir; +mod index_part; mod layer_map_analyzer; mod layers; use camino::{Utf8Path, Utf8PathBuf}; use clap::{Parser, Subcommand}; +use index_part::IndexPartCmd; use layers::LayerCmd; use pageserver::{ context::{DownloadBehavior, RequestContext}, @@ -38,6 +40,8 @@ struct CliOpts { #[derive(Subcommand)] enum Commands { Metadata(MetadataCmd), + #[command(subcommand)] + IndexPart(IndexPartCmd), PrintLayerFile(PrintLayerFileCmd), DrawTimeline {}, AnalyzeLayerMap(AnalyzeLayerMapCmd), @@ -83,6 +87,9 @@ async fn main() -> anyhow::Result<()> { Commands::Metadata(cmd) => { handle_metadata(&cmd)?; } + Commands::IndexPart(cmd) => { + index_part::main(&cmd).await?; + } Commands::DrawTimeline {} => { draw_timeline_dir::main()?; } diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 6a732d1029..86be1b7094 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -513,6 +513,7 @@ impl DeletionQueueClient { ) -> Result<(), DeletionQueueError> { if current_generation.is_none() { debug!("Enqueuing deletions in legacy mode, skipping queue"); + let mut layer_paths = Vec::new(); for (layer, generation) in layers { layer_paths.push(remote_layer_path( diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index aa2b017471..21fd4d786a 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -6,6 +6,7 @@ use std::str::FromStr; use std::sync::Arc; use anyhow::{anyhow, Context, Result}; +use enumset::EnumSet; use futures::TryFutureExt; use humantime::format_rfc3339; use hyper::header; @@ -42,6 +43,7 @@ use crate::tenant::mgr::{ }; use crate::tenant::size::ModelInputs; use crate::tenant::storage_layer::LayerAccessStatsReset; +use crate::tenant::timeline::CompactFlags; use crate::tenant::timeline::Timeline; use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, TenantSharedResources}; use crate::{config::PageServerConf, tenant::mgr}; @@ -1268,11 +1270,15 @@ async fn timeline_compact_handler( let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; check_permission(&request, Some(tenant_id))?; + let mut flags = EnumSet::empty(); + if Some(true) == parse_query_param::<_, bool>(&request, "force_repartition")? 
{ + flags |= CompactFlags::ForceRepartition; + } async { let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?; timeline - .compact(&cancel, &ctx) + .compact(&cancel, flags, &ctx) .await .map_err(|e| ApiError::InternalServerError(e.into()))?; json_response(StatusCode::OK, ()) @@ -1289,6 +1295,11 @@ async fn timeline_checkpoint_handler( let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; check_permission(&request, Some(tenant_id))?; + + let mut flags = EnumSet::empty(); + if Some(true) == parse_query_param::<_, bool>(&request, "force_repartition")? { + flags |= CompactFlags::ForceRepartition; + } async { let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?; @@ -1297,7 +1308,7 @@ async fn timeline_checkpoint_handler( .await .map_err(ApiError::InternalServerError)?; timeline - .compact(&cancel, &ctx) + .compact(&cancel, flags, &ctx) .await .map_err(|e| ApiError::InternalServerError(e.into()))?; diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 507c017532..7ec1395e05 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -13,6 +13,7 @@ use anyhow::{bail, Context}; use camino::{Utf8Path, Utf8PathBuf}; +use enumset::EnumSet; use futures::FutureExt; use pageserver_api::models::TimelineState; use remote_storage::DownloadError; @@ -1699,7 +1700,7 @@ impl Tenant { for (timeline_id, timeline) in &timelines_to_compact { timeline - .compact(cancel, ctx) + .compact(cancel, EnumSet::empty(), ctx) .instrument(info_span!("compact_timeline", %timeline_id)) .await?; } @@ -4299,7 +4300,9 @@ mod tests { drop(writer); tline.freeze_and_flush().await?; - tline.compact(&CancellationToken::new(), &ctx).await?; + tline + .compact(&CancellationToken::new(), EnumSet::empty(), &ctx) + .await?; let writer = tline.writer().await; writer @@ -4314,7 +4317,9 @@ mod tests { drop(writer); tline.freeze_and_flush().await?; - tline.compact(&CancellationToken::new(), &ctx).await?; + tline + .compact(&CancellationToken::new(), EnumSet::empty(), &ctx) + .await?; let writer = tline.writer().await; writer @@ -4329,7 +4334,9 @@ mod tests { drop(writer); tline.freeze_and_flush().await?; - tline.compact(&CancellationToken::new(), &ctx).await?; + tline + .compact(&CancellationToken::new(), EnumSet::empty(), &ctx) + .await?; let writer = tline.writer().await; writer @@ -4344,7 +4351,9 @@ mod tests { drop(writer); tline.freeze_and_flush().await?; - tline.compact(&CancellationToken::new(), &ctx).await?; + tline + .compact(&CancellationToken::new(), EnumSet::empty(), &ctx) + .await?; assert_eq!( tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?, @@ -4415,7 +4424,9 @@ mod tests { .update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx) .await?; tline.freeze_and_flush().await?; - tline.compact(&CancellationToken::new(), &ctx).await?; + tline + .compact(&CancellationToken::new(), EnumSet::empty(), &ctx) + .await?; tline.gc().await?; } @@ -4495,7 +4506,9 @@ mod tests { .update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx) .await?; tline.freeze_and_flush().await?; - tline.compact(&CancellationToken::new(), &ctx).await?; + tline + .compact(&CancellationToken::new(), EnumSet::empty(), &ctx) + .await?; tline.gc().await?; } @@ -4585,7 +4598,9 @@ mod tests { .update_gc_info(Vec::new(), cutoff, Duration::ZERO, 
&ctx) .await?; tline.freeze_and_flush().await?; - tline.compact(&CancellationToken::new(), &ctx).await?; + tline + .compact(&CancellationToken::new(), EnumSet::empty(), &ctx) + .await?; tline.gc().await?; } diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index bbf6a0c5c5..8ec26f0b0b 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -816,7 +816,7 @@ impl RemoteTimelineClient { let mut receiver = { let mut guard = self.upload_queue.lock().unwrap(); let upload_queue = guard.initialized_mut()?; - self.schedule_barrier(upload_queue) + self.schedule_barrier0(upload_queue) }; if receiver.changed().await.is_err() { @@ -825,7 +825,14 @@ impl RemoteTimelineClient { Ok(()) } - fn schedule_barrier( + pub(crate) fn schedule_barrier(self: &Arc) -> anyhow::Result<()> { + let mut guard = self.upload_queue.lock().unwrap(); + let upload_queue = guard.initialized_mut()?; + self.schedule_barrier0(upload_queue); + Ok(()) + } + + fn schedule_barrier0( self: &Arc, upload_queue: &mut UploadQueueInitialized, ) -> tokio::sync::watch::Receiver<()> { @@ -1229,16 +1236,18 @@ impl RemoteTimelineClient { } res } - UploadOp::Delete(delete) => self - .deletion_queue_client - .push_layers( - self.tenant_id, - self.timeline_id, - self.generation, - delete.layers.clone(), - ) - .await - .map_err(|e| anyhow::anyhow!(e)), + UploadOp::Delete(delete) => { + pausable_failpoint!("before-delete-layer-pausable"); + self.deletion_queue_client + .push_layers( + self.tenant_id, + self.timeline_id, + self.generation, + delete.layers.clone(), + ) + .await + .map_err(|e| anyhow::anyhow!(e)) + } UploadOp::Barrier(_) => { // unreachable. Barrier operations are handled synchronously in // launch_queued_tasks diff --git a/pageserver/src/tenant/remote_timeline_client/index.rs b/pageserver/src/tenant/remote_timeline_client/index.rs index fa0679c7a2..0d0b34365c 100644 --- a/pageserver/src/tenant/remote_timeline_client/index.rs +++ b/pageserver/src/tenant/remote_timeline_client/index.rs @@ -128,6 +128,14 @@ impl IndexPart { pub fn get_disk_consistent_lsn(&self) -> Lsn { self.disk_consistent_lsn } + + pub fn from_s3_bytes(bytes: &[u8]) -> Result { + serde_json::from_slice::(bytes) + } + + pub fn to_s3_bytes(&self) -> serde_json::Result> { + serde_json::to_vec(self) + } } impl TryFrom<&UploadQueueInitialized> for IndexPart { @@ -201,7 +209,7 @@ mod tests { deleted_at: None, }; - let part = serde_json::from_str::(example).unwrap(); + let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap(); assert_eq!(part, expected); } @@ -239,7 +247,7 @@ mod tests { deleted_at: None, }; - let part = serde_json::from_str::(example).unwrap(); + let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap(); assert_eq!(part, expected); } @@ -279,7 +287,7 @@ mod tests { "2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap()) }; - let part = serde_json::from_str::(example).unwrap(); + let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap(); assert_eq!(part, expected); } @@ -323,7 +331,7 @@ mod tests { deleted_at: None, }; - let empty_layers_parsed = serde_json::from_str::(empty_layers_json).unwrap(); + let empty_layers_parsed = IndexPart::from_s3_bytes(empty_layers_json.as_bytes()).unwrap(); assert_eq!(empty_layers_parsed, expected); } @@ -361,7 +369,7 @@ mod tests { "2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap()) }; - let part = serde_json::from_str::(example).unwrap(); + let part = 
IndexPart::from_s3_bytes(example.as_bytes()).unwrap(); assert_eq!(part, expected); } } diff --git a/pageserver/src/tenant/remote_timeline_client/upload.rs b/pageserver/src/tenant/remote_timeline_client/upload.rs index 0a37a8f283..01237653ca 100644 --- a/pageserver/src/tenant/remote_timeline_client/upload.rs +++ b/pageserver/src/tenant/remote_timeline_client/upload.rs @@ -33,8 +33,9 @@ pub(super) async fn upload_index_part<'a>( }); pausable_failpoint!("before-upload-index-pausable"); - let index_part_bytes = - serde_json::to_vec(&index_part).context("serialize index part file into bytes")?; + let index_part_bytes = index_part + .to_s3_bytes() + .context("serialize index part file into bytes")?; let index_part_size = index_part_bytes.len(); let index_part_bytes = tokio::io::BufReader::new(std::io::Cursor::new(index_part_bytes)); diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 7ae7b7e7e4..763b18ccc3 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -10,6 +10,7 @@ mod walreceiver; use anyhow::{anyhow, bail, ensure, Context, Result}; use bytes::Bytes; use camino::{Utf8Path, Utf8PathBuf}; +use enumset::EnumSet; use fail::fail_point; use itertools::Itertools; use pageserver_api::models::{ @@ -437,6 +438,11 @@ pub enum LogicalSizeCalculationCause { TenantSizeHandler, } +#[derive(enumset::EnumSetType)] +pub(crate) enum CompactFlags { + ForceRepartition, +} + /// Public interface functions impl Timeline { /// Get the LSN where this branch was created @@ -694,6 +700,7 @@ impl Timeline { pub(crate) async fn compact( self: &Arc, cancel: &CancellationToken, + flags: EnumSet, ctx: &RequestContext, ) -> Result<(), CompactionError> { // this wait probably never needs any "long time spent" logging, because we already nag if @@ -766,6 +773,7 @@ impl Timeline { .repartition( self.get_last_record_lsn(), self.get_compaction_target_size(), + flags, ctx, ) .await @@ -1711,6 +1719,30 @@ impl Timeline { if let Some(rtc) = self.remote_client.as_ref() { rtc.schedule_layer_file_deletion(&needs_cleanup)?; rtc.schedule_index_upload_for_file_changes()?; + // This barrier orders above DELETEs before any later operations. + // This is critical because code executing after the barrier might + // create again objects with the same key that we just scheduled for deletion. + // For example, if we just scheduled deletion of an image layer "from the future", + // later compaction might run again and re-create the same image layer. + // "from the future" here means an image layer whose LSN is > IndexPart::disk_consistent_lsn. + // "same" here means same key range and LSN. + // + // Without a barrier between above DELETEs and the re-creation's PUTs, + // the upload queue may execute the PUT first, then the DELETE. + // In our example, we will end up with an IndexPart referencing a non-existent object. + // + // 1. a future image layer is created and uploaded + // 2. ps restart + // 3. the future layer from (1) is deleted during load layer map + // 4. image layer is re-created and uploaded + // 5. deletion queue would like to delete (1) but actually deletes (4) + // 6. delete by name works as expected, but it now deletes the wrong (later) version + // + // See https://github.com/neondatabase/neon/issues/5878 + // + // NB: generation numbers naturally protect against this because they disambiguate + // (1) and (4) + rtc.schedule_barrier()?; // Tenant::create_timeline will wait for these uploads to happen before returning, or // on retry. 
} @@ -2525,7 +2557,12 @@ impl Timeline { // Note: The 'ctx' in use here has DownloadBehavior::Error. We should not // require downloading anything during initial import. let (partitioning, _lsn) = self - .repartition(self.initdb_lsn, self.get_compaction_target_size(), ctx) + .repartition( + self.initdb_lsn, + self.get_compaction_target_size(), + EnumSet::empty(), + ctx, + ) .await?; if self.cancel.is_cancelled() { @@ -2744,12 +2781,16 @@ impl Timeline { &self, lsn: Lsn, partition_size: u64, + flags: EnumSet, ctx: &RequestContext, ) -> anyhow::Result<(KeyPartitioning, Lsn)> { { let partitioning_guard = self.partitioning.lock().unwrap(); let distance = lsn.0 - partitioning_guard.1 .0; - if partitioning_guard.1 != Lsn(0) && distance <= self.repartition_threshold { + if partitioning_guard.1 != Lsn(0) + && distance <= self.repartition_threshold + && !flags.contains(CompactFlags::ForceRepartition) + { debug!( distance, threshold = self.repartition_threshold, diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 4776fbc511..def42e94b1 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -42,6 +42,7 @@ from urllib3.util.retry import Retry from fixtures.broker import NeonBroker from fixtures.log_helper import log from fixtures.pageserver.http import PageserverHttpClient +from fixtures.pageserver.types import IndexPartDump from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload from fixtures.pg_version import PgVersion from fixtures.port_distributor import PortDistributor @@ -702,6 +703,7 @@ class NeonEnv: self.port_distributor = config.port_distributor self.s3_mock_server = config.mock_s3_server self.neon_cli = NeonCli(env=self) + self.pagectl = Pagectl(env=self) self.endpoints = EndpointFactory(self) self.safekeepers: List[Safekeeper] = [] self.pageservers: List[NeonPageserver] = [] @@ -1558,6 +1560,20 @@ class ComputeCtl(AbstractNeonCli): COMMAND = "compute_ctl" +class Pagectl(AbstractNeonCli): + """ + A typed wrapper around the `pagectl` utility CLI tool. 
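+
+    Minimal usage sketch (assumes LocalFsStorage remote storage; `env`, `tenant_id`,
+    and `timeline_id` come from the surrounding test):
+
+        ip_path = env.pageserver_remote_storage.index_path(tenant_id, timeline_id)
+        ip = env.pagectl.dump_index_part(ip_path)
+        log.info(f"disk_consistent_lsn: {ip.disk_consistent_lsn}")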
+ """ + + COMMAND = "pagectl" + + def dump_index_part(self, path: Path) -> IndexPartDump: + res = self.raw_cli(["index-part", "dump", str(path)]) + res.check_returncode() + parsed = json.loads(res.stdout) + return IndexPartDump.from_json(parsed) + + class NeonAttachmentService: def __init__(self, env: NeonEnv): self.env = env diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index aff7959aa7..2f1d68b92c 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -432,12 +432,18 @@ class PageserverHttpClient(requests.Session): assert isinstance(res_json, dict) return res_json - def timeline_compact(self, tenant_id: TenantId, timeline_id: TimelineId): + def timeline_compact( + self, tenant_id: TenantId, timeline_id: TimelineId, force_repartition=False + ): self.is_testing_enabled_or_skip() + query = {} + if force_repartition: + query["force_repartition"] = "true" log.info(f"Requesting compact: tenant {tenant_id}, timeline {timeline_id}") res = self.put( - f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/compact" + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/compact", + params=query, ) log.info(f"Got compact request response code: {res.status_code}") self.verbose_error(res) @@ -466,12 +472,18 @@ class PageserverHttpClient(requests.Session): res_json = res.json() return res_json - def timeline_checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId): + def timeline_checkpoint( + self, tenant_id: TenantId, timeline_id: TimelineId, force_repartition=False + ): self.is_testing_enabled_or_skip() + query = {} + if force_repartition: + query["force_repartition"] = "true" log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}") res = self.put( - f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint" + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint", + params=query, ) log.info(f"Got checkpoint request response code: {res.status_code}") self.verbose_error(res) diff --git a/test_runner/fixtures/pageserver/types.py b/test_runner/fixtures/pageserver/types.py new file mode 100644 index 0000000000..30e3f527bf --- /dev/null +++ b/test_runner/fixtures/pageserver/types.py @@ -0,0 +1,146 @@ +from dataclasses import dataclass +from typing import Any, Dict, Tuple, Union + +from fixtures.types import KEY_MAX, KEY_MIN, Key, Lsn + + +@dataclass +class IndexLayerMetadata: + @classmethod + def from_json(cls, d: Dict[str, Any]): + return {} + + +@dataclass(frozen=True) +class ImageLayerFileName: + lsn: Lsn + key_start: Key + key_end: Key + + def to_str(self): + ret = ( + f"{self.key_start.as_int():036X}-{self.key_end.as_int():036X}__{self.lsn.as_int():016X}" + ) + assert self == parse_layer_file_name(ret) + return ret + + +@dataclass(frozen=True) +class DeltaLayerFileName: + lsn_start: Lsn + lsn_end: Lsn + key_start: Key + key_end: Key + + def is_l0(self): + return self.key_start == KEY_MIN and self.key_end == KEY_MAX + + def to_str(self): + ret = f"{self.key_start.as_int():036X}-{self.key_end.as_int():036X}__{self.lsn_start.as_int():016X}-{self.lsn_end.as_int():016X}" + assert self == parse_layer_file_name(ret) + return ret + + +LayerFileName = Union[ImageLayerFileName, DeltaLayerFileName] + + +class InvalidFileName(Exception): + pass + + +def parse_image_layer(f_name: str) -> Tuple[int, int, int]: + """Parse an image layer file name. 
Return key start, key end, and snapshot lsn""" + parts = f_name.split("__") + if len(parts) != 2: + raise InvalidFileName(f"expecting two parts separated by '__', got: {parts}") + key_parts = parts[0].split("-") + if len(key_parts) != 2: + raise InvalidFileName( + f"expecting two key parts separated by '--' in parts[0], got: {key_parts}" + ) + try: + return int(key_parts[0], 16), int(key_parts[1], 16), int(parts[1], 16) + except ValueError as e: + raise InvalidFileName(f"conversion error: {f_name}") from e + + +def parse_delta_layer(f_name: str) -> Tuple[int, int, int, int]: + """Parse a delta layer file name. Return key start, key end, lsn start, and lsn end""" + parts = f_name.split("__") + if len(parts) != 2: + raise InvalidFileName(f"expecting two parts separated by '__', got: {parts}") + key_parts = parts[0].split("-") + if len(key_parts) != 2: + raise InvalidFileName( + f"expecting two key parts separated by '--' in parts[0], got: {key_parts}" + ) + lsn_parts = parts[1].split("-") + if len(lsn_parts) != 2: + raise InvalidFileName( + f"expecting two lsn parts separated by '--' in parts[1], got: {lsn_parts}" + ) + try: + return ( + int(key_parts[0], 16), + int(key_parts[1], 16), + int(lsn_parts[0], 16), + int(lsn_parts[1], 16), + ) + except ValueError as e: + raise InvalidFileName(f"conversion error: {f_name}") from e + + +def parse_layer_file_name(file_name: str) -> LayerFileName: + try: + key_start, key_end, lsn = parse_image_layer(file_name) + return ImageLayerFileName(lsn=Lsn(lsn), key_start=Key(key_start), key_end=Key(key_end)) + except InvalidFileName: + pass + + try: + key_start, key_end, lsn_start, lsn_end = parse_delta_layer(file_name) + return DeltaLayerFileName( + lsn_start=Lsn(lsn_start), + lsn_end=Lsn(lsn_end), + key_start=Key(key_start), + key_end=Key(key_end), + ) + except InvalidFileName: + pass + + raise ValueError() + + +def is_future_layer(layer_file_name: LayerFileName, disk_consistent_lsn: Lsn): + """ + Determines if this layer file is considered to be in future meaning we will discard these + layers during timeline initialization from the given disk_consistent_lsn. 
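+
+    For example, an image layer written at LSN 0/169BEA0 is a future layer when
+    disk_consistent_lsn is 0/1696070 (0x169BEA0 > 0x1696070). A delta layer is a
+    future layer only if its lsn_end exceeds disk_consistent_lsn + 1.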
+ """ + if ( + isinstance(layer_file_name, ImageLayerFileName) + and layer_file_name.lsn > disk_consistent_lsn + ): + return True + elif ( + isinstance(layer_file_name, DeltaLayerFileName) + and layer_file_name.lsn_end > disk_consistent_lsn + 1 + ): + return True + else: + return False + + +@dataclass +class IndexPartDump: + layer_metadata: Dict[LayerFileName, IndexLayerMetadata] + disk_consistent_lsn: Lsn + + @classmethod + def from_json(cls, d: Dict[str, Any]) -> "IndexPartDump": + return IndexPartDump( + layer_metadata={ + parse_layer_file_name(n): IndexLayerMetadata.from_json(v) + for n, v in d["layer_metadata"].items() + }, + disk_consistent_lsn=Lsn(d["disk_consistent_lsn"]), + ) diff --git a/test_runner/fixtures/remote_storage.py b/test_runner/fixtures/remote_storage.py index 535f8c2fe7..954c3142a3 100644 --- a/test_runner/fixtures/remote_storage.py +++ b/test_runner/fixtures/remote_storage.py @@ -12,6 +12,7 @@ import boto3 from mypy_boto3_s3 import S3Client from fixtures.log_helper import log +from fixtures.pageserver.types import LayerFileName from fixtures.types import TenantId, TimelineId TIMELINE_INDEX_PART_FILE_NAME = "index_part.json" @@ -87,6 +88,11 @@ class LocalFsStorage: def timeline_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path: return self.tenant_path(tenant_id) / "timelines" / str(timeline_id) + def layer_path( + self, tenant_id: TenantId, timeline_id: TimelineId, layer_file_name: LayerFileName + ): + return self.timeline_path(tenant_id, timeline_id) / layer_file_name.to_str() + def index_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path: return self.timeline_path(tenant_id, timeline_id) / TIMELINE_INDEX_PART_FILE_NAME diff --git a/test_runner/fixtures/types.py b/test_runner/fixtures/types.py index ef88e09de4..d95368f990 100644 --- a/test_runner/fixtures/types.py +++ b/test_runner/fixtures/types.py @@ -1,4 +1,5 @@ import random +from dataclasses import dataclass from functools import total_ordering from typing import Any, Type, TypeVar, Union @@ -36,6 +37,11 @@ class Lsn: return NotImplemented return self.lsn_int < other.lsn_int + def __gt__(self, other: Any) -> bool: + if not isinstance(other, Lsn): + raise NotImplementedError + return self.lsn_int > other.lsn_int + def __eq__(self, other: Any) -> bool: if not isinstance(other, Lsn): return NotImplemented @@ -47,9 +53,32 @@ class Lsn: return NotImplemented return self.lsn_int - other.lsn_int + def __add__(self, other: Union[int, "Lsn"]) -> "Lsn": + if isinstance(other, int): + return Lsn(self.lsn_int + other) + elif isinstance(other, Lsn): + return Lsn(self.lsn_int + other.lsn_int) + else: + raise NotImplementedError + def __hash__(self) -> int: return hash(self.lsn_int) + def as_int(self) -> int: + return self.lsn_int + + +@dataclass(frozen=True) +class Key: + key_int: int + + def as_int(self) -> int: + return self.key_int + + +KEY_MAX = Key(0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF) +KEY_MIN = Key(0) + @total_ordering class Id: diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py index ba8d70d5a9..6e857766e5 100644 --- a/test_runner/fixtures/utils.py +++ b/test_runner/fixtures/utils.py @@ -6,7 +6,16 @@ import subprocess import threading import time from pathlib import Path -from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, TypeVar +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Dict, + List, + Optional, + Tuple, + TypeVar, +) from urllib.parse import urlencode import allure @@ -14,6 +23,10 @@ import zstandard from 
psycopg2.extensions import cursor from fixtures.log_helper import log +from fixtures.pageserver.types import ( + parse_delta_layer, + parse_image_layer, +) if TYPE_CHECKING: from fixtures.neon_fixtures import PgBin @@ -193,26 +206,6 @@ def get_timeline_dir_size(path: Path) -> int: return sz -def parse_image_layer(f_name: str) -> Tuple[int, int, int]: - """Parse an image layer file name. Return key start, key end, and snapshot lsn""" - parts = f_name.split("__") - key_parts = parts[0].split("-") - return int(key_parts[0], 16), int(key_parts[1], 16), int(parts[1], 16) - - -def parse_delta_layer(f_name: str) -> Tuple[int, int, int, int]: - """Parse a delta layer file name. Return key start, key end, lsn start, and lsn end""" - parts = f_name.split("__") - key_parts = parts[0].split("-") - lsn_parts = parts[1].split("-") - return ( - int(key_parts[0], 16), - int(key_parts[1], 16), - int(lsn_parts[0], 16), - int(lsn_parts[1], 16), - ) - - def get_scale_for_db(size_mb: int) -> int: """Returns pgbench scale factor for given target db size in MB. diff --git a/test_runner/regress/test_layers_from_future.py b/test_runner/regress/test_layers_from_future.py new file mode 100644 index 0000000000..b36c927628 --- /dev/null +++ b/test_runner/regress/test_layers_from_future.py @@ -0,0 +1,222 @@ +import time + +from fixtures.log_helper import log +from fixtures.neon_fixtures import NeonEnvBuilder +from fixtures.pageserver.types import ( + DeltaLayerFileName, + ImageLayerFileName, + is_future_layer, +) +from fixtures.pageserver.utils import ( + wait_for_last_record_lsn, + wait_for_upload_queue_empty, + wait_until_tenant_active, +) +from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind +from fixtures.types import Lsn +from fixtures.utils import query_scalar, wait_until + + +def test_issue_5878(neon_env_builder: NeonEnvBuilder): + """ + Regression test for issue https://github.com/neondatabase/neon/issues/5878 . + + Create a situation where IndexPart contains an image layer from a future + (i.e., image layer > IndexPart::disk_consistent_lsn). + Detach. + Attach. + Wait for tenant to finish load_layer_map (by waiting for it to become active). + Wait for any remote timeline client ops to finish that the attach started. + Integrity-check the index part. + + Before fixing the issue, load_layer_map would schedule removal of the future + image layer. A compaction run could later re-create the image layer with + the same file name, scheduling a PUT. + Due to lack of an upload queue barrier, the PUT and DELETE could be re-ordered. + The result was IndexPart referencing a non-existent object. 
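+
+    With the fix, an UploadOp::Barrier is scheduled on the upload queue right after the
+    deletions, so the re-created layer's PUT cannot overtake the DELETE. The test holds
+    the DELETE on the `before-delete-layer-pausable` failpoint and asserts that the layer
+    object in remote storage is not overwritten (mtime unchanged) while the DELETE is stuck.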
+ """ + neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS) + + env = neon_env_builder.init_start() + + ps_http = env.pageserver.http_client() + + l0_l1_threshold = 3 + image_creation_threshold = 1 + + tenant_config = { + "gc_period": "0s", # disable GC (shouldn't matter for this test but still) + "compaction_period": "0s", # we want to control when compaction runs + "checkpoint_timeout": "24h", # something we won't reach + "checkpoint_distance": f"{50 * (1024**2)}", # something we won't reach, we checkpoint manually + "image_creation_threshold": f"{image_creation_threshold}", + "compaction_threshold": f"{l0_l1_threshold}", + "compaction_target_size": f"{128 * (1024**3)}", # make it so that we only have 1 partition => image coverage for delta layers => enables gc of delta layers + } + + tenant_id, timeline_id = env.neon_cli.create_tenant(conf=tenant_config) + + endpoint = env.endpoints.create_start("main", tenant_id=tenant_id) + + def get_index_part(): + assert isinstance(env.pageserver_remote_storage, LocalFsStorage) + ip_path = env.pageserver_remote_storage.index_path(tenant_id, timeline_id) + return env.pagectl.dump_index_part(ip_path) + + def get_future_layers(): + ip = get_index_part() + future_layers = [ + layer_file_name + for layer_file_name in ip.layer_metadata.keys() + if is_future_layer(layer_file_name, ip.disk_consistent_lsn) + ] + return future_layers + + assert len(get_future_layers()) == 0 + + current = get_index_part() + assert len(set(current.layer_metadata.keys())) == 1 + layer_file_name = list(current.layer_metadata.keys())[0] + assert isinstance(layer_file_name, DeltaLayerFileName) + assert layer_file_name.is_l0(), f"{layer_file_name}" + + log.info("force image layer creation in the future by writing some data into in-memory layer") + + # Create a number of layers in the tenant + with endpoint.cursor() as cur: + cur.execute("CREATE TABLE foo (t text)") + iters = l0_l1_threshold * image_creation_threshold + for i in range(0, iters): + cur.execute( + f""" + INSERT INTO foo + SELECT '{i}' || g + FROM generate_series(1, 10000) g + """ + ) + last_record_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()")) + wait_for_last_record_lsn(ps_http, tenant_id, timeline_id, last_record_lsn) + # 0..iters-1: create a stack of delta layers + # iters: leave a non-empty in-memory layer which we'll use for image layer generation + if i < iters - 1: + ps_http.timeline_checkpoint(tenant_id, timeline_id, force_repartition=True) + assert ( + len( + [ + layer + for layer in ps_http.layer_map_info( + tenant_id, timeline_id + ).historic_layers + if layer.kind == "Image" + ] + ) + == 0 + ) + + endpoint.stop() + + wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id) + + ip = get_index_part() + assert len(ip.layer_metadata.keys()) + assert ( + ip.disk_consistent_lsn < last_record_lsn + ), "sanity check for what above loop is supposed to do" + + # create the image layer from the future + ps_http.timeline_compact(tenant_id, timeline_id, force_repartition=True) + assert ( + len( + [ + layer + for layer in ps_http.layer_map_info(tenant_id, timeline_id).historic_layers + if layer.kind == "Image" + ] + ) + == 1 + ) + wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id) + future_layers = get_future_layers() + assert len(future_layers) == 1 + future_layer = future_layers[0] + assert isinstance(future_layer, ImageLayerFileName) + assert future_layer.lsn == last_record_lsn + log.info( + f"got layer from the future: lsn={future_layer.lsn} 
disk_consistent_lsn={ip.disk_consistent_lsn} last_record_lsn={last_record_lsn}" + ) + assert isinstance(env.pageserver_remote_storage, LocalFsStorage) + future_layer_path = env.pageserver_remote_storage.layer_path( + tenant_id, timeline_id, future_layer + ) + log.info(f"future layer path: {future_layer_path}") + pre_stat = future_layer_path.stat() + time.sleep(1.1) # so that we can use change in pre_stat.st_mtime to detect overwrites + + # force removal of layers from the future + tenant_conf = ps_http.tenant_config(tenant_id) + ps_http.tenant_detach(tenant_id) + failpoint_name = "before-delete-layer-pausable" + ps_http.configure_failpoints((failpoint_name, "pause")) + ps_http.tenant_attach(tenant_id, tenant_conf.tenant_specific_overrides) + wait_until_tenant_active(ps_http, tenant_id) + + # Ensure the IndexPart upload that unlinks the layer file finishes, i.e., doesn't clog the queue. + def future_layer_is_gone_from_index_part(): + future_layers = set(get_future_layers()) + assert future_layer not in future_layers + + wait_until(10, 0.5, future_layer_is_gone_from_index_part) + + # NB: the layer file is unlinked index part now, but, because we made the delete + # operation stuck, the layer file itself is still in the remote_storage + def delete_at_pause_point(): + assert env.pageserver.log_contains(f".*{tenant_id}.*at failpoint.*{failpoint_name}") + + wait_until(10, 0.5, delete_at_pause_point) + assert future_layer_path.exists() + + # wait for re-ingestion of the WAL from safekeepers into the in-memory layer + # (this happens in parallel to the above) + wait_for_last_record_lsn(ps_http, tenant_id, timeline_id, last_record_lsn) + + # re-do image layer generation + # This will produce the same image layer and queue an upload. + # However, we still have the deletion for the layer queued, stuck on the failpoint. + # An incorrect implementation would let the PUT execute before the DELETE. + # The later code in this test asserts that this doesn't happen. + ps_http.timeline_compact(tenant_id, timeline_id, force_repartition=True) + + # Let things sit for some time; a good implementation makes no progress because + # we can't execute the PUT before the DELETE. A bad implementation would do that. + max_race_opportunity_window = 4 + start = time.monotonic() + while True: + post_stat = future_layer_path.stat() + assert ( + pre_stat.st_mtime == post_stat.st_mtime + ), "observed PUT overtake the stucked DELETE => bug isn't fixed yet" + if time.monotonic() - start > max_race_opportunity_window: + log.info( + "a correct implementation would never let the later PUT overtake the earlier DELETE" + ) + break + time.sleep(1) + + # Window has passed, unstuck the delete, let upload queue drain. + log.info("unstuck the DELETE") + ps_http.configure_failpoints(("before-delete-layer-pausable", "off")) + + wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id) + + # Examine the resulting S3 state. + log.info("integrity-check the remote storage") + ip = get_index_part() + for layer_file_name in ip.layer_metadata.keys(): + layer_path = env.pageserver_remote_storage.layer_path( + tenant_id, timeline_id, layer_file_name + ) + assert layer_path.exists(), f"{layer_file_name.to_str()}" + + log.info("assert that the overwritten layer won") + final_stat = future_layer_path.stat() + assert final_stat.st_mtime != pre_stat.st_mtime
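
A minimal sketch of how the new fixture helpers classify a layer file as "from the future" relative to IndexPart::disk_consistent_lsn; the layer name and LSN values below are illustrative, and the snippet assumes the `test_runner/fixtures` package is importable:

```python
from fixtures.pageserver.types import ImageLayerFileName, is_future_layer, parse_layer_file_name
from fixtures.types import Lsn

# A full-keyspace image layer written at LSN 0/169BEA0 (illustrative values).
name = (
    "000000000000000000000000000000000000"
    "-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
    "__000000000169BEA0"
)
layer = parse_layer_file_name(name)
assert isinstance(layer, ImageLayerFileName)

# It is a "future" layer if it lies beyond IndexPart::disk_consistent_lsn ...
assert is_future_layer(layer, Lsn("0/1696070"))
# ... and a normal layer once disk_consistent_lsn has advanced past it.
assert not is_future_layer(layer, Lsn("0/2000000"))
```

This is the same classification `load_layer_map` applies on attach, which is what schedules the DELETE that this PR orders ahead of any later PUT via the upload queue barrier.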