fix: cleanup of layers from the future can race with their re-creation (#5890)

fixes https://github.com/neondatabase/neon/issues/5878
obsoletes https://github.com/neondatabase/neon/issues/5879

Before this PR, it could happen that `load_layer_map` schedules removal
of the future
image layer. Then a later compaction run could re-create the same image
layer, scheduling a PUT.
Due to lack of an upload queue barrier, the PUT and DELETE could be
re-ordered.
The result was IndexPart referencing a non-existent object.

## Summary of changes

* Add support to `pagectl` / Python tests to decode `IndexPart`
  * Rust
    * new `pagectl` Subcommand
* `IndexPart::{from,to}_s3_bytes()` methods to internalize knowledge
about encoding of `IndexPart`
  * Python
    * new `NeonCli` subclass
* Add regression test
  * Rust
* Ability to force repartitioning; required to ensure image layer
creation at last_record_lsn
  * Python
    * The regression test.
* Fix the issue
  * Insert an `UploadOp::Barrier` after scheduling the deletions.
This commit is contained in:
Christian Schwarz
2023-11-23 14:33:41 +01:00
committed by GitHub
parent 6afbadc90e
commit a0e61145c8
18 changed files with 615 additions and 56 deletions

2
Cargo.lock generated
View File

@@ -2905,6 +2905,8 @@ dependencies = [
"git-version",
"pageserver",
"postgres_ffi",
"serde",
"serde_json",
"svg_fmt",
"tokio",
"utils",

View File

@@ -18,3 +18,5 @@ tokio.workspace = true
utils.workspace = true
svg_fmt.workspace = true
workspace_hack.workspace = true
serde.workspace = true
serde_json.workspace = true

View File

@@ -0,0 +1,38 @@
use std::collections::HashMap;
use anyhow::Context;
use camino::Utf8PathBuf;
use pageserver::tenant::remote_timeline_client::index::IndexLayerMetadata;
use pageserver::tenant::storage_layer::LayerFileName;
use pageserver::tenant::{metadata::TimelineMetadata, IndexPart};
use utils::lsn::Lsn;
#[derive(clap::Subcommand)]
pub(crate) enum IndexPartCmd {
Dump { path: Utf8PathBuf },
}
pub(crate) async fn main(cmd: &IndexPartCmd) -> anyhow::Result<()> {
match cmd {
IndexPartCmd::Dump { path } => {
let bytes = tokio::fs::read(path).await.context("read file")?;
let des: IndexPart = IndexPart::from_s3_bytes(&bytes).context("deserialize")?;
#[derive(serde::Serialize)]
struct Output<'a> {
layer_metadata: &'a HashMap<LayerFileName, IndexLayerMetadata>,
disk_consistent_lsn: Lsn,
timeline_metadata: &'a TimelineMetadata,
}
let output = Output {
layer_metadata: &des.layer_metadata,
disk_consistent_lsn: des.get_disk_consistent_lsn(),
timeline_metadata: &des.metadata,
};
let output = serde_json::to_string_pretty(&output).context("serialize output")?;
println!("{output}");
Ok(())
}
}
}

View File

@@ -5,11 +5,13 @@
//! Separate, `metadata` subcommand allows to print and update pageserver's metadata file.
mod draw_timeline_dir;
mod index_part;
mod layer_map_analyzer;
mod layers;
use camino::{Utf8Path, Utf8PathBuf};
use clap::{Parser, Subcommand};
use index_part::IndexPartCmd;
use layers::LayerCmd;
use pageserver::{
context::{DownloadBehavior, RequestContext},
@@ -38,6 +40,8 @@ struct CliOpts {
#[derive(Subcommand)]
enum Commands {
Metadata(MetadataCmd),
#[command(subcommand)]
IndexPart(IndexPartCmd),
PrintLayerFile(PrintLayerFileCmd),
DrawTimeline {},
AnalyzeLayerMap(AnalyzeLayerMapCmd),
@@ -83,6 +87,9 @@ async fn main() -> anyhow::Result<()> {
Commands::Metadata(cmd) => {
handle_metadata(&cmd)?;
}
Commands::IndexPart(cmd) => {
index_part::main(&cmd).await?;
}
Commands::DrawTimeline {} => {
draw_timeline_dir::main()?;
}

View File

@@ -513,6 +513,7 @@ impl DeletionQueueClient {
) -> Result<(), DeletionQueueError> {
if current_generation.is_none() {
debug!("Enqueuing deletions in legacy mode, skipping queue");
let mut layer_paths = Vec::new();
for (layer, generation) in layers {
layer_paths.push(remote_layer_path(

View File

@@ -6,6 +6,7 @@ use std::str::FromStr;
use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use enumset::EnumSet;
use futures::TryFutureExt;
use humantime::format_rfc3339;
use hyper::header;
@@ -42,6 +43,7 @@ use crate::tenant::mgr::{
};
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::timeline::CompactFlags;
use crate::tenant::timeline::Timeline;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, TenantSharedResources};
use crate::{config::PageServerConf, tenant::mgr};
@@ -1268,11 +1270,15 @@ async fn timeline_compact_handler(
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
let mut flags = EnumSet::empty();
if Some(true) == parse_query_param::<_, bool>(&request, "force_repartition")? {
flags |= CompactFlags::ForceRepartition;
}
async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
timeline
.compact(&cancel, &ctx)
.compact(&cancel, flags, &ctx)
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
json_response(StatusCode::OK, ())
@@ -1289,6 +1295,11 @@ async fn timeline_checkpoint_handler(
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
let mut flags = EnumSet::empty();
if Some(true) == parse_query_param::<_, bool>(&request, "force_repartition")? {
flags |= CompactFlags::ForceRepartition;
}
async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
@@ -1297,7 +1308,7 @@ async fn timeline_checkpoint_handler(
.await
.map_err(ApiError::InternalServerError)?;
timeline
.compact(&cancel, &ctx)
.compact(&cancel, flags, &ctx)
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;

View File

@@ -13,6 +13,7 @@
use anyhow::{bail, Context};
use camino::{Utf8Path, Utf8PathBuf};
use enumset::EnumSet;
use futures::FutureExt;
use pageserver_api::models::TimelineState;
use remote_storage::DownloadError;
@@ -1699,7 +1700,7 @@ impl Tenant {
for (timeline_id, timeline) in &timelines_to_compact {
timeline
.compact(cancel, ctx)
.compact(cancel, EnumSet::empty(), ctx)
.instrument(info_span!("compact_timeline", %timeline_id))
.await?;
}
@@ -4299,7 +4300,9 @@ mod tests {
drop(writer);
tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;
let writer = tline.writer().await;
writer
@@ -4314,7 +4317,9 @@ mod tests {
drop(writer);
tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;
let writer = tline.writer().await;
writer
@@ -4329,7 +4334,9 @@ mod tests {
drop(writer);
tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;
let writer = tline.writer().await;
writer
@@ -4344,7 +4351,9 @@ mod tests {
drop(writer);
tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;
assert_eq!(
tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
@@ -4415,7 +4424,9 @@ mod tests {
.update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
.await?;
tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;
tline.gc().await?;
}
@@ -4495,7 +4506,9 @@ mod tests {
.update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
.await?;
tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;
tline.gc().await?;
}
@@ -4585,7 +4598,9 @@ mod tests {
.update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
.await?;
tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;
tline.gc().await?;
}

View File

@@ -816,7 +816,7 @@ impl RemoteTimelineClient {
let mut receiver = {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
self.schedule_barrier(upload_queue)
self.schedule_barrier0(upload_queue)
};
if receiver.changed().await.is_err() {
@@ -825,7 +825,14 @@ impl RemoteTimelineClient {
Ok(())
}
fn schedule_barrier(
pub(crate) fn schedule_barrier(self: &Arc<Self>) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
self.schedule_barrier0(upload_queue);
Ok(())
}
fn schedule_barrier0(
self: &Arc<Self>,
upload_queue: &mut UploadQueueInitialized,
) -> tokio::sync::watch::Receiver<()> {
@@ -1229,8 +1236,9 @@ impl RemoteTimelineClient {
}
res
}
UploadOp::Delete(delete) => self
.deletion_queue_client
UploadOp::Delete(delete) => {
pausable_failpoint!("before-delete-layer-pausable");
self.deletion_queue_client
.push_layers(
self.tenant_id,
self.timeline_id,
@@ -1238,7 +1246,8 @@ impl RemoteTimelineClient {
delete.layers.clone(),
)
.await
.map_err(|e| anyhow::anyhow!(e)),
.map_err(|e| anyhow::anyhow!(e))
}
UploadOp::Barrier(_) => {
// unreachable. Barrier operations are handled synchronously in
// launch_queued_tasks

View File

@@ -128,6 +128,14 @@ impl IndexPart {
pub fn get_disk_consistent_lsn(&self) -> Lsn {
self.disk_consistent_lsn
}
pub fn from_s3_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
serde_json::from_slice::<IndexPart>(bytes)
}
pub fn to_s3_bytes(&self) -> serde_json::Result<Vec<u8>> {
serde_json::to_vec(self)
}
}
impl TryFrom<&UploadQueueInitialized> for IndexPart {
@@ -201,7 +209,7 @@ mod tests {
deleted_at: None,
};
let part = serde_json::from_str::<IndexPart>(example).unwrap();
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}
@@ -239,7 +247,7 @@ mod tests {
deleted_at: None,
};
let part = serde_json::from_str::<IndexPart>(example).unwrap();
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}
@@ -279,7 +287,7 @@ mod tests {
"2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap())
};
let part = serde_json::from_str::<IndexPart>(example).unwrap();
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}
@@ -323,7 +331,7 @@ mod tests {
deleted_at: None,
};
let empty_layers_parsed = serde_json::from_str::<IndexPart>(empty_layers_json).unwrap();
let empty_layers_parsed = IndexPart::from_s3_bytes(empty_layers_json.as_bytes()).unwrap();
assert_eq!(empty_layers_parsed, expected);
}
@@ -361,7 +369,7 @@ mod tests {
"2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap())
};
let part = serde_json::from_str::<IndexPart>(example).unwrap();
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}
}

View File

@@ -33,8 +33,9 @@ pub(super) async fn upload_index_part<'a>(
});
pausable_failpoint!("before-upload-index-pausable");
let index_part_bytes =
serde_json::to_vec(&index_part).context("serialize index part file into bytes")?;
let index_part_bytes = index_part
.to_s3_bytes()
.context("serialize index part file into bytes")?;
let index_part_size = index_part_bytes.len();
let index_part_bytes = tokio::io::BufReader::new(std::io::Cursor::new(index_part_bytes));

View File

@@ -10,6 +10,7 @@ mod walreceiver;
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use enumset::EnumSet;
use fail::fail_point;
use itertools::Itertools;
use pageserver_api::models::{
@@ -437,6 +438,11 @@ pub enum LogicalSizeCalculationCause {
TenantSizeHandler,
}
#[derive(enumset::EnumSetType)]
pub(crate) enum CompactFlags {
ForceRepartition,
}
/// Public interface functions
impl Timeline {
/// Get the LSN where this branch was created
@@ -694,6 +700,7 @@ impl Timeline {
pub(crate) async fn compact(
self: &Arc<Self>,
cancel: &CancellationToken,
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> Result<(), CompactionError> {
// this wait probably never needs any "long time spent" logging, because we already nag if
@@ -766,6 +773,7 @@ impl Timeline {
.repartition(
self.get_last_record_lsn(),
self.get_compaction_target_size(),
flags,
ctx,
)
.await
@@ -1711,6 +1719,30 @@ impl Timeline {
if let Some(rtc) = self.remote_client.as_ref() {
rtc.schedule_layer_file_deletion(&needs_cleanup)?;
rtc.schedule_index_upload_for_file_changes()?;
// This barrier orders above DELETEs before any later operations.
// This is critical because code executing after the barrier might
// create again objects with the same key that we just scheduled for deletion.
// For example, if we just scheduled deletion of an image layer "from the future",
// later compaction might run again and re-create the same image layer.
// "from the future" here means an image layer whose LSN is > IndexPart::disk_consistent_lsn.
// "same" here means same key range and LSN.
//
// Without a barrier between above DELETEs and the re-creation's PUTs,
// the upload queue may execute the PUT first, then the DELETE.
// In our example, we will end up with an IndexPart referencing a non-existent object.
//
// 1. a future image layer is created and uploaded
// 2. ps restart
// 3. the future layer from (1) is deleted during load layer map
// 4. image layer is re-created and uploaded
// 5. deletion queue would like to delete (1) but actually deletes (4)
// 6. delete by name works as expected, but it now deletes the wrong (later) version
//
// See https://github.com/neondatabase/neon/issues/5878
//
// NB: generation numbers naturally protect against this because they disambiguate
// (1) and (4)
rtc.schedule_barrier()?;
// Tenant::create_timeline will wait for these uploads to happen before returning, or
// on retry.
}
@@ -2525,7 +2557,12 @@ impl Timeline {
// Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
// require downloading anything during initial import.
let (partitioning, _lsn) = self
.repartition(self.initdb_lsn, self.get_compaction_target_size(), ctx)
.repartition(
self.initdb_lsn,
self.get_compaction_target_size(),
EnumSet::empty(),
ctx,
)
.await?;
if self.cancel.is_cancelled() {
@@ -2744,12 +2781,16 @@ impl Timeline {
&self,
lsn: Lsn,
partition_size: u64,
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> anyhow::Result<(KeyPartitioning, Lsn)> {
{
let partitioning_guard = self.partitioning.lock().unwrap();
let distance = lsn.0 - partitioning_guard.1 .0;
if partitioning_guard.1 != Lsn(0) && distance <= self.repartition_threshold {
if partitioning_guard.1 != Lsn(0)
&& distance <= self.repartition_threshold
&& !flags.contains(CompactFlags::ForceRepartition)
{
debug!(
distance,
threshold = self.repartition_threshold,

View File

@@ -42,6 +42,7 @@ from urllib3.util.retry import Retry
from fixtures.broker import NeonBroker
from fixtures.log_helper import log
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.types import IndexPartDump
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
@@ -702,6 +703,7 @@ class NeonEnv:
self.port_distributor = config.port_distributor
self.s3_mock_server = config.mock_s3_server
self.neon_cli = NeonCli(env=self)
self.pagectl = Pagectl(env=self)
self.endpoints = EndpointFactory(self)
self.safekeepers: List[Safekeeper] = []
self.pageservers: List[NeonPageserver] = []
@@ -1558,6 +1560,20 @@ class ComputeCtl(AbstractNeonCli):
COMMAND = "compute_ctl"
class Pagectl(AbstractNeonCli):
"""
A typed wrapper around the `pagectl` utility CLI tool.
"""
COMMAND = "pagectl"
def dump_index_part(self, path: Path) -> IndexPartDump:
res = self.raw_cli(["index-part", "dump", str(path)])
res.check_returncode()
parsed = json.loads(res.stdout)
return IndexPartDump.from_json(parsed)
class NeonAttachmentService:
def __init__(self, env: NeonEnv):
self.env = env

View File

@@ -432,12 +432,18 @@ class PageserverHttpClient(requests.Session):
assert isinstance(res_json, dict)
return res_json
def timeline_compact(self, tenant_id: TenantId, timeline_id: TimelineId):
def timeline_compact(
self, tenant_id: TenantId, timeline_id: TimelineId, force_repartition=False
):
self.is_testing_enabled_or_skip()
query = {}
if force_repartition:
query["force_repartition"] = "true"
log.info(f"Requesting compact: tenant {tenant_id}, timeline {timeline_id}")
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/compact"
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/compact",
params=query,
)
log.info(f"Got compact request response code: {res.status_code}")
self.verbose_error(res)
@@ -466,12 +472,18 @@ class PageserverHttpClient(requests.Session):
res_json = res.json()
return res_json
def timeline_checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId):
def timeline_checkpoint(
self, tenant_id: TenantId, timeline_id: TimelineId, force_repartition=False
):
self.is_testing_enabled_or_skip()
query = {}
if force_repartition:
query["force_repartition"] = "true"
log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}")
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint"
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint",
params=query,
)
log.info(f"Got checkpoint request response code: {res.status_code}")
self.verbose_error(res)

View File

@@ -0,0 +1,146 @@
from dataclasses import dataclass
from typing import Any, Dict, Tuple, Union
from fixtures.types import KEY_MAX, KEY_MIN, Key, Lsn
@dataclass
class IndexLayerMetadata:
@classmethod
def from_json(cls, d: Dict[str, Any]):
return {}
@dataclass(frozen=True)
class ImageLayerFileName:
lsn: Lsn
key_start: Key
key_end: Key
def to_str(self):
ret = (
f"{self.key_start.as_int():036X}-{self.key_end.as_int():036X}__{self.lsn.as_int():016X}"
)
assert self == parse_layer_file_name(ret)
return ret
@dataclass(frozen=True)
class DeltaLayerFileName:
lsn_start: Lsn
lsn_end: Lsn
key_start: Key
key_end: Key
def is_l0(self):
return self.key_start == KEY_MIN and self.key_end == KEY_MAX
def to_str(self):
ret = f"{self.key_start.as_int():036X}-{self.key_end.as_int():036X}__{self.lsn_start.as_int():016X}-{self.lsn_end.as_int():016X}"
assert self == parse_layer_file_name(ret)
return ret
LayerFileName = Union[ImageLayerFileName, DeltaLayerFileName]
class InvalidFileName(Exception):
pass
def parse_image_layer(f_name: str) -> Tuple[int, int, int]:
"""Parse an image layer file name. Return key start, key end, and snapshot lsn"""
parts = f_name.split("__")
if len(parts) != 2:
raise InvalidFileName(f"expecting two parts separated by '__', got: {parts}")
key_parts = parts[0].split("-")
if len(key_parts) != 2:
raise InvalidFileName(
f"expecting two key parts separated by '--' in parts[0], got: {key_parts}"
)
try:
return int(key_parts[0], 16), int(key_parts[1], 16), int(parts[1], 16)
except ValueError as e:
raise InvalidFileName(f"conversion error: {f_name}") from e
def parse_delta_layer(f_name: str) -> Tuple[int, int, int, int]:
"""Parse a delta layer file name. Return key start, key end, lsn start, and lsn end"""
parts = f_name.split("__")
if len(parts) != 2:
raise InvalidFileName(f"expecting two parts separated by '__', got: {parts}")
key_parts = parts[0].split("-")
if len(key_parts) != 2:
raise InvalidFileName(
f"expecting two key parts separated by '--' in parts[0], got: {key_parts}"
)
lsn_parts = parts[1].split("-")
if len(lsn_parts) != 2:
raise InvalidFileName(
f"expecting two lsn parts separated by '--' in parts[1], got: {lsn_parts}"
)
try:
return (
int(key_parts[0], 16),
int(key_parts[1], 16),
int(lsn_parts[0], 16),
int(lsn_parts[1], 16),
)
except ValueError as e:
raise InvalidFileName(f"conversion error: {f_name}") from e
def parse_layer_file_name(file_name: str) -> LayerFileName:
try:
key_start, key_end, lsn = parse_image_layer(file_name)
return ImageLayerFileName(lsn=Lsn(lsn), key_start=Key(key_start), key_end=Key(key_end))
except InvalidFileName:
pass
try:
key_start, key_end, lsn_start, lsn_end = parse_delta_layer(file_name)
return DeltaLayerFileName(
lsn_start=Lsn(lsn_start),
lsn_end=Lsn(lsn_end),
key_start=Key(key_start),
key_end=Key(key_end),
)
except InvalidFileName:
pass
raise ValueError()
def is_future_layer(layer_file_name: LayerFileName, disk_consistent_lsn: Lsn):
"""
Determines if this layer file is considered to be in future meaning we will discard these
layers during timeline initialization from the given disk_consistent_lsn.
"""
if (
isinstance(layer_file_name, ImageLayerFileName)
and layer_file_name.lsn > disk_consistent_lsn
):
return True
elif (
isinstance(layer_file_name, DeltaLayerFileName)
and layer_file_name.lsn_end > disk_consistent_lsn + 1
):
return True
else:
return False
@dataclass
class IndexPartDump:
layer_metadata: Dict[LayerFileName, IndexLayerMetadata]
disk_consistent_lsn: Lsn
@classmethod
def from_json(cls, d: Dict[str, Any]) -> "IndexPartDump":
return IndexPartDump(
layer_metadata={
parse_layer_file_name(n): IndexLayerMetadata.from_json(v)
for n, v in d["layer_metadata"].items()
},
disk_consistent_lsn=Lsn(d["disk_consistent_lsn"]),
)

View File

@@ -12,6 +12,7 @@ import boto3
from mypy_boto3_s3 import S3Client
from fixtures.log_helper import log
from fixtures.pageserver.types import LayerFileName
from fixtures.types import TenantId, TimelineId
TIMELINE_INDEX_PART_FILE_NAME = "index_part.json"
@@ -87,6 +88,11 @@ class LocalFsStorage:
def timeline_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
return self.tenant_path(tenant_id) / "timelines" / str(timeline_id)
def layer_path(
self, tenant_id: TenantId, timeline_id: TimelineId, layer_file_name: LayerFileName
):
return self.timeline_path(tenant_id, timeline_id) / layer_file_name.to_str()
def index_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
return self.timeline_path(tenant_id, timeline_id) / TIMELINE_INDEX_PART_FILE_NAME

View File

@@ -1,4 +1,5 @@
import random
from dataclasses import dataclass
from functools import total_ordering
from typing import Any, Type, TypeVar, Union
@@ -36,6 +37,11 @@ class Lsn:
return NotImplemented
return self.lsn_int < other.lsn_int
def __gt__(self, other: Any) -> bool:
if not isinstance(other, Lsn):
raise NotImplementedError
return self.lsn_int > other.lsn_int
def __eq__(self, other: Any) -> bool:
if not isinstance(other, Lsn):
return NotImplemented
@@ -47,9 +53,32 @@ class Lsn:
return NotImplemented
return self.lsn_int - other.lsn_int
def __add__(self, other: Union[int, "Lsn"]) -> "Lsn":
if isinstance(other, int):
return Lsn(self.lsn_int + other)
elif isinstance(other, Lsn):
return Lsn(self.lsn_int + other.lsn_int)
else:
raise NotImplementedError
def __hash__(self) -> int:
return hash(self.lsn_int)
def as_int(self) -> int:
return self.lsn_int
@dataclass(frozen=True)
class Key:
key_int: int
def as_int(self) -> int:
return self.key_int
KEY_MAX = Key(0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF)
KEY_MIN = Key(0)
@total_ordering
class Id:

View File

@@ -6,7 +6,16 @@ import subprocess
import threading
import time
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, TypeVar
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
List,
Optional,
Tuple,
TypeVar,
)
from urllib.parse import urlencode
import allure
@@ -14,6 +23,10 @@ import zstandard
from psycopg2.extensions import cursor
from fixtures.log_helper import log
from fixtures.pageserver.types import (
parse_delta_layer,
parse_image_layer,
)
if TYPE_CHECKING:
from fixtures.neon_fixtures import PgBin
@@ -193,26 +206,6 @@ def get_timeline_dir_size(path: Path) -> int:
return sz
def parse_image_layer(f_name: str) -> Tuple[int, int, int]:
"""Parse an image layer file name. Return key start, key end, and snapshot lsn"""
parts = f_name.split("__")
key_parts = parts[0].split("-")
return int(key_parts[0], 16), int(key_parts[1], 16), int(parts[1], 16)
def parse_delta_layer(f_name: str) -> Tuple[int, int, int, int]:
"""Parse a delta layer file name. Return key start, key end, lsn start, and lsn end"""
parts = f_name.split("__")
key_parts = parts[0].split("-")
lsn_parts = parts[1].split("-")
return (
int(key_parts[0], 16),
int(key_parts[1], 16),
int(lsn_parts[0], 16),
int(lsn_parts[1], 16),
)
def get_scale_for_db(size_mb: int) -> int:
"""Returns pgbench scale factor for given target db size in MB.

View File

@@ -0,0 +1,222 @@
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.pageserver.types import (
DeltaLayerFileName,
ImageLayerFileName,
is_future_layer,
)
from fixtures.pageserver.utils import (
wait_for_last_record_lsn,
wait_for_upload_queue_empty,
wait_until_tenant_active,
)
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from fixtures.types import Lsn
from fixtures.utils import query_scalar, wait_until
def test_issue_5878(neon_env_builder: NeonEnvBuilder):
"""
Regression test for issue https://github.com/neondatabase/neon/issues/5878 .
Create a situation where IndexPart contains an image layer from a future
(i.e., image layer > IndexPart::disk_consistent_lsn).
Detach.
Attach.
Wait for tenant to finish load_layer_map (by waiting for it to become active).
Wait for any remote timeline client ops to finish that the attach started.
Integrity-check the index part.
Before fixing the issue, load_layer_map would schedule removal of the future
image layer. A compaction run could later re-create the image layer with
the same file name, scheduling a PUT.
Due to lack of an upload queue barrier, the PUT and DELETE could be re-ordered.
The result was IndexPart referencing a non-existent object.
"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start()
ps_http = env.pageserver.http_client()
l0_l1_threshold = 3
image_creation_threshold = 1
tenant_config = {
"gc_period": "0s", # disable GC (shouldn't matter for this test but still)
"compaction_period": "0s", # we want to control when compaction runs
"checkpoint_timeout": "24h", # something we won't reach
"checkpoint_distance": f"{50 * (1024**2)}", # something we won't reach, we checkpoint manually
"image_creation_threshold": f"{image_creation_threshold}",
"compaction_threshold": f"{l0_l1_threshold}",
"compaction_target_size": f"{128 * (1024**3)}", # make it so that we only have 1 partition => image coverage for delta layers => enables gc of delta layers
}
tenant_id, timeline_id = env.neon_cli.create_tenant(conf=tenant_config)
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
def get_index_part():
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
ip_path = env.pageserver_remote_storage.index_path(tenant_id, timeline_id)
return env.pagectl.dump_index_part(ip_path)
def get_future_layers():
ip = get_index_part()
future_layers = [
layer_file_name
for layer_file_name in ip.layer_metadata.keys()
if is_future_layer(layer_file_name, ip.disk_consistent_lsn)
]
return future_layers
assert len(get_future_layers()) == 0
current = get_index_part()
assert len(set(current.layer_metadata.keys())) == 1
layer_file_name = list(current.layer_metadata.keys())[0]
assert isinstance(layer_file_name, DeltaLayerFileName)
assert layer_file_name.is_l0(), f"{layer_file_name}"
log.info("force image layer creation in the future by writing some data into in-memory layer")
# Create a number of layers in the tenant
with endpoint.cursor() as cur:
cur.execute("CREATE TABLE foo (t text)")
iters = l0_l1_threshold * image_creation_threshold
for i in range(0, iters):
cur.execute(
f"""
INSERT INTO foo
SELECT '{i}' || g
FROM generate_series(1, 10000) g
"""
)
last_record_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
wait_for_last_record_lsn(ps_http, tenant_id, timeline_id, last_record_lsn)
# 0..iters-1: create a stack of delta layers
# iters: leave a non-empty in-memory layer which we'll use for image layer generation
if i < iters - 1:
ps_http.timeline_checkpoint(tenant_id, timeline_id, force_repartition=True)
assert (
len(
[
layer
for layer in ps_http.layer_map_info(
tenant_id, timeline_id
).historic_layers
if layer.kind == "Image"
]
)
== 0
)
endpoint.stop()
wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id)
ip = get_index_part()
assert len(ip.layer_metadata.keys())
assert (
ip.disk_consistent_lsn < last_record_lsn
), "sanity check for what above loop is supposed to do"
# create the image layer from the future
ps_http.timeline_compact(tenant_id, timeline_id, force_repartition=True)
assert (
len(
[
layer
for layer in ps_http.layer_map_info(tenant_id, timeline_id).historic_layers
if layer.kind == "Image"
]
)
== 1
)
wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id)
future_layers = get_future_layers()
assert len(future_layers) == 1
future_layer = future_layers[0]
assert isinstance(future_layer, ImageLayerFileName)
assert future_layer.lsn == last_record_lsn
log.info(
f"got layer from the future: lsn={future_layer.lsn} disk_consistent_lsn={ip.disk_consistent_lsn} last_record_lsn={last_record_lsn}"
)
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
future_layer_path = env.pageserver_remote_storage.layer_path(
tenant_id, timeline_id, future_layer
)
log.info(f"future layer path: {future_layer_path}")
pre_stat = future_layer_path.stat()
time.sleep(1.1) # so that we can use change in pre_stat.st_mtime to detect overwrites
# force removal of layers from the future
tenant_conf = ps_http.tenant_config(tenant_id)
ps_http.tenant_detach(tenant_id)
failpoint_name = "before-delete-layer-pausable"
ps_http.configure_failpoints((failpoint_name, "pause"))
ps_http.tenant_attach(tenant_id, tenant_conf.tenant_specific_overrides)
wait_until_tenant_active(ps_http, tenant_id)
# Ensure the IndexPart upload that unlinks the layer file finishes, i.e., doesn't clog the queue.
def future_layer_is_gone_from_index_part():
future_layers = set(get_future_layers())
assert future_layer not in future_layers
wait_until(10, 0.5, future_layer_is_gone_from_index_part)
# NB: the layer file is unlinked index part now, but, because we made the delete
# operation stuck, the layer file itself is still in the remote_storage
def delete_at_pause_point():
assert env.pageserver.log_contains(f".*{tenant_id}.*at failpoint.*{failpoint_name}")
wait_until(10, 0.5, delete_at_pause_point)
assert future_layer_path.exists()
# wait for re-ingestion of the WAL from safekeepers into the in-memory layer
# (this happens in parallel to the above)
wait_for_last_record_lsn(ps_http, tenant_id, timeline_id, last_record_lsn)
# re-do image layer generation
# This will produce the same image layer and queue an upload.
# However, we still have the deletion for the layer queued, stuck on the failpoint.
# An incorrect implementation would let the PUT execute before the DELETE.
# The later code in this test asserts that this doesn't happen.
ps_http.timeline_compact(tenant_id, timeline_id, force_repartition=True)
# Let things sit for some time; a good implementation makes no progress because
# we can't execute the PUT before the DELETE. A bad implementation would do that.
max_race_opportunity_window = 4
start = time.monotonic()
while True:
post_stat = future_layer_path.stat()
assert (
pre_stat.st_mtime == post_stat.st_mtime
), "observed PUT overtake the stucked DELETE => bug isn't fixed yet"
if time.monotonic() - start > max_race_opportunity_window:
log.info(
"a correct implementation would never let the later PUT overtake the earlier DELETE"
)
break
time.sleep(1)
# Window has passed, unstuck the delete, let upload queue drain.
log.info("unstuck the DELETE")
ps_http.configure_failpoints(("before-delete-layer-pausable", "off"))
wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id)
# Examine the resulting S3 state.
log.info("integrity-check the remote storage")
ip = get_index_part()
for layer_file_name in ip.layer_metadata.keys():
layer_path = env.pageserver_remote_storage.layer_path(
tenant_id, timeline_id, layer_file_name
)
assert layer_path.exists(), f"{layer_file_name.to_str()}"
log.info("assert that the overwritten layer won")
final_stat = future_layer_path.stat()
assert final_stat.st_mtime != pre_stat.st_mtime