fix(scrubber): more robust metadata consistency checks (#8344)

Part of #8128.

## Problem

The scrubber uses the `scan_metadata` command to flag metadata inconsistencies.
To trust it at scale, we need to make sure the errors we emit reflect
real problems. One check performed by the scrubber is whether the layers
listed in the latest `index_part.json` are present in the object listing.
Currently, the scrubber does not robustly handle the case where objects
are uploaded/deleted during the scan.

## Summary of changes

**Condition for success:** An object in the index is (1) in the object
listing we acquire from S3 or (2) found in a HeadObject request (new
object).

- Add `HeadObject` requests for the layers missing from the
object listing.
- Keep the order of first getting the object listing and then
downloading the layers.
- Update the check to only consider shards with the highest shard count.
- Skip analyzing a timeline if the `deleted_at` tombstone is set in
`index_part.json`.
- Add a new test to check that the scrubber actually detects the metadata
inconsistency.

_Misc_

- A timeline with no ancestor should always have some layers.
- Removed experimental histograms

_Caveat_

- Ancestor layers are not cleaned up until #8308 is implemented. If ancestor
layers reference non-existing layers in the index, the scrubber will
emit false positives.

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
This commit is contained in:
Yuchen Liang
2024-07-22 09:53:33 -04:00
committed by GitHub
parent 204bb8faa3
commit 595c450036
12 changed files with 251 additions and 148 deletions

11
Cargo.lock generated
View File

@@ -2384,16 +2384,6 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fe2267d4ed49bc07b63801559be28c718ea06c4738b7a03c94df7386d2cde46"
[[package]]
name = "histogram"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e673d137229619d5c2c8903b6ed5852b43636c0017ff2e66b1aafb8ccf04b80b"
dependencies = [
"serde",
"thiserror",
]
[[package]]
name = "hmac"
version = "0.12.1"
@@ -5847,7 +5837,6 @@ dependencies = [
"futures",
"futures-util",
"hex",
"histogram",
"humantime",
"itertools",
"once_cell",

View File

@@ -49,6 +49,7 @@ pub struct TenantShardId {
impl ShardCount {
pub const MAX: Self = Self(u8::MAX);
pub const MIN: Self = Self(0);
/// The internal value of a ShardCount may be zero, which means "1 shard, but use
/// legacy format for TenantShardId that excludes the shard suffix", also known

View File

@@ -463,7 +463,7 @@ impl LayerMap {
pub(self) fn insert_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
// TODO: See #3869, resulting #4088, attempted fix and repro #4094
if Self::is_l0(&layer_desc) {
if Self::is_l0(&layer_desc.key_range) {
self.l0_delta_layers.push(layer_desc.clone().into());
}
@@ -482,7 +482,7 @@ impl LayerMap {
self.historic
.remove(historic_layer_coverage::LayerKey::from(layer_desc));
let layer_key = layer_desc.key();
if Self::is_l0(layer_desc) {
if Self::is_l0(&layer_desc.key_range) {
let len_before = self.l0_delta_layers.len();
let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers);
l0_delta_layers.retain(|other| other.key() != layer_key);
@@ -598,8 +598,9 @@ impl LayerMap {
coverage
}
pub fn is_l0(layer: &PersistentLayerDesc) -> bool {
layer.get_key_range() == (Key::MIN..Key::MAX)
/// Check if the key range resembles that of an L0 layer.
pub fn is_l0(key_range: &Range<Key>) -> bool {
key_range == &(Key::MIN..Key::MAX)
}
/// This function determines which layers are counted in `count_deltas`:
@@ -626,7 +627,7 @@ impl LayerMap {
/// than just the current partition_range.
pub fn is_reimage_worthy(layer: &PersistentLayerDesc, partition_range: &Range<Key>) -> bool {
// Case 1
if !Self::is_l0(layer) {
if !Self::is_l0(&layer.key_range) {
return true;
}

View File

@@ -1298,7 +1298,7 @@ impl LayerInner {
lsn_end: lsn_range.end,
remote: !resident,
access_stats,
l0: crate::tenant::layer_map::LayerMap::is_l0(self.layer_desc()),
l0: crate::tenant::layer_map::LayerMap::is_l0(&self.layer_desc().key_range),
}
} else {
let lsn = self.desc.image_layer_lsn();

View File

@@ -248,6 +248,14 @@ impl LayerName {
Image(_) => "image",
}
}
/// Borrows the key range carried by this layer name.
///
/// Both the image and the delta variant store a `key_range`; this accessor
/// hands out a reference to whichever one `self` holds.
pub fn key_range(&self) -> &Range<Key> {
    match self {
        LayerName::Delta(delta_name) => &delta_name.key_range,
        LayerName::Image(image_name) => &image_name.key_range,
    }
}
}
impl fmt::Display for LayerName {

View File

@@ -4868,7 +4868,7 @@ impl Timeline {
// for compact_level0_phase1 creating an L0, which does not happen in practice
// because we have not implemented L0 => L0 compaction.
duplicated_layers.insert(l.layer_desc().key());
} else if LayerMap::is_l0(l.layer_desc()) {
} else if LayerMap::is_l0(&l.layer_desc().key_range) {
bail!("compaction generates a L0 layer file as output, which will cause infinite compaction.");
} else {
insert_layers.push(l.clone());

View File

@@ -49,6 +49,5 @@ tracing.workspace = true
tracing-subscriber.workspace = true
clap.workspace = true
tracing-appender = "0.2"
histogram = "0.7"
futures.workspace = true

View File

@@ -2,6 +2,7 @@ use std::collections::{HashMap, HashSet};
use anyhow::Context;
use aws_sdk_s3::Client;
use pageserver::tenant::layer_map::LayerMap;
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver_api::shard::ShardIndex;
use tracing::{error, info, warn};
@@ -12,7 +13,7 @@ use crate::cloud_admin_api::BranchData;
use crate::metadata_stream::stream_listing;
use crate::{download_object_with_retries, RootTarget, TenantShardTimelineId};
use futures_util::StreamExt;
use pageserver::tenant::remote_timeline_client::parse_remote_index_path;
use pageserver::tenant::remote_timeline_client::{parse_remote_index_path, remote_layer_path};
use pageserver::tenant::storage_layer::LayerName;
use pageserver::tenant::IndexPart;
use remote_storage::RemotePath;
@@ -41,7 +42,9 @@ impl TimelineAnalysis {
}
}
pub(crate) fn branch_cleanup_and_check_errors(
pub(crate) async fn branch_cleanup_and_check_errors(
s3_client: &Client,
target: &RootTarget,
id: &TenantShardTimelineId,
tenant_objects: &mut TenantObjectListing,
s3_active_branch: Option<&BranchData>,
@@ -85,15 +88,17 @@ pub(crate) fn branch_cleanup_and_check_errors(
}
if &index_part.version() != IndexPart::KNOWN_VERSIONS.last().unwrap() {
result.warnings.push(format!(
info!(
"index_part.json version is not latest: {}",
index_part.version()
))
);
}
if index_part.metadata.disk_consistent_lsn()
!= index_part.duplicated_disk_consistent_lsn()
{
// Tech debt: let's get rid of one of these, they are redundant
// https://github.com/neondatabase/neon/issues/8343
result.errors.push(format!(
"Mismatching disk_consistent_lsn in TimelineMetadata ({}) and in the index_part ({})",
index_part.metadata.disk_consistent_lsn(),
@@ -102,8 +107,16 @@ pub(crate) fn branch_cleanup_and_check_errors(
}
if index_part.layer_metadata.is_empty() {
// not an error, can happen for branches with zero writes, but notice that
info!("index_part.json has no layers");
if index_part.metadata.ancestor_timeline().is_none() {
// The initial timeline with no ancestor should ALWAYS have layers.
result.errors.push(
"index_part.json has no layers (ancestor_timeline=None)"
.to_string(),
);
} else {
// Not an error, can happen for branches with zero writes, but notice that
info!("index_part.json has no layers (ancestor_timeline exists)");
}
}
for (layer, metadata) in index_part.layer_metadata {
@@ -114,16 +127,41 @@ pub(crate) fn branch_cleanup_and_check_errors(
}
if !tenant_objects.check_ref(id.timeline_id, &layer, &metadata) {
// FIXME: this will emit false positives if an index was
// uploaded concurrently with our scan. To make this check
// correct, we need to try sending a HEAD request for the
// layer we think is missing.
result.errors.push(format!(
"index_part.json contains a layer {}{} (shard {}) that is not present in remote storage",
layer,
metadata.generation.get_suffix(),
metadata.shard
))
let path = remote_layer_path(
&id.tenant_shard_id.tenant_id,
&id.timeline_id,
metadata.shard,
&layer,
metadata.generation,
);
// HEAD request used here to address a race condition when an index was uploaded concurrently
// with our scan. We check if the object is uploaded to S3 after taking the listing snapshot.
let response = s3_client
.head_object()
.bucket(target.bucket_name())
.key(path.get_path().as_str())
.send()
.await;
if response.is_err() {
// Object is not present.
let is_l0 = LayerMap::is_l0(layer.key_range());
let msg = format!(
"index_part.json contains a layer {}{} (shard {}) that is not present in remote storage (layer_is_l0: {})",
layer,
metadata.generation.get_suffix(),
metadata.shard,
is_l0,
);
if is_l0 {
result.warnings.push(msg);
} else {
result.errors.push(msg);
}
}
}
}
}
@@ -303,6 +341,9 @@ pub(crate) async fn list_timeline_blobs(
tracing::debug!("initdb archive {key}");
initdb_archive = true;
}
Some("initdb-preserved.tar.zst") => {
tracing::info!("initdb archive preserved {key}");
}
Some(maybe_layer_name) => match parse_layer_object_name(maybe_layer_name) {
Ok((new_layer, gen)) => {
tracing::debug!("Parsed layer key: {} {:?}", new_layer, gen);

View File

@@ -8,12 +8,11 @@ use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
use aws_sdk_s3::Client;
use futures_util::{StreamExt, TryStreamExt};
use histogram::Histogram;
use pageserver::tenant::remote_timeline_client::remote_layer_path;
use pageserver::tenant::IndexPart;
use pageserver_api::shard::TenantShardId;
use serde::Serialize;
use utils::id::TenantId;
use utils::shard::ShardCount;
#[derive(Serialize)]
pub struct MetadataSummary {
@@ -24,66 +23,6 @@ pub struct MetadataSummary {
with_warnings: HashSet<TenantShardTimelineId>,
with_orphans: HashSet<TenantShardTimelineId>,
indices_by_version: HashMap<usize, usize>,
layer_count: MinMaxHisto,
timeline_size_bytes: MinMaxHisto,
layer_size_bytes: MinMaxHisto,
}
/// A histogram plus minimum and maximum tracking
#[derive(Serialize)]
struct MinMaxHisto {
#[serde(skip)]
histo: Histogram,
min: u64,
max: u64,
}
impl MinMaxHisto {
fn new() -> Self {
Self {
histo: histogram::Histogram::builder()
.build()
.expect("Bad histogram params"),
min: u64::MAX,
max: 0,
}
}
fn sample(&mut self, v: u64) -> Result<(), histogram::Error> {
self.min = std::cmp::min(self.min, v);
self.max = std::cmp::max(self.max, v);
let r = self.histo.increment(v, 1);
if r.is_err() {
tracing::warn!("Bad histogram sample: {v}");
}
r
}
fn oneline(&self) -> String {
let percentiles = match self.histo.percentiles(&[1.0, 10.0, 50.0, 90.0, 99.0]) {
Ok(p) => p,
Err(e) => return format!("No data: {}", e),
};
let percentiles: Vec<u64> = percentiles
.iter()
.map(|p| p.bucket().low() + p.bucket().high() / 2)
.collect();
format!(
"min {}, 1% {}, 10% {}, 50% {}, 90% {}, 99% {}, max {}",
self.min,
percentiles[0],
percentiles[1],
percentiles[2],
percentiles[3],
percentiles[4],
self.max,
)
}
}
impl MetadataSummary {
@@ -96,25 +35,9 @@ impl MetadataSummary {
with_warnings: HashSet::new(),
with_orphans: HashSet::new(),
indices_by_version: HashMap::new(),
layer_count: MinMaxHisto::new(),
timeline_size_bytes: MinMaxHisto::new(),
layer_size_bytes: MinMaxHisto::new(),
}
}
fn update_histograms(&mut self, index_part: &IndexPart) -> Result<(), histogram::Error> {
self.layer_count
.sample(index_part.layer_metadata.len() as u64)?;
let mut total_size: u64 = 0;
for meta in index_part.layer_metadata.values() {
total_size += meta.file_size;
self.layer_size_bytes.sample(meta.file_size)?;
}
self.timeline_size_bytes.sample(total_size)?;
Ok(())
}
fn update_data(&mut self, data: &S3TimelineBlobData) {
self.timeline_shard_count += 1;
if let BlobDataParseResult::Parsed {
@@ -127,14 +50,6 @@ impl MetadataSummary {
.indices_by_version
.entry(index_part.version())
.or_insert(0) += 1;
if let Err(e) = self.update_histograms(index_part) {
// Value out of range? Warn that the results are untrustworthy
tracing::warn!(
"Error updating histograms, summary stats may be wrong: {}",
e
);
}
}
}
@@ -169,9 +84,6 @@ With errors: {}
With warnings: {}
With orphan layers: {}
Index versions: {version_summary}
Timeline size bytes: {}
Layer size bytes: {}
Timeline layer count: {}
",
self.tenant_count,
self.timeline_count,
@@ -179,9 +91,6 @@ Timeline layer count: {}
self.with_errors.len(),
self.with_warnings.len(),
self.with_orphans.len(),
self.timeline_size_bytes.oneline(),
self.layer_size_bytes.oneline(),
self.layer_count.oneline(),
)
}
@@ -235,33 +144,60 @@ pub async fn scan_metadata(
let mut tenant_objects = TenantObjectListing::default();
let mut tenant_timeline_results = Vec::new();
fn analyze_tenant(
async fn analyze_tenant(
s3_client: &Client,
target: &RootTarget,
tenant_id: TenantId,
summary: &mut MetadataSummary,
mut tenant_objects: TenantObjectListing,
timelines: Vec<(TenantShardTimelineId, S3TimelineBlobData)>,
highest_shard_count: ShardCount,
) {
summary.tenant_count += 1;
let mut timeline_ids = HashSet::new();
let mut timeline_generations = HashMap::new();
for (ttid, data) in timelines {
timeline_ids.insert(ttid.timeline_id);
// Stash the generation of each timeline, for later use identifying orphan layers
if let BlobDataParseResult::Parsed {
index_part: _index_part,
index_part_generation,
s3_layers: _s3_layers,
} = &data.blob_data
{
timeline_generations.insert(ttid, *index_part_generation);
}
if ttid.tenant_shard_id.shard_count == highest_shard_count {
// Only analyze `TenantShardId`s with highest shard count.
// Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
// reference counts for layers across the tenant.
let analysis =
branch_cleanup_and_check_errors(&ttid, &mut tenant_objects, None, None, Some(data));
summary.update_analysis(&ttid, &analysis);
// Stash the generation of each timeline, for later use identifying orphan layers
if let BlobDataParseResult::Parsed {
index_part,
index_part_generation,
s3_layers: _s3_layers,
} = &data.blob_data
{
if index_part.deleted_at.is_some() {
// skip deleted timeline.
tracing::info!("Skip analysis of {} b/c timeline is already deleted", ttid);
continue;
}
timeline_generations.insert(ttid, *index_part_generation);
}
// Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
// reference counts for layers across the tenant.
let analysis = branch_cleanup_and_check_errors(
s3_client,
target,
&ttid,
&mut tenant_objects,
None,
None,
Some(data),
)
.await;
summary.update_analysis(&ttid, &analysis);
timeline_ids.insert(ttid.timeline_id);
} else {
tracing::info!(
"Skip analysis of {} b/c a lower shard count than {}",
ttid,
highest_shard_count.0,
);
}
}
summary.timeline_count += timeline_ids.len();
@@ -309,18 +245,35 @@ pub async fn scan_metadata(
// all results for the same tenant will be adjacent. We accumulate these,
// and then call `analyze_tenant` to flush, when we see the next tenant ID.
let mut summary = MetadataSummary::new();
let mut highest_shard_count = ShardCount::MIN;
while let Some(i) = timelines.next().await {
let (ttid, data) = i?;
summary.update_data(&data);
match tenant_id {
None => tenant_id = Some(ttid.tenant_shard_id.tenant_id),
None => {
tenant_id = Some(ttid.tenant_shard_id.tenant_id);
highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
}
Some(prev_tenant_id) => {
if prev_tenant_id != ttid.tenant_shard_id.tenant_id {
// New tenant: analyze this tenant's timelines, clear accumulated tenant_timeline_results
let tenant_objects = std::mem::take(&mut tenant_objects);
let timelines = std::mem::take(&mut tenant_timeline_results);
analyze_tenant(prev_tenant_id, &mut summary, tenant_objects, timelines);
analyze_tenant(
&s3_client,
&target,
prev_tenant_id,
&mut summary,
tenant_objects,
timelines,
highest_shard_count,
)
.await;
tenant_id = Some(ttid.tenant_shard_id.tenant_id);
highest_shard_count = ttid.tenant_shard_id.shard_count;
} else {
highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
}
}
}
@@ -338,11 +291,15 @@ pub async fn scan_metadata(
if !tenant_timeline_results.is_empty() {
analyze_tenant(
&s3_client,
&target,
tenant_id.expect("Must be set if results are present"),
&mut summary,
tenant_objects,
tenant_timeline_results,
);
highest_shard_count,
)
.await;
}
Ok(summary)

View File

@@ -143,6 +143,9 @@ class TimelineId(Id):
def __repr__(self) -> str:
return f'TimelineId("{self.id.hex()}")'
def __str__(self) -> str:
# Bare hex form of the id, without the `TimelineId("...")` wrapper that __repr__ produces.
return self.id.hex()
# Workaround for compat with python 3.9, which does not have `typing.Self`
TTenantShardId = TypeVar("TTenantShardId", bound="TenantShardId")

View File

@@ -12,8 +12,9 @@ import boto3
import toml
from mypy_boto3_s3 import S3Client
from fixtures.common_types import TenantId, TimelineId
from fixtures.common_types import TenantId, TenantShardId, TimelineId
from fixtures.log_helper import log
from fixtures.pageserver.common_types import IndexPartDump
TIMELINE_INDEX_PART_FILE_NAME = "index_part.json"
TENANT_HEATMAP_FILE_NAME = "heatmap-v1.json"
@@ -265,9 +266,38 @@ class S3Storage:
def tenants_path(self) -> str:
return f"{self.prefix_in_bucket}/tenants"
def tenant_path(self, tenant_id: TenantId) -> str:
def tenant_path(self, tenant_id: Union[TenantShardId, TenantId]) -> str:
return f"{self.tenants_path()}/{tenant_id}"
def timeline_path(
self, tenant_id: Union[TenantShardId, TenantId], timeline_id: TimelineId
) -> str:
# Prefix of one timeline: "<tenant_path(tenant_id)>/timelines/<timeline_id>".
# Both ids are rendered through their string formatting in the f-string below.
return f"{self.tenant_path(tenant_id)}/timelines/{timeline_id}"
def get_latest_index_key(self, index_keys: List[str]) -> str:
"""
Gets the latest index file key, i.e. the key with the largest generation suffix.
@param index_keys: A list of index keys of different generations. Must not be
empty: `max()` raises ValueError on an empty list.
@return: The key whose hexadecimal suffix after "index_part.json-" is the largest.
"""
# Rank a key by the hex number following "index_part.json-". A key that does not
# split into exactly two parts around that marker (e.g. it has no suffix) ranks
# as -1, i.e. below every suffixed key.
def parse_gen(index_key: str) -> int:
parts = index_key.split("index_part.json-")
return int(parts[-1], base=16) if len(parts) == 2 else -1
return max(index_keys, key=parse_gen)
def download_index_part(self, index_key: str) -> IndexPartDump:
"""
Downloads the index content from remote storage.
@param index_key: index key in remote storage.
@return: the object body, decoded as UTF-8, parsed as JSON and converted
with `IndexPartDump.from_json`.
"""
response = self.client.get_object(Bucket=self.bucket_name, Key=index_key)
# Read the whole object body into memory and decode it as UTF-8 text.
body = response["Body"].read().decode("utf-8")
# The full index body is written to the test log at info level.
log.info(f"index_part.json: {body}")
return IndexPartDump.from_json(json.loads(body))
def heatmap_key(self, tenant_id: TenantId) -> str:
return f"{self.tenant_path(tenant_id)}/{TENANT_HEATMAP_FILE_NAME}"

View File

@@ -1,4 +1,5 @@
import os
import pprint
import shutil
import threading
import time
@@ -373,3 +374,76 @@ def test_scrubber_physical_gc_ancestors_split(neon_env_builder: NeonEnvBuilder):
assert gc_output["ancestor_layers_deleted"] > 0
assert gc_output["remote_storage_errors"] == 0
assert gc_output["controller_api_errors"] == 0
@pytest.mark.parametrize("shard_count", [None, 4])
def test_scrubber_scan_pageserver_metadata(
neon_env_builder: NeonEnvBuilder, shard_count: Optional[int]
):
"""
Create some layers. Delete an object listed in index. Run scrubber and see if it detects the defect.
"""
# Use s3_storage so we could test out scrubber.
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.num_pageservers = shard_count if shard_count is not None else 1
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
# Create some layers.
workload = Workload(env, env.initial_tenant, env.initial_timeline)
workload.init()
for _ in range(3):
workload.write_rows(128)
for pageserver in env.pageservers:
pageserver.stop()
pageserver.start()
for _ in range(3):
workload.write_rows(128)
# Get the latest index for a particular timeline.
tenant_shard_id = TenantShardId(env.initial_tenant, 0, shard_count if shard_count else 0)
assert isinstance(env.pageserver_remote_storage, S3Storage)
timeline_path = env.pageserver_remote_storage.timeline_path(
tenant_shard_id, env.initial_timeline
)
client = env.pageserver_remote_storage.client
bucket = env.pageserver_remote_storage.bucket_name
objects = client.list_objects_v2(Bucket=bucket, Prefix=f"{timeline_path}/", Delimiter="").get(
"Contents", []
)
keys = [obj["Key"] for obj in objects]
index_keys = list(filter(lambda s: s.startswith(f"{timeline_path}/index_part"), keys))
assert len(index_keys) > 0
latest_index_key = env.pageserver_remote_storage.get_latest_index_key(index_keys)
log.info(f"{latest_index_key=}")
index = env.pageserver_remote_storage.download_index_part(latest_index_key)
assert len(index.layer_metadata) > 0
it = iter(index.layer_metadata.items())
scan_summary = env.storage_scrubber.scan_metadata()
assert not scan_summary["with_warnings"]
assert not scan_summary["with_errors"]
# Delete a layer file that is listed in the index.
layer, metadata = next(it)
log.info(f"Deleting {timeline_path}/{layer.to_str()}")
delete_response = client.delete_object(
Bucket=bucket,
Key=f"{timeline_path}/{layer.to_str()}-{metadata.generation:08x}",
)
log.info(f"delete response: {delete_response}")
# Check scan summary. Expect it to be a L0 layer so only emit warnings.
scan_summary = env.storage_scrubber.scan_metadata()
log.info(f"{pprint.pformat(scan_summary)}")
assert len(scan_summary["with_warnings"]) > 0