From 4385e0c2913e965aa93d8e7994591db7ef8e5ad8 Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Tue, 13 Jun 2023 01:32:34 +0300 Subject: [PATCH 01/21] Return more RowDescription fields via proxy json endpoint As we aim to align client-side behavior with node-postgres, it's necessary for us to return these fields, because node-postgres does so as well. --- Cargo.lock | 10 +++++----- Cargo.toml | 12 ++++++------ proxy/src/http/sql_over_http.rs | 5 +++++ 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6856b9e3ac..71a6699c50 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2770,7 +2770,7 @@ dependencies = [ [[package]] name = "postgres" version = "0.19.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f" dependencies = [ "bytes", "fallible-iterator", @@ -2783,7 +2783,7 @@ dependencies = [ [[package]] name = "postgres-native-tls" version = "0.5.0" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f" dependencies = [ "native-tls", "tokio", @@ -2794,7 +2794,7 @@ dependencies = [ [[package]] name = "postgres-protocol" version = "0.6.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f" dependencies = [ "base64 0.20.0", "byteorder", @@ -2812,7 +2812,7 @@ dependencies = [ [[package]] name = "postgres-types" version = "0.2.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f" dependencies = [ "bytes", "fallible-iterator", @@ -4272,7 +4272,7 @@ dependencies = [ [[package]] name = "tokio-postgres" version = "0.7.7" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c#f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=1aaedab101b23f7612042850d8f2036810fa7c7f#1aaedab101b23f7612042850d8f2036810fa7c7f" dependencies = [ "async-trait", "byteorder", diff --git a/Cargo.toml b/Cargo.toml index dc34705f8d..551a9dc783 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -140,11 +140,11 @@ env_logger = "0.10" log = "0.4" ## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed -postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" } -postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" } -postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" } -postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", 
rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" } -tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" } +postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" } +postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" } +postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" } +postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" } +tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" } tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df61437de0feef49ba2ccdbdd94eb8ad6e142" } ## Other git libraries @@ -180,7 +180,7 @@ tonic-build = "0.9" # This is only needed for proxy's tests. # TODO: we should probably fork `tokio-postgres-rustls` instead. -tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="f6ec31df3bcce89cb34f300f17c8a8c031c5ee8c" } +tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="1aaedab101b23f7612042850d8f2036810fa7c7f" } # Changes the MAX_THREADS limit from 4096 to 32768. # This is a temporary workaround for using tracing from many threads in safekeepers code, diff --git a/proxy/src/http/sql_over_http.rs b/proxy/src/http/sql_over_http.rs index 050f00dd7d..1007532a96 100644 --- a/proxy/src/http/sql_over_http.rs +++ b/proxy/src/http/sql_over_http.rs @@ -280,6 +280,11 @@ pub async fn handle( json!({ "name": Value::String(c.name().to_owned()), "dataTypeID": Value::Number(c.type_().oid().into()), + "tableID": c.table_oid(), + "columnID": c.column_id(), + "dataTypeSize": c.type_size(), + "dataTypeModifier": c.type_modifier(), + "format": "text", }) }) .collect::>() From a0b3990411071539e715ba8564b9c873dfd50d2b Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Tue, 13 Jun 2023 22:33:42 +0100 Subject: [PATCH 02/21] Retry data ingestion scripts on connection errors (#4382) ## Problem From time to time, we're catching a race condition when trying to upload perf or regression test results. 
Ref: - https://neondb.slack.com/archives/C03H1K0PGKH/p1685462717870759 - https://github.com/neondatabase/cloud/issues/3686 ## Summary of changes Wrap `psycopg2.connect` method with `@backoff.on_exception` contextmanager --- scripts/ingest_perf_test_result.py | 15 ++++++++++++++- scripts/ingest_regress_test_result.py | 14 +++++++++++++- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/scripts/ingest_perf_test_result.py b/scripts/ingest_perf_test_result.py index 1bfc907def..fc177b590e 100644 --- a/scripts/ingest_perf_test_result.py +++ b/scripts/ingest_perf_test_result.py @@ -1,12 +1,14 @@ #!/usr/bin/env python3 import argparse import json +import logging import os import sys from contextlib import contextmanager from datetime import datetime from pathlib import Path +import backoff import psycopg2 import psycopg2.extras @@ -35,9 +37,18 @@ def get_connection_cursor(): connstr = os.getenv("DATABASE_URL") if not connstr: err("DATABASE_URL environment variable is not set") - with psycopg2.connect(connstr, connect_timeout=30) as conn: + + @backoff.on_exception(backoff.expo, psycopg2.OperationalError, max_time=150) + def connect(connstr): + return psycopg2.connect(connstr, connect_timeout=30) + + conn = connect(connstr) + try: with conn.cursor() as cur: yield cur + finally: + if conn is not None: + conn.close() def create_table(cur): @@ -115,6 +126,7 @@ def main(): parser.add_argument( "--ingest", type=Path, + required=True, help="Path to perf test result file, or directory with perf test result files", ) parser.add_argument("--initdb", action="store_true", help="Initialuze database") @@ -140,4 +152,5 @@ def main(): if __name__ == "__main__": + logging.getLogger("backoff").addHandler(logging.StreamHandler()) main() diff --git a/scripts/ingest_regress_test_result.py b/scripts/ingest_regress_test_result.py index 974167483a..dff8e0cefa 100644 --- a/scripts/ingest_regress_test_result.py +++ b/scripts/ingest_regress_test_result.py @@ -1,11 +1,13 @@ #!/usr/bin/env python3 import argparse +import logging import os import re import sys from contextlib import contextmanager from pathlib import Path +import backoff import psycopg2 CREATE_TABLE = """ @@ -29,9 +31,18 @@ def get_connection_cursor(): connstr = os.getenv("DATABASE_URL") if not connstr: err("DATABASE_URL environment variable is not set") - with psycopg2.connect(connstr, connect_timeout=30) as conn: + + @backoff.on_exception(backoff.expo, psycopg2.OperationalError, max_time=150) + def connect(connstr): + return psycopg2.connect(connstr, connect_timeout=30) + + conn = connect(connstr) + try: with conn.cursor() as cur: yield cur + finally: + if conn is not None: + conn.close() def create_table(cur): @@ -101,4 +112,5 @@ def main(): if __name__ == "__main__": + logging.getLogger("backoff").addHandler(logging.StreamHandler()) main() From 3164ad7052fc8680538cc7659c9770d807177567 Mon Sep 17 00:00:00 2001 From: bojanserafimov Date: Tue, 13 Jun 2023 21:48:09 -0400 Subject: [PATCH 03/21] compute_ctl: Spec parser forward compatibility test (#4494) --- libs/compute_api/src/spec.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index c2ad30f86f..b3f0e9ba43 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -148,4 +148,14 @@ mod tests { let file = File::open("tests/cluster_spec.json").unwrap(); let _spec: ComputeSpec = serde_json::from_reader(file).unwrap(); } + + #[test] + fn parse_unknown_fields() { + // Forward compatibility test + let file 
= File::open("tests/cluster_spec.json").unwrap(); + let mut json: serde_json::Value = serde_json::from_reader(file).unwrap(); + let ob = json.as_object_mut().unwrap(); + ob.insert("unknown_field_123123123".into(), "hello".into()); + let _spec: ComputeSpec = serde_json::from_value(json).unwrap(); + } } From ebee8247b54dbea641215506abc722208f8095a7 Mon Sep 17 00:00:00 2001 From: Shany Pozin Date: Wed, 14 Jun 2023 15:38:01 +0300 Subject: [PATCH 04/21] Move s3 delete_objects to use chunks of 1000 OIDs (#4463) See https://github.com/neondatabase/neon/pull/4461#pullrequestreview-1474240712 --- libs/remote_storage/src/s3_bucket.rs | 40 ++++++++++++++------ libs/remote_storage/src/simulate_failures.rs | 13 ++++++- 2 files changed, 41 insertions(+), 12 deletions(-) diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 38e1bf00f8..dafb6dcb45 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -34,6 +34,8 @@ use crate::{ Download, DownloadError, RemotePath, RemoteStorage, S3Config, REMOTE_STORAGE_PREFIX_SEPARATOR, }; +const MAX_DELETE_OBJECTS_REQUEST_SIZE: usize = 1000; + pub(super) mod metrics { use metrics::{register_int_counter_vec, IntCounterVec}; use once_cell::sync::Lazy; @@ -424,17 +426,33 @@ impl RemoteStorage for S3Bucket { delete_objects.push(obj_id); } - metrics::inc_delete_objects(paths.len() as u64); - self.client - .delete_objects() - .bucket(self.bucket_name.clone()) - .delete(Delete::builder().set_objects(Some(delete_objects)).build()) - .send() - .await - .map_err(|e| { - metrics::inc_delete_objects_fail(paths.len() as u64); - e - })?; + for chunk in delete_objects.chunks(MAX_DELETE_OBJECTS_REQUEST_SIZE) { + metrics::inc_delete_objects(chunk.len() as u64); + + let resp = self + .client + .delete_objects() + .bucket(self.bucket_name.clone()) + .delete(Delete::builder().set_objects(Some(chunk.to_vec())).build()) + .send() + .await; + + match resp { + Ok(resp) => { + if let Some(errors) = resp.errors { + metrics::inc_delete_objects_fail(errors.len() as u64); + return Err(anyhow::format_err!( + "Failed to delete {} objects", + errors.len() + )); + } + } + Err(e) => { + metrics::inc_delete_objects_fail(chunk.len() as u64); + return Err(e.into()); + } + } + } Ok(()) } diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index 2f341bb29d..741c18bf6f 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -24,6 +24,7 @@ enum RemoteOp { Upload(RemotePath), Download(RemotePath), Delete(RemotePath), + DeleteObjects(Vec), } impl UnreliableWrapper { @@ -121,8 +122,18 @@ impl RemoteStorage for UnreliableWrapper { } async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> { + self.attempt(RemoteOp::DeleteObjects(paths.to_vec()))?; + let mut error_counter = 0; for path in paths { - self.delete(path).await? + if (self.delete(path).await).is_err() { + error_counter += 1; + } + } + if error_counter > 0 { + return Err(anyhow::anyhow!( + "failed to delete {} objects", + error_counter + )); } Ok(()) } From 9484b96d7cd31e3d1f91f0feb3ecd2c5afff2ca3 Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Wed, 14 Jun 2023 15:07:30 +0100 Subject: [PATCH 05/21] GitHub Autocomment: do not fail the job (#4478) ## Problem If the script fails to generate a test summary, the step also fails the job/workflow (despite this could be a non-fatal problem). 
## Summary of changes - Separate JSON parsing and summarisation into separate functions - Wrap functions calling into try..catch block, add an error message to GitHub comment and do not fail the step - Make `scripts/comment-test-report.js` a CLI script that can be run locally (mock GitHub calls) to make it easier to debug issues locally --- scripts/comment-test-report.js | 191 ++++++++++++++++++++++++++------- 1 file changed, 150 insertions(+), 41 deletions(-) mode change 100644 => 100755 scripts/comment-test-report.js diff --git a/scripts/comment-test-report.js b/scripts/comment-test-report.js old mode 100644 new mode 100755 index a7fd5b0bef..432c78d1af --- a/scripts/comment-test-report.js +++ b/scripts/comment-test-report.js @@ -1,3 +1,5 @@ +#! /usr/bin/env node + // // The script parses Allure reports and posts a comment with a summary of the test results to the PR or to the latest commit in the branch. // @@ -19,7 +21,7 @@ // }) // -// Analog of Python's defaultdict. +// Equivalent of Python's defaultdict. // // const dm = new DefaultMap(() => new DefaultMap(() => [])) // dm["firstKey"]["secondKey"].push("value") @@ -32,34 +34,7 @@ class DefaultMap extends Map { } } -module.exports = async ({ github, context, fetch, report }) => { - // Marker to find the comment in the subsequent runs - const startMarker = `` - // If we run the script in the PR or in the branch (main/release/...) - const isPullRequest = !!context.payload.pull_request - // Latest commit in PR or in the branch - const commitSha = isPullRequest ? context.payload.pull_request.head.sha : context.sha - // Let users know that the comment is updated automatically - const autoupdateNotice = `
The comment gets automatically updated with the latest test results<br>${commitSha} at ${new Date().toISOString()} :recycle:
` - // GitHub bot id taken from (https://api.github.com/users/github-actions[bot]) - const githubActionsBotId = 41898282 - // Commend body itself - let commentBody = `${startMarker}\n` - - // Common parameters for GitHub API requests - const ownerRepoParams = { - owner: context.repo.owner, - repo: context.repo.repo, - } - - const {reportUrl, reportJsonUrl} = report - - if (!reportUrl || !reportJsonUrl) { - commentBody += `#### No tests were run or test report is not available\n` - commentBody += autoupdateNotice - return - } - +const parseReportJson = async ({ reportJsonUrl, fetch }) => { const suites = await (await fetch(reportJsonUrl)).json() // Allure distinguishes "failed" (with an assertion error) and "broken" (with any other error) tests. @@ -83,7 +58,7 @@ module.exports = async ({ github, context, fetch, report }) => { let buildType, pgVersion const match = test.name.match(/[\[-](?debug|release)-pg(?\d+)[-\]]/)?.groups if (match) { - ({buildType, pgVersion} = match) + ({ buildType, pgVersion } = match) } else { // It's ok, we embed BUILD_TYPE and Postgres Version into the test name only for regress suite and do not for other suites (like performance). console.info(`Cannot get BUILD_TYPE and Postgres Version from test name: "${test.name}", defaulting to "release" and "14"`) @@ -123,37 +98,68 @@ module.exports = async ({ github, context, fetch, report }) => { } } + return { + failedTests, + failedTestsCount, + passedTests, + passedTestsCount, + skippedTests, + skippedTestsCount, + flakyTests, + flakyTestsCount, + retriedTests, + pgVersions, + } +} + +const reportSummary = async (params) => { + const { + failedTests, + failedTestsCount, + passedTests, + passedTestsCount, + skippedTests, + skippedTestsCount, + flakyTests, + flakyTestsCount, + retriedTests, + pgVersions, + reportUrl, + } = params + + let summary = "" + const totalTestsCount = failedTestsCount + passedTestsCount + skippedTestsCount - commentBody += `### ${totalTestsCount} tests run: ${passedTestsCount} passed, ${failedTestsCount} failed, ${skippedTestsCount} skipped ([full report](${reportUrl}))\n___\n` + summary += `### ${totalTestsCount} tests run: ${passedTestsCount} passed, ${failedTestsCount} failed, ${skippedTestsCount} skipped ([full report](${reportUrl}))\n___\n` // Print test resuls from the newest to the oldest Postgres version for release and debug builds. for (const pgVersion of Array.from(pgVersions).sort().reverse()) { if (Object.keys(failedTests[pgVersion]).length > 0) { - commentBody += `#### Failures on Posgres ${pgVersion}\n\n` + summary += `#### Failures on Posgres ${pgVersion}\n\n` for (const [testName, tests] of Object.entries(failedTests[pgVersion])) { const links = [] for (const test of tests) { const allureLink = `${reportUrl}#suites/${test.parentUid}/${test.uid}` links.push(`[${test.buildType}](${allureLink})`) } - commentBody += `- \`${testName}\`: ${links.join(", ")}\n` + summary += `- \`${testName}\`: ${links.join(", ")}\n` } const testsToRerun = Object.values(failedTests[pgVersion]).map(x => x[0].name) const command = `DEFAULT_PG_VERSION=${pgVersion} scripts/pytest -k "${testsToRerun.join(" or ")}"` - commentBody += "```\n" - commentBody += `# Run failed on Postgres ${pgVersion} tests locally:\n` - commentBody += `${command}\n` - commentBody += "```\n" + summary += "```\n" + summary += `# Run failed on Postgres ${pgVersion} tests locally:\n` + summary += `${command}\n` + summary += "```\n" } } if (flakyTestsCount > 0) { - commentBody += `
\nFlaky tests (${flakyTestsCount})\n\n` + summary += `
\nFlaky tests (${flakyTestsCount})\n\n` for (const pgVersion of Array.from(pgVersions).sort().reverse()) { if (Object.keys(flakyTests[pgVersion]).length > 0) { - commentBody += `#### Postgres ${pgVersion}\n\n` + summary += `#### Postgres ${pgVersion}\n\n` for (const [testName, tests] of Object.entries(flakyTests[pgVersion])) { const links = [] for (const test of tests) { @@ -161,11 +167,57 @@ module.exports = async ({ github, context, fetch, report }) => { const status = test.status === "passed" ? ":white_check_mark:" : ":x:" links.push(`[${status} ${test.buildType}](${allureLink})`) } - commentBody += `- \`${testName}\`: ${links.join(", ")}\n` + summary += `- \`${testName}\`: ${links.join(", ")}\n` } } } - commentBody += "\n
</details>\n"
+    summary += "\n</details>
\n" + } + + return summary +} + +module.exports = async ({ github, context, fetch, report }) => { + // Marker to find the comment in the subsequent runs + const startMarker = `` + // If we run the script in the PR or in the branch (main/release/...) + const isPullRequest = !!context.payload.pull_request + // Latest commit in PR or in the branch + const commitSha = isPullRequest ? context.payload.pull_request.head.sha : context.sha + // Let users know that the comment is updated automatically + const autoupdateNotice = `
The comment gets automatically updated with the latest test results<br>${commitSha} at ${new Date().toISOString()} :recycle:
` + // GitHub bot id taken from (https://api.github.com/users/github-actions[bot]) + const githubActionsBotId = 41898282 + // Commend body itself + let commentBody = `${startMarker}\n` + + // Common parameters for GitHub API requests + const ownerRepoParams = { + owner: context.repo.owner, + repo: context.repo.repo, + } + + const {reportUrl, reportJsonUrl} = report + + if (!reportUrl || !reportJsonUrl) { + commentBody += `#### No tests were run or test report is not available\n` + commentBody += autoupdateNotice + return + } + + try { + const parsed = await parseReportJson({ reportJsonUrl, fetch }) + commentBody += await reportSummary({ ...parsed, reportUrl }) + } catch (error) { + commentBody += `### [full report](${reportUrl})\n___\n` + commentBody += `#### Failed to create a summary for the test run: \n` + commentBody += "```\n" + commentBody += `${error.stack}\n` + commentBody += "```\n" + commentBody += "\nTo reproduce and debug the error locally run:\n" + commentBody += "```\n" + commentBody += `scripts/comment-test-report.js ${reportJsonUrl}` + commentBody += "\n```\n" } commentBody += autoupdateNotice @@ -207,3 +259,60 @@ module.exports = async ({ github, context, fetch, report }) => { }) } } + +// Equivalent of Python's `if __name__ == "__main__":` +// https://nodejs.org/docs/latest/api/modules.html#accessing-the-main-module +if (require.main === module) { + // Poor man's argument parsing: we expect the third argument is a JSON URL (0: node binary, 1: this script, 2: JSON url) + if (process.argv.length !== 3) { + console.error(`Unexpected number of arguments\nUsage: node ${process.argv[1]} `) + process.exit(1) + } + const jsonUrl = process.argv[2] + + try { + new URL(jsonUrl) + } catch (error) { + console.error(`Invalid URL: ${jsonUrl}\nUsage: node ${process.argv[1]} `) + process.exit(1) + } + + const htmlUrl = jsonUrl.replace("/data/suites.json", "/index.html") + + const githubMock = { + rest: { + issues: { + createComment: console.log, + listComments: async () => ({ data: [] }), + updateComment: console.log + }, + repos: { + createCommitComment: console.log, + listCommentsForCommit: async () => ({ data: [] }), + updateCommitComment: console.log + } + } + } + + const contextMock = { + repo: { + owner: 'testOwner', + repo: 'testRepo' + }, + payload: { + number: 42, + pull_request: null, + }, + sha: '0000000000000000000000000000000000000000', + } + + module.exports({ + github: githubMock, + context: contextMock, + fetch: fetch, + report: { + reportUrl: htmlUrl, + reportJsonUrl: jsonUrl, + } + }) +} From ee9a5bae43da5cac32cc71326f6e482ed5eeb389 Mon Sep 17 00:00:00 2001 From: Dmitry Rodionov Date: Wed, 14 Jun 2023 19:07:42 +0300 Subject: [PATCH 06/21] Filter only active timelines for compaction (#4487) Previously we may've included Stopping/Broken timelines here, which leads to errors in logs -> causes tests to sporadically fail resolves #4467 --- pageserver/src/tenant.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 5603bcef84..3ed4621112 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -1462,7 +1462,13 @@ impl Tenant { let timelines = self.timelines.lock().unwrap(); let timelines_to_compact = timelines .iter() - .map(|(timeline_id, timeline)| (*timeline_id, timeline.clone())) + .filter_map(|(timeline_id, timeline)| { + if timeline.is_active() { + Some((*timeline_id, timeline.clone())) + } else { + None + } + }) .collect::>(); drop(timelines); timelines_to_compact From 
a7a0c3cd278c485a027620cfd373d6b9ca7e6c0c Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Wed, 14 Jun 2023 19:24:46 +0300 Subject: [PATCH 07/21] Invalidate proxy cache in http-over-sql (#4500) HTTP queries failed with errors `error connecting to server: failed to lookup address information: Name or service not known\n\nCaused by:\n failed to lookup address information: Name or service not known` The fix reused cache invalidation logic in proxy from usual postgres connections and added it to HTTP-over-SQL queries. Also removed a timeout for HTTP request, because it almost never worked on staging (50s+ time just to start the compute), and we can have the similar case in production. Should be ok, since we have a limits for the requests and responses. --- proxy/src/http/sql_over_http.rs | 136 +++++++++++++++++++++++--------- proxy/src/http/websocket.rs | 12 +-- proxy/src/proxy.rs | 39 ++++----- 3 files changed, 120 insertions(+), 67 deletions(-) diff --git a/proxy/src/http/sql_over_http.rs b/proxy/src/http/sql_over_http.rs index 1007532a96..e8ad2d04f3 100644 --- a/proxy/src/http/sql_over_http.rs +++ b/proxy/src/http/sql_over_http.rs @@ -1,5 +1,6 @@ use futures::pin_mut; use futures::StreamExt; +use futures::TryFutureExt; use hyper::body::HttpBody; use hyper::http::HeaderName; use hyper::http::HeaderValue; @@ -11,8 +12,13 @@ use serde_json::Value; use tokio_postgres::types::Kind; use tokio_postgres::types::Type; use tokio_postgres::Row; +use tracing::error; +use tracing::info; +use tracing::instrument; use url::Url; +use crate::proxy::invalidate_cache; +use crate::proxy::NUM_RETRIES_WAKE_COMPUTE; use crate::{auth, config::ProxyConfig, console}; #[derive(serde::Deserialize)] @@ -90,10 +96,17 @@ fn json_array_to_pg_array(value: &Value) -> Result, serde_json::E } } +struct ConnInfo { + username: String, + dbname: String, + hostname: String, + password: String, +} + fn get_conn_info( headers: &HeaderMap, sni_hostname: Option, -) -> Result<(String, String, String, String), anyhow::Error> { +) -> Result { let connection_string = headers .get("Neon-Connection-String") .ok_or(anyhow::anyhow!("missing connection string"))? 
@@ -146,12 +159,12 @@ fn get_conn_info( } } - Ok(( - username.to_owned(), - dbname.to_owned(), - hostname.to_owned(), - password.to_owned(), - )) + Ok(ConnInfo { + username: username.to_owned(), + dbname: dbname.to_owned(), + hostname: hostname.to_owned(), + password: password.to_owned(), + }) } // TODO: return different http error codes @@ -164,10 +177,10 @@ pub async fn handle( // Determine the destination and connection params // let headers = request.headers(); - let (username, dbname, hostname, password) = get_conn_info(headers, sni_hostname)?; + let conn_info = get_conn_info(headers, sni_hostname)?; let credential_params = StartupMessageParams::new([ - ("user", &username), - ("database", &dbname), + ("user", &conn_info.username), + ("database", &conn_info.dbname), ("application_name", APP_NAME), ]); @@ -186,21 +199,20 @@ pub async fn handle( let creds = config .auth_backend .as_ref() - .map(|_| auth::ClientCredentials::parse(&credential_params, Some(&hostname), common_names)) + .map(|_| { + auth::ClientCredentials::parse( + &credential_params, + Some(&conn_info.hostname), + common_names, + ) + }) .transpose()?; let extra = console::ConsoleReqExtra { session_id: uuid::Uuid::new_v4(), application_name: Some(APP_NAME), }; - let node = creds.wake_compute(&extra).await?.expect("msg"); - let conf = node.value.config; - let port = *conf.get_ports().first().expect("no port"); - let host = match conf.get_hosts().first().expect("no host") { - tokio_postgres::config::Host::Tcp(host) => host, - tokio_postgres::config::Host::Unix(_) => { - return Err(anyhow::anyhow!("unix socket is not supported")); - } - }; + + let mut node_info = creds.wake_compute(&extra).await?.expect("msg"); let request_content_length = match request.body().size_hint().upper() { Some(v) => v, @@ -220,28 +232,10 @@ pub async fn handle( let QueryData { query, params } = serde_json::from_slice(&body)?; let query_params = json_to_pg_text(params)?; - // - // Connenct to the destination - // - let (client, connection) = tokio_postgres::Config::new() - .host(host) - .port(port) - .user(&username) - .password(&password) - .dbname(&dbname) - .max_backend_message_size(MAX_RESPONSE_SIZE) - .connect(tokio_postgres::NoTls) - .await?; - - tokio::spawn(async move { - if let Err(e) = connection.await { - eprintln!("connection error: {}", e); - } - }); - // // Now execute the query and return the result // + let client = connect_to_compute(&mut node_info, &extra, &creds, &conn_info).await?; let row_stream = client.query_raw_txt(query, query_params).await?; // Manually drain the stream into a vector to leave row_stream hanging @@ -308,6 +302,70 @@ pub async fn handle( })) } +/// This function is a copy of `connect_to_compute` from `src/proxy.rs` with +/// the difference that it uses `tokio_postgres` for the connection. +#[instrument(skip_all)] +async fn connect_to_compute( + node_info: &mut console::CachedNodeInfo, + extra: &console::ConsoleReqExtra<'_>, + creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>, + conn_info: &ConnInfo, +) -> anyhow::Result { + let mut num_retries: usize = NUM_RETRIES_WAKE_COMPUTE; + + loop { + match connect_to_compute_once(node_info, conn_info).await { + Err(e) if num_retries > 0 => { + info!("compute node's state has changed; requesting a wake-up"); + match creds.wake_compute(extra).await? { + // Update `node_info` and try one more time. + Some(new) => { + *node_info = new; + } + // Link auth doesn't work that way, so we just exit. 
+ None => return Err(e), + } + } + other => return other, + } + + num_retries -= 1; + info!("retrying after wake-up ({num_retries} attempts left)"); + } +} + +async fn connect_to_compute_once( + node_info: &console::CachedNodeInfo, + conn_info: &ConnInfo, +) -> anyhow::Result { + let mut config = (*node_info.config).clone(); + + let (client, connection) = config + .user(&conn_info.username) + .password(&conn_info.password) + .dbname(&conn_info.dbname) + .max_backend_message_size(MAX_RESPONSE_SIZE) + .connect(tokio_postgres::NoTls) + .inspect_err(|e: &tokio_postgres::Error| { + error!( + "failed to connect to compute node hosts={:?} ports={:?}: {}", + node_info.config.get_hosts(), + node_info.config.get_ports(), + e + ); + invalidate_cache(node_info) + }) + .await?; + + tokio::spawn(async move { + if let Err(e) = connection.await { + error!("connection error: {}", e); + } + }); + + Ok(client) +} + // // Convert postgres row with text-encoded values to JSON object // diff --git a/proxy/src/http/websocket.rs b/proxy/src/http/websocket.rs index fbb602e3d2..9f467aceb7 100644 --- a/proxy/src/http/websocket.rs +++ b/proxy/src/http/websocket.rs @@ -26,7 +26,6 @@ use tls_listener::TlsListener; use tokio::{ io::{self, AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}, net::TcpListener, - select, }; use tokio_util::sync::CancellationToken; use tracing::{error, info, info_span, warn, Instrument}; @@ -193,14 +192,9 @@ async fn ws_handler( // TODO: that deserves a refactor as now this function also handles http json client besides websockets. // Right now I don't want to blow up sql-over-http patch with file renames and do that as a follow up instead. } else if request.uri().path() == "/sql" && request.method() == Method::POST { - let result = select! { - _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => { - Err(anyhow::anyhow!("Query timed out")) - } - response = sql_over_http::handle(config, request, sni_hostname) => { - response - } - }; + let result = sql_over_http::handle(config, request, sni_hostname) + .instrument(info_span!("sql-over-http")) + .await; let status_code = match result { Ok(_) => StatusCode::OK, Err(_) => StatusCode::BAD_REQUEST, diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index cf2dd000db..8efb7005c8 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -22,7 +22,7 @@ use tracing::{error, info, warn}; use utils::measured_stream::MeasuredStream; /// Number of times we should retry the `/proxy_wake_compute` http request. -const NUM_RETRIES_WAKE_COMPUTE: usize = 1; +pub const NUM_RETRIES_WAKE_COMPUTE: usize = 1; const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)"; const ERR_PROTO_VIOLATION: &str = "protocol violation"; @@ -283,34 +283,35 @@ async fn handshake( } } +/// If we couldn't connect, a cached connection info might be to blame +/// (e.g. the compute node's address might've changed at the wrong time). +/// Invalidate the cache entry (if any) to prevent subsequent errors. +#[tracing::instrument(name = "invalidate_cache", skip_all)] +pub fn invalidate_cache(node_info: &console::CachedNodeInfo) { + let is_cached = node_info.cached(); + if is_cached { + warn!("invalidating stalled compute node info cache entry"); + node_info.invalidate(); + } + + let label = match is_cached { + true => "compute_cached", + false => "compute_uncached", + }; + NUM_CONNECTION_FAILURES.with_label_values(&[label]).inc(); +} + /// Try to connect to the compute node once. 
#[tracing::instrument(name = "connect_once", skip_all)] async fn connect_to_compute_once( node_info: &console::CachedNodeInfo, ) -> Result { - // If we couldn't connect, a cached connection info might be to blame - // (e.g. the compute node's address might've changed at the wrong time). - // Invalidate the cache entry (if any) to prevent subsequent errors. - let invalidate_cache = |_: &compute::ConnectionError| { - let is_cached = node_info.cached(); - if is_cached { - warn!("invalidating stalled compute node info cache entry"); - node_info.invalidate(); - } - - let label = match is_cached { - true => "compute_cached", - false => "compute_uncached", - }; - NUM_CONNECTION_FAILURES.with_label_values(&[label]).inc(); - }; - let allow_self_signed_compute = node_info.allow_self_signed_compute; node_info .config .connect(allow_self_signed_compute) - .inspect_err(invalidate_cache) + .inspect_err(|_: &compute::ConnectionError| invalidate_cache(node_info)) .await } From cd3faa8c0ccef3a80ff04d5582393450d6693fd6 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 14 Jun 2023 19:04:22 +0200 Subject: [PATCH 08/21] test_basic_eviction: avoid some sources of flakiness (#4504) We've seen the download_layer() call return 304 in prod because of a spurious on-demand download caused by a GetPage request from compute. Avoid these and some other sources of on-demand downloads by shutting down compute, SKs, and by disabling background loops. CF https://neon-github-public-dev.s3.amazonaws.com/reports/pr-4498/5258914461/index.html#suites/2599693fa27db8427603ba822bcf2a20/357808fd552fede3 --- test_runner/regress/test_layer_eviction.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/test_runner/regress/test_layer_eviction.py b/test_runner/regress/test_layer_eviction.py index a96532c0d8..b22e545f20 100644 --- a/test_runner/regress/test_layer_eviction.py +++ b/test_runner/regress/test_layer_eviction.py @@ -24,7 +24,13 @@ def test_basic_eviction( test_name="test_download_remote_layers_api", ) - env = neon_env_builder.init_start() + env = neon_env_builder.init_start( + initial_tenant_conf={ + # disable gc and compaction background loops because they perform on-demand downloads + "gc_period": "0s", + "compaction_period": "0s", + } + ) client = env.pageserver.http_client() endpoint = env.endpoints.create_start("main") @@ -47,6 +53,11 @@ def test_basic_eviction( client.timeline_checkpoint(tenant_id, timeline_id) wait_for_upload(client, tenant_id, timeline_id, current_lsn) + # disable compute & sks to avoid on-demand downloads by walreceiver / getpage + endpoint.stop() + for sk in env.safekeepers: + sk.stop() + timeline_path = env.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id) initial_local_layers = sorted( list(filter(lambda path: path.name != "metadata", timeline_path.glob("*"))) From 94f315d490af8f3dc29f291b34b95f86678843ac Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Wed, 14 Jun 2023 19:03:09 +0100 Subject: [PATCH 09/21] Remove neon-image-depot job (#4506) ## Problem `neon-image-depot` is an experimental job we use to compare with the main `neon-image` job. But it's not stable and right now we don't have the capacity to properly fix and evaluate it. We can come back to this later. 
## Summary of changes Remove `neon-image-depot` job --- .github/workflows/build_and_test.yml | 45 ---------------------------- 1 file changed, 45 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 897e1a7aad..471dc68df9 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -623,51 +623,6 @@ jobs: - name: Cleanup ECR folder run: rm -rf ~/.ecr - - neon-image-depot: - # For testing this will run side-by-side for a few merges. - # This action is not really optimized yet, but gets the job done - runs-on: [ self-hosted, gen3, large ] - needs: [ tag ] - container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned - permissions: - contents: read - id-token: write - - steps: - - name: Checkout - uses: actions/checkout@v3 - with: - submodules: true - fetch-depth: 0 - - - name: Setup go - uses: actions/setup-go@v3 - with: - go-version: '1.19' - - - name: Set up Depot CLI - uses: depot/setup-action@v1 - - - name: Install Crane & ECR helper - run: go install github.com/awslabs/amazon-ecr-credential-helper/ecr-login/cli/docker-credential-ecr-login@69c85dc22db6511932bbf119e1a0cc5c90c69a7f # v0.6.0 - - - name: Configure ECR login - run: | - mkdir /github/home/.docker/ - echo "{\"credsStore\":\"ecr-login\"}" > /github/home/.docker/config.json - - - name: Build and push - uses: depot/build-push-action@v1 - with: - # if no depot.json file is at the root of your repo, you must specify the project id - project: nrdv0s4kcs - push: true - tags: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:depot-${{needs.tag.outputs.build-tag}} - build-args: | - GIT_VERSION=${{ github.sha }} - REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com - compute-tools-image: runs-on: [ self-hosted, gen3, large ] needs: [ tag ] From 2252c5c282e8463b0f1dc1d9c7484e50706392e9 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Wed, 14 Jun 2023 17:12:34 -0400 Subject: [PATCH 10/21] metrics: convert some metrics to pageserver-level (#4490) ## Problem Some metrics are better to be observed at page-server level. Otherwise, as we have a lot of tenants in production, we cannot do a sum b/c Prometheus has limit on how many time series we can aggregate. This also helps reduce metrics scraping size. ## Summary of changes Some integration tests are likely not to pass as it will check the existence of some metrics. Waiting for CI complete and fix them. Metrics downgraded: page cache hit (where we are likely to have a page-server level page cache in the future instead of per-tenant), and reconstruct time (this would better be tenant-level, as we have one pg replayer for each tenant, but now we make it page-server level as we do not need that fine-grained data). 
--------- Signed-off-by: Alex Chi --- pageserver/src/metrics.rs | 41 +++++++++-------------------- pageserver/src/tenant/timeline.rs | 14 +++++----- test_runner/fixtures/metrics.py | 10 +++---- test_runner/regress/test_tenants.py | 2 +- 4 files changed, 25 insertions(+), 42 deletions(-) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index cc444c479a..43d06db6d8 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1,4 +1,3 @@ -use metrics::core::{AtomicU64, GenericCounter}; use metrics::{ register_counter_vec, register_histogram, register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge, register_int_gauge_vec, register_uint_gauge_vec, @@ -95,21 +94,19 @@ static READ_NUM_FS_LAYERS: Lazy = Lazy::new(|| { }); // Metrics collected on operations on the storage repository. -static RECONSTRUCT_TIME: Lazy = Lazy::new(|| { - register_histogram_vec!( +pub static RECONSTRUCT_TIME: Lazy = Lazy::new(|| { + register_histogram!( "pageserver_getpage_reconstruct_seconds", - "Time spent in reconstruct_value", - &["tenant_id", "timeline_id"], + "Time spent in reconstruct_value (reconstruct a page from deltas)", CRITICAL_OP_BUCKETS.into(), ) .expect("failed to define a metric") }); -static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy = Lazy::new(|| { - register_int_counter_vec!( +pub static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy = Lazy::new(|| { + register_int_counter!( "pageserver_materialized_cache_hits_direct_total", "Number of cache hits from materialized page cache without redo", - &["tenant_id", "timeline_id"] ) .expect("failed to define a metric") }); @@ -124,11 +121,10 @@ static GET_RECONSTRUCT_DATA_TIME: Lazy = Lazy::new(|| { .expect("failed to define a metric") }); -static MATERIALIZED_PAGE_CACHE_HIT: Lazy = Lazy::new(|| { - register_int_counter_vec!( +pub static MATERIALIZED_PAGE_CACHE_HIT: Lazy = Lazy::new(|| { + register_int_counter!( "pageserver_materialized_cache_hits_total", "Number of cache hits from materialized page cache", - &["tenant_id", "timeline_id"] ) .expect("failed to define a metric") }); @@ -752,10 +748,7 @@ impl StorageTimeMetrics { pub struct TimelineMetrics { tenant_id: String, timeline_id: String, - pub reconstruct_time_histo: Histogram, pub get_reconstruct_data_time_histo: Histogram, - pub materialized_page_cache_hit_counter: GenericCounter, - pub materialized_page_cache_hit_upon_request_counter: GenericCounter, pub flush_time_histo: StorageTimeMetrics, pub compact_time_histo: StorageTimeMetrics, pub create_images_time_histo: StorageTimeMetrics, @@ -783,15 +776,9 @@ impl TimelineMetrics { ) -> Self { let tenant_id = tenant_id.to_string(); let timeline_id = timeline_id.to_string(); - let reconstruct_time_histo = RECONSTRUCT_TIME - .get_metric_with_label_values(&[&tenant_id, &timeline_id]) - .unwrap(); let get_reconstruct_data_time_histo = GET_RECONSTRUCT_DATA_TIME .get_metric_with_label_values(&[&tenant_id, &timeline_id]) .unwrap(); - let materialized_page_cache_hit_counter = MATERIALIZED_PAGE_CACHE_HIT - .get_metric_with_label_values(&[&tenant_id, &timeline_id]) - .unwrap(); let flush_time_histo = StorageTimeMetrics::new(StorageTimeOperation::LayerFlush, &tenant_id, &timeline_id); let compact_time_histo = @@ -833,19 +820,18 @@ impl TimelineMetrics { let read_num_fs_layers = READ_NUM_FS_LAYERS .get_metric_with_label_values(&[&tenant_id, &timeline_id]) .unwrap(); - let materialized_page_cache_hit_upon_request_counter = MATERIALIZED_PAGE_CACHE_HIT_DIRECT - .get_metric_with_label_values(&[&tenant_id, 
&timeline_id]) - .unwrap(); let evictions_with_low_residence_duration = evictions_with_low_residence_duration_builder.build(&tenant_id, &timeline_id); + // TODO(chi): remove this once we remove Lazy for all metrics. Otherwise this will not appear in the exporter + // and integration test will error. + MATERIALIZED_PAGE_CACHE_HIT_DIRECT.get(); + MATERIALIZED_PAGE_CACHE_HIT.get(); + TimelineMetrics { tenant_id, timeline_id, - reconstruct_time_histo, get_reconstruct_data_time_histo, - materialized_page_cache_hit_counter, - materialized_page_cache_hit_upon_request_counter, flush_time_histo, compact_time_histo, create_images_time_histo, @@ -872,10 +858,7 @@ impl Drop for TimelineMetrics { fn drop(&mut self) { let tenant_id = &self.tenant_id; let timeline_id = &self.timeline_id; - let _ = RECONSTRUCT_TIME.remove_label_values(&[tenant_id, timeline_id]); let _ = GET_RECONSTRUCT_DATA_TIME.remove_label_values(&[tenant_id, timeline_id]); - let _ = MATERIALIZED_PAGE_CACHE_HIT.remove_label_values(&[tenant_id, timeline_id]); - let _ = MATERIALIZED_PAGE_CACHE_HIT_DIRECT.remove_label_values(&[tenant_id, timeline_id]); let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, timeline_id]); let _ = WAIT_LSN_TIME.remove_label_values(&[tenant_id, timeline_id]); let _ = RESIDENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]); diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index d642090996..855896c832 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -47,7 +47,10 @@ use crate::tenant::{ use crate::config::PageServerConf; use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum}; -use crate::metrics::{TimelineMetrics, UNEXPECTED_ONDEMAND_DOWNLOADS}; +use crate::metrics::{ + TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT, + RECONSTRUCT_TIME, UNEXPECTED_ONDEMAND_DOWNLOADS, +}; use crate::pgdatadir_mapping::LsnForTimestamp; use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key}; use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError}; @@ -539,9 +542,7 @@ impl Timeline { match cached_lsn.cmp(&lsn) { Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check Ordering::Equal => { - self.metrics - .materialized_page_cache_hit_upon_request_counter - .inc(); + MATERIALIZED_PAGE_CACHE_HIT_DIRECT.inc(); return Ok(cached_img); // exact LSN match, return the image } Ordering::Greater => { @@ -563,8 +564,7 @@ impl Timeline { .await?; timer.stop_and_record(); - self.metrics - .reconstruct_time_histo + RECONSTRUCT_TIME .observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state)) } @@ -2387,7 +2387,7 @@ impl Timeline { ValueReconstructResult::Continue => { // If we reached an earlier cached page image, we're done. if cont_lsn == cached_lsn + 1 { - self.metrics.materialized_page_cache_hit_counter.inc_by(1); + MATERIALIZED_PAGE_CACHE_HIT.inc_by(1); return Ok(()); } if prev_lsn <= cont_lsn { diff --git a/test_runner/fixtures/metrics.py b/test_runner/fixtures/metrics.py index b4c237cfa6..d55d159037 100644 --- a/test_runner/fixtures/metrics.py +++ b/test_runner/fixtures/metrics.py @@ -57,14 +57,16 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] 
= ( "libmetrics_launch_timestamp", "libmetrics_build_info", "libmetrics_tracing_event_count_total", + "pageserver_materialized_cache_hits_total", + "pageserver_materialized_cache_hits_direct_total", + "pageserver_getpage_reconstruct_seconds_bucket", + "pageserver_getpage_reconstruct_seconds_count", + "pageserver_getpage_reconstruct_seconds_sum", ) PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = ( "pageserver_current_logical_size", "pageserver_resident_physical_size", - "pageserver_getpage_reconstruct_seconds_bucket", - "pageserver_getpage_reconstruct_seconds_count", - "pageserver_getpage_reconstruct_seconds_sum", "pageserver_getpage_get_reconstruct_data_seconds_bucket", "pageserver_getpage_get_reconstruct_data_seconds_count", "pageserver_getpage_get_reconstruct_data_seconds_sum", @@ -73,8 +75,6 @@ PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = ( "pageserver_io_operations_seconds_count", "pageserver_io_operations_seconds_sum", "pageserver_last_record_lsn", - "pageserver_materialized_cache_hits_total", - "pageserver_materialized_cache_hits_direct_total", "pageserver_read_num_fs_layers_bucket", "pageserver_read_num_fs_layers_count", "pageserver_read_num_fs_layers_sum", diff --git a/test_runner/regress/test_tenants.py b/test_runner/regress/test_tenants.py index aef2df4932..4a1d659be3 100644 --- a/test_runner/regress/test_tenants.py +++ b/test_runner/regress/test_tenants.py @@ -213,7 +213,7 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder): # Test (a subset of) pageserver global metrics for metric in PAGESERVER_GLOBAL_METRICS: ps_samples = ps_metrics.query_all(metric, {}) - assert len(ps_samples) > 0 + assert len(ps_samples) > 0, f"expected at least one sample for {metric}" for sample in ps_samples: labels = ",".join([f'{key}="{value}"' for key, value in sample.labels.items()]) log.info(f"{sample.name}{{{labels}}} {sample.value}") From e60b70b4759406283eebf4d6f16c458512b2b63f Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Thu, 15 Jun 2023 13:01:06 +0100 Subject: [PATCH 11/21] Fix data ingestion scripts (#4515) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Problem When I switched `psycopg2.connect` from context manager to a regular function call in https://github.com/neondatabase/neon/pull/4382 I embarrassingly forgot about commit, so it doesn't really put data into DB 😞 ## Summary of changes - Enable autocommit for data ingestion scripts --- scripts/ingest_perf_test_result.py | 4 +++- scripts/ingest_regress_test_result.py | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/scripts/ingest_perf_test_result.py b/scripts/ingest_perf_test_result.py index fc177b590e..35a1e29720 100644 --- a/scripts/ingest_perf_test_result.py +++ b/scripts/ingest_perf_test_result.py @@ -40,7 +40,9 @@ def get_connection_cursor(): @backoff.on_exception(backoff.expo, psycopg2.OperationalError, max_time=150) def connect(connstr): - return psycopg2.connect(connstr, connect_timeout=30) + conn = psycopg2.connect(connstr, connect_timeout=30) + conn.autocommit = True + return conn conn = connect(connstr) try: diff --git a/scripts/ingest_regress_test_result.py b/scripts/ingest_regress_test_result.py index dff8e0cefa..39c1c02941 100644 --- a/scripts/ingest_regress_test_result.py +++ b/scripts/ingest_regress_test_result.py @@ -34,7 +34,9 @@ def get_connection_cursor(): @backoff.on_exception(backoff.expo, psycopg2.OperationalError, max_time=150) def connect(connstr): - return psycopg2.connect(connstr, connect_timeout=30) + conn = 
psycopg2.connect(connstr, connect_timeout=30) + conn.autocommit = True + return conn conn = connect(connstr) try: From 76413a0fb8df249a3ea7ae82f2766c50ea6e980b Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Thu, 15 Jun 2023 15:26:59 +0300 Subject: [PATCH 12/21] Revert reconnect_timeout to improve performance (#4512) Default value for `wal_acceptor_reconnect_timeout` was changed in https://github.com/neondatabase/neon/pull/4428 and it affected performance up to 20% in some cases. Revert the value back. --- pgxn/neon/walproposer.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 64d980d2e4..8d82de6dc4 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -257,7 +257,7 @@ nwp_register_gucs(void) "Walproposer reconnects to offline safekeepers once in this interval.", NULL, &wal_acceptor_reconnect_timeout, - 5000, 0, INT_MAX, /* default, min, max */ + 1000, 0, INT_MAX, /* default, min, max */ PGC_SIGHUP, /* context */ GUC_UNIT_MS, /* flags */ NULL, NULL, NULL); From 472cc17b7aba4f78bc7a71a2c04d2e7cb8b696d8 Mon Sep 17 00:00:00 2001 From: Dmitry Rodionov Date: Thu, 15 Jun 2023 17:30:12 +0300 Subject: [PATCH 13/21] propagate lock guard to background deletion task (#4495) ## Problem 1. During the rollout we got a panic: "timeline that we were deleting was concurrently removed from 'timelines' map" that was caused by lock guard not being propagated to the background part of the deletion. Existing test didnt catch it because failpoint that was used for verification was placed earlier prior to background task spawning. 2. When looking at surrounding code one more bug was detected. We removed timeline from the map before deletion is finished, which breaks client retry logic, because it will indicate 404 before actual deletion is completed which can lead to client stopping its retry poll earlier. ## Summary of changes 1. Carry the lock guard over to background deletion. Ensure existing test case fails without applied patch (second deletion becomes stuck without it, which eventually leads to a test failure). 2. Move delete_all call earlier so timeline is removed from the map is the last thing done during deletion. Additionally I've added timeline_id to the `update_gc_info` span, because `debug_assert_current_span_has_tenant_and_timeline_id` in `download_remote_layer` was firing when `update_gc_info` lead to on-demand downloads via `find_lsn_for_timestamp` (caught by @problame). This is not directly related to the PR but fixes possible flakiness. Another smaller set of changes involves deletion wrapper used in python tests. Now there is a simpler wrapper that waits for deletions to complete `timeline_delete_wait_completed`. Most of the test_delete_timeline.py tests make negative tests, i.e., "does ps_http.timeline_delete() fail in this and that scenario". These can be left alone. Other places when we actually do the deletions, we need to use the helper that polls for completion. 
Discussion https://neondb.slack.com/archives/C03F5SM1N02/p1686668007396639 resolves #4496 --------- Co-authored-by: Christian Schwarz --- pageserver/src/tenant.rs | 60 +++++--- .../src/tenant/remote_timeline_client.rs | 28 ++-- pageserver/src/tenant/timeline.rs | 1 + test_runner/fixtures/neon_fixtures.py | 2 + test_runner/fixtures/pageserver/http.py | 5 + test_runner/fixtures/pageserver/utils.py | 35 +++-- test_runner/regress/test_compatibility.py | 8 +- test_runner/regress/test_import.py | 8 +- test_runner/regress/test_remote_storage.py | 7 +- test_runner/regress/test_tenant_size.py | 5 +- test_runner/regress/test_tenant_tasks.py | 8 +- test_runner/regress/test_tenants.py | 6 +- test_runner/regress/test_timeline_delete.py | 130 ++++++++---------- test_runner/regress/test_timeline_size.py | 3 +- test_runner/regress/test_wal_acceptor.py | 16 ++- 15 files changed, 184 insertions(+), 138 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 3ed4621112..7fdd047c96 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -473,6 +473,14 @@ pub(crate) enum ShutdownError { AlreadyStopping, } +struct DeletionGuard(OwnedMutexGuard); + +impl DeletionGuard { + fn is_deleted(&self) -> bool { + *self.0 + } +} + impl Tenant { /// Yet another helper for timeline initialization. /// Contains the common part of `load_local_timeline` and `load_remote_timeline`. @@ -1138,7 +1146,11 @@ impl Tenant { ) .context("create_timeline_struct")?; - let guard = Arc::clone(&timeline.delete_lock).lock_owned().await; + let guard = DeletionGuard( + Arc::clone(&timeline.delete_lock) + .try_lock_owned() + .expect("cannot happen because we're the only owner"), + ); // Note: here we even skip populating layer map. Timeline is essentially uninitialized. // RemoteTimelineClient is the only functioning part. @@ -1549,6 +1561,7 @@ impl Tenant { &self, timeline_id: TimelineId, timeline: Arc, + guard: DeletionGuard, ) -> anyhow::Result<()> { { // Grab the layer_removal_cs lock, and actually perform the deletion. @@ -1621,6 +1634,25 @@ impl Tenant { Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))? }); + if let Some(remote_client) = &timeline.remote_client { + remote_client.delete_all().await.context("delete_all")? + }; + + // Have a failpoint that can use the `pause` failpoint action. + // We don't want to block the executor thread, hence, spawn_blocking + await. + if cfg!(feature = "testing") { + tokio::task::spawn_blocking({ + let current = tracing::Span::current(); + move || { + let _entered = current.entered(); + tracing::info!("at failpoint in_progress_delete"); + fail::fail_point!("in_progress_delete"); + } + }) + .await + .expect("spawn_blocking"); + } + { // Remove the timeline from the map. let mut timelines = self.timelines.lock().unwrap(); @@ -1641,12 +1673,7 @@ impl Tenant { drop(timelines); } - let remote_client = match &timeline.remote_client { - Some(remote_client) => remote_client, - None => return Ok(()), - }; - - remote_client.delete_all().await?; + drop(guard); Ok(()) } @@ -1694,23 +1721,18 @@ impl Tenant { timeline = Arc::clone(timeline_entry.get()); // Prevent two tasks from trying to delete the timeline at the same time. - // - // XXX: We should perhaps return an HTTP "202 Accepted" to signal that the caller - // needs to poll until the operation has finished. But for now, we return an - // error, because the control plane knows to retry errors. 
- delete_lock_guard = - Arc::clone(&timeline.delete_lock) - .try_lock_owned() - .map_err(|_| { + DeletionGuard(Arc::clone(&timeline.delete_lock).try_lock_owned().map_err( + |_| { DeleteTimelineError::Other(anyhow::anyhow!( "timeline deletion is already in progress" )) - })?; + }, + )?); // If another task finished the deletion just before we acquired the lock, // return success. - if *delete_lock_guard { + if delete_lock_guard.is_deleted() { return Ok(()); } @@ -1784,7 +1806,7 @@ impl Tenant { self: Arc, timeline_id: TimelineId, timeline: Arc, - _guard: OwnedMutexGuard, + guard: DeletionGuard, ) { let tenant_id = self.tenant_id; let timeline_clone = Arc::clone(&timeline); @@ -1797,7 +1819,7 @@ impl Tenant { "timeline_delete", false, async move { - if let Err(err) = self.delete_timeline(timeline_id, timeline).await { + if let Err(err) = self.delete_timeline(timeline_id, timeline, guard).await { error!("Error: {err:#}"); timeline_clone.set_broken(err.to_string()) }; diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 2c84c59dcb..8db2bc4eb2 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -753,22 +753,18 @@ impl RemoteTimelineClient { // Have a failpoint that can use the `pause` failpoint action. // We don't want to block the executor thread, hence, spawn_blocking + await. - #[cfg(feature = "testing")] - tokio::task::spawn_blocking({ - let current = tracing::Span::current(); - move || { - let _entered = current.entered(); - tracing::info!( - "at failpoint persist_index_part_with_deleted_flag_after_set_before_upload_pause" - ); - fail::fail_point!( - "persist_index_part_with_deleted_flag_after_set_before_upload_pause" - ); - } - }) - .await - .expect("spawn_blocking"); - + if cfg!(feature = "testing") { + tokio::task::spawn_blocking({ + let current = tracing::Span::current(); + move || { + let _entered = current.entered(); + tracing::info!("at failpoint persist_deleted_index_part"); + fail::fail_point!("persist_deleted_index_part"); + } + }) + .await + .expect("spawn_blocking"); + } upload::upload_index_part( self.conf, &self.storage_impl, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 855896c832..d42fdf5e55 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3791,6 +3791,7 @@ impl Timeline { /// for example. The caller should hold `Tenant::gc_cs` lock to ensure /// that. 
/// + #[instrument(skip_all, fields(timline_id=%self.timeline_id))] pub(super) async fn update_gc_info( &self, retain_lsns: Vec, diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index a8610e24df..64c71d2a59 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1631,6 +1631,8 @@ class NeonPageserver(PgProtocol): r".*ERROR.*ancestor timeline \S+ is being stopped", # this is expected given our collaborative shutdown approach for the UploadQueue ".*Compaction failed, retrying in .*: queue is in state Stopped.*", + # Pageserver timeline deletion should be polled until it gets 404, so ignore it globally + ".*Error processing HTTP request: NotFound: Timeline .* was not found", ] def start( diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index f258a3a24d..5c4f5177d0 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -342,6 +342,11 @@ class PageserverHttpClient(requests.Session): return res_json def timeline_delete(self, tenant_id: TenantId, timeline_id: TimelineId, **kwargs): + """ + Note that deletion is not instant, it is scheduled and performed mostly in the background. + So if you need to wait for it to complete use `timeline_delete_wait_completed`. + For longer description consult with pageserver openapi spec. + """ res = self.delete( f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}", **kwargs ) diff --git a/test_runner/fixtures/pageserver/utils.py b/test_runner/fixtures/pageserver/utils.py index 83880abc77..ad89ebad00 100644 --- a/test_runner/fixtures/pageserver/utils.py +++ b/test_runner/fixtures/pageserver/utils.py @@ -193,19 +193,30 @@ def wait_for_upload_queue_empty( time.sleep(0.2) -def assert_timeline_detail_404( +def wait_timeline_detail_404( + pageserver_http: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId +): + last_exc = None + for _ in range(2): + time.sleep(0.250) + try: + data = pageserver_http.timeline_detail(tenant_id, timeline_id) + log.error(f"detail {data}") + except PageserverApiException as e: + log.debug(e) + if e.status_code == 404: + return + + last_exc = e + + raise last_exc or RuntimeError(f"Timeline wasnt deleted in time, state: {data['state']}") + + +def timeline_delete_wait_completed( pageserver_http: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId, + **delete_args, ): - """Asserts that timeline_detail returns 404, or dumps the detail.""" - try: - data = pageserver_http.timeline_detail(tenant_id, timeline_id) - log.error(f"detail {data}") - except PageserverApiException as e: - log.error(e) - if e.status_code == 404: - return - else: - raise - raise Exception("detail succeeded (it should return 404)") + pageserver_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id, **delete_args) + wait_timeline_detail_404(pageserver_http, tenant_id, timeline_id) diff --git a/test_runner/regress/test_compatibility.py b/test_runner/regress/test_compatibility.py index 2635dbd93c..61f86dc3ce 100644 --- a/test_runner/regress/test_compatibility.py +++ b/test_runner/regress/test_compatibility.py @@ -15,7 +15,11 @@ from fixtures.neon_fixtures import ( PortDistributor, ) from fixtures.pageserver.http import PageserverHttpClient -from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload +from fixtures.pageserver.utils import ( + timeline_delete_wait_completed, + wait_for_last_record_lsn, + wait_for_upload, 
+) from fixtures.pg_version import PgVersion from fixtures.types import Lsn from pytest import FixtureRequest @@ -417,7 +421,7 @@ def check_neon_works( ) shutil.rmtree(repo_dir / "local_fs_remote_storage") - pageserver_http.timeline_delete(tenant_id, timeline_id) + timeline_delete_wait_completed(pageserver_http, tenant_id, timeline_id) pageserver_http.timeline_create(pg_version, tenant_id, timeline_id) pg_bin.run( ["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump-from-wal.sql'}"] diff --git a/test_runner/regress/test_import.py b/test_runner/regress/test_import.py index 5c3948b027..141c69b230 100644 --- a/test_runner/regress/test_import.py +++ b/test_runner/regress/test_import.py @@ -14,7 +14,11 @@ from fixtures.neon_fixtures import ( NeonEnvBuilder, PgBin, ) -from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload +from fixtures.pageserver.utils import ( + timeline_delete_wait_completed, + wait_for_last_record_lsn, + wait_for_upload, +) from fixtures.types import Lsn, TenantId, TimelineId from fixtures.utils import subprocess_capture @@ -151,7 +155,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build ".*files not bound to index_file.json, proceeding with their deletion.*" ) - client.timeline_delete(tenant, timeline) + timeline_delete_wait_completed(client, tenant, timeline) # Importing correct backup works import_tar(base_tar, wal_tar) diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index 11ac9e2555..f2b954a822 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -20,7 +20,7 @@ from fixtures.neon_fixtures import ( ) from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient from fixtures.pageserver.utils import ( - assert_timeline_detail_404, + timeline_delete_wait_completed, wait_for_last_record_lsn, wait_for_upload, wait_until_tenant_active, @@ -597,14 +597,11 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue( env.pageserver.allowed_errors.append( ".* ERROR .*Error processing HTTP request: InternalServerError\\(timeline is Stopping" ) - client.timeline_delete(tenant_id, timeline_id) - env.pageserver.allowed_errors.append(f".*Timeline {tenant_id}/{timeline_id} was not found.*") env.pageserver.allowed_errors.append( ".*files not bound to index_file.json, proceeding with their deletion.*" ) - - wait_until(2, 0.5, lambda: assert_timeline_detail_404(client, tenant_id, timeline_id)) + timeline_delete_wait_completed(client, tenant_id, timeline_id) assert not timeline_path.exists() diff --git a/test_runner/regress/test_tenant_size.py b/test_runner/regress/test_tenant_size.py index e9dcd1e5cd..a0f9f854ed 100644 --- a/test_runner/regress/test_tenant_size.py +++ b/test_runner/regress/test_tenant_size.py @@ -11,6 +11,7 @@ from fixtures.neon_fixtures import ( wait_for_wal_insert_lsn, ) from fixtures.pageserver.http import PageserverHttpClient +from fixtures.pageserver.utils import timeline_delete_wait_completed from fixtures.pg_version import PgVersion, xfail_on_postgres from fixtures.types import Lsn, TenantId, TimelineId @@ -628,12 +629,12 @@ def test_get_tenant_size_with_multiple_branches( size_debug_file_before.write(size_debug) # teardown, delete branches, and the size should be going down - http_client.timeline_delete(tenant_id, first_branch_timeline_id) + timeline_delete_wait_completed(http_client, tenant_id, first_branch_timeline_id) size_after_deleting_first = 
http_client.tenant_size(tenant_id) assert size_after_deleting_first < size_after_thinning_branch - http_client.timeline_delete(tenant_id, second_branch_timeline_id) + timeline_delete_wait_completed(http_client, tenant_id, second_branch_timeline_id) size_after_deleting_second = http_client.tenant_size(tenant_id) assert size_after_deleting_second < size_after_deleting_first diff --git a/test_runner/regress/test_tenant_tasks.py b/test_runner/regress/test_tenant_tasks.py index 21e4af4127..75e5c2c91c 100644 --- a/test_runner/regress/test_tenant_tasks.py +++ b/test_runner/regress/test_tenant_tasks.py @@ -1,6 +1,10 @@ from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnvBuilder -from fixtures.pageserver.utils import assert_tenant_state, wait_until_tenant_active +from fixtures.pageserver.utils import ( + assert_tenant_state, + timeline_delete_wait_completed, + wait_until_tenant_active, +) from fixtures.types import TenantId, TimelineId from fixtures.utils import wait_until @@ -24,7 +28,7 @@ def test_tenant_tasks(neon_env_builder: NeonEnvBuilder): def delete_all_timelines(tenant: TenantId): timelines = [TimelineId(t["timeline_id"]) for t in client.timeline_list(tenant)] for t in timelines: - client.timeline_delete(tenant, t) + timeline_delete_wait_completed(client, tenant, t) # Create tenant, start compute tenant, _ = env.neon_cli.create_tenant() diff --git a/test_runner/regress/test_tenants.py b/test_runner/regress/test_tenants.py index 4a1d659be3..4dbfa8bc1f 100644 --- a/test_runner/regress/test_tenants.py +++ b/test_runner/regress/test_tenants.py @@ -21,6 +21,7 @@ from fixtures.neon_fixtures import ( RemoteStorageKind, available_remote_storages, ) +from fixtures.pageserver.utils import timeline_delete_wait_completed from fixtures.types import Lsn, TenantId, TimelineId from fixtures.utils import wait_until from prometheus_client.samples import Sample @@ -318,9 +319,10 @@ def test_pageserver_with_empty_tenants( client.tenant_create(tenant_with_empty_timelines) temp_timelines = client.timeline_list(tenant_with_empty_timelines) for temp_timeline in temp_timelines: - client.timeline_delete( - tenant_with_empty_timelines, TimelineId(temp_timeline["timeline_id"]) + timeline_delete_wait_completed( + client, tenant_with_empty_timelines, TimelineId(temp_timeline["timeline_id"]) ) + files_in_timelines_dir = sum( 1 for _p in Path.iterdir( diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py index 28b15d03ca..ddd9ffd755 100644 --- a/test_runner/regress/test_timeline_delete.py +++ b/test_runner/regress/test_timeline_delete.py @@ -17,9 +17,10 @@ from fixtures.neon_fixtures import ( ) from fixtures.pageserver.http import PageserverApiException from fixtures.pageserver.utils import ( - assert_timeline_detail_404, + timeline_delete_wait_completed, wait_for_last_record_lsn, wait_for_upload, + wait_timeline_detail_404, wait_until_tenant_active, wait_until_timeline_state, ) @@ -83,7 +84,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv): wait_until( number_of_iterations=3, interval=0.2, - func=lambda: ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id), + func=lambda: timeline_delete_wait_completed(ps_http, env.initial_tenant, leaf_timeline_id), ) assert not timeline_path.exists() @@ -94,16 +95,16 @@ def test_timeline_delete(neon_simple_env: NeonEnv): match=f"Timeline {env.initial_tenant}/{leaf_timeline_id} was not found", ) as exc: ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id) - - # FIXME leaves tenant without 
timelines, should we prevent deletion of root timeline? - wait_until( - number_of_iterations=3, - interval=0.2, - func=lambda: ps_http.timeline_delete(env.initial_tenant, parent_timeline_id), - ) - assert exc.value.status_code == 404 + wait_until( + number_of_iterations=3, + interval=0.2, + func=lambda: timeline_delete_wait_completed( + ps_http, env.initial_tenant, parent_timeline_id + ), + ) + # Check that we didn't pick up the timeline again after restart. # See https://github.com/neondatabase/neon/issues/3560 env.pageserver.stop(immediate=True) @@ -143,7 +144,6 @@ def test_delete_timeline_post_rm_failure( ps_http.configure_failpoints((failpoint_name, "return")) ps_http.timeline_delete(env.initial_tenant, env.initial_timeline) - timeline_info = wait_until_timeline_state( pageserver_http=ps_http, tenant_id=env.initial_tenant, @@ -165,13 +165,7 @@ def test_delete_timeline_post_rm_failure( # this should succeed # this also checks that delete can be retried even when timeline is in Broken state - ps_http.timeline_delete(env.initial_tenant, env.initial_timeline, timeout=2) - with pytest.raises(PageserverApiException) as e: - ps_http.timeline_detail(env.initial_tenant, env.initial_timeline) - - assert e.value.status_code == 404 - - env.pageserver.allowed_errors.append(f".*NotFound: Timeline.*{env.initial_timeline}.*") + timeline_delete_wait_completed(ps_http, env.initial_tenant, env.initial_timeline) env.pageserver.allowed_errors.append( f".*{env.initial_timeline}.*timeline directory not found, proceeding anyway.*" ) @@ -247,13 +241,7 @@ def test_timeline_resurrection_on_attach( pass # delete new timeline - ps_http.timeline_delete(tenant_id=tenant_id, timeline_id=branch_timeline_id) - - env.pageserver.allowed_errors.append( - f".*Timeline {tenant_id}/{branch_timeline_id} was not found.*" - ) - - wait_until(2, 0.5, lambda: assert_timeline_detail_404(ps_http, tenant_id, branch_timeline_id)) + timeline_delete_wait_completed(ps_http, tenant_id=tenant_id, timeline_id=branch_timeline_id) ##### Stop the pageserver instance, erase all its data env.endpoints.stop_all() @@ -338,7 +326,6 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild ) ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id) - timeline_info = wait_until_timeline_state( pageserver_http=ps_http, tenant_id=env.initial_tenant, @@ -357,12 +344,15 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild # Wait for tenant to finish loading. 
wait_until_tenant_active(ps_http, tenant_id=env.initial_tenant, iterations=10, period=1) - env.pageserver.allowed_errors.append( - f".*Timeline {env.initial_tenant}/{leaf_timeline_id} was not found.*" - ) - wait_until( - 2, 0.5, lambda: assert_timeline_detail_404(ps_http, env.initial_tenant, leaf_timeline_id) - ) + try: + data = ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id) + log.debug(f"detail {data}") + except PageserverApiException as e: + log.debug(e) + if e.status_code != 404: + raise + else: + raise Exception("detail succeeded (it should return 404)") assert ( not leaf_timeline_path.exists() @@ -389,13 +379,8 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild assert env.initial_timeline is not None for timeline_id in (intermediate_timeline_id, env.initial_timeline): - ps_http.timeline_delete(env.initial_tenant, timeline_id) - - env.pageserver.allowed_errors.append( - f".*Timeline {env.initial_tenant}/{timeline_id} was not found.*" - ) - wait_until( - 2, 0.5, lambda: assert_timeline_detail_404(ps_http, env.initial_tenant, timeline_id) + timeline_delete_wait_completed( + ps_http, tenant_id=env.initial_tenant, timeline_id=timeline_id ) assert_prefix_empty( @@ -419,23 +404,27 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild ) -def test_concurrent_timeline_delete_if_first_stuck_at_index_upload( - neon_env_builder: NeonEnvBuilder, +@pytest.mark.parametrize( + "stuck_failpoint", + ["persist_deleted_index_part", "in_progress_delete"], +) +def test_concurrent_timeline_delete_stuck_on( + neon_env_builder: NeonEnvBuilder, stuck_failpoint: str ): """ - If we're stuck uploading the index file with the is_delete flag, - eventually console will hand up and retry. - If we're still stuck at the retry time, ensure that the retry - fails with status 500, signalling to console that it should retry - later. - Ideally, timeline_delete should return 202 Accepted and require - console to poll for completion, but, that would require changing - the API contract. + If delete is stuck console will eventually retry deletion. + So we need to be sure that these requests wont interleave with each other. + In this tests we check two places where we can spend a lot of time. + This is a regression test because there was a bug when DeletionGuard wasnt propagated + to the background task. + + Ensure that when retry comes if we're still stuck request will get an immediate error response, + signalling to console that it should retry later. 
""" neon_env_builder.enable_remote_storage( remote_storage_kind=RemoteStorageKind.MOCK_S3, - test_name="test_concurrent_timeline_delete_if_first_stuck_at_index_upload", + test_name=f"concurrent_timeline_delete_stuck_on_{stuck_failpoint}", ) env = neon_env_builder.init_start() @@ -445,13 +434,14 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload( ps_http = env.pageserver.http_client() # make the first call sleep practically forever - failpoint_name = "persist_index_part_with_deleted_flag_after_set_before_upload_pause" - ps_http.configure_failpoints((failpoint_name, "pause")) + ps_http.configure_failpoints((stuck_failpoint, "pause")) def first_call(result_queue): try: log.info("first call start") - ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=10) + timeline_delete_wait_completed( + ps_http, env.initial_tenant, child_timeline_id, timeout=10 + ) log.info("first call success") result_queue.put("success") except Exception: @@ -466,7 +456,7 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload( def first_call_hit_failpoint(): assert env.pageserver.log_contains( - f".*{child_timeline_id}.*at failpoint {failpoint_name}" + f".*{child_timeline_id}.*at failpoint {stuck_failpoint}" ) wait_until(50, 0.1, first_call_hit_failpoint) @@ -484,8 +474,12 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload( ) log.info("second call failed as expected") + # ensure it is not 404 and stopping + detail = ps_http.timeline_detail(env.initial_tenant, child_timeline_id) + assert detail["state"] == "Stopping" + # by now we know that the second call failed, let's ensure the first call will finish - ps_http.configure_failpoints((failpoint_name, "off")) + ps_http.configure_failpoints((stuck_failpoint, "off")) result = first_call_result.get() assert result == "success" @@ -498,8 +492,10 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload( def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder): """ - If the client hangs up before we start the index part upload but after we mark it + If the client hangs up before we start the index part upload but after deletion is scheduled + we mark it deleted in local memory, a subsequent delete_timeline call should be able to do + another delete timeline operation. This tests cancel safety up to the given failpoint. 
@@ -515,12 +511,18 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder): ps_http = env.pageserver.http_client() - failpoint_name = "persist_index_part_with_deleted_flag_after_set_before_upload_pause" + failpoint_name = "persist_deleted_index_part" ps_http.configure_failpoints((failpoint_name, "pause")) with pytest.raises(requests.exceptions.Timeout): ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2) + env.pageserver.allowed_errors.append( + f".*{child_timeline_id}.*timeline deletion is already in progress.*" + ) + with pytest.raises(PageserverApiException, match="timeline deletion is already in progress"): + ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2) + # make sure the timeout was due to the failpoint at_failpoint_log_message = f".*{child_timeline_id}.*at failpoint {failpoint_name}.*" @@ -552,12 +554,7 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder): wait_until(50, 0.1, first_request_finished) # check that the timeline is gone - notfound_message = f"Timeline {env.initial_tenant}/{child_timeline_id} was not found" - env.pageserver.allowed_errors.append(".*" + notfound_message) - with pytest.raises(PageserverApiException, match=notfound_message) as exc: - ps_http.timeline_detail(env.initial_tenant, child_timeline_id) - - assert exc.value.status_code == 404 + wait_timeline_detail_404(ps_http, env.initial_tenant, child_timeline_id) @pytest.mark.parametrize( @@ -616,12 +613,7 @@ def test_timeline_delete_works_for_remote_smoke( for timeline_id in reversed(timeline_ids): # note that we need to finish previous deletion before scheduling next one # otherwise we can get an "HasChildren" error if deletion is not fast enough (real_s3) - ps_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id) - - env.pageserver.allowed_errors.append( - f".*Timeline {env.initial_tenant}/{timeline_id} was not found.*" - ) - wait_until(2, 0.5, lambda: assert_timeline_detail_404(ps_http, tenant_id, timeline_id)) + timeline_delete_wait_completed(ps_http, tenant_id=tenant_id, timeline_id=timeline_id) assert_prefix_empty( neon_env_builder, diff --git a/test_runner/regress/test_timeline_size.py b/test_runner/regress/test_timeline_size.py index 1460172afe..5bdbc18927 100644 --- a/test_runner/regress/test_timeline_size.py +++ b/test_runner/regress/test_timeline_size.py @@ -24,6 +24,7 @@ from fixtures.neon_fixtures import ( from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient from fixtures.pageserver.utils import ( assert_tenant_state, + timeline_delete_wait_completed, wait_for_upload_queue_empty, wait_until_tenant_active, ) @@ -272,7 +273,7 @@ def test_timeline_initial_logical_size_calculation_cancellation( if deletion_method == "tenant_detach": client.tenant_detach(tenant_id) elif deletion_method == "timeline_delete": - client.timeline_delete(tenant_id, timeline_id) + timeline_delete_wait_completed(client, tenant_id, timeline_id) delete_timeline_success.put(True) except PageserverApiException: delete_timeline_success.put(False) diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index 8b595596cb..a837501678 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -31,7 +31,11 @@ from fixtures.neon_fixtures import ( SafekeeperPort, available_remote_storages, ) -from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload +from fixtures.pageserver.utils import ( + 
timeline_delete_wait_completed, + wait_for_last_record_lsn, + wait_for_upload, +) from fixtures.pg_version import PgVersion from fixtures.types import Lsn, TenantId, TimelineId from fixtures.utils import get_dir_size, query_scalar, start_in_background @@ -548,15 +552,15 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re f"sk_id={sk.id} to flush {last_lsn}", ) - ps_cli = env.pageserver.http_client() - pageserver_lsn = Lsn(ps_cli.timeline_detail(tenant_id, timeline_id)["last_record_lsn"]) + ps_http = env.pageserver.http_client() + pageserver_lsn = Lsn(ps_http.timeline_detail(tenant_id, timeline_id)["last_record_lsn"]) lag = last_lsn - pageserver_lsn log.info( f"Pageserver last_record_lsn={pageserver_lsn}; flush_lsn={last_lsn}; lag before replay is {lag / 1024}kb" ) endpoint.stop_and_destroy() - ps_cli.timeline_delete(tenant_id, timeline_id) + timeline_delete_wait_completed(ps_http, tenant_id, timeline_id) # Also delete and manually create timeline on safekeepers -- this tests # scenario of manual recovery on different set of safekeepers. @@ -583,7 +587,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re shutil.copy(f_partial_saved, f_partial_path) # recreate timeline on pageserver from scratch - ps_cli.timeline_create( + ps_http.timeline_create( pg_version=PgVersion(pg_version), tenant_id=tenant_id, new_timeline_id=timeline_id, @@ -598,7 +602,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re if elapsed > wait_lsn_timeout: raise RuntimeError("Timed out waiting for WAL redo") - tenant_status = ps_cli.tenant_status(tenant_id) + tenant_status = ps_http.tenant_status(tenant_id) if tenant_status["state"]["slug"] == "Loading": log.debug(f"Tenant {tenant_id} is still loading, retrying") else: From 14d495ae141fbde57146e8e5afad696565e1cbbf Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 16 Jun 2023 13:23:55 +0200 Subject: [PATCH 14/21] create_delta_layer: improve misleading TODO comment (#4488) Context: https://github.com/neondatabase/neon/pull/4441#discussion_r1228086608 --- pageserver/src/tenant/timeline.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index d42fdf5e55..13705d8b85 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3015,16 +3015,19 @@ impl Timeline { // Sync it to disk. // // We must also fsync the timeline dir to ensure the directory entries for - // new layer files are durable + // new layer files are durable. + // + // NB: timeline dir must be synced _after_ the file contents are durable. + // So, two separate fsyncs are required, they mustn't be batched. // // TODO: If we're running inside 'flush_frozen_layers' and there are multiple - // files to flush, it might be better to first write them all, and then fsync - // them all in parallel. - - // First sync the delta layer. We still use par_fsync here to keep everything consistent. Feel free to replace - // this with a single fsync in future refactors. + // files to flush, the fsync overhead can be reduces as follows: + // 1. write them all to temporary file names + // 2. fsync them + // 3. rename to the final name + // 4. fsync the parent directory. + // Note that (1),(2),(3) today happen inside write_to_disk(). par_fsync::par_fsync(&[new_delta_path.clone()]).context("fsync of delta layer")?; - // Then sync the parent directory. 
par_fsync::par_fsync(&[self_clone .conf .timeline_path(&self_clone.timeline_id, &self_clone.tenant_id)]) From 190c3ba6109710f53f70d36c1cdae5891b007f7a Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Fri, 16 Jun 2023 14:17:37 +0100 Subject: [PATCH 15/21] Add tags for releases (#4524) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Problem It's not a trivial task to find corresponding changes for a particular release (for example, for 3371 — 🤷) Ref: https://neondb.slack.com/archives/C04BLQ4LW7K/p1686761537607649?thread_ts=1686736854.174559&cid=C04BLQ4LW7K ## Summary of changes - Tag releases - Add a manual trigger for the release workflow --- .github/workflows/build_and_test.yml | 14 ++++++++++++++ .github/workflows/release.yml | 1 + 2 files changed, 15 insertions(+) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 471dc68df9..5f82ab7aca 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -914,6 +914,20 @@ jobs: exit 1 fi + - name: Create tag "release-${{ needs.tag.outputs.build-tag }}" + if: github.ref_name == 'release' + uses: actions/github-script@v6 + with: + # Retry script for 5XX server errors: https://github.com/actions/github-script#retries + retries: 5 + script: | + github.rest.git.createRef({ + owner: context.repo.owner, + repo: context.repo.repo, + ref: "refs/tags/release-${{ needs.tag.outputs.build-tag }}", + sha: context.sha, + }) + promote-compatibility-data: runs-on: [ self-hosted, gen3, small ] container: diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 4bce9cdd1e..595ee05514 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -3,6 +3,7 @@ name: Create Release Branch on: schedule: - cron: '0 10 * * 2' + workflow_dispatch: jobs: create_release_branch: From 78082d0b9fdfdb8a4c328ad1b2082f840dc0968f Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 16 Jun 2023 16:54:41 +0200 Subject: [PATCH 16/21] create_delta_layer: avoid needless `stat` (#4489) We already do it inside `frozen_layer.write_to_disk()`. Context: https://github.com/neondatabase/neon/pull/4441#discussion_r1228083959 --- pageserver/src/tenant/timeline.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 13705d8b85..ef7474cb8b 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3003,7 +3003,7 @@ impl Timeline { frozen_layer: &Arc, ) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> { let span = tracing::info_span!("blocking"); - let (new_delta, sz): (DeltaLayer, _) = tokio::task::spawn_blocking({ + let new_delta: DeltaLayer = tokio::task::spawn_blocking({ let _g = span.entered(); let self_clone = Arc::clone(self); let frozen_layer = Arc::clone(frozen_layer); @@ -3027,20 +3027,19 @@ impl Timeline { // 3. rename to the final name // 4. fsync the parent directory. // Note that (1),(2),(3) today happen inside write_to_disk(). 
- par_fsync::par_fsync(&[new_delta_path.clone()]).context("fsync of delta layer")?; + par_fsync::par_fsync(&[new_delta_path]).context("fsync of delta layer")?; par_fsync::par_fsync(&[self_clone .conf .timeline_path(&self_clone.timeline_id, &self_clone.tenant_id)]) .context("fsync of timeline dir")?; - let sz = new_delta_path.metadata()?.len(); - - anyhow::Ok((new_delta, sz)) + anyhow::Ok(new_delta) } }) .await .context("spawn_blocking")??; let new_delta_name = new_delta.filename(); + let sz = new_delta.desc.file_size; // Add it to the layer map let l = Arc::new(new_delta); @@ -3054,9 +3053,8 @@ impl Timeline { batch_updates.insert_historic(l.layer_desc().clone(), l); batch_updates.flush(); - // update the timeline's physical size - self.metrics.resident_physical_size_gauge.add(sz); // update metrics + self.metrics.resident_physical_size_gauge.add(sz); self.metrics.num_persistent_files_created.inc_by(1); self.metrics.persistent_bytes_written.inc_by(sz); From 1b947fc8aff5b3cad6f8a057372860491ec58ab6 Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Fri, 16 Jun 2023 18:08:11 +0100 Subject: [PATCH 17/21] test_runner: workaround rerunfailures and timeout incompatibility (#4469) ## Problem `pytest-timeout` and `pytest-rerunfailures` are incompatible (or rather not fully compatible). Timeouts aren't set for reruns. Ref https://github.com/pytest-dev/pytest-rerunfailures/issues/99 ## Summary of changes - Dynamically make timeouts `func_only` for tests that we're going to retry. It applies timeouts for reruns as well. --- test_runner/fixtures/flaky.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/test_runner/fixtures/flaky.py b/test_runner/fixtures/flaky.py index 9d7f8ead9a..d13f3318b0 100644 --- a/test_runner/fixtures/flaky.py +++ b/test_runner/fixtures/flaky.py @@ -1,6 +1,6 @@ import json from pathlib import Path -from typing import List +from typing import Any, List, MutableMapping, cast import pytest from _pytest.config import Config @@ -56,3 +56,15 @@ def pytest_collection_modifyitems(config: Config, items: List[pytest.Item]): # Rerun 3 times = 1 original run + 2 reruns log.info(f"Marking {item.nodeid} as flaky. It will be rerun up to 3 times") item.add_marker(pytest.mark.flaky(reruns=2)) + + # pytest-rerunfailures is not compatible with pytest-timeout (timeout is not set for reruns), + # we can workaround it by setting `timeout_func_only` to True[1]. + # Unfortunately, setting `timeout_func_only = True` globally in pytest.ini is broken[2], + # but we still can do it using pytest marker. + # + # - [1] https://github.com/pytest-dev/pytest-rerunfailures/issues/99 + # - [2] https://github.com/pytest-dev/pytest-timeout/issues/142 + timeout_marker = item.get_closest_marker("timeout") + if timeout_marker is not None: + kwargs = cast(MutableMapping[str, Any], timeout_marker.kwargs) + kwargs["func_only"] = True From 3b06a5bc54a2ef7b5ec8f3fee24556547310586f Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Mon, 19 Jun 2023 14:04:16 +0400 Subject: [PATCH 18/21] Raise pageserver walreceiver timeouts. I observe sporadic reconnections with ~10k idle computes. It looks like a separate issue, probably walreceiver runtime gets blocked somewhere, but in any case 2-3 seconds is too small. 
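
For reference, the new 10 second values are only defaults: they can still be
overridden through the tenant config, as the test updated in this patch does.
A minimal sketch of such an override (TOML, mirroring the
pageserver_config_override used by that test; anything beyond what the test
itself shows should be treated as an assumption, not a documented schema):

    # Keep the old, tighter walreceiver timeouts instead of the new
    # 10 second defaults.
    tenant_config = { walreceiver_connect_timeout = "2s", lagging_wal_timeout = "2s" }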
--- pageserver/src/tenant/config.rs | 4 ++-- test_runner/regress/test_wal_receiver.py | 9 ++++++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs index 80d153661a..ffe2c5eab6 100644 --- a/pageserver/src/tenant/config.rs +++ b/pageserver/src/tenant/config.rs @@ -38,8 +38,8 @@ pub mod defaults { pub const DEFAULT_GC_PERIOD: &str = "1 hr"; pub const DEFAULT_IMAGE_CREATION_THRESHOLD: usize = 3; pub const DEFAULT_PITR_INTERVAL: &str = "7 days"; - pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds"; - pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "3 seconds"; + pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "10 seconds"; + pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds"; pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024; pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour"; } diff --git a/test_runner/regress/test_wal_receiver.py b/test_runner/regress/test_wal_receiver.py index 515d47c079..7ac6e6332c 100644 --- a/test_runner/regress/test_wal_receiver.py +++ b/test_runner/regress/test_wal_receiver.py @@ -1,3 +1,5 @@ +import time + from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder from fixtures.types import Lsn, TenantId @@ -40,7 +42,10 @@ def test_pageserver_lsn_wait_error_start(neon_env_builder: NeonEnvBuilder): # Kills one of the safekeepers and ensures that only the active ones are printed in the state. def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuilder): # Trigger WAL wait timeout faster - neon_env_builder.pageserver_config_override = "wait_lsn_timeout = '1s'" + neon_env_builder.pageserver_config_override = """ + wait_lsn_timeout = "1s" + tenant_config={walreceiver_connect_timeout = "2s", lagging_wal_timeout = "2s"} + """ # Have notable SK ids to ensure we check logs for their presence, not some other random numbers neon_env_builder.safekeepers_id_start = 12345 neon_env_builder.num_safekeepers = 3 @@ -70,6 +75,8 @@ def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuil stopped_safekeeper_id = stopped_safekeeper.id log.info(f"Stopping safekeeper {stopped_safekeeper.id}") stopped_safekeeper.stop() + # sleep until stopped safekeeper is removed from candidates + time.sleep(2) # Spend some more time inserting, to ensure SKs report updated statuses and walreceiver in PS have time to update its connection stats. insert_test_elements(env, tenant_id, start=elements_to_insert + 1, count=elements_to_insert) From 557abc18f36506c001c62c22ffa13ee11610d6c7 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Mon, 19 Jun 2023 13:27:14 +0400 Subject: [PATCH 19/21] Fix test_s3_wal_replay assertion flakiness. Supposedly fixes https://github.com/neondatabase/neon/issues/4277 --- test_runner/regress/test_wal_acceptor.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index a837501678..994858edf7 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -575,11 +575,21 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re pg_version = sk.http_client().timeline_status(tenant_id, timeline_id).pg_version + # Terminate first all safekeepers to prevent communication unexpectantly + # advancing peer_horizon_lsn. 
for sk in env.safekeepers: cli = sk.http_client() cli.timeline_delete_force(tenant_id, timeline_id) # restart safekeeper to clear its in-memory state - sk.stop().start() + sk.stop() + # wait all potenital in flight pushes to broker arrive before starting + # safekeepers (even without sleep, it is very unlikely they are not + # delivered yet). + time.sleep(1) + + for sk in env.safekeepers: + sk.start() + cli = sk.http_client() cli.timeline_create(tenant_id, timeline_id, pg_version, last_lsn) f_partial_path = ( Path(sk.data_dir()) / str(tenant_id) / str(timeline_id) / f_partial_saved.name From 036fda392ff9e864aeb2d5a9528d85a8c388d590 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 19 Jun 2023 16:25:57 +0200 Subject: [PATCH 20/21] log timings for compact_level0_phase1 (#4527) The data will help decide whether it's ok to keep holding Timeline::layers in shared mode until after we've calculated the holes. Other timings are to understand the general breakdown of timings in that function. Context: https://github.com/neondatabase/neon/issues/4492 --- pageserver/src/tenant/timeline.rs | 161 ++++++++++++++++++++++++++++++ 1 file changed, 161 insertions(+) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index ef7474cb8b..de786da322 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -15,6 +15,7 @@ use pageserver_api::models::{ TimelineState, }; use remote_storage::GenericRemoteStorage; +use serde_with::serde_as; use storage_broker::BrokerClientChannel; use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError}; use tokio_util::sync::CancellationToken; @@ -3333,6 +3334,130 @@ impl From for CompactionError { } } +#[serde_as] +#[derive(serde::Serialize)] +struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration); + +#[derive(Default)] +enum DurationRecorder { + #[default] + NotStarted, + Recorded(RecordedDuration, tokio::time::Instant), +} + +impl DurationRecorder { + pub fn till_now(&self) -> DurationRecorder { + match self { + DurationRecorder::NotStarted => { + panic!("must only call on recorded measurements") + } + DurationRecorder::Recorded(_, ended) => { + let now = tokio::time::Instant::now(); + DurationRecorder::Recorded(RecordedDuration(now - *ended), now) + } + } + } + pub fn into_recorded(self) -> Option { + match self { + DurationRecorder::NotStarted => None, + DurationRecorder::Recorded(recorded, _) => Some(recorded), + } + } +} + +#[derive(Default)] +struct CompactLevel0Phase1StatsBuilder { + version: Option, + tenant_id: Option, + timeline_id: Option, + first_read_lock_acquisition_micros: DurationRecorder, + get_level0_deltas_plus_drop_lock_micros: DurationRecorder, + level0_deltas_count: Option, + time_spent_between_locks: DurationRecorder, + second_read_lock_acquisition_micros: DurationRecorder, + second_read_lock_held_micros: DurationRecorder, + sort_holes_micros: DurationRecorder, + write_layer_files_micros: DurationRecorder, + new_deltas_count: Option, + new_deltas_size: Option, +} + +#[serde_as] +#[derive(serde::Serialize)] +struct CompactLevel0Phase1Stats { + version: u64, + #[serde_as(as = "serde_with::DisplayFromStr")] + tenant_id: TenantId, + #[serde_as(as = "serde_with::DisplayFromStr")] + timeline_id: TimelineId, + first_read_lock_acquisition_micros: RecordedDuration, + get_level0_deltas_plus_drop_lock_micros: RecordedDuration, + level0_deltas_count: usize, + time_spent_between_locks: RecordedDuration, + second_read_lock_acquisition_micros: RecordedDuration, + 
second_read_lock_held_micros: RecordedDuration, + sort_holes_micros: RecordedDuration, + write_layer_files_micros: RecordedDuration, + new_deltas_count: usize, + new_deltas_size: u64, +} + +impl TryFrom for CompactLevel0Phase1Stats { + type Error = anyhow::Error; + + fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result { + let CompactLevel0Phase1StatsBuilder { + version, + tenant_id, + timeline_id, + first_read_lock_acquisition_micros, + get_level0_deltas_plus_drop_lock_micros, + level0_deltas_count, + time_spent_between_locks, + second_read_lock_acquisition_micros, + second_read_lock_held_micros, + sort_holes_micros, + write_layer_files_micros, + new_deltas_count, + new_deltas_size, + } = value; + Ok(CompactLevel0Phase1Stats { + version: version.ok_or_else(|| anyhow::anyhow!("version not set"))?, + tenant_id: tenant_id.ok_or_else(|| anyhow::anyhow!("tenant_id not set"))?, + timeline_id: timeline_id.ok_or_else(|| anyhow::anyhow!("timeline_id not set"))?, + first_read_lock_acquisition_micros: first_read_lock_acquisition_micros + .into_recorded() + .ok_or_else(|| anyhow::anyhow!("first_read_lock_acquisition_micros not set"))?, + get_level0_deltas_plus_drop_lock_micros: get_level0_deltas_plus_drop_lock_micros + .into_recorded() + .ok_or_else(|| { + anyhow::anyhow!("get_level0_deltas_plus_drop_lock_micros not set") + })?, + level0_deltas_count: level0_deltas_count + .ok_or_else(|| anyhow::anyhow!("level0_deltas_count not set"))?, + time_spent_between_locks: time_spent_between_locks + .into_recorded() + .ok_or_else(|| anyhow::anyhow!("time_spent_between_locks not set"))?, + second_read_lock_acquisition_micros: second_read_lock_acquisition_micros + .into_recorded() + .ok_or_else(|| anyhow::anyhow!("second_read_lock_acquisition_micros not set"))?, + second_read_lock_held_micros: second_read_lock_held_micros + .into_recorded() + .ok_or_else(|| anyhow::anyhow!("second_read_lock_held_micros not set"))?, + sort_holes_micros: sort_holes_micros + .into_recorded() + .ok_or_else(|| anyhow::anyhow!("sort_holes_micros not set"))?, + write_layer_files_micros: write_layer_files_micros + .into_recorded() + .ok_or_else(|| anyhow::anyhow!("write_layer_files_micros not set"))?, + new_deltas_count: new_deltas_count + .ok_or_else(|| anyhow::anyhow!("new_deltas_count not set"))?, + new_deltas_size: new_deltas_size + .ok_or_else(|| anyhow::anyhow!("new_deltas_size not set"))?, + }) + } +} + impl Timeline { /// Level0 files first phase of compaction, explained in the [`compact_inner`] comment. /// @@ -3345,9 +3470,23 @@ impl Timeline { target_file_size: u64, ctx: &RequestContext, ) -> Result { + let mut stats = CompactLevel0Phase1StatsBuilder { + version: Some(1), + tenant_id: Some(self.tenant_id), + timeline_id: Some(self.timeline_id), + ..Default::default() + }; + + let begin = tokio::time::Instant::now(); let layers = self.layers.read().await; + let now = tokio::time::Instant::now(); + stats.first_read_lock_acquisition_micros = + DurationRecorder::Recorded(RecordedDuration(now - begin), now); let mut level0_deltas = layers.get_level0_deltas()?; drop(layers); + stats.level0_deltas_count = Some(level0_deltas.len()); + stats.get_level0_deltas_plus_drop_lock_micros = + stats.first_read_lock_acquisition_micros.till_now(); // Only compact if enough layers have accumulated. let threshold = self.get_compaction_threshold(); @@ -3468,7 +3607,9 @@ impl Timeline { // Determine N largest holes where N is number of compacted layers. 
let max_holes = deltas_to_compact.len(); let last_record_lsn = self.get_last_record_lsn(); + stats.time_spent_between_locks = stats.get_level0_deltas_plus_drop_lock_micros.till_now(); let layers = self.layers.read().await; // Is'n it better to hold original layers lock till here? + stats.second_read_lock_acquisition_micros = stats.time_spent_between_locks.till_now(); let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128; let min_hole_coverage_size = 3; // TODO: something more flexible? @@ -3502,9 +3643,11 @@ impl Timeline { prev = Some(next_key.next()); } drop(layers); + stats.second_read_lock_held_micros = stats.second_read_lock_acquisition_micros.till_now(); let mut holes = heap.into_vec(); holes.sort_unstable_by_key(|hole| hole.key_range.start); let mut next_hole = 0; // index of next hole in holes vector + stats.sort_holes_micros = stats.second_read_lock_held_micros.till_now(); // Merge the contents of all the input delta layers into a new set // of delta layers, based on the current partitioning. @@ -3664,8 +3807,26 @@ impl Timeline { layer_paths.pop().unwrap(); } + stats.write_layer_files_micros = stats.sort_holes_micros.till_now(); + stats.new_deltas_count = Some(new_layers.len()); + stats.new_deltas_size = Some(new_layers.iter().map(|l| l.desc.file_size).sum()); + drop(all_keys_iter); // So that deltas_to_compact is no longer borrowed + match TryInto::::try_into(stats) + .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string")) + { + Ok(stats_json) => { + info!( + stats_json = stats_json.as_str(), + "compact_level0_phase1 stats available" + ) + } + Err(e) => { + warn!("compact_level0_phase1 stats failed to serialize: {:#}", e); + } + } + Ok(CompactLevel0Phase1Result { new_layers, deltas_to_compact, From 2023e22ed3620b050473b5171c064df5bc1ba7aa Mon Sep 17 00:00:00 2001 From: Alek Westover Date: Mon, 19 Jun 2023 13:14:20 -0400 Subject: [PATCH 21/21] Add `RelationError` error type to pageserver rather than string parsing error messages (#4508) --- pageserver/src/import_datadir.rs | 12 +++--- pageserver/src/pgdatadir_mapping.rs | 58 ++++++++++++++++++----------- pageserver/src/walingest.rs | 7 +++- 3 files changed, 48 insertions(+), 29 deletions(-) diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 9ad0124a80..5bff5337bd 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -148,17 +148,17 @@ async fn import_rel( // because there is no guarantee about the order in which we are processing segments. // ignore "relation already exists" error // - // FIXME: use proper error type for this, instead of parsing the error message. - // Or better yet, keep track of which relations we've already created + // FIXME: Keep track of which relations we've already created? // https://github.com/neondatabase/neon/issues/3309 if let Err(e) = modification .put_rel_creation(rel, nblocks as u32, ctx) .await { - if e.to_string().contains("already exists") { - debug!("relation {} already exists. we must be extending it", rel); - } else { - return Err(e); + match e { + RelationError::AlreadyExists => { + debug!("Relation {} already exist. 
We must be extending it.", rel) + } + _ => return Err(e.into()), } } diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 86c84ec82f..998c199ba6 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -43,6 +43,16 @@ pub enum CalculateLogicalSizeError { Other(#[from] anyhow::Error), } +#[derive(Debug, thiserror::Error)] +pub enum RelationError { + #[error("Relation Already Exists")] + AlreadyExists, + #[error("invalid relnode")] + InvalidRelnode, + #[error(transparent)] + Other(#[from] anyhow::Error), +} + /// /// This impl provides all the functionality to store PostgreSQL relations, SLRUs, /// and other special kinds of files, in a versioned key-value store. The @@ -101,9 +111,9 @@ impl Timeline { ctx: &RequestContext, ) -> Result { if tag.relnode == 0 { - return Err(PageReconstructError::Other(anyhow::anyhow!( - "invalid relnode" - ))); + return Err(PageReconstructError::Other( + RelationError::InvalidRelnode.into(), + )); } let nblocks = self.get_rel_size(tag, lsn, latest, ctx).await?; @@ -148,9 +158,9 @@ impl Timeline { ctx: &RequestContext, ) -> Result { if tag.relnode == 0 { - return Err(PageReconstructError::Other(anyhow::anyhow!( - "invalid relnode" - ))); + return Err(PageReconstructError::Other( + RelationError::InvalidRelnode.into(), + )); } if let Some(nblocks) = self.get_cached_rel_size(&tag, lsn) { @@ -193,9 +203,9 @@ impl Timeline { ctx: &RequestContext, ) -> Result { if tag.relnode == 0 { - return Err(PageReconstructError::Other(anyhow::anyhow!( - "invalid relnode" - ))); + return Err(PageReconstructError::Other( + RelationError::InvalidRelnode.into(), + )); } // first try to lookup relation in cache @@ -724,7 +734,7 @@ impl<'a> DatadirModification<'a> { blknum: BlockNumber, rec: NeonWalRecord, ) -> anyhow::Result<()> { - anyhow::ensure!(rel.relnode != 0, "invalid relnode"); + anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode); self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec)); Ok(()) } @@ -751,7 +761,7 @@ impl<'a> DatadirModification<'a> { blknum: BlockNumber, img: Bytes, ) -> anyhow::Result<()> { - anyhow::ensure!(rel.relnode != 0, "invalid relnode"); + anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode); self.put(rel_block_to_key(rel, blknum), Value::Image(img)); Ok(()) } @@ -875,32 +885,38 @@ impl<'a> DatadirModification<'a> { rel: RelTag, nblocks: BlockNumber, ctx: &RequestContext, - ) -> anyhow::Result<()> { - anyhow::ensure!(rel.relnode != 0, "invalid relnode"); + ) -> Result<(), RelationError> { + if rel.relnode == 0 { + return Err(RelationError::AlreadyExists); + } // It's possible that this is the first rel for this db in this // tablespace. Create the reldir entry for it if so. - let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?; + let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?) + .context("deserialize db")?; let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode); let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() { // Didn't exist. Update dbdir dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false); - let buf = DbDirectory::ser(&dbdir)?; + let buf = DbDirectory::ser(&dbdir).context("serialize db")?; self.put(DBDIR_KEY, Value::Image(buf.into())); // and create the RelDirectory RelDirectory::default() } else { // reldir already exists, fetch it - RelDirectory::des(&self.get(rel_dir_key, ctx).await?)? 
+ RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?) + .context("deserialize db")? }; // Add the new relation to the rel directory entry, and write it back if !rel_dir.rels.insert((rel.relnode, rel.forknum)) { - anyhow::bail!("rel {rel} already exists"); + return Err(RelationError::AlreadyExists); } self.put( rel_dir_key, - Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)), + Value::Image(Bytes::from( + RelDirectory::ser(&rel_dir).context("serialize")?, + )), ); // Put size @@ -925,7 +941,7 @@ impl<'a> DatadirModification<'a> { nblocks: BlockNumber, ctx: &RequestContext, ) -> anyhow::Result<()> { - anyhow::ensure!(rel.relnode != 0, "invalid relnode"); + anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode); let last_lsn = self.tline.get_last_record_lsn(); if self.tline.get_rel_exists(rel, last_lsn, true, ctx).await? { let size_key = rel_size_to_key(rel); @@ -956,7 +972,7 @@ impl<'a> DatadirModification<'a> { nblocks: BlockNumber, ctx: &RequestContext, ) -> anyhow::Result<()> { - anyhow::ensure!(rel.relnode != 0, "invalid relnode"); + anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode); // Put size let size_key = rel_size_to_key(rel); @@ -977,7 +993,7 @@ impl<'a> DatadirModification<'a> { /// Drop a relation. pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> { - anyhow::ensure!(rel.relnode != 0, "invalid relnode"); + anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode); // Remove it from the directory entry let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode); diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 68cf2a4645..8d4c1842bd 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -25,7 +25,7 @@ use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes; use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment; use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn}; -use anyhow::Result; +use anyhow::{Context, Result}; use bytes::{Buf, Bytes, BytesMut}; use tracing::*; @@ -1082,7 +1082,10 @@ impl<'a> WalIngest<'a> { .await? { // create it with 0 size initially, the logic below will extend it - modification.put_rel_creation(rel, 0, ctx).await?; + modification + .put_rel_creation(rel, 0, ctx) + .await + .context("Relation Error")?; 0 } else { self.timeline.get_rel_size(rel, last_lsn, true, ctx).await?