Add safekeeper scrubber test (#8785)

The test is very rudimentary: it only checks that we can run `scan_metadata` for
the safekeeper node kind before and after tenant deletion. We also don't expect
any uploaded data, because the test doesn't generate enough WAL for that (at
least one fully S3-uploaded segment file would be needed, and the scrubber
doesn't recognize partial files yet).

The `scan_metadata` scrubber subcommand is extended to accept either a database
connection string (previously the only option, which required a database to be
present) or the timeline information passed manually as JSON. The JSON mode is
well suited to testing scenarios, where the number of timelines is usually
small and spinning up a database just to hold the timeline information would be
cumbersome.
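
As an illustration of the new mode, the JSON passed via `--timeline-lsns` is a list of objects whose fields mirror the `TimelineLsnData` struct added in this commit. A minimal Python sketch of building that argument (the IDs and LSNs are made-up placeholders, not values from this change):

import json

# Field names follow the `TimelineLsnData` struct introduced below:
# tenant_id, timeline_id, timeline_start_lsn, backup_lsn.
# The hex IDs and LSN values are placeholders.
timeline_lsns = [
    {
        "tenant_id": "1f359dd625e519a1a4e8d7509690f6fc",
        "timeline_id": "de200bd42b49cc1814412c7e592dd6e9",
        "timeline_start_lsn": "0/16B5A50",
        "backup_lsn": "0/2000000",
    }
]

# This string is what `scan-metadata --node-kind safekeeper --timeline-lsns ...` expects.
print(json.dumps(timeline_lsns))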
Author: Arpad Müller
Date: 2024-08-31 01:12:25 +02:00
Committed by: GitHub
Parent: 05caaab850
Commit: 3ec785f30d
4 changed files with 213 additions and 73 deletions

storage_scrubber/src/main.rs

@@ -1,4 +1,4 @@
use anyhow::{anyhow, bail};
use anyhow::{anyhow, bail, Context};
use camino::Utf8PathBuf;
use pageserver_api::controller_api::{MetadataHealthUpdateRequest, MetadataHealthUpdateResponse};
use pageserver_api::shard::TenantShardId;
@@ -7,6 +7,7 @@ use storage_controller_client::control_api;
use storage_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
use storage_scrubber::pageserver_physical_gc::GcMode;
use storage_scrubber::scan_pageserver_metadata::scan_pageserver_metadata;
use storage_scrubber::scan_safekeeper_metadata::DatabaseOrList;
use storage_scrubber::tenant_snapshot::SnapshotDownloader;
use storage_scrubber::{find_large_objects, ControllerClientConfig};
use storage_scrubber::{
@@ -76,6 +77,9 @@ enum Command {
/// For safekeeper node_kind only, table in the db with debug dump
#[arg(long, default_value = None)]
dump_db_table: Option<String>,
/// For safekeeper node_kind only, json list of timelines and their lsn info
#[arg(long, default_value = None)]
timeline_lsns: Option<String>,
},
TenantSnapshot {
#[arg(long = "tenant-id")]
@@ -155,20 +159,22 @@ async fn main() -> anyhow::Result<()> {
post_to_storcon,
dump_db_connstr,
dump_db_table,
timeline_lsns,
} => {
if let NodeKind::Safekeeper = node_kind {
let dump_db_connstr =
dump_db_connstr.ok_or(anyhow::anyhow!("dump_db_connstr not specified"))?;
let dump_db_table =
dump_db_table.ok_or(anyhow::anyhow!("dump_db_table not specified"))?;
let summary = scan_safekeeper_metadata(
bucket_config.clone(),
tenant_ids.iter().map(|tshid| tshid.tenant_id).collect(),
dump_db_connstr,
dump_db_table,
)
.await?;
let db_or_list = match (timeline_lsns, dump_db_connstr) {
(Some(timeline_lsns), _) => {
let timeline_lsns = serde_json::from_str(&timeline_lsns).context("parsing timeline_lsns")?;
DatabaseOrList::List(timeline_lsns)
}
(None, Some(dump_db_connstr)) => {
let dump_db_table = dump_db_table.ok_or_else(|| anyhow::anyhow!("dump_db_table not specified"))?;
let tenant_ids = tenant_ids.iter().map(|tshid| tshid.tenant_id).collect();
DatabaseOrList::Database { tenant_ids, connstr: dump_db_connstr, table: dump_db_table }
}
(None, None) => anyhow::bail!("neither `timeline_lsns` specified, nor `dump_db_connstr` and `dump_db_table`"),
};
let summary = scan_safekeeper_metadata(bucket_config.clone(), db_or_list).await?;
if json {
println!("{}", serde_json::to_string(&summary).unwrap())
} else {

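To make the dispatch above concrete, here is a hedged sketch of driving both modes from a script. The binary name, the `scan-metadata` subcommand, and the `--node-kind`, `--json`, and `--timeline-lsns` flags appear in this diff; the exact spellings of the connection-string and table flags, and all values, are assumptions.

import json
import subprocess

def scan_safekeeper(extra_args, env=None):
    # Hypothetical helper; the common flags mirror what this diff shows.
    cmd = ["storage_scrubber", "scan-metadata", "--node-kind", "safekeeper", "--json", *extra_args]
    out = subprocess.run(cmd, capture_output=True, text=True, check=True, env=env)
    return json.loads(out.stdout)

# New list mode: no database required.
timelines = [{"tenant_id": "1f359dd625e519a1a4e8d7509690f6fc",
              "timeline_id": "de200bd42b49cc1814412c7e592dd6e9",
              "timeline_start_lsn": "0/16B5A50", "backup_lsn": "0/2000000"}]
summary = scan_safekeeper(["--timeline-lsns", json.dumps(timelines)])

# Existing database mode: connection string plus dump table
# (flag spellings assumed from the `dump_db_connstr` / `dump_db_table` arguments).
summary = scan_safekeeper([
    "--dump-db-connstr", "postgresql://user:password@host/dbname",
    "--dump-db-table", "safekeeper_debug_dump",
])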
storage_scrubber/src/scan_safekeeper_metadata.rs

@@ -7,7 +7,7 @@ use postgres_ffi::{XLogFileName, PG_TLI};
use remote_storage::GenericRemoteStorage;
use serde::Serialize;
use tokio_postgres::types::PgLsn;
use tracing::{error, info, trace};
use tracing::{debug, error, info};
use utils::{
id::{TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
@@ -54,6 +54,23 @@ impl MetadataSummary {
}
}
#[derive(serde::Deserialize)]
pub struct TimelineLsnData {
tenant_id: String,
timeline_id: String,
timeline_start_lsn: Lsn,
backup_lsn: Lsn,
}
pub enum DatabaseOrList {
Database {
tenant_ids: Vec<TenantId>,
connstr: String,
table: String,
},
List(Vec<TimelineLsnData>),
}
/// Scan the safekeeper metadata in an S3 bucket, reporting errors and
/// statistics.
///
@@ -63,68 +80,39 @@ impl MetadataSummary {
/// the project wasn't deleted in the meanwhile.
pub async fn scan_safekeeper_metadata(
bucket_config: BucketConfig,
tenant_ids: Vec<TenantId>,
dump_db_connstr: String,
dump_db_table: String,
db_or_list: DatabaseOrList,
) -> anyhow::Result<MetadataSummary> {
info!(
"checking bucket {}, region {}, dump_db_table {}",
bucket_config.bucket, bucket_config.region, dump_db_table
"checking bucket {}, region {}",
bucket_config.bucket, bucket_config.region
);
// Use rustls (Neon requires TLS)
let root_store = TLS_ROOTS.get_or_try_init(load_certs)?.clone();
let client_config = rustls::ClientConfig::builder()
.with_root_certificates(root_store)
.with_no_client_auth();
let tls_connector = tokio_postgres_rustls::MakeRustlsConnect::new(client_config);
let (client, connection) = tokio_postgres::connect(&dump_db_connstr, tls_connector).await?;
// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
let tenant_filter_clause = if !tenant_ids.is_empty() {
format!(
"and tenant_id in ({})",
tenant_ids
.iter()
.map(|t| format!("'{}'", t))
.collect::<Vec<_>>()
.join(", ")
)
} else {
"".to_owned()
};
let query = format!(
"select tenant_id, timeline_id, min(timeline_start_lsn), max(backup_lsn) from \"{}\" where not is_cancelled {} group by tenant_id, timeline_id;",
dump_db_table, tenant_filter_clause,
);
info!("query is {}", query);
let timelines = client.query(&query, &[]).await?;
info!("loaded {} timelines", timelines.len());
let (remote_client, target) = init_remote(bucket_config, NodeKind::Safekeeper).await?;
let console_config = ConsoleConfig::from_env()?;
let cloud_admin_api_client = CloudAdminApiClient::new(console_config);
let checks = futures::stream::iter(timelines.iter().map(Ok)).map_ok(|row| {
let tenant_id = TenantId::from_str(row.get(0)).expect("failed to parse tenant_id");
let timeline_id = TimelineId::from_str(row.get(1)).expect("failed to parse tenant_id");
let timeline_start_lsn_pg: PgLsn = row.get(2);
let timeline_start_lsn: Lsn = Lsn(u64::from(timeline_start_lsn_pg));
let backup_lsn_pg: PgLsn = row.get(3);
let backup_lsn: Lsn = Lsn(u64::from(backup_lsn_pg));
let timelines = match db_or_list {
DatabaseOrList::Database {
tenant_ids,
connstr,
table,
} => load_timelines_from_db(tenant_ids, connstr, table).await?,
DatabaseOrList::List(list) => list,
};
info!("loaded {} timelines", timelines.len());
let checks = futures::stream::iter(timelines.into_iter().map(Ok)).map_ok(|timeline| {
let tenant_id = TenantId::from_str(&timeline.tenant_id).expect("failed to parse tenant_id");
let timeline_id =
TimelineId::from_str(&timeline.timeline_id).expect("failed to parse timeline_id");
let ttid = TenantTimelineId::new(tenant_id, timeline_id);
check_timeline(
&remote_client,
&target,
&cloud_admin_api_client,
ttid,
timeline_start_lsn,
backup_lsn,
timeline.timeline_start_lsn,
timeline.backup_lsn,
)
});
// Run multiple check_timeline's concurrently.
@@ -163,11 +151,9 @@ async fn check_timeline(
timeline_start_lsn: Lsn,
backup_lsn: Lsn,
) -> anyhow::Result<TimelineCheckResult> {
trace!(
debug!(
"checking ttid {}, should contain WAL [{}-{}]",
ttid,
timeline_start_lsn,
backup_lsn
ttid, timeline_start_lsn, backup_lsn
);
// calculate expected segfiles
let expected_first_segno = timeline_start_lsn.segment_number(WAL_SEGSIZE);
@@ -177,7 +163,7 @@ async fn check_timeline(
.map(|segno| XLogFileName(PG_TLI, segno, WAL_SEGSIZE)),
);
let expected_files_num = expected_segfiles.len();
trace!("expecting {} files", expected_segfiles.len(),);
debug!("expecting {} files", expected_segfiles.len(),);
// now list s3 and check if it misses something
let ttshid =
@@ -252,3 +238,65 @@ fn load_certs() -> Result<Arc<rustls::RootCertStore>, std::io::Error> {
Ok(Arc::new(store))
}
static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();
async fn load_timelines_from_db(
tenant_ids: Vec<TenantId>,
dump_db_connstr: String,
dump_db_table: String,
) -> anyhow::Result<Vec<TimelineLsnData>> {
info!("loading from table {dump_db_table}");
// Use rustls (Neon requires TLS)
let root_store = TLS_ROOTS.get_or_try_init(load_certs)?.clone();
let client_config = rustls::ClientConfig::builder()
.with_root_certificates(root_store)
.with_no_client_auth();
let tls_connector = tokio_postgres_rustls::MakeRustlsConnect::new(client_config);
let (client, connection) = tokio_postgres::connect(&dump_db_connstr, tls_connector).await?;
// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
let tenant_filter_clause = if !tenant_ids.is_empty() {
format!(
"and tenant_id in ({})",
tenant_ids
.iter()
.map(|t| format!("'{}'", t))
.collect::<Vec<_>>()
.join(", ")
)
} else {
"".to_owned()
};
let query = format!(
"select tenant_id, timeline_id, min(timeline_start_lsn), max(backup_lsn) \
from \"{dump_db_table}\" \
where not is_cancelled {tenant_filter_clause} \
group by tenant_id, timeline_id;"
);
info!("query is {}", query);
let timelines = client.query(&query, &[]).await?;
let timelines = timelines
.into_iter()
.map(|row| {
let tenant_id = row.get(0);
let timeline_id = row.get(1);
let timeline_start_lsn_pg: PgLsn = row.get(2);
let backup_lsn_pg: PgLsn = row.get(3);
TimelineLsnData {
tenant_id,
timeline_id,
timeline_start_lsn: Lsn(u64::from(timeline_start_lsn_pg)),
backup_lsn: Lsn(u64::from(backup_lsn_pg)),
}
})
.collect::<Vec<TimelineLsnData>>();
Ok(timelines)
}

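For orientation, `check_timeline` above derives the set of WAL segment file names it expects to find in S3 from `timeline_start_lsn` and `backup_lsn`. A minimal Python sketch of that calculation, assuming the default 16 MiB segment size and an exclusive upper bound (not the scrubber's actual code):

WAL_SEGSIZE = 16 * 1024 * 1024            # default PostgreSQL WAL segment size
PG_TLI = 1                                # timeline ID used in WAL file names
SEGMENTS_PER_XLOGID = 0x100000000 // WAL_SEGSIZE

def xlog_file_name(tli: int, segno: int) -> str:
    # Mirrors PostgreSQL's XLogFileName(): TLI, "xlog id", and segment within
    # that xlog id, each rendered as 8 uppercase hex digits.
    return f"{tli:08X}{segno // SEGMENTS_PER_XLOGID:08X}{segno % SEGMENTS_PER_XLOGID:08X}"

def expected_segfiles(timeline_start_lsn: int, backup_lsn: int) -> list[str]:
    first_segno = timeline_start_lsn // WAL_SEGSIZE
    last_segno = backup_lsn // WAL_SEGSIZE    # treated as exclusive here (an assumption)
    return [xlog_file_name(PG_TLI, segno) for segno in range(first_segno, last_segno)]

# WAL from 16 MiB to 48 MiB should be covered by segments 1 and 2.
print(expected_segfiles(0x1000000, 0x3000000))
# ['000000010000000000000001', '000000010000000000000002']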
test_runner/fixtures/neon_fixtures.py

@@ -4625,12 +4625,20 @@ class Safekeeper(LogUtils):
wait_until(20, 0.5, paused)
# TODO: Replace with `StrEnum` when we upgrade to python 3.11
class NodeKind(str, Enum):
PAGESERVER = "pageserver"
SAFEKEEPER = "safekeeper"
class StorageScrubber:
def __init__(self, env: NeonEnv, log_dir: Path):
self.env = env
self.log_dir = log_dir
def scrubber_cli(self, args: list[str], timeout) -> str:
def scrubber_cli(
self, args: list[str], timeout, extra_env: Optional[Dict[str, str]] = None
) -> str:
assert isinstance(self.env.pageserver_remote_storage, S3Storage)
s3_storage = self.env.pageserver_remote_storage
@@ -4645,6 +4653,9 @@ class StorageScrubber:
if s3_storage.endpoint is not None:
env.update({"AWS_ENDPOINT_URL": s3_storage.endpoint})
if extra_env is not None:
env.update(extra_env)
base_args = [
str(self.env.neon_binpath / "storage_scrubber"),
f"--controller-api={self.env.storage_controller.api_root()}",
@@ -4672,18 +4683,43 @@ class StorageScrubber:
assert stdout is not None
return stdout
def scan_metadata(self, post_to_storage_controller: bool = False) -> Tuple[bool, Any]:
def scan_metadata_safekeeper(
self,
timeline_lsns: List[Dict[str, Any]],
cloud_admin_api_url: str,
cloud_admin_api_token: str,
) -> Tuple[bool, Any]:
extra_env = {
"CLOUD_ADMIN_API_URL": cloud_admin_api_url,
"CLOUD_ADMIN_API_TOKEN": cloud_admin_api_token,
}
return self.scan_metadata(
node_kind=NodeKind.SAFEKEEPER, timeline_lsns=timeline_lsns, extra_env=extra_env
)
def scan_metadata(
self,
post_to_storage_controller: bool = False,
node_kind: NodeKind = NodeKind.PAGESERVER,
timeline_lsns: Optional[List[Dict[str, Any]]] = None,
extra_env: Optional[Dict[str, str]] = None,
) -> Tuple[bool, Any]:
"""
Returns the health status and the metadata summary.
"""
args = ["scan-metadata", "--node-kind", "pageserver", "--json"]
args = ["scan-metadata", "--node-kind", node_kind.value, "--json"]
if post_to_storage_controller:
args.append("--post")
stdout = self.scrubber_cli(args, timeout=30)
if timeline_lsns is not None:
args.append("--timeline-lsns")
args.append(json.dumps(timeline_lsns))
stdout = self.scrubber_cli(args, timeout=30, extra_env=extra_env)
try:
summary = json.loads(stdout)
healthy = not summary["with_errors"] and not summary["with_warnings"]
# The summary does not contain "with_warnings" when node_kind is safekeeper
no_warnings = "with_warnings" not in summary or not summary["with_warnings"]
healthy = not summary["with_errors"] and no_warnings
return healthy, summary
except:
log.error("Failed to decode JSON output from `scan-metadata`. Dumping stdout:")

test_runner/regress/test_tenant_delete.py

@@ -1,7 +1,9 @@
import json
from threading import Thread
import pytest
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
@@ -17,6 +19,8 @@ from fixtures.pageserver.utils import (
from fixtures.remote_storage import RemoteStorageKind, s3_storage
from fixtures.utils import run_pg_bench_small, wait_until
from requests.exceptions import ReadTimeout
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
def error_tolerant_delete(ps_http, tenant_id):
@@ -322,7 +326,7 @@ def test_tenant_delete_races_timeline_creation(neon_env_builder: NeonEnvBuilder)
env.pageserver.stop()
def test_tenant_delete_scrubber(pg_bin: PgBin, neon_env_builder: NeonEnvBuilder):
def test_tenant_delete_scrubber(pg_bin: PgBin, make_httpserver, neon_env_builder: NeonEnvBuilder):
"""
Validate that creating and then deleting the tenant both survives the scrubber,
and that one can run the scrubber without problems.
@@ -347,6 +351,45 @@ def test_tenant_delete_scrubber(pg_bin: PgBin, neon_env_builder: NeonEnvBuilder)
healthy, _ = env.storage_scrubber.scan_metadata()
assert healthy
timeline_lsns = {
"tenant_id": f"{tenant_id}",
"timeline_id": f"{timeline_id}",
"timeline_start_lsn": f"{last_flush_lsn}",
"backup_lsn": f"{last_flush_lsn}",
}
cloud_admin_url = f"http://{make_httpserver.host}:{make_httpserver.port}/"
cloud_admin_token = ""
def get_branches(request: Request):
# Compare definition with `BranchData` struct
dummy_data = {
"id": "test-branch-id",
"created_at": "", # TODO
"updated_at": "", # TODO
"name": "testbranchname",
"project_id": "test-project-id",
"timeline_id": f"{timeline_id}",
"default": False,
"deleted": False,
"logical_size": 42000,
"physical_size": 42000,
"written_size": 42000,
}
# Serve a dummy branch payload so the scrubber's cloud admin API lookup for this timeline succeeds.
log.info(f"got get_branches request: {request.json}")
return Response(json.dumps(dummy_data), content_type="application/json", status=200)
make_httpserver.expect_request("/branches", method="GET").respond_with_handler(get_branches)
healthy, _ = env.storage_scrubber.scan_metadata_safekeeper(
timeline_lsns=[timeline_lsns],
cloud_admin_api_url=cloud_admin_url,
cloud_admin_api_token=cloud_admin_token,
)
assert healthy
env.start()
ps_http = env.pageserver.http_client()
ps_http.tenant_delete(tenant_id)
@@ -354,3 +397,10 @@ def test_tenant_delete_scrubber(pg_bin: PgBin, neon_env_builder: NeonEnvBuilder)
healthy, _ = env.storage_scrubber.scan_metadata()
assert healthy
healthy, _ = env.storage_scrubber.scan_metadata_safekeeper(
timeline_lsns=[timeline_lsns],
cloud_admin_api_url=cloud_admin_url,
cloud_admin_api_token=cloud_admin_token,
)
assert healthy