diff --git a/storage_scrubber/src/main.rs b/storage_scrubber/src/main.rs
index 3935e513e3..c5961753c5 100644
--- a/storage_scrubber/src/main.rs
+++ b/storage_scrubber/src/main.rs
@@ -1,4 +1,4 @@
-use anyhow::{anyhow, bail};
+use anyhow::{anyhow, bail, Context};
 use camino::Utf8PathBuf;
 use pageserver_api::controller_api::{MetadataHealthUpdateRequest, MetadataHealthUpdateResponse};
 use pageserver_api::shard::TenantShardId;
@@ -7,6 +7,7 @@ use storage_controller_client::control_api;
 use storage_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
 use storage_scrubber::pageserver_physical_gc::GcMode;
 use storage_scrubber::scan_pageserver_metadata::scan_pageserver_metadata;
+use storage_scrubber::scan_safekeeper_metadata::DatabaseOrList;
 use storage_scrubber::tenant_snapshot::SnapshotDownloader;
 use storage_scrubber::{find_large_objects, ControllerClientConfig};
 use storage_scrubber::{
@@ -76,6 +77,9 @@ enum Command {
         /// For safekeeper node_kind only, table in the db with debug dump
         #[arg(long, default_value = None)]
         dump_db_table: Option<String>,
+        /// For safekeeper node_kind only, json list of timelines and their lsn info
+        #[arg(long, default_value = None)]
+        timeline_lsns: Option<String>,
     },
     TenantSnapshot {
         #[arg(long = "tenant-id")]
@@ -155,20 +159,22 @@ async fn main() -> anyhow::Result<()> {
            post_to_storcon,
            dump_db_connstr,
            dump_db_table,
+           timeline_lsns,
        } => {
            if let NodeKind::Safekeeper = node_kind {
-                let dump_db_connstr =
-                    dump_db_connstr.ok_or(anyhow::anyhow!("dump_db_connstr not specified"))?;
-                let dump_db_table =
-                    dump_db_table.ok_or(anyhow::anyhow!("dump_db_table not specified"))?;
-
-                let summary = scan_safekeeper_metadata(
-                    bucket_config.clone(),
-                    tenant_ids.iter().map(|tshid| tshid.tenant_id).collect(),
-                    dump_db_connstr,
-                    dump_db_table,
-                )
-                .await?;
+                let db_or_list = match (timeline_lsns, dump_db_connstr) {
+                    (Some(timeline_lsns), _) => {
+                        let timeline_lsns = serde_json::from_str(&timeline_lsns).context("parsing timeline_lsns")?;
+                        DatabaseOrList::List(timeline_lsns)
+                    }
+                    (None, Some(dump_db_connstr)) => {
+                        let dump_db_table = dump_db_table.ok_or_else(|| anyhow::anyhow!("dump_db_table not specified"))?;
+                        let tenant_ids = tenant_ids.iter().map(|tshid| tshid.tenant_id).collect();
+                        DatabaseOrList::Database { tenant_ids, connstr: dump_db_connstr, table: dump_db_table }
+                    }
+                    (None, None) => anyhow::bail!("neither `timeline_lsns` specified, nor `dump_db_connstr` and `dump_db_table`"),
+                };
+                let summary = scan_safekeeper_metadata(bucket_config.clone(), db_or_list).await?;
                 if json {
                     println!("{}", serde_json::to_string(&summary).unwrap())
                 } else {
diff --git a/storage_scrubber/src/scan_safekeeper_metadata.rs b/storage_scrubber/src/scan_safekeeper_metadata.rs
index 1a9f3d0ef5..15f3665fac 100644
--- a/storage_scrubber/src/scan_safekeeper_metadata.rs
+++ b/storage_scrubber/src/scan_safekeeper_metadata.rs
@@ -7,7 +7,7 @@ use postgres_ffi::{XLogFileName, PG_TLI};
 use remote_storage::GenericRemoteStorage;
 use serde::Serialize;
 use tokio_postgres::types::PgLsn;
-use tracing::{error, info, trace};
+use tracing::{debug, error, info};
 use utils::{
     id::{TenantId, TenantTimelineId, TimelineId},
     lsn::Lsn,
@@ -54,6 +54,23 @@ impl MetadataSummary {
     }
 }
 
+#[derive(serde::Deserialize)]
+pub struct TimelineLsnData {
+    tenant_id: String,
+    timeline_id: String,
+    timeline_start_lsn: Lsn,
+    backup_lsn: Lsn,
+}
+
+pub enum DatabaseOrList {
+    Database {
+        tenant_ids: Vec<TenantId>,
+        connstr: String,
+        table: String,
+    },
+    List(Vec<TimelineLsnData>),
+}
+
 /// Scan the safekeeper metadata in an S3 bucket, reporting errors and
 /// statistics.
 ///
@@ -63,68 +80,39 @@ impl MetadataSummary {
 /// the project wasn't deleted in the meanwhile.
 pub async fn scan_safekeeper_metadata(
     bucket_config: BucketConfig,
-    tenant_ids: Vec<TenantId>,
-    dump_db_connstr: String,
-    dump_db_table: String,
+    db_or_list: DatabaseOrList,
 ) -> anyhow::Result<MetadataSummary> {
     info!(
-        "checking bucket {}, region {}, dump_db_table {}",
-        bucket_config.bucket, bucket_config.region, dump_db_table
+        "checking bucket {}, region {}",
+        bucket_config.bucket, bucket_config.region
     );
-    // Use rustls (Neon requires TLS)
-    let root_store = TLS_ROOTS.get_or_try_init(load_certs)?.clone();
-    let client_config = rustls::ClientConfig::builder()
-        .with_root_certificates(root_store)
-        .with_no_client_auth();
-    let tls_connector = tokio_postgres_rustls::MakeRustlsConnect::new(client_config);
-    let (client, connection) = tokio_postgres::connect(&dump_db_connstr, tls_connector).await?;
-    // The connection object performs the actual communication with the database,
-    // so spawn it off to run on its own.
-    tokio::spawn(async move {
-        if let Err(e) = connection.await {
-            eprintln!("connection error: {}", e);
-        }
-    });
-
-    let tenant_filter_clause = if !tenant_ids.is_empty() {
-        format!(
-            "and tenant_id in ({})",
-            tenant_ids
-                .iter()
-                .map(|t| format!("'{}'", t))
-                .collect::<Vec<_>>()
-                .join(", ")
-        )
-    } else {
-        "".to_owned()
-    };
-    let query = format!(
-        "select tenant_id, timeline_id, min(timeline_start_lsn), max(backup_lsn) from \"{}\" where not is_cancelled {} group by tenant_id, timeline_id;",
-        dump_db_table, tenant_filter_clause,
-    );
-    info!("query is {}", query);
-    let timelines = client.query(&query, &[]).await?;
-    info!("loaded {} timelines", timelines.len());
 
     let (remote_client, target) = init_remote(bucket_config, NodeKind::Safekeeper).await?;
     let console_config = ConsoleConfig::from_env()?;
     let cloud_admin_api_client = CloudAdminApiClient::new(console_config);
 
-    let checks = futures::stream::iter(timelines.iter().map(Ok)).map_ok(|row| {
-        let tenant_id = TenantId::from_str(row.get(0)).expect("failed to parse tenant_id");
-        let timeline_id = TimelineId::from_str(row.get(1)).expect("failed to parse tenant_id");
-        let timeline_start_lsn_pg: PgLsn = row.get(2);
-        let timeline_start_lsn: Lsn = Lsn(u64::from(timeline_start_lsn_pg));
-        let backup_lsn_pg: PgLsn = row.get(3);
-        let backup_lsn: Lsn = Lsn(u64::from(backup_lsn_pg));
+    let timelines = match db_or_list {
+        DatabaseOrList::Database {
+            tenant_ids,
+            connstr,
+            table,
+        } => load_timelines_from_db(tenant_ids, connstr, table).await?,
+        DatabaseOrList::List(list) => list,
+    };
+    info!("loaded {} timelines", timelines.len());
+
+    let checks = futures::stream::iter(timelines.into_iter().map(Ok)).map_ok(|timeline| {
+        let tenant_id = TenantId::from_str(&timeline.tenant_id).expect("failed to parse tenant_id");
+        let timeline_id =
+            TimelineId::from_str(&timeline.timeline_id).expect("failed to parse timeline_id");
         let ttid = TenantTimelineId::new(tenant_id, timeline_id);
         check_timeline(
             &remote_client,
             &target,
             &cloud_admin_api_client,
             ttid,
-            timeline_start_lsn,
-            backup_lsn,
+            timeline.timeline_start_lsn,
+            timeline.backup_lsn,
         )
     });
 
     // Run multiple check_timeline's concurrently.
@@ -163,11 +151,9 @@ async fn check_timeline(
     timeline_start_lsn: Lsn,
     backup_lsn: Lsn,
 ) -> anyhow::Result {
-    trace!(
+    debug!(
         "checking ttid {}, should contain WAL [{}-{}]",
-        ttid,
-        timeline_start_lsn,
-        backup_lsn
+        ttid, timeline_start_lsn, backup_lsn
     );
     // calculate expected segfiles
     let expected_first_segno = timeline_start_lsn.segment_number(WAL_SEGSIZE);
@@ -177,7 +163,7 @@ async fn check_timeline(
             .map(|segno| XLogFileName(PG_TLI, segno, WAL_SEGSIZE)),
     );
     let expected_files_num = expected_segfiles.len();
-    trace!("expecting {} files", expected_segfiles.len(),);
+    debug!("expecting {} files", expected_segfiles.len(),);
 
     // now list s3 and check if it misses something
     let ttshid =
@@ -252,3 +238,65 @@ fn load_certs() -> Result<Arc<rustls::RootCertStore>, std::io::Error> {
     Ok(Arc::new(store))
 }
 static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();
+
+async fn load_timelines_from_db(
+    tenant_ids: Vec<TenantId>,
+    dump_db_connstr: String,
+    dump_db_table: String,
+) -> anyhow::Result<Vec<TimelineLsnData>> {
+    info!("loading from table {dump_db_table}");
+
+    // Use rustls (Neon requires TLS)
+    let root_store = TLS_ROOTS.get_or_try_init(load_certs)?.clone();
+    let client_config = rustls::ClientConfig::builder()
+        .with_root_certificates(root_store)
+        .with_no_client_auth();
+    let tls_connector = tokio_postgres_rustls::MakeRustlsConnect::new(client_config);
+    let (client, connection) = tokio_postgres::connect(&dump_db_connstr, tls_connector).await?;
+    // The connection object performs the actual communication with the database,
+    // so spawn it off to run on its own.
+    tokio::spawn(async move {
+        if let Err(e) = connection.await {
+            eprintln!("connection error: {}", e);
+        }
+    });
+
+    let tenant_filter_clause = if !tenant_ids.is_empty() {
+        format!(
+            "and tenant_id in ({})",
+            tenant_ids
+                .iter()
+                .map(|t| format!("'{}'", t))
+                .collect::<Vec<_>>()
+                .join(", ")
+        )
+    } else {
+        "".to_owned()
+    };
+    let query = format!(
+        "select tenant_id, timeline_id, min(timeline_start_lsn), max(backup_lsn) \
+        from \"{dump_db_table}\" \
+        where not is_cancelled {tenant_filter_clause} \
+        group by tenant_id, timeline_id;"
+    );
+    info!("query is {}", query);
+    let timelines = client.query(&query, &[]).await?;
+
+    let timelines = timelines
+        .into_iter()
+        .map(|row| {
+            let tenant_id = row.get(0);
+            let timeline_id = row.get(1);
+            let timeline_start_lsn_pg: PgLsn = row.get(2);
+            let backup_lsn_pg: PgLsn = row.get(3);
+
+            TimelineLsnData {
+                tenant_id,
+                timeline_id,
+                timeline_start_lsn: Lsn(u64::from(timeline_start_lsn_pg)),
+                backup_lsn: Lsn(u64::from(backup_lsn_pg)),
+            }
+        })
+        .collect::<Vec<TimelineLsnData>>();
+    Ok(timelines)
+}
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 69a4234617..800ae03d13 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -4625,12 +4625,20 @@ class Safekeeper(LogUtils):
         wait_until(20, 0.5, paused)
 
 
+# TODO: Replace with `StrEnum` when we upgrade to python 3.11
+class NodeKind(str, Enum):
+    PAGESERVER = "pageserver"
+    SAFEKEEPER = "safekeeper"
+
+
 class StorageScrubber:
     def __init__(self, env: NeonEnv, log_dir: Path):
         self.env = env
         self.log_dir = log_dir
 
-    def scrubber_cli(self, args: list[str], timeout) -> str:
+    def scrubber_cli(
+        self, args: list[str], timeout, extra_env: Optional[Dict[str, str]] = None
+    ) -> str:
         assert isinstance(self.env.pageserver_remote_storage, S3Storage)
         s3_storage = self.env.pageserver_remote_storage
 
@@ -4645,6 +4653,9 @@ class StorageScrubber:
         if s3_storage.endpoint is not None:
             env.update({"AWS_ENDPOINT_URL": s3_storage.endpoint})
 
+        if extra_env is not None:
+            env.update(extra_env)
+
         base_args = [
             str(self.env.neon_binpath / "storage_scrubber"),
             f"--controller-api={self.env.storage_controller.api_root()}",
@@ -4672,18 +4683,43 @@ class StorageScrubber:
         assert stdout is not None
         return stdout
 
-    def scan_metadata(self, post_to_storage_controller: bool = False) -> Tuple[bool, Any]:
+    def scan_metadata_safekeeper(
+        self,
+        timeline_lsns: List[Dict[str, Any]],
+        cloud_admin_api_url: str,
+        cloud_admin_api_token: str,
+    ) -> Tuple[bool, Any]:
+        extra_env = {
+            "CLOUD_ADMIN_API_URL": cloud_admin_api_url,
+            "CLOUD_ADMIN_API_TOKEN": cloud_admin_api_token,
+        }
+        return self.scan_metadata(
+            node_kind=NodeKind.SAFEKEEPER, timeline_lsns=timeline_lsns, extra_env=extra_env
+        )
+
+    def scan_metadata(
+        self,
+        post_to_storage_controller: bool = False,
+        node_kind: NodeKind = NodeKind.PAGESERVER,
+        timeline_lsns: Optional[List[Dict[str, Any]]] = None,
+        extra_env: Optional[Dict[str, str]] = None,
+    ) -> Tuple[bool, Any]:
         """
         Returns the health status and the metadata summary.
         """
-        args = ["scan-metadata", "--node-kind", "pageserver", "--json"]
+        args = ["scan-metadata", "--node-kind", node_kind.value, "--json"]
         if post_to_storage_controller:
             args.append("--post")
 
-        stdout = self.scrubber_cli(args, timeout=30)
+        if timeline_lsns is not None:
+            args.append("--timeline-lsns")
+            args.append(json.dumps(timeline_lsns))
+        stdout = self.scrubber_cli(args, timeout=30, extra_env=extra_env)
         try:
             summary = json.loads(stdout)
-            healthy = not summary["with_errors"] and not summary["with_warnings"]
+            # summary does not contain "with_warnings" if node_kind is the safekeeper
+            no_warnings = "with_warnings" not in summary or not summary["with_warnings"]
+            healthy = not summary["with_errors"] and no_warnings
             return healthy, summary
         except:
             log.error("Failed to decode JSON output from `scan-metadata`. Dumping stdout:")
diff --git a/test_runner/regress/test_tenant_delete.py b/test_runner/regress/test_tenant_delete.py
index 448a28dc31..7ee949e8d3 100644
--- a/test_runner/regress/test_tenant_delete.py
+++ b/test_runner/regress/test_tenant_delete.py
@@ -1,7 +1,9 @@
+import json
 from threading import Thread
 
 import pytest
 from fixtures.common_types import Lsn, TenantId, TimelineId
+from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
     NeonEnvBuilder,
     PgBin,
@@ -17,6 +19,8 @@ from fixtures.pageserver.utils import (
 from fixtures.remote_storage import RemoteStorageKind, s3_storage
 from fixtures.utils import run_pg_bench_small, wait_until
 from requests.exceptions import ReadTimeout
+from werkzeug.wrappers.request import Request
+from werkzeug.wrappers.response import Response
 
 
 def error_tolerant_delete(ps_http, tenant_id):
@@ -322,7 +326,7 @@ def test_tenant_delete_races_timeline_creation(neon_env_builder: NeonEnvBuilder)
     env.pageserver.stop()
 
 
-def test_tenant_delete_scrubber(pg_bin: PgBin, neon_env_builder: NeonEnvBuilder):
+def test_tenant_delete_scrubber(pg_bin: PgBin, make_httpserver, neon_env_builder: NeonEnvBuilder):
     """
     Validate that creating and then deleting the tenant both survives the scrubber,
     and that one can run the scrubber without problems.
@@ -347,6 +351,45 @@ def test_tenant_delete_scrubber(pg_bin: PgBin, neon_env_builder: NeonEnvBuilder)
     healthy, _ = env.storage_scrubber.scan_metadata()
     assert healthy
 
+    timeline_lsns = {
+        "tenant_id": f"{tenant_id}",
+        "timeline_id": f"{timeline_id}",
+        "timeline_start_lsn": f"{last_flush_lsn}",
+        "backup_lsn": f"{last_flush_lsn}",
+    }
+
+    cloud_admin_url = f"http://{make_httpserver.host}:{make_httpserver.port}/"
+    cloud_admin_token = ""
+
+    def get_branches(request: Request):
+        # Compare definition with `BranchData` struct
+        dummy_data = {
+            "id": "test-branch-id",
+            "created_at": "",  # TODO
+            "updated_at": "",  # TODO
+            "name": "testbranchname",
+            "project_id": "test-project-id",
+            "timeline_id": f"{timeline_id}",
+            "default": False,
+            "deleted": False,
+            "logical_size": 42000,
+            "physical_size": 42000,
+            "written_size": 42000,
+        }
+        # The scrubber asks the cloud admin API about each timeline to confirm the
+        # project wasn't deleted; return dummy branch data so that check passes.
+        log.info(f"got get_branches request: {request.json}")
+        return Response(json.dumps(dummy_data), content_type="application/json", status=200)
+
+    make_httpserver.expect_request("/branches", method="GET").respond_with_handler(get_branches)
+
+    healthy, _ = env.storage_scrubber.scan_metadata_safekeeper(
+        timeline_lsns=[timeline_lsns],
+        cloud_admin_api_url=cloud_admin_url,
+        cloud_admin_api_token=cloud_admin_token,
+    )
+    assert healthy
+
     env.start()
     ps_http = env.pageserver.http_client()
     ps_http.tenant_delete(tenant_id)
@@ -354,3 +397,10 @@ def test_tenant_delete_scrubber(pg_bin: PgBin, neon_env_builder: NeonEnvBuilder)
 
     healthy, _ = env.storage_scrubber.scan_metadata()
     assert healthy
+
+    healthy, _ = env.storage_scrubber.scan_metadata_safekeeper(
+        timeline_lsns=[timeline_lsns],
+        cloud_admin_api_url=cloud_admin_url,
+        cloud_admin_api_token=cloud_admin_token,
+    )
+    assert healthy
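
Note: for reference, the `--timeline-lsns` payload is a JSON array whose entries mirror the `TimelineLsnData` struct introduced above. The sketch below is illustrative only: the ids and LSNs are made up, and the two LSN fields are simplified to `String` so the example stands alone outside the `utils` crate (the real struct deserializes them into `utils::lsn::Lsn`).

```rust
use serde::Deserialize;

// Mirrors the shape of `TimelineLsnData` from scan_safekeeper_metadata.rs for illustration.
#[derive(Deserialize, Debug)]
struct TimelineLsnDataExample {
    tenant_id: String,
    timeline_id: String,
    timeline_start_lsn: String, // "X/Y" hex format, parsed into `Lsn` in the real code
    backup_lsn: String,
}

fn main() -> Result<(), serde_json::Error> {
    // Hypothetical tenant/timeline ids and LSNs, in the format the test passes via json.dumps(...).
    let payload = r#"[
        {
            "tenant_id": "0123456789abcdef0123456789abcdef",
            "timeline_id": "fedcba9876543210fedcba9876543210",
            "timeline_start_lsn": "0/16B5A50",
            "backup_lsn": "0/16B5A50"
        }
    ]"#;
    let timelines: Vec<TimelineLsnDataExample> = serde_json::from_str(payload)?;
    for t in &timelines {
        println!("{}/{}: WAL [{} - {}]", t.tenant_id, t.timeline_id, t.timeline_start_lsn, t.backup_lsn);
    }
    Ok(())
}
```

With such a payload, the invocation assembled by `StorageScrubber.scan_metadata` is `scan-metadata --node-kind safekeeper --json --timeline-lsns '<payload>'`, with `CLOUD_ADMIN_API_URL` and `CLOUD_ADMIN_API_TOKEN` supplied through the environment, as `scan_metadata_safekeeper` does.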
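To make the expectation logged by `check_timeline` ("should contain WAL [start-backup]") concrete, here is a rough, self-contained sketch of the segment-range computation, assuming the default 16 MiB WAL segment size; the real code goes through `Lsn::segment_number` and postgres_ffi's `XLogFileName` to turn segment numbers into expected file names, so this only illustrates the arithmetic.

```rust
// Assumption: default Postgres WAL segment size of 16 MiB.
const WAL_SEGSIZE: u64 = 16 * 1024 * 1024;

/// Parse an "X/Y" LSN string (the format used in the JSON payload) into a u64.
fn parse_lsn(s: &str) -> Option<u64> {
    let (hi, lo) = s.split_once('/')?;
    Some((u64::from_str_radix(hi, 16).ok()? << 32) | u64::from_str_radix(lo, 16).ok()?)
}

fn main() {
    // Hypothetical LSNs for illustration.
    let timeline_start_lsn = parse_lsn("0/16B5A50").unwrap();
    let backup_lsn = parse_lsn("1/4000000").unwrap();

    // Segments from the one containing timeline_start_lsn up to the one containing
    // backup_lsn are expected to be present under the safekeeper's S3 prefix.
    let expected_first_segno = timeline_start_lsn / WAL_SEGSIZE;
    let expected_last_segno = backup_lsn / WAL_SEGSIZE;
    let expected_files_num = expected_last_segno.saturating_sub(expected_first_segno);
    println!(
        "expecting {expected_files_num} WAL segment files (segnos {expected_first_segno}..{expected_last_segno})"
    );
}
```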