Compare commits

..

4 Commits

Author SHA1 Message Date
Konstantin Knizhnik
a612e9a356 New RFC version 2024-12-16 09:16:39 +02:00
Konstantin Knizhnik
5af77ab39b Update V3 protol version RFC 2024-12-15 19:22:10 +02:00
Konstantin Knizhnik
10bc58c510 Update V3 protol version RFC 2024-12-15 19:21:54 +02:00
Konstantin Knizhnik
da6c78dd5d Add RFC for V3 protocol version 2024-08-15 16:23:12 +03:00
106 changed files with 1532 additions and 2461 deletions

View File

@@ -6,10 +6,6 @@ self-hosted-runner:
- small
- small-arm64
- us-east-2
- aws-arm64-8core
- aws-arm64-16core
- aws-arm64-32core
- qemu-arm64
config-variables:
- BENCHMARK_PROJECT_ID_PUB
- BENCHMARK_PROJECT_ID_SUB

View File

@@ -43,7 +43,7 @@ inputs:
pg_version:
description: 'Postgres version to use for tests'
required: false
default: 'v16'
default: 'v14'
benchmark_durations:
description: 'benchmark durations JSON'
required: false
@@ -131,7 +131,7 @@ runs:
fi
if [[ "${{ inputs.run_in_parallel }}" == "true" ]]; then
# -n sets the number of parallel processes that pytest-xdist will run
EXTRA_PARAMS="-n auto $EXTRA_PARAMS"
EXTRA_PARAMS="-n12 $EXTRA_PARAMS"
# --dist=loadgroup points tests marked with @pytest.mark.xdist_group
# to the same worker to make @pytest.mark.order work with xdist
@@ -169,8 +169,10 @@ runs:
EXTRA_PARAMS="--durations-path $TEST_OUTPUT/benchmark_durations.json $EXTRA_PARAMS"
fi
if [[ $BUILD_TYPE == "debug" && $RUNNER_ARCH == 'X64' ]]; then
if [[ "${{ inputs.build_type }}" == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
elif [[ "${{ inputs.build_type }}" == "release" ]]; then
cov_prefix=()
else
cov_prefix=()
fi

View File

@@ -36,7 +36,7 @@ env:
jobs:
build-neon:
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'qemu-arm64' || 'large')) }}
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'large-arm64' || 'large')) }}
container:
image: ${{ inputs.build-tools-image }}
credentials:
@@ -94,16 +94,11 @@ jobs:
# We run tests with addtional features, that are turned off by default (e.g. in release builds), see
# corresponding Cargo.toml files for their descriptions.
- name: Set env variables
env:
ARCH: ${{ inputs.arch }}
run: |
CARGO_FEATURES="--features testing"
if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' ]]; then
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix="scripts/coverage --profraw-prefix=$GITHUB_JOB --dir=/tmp/coverage run"
CARGO_FLAGS="--locked"
elif [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=""
CARGO_FLAGS="--locked"
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=""
CARGO_FLAGS="--locked --release"
@@ -115,11 +110,6 @@ jobs:
echo "CARGO_HOME=${GITHUB_WORKSPACE}/.cargo"
} >> $GITHUB_ENV
# See https://github.com/aws/aws-graviton-getting-started/blob/57dc813626d0266f1cc12ef83474745bb1f31fb4/rust.md
- name: Set RUSTFLAGS for ARM
if: inputs.arch == 'arm64'
run: echo "RUSTFLAGS=-Ctarget-feature=+lse -Ctarget-cpu=neoverse-n1" >> $GITHUB_ENV
- name: Cache postgres v14 build
id: cache_pg_14
uses: actions/cache@v4
@@ -163,13 +153,11 @@ jobs:
run: |
PQ_LIB_DIR=$(pwd)/pg_install/v16/lib
export PQ_LIB_DIR
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests -j$(nproc)
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests
# Do install *before* running rust tests because they might recompile the
# binaries with different features/flags.
- name: Install rust binaries
env:
ARCH: ${{ inputs.arch }}
run: |
# Install target binaries
mkdir -p /tmp/neon/bin/
@@ -184,7 +172,7 @@ jobs:
done
# Install test executables and write list of all binaries (for code coverage)
if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' ]]; then
if [[ $BUILD_TYPE == "debug" ]]; then
# Keep bloated coverage data files away from the rest of the artifact
mkdir -p /tmp/coverage/
@@ -255,10 +243,10 @@ jobs:
uses: ./.github/actions/save-coverage-data
regress-tests:
# Don't run regression tests on debug arm64 builds
if: inputs.build-type != 'debug' || inputs.arch != 'arm64'
# Run test on x64 only
if: inputs.arch == 'x64'
needs: [ build-neon ]
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'qemu-arm64' || 'large')) }}
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'large-arm64' || 'large')) }}
container:
image: ${{ inputs.build-tools-image }}
credentials:

View File

@@ -222,20 +222,13 @@ jobs:
id: create-allure-report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@v1
with:
channel-id: "C06T9AMNDQQ" # on-call-compute-staging-stream
slack-message: |
Periodic replication testing: ${{ job.status }}
<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
<${{ steps.create-allure-report.outputs.report-url }}|Allure report>
channel-id: "C033QLM5P7D" # dev-staging-stream
slack-message: "Periodic replication testing: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
@@ -337,7 +330,7 @@ jobs:
prepare_AWS_RDS_databases:
uses: ./.github/workflows/_benchmarking_preparation.yml
secrets: inherit
pgbench-compare:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
needs: [ generate-matrices, prepare_AWS_RDS_databases ]

View File

@@ -198,7 +198,7 @@ jobs:
strategy:
fail-fast: false
matrix:
arch: [ x64, arm64 ]
arch: [ x64 ]
# Do not build or run tests in debug for release branches
build-type: ${{ fromJson((startsWith(github.ref_name, 'release') && github.event_name == 'push') && '["release"]' || '["debug", "release"]') }}
include:
@@ -280,7 +280,6 @@ jobs:
save_perf_report: ${{ github.ref_name == 'main' }}
extra_params: --splits 5 --group ${{ matrix.pytest_split_group }}
benchmark_durations: ${{ needs.get-benchmarks-durations.outputs.json }}
pg_version: v16
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
@@ -986,10 +985,10 @@ jobs:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
run: |
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
gh workflow --repo neondatabase/infra run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=false
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=false
gh workflow --repo neondatabase/azure run deploy.yml -f dockerTag=${{needs.tag.outputs.build-tag}}
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
gh workflow --repo neondatabase/infra run deploy-dev.yml --ref main \
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main \
-f deployPgSniRouter=false \
-f deployProxy=false \
-f deployStorage=true \
@@ -999,14 +998,14 @@ jobs:
-f dockerTag=${{needs.tag.outputs.build-tag}} \
-f deployPreprodRegion=true
gh workflow --repo neondatabase/infra run deploy-prod.yml --ref main \
gh workflow --repo neondatabase/aws run deploy-prod.yml --ref main \
-f deployStorage=true \
-f deployStorageBroker=true \
-f deployStorageController=true \
-f branch=main \
-f dockerTag=${{needs.tag.outputs.build-tag}}
elif [[ "$GITHUB_REF_NAME" == "release-proxy" ]]; then
gh workflow --repo neondatabase/infra run deploy-dev.yml --ref main \
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main \
-f deployPgSniRouter=true \
-f deployProxy=true \
-f deployStorage=false \
@@ -1016,7 +1015,7 @@ jobs:
-f dockerTag=${{needs.tag.outputs.build-tag}} \
-f deployPreprodRegion=true
gh workflow --repo neondatabase/infra run deploy-proxy-prod.yml --ref main \
gh workflow --repo neondatabase/aws run deploy-proxy-prod.yml --ref main \
-f deployPgSniRouter=true \
-f deployProxy=true \
-f branch=main \

View File

@@ -4,7 +4,7 @@ on:
issues:
types:
- opened
pull_request_target:
pull_request:
types:
- opened
@@ -25,7 +25,7 @@ jobs:
- name: Check whether `${{ github.actor }}` is a member of `${{ github.repository_owner }}`
id: check-user
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
if gh api -H "Accept: application/vnd.github+json" -H "X-GitHub-Api-Version: 2022-11-28" "/orgs/${GITHUB_REPOSITORY_OWNER}/members/${GITHUB_ACTOR}"; then
is_member=true
@@ -45,10 +45,10 @@ jobs:
issues: write # for `gh issue edit`
steps:
- name: Add `${{ env.LABEL }}` label
- name: Label new ${{ github.event_name }}
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
ITEM_NUMBER: ${{ github.event[github.event_name == 'pull_request_target' && 'pull_request' || 'issue'].number }}
GH_CLI_COMMAND: ${{ github.event_name == 'pull_request_target' && 'pr' || 'issue' }}
ITEM_NUMBER: ${{ github.event[github.event_name == 'pull_request' && 'pull_request' || 'issue'].number }}
GH_CLI_COMMAND: ${{ github.event_name == 'pull_request' && 'pr' || 'issue' }}
run: |
gh ${GH_CLI_COMMAND} --repo ${GITHUB_REPOSITORY} edit --add-label=${LABEL} ${ITEM_NUMBER}

View File

@@ -262,7 +262,7 @@ By default, this runs both debug and release modes, and all supported postgres v
testing locally, it is convenient to run just one set of permutations, like this:
```sh
DEFAULT_PG_VERSION=16 BUILD_TYPE=release ./scripts/pytest
DEFAULT_PG_VERSION=15 BUILD_TYPE=release ./scripts/pytest
```
## Flamegraphs

View File

@@ -379,7 +379,7 @@ where
}
}
pub(crate) fn process_has_stopped(pid: Pid) -> anyhow::Result<bool> {
fn process_has_stopped(pid: Pid) -> anyhow::Result<bool> {
match kill(pid, None) {
// Process exists, keep waiting
Ok(_) => Ok(false),

View File

@@ -15,9 +15,7 @@ use control_plane::local_env::{
};
use control_plane::pageserver::PageServerNode;
use control_plane::safekeeper::SafekeeperNode;
use control_plane::storage_controller::{
NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController,
};
use control_plane::storage_controller::StorageController;
use control_plane::{broker, local_env};
use pageserver_api::config::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
@@ -54,7 +52,7 @@ const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1);
const DEFAULT_BRANCH_NAME: &str = "main";
project_git_version!(GIT_VERSION);
const DEFAULT_PG_VERSION: &str = "16";
const DEFAULT_PG_VERSION: &str = "15";
const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/upcall/v1/";
@@ -1054,36 +1052,6 @@ fn get_start_timeout(args: &ArgMatches) -> &Duration {
humantime_duration.as_ref()
}
fn storage_controller_start_args(args: &ArgMatches) -> NeonStorageControllerStartArgs {
let maybe_instance_id = args.get_one::<u8>("instance-id");
let base_port = args.get_one::<u16>("base-port");
if maybe_instance_id.is_some() && base_port.is_none() {
panic!("storage-controller start specificied instance-id but did not provide base-port");
}
let start_timeout = args
.get_one::<humantime::Duration>("start-timeout")
.expect("invalid value for start-timeout");
NeonStorageControllerStartArgs {
instance_id: maybe_instance_id.copied().unwrap_or(1),
base_port: base_port.copied(),
start_timeout: *start_timeout,
}
}
fn storage_controller_stop_args(args: &ArgMatches) -> NeonStorageControllerStopArgs {
let maybe_instance_id = args.get_one::<u8>("instance-id");
let immediate = args.get_one::<String>("stop-mode").map(|s| s.as_str()) == Some("immediate");
NeonStorageControllerStopArgs {
instance_id: maybe_instance_id.copied().unwrap_or(1),
immediate,
}
}
async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
match sub_match.subcommand() {
Some(("start", subcommand_args)) => {
@@ -1145,14 +1113,19 @@ async fn handle_storage_controller(
let svc = StorageController::from_env(env);
match sub_match.subcommand() {
Some(("start", start_match)) => {
if let Err(e) = svc.start(storage_controller_start_args(start_match)).await {
if let Err(e) = svc.start(get_start_timeout(start_match)).await {
eprintln!("start failed: {e}");
exit(1);
}
}
Some(("stop", stop_match)) => {
if let Err(e) = svc.stop(storage_controller_stop_args(stop_match)).await {
let immediate = stop_match
.get_one::<String>("stop-mode")
.map(|s| s.as_str())
== Some("immediate");
if let Err(e) = svc.stop(immediate).await {
eprintln!("stop failed: {}", e);
exit(1);
}
@@ -1255,12 +1228,7 @@ async fn handle_start_all(
// Only start the storage controller if the pageserver is configured to need it
if env.control_plane_api.is_some() {
let storage_controller = StorageController::from_env(env);
if let Err(e) = storage_controller
.start(NeonStorageControllerStartArgs::with_default_instance_id(
(*retry_timeout).into(),
))
.await
{
if let Err(e) = storage_controller.start(retry_timeout).await {
eprintln!("storage_controller start failed: {:#}", e);
try_stop_all(env, true).await;
exit(1);
@@ -1390,21 +1358,10 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
eprintln!("neon broker stop failed: {e:#}");
}
// Stop all storage controller instances. In the most common case there's only one,
// but iterate though the base data directory in order to discover the instances.
let storcon_instances = env
.storage_controller_instances()
.await
.expect("Must inspect data dir");
for (instance_id, _instance_dir_path) in storcon_instances {
if env.control_plane_api.is_some() {
let storage_controller = StorageController::from_env(env);
let stop_args = NeonStorageControllerStopArgs {
instance_id,
immediate,
};
if let Err(e) = storage_controller.stop(stop_args).await {
eprintln!("Storage controller instance {instance_id} stop failed: {e:#}");
if let Err(e) = storage_controller.stop(immediate).await {
eprintln!("storage controller stop failed: {e:#}");
}
}
}
@@ -1544,18 +1501,6 @@ fn cli() -> Command {
.action(ArgAction::SetTrue)
.required(false);
let instance_id = Arg::new("instance-id")
.long("instance-id")
.help("Identifier used to distinguish storage controller instances (default 1)")
.value_parser(value_parser!(u8))
.required(false);
let base_port = Arg::new("base-port")
.long("base-port")
.help("Base port for the storage controller instance idenfified by instance-id (defaults to pagserver cplane api)")
.value_parser(value_parser!(u16))
.required(false);
Command::new("Neon CLI")
.arg_required_else_help(true)
.version(GIT_VERSION)
@@ -1664,12 +1609,9 @@ fn cli() -> Command {
.arg_required_else_help(true)
.about("Manage storage_controller")
.subcommand(Command::new("start").about("Start storage controller")
.arg(timeout_arg.clone())
.arg(instance_id.clone())
.arg(base_port))
.arg(timeout_arg.clone()))
.subcommand(Command::new("stop").about("Stop storage controller")
.arg(stop_mode_arg.clone())
.arg(instance_id))
.arg(stop_mode_arg.clone()))
)
.subcommand(
Command::new("safekeeper")

View File

@@ -27,7 +27,7 @@ use crate::pageserver::PageServerNode;
use crate::pageserver::PAGESERVER_REMOTE_STORAGE_DIR;
use crate::safekeeper::SafekeeperNode;
pub const DEFAULT_PG_VERSION: u32 = 16;
pub const DEFAULT_PG_VERSION: u32 = 15;
//
// This data structures represents neon_local CLI config
@@ -156,11 +156,6 @@ pub struct NeonStorageControllerConf {
#[serde(with = "humantime_serde")]
pub max_warming_up: Duration,
pub start_as_candidate: bool,
/// Database url used when running multiple storage controller instances
pub database_url: Option<SocketAddr>,
/// Threshold for auto-splitting a tenant into shards
pub split_threshold: Option<u64>,
@@ -179,8 +174,6 @@ impl Default for NeonStorageControllerConf {
Self {
max_offline: Self::DEFAULT_MAX_OFFLINE_INTERVAL,
max_warming_up: Self::DEFAULT_MAX_WARMING_UP_INTERVAL,
start_as_candidate: false,
database_url: None,
split_threshold: None,
max_secondary_lag_bytes: None,
}
@@ -399,36 +392,6 @@ impl LocalEnv {
}
}
/// Inspect the base data directory and extract the instance id and instance directory path
/// for all storage controller instances
pub async fn storage_controller_instances(&self) -> std::io::Result<Vec<(u8, PathBuf)>> {
let mut instances = Vec::default();
let dir = std::fs::read_dir(self.base_data_dir.clone())?;
for dentry in dir {
let dentry = dentry?;
let is_dir = dentry.metadata()?.is_dir();
let filename = dentry.file_name().into_string().unwrap();
let parsed_instance_id = match filename.strip_prefix("storage_controller_") {
Some(suffix) => suffix.parse::<u8>().ok(),
None => None,
};
let is_instance_dir = is_dir && parsed_instance_id.is_some();
if !is_instance_dir {
continue;
}
instances.push((
parsed_instance_id.expect("Checked previously"),
dentry.path(),
));
}
Ok(instances)
}
pub fn register_branch_mapping(
&mut self,
branch_name: String,

View File

@@ -3,8 +3,6 @@ use crate::{
local_env::{LocalEnv, NeonStorageControllerConf},
};
use camino::{Utf8Path, Utf8PathBuf};
use hyper::Uri;
use nix::unistd::Pid;
use pageserver_api::{
controller_api::{
NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest,
@@ -20,7 +18,7 @@ use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use postgres_backend::AuthType;
use reqwest::Method;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{fs, net::SocketAddr, path::PathBuf, str::FromStr, sync::OnceLock};
use std::{fs, str::FromStr, time::Duration};
use tokio::process::Command;
use tracing::instrument;
use url::Url;
@@ -31,14 +29,12 @@ use utils::{
pub struct StorageController {
env: LocalEnv,
listen: String,
private_key: Option<Vec<u8>>,
public_key: Option<String>,
postgres_port: u16,
client: reqwest::Client,
config: NeonStorageControllerConf,
// The listen addresses is learned when starting the storage controller,
// hence the use of OnceLock to init it at the right time.
listen: OnceLock<SocketAddr>,
}
const COMMAND: &str = "storage_controller";
@@ -47,36 +43,6 @@ const STORAGE_CONTROLLER_POSTGRES_VERSION: u32 = 16;
const DB_NAME: &str = "storage_controller";
pub struct NeonStorageControllerStartArgs {
pub instance_id: u8,
pub base_port: Option<u16>,
pub start_timeout: humantime::Duration,
}
impl NeonStorageControllerStartArgs {
pub fn with_default_instance_id(start_timeout: humantime::Duration) -> Self {
Self {
instance_id: 1,
base_port: None,
start_timeout,
}
}
}
pub struct NeonStorageControllerStopArgs {
pub instance_id: u8,
pub immediate: bool,
}
impl NeonStorageControllerStopArgs {
pub fn with_default_instance_id(immediate: bool) -> Self {
Self {
instance_id: 1,
immediate,
}
}
}
#[derive(Serialize, Deserialize)]
pub struct AttachHookRequest {
pub tenant_shard_id: TenantShardId,
@@ -101,6 +67,23 @@ pub struct InspectResponse {
impl StorageController {
pub fn from_env(env: &LocalEnv) -> Self {
// Makes no sense to construct this if pageservers aren't going to use it: assume
// pageservers have control plane API set
let listen_url = env.control_plane_api.clone().unwrap();
let listen = format!(
"{}:{}",
listen_url.host_str().unwrap(),
listen_url.port().unwrap()
);
// Convention: NeonEnv in python tests reserves the next port after the control_plane_api
// port, for use by our captive postgres.
let postgres_port = listen_url
.port()
.expect("Control plane API setting should always have a port")
+ 1;
// Assume all pageservers have symmetric auth configuration: this service
// expects to use one JWT token to talk to all of them.
let ps_conf = env
@@ -143,28 +126,20 @@ impl StorageController {
Self {
env: env.clone(),
listen,
private_key,
public_key,
postgres_port,
client: reqwest::ClientBuilder::new()
.build()
.expect("Failed to construct http client"),
config: env.storage_controller.clone(),
listen: OnceLock::default(),
}
}
fn storage_controller_instance_dir(&self, instance_id: u8) -> PathBuf {
self.env
.base_data_dir
.join(format!("storage_controller_{}", instance_id))
}
fn pid_file(&self, instance_id: u8) -> Utf8PathBuf {
Utf8PathBuf::from_path_buf(
self.storage_controller_instance_dir(instance_id)
.join("storage_controller.pid"),
)
.expect("non-Unicode path")
fn pid_file(&self) -> Utf8PathBuf {
Utf8PathBuf::from_path_buf(self.env.base_data_dir.join("storage_controller.pid"))
.expect("non-Unicode path")
}
/// PIDFile for the postgres instance used to store storage controller state
@@ -209,9 +184,9 @@ impl StorageController {
}
/// Readiness check for our postgres process
async fn pg_isready(&self, pg_bin_dir: &Utf8Path, postgres_port: u16) -> anyhow::Result<bool> {
async fn pg_isready(&self, pg_bin_dir: &Utf8Path) -> anyhow::Result<bool> {
let bin_path = pg_bin_dir.join("pg_isready");
let args = ["-h", "localhost", "-p", &format!("{}", postgres_port)];
let args = ["-h", "localhost", "-p", &format!("{}", self.postgres_port)];
let exitcode = Command::new(bin_path).args(args).spawn()?.wait().await?;
Ok(exitcode.success())
@@ -224,8 +199,8 @@ impl StorageController {
/// who just want to run `cargo neon_local` without knowing about diesel.
///
/// Returns the database url
pub async fn setup_database(&self, postgres_port: u16) -> anyhow::Result<String> {
let database_url = format!("postgresql://localhost:{}/{DB_NAME}", postgres_port);
pub async fn setup_database(&self) -> anyhow::Result<String> {
let database_url = format!("postgresql://localhost:{}/{DB_NAME}", self.postgres_port);
let pg_bin_dir = self.get_pg_bin_dir().await?;
let createdb_path = pg_bin_dir.join("createdb");
@@ -234,7 +209,7 @@ impl StorageController {
"-h",
"localhost",
"-p",
&format!("{}", postgres_port),
&format!("{}", self.postgres_port),
DB_NAME,
])
.output()
@@ -255,14 +230,13 @@ impl StorageController {
pub async fn connect_to_database(
&self,
postgres_port: u16,
) -> anyhow::Result<(
tokio_postgres::Client,
tokio_postgres::Connection<tokio_postgres::Socket, tokio_postgres::tls::NoTlsStream>,
)> {
tokio_postgres::Config::new()
.host("localhost")
.port(postgres_port)
.port(self.postgres_port)
// The user is the ambient operating system user name.
// That is an impurity which we want to fix in => TODO https://github.com/neondatabase/neon/issues/8400
//
@@ -278,115 +252,72 @@ impl StorageController {
.map_err(anyhow::Error::new)
}
pub async fn start(&self, start_args: NeonStorageControllerStartArgs) -> anyhow::Result<()> {
let instance_dir = self.storage_controller_instance_dir(start_args.instance_id);
if let Err(err) = tokio::fs::create_dir(&instance_dir).await {
if err.kind() != std::io::ErrorKind::AlreadyExists {
panic!("Failed to create instance dir {instance_dir:?}");
}
}
pub async fn start(&self, retry_timeout: &Duration) -> anyhow::Result<()> {
// Start a vanilla Postgres process used by the storage controller for persistence.
let pg_data_path = Utf8PathBuf::from_path_buf(self.env.base_data_dir.clone())
.unwrap()
.join("storage_controller_db");
let pg_bin_dir = self.get_pg_bin_dir().await?;
let pg_lib_dir = self.get_pg_lib_dir().await?;
let pg_log_path = pg_data_path.join("postgres.log");
let (listen, postgres_port) = {
if let Some(base_port) = start_args.base_port {
(
format!("127.0.0.1:{base_port}"),
self.config
.database_url
.expect("--base-port requires NeonStorageControllerConf::database_url")
.port(),
)
} else {
let listen_url = self.env.control_plane_api.clone().unwrap();
let listen = format!(
"{}:{}",
listen_url.host_str().unwrap(),
listen_url.port().unwrap()
);
(listen, listen_url.port().unwrap() + 1)
if !tokio::fs::try_exists(&pg_data_path).await? {
// Initialize empty database
let initdb_path = pg_bin_dir.join("initdb");
let mut child = Command::new(&initdb_path)
.envs(vec![
("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
])
.args(["-D", pg_data_path.as_ref()])
.spawn()
.expect("Failed to spawn initdb");
let status = child.wait().await?;
if !status.success() {
anyhow::bail!("initdb failed with status {status}");
}
};
let socket_addr = listen
.parse()
.expect("listen address is a valid socket address");
self.listen
.set(socket_addr)
.expect("StorageController::listen is only set here");
// Write a minimal config file:
// - Specify the port, since this is chosen dynamically
// - Switch off fsync, since we're running on lightweight test environments and when e.g. scale testing
// the storage controller we don't want a slow local disk to interfere with that.
//
// NB: it's important that we rewrite this file on each start command so we propagate changes
// from `LocalEnv`'s config file (`.neon/config`).
tokio::fs::write(
&pg_data_path.join("postgresql.conf"),
format!("port = {}\nfsync=off\n", self.postgres_port),
)
.await?;
// Do we remove the pid file on stop?
let pg_started = self.is_postgres_running().await?;
let pg_lib_dir = self.get_pg_lib_dir().await?;
println!("Starting storage controller database...");
let db_start_args = [
"-w",
"-D",
pg_data_path.as_ref(),
"-l",
pg_log_path.as_ref(),
"start",
];
if !pg_started {
// Start a vanilla Postgres process used by the storage controller for persistence.
let pg_data_path = Utf8PathBuf::from_path_buf(self.env.base_data_dir.clone())
.unwrap()
.join("storage_controller_db");
let pg_bin_dir = self.get_pg_bin_dir().await?;
let pg_log_path = pg_data_path.join("postgres.log");
background_process::start_process(
"storage_controller_db",
&self.env.base_data_dir,
pg_bin_dir.join("pg_ctl").as_std_path(),
db_start_args,
vec![
("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
],
background_process::InitialPidFile::Create(self.postgres_pid_file()),
retry_timeout,
|| self.pg_isready(&pg_bin_dir),
)
.await?;
if !tokio::fs::try_exists(&pg_data_path).await? {
// Initialize empty database
let initdb_path = pg_bin_dir.join("initdb");
let mut child = Command::new(&initdb_path)
.envs(vec![
("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
])
.args(["-D", pg_data_path.as_ref()])
.spawn()
.expect("Failed to spawn initdb");
let status = child.wait().await?;
if !status.success() {
anyhow::bail!("initdb failed with status {status}");
}
};
// Write a minimal config file:
// - Specify the port, since this is chosen dynamically
// - Switch off fsync, since we're running on lightweight test environments and when e.g. scale testing
// the storage controller we don't want a slow local disk to interfere with that.
//
// NB: it's important that we rewrite this file on each start command so we propagate changes
// from `LocalEnv`'s config file (`.neon/config`).
tokio::fs::write(
&pg_data_path.join("postgresql.conf"),
format!("port = {}\nfsync=off\n", postgres_port),
)
.await?;
println!("Starting storage controller database...");
let db_start_args = [
"-w",
"-D",
pg_data_path.as_ref(),
"-l",
pg_log_path.as_ref(),
"start",
];
background_process::start_process(
"storage_controller_db",
&self.env.base_data_dir,
pg_bin_dir.join("pg_ctl").as_std_path(),
db_start_args,
vec![
("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
],
background_process::InitialPidFile::Create(self.postgres_pid_file()),
&start_args.start_timeout,
|| self.pg_isready(&pg_bin_dir, postgres_port),
)
.await?;
// Run migrations on every startup, in case something changed.
self.setup_database(postgres_port).await?;
}
let database_url = format!("postgresql://localhost:{}/{DB_NAME}", postgres_port);
// Run migrations on every startup, in case something changed.
let database_url = self.setup_database().await?;
// We support running a startup SQL script to fiddle with the database before we launch storcon.
// This is used by the test suite.
@@ -408,7 +339,7 @@ impl StorageController {
}
}
};
let (mut client, conn) = self.connect_to_database(postgres_port).await?;
let (mut client, conn) = self.connect_to_database().await?;
let conn = tokio::spawn(conn);
let tx = client.build_transaction();
let tx = tx.start().await?;
@@ -417,20 +348,9 @@ impl StorageController {
drop(client);
conn.await??;
let listen = self
.listen
.get()
.expect("cell is set earlier in this function");
let address_for_peers = Uri::builder()
.scheme("http")
.authority(format!("{}:{}", listen.ip(), listen.port()))
.path_and_query("")
.build()
.unwrap();
let mut args = vec![
"-l",
&listen.to_string(),
&self.listen,
"--dev",
"--database-url",
&database_url,
@@ -438,17 +358,10 @@ impl StorageController {
&humantime::Duration::from(self.config.max_offline).to_string(),
"--max-warming-up-interval",
&humantime::Duration::from(self.config.max_warming_up).to_string(),
"--address-for-peers",
&address_for_peers.to_string(),
]
.into_iter()
.map(|s| s.to_string())
.collect::<Vec<_>>();
if self.config.start_as_candidate {
args.push("--start-as-candidate".to_string());
}
if let Some(private_key) = &self.private_key {
let claims = Claims::new(None, Scope::PageServerApi);
let jwt_token =
@@ -481,15 +394,15 @@ impl StorageController {
background_process::start_process(
COMMAND,
&instance_dir,
&self.env.base_data_dir,
&self.env.storage_controller_bin(),
args,
vec![
("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()),
],
background_process::InitialPidFile::Create(self.pid_file(start_args.instance_id)),
&start_args.start_timeout,
background_process::InitialPidFile::Create(self.pid_file()),
retry_timeout,
|| async {
match self.ready().await {
Ok(_) => Ok(true),
@@ -502,35 +415,8 @@ impl StorageController {
Ok(())
}
pub async fn stop(&self, stop_args: NeonStorageControllerStopArgs) -> anyhow::Result<()> {
background_process::stop_process(
stop_args.immediate,
COMMAND,
&self.pid_file(stop_args.instance_id),
)?;
let storcon_instances = self.env.storage_controller_instances().await?;
for (instance_id, instanced_dir_path) in storcon_instances {
if instance_id == stop_args.instance_id {
continue;
}
let pid_file = instanced_dir_path.join("storage_controller.pid");
let pid = tokio::fs::read_to_string(&pid_file)
.await
.map_err(|err| {
anyhow::anyhow!("Failed to read storcon pid file at {pid_file:?}: {err}")
})?
.parse::<i32>()
.expect("pid is valid i32");
let other_proc_alive = !background_process::process_has_stopped(Pid::from_raw(pid))?;
if other_proc_alive {
// There is another storage controller instance running, so we return
// and leave the database running.
return Ok(());
}
}
pub async fn stop(&self, immediate: bool) -> anyhow::Result<()> {
background_process::stop_process(immediate, COMMAND, &self.pid_file())?;
let pg_data_path = self.env.base_data_dir.join("storage_controller_db");
let pg_bin_dir = self.get_pg_bin_dir().await?;
@@ -543,51 +429,27 @@ impl StorageController {
.wait()
.await?;
if !stop_status.success() {
match self.is_postgres_running().await {
Ok(false) => {
println!("Storage controller database is already stopped");
return Ok(());
}
Ok(true) => {
anyhow::bail!("Failed to stop storage controller database");
}
Err(err) => {
anyhow::bail!("Failed to stop storage controller database: {err}");
}
let pg_status_args = ["-D", &pg_data_path.to_string_lossy(), "status"];
let status_exitcode = Command::new(pg_bin_dir.join("pg_ctl"))
.args(pg_status_args)
.spawn()?
.wait()
.await?;
// pg_ctl status returns this exit code if postgres is not running: in this case it is
// fine that stop failed. Otherwise it is an error that stop failed.
const PG_STATUS_NOT_RUNNING: i32 = 3;
if Some(PG_STATUS_NOT_RUNNING) == status_exitcode.code() {
println!("Storage controller database is already stopped");
return Ok(());
} else {
anyhow::bail!("Failed to stop storage controller database: {stop_status}")
}
}
Ok(())
}
async fn is_postgres_running(&self) -> anyhow::Result<bool> {
let pg_data_path = self.env.base_data_dir.join("storage_controller_db");
let pg_bin_dir = self.get_pg_bin_dir().await?;
let pg_status_args = ["-D", &pg_data_path.to_string_lossy(), "status"];
let status_exitcode = Command::new(pg_bin_dir.join("pg_ctl"))
.args(pg_status_args)
.spawn()?
.wait()
.await?;
// pg_ctl status returns this exit code if postgres is not running: in this case it is
// fine that stop failed. Otherwise it is an error that stop failed.
const PG_STATUS_NOT_RUNNING: i32 = 3;
const PG_NO_DATA_DIR: i32 = 4;
const PG_STATUS_RUNNING: i32 = 0;
match status_exitcode.code() {
Some(PG_STATUS_NOT_RUNNING) => Ok(false),
Some(PG_NO_DATA_DIR) => Ok(false),
Some(PG_STATUS_RUNNING) => Ok(true),
Some(code) => Err(anyhow::anyhow!(
"pg_ctl status returned unexpected status code: {:?}",
code
)),
None => Err(anyhow::anyhow!("pg_ctl status returned no status code")),
}
}
fn get_claims_for_path(path: &str) -> anyhow::Result<Option<Claims>> {
let category = match path.find('/') {
Some(idx) => &path[..idx],
@@ -613,31 +475,15 @@ impl StorageController {
RQ: Serialize + Sized,
RS: DeserializeOwned + Sized,
{
// In the special case of the `storage_controller start` subcommand, we wish
// to use the API endpoint of the newly started storage controller in order
// to pass the readiness check. In this scenario [`Self::listen`] will be set
// (see [`Self::start`]).
//
// Otherwise, we infer the storage controller api endpoint from the configured
// control plane API.
let url = if let Some(socket_addr) = self.listen.get() {
Url::from_str(&format!(
"http://{}:{}/{path}",
socket_addr.ip().to_canonical(),
socket_addr.port()
))
.unwrap()
} else {
// The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
// for general purpose API access.
let listen_url = self.env.control_plane_api.clone().unwrap();
Url::from_str(&format!(
"http://{}:{}/{path}",
listen_url.host_str().unwrap(),
listen_url.port().unwrap()
))
.unwrap()
};
// The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
// for general purpose API access.
let listen_url = self.env.control_plane_api.clone().unwrap();
let url = Url::from_str(&format!(
"http://{}:{}/{path}",
listen_url.host_str().unwrap(),
listen_url.port().unwrap()
))
.unwrap();
let mut builder = self.client.request(method, url);
if let Some(body) = body {

View File

@@ -14,7 +14,7 @@ picked tenant (which requested on-demand activation) for around 30 seconds
during the restart at 2024-04-03 16:37 UTC.
Note that lots of shutdowns on loaded pageservers do not finish within the
[10 second systemd enforced timeout](https://github.com/neondatabase/infra/blob/0a5280b383e43c063d43cbf87fa026543f6d6ad4/.github/ansible/systemd/pageserver.service#L16). This means we are shutting down without flushing ephemeral layers
[10 second systemd enforced timeout](https://github.com/neondatabase/aws/blob/0a5280b383e43c063d43cbf87fa026543f6d6ad4/.github/ansible/systemd/pageserver.service#L16). This means we are shutting down without flushing ephemeral layers
and have to reingest data in order to serve requests after restarting, potentially making first request latencies worse.
This problem is not yet very acutely felt in storage controller managed pageservers since

View File

@@ -1,265 +0,0 @@
# Physical Replication
This RFC is a bit special in that we have already implemented physical
replication a long time ago. However, we never properly wrote down all
the decisions and assumptions, and in the last months when more users
have started to use the feature, numerous issues have surfaced.
This RFC documents the design decisions that have been made.
## Summary
PostgreSQL has a feature called streaming replication, where a replica
streams WAL from the primary and continuously applies it. It is also
known as "physical replication", to distinguish it from logical
replication. In PostgreSQL, a replica is initialized by taking a
physical backup of the primary. In Neon, the replica is initialized
from a slim "base backup" from the pageserver, just like a primary,
and the primary and the replicas connect to the same pageserver,
sharing the storage.
There are two kinds of read-only replicas in Neon:
- replicas that follow the primary, and
- "static" replicas that are pinned at a particular LSN.
A static replica is useful e.g. for performing time-travel queries and
running one-off slow queries without affecting the primary. A replica
that follows the primary can be used e.g. to scale out read-only
workloads.
## Motivation
Read-only replicas allow offloading read-only queries. It's useful for
isolation, if you want to make sure that read-only queries don't
affect the primary, and it's also an easy way to provide guaranteed
read-only access to an application, without having to mess with access
controls.
## Non Goals (if relevant)
This RFC is all about WAL-based *physical* replication. Logical
replication is a different feature.
Neon also has the capability to launch "static" read-only nodes which
do not follow the primary, but are pinned to a particular LSN. They
can be used for long-running one-off queries, or for Point-in-time
queries. They work similarly to read replicas that follow the primary,
but some things are simpler: there are no concerns about cache
invalidation when the data changes on the primary, or worrying about
transactions that are in-progress on the primary.
## Impacted components (e.g. pageserver, safekeeper, console, etc)
- Control plane launches the replica
- Replica Postgres instance connects to the safekeepers, to stream the WAL
- The primary does not know about the standby, except for the hot standby feedback
- The primary and replicas all connect to the same pageservers
# Context
Some useful things to know about hot standby and replicas in
PostgreSQL.
## PostgreSQL startup sequence
"Running" and "start up" terms are little imprecise. PostgreSQL
replica startup goes through several stages:
1. First, the process is started up, and various initialization steps
are performed, like initializing shared memory. If you try to
connect to the server in this stage, you get an error: ERROR: the
database system is starting up. This stage happens very quickly, no
2. Then the server reads the checpoint record from the WAL and starts
the WAL replay starting from the checkpoint. This works differently
in Neon: we start the WAL replay at the basebackup LSN, not from a
checkpoint! If you connect to the server in this state, you get an
error: ERROR: the database system is not yet accepting
connections. We proceed to the next stage, when the WAL replay sees
a running-xacts record. Or in Neon, the "CLOG scanning" mechanism
can allow us to move directly to next stage, with all the caveats
listed in this RFC.
3. When the running-xacts information is established, the server
starts to accept connections normally.
From PostgreSQL's point of view, the server is already running in
stage 2, even though it's not accepting connections yet. Our
`compute_ctl` does not consider it as running until stage 3. If the
transition from stage 2 to 3 doesn't happen fast enough, the control
plane will mark the start operation as failed.
## Decisions, Issues
### Cache invalidation in replica
When a read replica follows the primary in PostgreSQL, it needs to
stream all the WAL from the primary and apply all the records, to keep
the local copy of the data consistent with the primary. In Neon, the
replica can fetch the updated page versions from the pageserver, so
it's not necessary to apply all the WAL. However, it needs to ensure
that any pages that are currently in the Postgres buffer cache, or the
Local File Cache, are either updated, or thrown away so that the next
read of the page will fetch the latest version.
We choose to apply the WAL records for pages that are already in the
buffer cache, and skip records for other pages. Somewhat arbitrarily,
we also apply records affecting catalog relations, fetching the old
page version from the pageserver if necessary first. See
`neon_redo_read_buffer_filter()` function.
The replica wouldn't necessarily need to see all the WAL records, only
the records that apply to cached pages. For simplicity, we do stream
all the WAL to the replica, and the replica simply ignores WAL records
that require no action.
Like in PostgreSQL, the read replica maintains a "replay LSN", which
is the LSN up to which the replica has received and replayed the
WAL. The replica can lag behind the primary, if it cannot quite keep
up with the primary, or if a long-running query conflicts with changes
that are about to be applied, or even intentionally if the user wishes
to see delayed data (see recovery_min_apply_delay). It's important
that the replica sees a consistent view of the whole cluster at the
replay LSN, when it's lagging behind.
In Neon, the replica connects to a safekeeper to get the WAL
stream. That means that the safekeepers must be able to regurgitate
the original WAL as far back as the replay LSN of any running read
replica. (A static read-only node that does not follow the primary
does not require a WAL stream however). The primary does not need to
be running, and when it is, the replicas don't incur any extra
overhead to the primary (see hot standby feedback though).
### In-progress transactions
In PostgreSQL, when a hot standby server starts up, it cannot
immediately open up for queries (see [PostgreSQL startup
sequence]). It first needs to establish a complete list of in-progress
transactions, including subtransactions, that are running at the
primary, at the current replay LSN. Normally that happens quickly,
when the replica sees a "running-xacts" WAL record, because the
primary writes a running-xacts WAL record at every checkpoint, and in
PostgreSQL the replica always starts the WAL replay from a checkpoint
REDO point. (A shutdown checkpoint WAL record also implies that all
the non-prepared transactions have ended.) If there are a lot of
subtransactions in progress, however, the standby might need to wait
for old transactions to complete before it can open up for queries.
In Neon that problem is worse: a replica can start at any LSN, so
there's no guarantee that it will see a running-xacts record any time
soon. In particular, if the primary is not running when the replica is
started, it might never see a running-xacts record.
To make things worse, we initially missed this issue, and always
started accepting queries at replica startup, even if it didn't have
the transaction information. That could lead to incorrect query
results and data corruption later. However, as we fixed that, we
introduced a new problem compared to what we had before: previously
the replica would always start up, but after fixing that bug, it might
not. In a superficial way, the old behavior was better (but could lead
to serious issues later!). That made fixing that bug was very hard,
because as we fixed it, we made things (superficially) worse for
others.
See https://github.com/neondatabase/neon/pull/7288 which fixed the
bug, and follow-up PRs https://github.com/neondatabase/neon/pull/8323
and https://github.com/neondatabase/neon/pull/8484 to try to claw back
the cases that started to cause trouble as fixing it. As of this
writing, there are still cases where a replica might not immediately
start up, causing the control plane operation to fail, the remaining
issues are tracked in https://github.com/neondatabase/neon/issues/6211.
One long-term fix for this is to switch to using so-called CSN
snapshots in read replica. That would make it unnecessary to have the
full in-progress transaction list in the replica at startup time. See
https://commitfest.postgresql.org/48/4912/ for a work-in-progress
patch to upstream to implement that.
Another thing we could do is to teach the control plane about that
distinction between "starting up" and "running but haven't received
running-xacts information yet", so that we could keep the replica
waiting longer in that stage, and also give any client connections the
same `ERROR: the database system is not yet accepting connections`
error that you get in standalone PostgreSQL in that state.
### Recovery conflicts and Hot standby feedback
It's possible that a tuple version is vacuumed away in the primary,
even though it is still needed by a running transactions in the
replica. This is called a "recovery conflict", and PostgreSQL provides
various options for dealing with it. By default, the WAL replay will
wait up to 30 s for the conflicting query to finish. After that, it
will kill the running query, so that the WAL replay can proceed.
Another way to avoid the situation is to enable the
[`hot_standby_feedback`](https://www.postgresql.org/docs/current/runtime-config-replication.html#GUC-HOT-STANDBY-FEEDBACK)
option. When it is enabled, the primary will refrain from vacuuming
tuples that are still needed in the primary. That means potentially
bloating the primary, which violates the usual rule that read replicas
don't affect the operations on the primary, which is why it's off by
default. We leave it to users to decide if they want to turn it on,
same as PostgreSQL.
Neon supports `hot_standby_feedback` by passing the feedback messages
from the replica to the safekeepers, and from safekeepers to the
primary.
### Relationship of settings between primary and replica
In order to enter hot standby mode, some configuration options need to
be set to the same or larger values in the standby, compared to the
primary. See [explanation in the PostgreSQL
docs](https://www.postgresql.org/docs/current/hot-standby.html#HOT-STANDBY-ADMIN)
In Neon, we have this problem too. To prevent customers from hitting
it, the control plane automatically adjusts the settings of a replica,
so that they match or exceed the primary's settings (see
https://github.com/neondatabase/cloud/issues/14903). However, you
can still hit the issue if the primary is restarted with larger
settings, while the replica is running.
### Interaction with Pageserver GC
The read replica can lag behind the primary. If there are recovery
conflicts or the replica cannot keep up for some reason, the lag can
in principle grow indefinitely. The replica will issue all GetPage
requests to the pageservers at the current replay LSN, and needs to
see the old page versions.
If the retention period in the pageserver is set to be small, it may
have already garbage collected away the old page versions. That will
cause read errors in the compute, and can mean that the replica cannot
make progress with the replication anymore.
There is a mechanism for replica to pass information about its replay
LSN to the pageserver, so that the pageserver refrains from GC'ing
data that is still needed by the standby. It's called
'standby_horizon' in the pageserver code, see
https://github.com/neondatabase/neon/pull/7368. A separate "lease"
mechanism also is in the works, where the replica could hold a lease
on the old LSN, preventing the pageserver from advancing the GC
horizon past that point. The difference is that the standby_horizon
mechanism relies on a feedback message from replica to safekeeper,
while the least API is exposed directly from the pageserver. A static
read-only node is not connected to safekeepers, so it cannot use the
standby_horizon mechanism.
### Synchronous replication
We haven't put any effort into synchronous replication yet.
PostgreSQL provides multiple levels of synchronicity. In the weaker
levels, a transaction is not acknowledged as committed to the client
in the primary until the WAL has been streamed to a replica or flushed
to disk there. Those modes don't make senses in Neon, because the
safekeepers handle durability.
`synchronous_commit=remote_apply` mode would make sense. In that mode,
the commit is not acknowledged to the client until it has been
replayed in the replica. That ensures that after commit, you can see
the commit in the replica too (aka. read-your-write consistency).

View File

@@ -0,0 +1,84 @@
# V3 version of compute-page server protocol
Created on: 2024-08-15
Author: Konstantin Knizhnik
## Summary
Current version of compute-PS protocol doesn't allow to verify that received response actually corresponds to the request.
In most cases it should not cause any problems, because Neon SMGR follows classical server request-response pattern.
If response is not received due to some reasons (network error, ...), then connection is dropped and error is reported.
But we also actively use prefetch, which allows to minimize network round-trip overhead.
In case of prefetch compute sends multiple getpage requests and then gets responses to all of them.
It is expected that responses are received in the same order as requests are sent, i.e. for each request we receive response.
Unfortunately it can be violated in case of errors. In this case it can happen that connection is reset but prefetch ring - not.
As a result we treat response of new request as response to some older prefetch request and place wrong page image in shared buffer.
If this page is modified, then it is saved in WAL and database file. So we are not able to recover original (correct) page image by applying WAL.
So we can mix pages of one relation or even pages of different relations. Most frequently such corruption is detected for indexes,
but just because there are more invariants which can be checked. And there is no good universal way to detect and recover such
corruption.
## Motivation
This bug in prefetch was fixed, but we want to prevent similar problems in future.
For example prewarm is also similar with prefetch and sens several requests and only
after it wait for response.
## Previous work
We already changed protocol version from V1 to V2 when replaced single request LSN with pair
(request LSN,not modified since LSN). It was done by introducing new command codes.
So there was no explicit check for protocol version: if server receives new command,
it assumes that it is new protocol version. After both clients and servers were upgraded to new version,
new command codes were removed. Then version was added to "pagestream" command used to perform handshake.
Client has `neon.protocol_version` GUC specifying which protocol version it should use.
So client informs server about protocol version it is going to use, but server can not ask the client to use some other protocol version,
it can only reject connection request if it is not supporting this protocol version.
## Requirements
- Be able to verify that page server response corresponds to the requests
- Provide backward compatibility: old clients should work with new server.
## Non Goals
- Detect page corruption (include CRC)
- Support of vector operation (merge several requests into one)
- Forward compatibility: support new clients with old page server
## Solution
Include in response extra fields making it possible to verify that response corresponds to the particular request.
Such extra fields include:
- tablespace OID
- database OID
- relation OID
- fork number
- block id (for getpage)
- request LSN
- last modified LSN
In addition to this fields, we also introduce unique auto-incremented `request_id`.
It is combined from `backend_id` and local auto-incremented counter.
There is some probability of collision if backend is restarted, but it is not critical as far as we have all other fields included in response.
`request_id` can be used for better tracking and associating log messages produced by client and page server.
Although only mismatch of `getpage` request can cause data corruption, we want to extend responses for all other commands: get relation/db size, check presence of relation.
## Compatibility
We will change handshake command freom "pagestream_v2" to "pagestream_v3". With V3 version of protocol server should
reply with extended responses. Request/response tags will not be changed.
To prevent forward compatibility issues (when new client tries to access old server), deploy of this PR should be done in three steps:
1. Deploy of new server recognizing V3 protocol version
2. Deploy of new client which is able to send V3 commands, but by default still using V2.
3. After one release cycle when no rollback to previous PS version is possible, we can switch default version of protocol to V3, by changing `neon.protocol_version` GUC in project settings.

View File

@@ -383,48 +383,6 @@ impl RemoteStorage for AzureBlobStorage {
}
}
async fn head_object(
&self,
key: &RemotePath,
cancel: &CancellationToken,
) -> Result<ListingObject, DownloadError> {
let kind = RequestKind::Head;
let _permit = self.permit(kind, cancel).await?;
let started_at = start_measuring_requests(kind);
let blob_client = self.client.blob_client(self.relative_path_to_name(key));
let properties_future = blob_client.get_properties().into_future();
let properties_future = tokio::time::timeout(self.timeout, properties_future);
let res = tokio::select! {
res = properties_future => res,
_ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
};
if let Ok(inner) = &res {
// do not incl. timeouts as errors in metrics but cancellations
let started_at = ScopeGuard::into_inner(started_at);
crate::metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, inner, started_at);
}
let data = match res {
Ok(Ok(data)) => Ok(data),
Ok(Err(sdk)) => Err(to_download_error(sdk)),
Err(_timeout) => Err(DownloadError::Timeout),
}?;
let properties = data.blob.properties;
Ok(ListingObject {
key: key.to_owned(),
last_modified: SystemTime::from(properties.last_modified),
size: properties.content_length,
})
}
async fn upload(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,

View File

@@ -150,7 +150,7 @@ pub enum ListingMode {
NoDelimiter,
}
#[derive(PartialEq, Eq, Debug, Clone)]
#[derive(PartialEq, Eq, Debug)]
pub struct ListingObject {
pub key: RemotePath,
pub last_modified: SystemTime,
@@ -215,13 +215,6 @@ pub trait RemoteStorage: Send + Sync + 'static {
Ok(combined)
}
/// Obtain metadata information about an object.
async fn head_object(
&self,
key: &RemotePath,
cancel: &CancellationToken,
) -> Result<ListingObject, DownloadError>;
/// Streams the local file contents into remote into the remote storage entry.
///
/// If the operation fails because of timeout or cancellation, the root cause of the error will be
@@ -370,20 +363,6 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
}
}
// See [`RemoteStorage::head_object`].
pub async fn head_object(
&self,
key: &RemotePath,
cancel: &CancellationToken,
) -> Result<ListingObject, DownloadError> {
match self {
Self::LocalFs(s) => s.head_object(key, cancel).await,
Self::AwsS3(s) => s.head_object(key, cancel).await,
Self::AzureBlob(s) => s.head_object(key, cancel).await,
Self::Unreliable(s) => s.head_object(key, cancel).await,
}
}
/// See [`RemoteStorage::upload`]
pub async fn upload(
&self,
@@ -619,7 +598,6 @@ impl ConcurrencyLimiter {
RequestKind::Delete => &self.write,
RequestKind::Copy => &self.write,
RequestKind::TimeTravel => &self.write,
RequestKind::Head => &self.read,
}
}

View File

@@ -445,20 +445,6 @@ impl RemoteStorage for LocalFs {
}
}
async fn head_object(
&self,
key: &RemotePath,
_cancel: &CancellationToken,
) -> Result<ListingObject, DownloadError> {
let target_file_path = key.with_base(&self.storage_root);
let metadata = file_metadata(&target_file_path).await?;
Ok(ListingObject {
key: key.clone(),
last_modified: metadata.modified()?,
size: metadata.len(),
})
}
async fn upload(
&self,
data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,

View File

@@ -13,7 +13,6 @@ pub(crate) enum RequestKind {
List = 3,
Copy = 4,
TimeTravel = 5,
Head = 6,
}
use scopeguard::ScopeGuard;
@@ -28,7 +27,6 @@ impl RequestKind {
List => "list_objects",
Copy => "copy_object",
TimeTravel => "time_travel_recover",
Head => "head_object",
}
}
const fn as_index(&self) -> usize {
@@ -36,8 +34,7 @@ impl RequestKind {
}
}
const REQUEST_KIND_COUNT: usize = 7;
pub(crate) struct RequestTyped<C>([C; REQUEST_KIND_COUNT]);
pub(crate) struct RequestTyped<C>([C; 6]);
impl<C> RequestTyped<C> {
pub(crate) fn get(&self, kind: RequestKind) -> &C {
@@ -46,8 +43,8 @@ impl<C> RequestTyped<C> {
fn build_with(mut f: impl FnMut(RequestKind) -> C) -> Self {
use RequestKind::*;
let mut it = [Get, Put, Delete, List, Copy, TimeTravel, Head].into_iter();
let arr = std::array::from_fn::<C, REQUEST_KIND_COUNT, _>(|index| {
let mut it = [Get, Put, Delete, List, Copy, TimeTravel].into_iter();
let arr = std::array::from_fn::<C, 6, _>(|index| {
let next = it.next().unwrap();
assert_eq!(index, next.as_index());
f(next)

View File

@@ -23,7 +23,7 @@ use aws_config::{
use aws_sdk_s3::{
config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep},
error::SdkError,
operation::{get_object::GetObjectError, head_object::HeadObjectError},
operation::get_object::GetObjectError,
types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion, StorageClass},
Client,
};
@@ -604,78 +604,6 @@ impl RemoteStorage for S3Bucket {
}
}
async fn head_object(
&self,
key: &RemotePath,
cancel: &CancellationToken,
) -> Result<ListingObject, DownloadError> {
let kind = RequestKind::Head;
let _permit = self.permit(kind, cancel).await?;
let started_at = start_measuring_requests(kind);
let head_future = self
.client
.head_object()
.bucket(self.bucket_name())
.key(self.relative_path_to_s3_object(key))
.send();
let head_future = tokio::time::timeout(self.timeout, head_future);
let res = tokio::select! {
res = head_future => res,
_ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
};
let res = res.map_err(|_e| DownloadError::Timeout)?;
// do not incl. timeouts as errors in metrics but cancellations
let started_at = ScopeGuard::into_inner(started_at);
crate::metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, &res, started_at);
let data = match res {
Ok(object_output) => object_output,
Err(SdkError::ServiceError(e)) if matches!(e.err(), HeadObjectError::NotFound(_)) => {
// Count this in the AttemptOutcome::Ok bucket, because 404 is not
// an error: we expect to sometimes fetch an object and find it missing,
// e.g. when probing for timeline indices.
crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
kind,
AttemptOutcome::Ok,
started_at,
);
return Err(DownloadError::NotFound);
}
Err(e) => {
crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
kind,
AttemptOutcome::Err,
started_at,
);
return Err(DownloadError::Other(
anyhow::Error::new(e).context("s3 head object"),
));
}
};
let (Some(last_modified), Some(size)) = (data.last_modified, data.content_length) else {
return Err(DownloadError::Other(anyhow!(
"head_object doesn't contain last_modified or content_length"
)))?;
};
Ok(ListingObject {
key: key.to_owned(),
last_modified: SystemTime::try_from(last_modified).map_err(|e| {
DownloadError::Other(anyhow!("can't convert time '{last_modified}': {e}"))
})?,
size: size as u64,
})
}
async fn upload(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,

View File

@@ -30,7 +30,6 @@ pub struct UnreliableWrapper {
#[derive(Debug, Hash, Eq, PartialEq)]
enum RemoteOp {
ListPrefixes(Option<RemotePath>),
HeadObject(RemotePath),
Upload(RemotePath),
Download(RemotePath),
Delete(RemotePath),
@@ -138,16 +137,6 @@ impl RemoteStorage for UnreliableWrapper {
self.inner.list(prefix, mode, max_keys, cancel).await
}
async fn head_object(
&self,
key: &RemotePath,
cancel: &CancellationToken,
) -> Result<crate::ListingObject, DownloadError> {
self.attempt(RemoteOp::HeadObject(key.clone()))
.map_err(DownloadError::Other)?;
self.inner.head_object(key, cancel).await
}
async fn upload(
&self,
data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,

View File

@@ -50,6 +50,7 @@ pub mod defaults {
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
DEFAULT_PG_LISTEN_PORT,
};
use pageserver_api::models::ImageCompressionAlgorithm;
pub use storage_broker::DEFAULT_ENDPOINT as BROKER_DEFAULT_ENDPOINT;
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "300 s";
@@ -89,7 +90,8 @@ pub mod defaults {
pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 128 * 1024; // 128 KiB
pub const DEFAULT_IMAGE_COMPRESSION: &str = "zstd(1)";
pub const DEFAULT_IMAGE_COMPRESSION: ImageCompressionAlgorithm =
ImageCompressionAlgorithm::Disabled;
pub const DEFAULT_VALIDATE_VECTORED_GET: bool = false;
@@ -476,7 +478,7 @@ impl PageServerConfigBuilder {
max_vectored_read_bytes: Set(MaxVectoredReadBytes(
NonZeroUsize::new(DEFAULT_MAX_VECTORED_READ_BYTES).unwrap(),
)),
image_compression: Set(DEFAULT_IMAGE_COMPRESSION.parse().unwrap()),
image_compression: Set(DEFAULT_IMAGE_COMPRESSION),
ephemeral_bytes_per_memory_kb: Set(DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
l0_flush: Set(L0FlushConfig::default()),
compact_level0_phase1_value_access: Set(CompactL0Phase1ValueAccess::default()),
@@ -1063,7 +1065,7 @@ impl PageServerConf {
NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
.expect("Invalid default constant"),
),
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION.parse().unwrap(),
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
@@ -1303,7 +1305,7 @@ background_task_maximum_delay = '334 s'
NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
.expect("Invalid default constant")
),
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION.parse().unwrap(),
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
@@ -1376,7 +1378,7 @@ background_task_maximum_delay = '334 s'
NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
.expect("Invalid default constant")
),
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION.parse().unwrap(),
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),

View File

@@ -1,10 +1,15 @@
use std::{num::NonZeroUsize, sync::Arc};
use crate::tenant::ephemeral_file;
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum L0FlushConfig {
PageCached,
#[serde(rename_all = "snake_case")]
Direct { max_concurrency: NonZeroUsize },
Direct {
max_concurrency: NonZeroUsize,
},
}
impl Default for L0FlushConfig {
@@ -20,12 +25,14 @@ impl Default for L0FlushConfig {
pub struct L0FlushGlobalState(Arc<Inner>);
pub enum Inner {
PageCached,
Direct { semaphore: tokio::sync::Semaphore },
}
impl L0FlushGlobalState {
pub fn new(config: L0FlushConfig) -> Self {
match config {
L0FlushConfig::PageCached => Self(Arc::new(Inner::PageCached)),
L0FlushConfig::Direct { max_concurrency } => {
let semaphore = tokio::sync::Semaphore::new(max_concurrency.get());
Self(Arc::new(Inner::Direct { semaphore }))
@@ -37,3 +44,13 @@ impl L0FlushGlobalState {
&self.0
}
}
impl L0FlushConfig {
pub(crate) fn prewarm_on_write(&self) -> ephemeral_file::PrewarmPageCacheOnWrite {
use L0FlushConfig::*;
match self {
PageCached => ephemeral_file::PrewarmPageCacheOnWrite::Yes,
Direct { .. } => ephemeral_file::PrewarmPageCacheOnWrite::No,
}
}
}

View File

@@ -49,7 +49,7 @@ use tracing::{info, info_span};
/// backwards-compatible changes to the metadata format.
pub const STORAGE_FORMAT_VERSION: u16 = 3;
pub const DEFAULT_PG_VERSION: u32 = 16;
pub const DEFAULT_PG_VERSION: u32 = 15;
// Magic constants used to identify different kinds of files
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;

View File

@@ -393,7 +393,7 @@ struct PageServerTask {
/// Tasks may optionally be launched for a particular tenant/timeline, enabling
/// later cancelling tasks for that tenant/timeline in [`shutdown_tasks`]
tenant_shard_id: TenantShardId,
tenant_shard_id: Option<TenantShardId>,
timeline_id: Option<TimelineId>,
mutable: Mutex<MutableTaskState>,
@@ -405,7 +405,7 @@ struct PageServerTask {
pub fn spawn<F>(
runtime: &tokio::runtime::Handle,
kind: TaskKind,
tenant_shard_id: TenantShardId,
tenant_shard_id: Option<TenantShardId>,
timeline_id: Option<TimelineId>,
name: &str,
future: F,
@@ -550,7 +550,7 @@ pub async fn shutdown_tasks(
let tasks = TASKS.lock().unwrap();
for task in tasks.values() {
if (kind.is_none() || Some(task.kind) == kind)
&& (tenant_shard_id.is_none() || Some(task.tenant_shard_id) == tenant_shard_id)
&& (tenant_shard_id.is_none() || task.tenant_shard_id == tenant_shard_id)
&& (timeline_id.is_none() || task.timeline_id == timeline_id)
{
task.cancel.cancel();
@@ -573,8 +573,13 @@ pub async fn shutdown_tasks(
};
if let Some(mut join_handle) = join_handle {
if log_all {
// warn to catch these in tests; there shouldn't be any
warn!(name = task.name, tenant_shard_id = ?tenant_shard_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
if tenant_shard_id.is_none() {
// there are quite few of these
info!(name = task.name, kind = ?task_kind, "stopping global task");
} else {
// warn to catch these in tests; there shouldn't be any
warn!(name = task.name, tenant_shard_id = ?tenant_shard_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
}
}
if tokio::time::timeout(std::time::Duration::from_secs(1), &mut join_handle)
.await

View File

@@ -798,7 +798,7 @@ impl Tenant {
task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::Attach,
tenant_shard_id,
Some(tenant_shard_id),
None,
"attach tenant",
async move {

View File

@@ -21,6 +21,7 @@ pub struct EphemeralFile {
}
mod page_caching;
pub(crate) use page_caching::PrewarmOnWrite as PrewarmPageCacheOnWrite;
mod zero_padded_read_write;
impl EphemeralFile {
@@ -51,10 +52,12 @@ impl EphemeralFile {
)
.await?;
let prewarm = conf.l0_flush.prewarm_on_write();
Ok(EphemeralFile {
_tenant_shard_id: tenant_shard_id,
_timeline_id: timeline_id,
rw: page_caching::RW::new(file, gate_guard),
rw: page_caching::RW::new(file, prewarm, gate_guard),
})
}

View File

@@ -1,15 +1,15 @@
//! Wrapper around [`super::zero_padded_read_write::RW`] that uses the
//! [`crate::page_cache`] to serve reads that need to go to the underlying [`VirtualFile`].
//!
//! Subject to removal in <https://github.com/neondatabase/neon/pull/8537>
use crate::context::RequestContext;
use crate::page_cache::{self, PAGE_SZ};
use crate::tenant::block_io::BlockLease;
use crate::virtual_file::owned_buffers_io::util::size_tracking_writer;
use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;
use crate::virtual_file::VirtualFile;
use std::io::{self};
use once_cell::sync::Lazy;
use std::io::{self, ErrorKind};
use std::ops::{Deref, Range};
use tokio_epoll_uring::BoundedBuf;
use tracing::*;
@@ -18,17 +18,33 @@ use super::zero_padded_read_write;
/// See module-level comment.
pub struct RW {
page_cache_file_id: page_cache::FileId,
rw: super::zero_padded_read_write::RW<size_tracking_writer::Writer<VirtualFile>>,
rw: super::zero_padded_read_write::RW<PreWarmingWriter>,
/// Gate guard is held on as long as we need to do operations in the path (delete on drop).
_gate_guard: utils::sync::gate::GateGuard,
}
/// When we flush a block to the underlying [`crate::virtual_file::VirtualFile`],
/// should we pre-warm the [`crate::page_cache`] with the contents?
#[derive(Clone, Copy)]
pub enum PrewarmOnWrite {
Yes,
No,
}
impl RW {
pub fn new(file: VirtualFile, _gate_guard: utils::sync::gate::GateGuard) -> Self {
pub fn new(
file: VirtualFile,
prewarm_on_write: PrewarmOnWrite,
_gate_guard: utils::sync::gate::GateGuard,
) -> Self {
let page_cache_file_id = page_cache::next_file_id();
Self {
page_cache_file_id,
rw: super::zero_padded_read_write::RW::new(size_tracking_writer::Writer::new(file)),
rw: super::zero_padded_read_write::RW::new(PreWarmingWriter::new(
page_cache_file_id,
file,
prewarm_on_write,
)),
_gate_guard,
}
}
@@ -68,10 +84,10 @@ impl RW {
let vec = Vec::with_capacity(size);
// read from disk what we've already flushed
let file_size_tracking_writer = self.rw.as_writer();
let flushed_range = 0..usize::try_from(file_size_tracking_writer.bytes_written()).unwrap();
let mut vec = file_size_tracking_writer
.as_inner()
let writer = self.rw.as_writer();
let flushed_range = writer.written_range();
let mut vec = writer
.file
.read_exact_at(
vec.slice(0..(flushed_range.end - flushed_range.start)),
u64::try_from(flushed_range.start).unwrap(),
@@ -106,7 +122,7 @@ impl RW {
format!(
"ephemeral file: read immutable page #{}: {}: {:#}",
blknum,
self.rw.as_writer().as_inner().path,
self.rw.as_writer().file.path,
e,
),
)
@@ -116,7 +132,7 @@ impl RW {
}
page_cache::ReadBufResult::NotFound(write_guard) => {
let write_guard = writer
.as_inner()
.file
.read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64, ctx)
.await?;
let read_guard = write_guard.mark_valid();
@@ -138,16 +154,137 @@ impl Drop for RW {
// unlink the file
// we are clear to do this, because we have entered a gate
let path = &self.rw.as_writer().as_inner().path;
let res = std::fs::remove_file(path);
let res = std::fs::remove_file(&self.rw.as_writer().file.path);
if let Err(e) = res {
if e.kind() != std::io::ErrorKind::NotFound {
// just never log the not found errors, we cannot do anything for them; on detach
// the tenant directory is already gone.
//
// not found files might also be related to https://github.com/neondatabase/neon/issues/2442
error!("could not remove ephemeral file '{path}': {e}");
error!(
"could not remove ephemeral file '{}': {}",
self.rw.as_writer().file.path,
e
);
}
}
}
}
struct PreWarmingWriter {
prewarm_on_write: PrewarmOnWrite,
nwritten_blocks: u32,
page_cache_file_id: page_cache::FileId,
file: VirtualFile,
}
impl PreWarmingWriter {
fn new(
page_cache_file_id: page_cache::FileId,
file: VirtualFile,
prewarm_on_write: PrewarmOnWrite,
) -> Self {
Self {
prewarm_on_write,
nwritten_blocks: 0,
page_cache_file_id,
file,
}
}
/// Return the byte range within `file` that has been written though `write_all`.
///
/// The returned range would be invalidated by another `write_all`. To prevent that, we capture `&_`.
fn written_range(&self) -> (impl Deref<Target = Range<usize>> + '_) {
let nwritten_blocks = usize::try_from(self.nwritten_blocks).unwrap();
struct Wrapper(Range<usize>);
impl Deref for Wrapper {
type Target = Range<usize>;
fn deref(&self) -> &Range<usize> {
&self.0
}
}
Wrapper(0..nwritten_blocks * PAGE_SZ)
}
}
impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmingWriter {
async fn write_all<Buf: tokio_epoll_uring::IoBuf + Send>(
&mut self,
buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> std::io::Result<(usize, FullSlice<Buf>)> {
let buflen = buf.len();
assert_eq!(
buflen % PAGE_SZ,
0,
"{buflen} ; we know TAIL_SZ is a PAGE_SZ multiple, and write_buffered_borrowed is used"
);
// Do the IO.
let buf = match self.file.write_all(buf, ctx).await {
(buf, Ok(nwritten)) => {
assert_eq!(nwritten, buflen);
buf
}
(_, Err(e)) => {
return Err(std::io::Error::new(
ErrorKind::Other,
// order error before path because path is long and error is short
format!(
"ephemeral_file: write_blob: write-back tail self.nwritten_blocks={}, buflen={}, {:#}: {}",
self.nwritten_blocks, buflen, e, self.file.path,
),
));
}
};
let nblocks = buflen / PAGE_SZ;
let nblocks32 = u32::try_from(nblocks).unwrap();
if matches!(self.prewarm_on_write, PrewarmOnWrite::Yes) {
// Pre-warm page cache with the contents.
// At least in isolated bulk ingest benchmarks (test_bulk_insert.py), the pre-warming
// benefits the code that writes InMemoryLayer=>L0 layers.
let cache = page_cache::get();
static CTX: Lazy<RequestContext> = Lazy::new(|| {
RequestContext::new(
crate::task_mgr::TaskKind::EphemeralFilePreWarmPageCache,
crate::context::DownloadBehavior::Error,
)
});
for blknum_in_buffer in 0..nblocks {
let blk_in_buffer =
&buf[blknum_in_buffer * PAGE_SZ..(blknum_in_buffer + 1) * PAGE_SZ];
let blknum = self
.nwritten_blocks
.checked_add(blknum_in_buffer as u32)
.unwrap();
match cache
.read_immutable_buf(self.page_cache_file_id, blknum, &CTX)
.await
{
Err(e) => {
error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
// fail gracefully, it's not the end of the world if we can't pre-warm the cache here
}
Ok(v) => match v {
page_cache::ReadBufResult::Found(_guard) => {
// This function takes &mut self, so, it shouldn't be possible to reach this point.
unreachable!("we just wrote block {blknum} to the VirtualFile, which is owned by Self, \
and this function takes &mut self, so, no concurrent read_blk is possible");
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
write_guard.copy_from_slice(blk_in_buffer);
let _ = write_guard.mark_valid();
}
},
}
}
}
self.nwritten_blocks = self.nwritten_blocks.checked_add(nblocks32).unwrap();
Ok((buflen, buf))
}
}

View File

@@ -565,7 +565,7 @@ mod tests {
);
let expected_bytes = vec![
/* TimelineMetadataHeader */
74, 104, 158, 105, 0, 70, 0, 4, // checksum, size, format_version (4 + 2 + 2)
4, 37, 101, 34, 0, 70, 0, 4, // checksum, size, format_version (4 + 2 + 2)
/* TimelineMetadataBodyV2 */
0, 0, 0, 0, 0, 0, 2, 0, // disk_consistent_lsn (8 bytes)
1, 0, 0, 0, 0, 0, 0, 1, 0, // prev_record_lsn (9 bytes)
@@ -574,7 +574,7 @@ mod tests {
0, 0, 0, 0, 0, 0, 0, 0, // ancestor_lsn (8 bytes)
0, 0, 0, 0, 0, 0, 0, 0, // latest_gc_cutoff_lsn (8 bytes)
0, 0, 0, 0, 0, 0, 0, 0, // initdb_lsn (8 bytes)
0, 0, 0, 16, // pg_version (4 bytes)
0, 0, 0, 15, // pg_version (4 bytes)
/* padding bytes */
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,

View File

@@ -1728,7 +1728,7 @@ impl RemoteTimelineClient {
task_mgr::spawn(
&self.runtime,
TaskKind::RemoteUploadTask,
self.tenant_shard_id,
Some(self.tenant_shard_id),
Some(self.timeline_id),
"remote upload",
async move {

View File

@@ -29,16 +29,16 @@ pub(super) struct HeatMapTenant {
#[derive(Serialize, Deserialize)]
pub(crate) struct HeatMapTimeline {
#[serde_as(as = "DisplayFromStr")]
pub(crate) timeline_id: TimelineId,
pub(super) timeline_id: TimelineId,
pub(crate) layers: Vec<HeatMapLayer>,
pub(super) layers: Vec<HeatMapLayer>,
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub(crate) struct HeatMapLayer {
pub(crate) name: LayerName,
pub(crate) metadata: LayerFileMetadata,
pub(super) name: LayerName,
pub(super) metadata: LayerFileMetadata,
#[serde_as(as = "TimestampSeconds<i64>")]
pub(super) access_time: SystemTime,

View File

@@ -13,7 +13,7 @@ use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::{l0_flush, page_cache};
use crate::{l0_flush, page_cache, walrecord};
use anyhow::{anyhow, Result};
use camino::Utf8PathBuf;
use pageserver_api::key::CompactKey;
@@ -249,7 +249,9 @@ impl InMemoryLayer {
/// debugging function to print out the contents of the layer
///
/// this is likely completly unused
pub async fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
let inner = self.inner.read().await;
let end_str = self.end_lsn_or_max();
println!(
@@ -257,6 +259,39 @@ impl InMemoryLayer {
self.timeline_id, self.start_lsn, end_str,
);
if !verbose {
return Ok(());
}
let cursor = inner.file.block_cursor();
let mut buf = Vec::new();
for (key, vec_map) in inner.index.iter() {
for (lsn, pos) in vec_map.as_slice() {
let mut desc = String::new();
cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
let val = Value::des(&buf);
match val {
Ok(Value::Image(img)) => {
write!(&mut desc, " img {} bytes", img.len())?;
}
Ok(Value::WalRecord(rec)) => {
let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
write!(
&mut desc,
" rec {} bytes will_init: {} {}",
buf.len(),
rec.will_init(),
wal_desc
)?;
}
Err(err) => {
write!(&mut desc, " DESERIALIZATION ERROR: {}", err)?;
}
}
println!(" key {} at {}: {}", key, lsn, desc);
}
}
Ok(())
}
@@ -501,6 +536,7 @@ impl InMemoryLayer {
use l0_flush::Inner;
let _concurrency_permit = match l0_flush_global_state {
Inner::PageCached => None,
Inner::Direct { semaphore, .. } => Some(semaphore.acquire().await),
};
@@ -532,6 +568,34 @@ impl InMemoryLayer {
.await?;
match l0_flush_global_state {
l0_flush::Inner::PageCached => {
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.build();
let mut buf = Vec::new();
let cursor = inner.file.block_cursor();
for (key, vec_map) in inner.index.iter() {
// Write all page versions
for (lsn, pos) in vec_map.as_slice() {
cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
let will_init = Value::des(&buf)?.will_init();
let (tmp, res) = delta_layer_writer
.put_value_bytes(
Key::from_compact(*key),
*lsn,
buf.slice_len(),
will_init,
&ctx,
)
.await;
res?;
buf = tmp.into_raw_slice().into_inner();
}
}
}
l0_flush::Inner::Direct { .. } => {
let file_contents: Vec<u8> = inner.file.load_to_vec(ctx).await?;
assert_eq!(

View File

@@ -208,8 +208,6 @@ impl SplitDeltaLayerWriter {
#[cfg(test)]
mod tests {
use rand::{RngCore, SeedableRng};
use crate::{
tenant::{
harness::{TenantHarness, TIMELINE_ID},
@@ -231,10 +229,7 @@ mod tests {
}
fn get_large_img() -> Bytes {
let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
let mut data = vec![0; 8192];
rng.fill_bytes(&mut data);
data.into()
vec![0; 8192].into()
}
#[tokio::test]

View File

@@ -98,7 +98,7 @@ pub fn start_background_loops(
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::Compaction,
tenant_shard_id,
Some(tenant_shard_id),
None,
&format!("compactor for tenant {tenant_shard_id}"),
{
@@ -121,7 +121,7 @@ pub fn start_background_loops(
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::GarbageCollector,
tenant_shard_id,
Some(tenant_shard_id),
None,
&format!("garbage collector for tenant {tenant_shard_id}"),
{
@@ -144,7 +144,7 @@ pub fn start_background_loops(
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::IngestHousekeeping,
tenant_shard_id,
Some(tenant_shard_id),
None,
&format!("ingest housekeeping for tenant {tenant_shard_id}"),
{

View File

@@ -1645,20 +1645,6 @@ impl Timeline {
self.last_record_lsn.shutdown();
if try_freeze_and_flush {
if let Some((open, frozen)) = self
.layers
.read()
.await
.layer_map()
.map(|lm| (lm.open_layer.is_some(), lm.frozen_layers.len()))
.ok()
.filter(|(open, frozen)| *open || *frozen > 0)
{
tracing::info!(?open, frozen, "flushing and freezing on shutdown");
} else {
// this is double-shutdown, ignore it
}
// we shut down walreceiver above, so, we won't add anything more
// to the InMemoryLayer; freeze it and wait for all frozen layers
// to reach the disk & upload queue, then shut the upload queue and
@@ -2281,7 +2267,7 @@ impl Timeline {
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::LayerFlushTask,
self.tenant_shard_id,
Some(self.tenant_shard_id),
Some(self.timeline_id),
"layer flush task",
async move {
@@ -2635,7 +2621,7 @@ impl Timeline {
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::InitialLogicalSizeCalculation,
self.tenant_shard_id,
Some(self.tenant_shard_id),
Some(self.timeline_id),
"initial size calculation",
// NB: don't log errors here, task_mgr will do that.
@@ -2803,7 +2789,7 @@ impl Timeline {
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::OndemandLogicalSizeCalculation,
self.tenant_shard_id,
Some(self.tenant_shard_id),
Some(self.timeline_id),
"ondemand logical size calculation",
async move {
@@ -2977,7 +2963,11 @@ impl Timeline {
LayerVisibilityHint::Visible => {
// Layer is visible to one or more read LSNs: elegible for inclusion in layer map
let last_activity_ts = layer.latest_activity();
Some((layer.layer_desc(), layer.metadata(), last_activity_ts))
Some(HeatMapLayer::new(
layer.layer_desc().layer_name(),
layer.metadata(),
last_activity_ts,
))
}
LayerVisibilityHint::Covered => {
// Layer is resident but unlikely to be read: not elegible for inclusion in heatmap.
@@ -2986,23 +2976,7 @@ impl Timeline {
}
});
let mut layers = resident.collect::<Vec<_>>();
// Sort layers in order of which to download first. For a large set of layers to download, we
// want to prioritize those layers which are most likely to still be in the resident many minutes
// or hours later:
// - Download L0s last, because they churn the fastest: L0s on a fast-writing tenant might
// only exist for a few minutes before being compacted into L1s.
// - For L1 & image layers, download most recent LSNs first: the older the LSN, the sooner
// the layer is likely to be covered by an image layer during compaction.
layers.sort_by_key(|(desc, _meta, _atime)| {
std::cmp::Reverse((!LayerMap::is_l0(&desc.key_range), desc.lsn_range.end))
});
let layers = layers
.into_iter()
.map(|(desc, meta, atime)| HeatMapLayer::new(desc.layer_name(), meta, atime))
.collect();
let layers = resident.collect();
Some(HeatMapTimeline::new(self.timeline_id, layers))
}
@@ -4528,7 +4502,6 @@ impl DurationRecorder {
/// the layer descriptor requires the user to provide the ranges, which should cover all
/// keys specified in the `data` field.
#[cfg(test)]
#[derive(Clone)]
pub struct DeltaLayerTestDesc {
pub lsn_range: Range<Lsn>,
pub key_range: Range<Key>,
@@ -4558,13 +4531,6 @@ impl DeltaLayerTestDesc {
data,
}
}
pub(crate) fn layer_name(&self) -> LayerName {
LayerName::Delta(super::storage_layer::DeltaLayerName {
key_range: self.key_range.clone(),
lsn_range: self.lsn_range.clone(),
})
}
}
impl Timeline {
@@ -5162,7 +5128,7 @@ impl Timeline {
let task_id = task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::DownloadAllRemoteLayers,
self.tenant_shard_id,
Some(self.tenant_shard_id),
Some(self.timeline_id),
"download all remote layers task",
async move {
@@ -5788,110 +5754,12 @@ fn is_send() {
#[cfg(test)]
mod tests {
use pageserver_api::key::Key;
use utils::{id::TimelineId, lsn::Lsn};
use crate::{
repository::Value,
tenant::{
harness::{test_img, TenantHarness},
layer_map::LayerMap,
storage_layer::{Layer, LayerName},
timeline::{DeltaLayerTestDesc, EvictionError},
Timeline,
},
use crate::tenant::{
harness::TenantHarness, storage_layer::Layer, timeline::EvictionError, Timeline,
};
#[tokio::test]
async fn test_heatmap_generation() {
let harness = TenantHarness::create("heatmap_generation").await.unwrap();
let covered_delta = DeltaLayerTestDesc::new_with_inferred_key_range(
Lsn(0x10)..Lsn(0x20),
vec![(
Key::from_hex("620000000033333333444444445500000000").unwrap(),
Lsn(0x11),
Value::Image(test_img("foo")),
)],
);
let visible_delta = DeltaLayerTestDesc::new_with_inferred_key_range(
Lsn(0x10)..Lsn(0x20),
vec![(
Key::from_hex("720000000033333333444444445500000000").unwrap(),
Lsn(0x11),
Value::Image(test_img("foo")),
)],
);
let l0_delta = DeltaLayerTestDesc::new(
Lsn(0x20)..Lsn(0x30),
Key::from_hex("000000000000000000000000000000000000").unwrap()
..Key::from_hex("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF").unwrap(),
vec![(
Key::from_hex("720000000033333333444444445500000000").unwrap(),
Lsn(0x25),
Value::Image(test_img("foo")),
)],
);
let delta_layers = vec![
covered_delta.clone(),
visible_delta.clone(),
l0_delta.clone(),
];
let image_layer = (
Lsn(0x40),
vec![(
Key::from_hex("620000000033333333444444445500000000").unwrap(),
test_img("bar"),
)],
);
let image_layers = vec![image_layer];
let (tenant, ctx) = harness.load().await;
let timeline = tenant
.create_test_timeline_with_layers(
TimelineId::generate(),
Lsn(0x10),
14,
&ctx,
delta_layers,
image_layers,
Lsn(0x100),
)
.await
.unwrap();
// Layer visibility is an input to heatmap generation, so refresh it first
timeline.update_layer_visibility().await.unwrap();
let heatmap = timeline
.generate_heatmap()
.await
.expect("Infallible while timeline is not shut down");
assert_eq!(heatmap.timeline_id, timeline.timeline_id);
// L0 should come last
assert_eq!(heatmap.layers.last().unwrap().name, l0_delta.layer_name());
let mut last_lsn = Lsn::MAX;
for layer in heatmap.layers {
// Covered layer should be omitted
assert!(layer.name != covered_delta.layer_name());
let layer_lsn = match &layer.name {
LayerName::Delta(d) => d.lsn_range.end,
LayerName::Image(i) => i.lsn,
};
// Apart from L0s, newest Layers should come first
if !LayerMap::is_l0(layer.name.key_range()) {
assert!(layer_lsn <= last_lsn);
last_lsn = layer_lsn;
}
}
}
#[tokio::test]
async fn two_layer_eviction_attempts_at_the_same_time() {
let harness = TenantHarness::create("two_layer_eviction_attempts_at_the_same_time")

View File

@@ -395,7 +395,7 @@ impl DeleteTimelineFlow {
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
TaskKind::TimelineDeletionWorker,
tenant_shard_id,
Some(tenant_shard_id),
Some(timeline_id),
"timeline_delete",
async move {

View File

@@ -60,7 +60,7 @@ impl Timeline {
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::Eviction,
self.tenant_shard_id,
Some(self.tenant_shard_id),
Some(self.timeline_id),
&format!(
"layer eviction for {}/{}",

View File

@@ -41,8 +41,6 @@
#include "hll.h"
#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "Assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)
/*
* Local file cache is used to temporary store relations pages in local file system.
* All blocks of all relations are stored inside one file and addressed using shared hash map.
@@ -53,43 +51,19 @@
*
* Cache is always reconstructed at node startup, so we do not need to save mapping somewhere and worry about
* its consistency.
*
* ## Holes
*
* The LFC can be resized on the fly, up to a maximum size that's determined
* at server startup (neon.max_file_cache_size). After server startup, we
* expand the underlying file when needed, until it reaches the soft limit
* (neon.file_cache_size_limit). If the soft limit is later reduced, we shrink
* the LFC by punching holes in the underlying file with a
* fallocate(FALLOC_FL_PUNCH_HOLE) call. The nominal size of the file doesn't
* shrink, but the disk space it uses does.
*
* Each hole is tracked by a dummy FileCacheEntry, which are kept in the
* 'holes' linked list. They are entered into the chunk hash table, with a
* special key where the blockNumber is used to store the 'offset' of the
* hole, and all other fields are zero. Holes are never looked up in the hash
* table, we only enter them there to have a FileCacheEntry that we can keep
* in the linked list. If the soft limit is raised again, we reuse the holes
* before extending the nominal size of the file.
*/
/* Local file storage allocation chunk.
* Should be power of two. Using larger than page chunks can
* Should be power of two and not less than 32. Using larger than page chunks can
* 1. Reduce hash-map memory footprint: 8TB database contains billion pages
* and size of hash entry is 40 bytes, so we need 40Gb just for hash map.
* 1Mb chunks can reduce hash map size to 320Mb.
* 2. Improve access locality, subsequent pages will be allocated together improving seqscan speed
*/
#define BLOCKS_PER_CHUNK 128 /* 1Mb chunk */
/*
* Smaller chunk seems to be better for OLTP workload
*/
// #define BLOCKS_PER_CHUNK 8 /* 64kb chunk */
#define MB ((uint64)1024*1024)
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK))
#define CHUNK_BITMAP_SIZE ((BLOCKS_PER_CHUNK + 31) / 32)
typedef struct FileCacheEntry
{
@@ -97,8 +71,8 @@ typedef struct FileCacheEntry
uint32 hash;
uint32 offset;
uint32 access_count;
uint32 bitmap[CHUNK_BITMAP_SIZE];
dlist_node list_node; /* LRU/holes list node */
uint32 bitmap[BLOCKS_PER_CHUNK / 32];
dlist_node lru_node; /* LRU list node */
} FileCacheEntry;
typedef struct FileCacheControl
@@ -113,7 +87,6 @@ typedef struct FileCacheControl
uint64 writes;
dlist_head lru; /* double linked list for LRU replacement
* algorithm */
dlist_head holes; /* double linked list of punched holes */
HyperLogLogState wss_estimation; /* estimation of working set size */
} FileCacheControl;
@@ -162,7 +135,6 @@ lfc_disable(char const *op)
lfc_ctl->used = 0;
lfc_ctl->limit = 0;
dlist_init(&lfc_ctl->lru);
dlist_init(&lfc_ctl->holes);
if (lfc_desc > 0)
{
@@ -242,18 +214,18 @@ lfc_shmem_startup(void)
if (!found)
{
int fd;
uint32 n_chunks = SIZE_MB_TO_CHUNKS(lfc_max_size);
uint32 lfc_size = SIZE_MB_TO_CHUNKS(lfc_max_size);
lfc_lock = (LWLockId) GetNamedLWLockTranche("lfc_lock");
info.keysize = sizeof(BufferTag);
info.entrysize = sizeof(FileCacheEntry);
/*
* n_chunks+1 because we add new element to hash table before eviction
* lfc_size+1 because we add new element to hash table before eviction
* of victim
*/
lfc_hash = ShmemInitHash("lfc_hash",
n_chunks + 1, n_chunks + 1,
lfc_size + 1, lfc_size + 1,
&info,
HASH_ELEM | HASH_BLOBS);
lfc_ctl->generation = 0;
@@ -263,7 +235,6 @@ lfc_shmem_startup(void)
lfc_ctl->misses = 0;
lfc_ctl->writes = 0;
dlist_init(&lfc_ctl->lru);
dlist_init(&lfc_ctl->holes);
/* Initialize hyper-log-log structure for estimating working set size */
initSHLL(&lfc_ctl->wss_estimation);
@@ -339,31 +310,14 @@ lfc_change_limit_hook(int newval, void *extra)
* Shrink cache by throwing away least recently accessed chunks and
* returning their space to file system
*/
FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node, dlist_pop_head_node(&lfc_ctl->lru));
FileCacheEntry *hole;
uint32 offset = victim->offset;
uint32 hash;
bool found;
BufferTag holetag;
FileCacheEntry *victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru));
CriticalAssert(victim->access_count == 0);
Assert(victim->access_count == 0);
#ifdef FALLOC_FL_PUNCH_HOLE
if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, (off_t) victim->offset * BLOCKS_PER_CHUNK * BLCKSZ, BLOCKS_PER_CHUNK * BLCKSZ) < 0)
neon_log(LOG, "Failed to punch hole in file: %m");
#endif
/* We remove the old entry, and re-enter a hole to the hash table */
hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
memset(&holetag, 0, sizeof(holetag));
holetag.blockNum = offset;
hash = get_hash_value(lfc_hash, &holetag);
hole = hash_search_with_hash_value(lfc_hash, &holetag, hash, HASH_ENTER, &found);
hole->hash = hash;
hole->offset = offset;
hole->access_count = 0;
CriticalAssert(!found);
dlist_push_tail(&lfc_ctl->holes, &hole->list_node);
lfc_ctl->used -= 1;
}
lfc_ctl->limit = new_size;
@@ -455,8 +409,6 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
CopyNRelFileInfoToBufTag(tag, rinfo);
tag.forkNum = forkNum;
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_SHARED);
@@ -488,7 +440,6 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
tag.forkNum = forkNum;
tag.blockNum = (blkno & ~(BLOCKS_PER_CHUNK - 1));
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
@@ -519,7 +470,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
{
bool has_remaining_pages;
for (int i = 0; i < CHUNK_BITMAP_SIZE; i++)
for (int i = 0; i < (BLOCKS_PER_CHUNK / 32); i++)
{
if (entry->bitmap[i] != 0)
{
@@ -534,8 +485,8 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
*/
if (!has_remaining_pages)
{
dlist_delete(&entry->list_node);
dlist_push_head(&lfc_ctl->lru, &entry->list_node);
dlist_delete(&entry->lru_node);
dlist_push_head(&lfc_ctl->lru, &entry->lru_node);
}
}
@@ -574,8 +525,6 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
CopyNRelFileInfoToBufTag(tag, rinfo);
tag.forkNum = forkNum;
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
@@ -602,7 +551,7 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
}
/* Unlink entry from LRU list to pin it for the duration of IO operation */
if (entry->access_count++ == 0)
dlist_delete(&entry->list_node);
dlist_delete(&entry->lru_node);
generation = lfc_ctl->generation;
entry_offset = entry->offset;
@@ -620,12 +569,12 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (lfc_ctl->generation == generation)
{
CriticalAssert(LFC_ENABLED());
Assert(LFC_ENABLED());
lfc_ctl->hits += 1;
pgBufferUsage.file_cache.hits += 1;
CriticalAssert(entry->access_count > 0);
Assert(entry->access_count > 0);
if (--entry->access_count == 0)
dlist_push_tail(&lfc_ctl->lru, &entry->list_node);
dlist_push_tail(&lfc_ctl->lru, &entry->lru_node);
}
else
result = false;
@@ -664,8 +613,6 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, const void
tag.forkNum = forkNum;
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
CopyNRelFileInfoToBufTag(tag, rinfo);
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
@@ -685,7 +632,7 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, const void
* operation
*/
if (entry->access_count++ == 0)
dlist_delete(&entry->list_node);
dlist_delete(&entry->lru_node);
}
else
{
@@ -708,26 +655,13 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, const void
if (lfc_ctl->used >= lfc_ctl->limit && !dlist_is_empty(&lfc_ctl->lru))
{
/* Cache overflow: evict least recently used chunk */
FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node, dlist_pop_head_node(&lfc_ctl->lru));
FileCacheEntry *victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru));
CriticalAssert(victim->access_count == 0);
Assert(victim->access_count == 0);
entry->offset = victim->offset; /* grab victim's chunk */
hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
neon_log(DEBUG2, "Swap file cache page");
}
else if (!dlist_is_empty(&lfc_ctl->holes))
{
/* We can reuse a hole that was left behind when the LFC was shrunk previously */
FileCacheEntry *hole = dlist_container(FileCacheEntry, list_node, dlist_pop_head_node(&lfc_ctl->holes));
uint32 offset = hole->offset;
bool found;
hash_search_with_hash_value(lfc_hash, &hole->key, hole->hash, HASH_REMOVE, &found);
CriticalAssert(found);
lfc_ctl->used += 1;
entry->offset = offset; /* reuse the hole */
}
else
{
lfc_ctl->used += 1;
@@ -755,11 +689,11 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, const void
if (lfc_ctl->generation == generation)
{
CriticalAssert(LFC_ENABLED());
Assert(LFC_ENABLED());
/* Place entry to the head of LRU list */
CriticalAssert(entry->access_count > 0);
Assert(entry->access_count > 0);
if (--entry->access_count == 0)
dlist_push_tail(&lfc_ctl->lru, &entry->list_node);
dlist_push_tail(&lfc_ctl->lru, &entry->lru_node);
entry->bitmap[chunk_offs >> 5] |= (1 << (chunk_offs & 31));
}
@@ -774,6 +708,7 @@ typedef struct
} NeonGetStatsCtx;
#define NUM_NEON_GET_STATS_COLS 2
#define NUM_NEON_GET_STATS_ROWS 3
PG_FUNCTION_INFO_V1(neon_get_lfc_stats);
Datum
@@ -809,6 +744,7 @@ neon_get_lfc_stats(PG_FUNCTION_ARGS)
INT8OID, -1, 0);
fctx->tupdesc = BlessTupleDesc(tupledesc);
funcctx->max_calls = NUM_NEON_GET_STATS_ROWS;
funcctx->user_fctx = fctx;
/* Return to original context when allocating transient memory */
@@ -842,11 +778,6 @@ neon_get_lfc_stats(PG_FUNCTION_ARGS)
if (lfc_ctl)
value = lfc_ctl->writes;
break;
case 4:
key = "file_cache_size";
if (lfc_ctl)
value = lfc_ctl->size;
break;
default:
SRF_RETURN_DONE(funcctx);
}
@@ -970,7 +901,7 @@ local_cache_pages(PG_FUNCTION_ARGS)
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
for (int i = 0; i < CHUNK_BITMAP_SIZE; i++)
for (int i = 0; i < BLOCKS_PER_CHUNK / 32; i++)
n_pages += pg_popcount32(entry->bitmap[i]);
}
}

View File

@@ -192,13 +192,6 @@ LogicalSlotsMonitorMain(Datum main_arg)
{
XLogRecPtr cutoff_lsn;
/* In case of a SIGHUP, just reload the configuration. */
if (ConfigReloadPending)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
}
/*
* If there are too many .snap files, just drop all logical slots to
* prevent aux files bloat.

View File

@@ -54,10 +54,6 @@
#define BufTagGetNRelFileInfo(tag) tag.rnode
#define BufTagGetRelNumber(tagp) ((tagp)->rnode.relNode)
#define InvalidRelFileNumber InvalidOid
#define SMgrRelGetRelInfo(reln) \
(reln->smgr_rnode.node)

View File

@@ -113,36 +113,38 @@ impl<E: Into<AuthErrorImpl>> From<E> for AuthError {
impl UserFacingError for AuthError {
fn to_string_client(&self) -> String {
use AuthErrorImpl::*;
match self.0.as_ref() {
AuthErrorImpl::Link(e) => e.to_string_client(),
AuthErrorImpl::GetAuthInfo(e) => e.to_string_client(),
AuthErrorImpl::Sasl(e) => e.to_string_client(),
AuthErrorImpl::AuthFailed(_) => self.to_string(),
AuthErrorImpl::BadAuthMethod(_) => self.to_string(),
AuthErrorImpl::MalformedPassword(_) => self.to_string(),
AuthErrorImpl::MissingEndpointName => self.to_string(),
AuthErrorImpl::Io(_) => "Internal error".to_string(),
AuthErrorImpl::IpAddressNotAllowed(_) => self.to_string(),
AuthErrorImpl::TooManyConnections => self.to_string(),
AuthErrorImpl::UserTimeout(_) => self.to_string(),
Link(e) => e.to_string_client(),
GetAuthInfo(e) => e.to_string_client(),
Sasl(e) => e.to_string_client(),
AuthFailed(_) => self.to_string(),
BadAuthMethod(_) => self.to_string(),
MalformedPassword(_) => self.to_string(),
MissingEndpointName => self.to_string(),
Io(_) => "Internal error".to_string(),
IpAddressNotAllowed(_) => self.to_string(),
TooManyConnections => self.to_string(),
UserTimeout(_) => self.to_string(),
}
}
}
impl ReportableError for AuthError {
fn get_error_kind(&self) -> crate::error::ErrorKind {
use AuthErrorImpl::*;
match self.0.as_ref() {
AuthErrorImpl::Link(e) => e.get_error_kind(),
AuthErrorImpl::GetAuthInfo(e) => e.get_error_kind(),
AuthErrorImpl::Sasl(e) => e.get_error_kind(),
AuthErrorImpl::AuthFailed(_) => crate::error::ErrorKind::User,
AuthErrorImpl::BadAuthMethod(_) => crate::error::ErrorKind::User,
AuthErrorImpl::MalformedPassword(_) => crate::error::ErrorKind::User,
AuthErrorImpl::MissingEndpointName => crate::error::ErrorKind::User,
AuthErrorImpl::Io(_) => crate::error::ErrorKind::ClientDisconnect,
AuthErrorImpl::IpAddressNotAllowed(_) => crate::error::ErrorKind::User,
AuthErrorImpl::TooManyConnections => crate::error::ErrorKind::RateLimit,
AuthErrorImpl::UserTimeout(_) => crate::error::ErrorKind::User,
Link(e) => e.get_error_kind(),
GetAuthInfo(e) => e.get_error_kind(),
Sasl(e) => e.get_error_kind(),
AuthFailed(_) => crate::error::ErrorKind::User,
BadAuthMethod(_) => crate::error::ErrorKind::User,
MalformedPassword(_) => crate::error::ErrorKind::User,
MissingEndpointName => crate::error::ErrorKind::User,
Io(_) => crate::error::ErrorKind::ClientDisconnect,
IpAddressNotAllowed(_) => crate::error::ErrorKind::User,
TooManyConnections => crate::error::ErrorKind::RateLimit,
UserTimeout(_) => crate::error::ErrorKind::User,
}
}
}

View File

@@ -80,8 +80,9 @@ pub trait TestBackend: Send + Sync + 'static {
impl std::fmt::Display for BackendType<'_, (), ()> {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
use BackendType::*;
match self {
Self::Console(api, _) => match &**api {
Console(api, _) => match &**api {
ConsoleBackend::Console(endpoint) => {
fmt.debug_tuple("Console").field(&endpoint.url()).finish()
}
@@ -92,7 +93,7 @@ impl std::fmt::Display for BackendType<'_, (), ()> {
#[cfg(test)]
ConsoleBackend::Test(_) => fmt.debug_tuple("Test").finish(),
},
Self::Link(url, _) => fmt.debug_tuple("Link").field(&url.as_str()).finish(),
Link(url, _) => fmt.debug_tuple("Link").field(&url.as_str()).finish(),
}
}
}
@@ -101,9 +102,10 @@ impl<T, D> BackendType<'_, T, D> {
/// Very similar to [`std::option::Option::as_ref`].
/// This helps us pass structured config to async tasks.
pub fn as_ref(&self) -> BackendType<'_, &T, &D> {
use BackendType::*;
match self {
Self::Console(c, x) => BackendType::Console(MaybeOwned::Borrowed(c), x),
Self::Link(c, x) => BackendType::Link(MaybeOwned::Borrowed(c), x),
Console(c, x) => Console(MaybeOwned::Borrowed(c), x),
Link(c, x) => Link(MaybeOwned::Borrowed(c), x),
}
}
}
@@ -113,9 +115,10 @@ impl<'a, T, D> BackendType<'a, T, D> {
/// Maps [`BackendType<T>`] to [`BackendType<R>`] by applying
/// a function to a contained value.
pub fn map<R>(self, f: impl FnOnce(T) -> R) -> BackendType<'a, R, D> {
use BackendType::*;
match self {
Self::Console(c, x) => BackendType::Console(c, f(x)),
Self::Link(c, x) => BackendType::Link(c, x),
Console(c, x) => Console(c, f(x)),
Link(c, x) => Link(c, x),
}
}
}
@@ -123,9 +126,10 @@ impl<'a, T, D, E> BackendType<'a, Result<T, E>, D> {
/// Very similar to [`std::option::Option::transpose`].
/// This is most useful for error handling.
pub fn transpose(self) -> Result<BackendType<'a, T, D>, E> {
use BackendType::*;
match self {
Self::Console(c, x) => x.map(|x| BackendType::Console(c, x)),
Self::Link(c, x) => Ok(BackendType::Link(c, x)),
Console(c, x) => x.map(|x| Console(c, x)),
Link(c, x) => Ok(Link(c, x)),
}
}
}
@@ -289,9 +293,7 @@ async fn auth_quirks(
ctx.set_endpoint_id(res.info.endpoint.clone());
let password = match res.keys {
ComputeCredentialKeys::Password(p) => p,
ComputeCredentialKeys::AuthKeys(_) => {
unreachable!("password hack should return a password")
}
_ => unreachable!("password hack should return a password"),
};
(res.info, Some(password))
}
@@ -398,17 +400,21 @@ async fn authenticate_with_secret(
impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint, &()> {
/// Get compute endpoint name from the credentials.
pub fn get_endpoint(&self) -> Option<EndpointId> {
use BackendType::*;
match self {
Self::Console(_, user_info) => user_info.endpoint_id.clone(),
Self::Link(_, _) => Some("link".into()),
Console(_, user_info) => user_info.endpoint_id.clone(),
Link(_, _) => Some("link".into()),
}
}
/// Get username from the credentials.
pub fn get_user(&self) -> &str {
use BackendType::*;
match self {
Self::Console(_, user_info) => &user_info.user,
Self::Link(_, _) => "link",
Console(_, user_info) => &user_info.user,
Link(_, _) => "link",
}
}
@@ -422,8 +428,10 @@ impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint, &()> {
config: &'static AuthenticationConfig,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
) -> auth::Result<BackendType<'a, ComputeCredentials, NodeInfo>> {
use BackendType::*;
let res = match self {
Self::Console(api, user_info) => {
Console(api, user_info) => {
info!(
user = &*user_info.user,
project = user_info.endpoint(),
@@ -443,7 +451,7 @@ impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint, &()> {
BackendType::Console(api, credentials)
}
// NOTE: this auth backend doesn't use client credentials.
Self::Link(url, _) => {
Link(url, _) => {
info!("performing link authentication");
let info = link::authenticate(ctx, &url, client).await?;
@@ -462,9 +470,10 @@ impl BackendType<'_, ComputeUserInfo, &()> {
&self,
ctx: &RequestMonitoring,
) -> Result<CachedRoleSecret, GetAuthInfoError> {
use BackendType::*;
match self {
Self::Console(api, user_info) => api.get_role_secret(ctx, user_info).await,
Self::Link(_, _) => Ok(Cached::new_uncached(None)),
Console(api, user_info) => api.get_role_secret(ctx, user_info).await,
Link(_, _) => Ok(Cached::new_uncached(None)),
}
}
@@ -472,9 +481,10 @@ impl BackendType<'_, ComputeUserInfo, &()> {
&self,
ctx: &RequestMonitoring,
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), GetAuthInfoError> {
use BackendType::*;
match self {
Self::Console(api, user_info) => api.get_allowed_ips_and_secret(ctx, user_info).await,
Self::Link(_, _) => Ok((Cached::new_uncached(Arc::new(vec![])), None)),
Console(api, user_info) => api.get_allowed_ips_and_secret(ctx, user_info).await,
Link(_, _) => Ok((Cached::new_uncached(Arc::new(vec![])), None)),
}
}
}
@@ -485,16 +495,18 @@ impl ComputeConnectBackend for BackendType<'_, ComputeCredentials, NodeInfo> {
&self,
ctx: &RequestMonitoring,
) -> Result<CachedNodeInfo, console::errors::WakeComputeError> {
use BackendType::*;
match self {
Self::Console(api, creds) => api.wake_compute(ctx, &creds.info).await,
Self::Link(_, info) => Ok(Cached::new_uncached(info.clone())),
Console(api, creds) => api.wake_compute(ctx, &creds.info).await,
Link(_, info) => Ok(Cached::new_uncached(info.clone())),
}
}
fn get_keys(&self) -> Option<&ComputeCredentialKeys> {
match self {
Self::Console(_, creds) => Some(&creds.keys),
Self::Link(_, _) => None,
BackendType::Console(_, creds) => Some(&creds.keys),
BackendType::Link(_, _) => None,
}
}
}
@@ -505,16 +517,18 @@ impl ComputeConnectBackend for BackendType<'_, ComputeCredentials, &()> {
&self,
ctx: &RequestMonitoring,
) -> Result<CachedNodeInfo, console::errors::WakeComputeError> {
use BackendType::*;
match self {
Self::Console(api, creds) => api.wake_compute(ctx, &creds.info).await,
Self::Link(_, _) => unreachable!("link auth flow doesn't support waking the compute"),
Console(api, creds) => api.wake_compute(ctx, &creds.info).await,
Link(_, _) => unreachable!("link auth flow doesn't support waking the compute"),
}
}
fn get_keys(&self) -> Option<&ComputeCredentialKeys> {
match self {
Self::Console(_, creds) => Some(&creds.keys),
Self::Link(_, _) => None,
BackendType::Console(_, creds) => Some(&creds.keys),
BackendType::Link(_, _) => None,
}
}
}

View File

@@ -195,7 +195,7 @@ impl JwkCacheEntryLock {
let header = base64::decode_config(header, base64::URL_SAFE_NO_PAD)
.context("Provided authentication token is not a valid JWT encoding")?;
let header = serde_json::from_slice::<JWTHeader<'_>>(&header)
let header = serde_json::from_slice::<JWTHeader>(&header)
.context("Provided authentication token is not a valid JWT encoding")?;
let sig = base64::decode_config(signature, base64::URL_SAFE_NO_PAD)
@@ -340,7 +340,7 @@ impl JwkRenewalPermit<'_> {
}
}
async fn acquire_permit(from: &Arc<JwkCacheEntryLock>) -> JwkRenewalPermit<'_> {
async fn acquire_permit(from: &Arc<JwkCacheEntryLock>) -> JwkRenewalPermit {
match from.lookup.acquire().await {
Ok(permit) => {
permit.forget();
@@ -352,7 +352,7 @@ impl JwkRenewalPermit<'_> {
}
}
fn try_acquire_permit(from: &Arc<JwkCacheEntryLock>) -> Option<JwkRenewalPermit<'_>> {
fn try_acquire_permit(from: &Arc<JwkCacheEntryLock>) -> Option<JwkRenewalPermit> {
match from.lookup.try_acquire() {
Ok(permit) => {
permit.forget();

View File

@@ -89,12 +89,10 @@ impl ComputeUserInfoMaybeEndpoint {
sni: Option<&str>,
common_names: Option<&HashSet<String>>,
) -> Result<Self, ComputeUserInfoParseError> {
use ComputeUserInfoParseError::*;
// Some parameters are stored in the startup message.
let get_param = |key| {
params
.get(key)
.ok_or(ComputeUserInfoParseError::MissingKey(key))
};
let get_param = |key| params.get(key).ok_or(MissingKey(key));
let user: RoleName = get_param("user")?.into();
// Project name might be passed via PG's command-line options.
@@ -124,14 +122,11 @@ impl ComputeUserInfoMaybeEndpoint {
let endpoint = match (endpoint_option, endpoint_from_domain) {
// Invariant: if we have both project name variants, they should match.
(Some(option), Some(domain)) if option != domain => {
Some(Err(ComputeUserInfoParseError::InconsistentProjectNames {
domain,
option,
}))
Some(Err(InconsistentProjectNames { domain, option }))
}
// Invariant: project name may not contain certain characters.
(a, b) => a.or(b).map(|name| match project_name_valid(name.as_ref()) {
false => Err(ComputeUserInfoParseError::MalformedProjectName(name)),
false => Err(MalformedProjectName(name)),
true => Ok(name),
}),
}
@@ -191,7 +186,7 @@ impl<'de> serde::de::Deserialize<'de> for IpPattern {
impl<'de> serde::de::Visitor<'de> for StrVisitor {
type Value = IpPattern;
fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(formatter, "comma separated list with ip address, ip address range, or ip address subnet mask")
}

View File

@@ -24,7 +24,7 @@ impl<C: Cache> Cache for &C {
type LookupInfo<Key> = C::LookupInfo<Key>;
fn invalidate(&self, info: &Self::LookupInfo<Self::Key>) {
C::invalidate(self, info);
C::invalidate(self, info)
}
}

View File

@@ -58,7 +58,7 @@ impl<K: Hash + Eq, V> Cache for TimedLru<K, V> {
type LookupInfo<Key> = LookupInfo<Key>;
fn invalidate(&self, info: &Self::LookupInfo<K>) {
self.invalidate_raw(info);
self.invalidate_raw(info)
}
}

View File

@@ -44,10 +44,11 @@ pub enum ConnectionError {
impl UserFacingError for ConnectionError {
fn to_string_client(&self) -> String {
use ConnectionError::*;
match self {
// This helps us drop irrelevant library-specific prefixes.
// TODO: propagate severity level and other parameters.
ConnectionError::Postgres(err) => match err.as_db_error() {
Postgres(err) => match err.as_db_error() {
Some(err) => {
let msg = err.message();
@@ -61,8 +62,8 @@ impl UserFacingError for ConnectionError {
}
None => err.to_string(),
},
ConnectionError::WakeComputeError(err) => err.to_string_client(),
ConnectionError::TooManyConnectionAttempts(_) => {
WakeComputeError(err) => err.to_string_client(),
TooManyConnectionAttempts(_) => {
"Failed to acquire permit to connect to the database. Too many database connection attempts are currently ongoing.".to_owned()
}
_ => COULD_NOT_CONNECT.to_owned(),
@@ -365,16 +366,16 @@ static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();
struct AcceptEverythingVerifier;
impl ServerCertVerifier for AcceptEverythingVerifier {
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
use rustls::SignatureScheme;
use rustls::SignatureScheme::*;
// The schemes for which `SignatureScheme::supported_in_tls13` returns true.
vec![
SignatureScheme::ECDSA_NISTP521_SHA512,
SignatureScheme::ECDSA_NISTP384_SHA384,
SignatureScheme::ECDSA_NISTP256_SHA256,
SignatureScheme::RSA_PSS_SHA512,
SignatureScheme::RSA_PSS_SHA384,
SignatureScheme::RSA_PSS_SHA256,
SignatureScheme::ED25519,
ECDSA_NISTP521_SHA512,
ECDSA_NISTP384_SHA384,
ECDSA_NISTP256_SHA256,
RSA_PSS_SHA512,
RSA_PSS_SHA384,
RSA_PSS_SHA256,
ED25519,
]
}
fn verify_server_cert(

View File

@@ -155,7 +155,7 @@ pub enum TlsServerEndPoint {
}
impl TlsServerEndPoint {
pub fn new(cert: &CertificateDer<'_>) -> anyhow::Result<Self> {
pub fn new(cert: &CertificateDer) -> anyhow::Result<Self> {
let sha256_oids = [
// I'm explicitly not adding MD5 or SHA1 here... They're bad.
oid_registry::OID_SIG_ECDSA_WITH_SHA256,
@@ -278,7 +278,7 @@ impl CertResolver {
impl rustls::server::ResolvesServerCert for CertResolver {
fn resolve(
&self,
client_hello: rustls::server::ClientHello<'_>,
client_hello: rustls::server::ClientHello,
) -> Option<Arc<rustls::sign::CertifiedKey>> {
self.resolve(client_hello.server_name()).map(|x| x.0)
}
@@ -559,7 +559,7 @@ impl RetryConfig {
match key {
"num_retries" => num_retries = Some(value.parse()?),
"base_retry_wait_duration" => {
base_retry_wait_duration = Some(humantime::parse_duration(value)?);
base_retry_wait_duration = Some(humantime::parse_duration(value)?)
}
"retry_wait_exponent_base" => retry_wait_exponent_base = Some(value.parse()?),
unknown => bail!("unknown key: {unknown}"),

View File

@@ -22,15 +22,16 @@ impl ConsoleError {
self.status
.as_ref()
.and_then(|s| s.details.error_info.as_ref())
.map_or(Reason::Unknown, |e| e.reason)
.map(|e| e.reason)
.unwrap_or(Reason::Unknown)
}
pub fn get_user_facing_message(&self) -> String {
use super::provider::errors::REQUEST_FAILED;
self.status
.as_ref()
.and_then(|s| s.details.user_facing_message.as_ref())
.map_or_else(|| {
.map(|m| m.message.clone().into())
.unwrap_or_else(|| {
// Ask @neondatabase/control-plane for review before adding more.
match self.http_status_code {
http::StatusCode::NOT_FOUND => {
@@ -47,18 +48,19 @@ impl ConsoleError {
}
_ => REQUEST_FAILED.to_owned(),
}
}, |m| m.message.clone().into())
})
}
}
impl Display for ConsoleError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let msg: &str = self
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let msg = self
.status
.as_ref()
.and_then(|s| s.details.user_facing_message.as_ref())
.map_or_else(|| self.error.as_ref(), |m| m.message.as_ref());
write!(f, "{msg}")
.map(|m| m.message.as_ref())
.unwrap_or_else(|| &self.error);
write!(f, "{}", msg)
}
}
@@ -284,7 +286,7 @@ pub struct DatabaseInfo {
// Manually implement debug to omit sensitive info.
impl fmt::Debug for DatabaseInfo {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("DatabaseInfo")
.field("host", &self.host)
.field("port", &self.port)
@@ -371,7 +373,7 @@ mod tests {
}
}
});
let _: KickSession<'_> = serde_json::from_str(&json.to_string())?;
let _: KickSession = serde_json::from_str(&json.to_string())?;
Ok(())
}

View File

@@ -93,8 +93,7 @@ impl postgres_backend::Handler<tokio::net::TcpStream> for MgmtHandler {
}
fn try_process_query(pgb: &mut PostgresBackendTCP, query: &str) -> Result<(), QueryError> {
let resp: KickSession<'_> =
serde_json::from_str(query).context("Failed to parse query as json")?;
let resp: KickSession = serde_json::from_str(query).context("Failed to parse query as json")?;
let span = info_span!("event", session_id = resp.session_id);
let _enter = span.enter();

View File

@@ -26,7 +26,7 @@ use tracing::info;
pub mod errors {
use crate::{
console::messages::{self, ConsoleError, Reason},
error::{io_error, ErrorKind, ReportableError, UserFacingError},
error::{io_error, ReportableError, UserFacingError},
proxy::retry::CouldRetry,
};
use thiserror::Error;
@@ -51,19 +51,21 @@ pub mod errors {
impl ApiError {
/// Returns HTTP status code if it's the reason for failure.
pub fn get_reason(&self) -> messages::Reason {
use ApiError::*;
match self {
ApiError::Console(e) => e.get_reason(),
ApiError::Transport(_) => messages::Reason::Unknown,
Console(e) => e.get_reason(),
_ => messages::Reason::Unknown,
}
}
}
impl UserFacingError for ApiError {
fn to_string_client(&self) -> String {
use ApiError::*;
match self {
// To minimize risks, only select errors are forwarded to users.
ApiError::Console(c) => c.get_user_facing_message(),
ApiError::Transport(_) => REQUEST_FAILED.to_owned(),
Console(c) => c.get_user_facing_message(),
_ => REQUEST_FAILED.to_owned(),
}
}
}
@@ -71,53 +73,57 @@ pub mod errors {
impl ReportableError for ApiError {
fn get_error_kind(&self) -> crate::error::ErrorKind {
match self {
ApiError::Console(e) => match e.get_reason() {
Reason::RoleProtected => ErrorKind::User,
Reason::ResourceNotFound => ErrorKind::User,
Reason::ProjectNotFound => ErrorKind::User,
Reason::EndpointNotFound => ErrorKind::User,
Reason::BranchNotFound => ErrorKind::User,
Reason::RateLimitExceeded => ErrorKind::ServiceRateLimit,
Reason::NonDefaultBranchComputeTimeExceeded => ErrorKind::User,
Reason::ActiveTimeQuotaExceeded => ErrorKind::User,
Reason::ComputeTimeQuotaExceeded => ErrorKind::User,
Reason::WrittenDataQuotaExceeded => ErrorKind::User,
Reason::DataTransferQuotaExceeded => ErrorKind::User,
Reason::LogicalSizeQuotaExceeded => ErrorKind::User,
Reason::ConcurrencyLimitReached => ErrorKind::ControlPlane,
Reason::LockAlreadyTaken => ErrorKind::ControlPlane,
Reason::RunningOperations => ErrorKind::ControlPlane,
Reason::Unknown => match &e {
ConsoleError {
http_status_code:
http::StatusCode::NOT_FOUND | http::StatusCode::NOT_ACCEPTABLE,
..
} => crate::error::ErrorKind::User,
ConsoleError {
http_status_code: http::StatusCode::UNPROCESSABLE_ENTITY,
error,
..
} if error
.contains("compute time quota of non-primary branches is exceeded") =>
{
crate::error::ErrorKind::User
}
ConsoleError {
http_status_code: http::StatusCode::LOCKED,
error,
..
} if error.contains("quota exceeded")
|| error.contains("the limit for current plan reached") =>
{
crate::error::ErrorKind::User
}
ConsoleError {
http_status_code: http::StatusCode::TOO_MANY_REQUESTS,
..
} => crate::error::ErrorKind::ServiceRateLimit,
ConsoleError { .. } => crate::error::ErrorKind::ControlPlane,
},
},
ApiError::Console(e) => {
use crate::error::ErrorKind::*;
match e.get_reason() {
Reason::RoleProtected => User,
Reason::ResourceNotFound => User,
Reason::ProjectNotFound => User,
Reason::EndpointNotFound => User,
Reason::BranchNotFound => User,
Reason::RateLimitExceeded => ServiceRateLimit,
Reason::NonDefaultBranchComputeTimeExceeded => User,
Reason::ActiveTimeQuotaExceeded => User,
Reason::ComputeTimeQuotaExceeded => User,
Reason::WrittenDataQuotaExceeded => User,
Reason::DataTransferQuotaExceeded => User,
Reason::LogicalSizeQuotaExceeded => User,
Reason::ConcurrencyLimitReached => ControlPlane,
Reason::LockAlreadyTaken => ControlPlane,
Reason::RunningOperations => ControlPlane,
Reason::Unknown => match &e {
ConsoleError {
http_status_code:
http::StatusCode::NOT_FOUND | http::StatusCode::NOT_ACCEPTABLE,
..
} => crate::error::ErrorKind::User,
ConsoleError {
http_status_code: http::StatusCode::UNPROCESSABLE_ENTITY,
error,
..
} if error.contains(
"compute time quota of non-primary branches is exceeded",
) =>
{
crate::error::ErrorKind::User
}
ConsoleError {
http_status_code: http::StatusCode::LOCKED,
error,
..
} if error.contains("quota exceeded")
|| error.contains("the limit for current plan reached") =>
{
crate::error::ErrorKind::User
}
ConsoleError {
http_status_code: http::StatusCode::TOO_MANY_REQUESTS,
..
} => crate::error::ErrorKind::ServiceRateLimit,
ConsoleError { .. } => crate::error::ErrorKind::ControlPlane,
},
}
}
ApiError::Transport(_) => crate::error::ErrorKind::ControlPlane,
}
}
@@ -164,11 +170,12 @@ pub mod errors {
impl UserFacingError for GetAuthInfoError {
fn to_string_client(&self) -> String {
use GetAuthInfoError::*;
match self {
// We absolutely should not leak any secrets!
Self::BadSecret => REQUEST_FAILED.to_owned(),
BadSecret => REQUEST_FAILED.to_owned(),
// However, API might return a meaningful error.
Self::ApiError(e) => e.to_string_client(),
ApiError(e) => e.to_string_client(),
}
}
}
@@ -176,8 +183,8 @@ pub mod errors {
impl ReportableError for GetAuthInfoError {
fn get_error_kind(&self) -> crate::error::ErrorKind {
match self {
Self::BadSecret => crate::error::ErrorKind::ControlPlane,
Self::ApiError(_) => crate::error::ErrorKind::ControlPlane,
GetAuthInfoError::BadSecret => crate::error::ErrorKind::ControlPlane,
GetAuthInfoError::ApiError(_) => crate::error::ErrorKind::ControlPlane,
}
}
}
@@ -206,16 +213,17 @@ pub mod errors {
impl UserFacingError for WakeComputeError {
fn to_string_client(&self) -> String {
use WakeComputeError::*;
match self {
// We shouldn't show user the address even if it's broken.
// Besides, user is unlikely to care about this detail.
Self::BadComputeAddress(_) => REQUEST_FAILED.to_owned(),
BadComputeAddress(_) => REQUEST_FAILED.to_owned(),
// However, API might return a meaningful error.
Self::ApiError(e) => e.to_string_client(),
ApiError(e) => e.to_string_client(),
Self::TooManyConnections => self.to_string(),
TooManyConnections => self.to_string(),
Self::TooManyConnectionAttempts(_) => {
TooManyConnectionAttempts(_) => {
"Failed to acquire permit to connect to the database. Too many database connection attempts are currently ongoing.".to_owned()
}
}
@@ -225,10 +233,10 @@ pub mod errors {
impl ReportableError for WakeComputeError {
fn get_error_kind(&self) -> crate::error::ErrorKind {
match self {
Self::BadComputeAddress(_) => crate::error::ErrorKind::ControlPlane,
Self::ApiError(e) => e.get_error_kind(),
Self::TooManyConnections => crate::error::ErrorKind::RateLimit,
Self::TooManyConnectionAttempts(e) => e.get_error_kind(),
WakeComputeError::BadComputeAddress(_) => crate::error::ErrorKind::ControlPlane,
WakeComputeError::ApiError(e) => e.get_error_kind(),
WakeComputeError::TooManyConnections => crate::error::ErrorKind::RateLimit,
WakeComputeError::TooManyConnectionAttempts(e) => e.get_error_kind(),
}
}
}
@@ -236,10 +244,10 @@ pub mod errors {
impl CouldRetry for WakeComputeError {
fn could_retry(&self) -> bool {
match self {
Self::BadComputeAddress(_) => false,
Self::ApiError(e) => e.could_retry(),
Self::TooManyConnections => false,
Self::TooManyConnectionAttempts(_) => false,
WakeComputeError::BadComputeAddress(_) => false,
WakeComputeError::ApiError(e) => e.could_retry(),
WakeComputeError::TooManyConnections => false,
WakeComputeError::TooManyConnectionAttempts(_) => false,
}
}
}
@@ -358,14 +366,13 @@ impl Api for ConsoleBackend {
ctx: &RequestMonitoring,
user_info: &ComputeUserInfo,
) -> Result<CachedRoleSecret, errors::GetAuthInfoError> {
use ConsoleBackend::*;
match self {
Self::Console(api) => api.get_role_secret(ctx, user_info).await,
Console(api) => api.get_role_secret(ctx, user_info).await,
#[cfg(any(test, feature = "testing"))]
Self::Postgres(api) => api.get_role_secret(ctx, user_info).await,
Postgres(api) => api.get_role_secret(ctx, user_info).await,
#[cfg(test)]
Self::Test(_) => {
unreachable!("this function should never be called in the test backend")
}
Test(_) => unreachable!("this function should never be called in the test backend"),
}
}
@@ -374,12 +381,13 @@ impl Api for ConsoleBackend {
ctx: &RequestMonitoring,
user_info: &ComputeUserInfo,
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), errors::GetAuthInfoError> {
use ConsoleBackend::*;
match self {
Self::Console(api) => api.get_allowed_ips_and_secret(ctx, user_info).await,
Console(api) => api.get_allowed_ips_and_secret(ctx, user_info).await,
#[cfg(any(test, feature = "testing"))]
Self::Postgres(api) => api.get_allowed_ips_and_secret(ctx, user_info).await,
Postgres(api) => api.get_allowed_ips_and_secret(ctx, user_info).await,
#[cfg(test)]
Self::Test(api) => api.get_allowed_ips_and_secret(),
Test(api) => api.get_allowed_ips_and_secret(),
}
}
@@ -388,12 +396,14 @@ impl Api for ConsoleBackend {
ctx: &RequestMonitoring,
user_info: &ComputeUserInfo,
) -> Result<CachedNodeInfo, errors::WakeComputeError> {
use ConsoleBackend::*;
match self {
Self::Console(api) => api.wake_compute(ctx, user_info).await,
Console(api) => api.wake_compute(ctx, user_info).await,
#[cfg(any(test, feature = "testing"))]
Self::Postgres(api) => api.wake_compute(ctx, user_info).await,
Postgres(api) => api.wake_compute(ctx, user_info).await,
#[cfg(test)]
Self::Test(api) => api.wake_compute(),
Test(api) => api.wake_compute(),
}
}
}
@@ -539,7 +549,7 @@ impl WakeComputePermit {
!self.permit.is_disabled()
}
pub fn release(self, outcome: Outcome) {
self.permit.release(outcome);
self.permit.release(outcome)
}
pub fn release_result<T, E>(self, res: Result<T, E>) -> Result<T, E> {
match res {

View File

@@ -166,7 +166,7 @@ impl RequestMonitoring {
pub fn set_project(&self, x: MetricsAuxInfo) {
let mut this = self.0.try_lock().expect("should not deadlock");
if this.endpoint_id.is_none() {
this.set_endpoint_id(x.endpoint_id.as_str().into());
this.set_endpoint_id(x.endpoint_id.as_str().into())
}
this.branch = Some(x.branch_id);
this.project = Some(x.project_id);
@@ -260,7 +260,7 @@ impl RequestMonitoring {
.cold_start_info
}
pub fn latency_timer_pause(&self, waiting_for: Waiting) -> LatencyTimerPause<'_> {
pub fn latency_timer_pause(&self, waiting_for: Waiting) -> LatencyTimerPause {
LatencyTimerPause {
ctx: self,
start: tokio::time::Instant::now(),
@@ -273,7 +273,7 @@ impl RequestMonitoring {
.try_lock()
.expect("should not deadlock")
.latency_timer
.success();
.success()
}
}
@@ -328,7 +328,7 @@ impl RequestMonitoringInner {
fn has_private_peer_addr(&self) -> bool {
match self.peer_addr {
IpAddr::V4(ip) => ip.is_private(),
IpAddr::V6(_) => false,
_ => false,
}
}

View File

@@ -736,7 +736,7 @@ mod tests {
while let Some(r) = s.next().await {
tx.send(r).unwrap();
}
time::sleep(time::Duration::from_secs(70)).await;
time::sleep(time::Duration::from_secs(70)).await
}
});

View File

@@ -56,7 +56,7 @@ impl<'de, Id: InternId> serde::de::Deserialize<'de> for InternedString<Id> {
impl<'de, Id: InternId> serde::de::Visitor<'de> for Visitor<Id> {
type Value = InternedString<Id>;
fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("a string")
}

View File

@@ -252,7 +252,7 @@ impl Drop for HttpEndpointPoolsGuard<'_> {
}
impl HttpEndpointPools {
pub fn guard(&self) -> HttpEndpointPoolsGuard<'_> {
pub fn guard(&self) -> HttpEndpointPoolsGuard {
self.http_pool_endpoints_registered_total.inc();
HttpEndpointPoolsGuard {
dec: &self.http_pool_endpoints_unregistered_total,

View File

@@ -184,7 +184,7 @@ impl CopyBuffer {
}
Poll::Pending
}
res @ Poll::Ready(_) => res.map_err(ErrorDirection::Write),
res => res.map_err(ErrorDirection::Write),
}
}

View File

@@ -82,8 +82,9 @@ pub async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
let mut stream = PqStream::new(Stream::from_raw(stream));
loop {
let msg = stream.read_startup_packet().await?;
use FeStartupPacket::*;
match msg {
FeStartupPacket::SslRequest { direct } => match stream.get_ref() {
SslRequest { direct } => match stream.get_ref() {
Stream::Raw { .. } if !tried_ssl => {
tried_ssl = true;
@@ -138,7 +139,7 @@ pub async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
let tls_stream = accept.await.inspect_err(|_| {
if record_handshake_error {
Metrics::get().proxy.tls_handshake_failures.inc();
Metrics::get().proxy.tls_handshake_failures.inc()
}
})?;
@@ -181,7 +182,7 @@ pub async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
}
_ => return Err(HandshakeError::ProtocolViolation),
},
FeStartupPacket::GssEncRequest => match stream.get_ref() {
GssEncRequest => match stream.get_ref() {
Stream::Raw { .. } if !tried_gss => {
tried_gss = true;
@@ -190,7 +191,7 @@ pub async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
}
_ => return Err(HandshakeError::ProtocolViolation),
},
FeStartupPacket::StartupMessage { params, version }
StartupMessage { params, version }
if PG_PROTOCOL_EARLIEST <= version && version <= PG_PROTOCOL_LATEST =>
{
// Check that the config has been consumed during upgrade
@@ -210,7 +211,7 @@ pub async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
break Ok(HandshakeData::Startup(stream, params));
}
// downgrade protocol version
FeStartupPacket::StartupMessage { params, version }
StartupMessage { params, version }
if version.major() == 3 && version > PG_PROTOCOL_LATEST =>
{
warn!(?version, "unsupported minor version");
@@ -240,7 +241,7 @@ pub async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
);
break Ok(HandshakeData::Startup(stream, params));
}
FeStartupPacket::StartupMessage { version, .. } => {
StartupMessage { version, .. } => {
warn!(
?version,
session_type = "normal",
@@ -248,7 +249,7 @@ pub async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
);
return Err(HandshakeError::ProtocolViolation);
}
FeStartupPacket::CancelRequest(cancel_key_data) => {
CancelRequest(cancel_key_data) => {
info!(session_type = "cancellation", "successful handshake");
break Ok(HandshakeData::Cancel(cancel_key_data));
}

View File

@@ -68,7 +68,7 @@ async fn proxy_mitm(
end_client.send(Bytes::from_static(b"R\0\0\0\x17\0\0\0\x0aSCRAM-SHA-256\0\0")).await.unwrap();
continue;
}
end_client.send(message).await.unwrap();
end_client.send(message).await.unwrap()
}
_ => break,
}
@@ -88,7 +88,7 @@ async fn proxy_mitm(
end_server.send(buf.freeze()).await.unwrap();
continue;
}
end_server.send(message).await.unwrap();
end_server.send(message).await.unwrap()
}
_ => break,
}

View File

@@ -237,7 +237,7 @@ impl Token {
}
pub fn release(mut self, outcome: Outcome) {
self.release_mut(Some(outcome));
self.release_mut(Some(outcome))
}
pub fn release_mut(&mut self, outcome: Option<Outcome>) {
@@ -249,7 +249,7 @@ impl Token {
impl Drop for Token {
fn drop(&mut self) {
self.release_mut(None);
self.release_mut(None)
}
}

View File

@@ -25,8 +25,9 @@ pub struct Aimd {
impl LimitAlgorithm for Aimd {
fn update(&self, old_limit: usize, sample: Sample) -> usize {
use Outcome::*;
match sample.outcome {
Outcome::Success => {
Success => {
let utilisation = sample.in_flight as f32 / old_limit as f32;
if utilisation > self.utilisation {
@@ -41,7 +42,7 @@ impl LimitAlgorithm for Aimd {
old_limit
}
}
Outcome::Overload => {
Overload => {
let limit = old_limit as f32 * self.dec;
// Floor instead of round, so the limit reduces even with small numbers.

View File

@@ -98,7 +98,7 @@ impl ConnectionWithCredentialsProvider {
info!("Establishing a new connection...");
self.con = None;
if let Some(f) = self.refresh_token_task.take() {
f.abort();
f.abort()
}
let mut con = self
.get_client()

View File

@@ -108,6 +108,7 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
}
#[tracing::instrument(skip(self, msg), fields(session_id = tracing::field::Empty))]
async fn handle_message(&self, msg: redis::Msg) -> anyhow::Result<()> {
use Notification::*;
let payload: String = msg.get_payload()?;
tracing::debug!(?payload, "received a message payload");
@@ -123,7 +124,7 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
};
tracing::debug!(?msg, "received a message");
match msg {
Notification::Cancel(cancel_session) => {
Cancel(cancel_session) => {
tracing::Span::current().record(
"session_id",
tracing::field::display(cancel_session.session_id),
@@ -152,12 +153,12 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
}
_ => {
invalidate_cache(self.cache.clone(), msg.clone());
if matches!(msg, Notification::AllowedIpsUpdate { .. }) {
if matches!(msg, AllowedIpsUpdate { .. }) {
Metrics::get()
.proxy
.redis_events_count
.inc(RedisEventsCount::AllowedIpsUpdate);
} else if matches!(msg, Notification::PasswordUpdate { .. }) {
} else if matches!(msg, PasswordUpdate { .. }) {
Metrics::get()
.proxy
.redis_events_count
@@ -179,16 +180,16 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
}
fn invalidate_cache<C: ProjectInfoCache>(cache: Arc<C>, msg: Notification) {
use Notification::*;
match msg {
Notification::AllowedIpsUpdate { allowed_ips_update } => {
cache.invalidate_allowed_ips_for_project(allowed_ips_update.project_id);
AllowedIpsUpdate { allowed_ips_update } => {
cache.invalidate_allowed_ips_for_project(allowed_ips_update.project_id)
}
Notification::PasswordUpdate { password_update } => cache
.invalidate_role_secret_for_project(
password_update.project_id,
password_update.role_name,
),
Notification::Cancel(_) => unreachable!("cancel message should be handled separately"),
PasswordUpdate { password_update } => cache.invalidate_role_secret_for_project(
password_update.project_id,
password_update.role_name,
),
Cancel(_) => unreachable!("cancel message should be handled separately"),
}
}

View File

@@ -42,9 +42,10 @@ pub enum Error {
impl UserFacingError for Error {
fn to_string_client(&self) -> String {
use Error::*;
match self {
Self::ChannelBindingFailed(m) => (*m).to_string(),
Self::ChannelBindingBadMethod(m) => format!("unsupported channel binding method {m}"),
ChannelBindingFailed(m) => m.to_string(),
ChannelBindingBadMethod(m) => format!("unsupported channel binding method {m}"),
_ => "authentication protocol violation".to_string(),
}
}

View File

@@ -13,10 +13,11 @@ pub enum ChannelBinding<T> {
impl<T> ChannelBinding<T> {
pub fn and_then<R, E>(self, f: impl FnOnce(T) -> Result<R, E>) -> Result<ChannelBinding<R>, E> {
use ChannelBinding::*;
Ok(match self {
Self::NotSupportedClient => ChannelBinding::NotSupportedClient,
Self::NotSupportedServer => ChannelBinding::NotSupportedServer,
Self::Required(x) => ChannelBinding::Required(f(x)?),
NotSupportedClient => NotSupportedClient,
NotSupportedServer => NotSupportedServer,
Required(x) => Required(f(x)?),
})
}
}
@@ -24,10 +25,11 @@ impl<T> ChannelBinding<T> {
impl<'a> ChannelBinding<&'a str> {
// NB: FromStr doesn't work with lifetimes
pub fn parse(input: &'a str) -> Option<Self> {
use ChannelBinding::*;
Some(match input {
"n" => Self::NotSupportedClient,
"y" => Self::NotSupportedServer,
other => Self::Required(other.strip_prefix("p=")?),
"n" => NotSupportedClient,
"y" => NotSupportedServer,
other => Required(other.strip_prefix("p=")?),
})
}
}
@@ -38,16 +40,17 @@ impl<T: std::fmt::Display> ChannelBinding<T> {
&self,
get_cbind_data: impl FnOnce(&T) -> Result<&'a [u8], E>,
) -> Result<std::borrow::Cow<'static, str>, E> {
use ChannelBinding::*;
Ok(match self {
Self::NotSupportedClient => {
NotSupportedClient => {
// base64::encode("n,,")
"biws".into()
}
Self::NotSupportedServer => {
NotSupportedServer => {
// base64::encode("y,,")
"eSws".into()
}
Self::Required(mode) => {
Required(mode) => {
use std::io::Write;
let mut cbind_input = vec![];
write!(&mut cbind_input, "p={mode},,",).unwrap();

View File

@@ -42,9 +42,10 @@ pub(super) enum ServerMessage<T> {
impl<'a> ServerMessage<&'a str> {
pub(super) fn to_reply(&self) -> BeMessage<'a> {
use BeAuthenticationSaslMessage::*;
BeMessage::AuthenticationSasl(match self {
ServerMessage::Continue(s) => BeAuthenticationSaslMessage::Continue(s.as_bytes()),
ServerMessage::Final(s) => BeAuthenticationSaslMessage::Final(s.as_bytes()),
ServerMessage::Continue(s) => Continue(s.as_bytes()),
ServerMessage::Final(s) => Final(s.as_bytes()),
})
}
}

View File

@@ -137,12 +137,12 @@ mod tests {
#[tokio::test]
async fn round_trip() {
run_round_trip_test("pencil", "pencil").await;
run_round_trip_test("pencil", "pencil").await
}
#[tokio::test]
#[should_panic(expected = "password doesn't match")]
async fn failure() {
run_round_trip_test("pencil", "eraser").await;
run_round_trip_test("pencil", "eraser").await
}
}

View File

@@ -98,6 +98,8 @@ mod tests {
// q% of counts will be within p of the actual value
let mut sketch = CountMinSketch::with_params(p / N as f64, 1.0 - q);
dbg!(sketch.buckets.len());
// insert a bunch of entries in a random order
let mut ids2 = ids.clone();
while !ids2.is_empty() {

View File

@@ -210,23 +210,23 @@ impl sasl::Mechanism for Exchange<'_> {
type Output = super::ScramKey;
fn exchange(mut self, input: &str) -> sasl::Result<sasl::Step<Self, Self::Output>> {
use {sasl::Step, ExchangeState};
use {sasl::Step::*, ExchangeState::*};
match &self.state {
ExchangeState::Initial(init) => {
Initial(init) => {
match init.transition(self.secret, &self.tls_server_end_point, input)? {
Step::Continue(sent, msg) => {
self.state = ExchangeState::SaltSent(sent);
Ok(Step::Continue(self, msg))
Continue(sent, msg) => {
self.state = SaltSent(sent);
Ok(Continue(self, msg))
}
Step::Success(x, _) => match x {},
Step::Failure(msg) => Ok(Step::Failure(msg)),
Success(x, _) => match x {},
Failure(msg) => Ok(Failure(msg)),
}
}
ExchangeState::SaltSent(sent) => {
SaltSent(sent) => {
match sent.transition(self.secret, &self.tls_server_end_point, input)? {
Step::Success(keys, msg) => Ok(Step::Success(keys, msg)),
Step::Continue(x, _) => match x {},
Step::Failure(msg) => Ok(Step::Failure(msg)),
Success(keys, msg) => Ok(Success(keys, msg)),
Continue(x, _) => match x {},
Failure(msg) => Ok(Failure(msg)),
}
}
}

View File

@@ -59,7 +59,7 @@ impl<'a> ClientFirstMessage<'a> {
// https://github.com/postgres/postgres/blob/f83908798f78c4cafda217ca875602c88ea2ae28/src/backend/libpq/auth-scram.c#L13-L14
if !username.is_empty() {
tracing::warn!(username, "scram username provided, but is not expected");
tracing::warn!(username, "scram username provided, but is not expected")
// TODO(conrad):
// return None;
}
@@ -137,7 +137,7 @@ impl<'a> ClientFinalMessage<'a> {
/// Build a response to [`ClientFinalMessage`].
pub fn build_server_final_message(
&self,
signature_builder: SignatureBuilder<'_>,
signature_builder: SignatureBuilder,
server_key: &ScramKey,
) -> String {
let mut buf = String::from("v=");
@@ -212,7 +212,7 @@ mod tests {
#[test]
fn parse_client_first_message_with_invalid_gs2_authz() {
assert!(ClientFirstMessage::parse("n,authzid,n=,r=nonce").is_none());
assert!(ClientFirstMessage::parse("n,authzid,n=,r=nonce").is_none())
}
#[test]

View File

@@ -84,6 +84,6 @@ mod tests {
};
let expected = pbkdf2_hmac_array::<Sha256, 32>(pass, salt, 600000);
assert_eq!(hash, expected);
assert_eq!(hash, expected)
}
}

View File

@@ -270,7 +270,7 @@ fn thread_rt(pool: Arc<ThreadPool>, worker: Worker<JobSpec>, index: usize) {
.inc(ThreadPoolWorkerId(index));
// skip for now
worker.push(job);
worker.push(job)
}
}
@@ -316,6 +316,6 @@ mod tests {
10, 114, 73, 188, 140, 222, 196, 156, 214, 184, 79, 157, 119, 242, 16, 31, 53, 242,
178, 43, 95, 8, 225, 182, 122, 40, 219, 21, 89, 147, 64, 140,
];
assert_eq!(actual, expected);
assert_eq!(actual, expected)
}
}

View File

@@ -120,7 +120,7 @@ pub async fn task_main(
tracing::trace!("attempting to cancel a random connection");
if let Some(token) = config.http_config.cancel_set.take() {
tracing::debug!("cancelling a random connection");
token.cancel();
token.cancel()
}
}
@@ -198,7 +198,7 @@ async fn connection_startup(
let peer_addr = peer.unwrap_or(peer_addr).ip();
let has_private_peer_addr = match peer_addr {
IpAddr::V4(ip) => ip.is_private(),
IpAddr::V6(_) => false,
_ => false,
};
info!(?session_id, %peer_addr, "accepted new TCP connection");

View File

@@ -390,7 +390,7 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
.write()
.get_conn_entry(conn_info.db_and_user())
{
client = Some(entry.conn);
client = Some(entry.conn)
}
let endpoint_pool = Arc::downgrade(&endpoint_pool);
@@ -662,13 +662,13 @@ impl<C: ClientInnerExt> Discard<'_, C> {
pub fn check_idle(&mut self, status: ReadyForQueryStatus) {
let conn_info = &self.conn_info;
if status != ReadyForQueryStatus::Idle && std::mem::take(self.pool).strong_count() > 0 {
info!("pool: throwing away connection '{conn_info}' because connection is not idle");
info!("pool: throwing away connection '{conn_info}' because connection is not idle")
}
}
pub fn discard(&mut self) {
let conn_info = &self.conn_info;
if std::mem::take(self.pool).strong_count() > 0 {
info!("pool: throwing away connection '{conn_info}' because connection is potentially in a broken state");
info!("pool: throwing away connection '{conn_info}' because connection is potentially in a broken state")
}
}
}

View File

@@ -234,7 +234,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Stream<S> {
.await
.inspect_err(|_| {
if record_handshake_error {
Metrics::get().proxy.tls_handshake_failures.inc();
Metrics::get().proxy.tls_handshake_failures.inc()
}
})?),
Stream::Tls { .. } => Err(StreamUpgradeError::AlreadyTls),

View File

@@ -12,7 +12,7 @@ impl ApiUrl {
}
/// See [`url::Url::path_segments_mut`].
pub fn path_segments_mut(&mut self) -> url::PathSegmentsMut<'_> {
pub fn path_segments_mut(&mut self) -> url::PathSegmentsMut {
// We've already verified that it works during construction.
self.0.path_segments_mut().expect("bad API url")
}

View File

@@ -36,7 +36,7 @@ impl<T> Default for Waiters<T> {
}
impl<T> Waiters<T> {
pub fn register(&self, key: String) -> Result<Waiter<'_, T>, RegisterError> {
pub fn register(&self, key: String) -> Result<Waiter<T>, RegisterError> {
let (tx, rx) = oneshot::channel();
self.0

View File

@@ -92,7 +92,7 @@ impl TermHistory {
}
/// Find point of divergence between leader (walproposer) term history and
/// safekeeper. Arguments are not symmetric as proposer history ends at
/// safekeeper. Arguments are not symmetrics as proposer history ends at
/// +infinity while safekeeper at flush_lsn.
/// C version is at walproposer SendProposerElected.
pub fn find_highest_common_point(
@@ -701,13 +701,7 @@ where
.with_label_values(&["handle_elected"])
.start_timer();
info!(
"received ProposerElected {:?}, term={}, last_log_term={}, flush_lsn={}",
msg,
self.state.acceptor_state.term,
self.get_last_log_term(),
self.flush_lsn()
);
info!("received ProposerElected {:?}", msg);
if self.state.acceptor_state.term < msg.term {
let mut state = self.state.start_change();
state.acceptor_state.term = msg.term;
@@ -719,43 +713,22 @@ where
return Ok(None);
}
// Before truncating WAL check-cross the check divergence point received
// from the walproposer.
let sk_th = self.get_term_history();
let last_common_point = match TermHistory::find_highest_common_point(
&msg.term_history,
&sk_th,
self.flush_lsn(),
) {
// No common point. Expect streaming from the beginning of the
// history like walproposer while we don't have proper init.
None => *msg.term_history.0.first().ok_or(anyhow::anyhow!(
"empty walproposer term history {:?}",
msg.term_history
))?,
Some(lcp) => lcp,
};
// This is expected to happen in a rare race when another connection
// from the same walproposer writes + flushes WAL after this connection
// sent flush_lsn in VoteRequest; for instance, very late
// ProposerElected message delivery after another connection was
// established and wrote WAL. In such cases error is transient;
// reconnection makes safekeeper send newest term history and flush_lsn
// and walproposer recalculates the streaming point. OTOH repeating
// error indicates a serious bug.
if last_common_point.lsn != msg.start_streaming_at {
bail!("refusing ProposerElected with unexpected truncation point: lcp={:?} start_streaming_at={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
last_common_point, msg.start_streaming_at,
self.state.acceptor_state.term, sk_th, self.flush_lsn(), msg.term_history,
);
// This might happen in a rare race when another (old) connection from
// the same walproposer writes + flushes WAL after this connection
// already sent flush_lsn in VoteRequest. It is generally safe to
// proceed, but to prevent commit_lsn surprisingly going down we should
// either refuse the session (simpler) or skip the part we already have
// from the stream (can be implemented).
if msg.term == self.get_last_log_term() && self.flush_lsn() > msg.start_streaming_at {
bail!("refusing ProposerElected which is going to overwrite correct WAL: term={}, flush_lsn={}, start_streaming_at={}; restarting the handshake should help",
msg.term, self.flush_lsn(), msg.start_streaming_at)
}
// We are also expected to never attempt to truncate committed data.
// Otherwise we must never attempt to truncate committed data.
assert!(
msg.start_streaming_at >= self.state.inmem.commit_lsn,
"attempt to truncate committed data: start_streaming_at={}, commit_lsn={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}",
msg.start_streaming_at, self.state.inmem.commit_lsn,
self.state.acceptor_state.term, sk_th, self.flush_lsn(), msg.term_history,
"attempt to truncate committed data: start_streaming_at={}, commit_lsn={}",
msg.start_streaming_at,
self.state.inmem.commit_lsn
);
// Before first WAL write initialize its segment. It makes first segment
@@ -770,6 +743,9 @@ where
.await?;
}
// TODO: cross check divergence point, check if msg.start_streaming_at corresponds to
// intersection of our history and history from msg
// truncate wal, update the LSNs
self.wal_store.truncate_wal(msg.start_streaming_at).await?;
@@ -1093,7 +1069,7 @@ mod tests {
let pem = ProposerElected {
term: 1,
start_streaming_at: Lsn(3),
start_streaming_at: Lsn(1),
term_history: TermHistory(vec![TermLsn {
term: 1,
lsn: Lsn(3),

View File

@@ -18,7 +18,6 @@ import psycopg2
from psycopg2.extras import execute_values
CREATE_TABLE = """
CREATE TYPE arch AS ENUM ('ARM64', 'X64', 'UNKNOWN');
CREATE TABLE IF NOT EXISTS results (
id BIGSERIAL PRIMARY KEY,
parent_suite TEXT NOT NULL,
@@ -29,7 +28,6 @@ CREATE TABLE IF NOT EXISTS results (
stopped_at TIMESTAMPTZ NOT NULL,
duration INT NOT NULL,
flaky BOOLEAN NOT NULL,
arch arch DEFAULT 'X64',
build_type TEXT NOT NULL,
pg_version INT NOT NULL,
run_id BIGINT NOT NULL,
@@ -37,7 +35,7 @@ CREATE TABLE IF NOT EXISTS results (
reference TEXT NOT NULL,
revision CHAR(40) NOT NULL,
raw JSONB COMPRESSION lz4 NOT NULL,
UNIQUE (parent_suite, suite, name, arch, build_type, pg_version, started_at, stopped_at, run_id)
UNIQUE (parent_suite, suite, name, build_type, pg_version, started_at, stopped_at, run_id)
);
"""
@@ -52,7 +50,6 @@ class Row:
stopped_at: datetime
duration: int
flaky: bool
arch: str
build_type: str
pg_version: int
run_id: int
@@ -124,14 +121,6 @@ def ingest_test_result(
raw.pop("labels")
raw.pop("extra")
# All allure parameters are prefixed with "__", see test_runner/fixtures/parametrize.py
parameters = {
p["name"].removeprefix("__"): p["value"]
for p in test["parameters"]
if p["name"].startswith("__")
}
arch = parameters.get("arch", "UNKNOWN").strip("'")
build_type, pg_version, unparametrized_name = parse_test_name(test["name"])
labels = {label["name"]: label["value"] for label in test["labels"]}
row = Row(
@@ -143,7 +132,6 @@ def ingest_test_result(
stopped_at=datetime.fromtimestamp(test["time"]["stop"] / 1000, tz=timezone.utc),
duration=test["time"]["duration"],
flaky=test["flaky"] or test["retriesStatusChange"],
arch=arch,
build_type=build_type,
pg_version=pg_version,
run_id=run_id,

View File

@@ -44,7 +44,7 @@ run the following commands from the top of the neon.git checkout
# test suite run
export TEST_OUTPUT="$TEST_OUTPUT"
DEFAULT_PG_VERSION=16 BUILD_TYPE=release ./scripts/pytest test_runner/performance/test_latency.py
DEFAULT_PG_VERSION=15 BUILD_TYPE=release ./scripts/pytest test_runner/performance/test_latency.py
# for interactive use
export NEON_REPO_DIR="$NEON_REPO_DIR"

View File

@@ -520,19 +520,6 @@ async fn handle_node_status(req: Request<Body>) -> Result<Response<Body>, ApiErr
json_response(StatusCode::OK, node_status)
}
async fn handle_get_leader(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
let state = get_state(&req);
let leader = state.service.get_leader().await.map_err(|err| {
ApiError::InternalServerError(anyhow::anyhow!(
"Failed to read leader from database: {err}"
))
})?;
json_response(StatusCode::OK, leader)
}
async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
@@ -1029,9 +1016,6 @@ pub fn make_router(
.get("/control/v1/node/:node_id", |r| {
named_request_span(r, handle_node_status, RequestName("control_v1_node_status"))
})
.get("/control/v1/leader", |r| {
named_request_span(r, handle_get_leader, RequestName("control_v1_get_leader"))
})
.put("/control/v1/node/:node_id/drain", |r| {
named_request_span(r, handle_node_drain, RequestName("control_v1_node_drain"))
})

View File

@@ -196,26 +196,14 @@ async fn migration_run(database_url: &str) -> anyhow::Result<()> {
}
fn main() -> anyhow::Result<()> {
logging::init(
LogFormat::Plain,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stdout,
)?;
// log using tracing so we don't get confused output by default hook writing to stderr
utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
let hook = std::panic::take_hook();
let default_panic = std::panic::take_hook();
std::panic::set_hook(Box::new(move |info| {
// let sentry send a message (and flush)
// and trace the error
hook(info);
default_panic(info);
std::process::exit(1);
}));
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
tokio::runtime::Builder::new_current_thread()
// We use spawn_blocking for database operations, so require approximately
// as many blocking threads as we will open database connections.
@@ -229,6 +217,12 @@ fn main() -> anyhow::Result<()> {
async fn async_main() -> anyhow::Result<()> {
let launch_ts = Box::leak(Box::new(LaunchTimestamp::generate()));
logging::init(
LogFormat::Plain,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stdout,
)?;
preinitialize_metrics();
let args = Cli::parse();

View File

@@ -1,7 +1,7 @@
use crate::tenant_shard::ObservedState;
use pageserver_api::shard::TenantShardId;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, time::Duration};
use std::collections::HashMap;
use tokio_util::sync::CancellationToken;
use hyper::Uri;
@@ -69,8 +69,6 @@ impl PeerClient {
req
};
let req = req.timeout(Duration::from_secs(2));
let res = req
.send()
.await

View File

@@ -20,8 +20,7 @@ use crate::{
metrics,
peer_client::{GlobalObservedState, PeerClient},
persistence::{
AbortShardSplitStatus, ControllerPersistence, DatabaseResult, MetadataHealthPersistence,
TenantFilter,
AbortShardSplitStatus, ControllerPersistence, MetadataHealthPersistence, TenantFilter,
},
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
@@ -490,6 +489,11 @@ pub(crate) enum ReconcileResultRequest {
Stop,
}
struct LeaderStepDownState {
observed: GlobalObservedState,
leader: ControllerPersistence,
}
impl Service {
pub fn get_config(&self) -> &Config {
&self.config
@@ -500,8 +504,7 @@ impl Service {
#[instrument(skip_all)]
async fn startup_reconcile(
self: &Arc<Service>,
current_leader: Option<ControllerPersistence>,
leader_step_down_state: Option<GlobalObservedState>,
leader_step_down_state: Option<LeaderStepDownState>,
bg_compute_notify_result_tx: tokio::sync::mpsc::Sender<
Result<(), (TenantShardId, NotifyError)>,
>,
@@ -519,15 +522,17 @@ impl Service {
.checked_add(STARTUP_RECONCILE_TIMEOUT / 2)
.expect("Reconcile timeout is a modest constant");
let observed = if let Some(state) = leader_step_down_state {
let (observed, current_leader) = if let Some(state) = leader_step_down_state {
tracing::info!(
"Using observed state received from leader at {}",
current_leader.as_ref().unwrap().address
state.leader.address,
);
state
(state.observed, Some(state.leader))
} else {
self.build_global_observed_state(node_scan_deadline).await
(
self.build_global_observed_state(node_scan_deadline).await,
None,
)
};
// Accumulate a list of any tenant locations that ought to be detached
@@ -1377,32 +1382,13 @@ impl Service {
};
let leadership_status = this.inner.read().unwrap().get_leadership_status();
let leader = match this.get_leader().await {
Ok(ok) => ok,
Err(err) => {
tracing::error!(
"Failed to query database for current leader: {err}. Aborting start-up ..."
);
std::process::exit(1);
}
};
let leader_step_down_state = match leadership_status {
LeadershipStatus::Candidate => {
if let Some(ref leader) = leader {
this.request_step_down(leader).await
} else {
tracing::info!(
"No leader found to request step down from. Will build observed state."
);
None
}
}
let peer_observed_state = match leadership_status {
LeadershipStatus::Candidate => this.request_step_down().await,
LeadershipStatus::Leader => None,
LeadershipStatus::SteppedDown => unreachable!(),
};
this.startup_reconcile(leader, leader_step_down_state, bg_compute_notify_result_tx)
this.startup_reconcile(peer_observed_state, bg_compute_notify_result_tx)
.await;
drop(startup_completion);
@@ -4664,10 +4650,6 @@ impl Service {
))
}
pub(crate) async fn get_leader(&self) -> DatabaseResult<Option<ControllerPersistence>> {
self.persistence.get_leader().await
}
pub(crate) async fn node_register(
&self,
register_req: NodeRegisterRequest,
@@ -6360,7 +6342,6 @@ impl Service {
pub(crate) async fn step_down(&self) -> GlobalObservedState {
tracing::info!("Received step down request from peer");
failpoint_support::sleep_millis_async!("sleep-on-step-down-handling");
self.inner.write().unwrap().step_down();
// TODO: would it make sense to have a time-out for this?
@@ -6386,31 +6367,50 @@ impl Service {
///
/// On failures to query the database or step down error responses the process is killed
/// and we rely on k8s to retry.
async fn request_step_down(
&self,
leader: &ControllerPersistence,
) -> Option<GlobalObservedState> {
tracing::info!("Sending step down request to {leader:?}");
// TODO: jwt token
let client = PeerClient::new(
Uri::try_from(leader.address.as_str()).expect("Failed to build leader URI"),
self.config.jwt_token.clone(),
);
let state = client.step_down(&self.cancel).await;
match state {
Ok(state) => Some(state),
async fn request_step_down(&self) -> Option<LeaderStepDownState> {
let leader = match self.persistence.get_leader().await {
Ok(leader) => leader,
Err(err) => {
// TODO: Make leaders periodically update a timestamp field in the
// database and, if the leader is not reachable from the current instance,
// but inferred as alive from the timestamp, abort start-up. This avoids
// a potential scenario in which we have two controllers acting as leaders.
tracing::error!(
"Leader ({}) did not respond to step-down request: {}",
leader.address,
err
"Failed to query database for current leader: {err}. Aborting start-up ..."
);
std::process::exit(1);
}
};
match leader {
Some(leader) => {
tracing::info!("Sending step down request to {leader:?}");
// TODO: jwt token
let client = PeerClient::new(
Uri::try_from(leader.address.as_str()).expect("Failed to build leader URI"),
self.config.jwt_token.clone(),
);
let state = client.step_down(&self.cancel).await;
match state {
Ok(state) => Some(LeaderStepDownState {
observed: state,
leader: leader.clone(),
}),
Err(err) => {
// TODO: Make leaders periodically update a timestamp field in the
// database and, if the leader is not reachable from the current instance,
// but inferred as alive from the timestamp, abort start-up. This avoids
// a potential scenario in which we have two controllers acting as leaders.
tracing::error!(
"Leader ({}) did not respond to step-down request: {}",
leader.address,
err
);
None
}
}
}
None => {
tracing::info!(
"No leader found to request step down from. Will build observed state."
);
None
}
}

View File

@@ -1,10 +1,10 @@
use std::collections::{HashMap, HashSet};
use anyhow::Context;
use aws_sdk_s3::Client;
use pageserver::tenant::layer_map::LayerMap;
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver_api::shard::ShardIndex;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use utils::generation::Generation;
use utils::id::TimelineId;
@@ -16,7 +16,7 @@ use futures_util::StreamExt;
use pageserver::tenant::remote_timeline_client::{parse_remote_index_path, remote_layer_path};
use pageserver::tenant::storage_layer::LayerName;
use pageserver::tenant::IndexPart;
use remote_storage::{GenericRemoteStorage, ListingObject, RemotePath};
use remote_storage::RemotePath;
pub(crate) struct TimelineAnalysis {
/// Anomalies detected
@@ -48,12 +48,13 @@ impl TimelineAnalysis {
}
pub(crate) async fn branch_cleanup_and_check_errors(
remote_client: &GenericRemoteStorage,
s3_client: &Client,
target: &RootTarget,
id: &TenantShardTimelineId,
tenant_objects: &mut TenantObjectListing,
s3_active_branch: Option<&BranchData>,
console_branch: Option<BranchData>,
s3_data: Option<RemoteTimelineBlobData>,
s3_data: Option<S3TimelineBlobData>,
) -> TimelineAnalysis {
let mut result = TimelineAnalysis::new();
@@ -77,9 +78,7 @@ pub(crate) async fn branch_cleanup_and_check_errors(
match s3_data {
Some(s3_data) => {
result
.garbage_keys
.extend(s3_data.unknown_keys.into_iter().map(|k| k.key.to_string()));
result.garbage_keys.extend(s3_data.unknown_keys);
match s3_data.blob_data {
BlobDataParseResult::Parsed {
@@ -144,8 +143,11 @@ pub(crate) async fn branch_cleanup_and_check_errors(
// HEAD request used here to address a race condition when an index was uploaded concurrently
// with our scan. We check if the object is uploaded to S3 after taking the listing snapshot.
let response = remote_client
.head_object(&path, &CancellationToken::new())
let response = s3_client
.head_object()
.bucket(target.bucket_name())
.key(path.get_path().as_str())
.send()
.await;
if response.is_err() {
@@ -282,14 +284,14 @@ impl TenantObjectListing {
}
#[derive(Debug)]
pub(crate) struct RemoteTimelineBlobData {
pub(crate) struct S3TimelineBlobData {
pub(crate) blob_data: BlobDataParseResult,
// Index objects that were not used when loading `blob_data`, e.g. those from old generations
pub(crate) unused_index_keys: Vec<ListingObject>,
pub(crate) unused_index_keys: Vec<String>,
// Objects whose keys were not recognized at all, i.e. not layer files, not indices
pub(crate) unknown_keys: Vec<ListingObject>,
pub(crate) unknown_keys: Vec<String>,
}
#[derive(Debug)]
@@ -321,37 +323,31 @@ pub(crate) fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generati
}
pub(crate) async fn list_timeline_blobs(
remote_client: &GenericRemoteStorage,
s3_client: &Client,
id: TenantShardTimelineId,
root_target: &RootTarget,
) -> anyhow::Result<RemoteTimelineBlobData> {
s3_root: &RootTarget,
) -> anyhow::Result<S3TimelineBlobData> {
let mut s3_layers = HashSet::new();
let mut errors = Vec::new();
let mut unknown_keys = Vec::new();
let mut timeline_dir_target = root_target.timeline_root(&id);
let mut timeline_dir_target = s3_root.timeline_root(&id);
timeline_dir_target.delimiter = String::new();
let mut index_part_keys: Vec<ListingObject> = Vec::new();
let mut index_part_keys: Vec<String> = Vec::new();
let mut initdb_archive: bool = false;
let prefix_str = &timeline_dir_target
.prefix_in_bucket
.strip_prefix("/")
.unwrap_or(&timeline_dir_target.prefix_in_bucket);
let mut stream = std::pin::pin!(stream_listing(remote_client, &timeline_dir_target));
let mut stream = std::pin::pin!(stream_listing(s3_client, &timeline_dir_target));
while let Some(obj) = stream.next().await {
let (key, Some(obj)) = obj? else {
panic!("ListingObject not specified");
};
let obj = obj?;
let key = obj.key();
let blob_name = key.get_path().as_str().strip_prefix(prefix_str);
let blob_name = key.strip_prefix(&timeline_dir_target.prefix_in_bucket);
match blob_name {
Some(name) if name.starts_with("index_part.json") => {
tracing::debug!("Index key {key}");
index_part_keys.push(obj)
index_part_keys.push(key.to_owned())
}
Some("initdb.tar.zst") => {
tracing::debug!("initdb archive {key}");
@@ -362,7 +358,7 @@ pub(crate) async fn list_timeline_blobs(
}
Some(maybe_layer_name) => match parse_layer_object_name(maybe_layer_name) {
Ok((new_layer, gen)) => {
tracing::debug!("Parsed layer key: {new_layer} {gen:?}");
tracing::debug!("Parsed layer key: {} {:?}", new_layer, gen);
s3_layers.insert((new_layer, gen));
}
Err(e) => {
@@ -370,13 +366,13 @@ pub(crate) async fn list_timeline_blobs(
errors.push(
format!("S3 list response got an object with key {key} that is not a layer name: {e}"),
);
unknown_keys.push(obj);
unknown_keys.push(key.to_string());
}
},
None => {
tracing::warn!("Unknown key {key}");
tracing::warn!("Unknown key {}", key);
errors.push(format!("S3 list response got an object with odd key {key}"));
unknown_keys.push(obj);
unknown_keys.push(key.to_string());
}
}
}
@@ -385,7 +381,7 @@ pub(crate) async fn list_timeline_blobs(
tracing::debug!(
"Timeline is empty apart from initdb archive: expected post-deletion state."
);
return Ok(RemoteTimelineBlobData {
return Ok(S3TimelineBlobData {
blob_data: BlobDataParseResult::Relic,
unused_index_keys: index_part_keys,
unknown_keys: Vec::new(),
@@ -399,13 +395,13 @@ pub(crate) async fn list_timeline_blobs(
// Stripping the index key to the last part, because RemotePath doesn't
// like absolute paths, and depending on prefix_in_bucket it's possible
// for the keys we read back to start with a slash.
let basename = key.key.get_path().as_str().rsplit_once('/').unwrap().1;
let basename = key.rsplit_once('/').unwrap().1;
parse_remote_index_path(RemotePath::from_string(basename).unwrap()).map(|g| (key, g))
})
.max_by_key(|i| i.1)
.map(|(k, g)| (k.clone(), g))
{
Some((key, gen)) => (Some::<ListingObject>(key.to_owned()), gen),
Some((key, gen)) => (Some(key), gen),
None => {
// Legacy/missing case: one or zero index parts, which did not have a generation
(index_part_keys.pop(), Generation::none())
@@ -420,14 +416,17 @@ pub(crate) async fn list_timeline_blobs(
}
if let Some(index_part_object_key) = index_part_object.as_ref() {
let index_part_bytes =
download_object_with_retries(remote_client, &index_part_object_key.key)
.await
.context("index_part.json download")?;
let index_part_bytes = download_object_with_retries(
s3_client,
&timeline_dir_target.bucket_name,
index_part_object_key,
)
.await
.context("index_part.json download")?;
match serde_json::from_slice(&index_part_bytes) {
Ok(index_part) => {
return Ok(RemoteTimelineBlobData {
return Ok(S3TimelineBlobData {
blob_data: BlobDataParseResult::Parsed {
index_part: Box::new(index_part),
index_part_generation,
@@ -449,7 +448,7 @@ pub(crate) async fn list_timeline_blobs(
);
}
Ok(RemoteTimelineBlobData {
Ok(S3TimelineBlobData {
blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
unused_index_keys: index_part_keys,
unknown_keys,

View File

@@ -6,7 +6,7 @@ use remote_storage::ListingMode;
use serde::{Deserialize, Serialize};
use crate::{
checks::parse_layer_object_name, init_remote, metadata_stream::stream_tenants,
checks::parse_layer_object_name, init_remote_generic, metadata_stream::stream_tenants_generic,
stream_objects_with_retries, BucketConfig, NodeKind,
};
@@ -50,8 +50,9 @@ pub async fn find_large_objects(
ignore_deltas: bool,
concurrency: usize,
) -> anyhow::Result<LargeObjectListing> {
let (remote_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
let tenants = pin!(stream_tenants(&remote_client, &target));
let (remote_client, target) =
init_remote_generic(bucket_config.clone(), NodeKind::Pageserver).await?;
let tenants = pin!(stream_tenants_generic(&remote_client, &target));
let objects_stream = tenants.map_ok(|tenant_shard_id| {
let mut tenant_root = target.tenant_root(&tenant_shard_id);

View File

@@ -19,8 +19,8 @@ use utils::id::TenantId;
use crate::{
cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData},
init_remote, list_objects_with_retries,
metadata_stream::{stream_tenant_timelines, stream_tenants},
init_remote_generic, list_objects_with_retries_generic,
metadata_stream::{stream_tenant_timelines_generic, stream_tenants_generic},
BucketConfig, ConsoleConfig, NodeKind, TenantShardTimelineId, TraversingDepth,
};
@@ -153,7 +153,7 @@ async fn find_garbage_inner(
node_kind: NodeKind,
) -> anyhow::Result<GarbageList> {
// Construct clients for S3 and for Console API
let (remote_client, target) = init_remote(bucket_config.clone(), node_kind).await?;
let (remote_client, target) = init_remote_generic(bucket_config.clone(), node_kind).await?;
let cloud_admin_api_client = Arc::new(CloudAdminApiClient::new(console_config));
// Build a set of console-known tenants, for quickly eliminating known-active tenants without having
@@ -179,7 +179,7 @@ async fn find_garbage_inner(
// Enumerate Tenants in S3, and check if each one exists in Console
tracing::info!("Finding all tenants in bucket {}...", bucket_config.bucket);
let tenants = stream_tenants(&remote_client, &target);
let tenants = stream_tenants_generic(&remote_client, &target);
let tenants_checked = tenants.map_ok(|t| {
let api_client = cloud_admin_api_client.clone();
let console_cache = console_cache.clone();
@@ -237,13 +237,14 @@ async fn find_garbage_inner(
// Special case: If it's missing in console, check for known bugs that would enable us to conclusively
// identify it as purge-able anyway
if console_result.is_none() {
let timelines = stream_tenant_timelines(&remote_client, &target, tenant_shard_id)
.await?
.collect::<Vec<_>>()
.await;
let timelines =
stream_tenant_timelines_generic(&remote_client, &target, tenant_shard_id)
.await?
.collect::<Vec<_>>()
.await;
if timelines.is_empty() {
// No timelines, but a heatmap: the deletion bug where we deleted everything but heatmaps
let tenant_objects = list_objects_with_retries(
let tenant_objects = list_objects_with_retries_generic(
&remote_client,
ListingMode::WithDelimiter,
&target.tenant_root(&tenant_shard_id),
@@ -264,7 +265,7 @@ async fn find_garbage_inner(
for timeline_r in timelines {
let timeline = timeline_r?;
let timeline_objects = list_objects_with_retries(
let timeline_objects = list_objects_with_retries_generic(
&remote_client,
ListingMode::WithDelimiter,
&target.timeline_root(&timeline),
@@ -330,7 +331,8 @@ async fn find_garbage_inner(
// Construct a stream of all timelines within active tenants
let active_tenants = tokio_stream::iter(active_tenants.iter().map(Ok));
let timelines = active_tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, *t));
let timelines =
active_tenants.map_ok(|t| stream_tenant_timelines_generic(&remote_client, &target, *t));
let timelines = timelines.try_buffer_unordered(S3_CONCURRENCY);
let timelines = timelines.try_flatten();
@@ -505,7 +507,7 @@ pub async fn purge_garbage(
);
let (remote_client, _target) =
init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?;
init_remote_generic(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?;
assert_eq!(
&garbage_list.bucket_config.bucket,

View File

@@ -15,7 +15,7 @@ use std::fmt::Display;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use anyhow::{anyhow, Context};
use aws_config::retry::{RetryConfigBuilder, RetryMode};
use aws_sdk_s3::config::Region;
use aws_sdk_s3::error::DisplayErrorContext;
@@ -352,7 +352,7 @@ fn make_root_target(
}
}
async fn init_remote_s3(
async fn init_remote(
bucket_config: BucketConfig,
node_kind: NodeKind,
) -> anyhow::Result<(Arc<Client>, RootTarget)> {
@@ -369,7 +369,7 @@ async fn init_remote_s3(
Ok((s3_client, s3_root))
}
async fn init_remote(
async fn init_remote_generic(
bucket_config: BucketConfig,
node_kind: NodeKind,
) -> anyhow::Result<(GenericRemoteStorage, RootTarget)> {
@@ -394,10 +394,45 @@ async fn init_remote(
// We already pass the prefix to the remote client above
let prefix_in_root_target = String::new();
let root_target = make_root_target(bucket_config.bucket, prefix_in_root_target, node_kind);
let s3_root = make_root_target(bucket_config.bucket, prefix_in_root_target, node_kind);
let client = GenericRemoteStorage::from_config(&storage_config).await?;
Ok((client, root_target))
Ok((client, s3_root))
}
async fn list_objects_with_retries(
s3_client: &Client,
s3_target: &S3Target,
continuation_token: Option<String>,
) -> anyhow::Result<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output> {
for trial in 0..MAX_RETRIES {
match s3_client
.list_objects_v2()
.bucket(&s3_target.bucket_name)
.prefix(&s3_target.prefix_in_bucket)
.delimiter(&s3_target.delimiter)
.set_continuation_token(continuation_token.clone())
.send()
.await
{
Ok(response) => return Ok(response),
Err(e) => {
if trial == MAX_RETRIES - 1 {
return Err(e)
.with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
}
error!(
"list_objects_v2 query failed: bucket_name={}, prefix={}, delimiter={}, error={}",
s3_target.bucket_name,
s3_target.prefix_in_bucket,
s3_target.delimiter,
DisplayErrorContext(e),
);
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
Err(anyhow!("unreachable unless MAX_RETRIES==0"))
}
/// Listing possibly large amounts of keys in a streaming fashion.
@@ -417,26 +452,23 @@ fn stream_objects_with_retries<'a>(
let mut list_stream =
storage_client.list_streaming(Some(&prefix), listing_mode, None, &cancel);
while let Some(res) = list_stream.next().await {
match res {
Err(err) => {
let yield_err = if err.is_permanent() {
true
} else {
let backoff_time = 1 << trial.max(5);
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
trial += 1;
trial == MAX_RETRIES - 1
};
if yield_err {
yield Err(err)
.with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
break;
}
}
Ok(res) => {
trial = 0;
yield Ok(res);
if let Err(err) = res {
let yield_err = if err.is_permanent() {
true
} else {
let backoff_time = 1 << trial.max(5);
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
trial += 1;
trial == MAX_RETRIES - 1
};
if yield_err {
yield Err(err)
.with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
break;
}
} else {
trial = 0;
yield res.map_err(anyhow::Error::from);
}
}
}
@@ -444,7 +476,7 @@ fn stream_objects_with_retries<'a>(
/// If you want to list a bounded amount of prefixes or keys. For larger numbers of keys/prefixes,
/// use [`stream_objects_with_retries`] instead.
async fn list_objects_with_retries(
async fn list_objects_with_retries_generic(
remote_client: &GenericRemoteStorage,
listing_mode: ListingMode,
s3_target: &S3Target,
@@ -482,34 +514,40 @@ async fn list_objects_with_retries(
}
async fn download_object_with_retries(
remote_client: &GenericRemoteStorage,
key: &RemotePath,
s3_client: &Client,
bucket_name: &str,
key: &str,
) -> anyhow::Result<Vec<u8>> {
let cancel = CancellationToken::new();
for trial in 0..MAX_RETRIES {
let mut buf = Vec::new();
let download = match remote_client.download(key, &cancel).await {
for _ in 0..MAX_RETRIES {
let mut body_buf = Vec::new();
let response_stream = match s3_client
.get_object()
.bucket(bucket_name)
.key(key)
.send()
.await
{
Ok(response) => response,
Err(e) => {
error!("Failed to download object for key {key}: {e}");
let backoff_time = 1 << trial.max(5);
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
};
match tokio_util::io::StreamReader::new(download.download_stream)
.read_to_end(&mut buf)
match response_stream
.body
.into_async_read()
.read_to_end(&mut body_buf)
.await
{
Ok(bytes_read) => {
tracing::debug!("Downloaded {bytes_read} bytes for object {key}");
return Ok(buf);
return Ok(body_buf);
}
Err(e) => {
error!("Failed to stream object body for key {key}: {e}");
let backoff_time = 1 << trial.max(5);
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
@@ -517,7 +555,7 @@ async fn download_object_with_retries(
anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
}
async fn download_object_to_file_s3(
async fn download_object_to_file(
s3_client: &Client,
bucket_name: &str,
key: &str,

View File

@@ -3,10 +3,9 @@ use camino::Utf8PathBuf;
use pageserver_api::controller_api::{MetadataHealthUpdateRequest, MetadataHealthUpdateResponse};
use pageserver_api::shard::TenantShardId;
use reqwest::{Method, Url};
use storage_controller_client::control_api;
use storage_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
use storage_scrubber::pageserver_physical_gc::GcMode;
use storage_scrubber::scan_pageserver_metadata::scan_pageserver_metadata;
use storage_scrubber::scan_pageserver_metadata::scan_metadata;
use storage_scrubber::tenant_snapshot::SnapshotDownloader;
use storage_scrubber::{find_large_objects, ControllerClientConfig};
use storage_scrubber::{
@@ -69,7 +68,7 @@ enum Command {
#[arg(long = "tenant-id", num_args = 0..)]
tenant_ids: Vec<TenantShardId>,
#[arg(long = "post", default_value_t = false)]
post_to_storcon: bool,
post_to_storage_controller: bool,
#[arg(long, default_value = None)]
/// For safekeeper node_kind only, points to db with debug dump
dump_db_connstr: Option<String>,
@@ -101,16 +100,6 @@ enum Command {
#[arg(long = "concurrency", short = 'j', default_value_t = 64)]
concurrency: usize,
},
CronJob {
// PageserverPhysicalGc
#[arg(long = "min-age")]
gc_min_age: humantime::Duration,
#[arg(short, long, default_value_t = GcMode::IndicesOnly)]
gc_mode: GcMode,
// ScanMetadata
#[arg(long = "post", default_value_t = false)]
post_to_storcon: bool,
},
}
#[tokio::main]
@@ -128,7 +117,6 @@ async fn main() -> anyhow::Result<()> {
Command::TenantSnapshot { .. } => "tenant-snapshot",
Command::PageserverPhysicalGc { .. } => "pageserver-physical-gc",
Command::FindLargeObjects { .. } => "find-large-objects",
Command::CronJob { .. } => "cron-job",
};
let _guard = init_logging(&format!(
"{}_{}_{}_{}.log",
@@ -138,13 +126,12 @@ async fn main() -> anyhow::Result<()> {
chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
));
let controller_client = cli.controller_api.map(|controller_api| {
let controller_client_conf = cli.controller_api.map(|controller_api| {
ControllerClientConfig {
controller_api,
// Default to no key: this is a convenience when working in a development environment
controller_jwt: cli.controller_jwt.unwrap_or("".to_owned()),
}
.build_client()
});
match cli.command {
@@ -152,7 +139,7 @@ async fn main() -> anyhow::Result<()> {
json,
tenant_ids,
node_kind,
post_to_storcon,
post_to_storage_controller,
dump_db_connstr,
dump_db_table,
} => {
@@ -191,14 +178,53 @@ async fn main() -> anyhow::Result<()> {
}
Ok(())
} else {
scan_pageserver_metadata_cmd(
bucket_config,
controller_client.as_ref(),
tenant_ids,
json,
post_to_storcon,
)
.await
if controller_client_conf.is_none() && post_to_storage_controller {
return Err(anyhow!("Posting pageserver scan health status to storage controller requires `--controller-api` and `--controller-jwt` to run"));
}
match scan_metadata(bucket_config.clone(), tenant_ids).await {
Err(e) => {
tracing::error!("Failed: {e}");
Err(e)
}
Ok(summary) => {
if json {
println!("{}", serde_json::to_string(&summary).unwrap())
} else {
println!("{}", summary.summary_string());
}
if post_to_storage_controller {
if let Some(conf) = controller_client_conf {
let controller_client = conf.build_client();
let body = summary.build_health_update_request();
controller_client
.dispatch::<MetadataHealthUpdateRequest, MetadataHealthUpdateResponse>(
Method::POST,
"control/v1/metadata_health/update".to_string(),
Some(body),
)
.await?;
}
}
if summary.is_fatal() {
tracing::error!("Fatal scrub errors detected");
} else if summary.is_empty() {
// Strictly speaking an empty bucket is a valid bucket, but if someone ran the
// scrubber they were likely expecting to scan something, and if we see no timelines
// at all then it's likely due to some configuration issues like a bad prefix
tracing::error!(
"No timelines found in bucket {} prefix {}",
bucket_config.bucket,
bucket_config
.prefix_in_bucket
.unwrap_or("<none>".to_string())
);
}
Ok(())
}
}
}
}
Command::FindGarbage {
@@ -228,14 +254,31 @@ async fn main() -> anyhow::Result<()> {
min_age,
mode,
} => {
pageserver_physical_gc_cmd(
&bucket_config,
controller_client.as_ref(),
match (&controller_client_conf, mode) {
(Some(_), _) => {
// Any mode may run when controller API is set
}
(None, GcMode::Full) => {
// The part of physical GC where we erase ancestor layers cannot be done safely without
// confirming the most recent complete shard split with the controller. Refuse to run, rather
// than doing it unsafely.
return Err(anyhow!("Full physical GC requires `--controller-api` and `--controller-jwt` to run"));
}
(None, GcMode::DryRun | GcMode::IndicesOnly) => {
// These GcModes do not require the controller to run.
}
}
let summary = pageserver_physical_gc(
bucket_config,
controller_client_conf,
tenant_ids,
min_age,
min_age.into(),
mode,
)
.await
.await?;
println!("{}", serde_json::to_string(&summary).unwrap());
Ok(())
}
Command::FindLargeObjects {
min_size,
@@ -252,142 +295,5 @@ async fn main() -> anyhow::Result<()> {
println!("{}", serde_json::to_string(&summary).unwrap());
Ok(())
}
Command::CronJob {
gc_min_age,
gc_mode,
post_to_storcon,
} => {
run_cron_job(
bucket_config,
controller_client.as_ref(),
gc_min_age,
gc_mode,
post_to_storcon,
)
.await
}
}
}
/// Runs the scrubber cron job.
/// 1. Do pageserver physical gc
/// 2. Scan pageserver metadata
pub async fn run_cron_job(
bucket_config: BucketConfig,
controller_client: Option<&control_api::Client>,
gc_min_age: humantime::Duration,
gc_mode: GcMode,
post_to_storcon: bool,
) -> anyhow::Result<()> {
tracing::info!(%gc_min_age, %gc_mode, "Running pageserver-physical-gc");
pageserver_physical_gc_cmd(
&bucket_config,
controller_client,
Vec::new(),
gc_min_age,
gc_mode,
)
.await?;
tracing::info!(%post_to_storcon, node_kind = %NodeKind::Pageserver, "Running scan-metadata");
scan_pageserver_metadata_cmd(
bucket_config,
controller_client,
Vec::new(),
true,
post_to_storcon,
)
.await?;
Ok(())
}
pub async fn pageserver_physical_gc_cmd(
bucket_config: &BucketConfig,
controller_client: Option<&control_api::Client>,
tenant_shard_ids: Vec<TenantShardId>,
min_age: humantime::Duration,
mode: GcMode,
) -> anyhow::Result<()> {
match (controller_client, mode) {
(Some(_), _) => {
// Any mode may run when controller API is set
}
(None, GcMode::Full) => {
// The part of physical GC where we erase ancestor layers cannot be done safely without
// confirming the most recent complete shard split with the controller. Refuse to run, rather
// than doing it unsafely.
return Err(anyhow!(
"Full physical GC requires `--controller-api` and `--controller-jwt` to run"
));
}
(None, GcMode::DryRun | GcMode::IndicesOnly) => {
// These GcModes do not require the controller to run.
}
}
let summary = pageserver_physical_gc(
bucket_config,
controller_client,
tenant_shard_ids,
min_age.into(),
mode,
)
.await?;
println!("{}", serde_json::to_string(&summary).unwrap());
Ok(())
}
pub async fn scan_pageserver_metadata_cmd(
bucket_config: BucketConfig,
controller_client: Option<&control_api::Client>,
tenant_shard_ids: Vec<TenantShardId>,
json: bool,
post_to_storcon: bool,
) -> anyhow::Result<()> {
if controller_client.is_none() && post_to_storcon {
return Err(anyhow!("Posting pageserver scan health status to storage controller requires `--controller-api` and `--controller-jwt` to run"));
}
match scan_pageserver_metadata(bucket_config.clone(), tenant_shard_ids).await {
Err(e) => {
tracing::error!("Failed: {e}");
Err(e)
}
Ok(summary) => {
if json {
println!("{}", serde_json::to_string(&summary).unwrap())
} else {
println!("{}", summary.summary_string());
}
if post_to_storcon {
if let Some(client) = controller_client {
let body = summary.build_health_update_request();
client
.dispatch::<MetadataHealthUpdateRequest, MetadataHealthUpdateResponse>(
Method::POST,
"control/v1/metadata_health/update".to_string(),
Some(body),
)
.await?;
}
}
if summary.is_fatal() {
tracing::error!("Fatal scrub errors detected");
} else if summary.is_empty() {
// Strictly speaking an empty bucket is a valid bucket, but if someone ran the
// scrubber they were likely expecting to scan something, and if we see no timelines
// at all then it's likely due to some configuration issues like a bad prefix
tracing::error!(
"No timelines found in bucket {} prefix {}",
bucket_config.bucket,
bucket_config
.prefix_in_bucket
.unwrap_or("<none>".to_string())
);
}
Ok(())
}
}
}

View File

@@ -2,6 +2,7 @@ use std::str::FromStr;
use anyhow::{anyhow, Context};
use async_stream::{stream, try_stream};
use aws_sdk_s3::{types::ObjectIdentifier, Client};
use futures::StreamExt;
use remote_storage::{GenericRemoteStorage, ListingMode, ListingObject, RemotePath};
use tokio_stream::Stream;
@@ -14,7 +15,7 @@ use pageserver_api::shard::TenantShardId;
use utils::id::{TenantId, TimelineId};
/// Given a remote storage and a target, output a stream of TenantIds discovered via listing prefixes
pub fn stream_tenants<'a>(
pub fn stream_tenants_generic<'a>(
remote_client: &'a GenericRemoteStorage,
target: &'a RootTarget,
) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
@@ -35,36 +36,92 @@ pub fn stream_tenants<'a>(
}
}
/// Given an S3 bucket, output a stream of TenantIds discovered via ListObjectsv2
pub fn stream_tenants<'a>(
s3_client: &'a Client,
target: &'a RootTarget,
) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
try_stream! {
let mut continuation_token = None;
let tenants_target = target.tenants_root();
loop {
let fetch_response =
list_objects_with_retries(s3_client, &tenants_target, continuation_token.clone()).await?;
let new_entry_ids = fetch_response
.common_prefixes()
.iter()
.filter_map(|prefix| prefix.prefix())
.filter_map(|prefix| -> Option<&str> {
prefix
.strip_prefix(&tenants_target.prefix_in_bucket)?
.strip_suffix('/')
}).map(|entry_id_str| {
entry_id_str
.parse()
.with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
});
for i in new_entry_ids {
yield i?;
}
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
}
}
pub async fn stream_tenant_shards<'a>(
remote_client: &'a GenericRemoteStorage,
s3_client: &'a Client,
target: &'a RootTarget,
tenant_id: TenantId,
) -> anyhow::Result<impl Stream<Item = Result<TenantShardId, anyhow::Error>> + 'a> {
let mut tenant_shard_ids: Vec<Result<TenantShardId, anyhow::Error>> = Vec::new();
let mut continuation_token = None;
let shards_target = target.tenant_shards_prefix(&tenant_id);
let strip_prefix = target.tenants_root().prefix_in_bucket;
let prefix_str = &strip_prefix.strip_prefix("/").unwrap_or(&strip_prefix);
loop {
tracing::info!("Listing in {}", shards_target.prefix_in_bucket);
let fetch_response =
list_objects_with_retries(s3_client, &shards_target, continuation_token.clone()).await;
let fetch_response = match fetch_response {
Err(e) => {
tenant_shard_ids.push(Err(e));
break;
}
Ok(r) => r,
};
tracing::info!("Listing shards in {}", shards_target.prefix_in_bucket);
let listing =
list_objects_with_retries(remote_client, ListingMode::WithDelimiter, &shards_target)
.await?;
let new_entry_ids = fetch_response
.common_prefixes()
.iter()
.filter_map(|prefix| prefix.prefix())
.filter_map(|prefix| -> Option<&str> {
prefix
.strip_prefix(&target.tenants_root().prefix_in_bucket)?
.strip_suffix('/')
})
.map(|entry_id_str| {
let first_part = entry_id_str.split('/').next().unwrap();
let tenant_shard_ids = listing
.prefixes
.iter()
.map(|prefix| prefix.get_path().as_str())
.filter_map(|prefix| -> Option<&str> { prefix.strip_prefix(prefix_str) })
.map(|entry_id_str| {
let first_part = entry_id_str.split('/').next().unwrap();
first_part
.parse::<TenantShardId>()
.with_context(|| format!("Incorrect entry id str: {first_part}"))
});
first_part
.parse::<TenantShardId>()
.with_context(|| format!("Incorrect entry id str: {first_part}"))
})
.collect::<Vec<_>>();
for i in new_entry_ids {
tenant_shard_ids.push(i);
}
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
tracing::debug!("Yielding {} shards for {tenant_id}", tenant_shard_ids.len());
Ok(stream! {
for i in tenant_shard_ids {
let id = i?;
@@ -73,10 +130,69 @@ pub async fn stream_tenant_shards<'a>(
})
}
/// Given a TenantShardId, output a stream of the timelines within that tenant, discovered
/// using ListObjectsv2. The listing is done before the stream is built, so that this
/// function can be used to generate concurrency on a stream using buffer_unordered.
pub async fn stream_tenant_timelines<'a>(
s3_client: &'a Client,
target: &'a RootTarget,
tenant: TenantShardId,
) -> anyhow::Result<impl Stream<Item = Result<TenantShardTimelineId, anyhow::Error>> + 'a> {
let mut timeline_ids: Vec<Result<TimelineId, anyhow::Error>> = Vec::new();
let mut continuation_token = None;
let timelines_target = target.timelines_root(&tenant);
loop {
tracing::debug!("Listing in {}", tenant);
let fetch_response =
list_objects_with_retries(s3_client, &timelines_target, continuation_token.clone())
.await;
let fetch_response = match fetch_response {
Err(e) => {
timeline_ids.push(Err(e));
break;
}
Ok(r) => r,
};
let new_entry_ids = fetch_response
.common_prefixes()
.iter()
.filter_map(|prefix| prefix.prefix())
.filter_map(|prefix| -> Option<&str> {
prefix
.strip_prefix(&timelines_target.prefix_in_bucket)?
.strip_suffix('/')
})
.map(|entry_id_str| {
entry_id_str
.parse::<TimelineId>()
.with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
});
for i in new_entry_ids {
timeline_ids.push(i);
}
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
tracing::debug!("Yielding for {}", tenant);
Ok(stream! {
for i in timeline_ids {
let id = i?;
yield Ok(TenantShardTimelineId::new(tenant, id));
}
})
}
/// Given a `TenantShardId`, output a stream of the timelines within that tenant, discovered
/// using a listing. The listing is done before the stream is built, so that this
/// function can be used to generate concurrency on a stream using buffer_unordered.
pub async fn stream_tenant_timelines<'a>(
pub async fn stream_tenant_timelines_generic<'a>(
remote_client: &'a GenericRemoteStorage,
target: &'a RootTarget,
tenant: TenantShardId,
@@ -84,11 +200,6 @@ pub async fn stream_tenant_timelines<'a>(
let mut timeline_ids: Vec<Result<TimelineId, anyhow::Error>> = Vec::new();
let timelines_target = target.timelines_root(&tenant);
let prefix_str = &timelines_target
.prefix_in_bucket
.strip_prefix("/")
.unwrap_or(&timelines_target.prefix_in_bucket);
let mut objects_stream = std::pin::pin!(stream_objects_with_retries(
remote_client,
ListingMode::WithDelimiter,
@@ -109,7 +220,11 @@ pub async fn stream_tenant_timelines<'a>(
.prefixes
.iter()
.filter_map(|prefix| -> Option<&str> {
prefix.get_path().as_str().strip_prefix(prefix_str)
prefix
.get_path()
.as_str()
.strip_prefix(&timelines_target.prefix_in_bucket)?
.strip_suffix('/')
})
.map(|entry_id_str| {
entry_id_str
@@ -122,7 +237,7 @@ pub async fn stream_tenant_timelines<'a>(
}
}
tracing::debug!("Yielding {} timelines for {}", timeline_ids.len(), tenant);
tracing::debug!("Yielding for {}", tenant);
Ok(stream! {
for i in timeline_ids {
let id = i?;
@@ -132,6 +247,37 @@ pub async fn stream_tenant_timelines<'a>(
}
pub(crate) fn stream_listing<'a>(
s3_client: &'a Client,
target: &'a S3Target,
) -> impl Stream<Item = anyhow::Result<ObjectIdentifier>> + 'a {
try_stream! {
let mut continuation_token = None;
loop {
let fetch_response =
list_objects_with_retries(s3_client, target, continuation_token.clone()).await?;
if target.delimiter.is_empty() {
for object_key in fetch_response.contents().iter().filter_map(|object| object.key())
{
let object_id = ObjectIdentifier::builder().key(object_key).build()?;
yield object_id;
}
} else {
for prefix in fetch_response.common_prefixes().iter().filter_map(|p| p.prefix()) {
let object_id = ObjectIdentifier::builder().key(prefix).build()?;
yield object_id;
}
}
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
}
}
pub(crate) fn stream_listing_generic<'a>(
remote_client: &'a GenericRemoteStorage,
target: &'a S3Target,
) -> impl Stream<Item = anyhow::Result<(RemotePath, Option<ListingObject>)>> + 'a {

View File

@@ -1,10 +1,13 @@
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, SystemTime};
use crate::checks::{list_timeline_blobs, BlobDataParseResult};
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
use crate::{
init_remote, BucketConfig, ControllerClientConfig, NodeKind, RootTarget, TenantShardTimelineId,
};
use aws_sdk_s3::Client;
use futures_util::{StreamExt, TryStreamExt};
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver::tenant::remote_timeline_client::{parse_remote_index_path, remote_layer_path};
@@ -12,11 +15,10 @@ use pageserver::tenant::storage_layer::LayerName;
use pageserver::tenant::IndexPart;
use pageserver_api::controller_api::TenantDescribeResponse;
use pageserver_api::shard::{ShardIndex, TenantShardId};
use remote_storage::{GenericRemoteStorage, ListingObject, RemotePath};
use remote_storage::RemotePath;
use reqwest::Method;
use serde::Serialize;
use storage_controller_client::control_api;
use tokio_util::sync::CancellationToken;
use tracing::{info_span, Instrument};
use utils::generation::Generation;
use utils::id::{TenantId, TenantTimelineId};
@@ -240,13 +242,38 @@ impl TenantRefAccumulator {
}
}
fn is_old_enough(min_age: &Duration, key: &ListingObject, summary: &mut GcSummary) -> bool {
async fn is_old_enough(
s3_client: &Client,
bucket_config: &BucketConfig,
min_age: &Duration,
key: &str,
summary: &mut GcSummary,
) -> bool {
// Validation: we will only GC indices & layers after a time threshold (e.g. one week) so that during an incident
// it is easier to read old data for analysis, and easier to roll back shard splits without having to un-delete any objects.
let age = match key.last_modified.elapsed() {
Ok(e) => e,
Err(_) => {
tracing::warn!("Bad last_modified time: {:?}", key.last_modified);
let age: Duration = match s3_client
.head_object()
.bucket(&bucket_config.bucket)
.key(key)
.send()
.await
{
Ok(response) => match response.last_modified {
None => {
tracing::warn!("Missing last_modified");
summary.remote_storage_errors += 1;
return false;
}
Some(last_modified) => match SystemTime::try_from(last_modified).map(|t| t.elapsed()) {
Ok(Ok(e)) => e,
Err(_) | Ok(Err(_)) => {
tracing::warn!("Bad last_modified time: {last_modified:?}");
return false;
}
},
},
Err(e) => {
tracing::warn!("Failed to HEAD {key}: {e}");
summary.remote_storage_errors += 1;
return false;
}
@@ -264,30 +291,17 @@ fn is_old_enough(min_age: &Duration, key: &ListingObject, summary: &mut GcSummar
old_enough
}
/// Same as [`is_old_enough`], but doesn't require a [`ListingObject`] passed to it.
async fn check_is_old_enough(
remote_client: &GenericRemoteStorage,
key: &RemotePath,
min_age: &Duration,
summary: &mut GcSummary,
) -> Option<bool> {
let listing_object = remote_client
.head_object(key, &CancellationToken::new())
.await
.ok()?;
Some(is_old_enough(min_age, &listing_object, summary))
}
async fn maybe_delete_index(
remote_client: &GenericRemoteStorage,
s3_client: &Client,
bucket_config: &BucketConfig,
min_age: &Duration,
latest_gen: Generation,
obj: &ListingObject,
key: &str,
mode: GcMode,
summary: &mut GcSummary,
) {
// Validation: we will only delete things that parse cleanly
let basename = obj.key.get_path().file_name().unwrap();
let basename = key.rsplit_once('/').unwrap().1;
let candidate_generation =
match parse_remote_index_path(RemotePath::from_string(basename).unwrap()) {
Some(g) => g,
@@ -316,7 +330,7 @@ async fn maybe_delete_index(
return;
}
if !is_old_enough(min_age, obj, summary) {
if !is_old_enough(s3_client, bucket_config, min_age, key, summary).await {
return;
}
@@ -326,8 +340,11 @@ async fn maybe_delete_index(
}
// All validations passed: erase the object
match remote_client
.delete(&obj.key, &CancellationToken::new())
match s3_client
.delete_object()
.bucket(&bucket_config.bucket)
.key(key)
.send()
.await
{
Ok(_) => {
@@ -343,7 +360,8 @@ async fn maybe_delete_index(
#[allow(clippy::too_many_arguments)]
async fn gc_ancestor(
remote_client: &GenericRemoteStorage,
s3_client: &Client,
bucket_config: &BucketConfig,
root_target: &RootTarget,
min_age: &Duration,
ancestor: TenantShardId,
@@ -352,7 +370,7 @@ async fn gc_ancestor(
summary: &mut GcSummary,
) -> anyhow::Result<()> {
// Scan timelines in the ancestor
let timelines = stream_tenant_timelines(remote_client, root_target, ancestor).await?;
let timelines = stream_tenant_timelines(s3_client, root_target, ancestor).await?;
let mut timelines = std::pin::pin!(timelines);
// Build a list of keys to retain
@@ -360,7 +378,7 @@ async fn gc_ancestor(
while let Some(ttid) = timelines.next().await {
let ttid = ttid?;
let data = list_timeline_blobs(remote_client, ttid, root_target).await?;
let data = list_timeline_blobs(s3_client, ttid, root_target).await?;
let s3_layers = match data.blob_data {
BlobDataParseResult::Parsed {
@@ -411,8 +429,7 @@ async fn gc_ancestor(
// We apply a time threshold to GCing objects that are un-referenced: this preserves our ability
// to roll back a shard split if we have to, by avoiding deleting ancestor layers right away
let path = RemotePath::from_string(key.strip_prefix("/").unwrap_or(&key)).unwrap();
if check_is_old_enough(remote_client, &path, min_age, summary).await != Some(true) {
if !is_old_enough(s3_client, bucket_config, min_age, &key, summary).await {
continue;
}
@@ -422,7 +439,13 @@ async fn gc_ancestor(
}
// All validations passed: erase the object
match remote_client.delete(&path, &CancellationToken::new()).await {
match s3_client
.delete_object()
.bucket(&bucket_config.bucket)
.key(&key)
.send()
.await
{
Ok(_) => {
tracing::info!("Successfully deleted unreferenced ancestor layer {key}");
summary.ancestor_layers_deleted += 1;
@@ -450,16 +473,16 @@ async fn gc_ancestor(
/// This type of GC is not necessary for correctness: rather it serves to reduce wasted storage capacity, and
/// make sure that object listings don't get slowed down by large numbers of garbage objects.
pub async fn pageserver_physical_gc(
bucket_config: &BucketConfig,
controller_client: Option<&control_api::Client>,
bucket_config: BucketConfig,
controller_client_conf: Option<ControllerClientConfig>,
tenant_shard_ids: Vec<TenantShardId>,
min_age: Duration,
mode: GcMode,
) -> anyhow::Result<GcSummary> {
let (remote_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
let tenants = if tenant_shard_ids.is_empty() {
futures::future::Either::Left(stream_tenants(&remote_client, &target))
futures::future::Either::Left(stream_tenants(&s3_client, &target))
} else {
futures::future::Either::Right(futures::stream::iter(tenant_shard_ids.into_iter().map(Ok)))
};
@@ -472,13 +495,14 @@ pub async fn pageserver_physical_gc(
let accumulator = Arc::new(std::sync::Mutex::new(TenantRefAccumulator::default()));
// Generate a stream of TenantTimelineId
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, t));
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t));
let timelines = timelines.try_buffered(CONCURRENCY);
let timelines = timelines.try_flatten();
// Generate a stream of S3TimelineBlobData
async fn gc_timeline(
remote_client: &GenericRemoteStorage,
s3_client: &Client,
bucket_config: &BucketConfig,
min_age: &Duration,
target: &RootTarget,
mode: GcMode,
@@ -486,7 +510,7 @@ pub async fn pageserver_physical_gc(
accumulator: &Arc<std::sync::Mutex<TenantRefAccumulator>>,
) -> anyhow::Result<GcSummary> {
let mut summary = GcSummary::default();
let data = list_timeline_blobs(remote_client, ttid, target).await?;
let data = list_timeline_blobs(s3_client, ttid, target).await?;
let (index_part, latest_gen, candidates) = match &data.blob_data {
BlobDataParseResult::Parsed {
@@ -511,9 +535,17 @@ pub async fn pageserver_physical_gc(
accumulator.lock().unwrap().update(ttid, index_part);
for key in candidates {
maybe_delete_index(remote_client, min_age, latest_gen, &key, mode, &mut summary)
.instrument(info_span!("maybe_delete_index", %ttid, ?latest_gen, %key.key))
.await;
maybe_delete_index(
s3_client,
bucket_config,
min_age,
latest_gen,
&key,
mode,
&mut summary,
)
.instrument(info_span!("maybe_delete_index", %ttid, ?latest_gen, key))
.await;
}
Ok(summary)
@@ -524,7 +556,15 @@ pub async fn pageserver_physical_gc(
// Drain futures for per-shard GC, populating accumulator as a side effect
{
let timelines = timelines.map_ok(|ttid| {
gc_timeline(&remote_client, &min_age, &target, mode, ttid, &accumulator)
gc_timeline(
&s3_client,
&bucket_config,
&min_age,
&target,
mode,
ttid,
&accumulator,
)
});
let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
@@ -534,7 +574,7 @@ pub async fn pageserver_physical_gc(
}
// Execute cross-shard GC, using the accumulator's full view of all the shards built in the per-shard GC
let Some(client) = controller_client else {
let Some(controller_client) = controller_client_conf.map(|c| c.build_client()) else {
tracing::info!("Skipping ancestor layer GC, because no `--controller-api` was specified");
return Ok(summary);
};
@@ -543,12 +583,13 @@ pub async fn pageserver_physical_gc(
.unwrap()
.into_inner()
.unwrap()
.into_gc_ancestors(client, &mut summary)
.into_gc_ancestors(&controller_client, &mut summary)
.await;
for ancestor_shard in ancestor_shards {
gc_ancestor(
&remote_client,
&s3_client,
&bucket_config,
&target,
&min_age,
ancestor_shard,

View File

@@ -1,16 +1,16 @@
use std::collections::{HashMap, HashSet};
use crate::checks::{
branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult,
RemoteTimelineBlobData, TenantObjectListing, TimelineAnalysis,
branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData,
TenantObjectListing, TimelineAnalysis,
};
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
use aws_sdk_s3::Client;
use futures_util::{StreamExt, TryStreamExt};
use pageserver::tenant::remote_timeline_client::remote_layer_path;
use pageserver_api::controller_api::MetadataHealthUpdateRequest;
use pageserver_api::shard::TenantShardId;
use remote_storage::GenericRemoteStorage;
use serde::Serialize;
use utils::id::TenantId;
use utils::shard::ShardCount;
@@ -36,7 +36,7 @@ impl MetadataSummary {
Self::default()
}
fn update_data(&mut self, data: &RemoteTimelineBlobData) {
fn update_data(&mut self, data: &S3TimelineBlobData) {
self.timeline_shard_count += 1;
if let BlobDataParseResult::Parsed {
index_part,
@@ -116,14 +116,14 @@ Index versions: {version_summary}
}
/// Scan the pageserver metadata in an S3 bucket, reporting errors and statistics.
pub async fn scan_pageserver_metadata(
pub async fn scan_metadata(
bucket_config: BucketConfig,
tenant_ids: Vec<TenantShardId>,
) -> anyhow::Result<MetadataSummary> {
let (remote_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?;
let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?;
let tenants = if tenant_ids.is_empty() {
futures::future::Either::Left(stream_tenants(&remote_client, &target))
futures::future::Either::Left(stream_tenants(&s3_client, &target))
} else {
futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
};
@@ -133,20 +133,20 @@ pub async fn scan_pageserver_metadata(
const CONCURRENCY: usize = 32;
// Generate a stream of TenantTimelineId
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, t));
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t));
let timelines = timelines.try_buffered(CONCURRENCY);
let timelines = timelines.try_flatten();
// Generate a stream of S3TimelineBlobData
async fn report_on_timeline(
remote_client: &GenericRemoteStorage,
s3_client: &Client,
target: &RootTarget,
ttid: TenantShardTimelineId,
) -> anyhow::Result<(TenantShardTimelineId, RemoteTimelineBlobData)> {
let data = list_timeline_blobs(remote_client, ttid, target).await?;
) -> anyhow::Result<(TenantShardTimelineId, S3TimelineBlobData)> {
let data = list_timeline_blobs(s3_client, ttid, target).await?;
Ok((ttid, data))
}
let timelines = timelines.map_ok(|ttid| report_on_timeline(&remote_client, &target, ttid));
let timelines = timelines.map_ok(|ttid| report_on_timeline(&s3_client, &target, ttid));
let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
// We must gather all the TenantShardTimelineId->S3TimelineBlobData for each tenant, because different
@@ -157,11 +157,12 @@ pub async fn scan_pageserver_metadata(
let mut tenant_timeline_results = Vec::new();
async fn analyze_tenant(
remote_client: &GenericRemoteStorage,
s3_client: &Client,
target: &RootTarget,
tenant_id: TenantId,
summary: &mut MetadataSummary,
mut tenant_objects: TenantObjectListing,
timelines: Vec<(TenantShardTimelineId, RemoteTimelineBlobData)>,
timelines: Vec<(TenantShardTimelineId, S3TimelineBlobData)>,
highest_shard_count: ShardCount,
) {
summary.tenant_count += 1;
@@ -190,7 +191,8 @@ pub async fn scan_pageserver_metadata(
// Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
// reference counts for layers across the tenant.
let analysis = branch_cleanup_and_check_errors(
remote_client,
s3_client,
target,
&ttid,
&mut tenant_objects,
None,
@@ -271,7 +273,8 @@ pub async fn scan_pageserver_metadata(
let tenant_objects = std::mem::take(&mut tenant_objects);
let timelines = std::mem::take(&mut tenant_timeline_results);
analyze_tenant(
&remote_client,
&s3_client,
&target,
prev_tenant_id,
&mut summary,
tenant_objects,
@@ -308,7 +311,8 @@ pub async fn scan_pageserver_metadata(
if !tenant_timeline_results.is_empty() {
analyze_tenant(
&remote_client,
&s3_client,
&target,
tenant_id.expect("Must be set if results are present"),
&mut summary,
tenant_objects,

View File

@@ -14,8 +14,9 @@ use utils::{
};
use crate::{
cloud_admin_api::CloudAdminApiClient, init_remote, metadata_stream::stream_listing,
BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId,
cloud_admin_api::CloudAdminApiClient, init_remote_generic,
metadata_stream::stream_listing_generic, BucketConfig, ConsoleConfig, NodeKind, RootTarget,
TenantShardTimelineId,
};
/// Generally we should ask safekeepers, but so far we use everywhere default 16MB.
@@ -106,7 +107,7 @@ pub async fn scan_safekeeper_metadata(
let timelines = client.query(&query, &[]).await?;
info!("loaded {} timelines", timelines.len());
let (remote_client, target) = init_remote(bucket_config, NodeKind::Safekeeper).await?;
let (remote_client, target) = init_remote_generic(bucket_config, NodeKind::Safekeeper).await?;
let console_config = ConsoleConfig::from_env()?;
let cloud_admin_api_client = CloudAdminApiClient::new(console_config);
@@ -187,19 +188,14 @@ async fn check_timeline(
// we need files, so unset it.
timeline_dir_target.delimiter = String::new();
let prefix_str = &timeline_dir_target
.prefix_in_bucket
.strip_prefix("/")
.unwrap_or(&timeline_dir_target.prefix_in_bucket);
let mut stream = std::pin::pin!(stream_listing(remote_client, &timeline_dir_target));
let mut stream = std::pin::pin!(stream_listing_generic(remote_client, &timeline_dir_target));
while let Some(obj) = stream.next().await {
let (key, _obj) = obj?;
let seg_name = key
.get_path()
.as_str()
.strip_prefix(prefix_str)
.strip_prefix(&timeline_dir_target.prefix_in_bucket)
.expect("failed to extract segment name");
expected_segfiles.remove(seg_name);
}

View File

@@ -1,11 +1,10 @@
use std::collections::HashMap;
use std::sync::Arc;
use crate::checks::{list_timeline_blobs, BlobDataParseResult, RemoteTimelineBlobData};
use crate::checks::{list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData};
use crate::metadata_stream::{stream_tenant_shards, stream_tenant_timelines};
use crate::{
download_object_to_file_s3, init_remote, init_remote_s3, BucketConfig, NodeKind, RootTarget,
TenantShardTimelineId,
download_object_to_file, init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId,
};
use anyhow::Context;
use async_stream::stream;
@@ -16,7 +15,6 @@ use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver::tenant::storage_layer::LayerName;
use pageserver::tenant::IndexPart;
use pageserver_api::shard::TenantShardId;
use remote_storage::GenericRemoteStorage;
use utils::generation::Generation;
use utils::id::TenantId;
@@ -36,8 +34,7 @@ impl SnapshotDownloader {
output_path: Utf8PathBuf,
concurrency: usize,
) -> anyhow::Result<Self> {
let (s3_client, s3_root) =
init_remote_s3(bucket_config.clone(), NodeKind::Pageserver).await?;
let (s3_client, s3_root) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
Ok(Self {
s3_client,
s3_root,
@@ -94,7 +91,7 @@ impl SnapshotDownloader {
let Some(version) = versions.versions.as_ref().and_then(|v| v.first()) else {
return Err(anyhow::anyhow!("No versions found for {remote_layer_path}"));
};
download_object_to_file_s3(
download_object_to_file(
&self.s3_client,
&self.bucket_config.bucket,
&remote_layer_path,
@@ -218,11 +215,11 @@ impl SnapshotDownloader {
}
pub async fn download(&self) -> anyhow::Result<()> {
let (remote_client, target) =
let (s3_client, target) =
init_remote(self.bucket_config.clone(), NodeKind::Pageserver).await?;
// Generate a stream of TenantShardId
let shards = stream_tenant_shards(&remote_client, &target, self.tenant_id).await?;
let shards = stream_tenant_shards(&s3_client, &target, self.tenant_id).await?;
let shards: Vec<TenantShardId> = shards.try_collect().await?;
// Only read from shards that have the highest count: avoids redundantly downloading
@@ -240,19 +237,18 @@ impl SnapshotDownloader {
for shard in shards.into_iter().filter(|s| s.shard_count == shard_count) {
// Generate a stream of TenantTimelineId
let timelines = stream_tenant_timelines(&remote_client, &target, shard).await?;
let timelines = stream_tenant_timelines(&s3_client, &self.s3_root, shard).await?;
// Generate a stream of S3TimelineBlobData
async fn load_timeline_index(
remote_client: &GenericRemoteStorage,
s3_client: &Client,
target: &RootTarget,
ttid: TenantShardTimelineId,
) -> anyhow::Result<(TenantShardTimelineId, RemoteTimelineBlobData)> {
let data = list_timeline_blobs(remote_client, ttid, target).await?;
) -> anyhow::Result<(TenantShardTimelineId, S3TimelineBlobData)> {
let data = list_timeline_blobs(s3_client, ttid, target).await?;
Ok((ttid, data))
}
let timelines =
timelines.map_ok(|ttid| load_timeline_index(&remote_client, &target, ttid));
let timelines = timelines.map_ok(|ttid| load_timeline_index(&s3_client, &target, ttid));
let mut timelines = std::pin::pin!(timelines.try_buffered(8));
while let Some(i) = timelines.next().await {
@@ -282,7 +278,7 @@ impl SnapshotDownloader {
for (ttid, layers) in ancestor_layers.into_iter() {
tracing::info!(
"Downloading {} layers from ancestor timeline {ttid}...",
"Downloading {} layers from ancvestor timeline {ttid}...",
layers.len()
);

View File

@@ -71,7 +71,8 @@ a subdirectory for each version with naming convention `v{PG_VERSION}/`.
Inside that dir, a `bin/postgres` binary should be present.
`DEFAULT_PG_VERSION`: The version of Postgres to use,
This is used to construct full path to the postgres binaries.
Format is 2-digit major version nubmer, i.e. `DEFAULT_PG_VERSION=16`
Format is 2-digit major version nubmer, i.e. `DEFAULT_PG_VERSION="14"`. Alternatively,
you can use `--pg-version` argument.
`TEST_OUTPUT`: Set the directory where test state and test output files
should go.
`TEST_SHARED_FIXTURES`: Try to re-use a single pageserver for all the tests.

View File

@@ -3,7 +3,6 @@ pytest_plugins = (
"fixtures.parametrize",
"fixtures.httpserver",
"fixtures.compute_reconfigure",
"fixtures.storage_controller_proxy",
"fixtures.neon_fixtures",
"fixtures.benchmark_fixture",
"fixtures.pg_stats",

View File

@@ -497,7 +497,6 @@ class NeonEnvBuilder:
pageserver_aux_file_policy: Optional[AuxFileStore] = None,
pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]] = None,
safekeeper_extra_opts: Optional[list[str]] = None,
storage_controller_port_override: Optional[int] = None,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
@@ -550,8 +549,6 @@ class NeonEnvBuilder:
self.safekeeper_extra_opts = safekeeper_extra_opts
self.storage_controller_port_override = storage_controller_port_override
assert test_name.startswith(
"test_"
), "Unexpectedly instantiated from outside a test function"
@@ -1057,7 +1054,6 @@ class NeonEnv:
"""
BASE_PAGESERVER_ID = 1
storage_controller: NeonStorageController | NeonProxiedStorageController
def __init__(self, config: NeonEnvBuilder):
self.repo_dir = config.repo_dir
@@ -1088,41 +1084,27 @@ class NeonEnv:
self.initial_tenant = config.initial_tenant
self.initial_timeline = config.initial_timeline
# Find two adjacent ports for storage controller and its postgres DB. This
# loop would eventually throw from get_port() if we run out of ports (extremely
# unlikely): usually we find two adjacent free ports on the first iteration.
while True:
self.storage_controller_port = self.port_distributor.get_port()
storage_controller_pg_port = self.port_distributor.get_port()
if storage_controller_pg_port == self.storage_controller_port + 1:
break
# The URL for the pageserver to use as its control_plane_api config
if config.storage_controller_port_override is not None:
log.info(
f"Using storage controller api override {config.storage_controller_port_override}"
)
self.storage_controller_port = config.storage_controller_port_override
self.storage_controller = NeonProxiedStorageController(
self, config.storage_controller_port_override, config.auth_enabled
)
else:
# Find two adjacent ports for storage controller and its postgres DB. This
# loop would eventually throw from get_port() if we run out of ports (extremely
# unlikely): usually we find two adjacent free ports on the first iteration.
while True:
storage_controller_port = self.port_distributor.get_port()
storage_controller_pg_port = self.port_distributor.get_port()
if storage_controller_pg_port == storage_controller_port + 1:
break
self.storage_controller_port = storage_controller_port
self.storage_controller = NeonStorageController(
self, storage_controller_port, config.auth_enabled
)
log.info(
f"Using generated control_plane_api: {self.storage_controller.upcall_api_endpoint()}"
)
self.storage_controller_api: str = self.storage_controller.api_root()
self.control_plane_api: str = self.storage_controller.upcall_api_endpoint()
self.control_plane_api: str = f"http://127.0.0.1:{self.storage_controller_port}/upcall/v1"
# The base URL of the storage controller
self.storage_controller_api: str = f"http://127.0.0.1:{self.storage_controller_port}"
# For testing this with a fake HTTP server, enable passing through a URL from config
self.control_plane_compute_hook_api = config.control_plane_compute_hook_api
self.storage_controller: NeonStorageController = NeonStorageController(
self, config.auth_enabled
)
self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine
self.pageserver_aux_file_policy = config.pageserver_aux_file_policy
@@ -1162,6 +1144,7 @@ class NeonEnv:
"listen_http_addr": f"localhost:{pageserver_port.http}",
"pg_auth_type": pg_auth_type,
"http_auth_type": http_auth_type,
"image_compression": "zstd",
}
if self.pageserver_virtual_file_io_engine is not None:
ps_cfg["virtual_file_io_engine"] = self.pageserver_virtual_file_io_engine
@@ -1886,24 +1869,16 @@ class NeonCli(AbstractNeonCli):
def storage_controller_start(
self,
timeout_in_seconds: Optional[int] = None,
instance_id: Optional[int] = None,
base_port: Optional[int] = None,
):
cmd = ["storage_controller", "start"]
if timeout_in_seconds is not None:
cmd.append(f"--start-timeout={timeout_in_seconds}s")
if instance_id is not None:
cmd.append(f"--instance-id={instance_id}")
if base_port is not None:
cmd.append(f"--base-port={base_port}")
return self.raw_cli(cmd)
def storage_controller_stop(self, immediate: bool, instance_id: Optional[int] = None):
def storage_controller_stop(self, immediate: bool):
cmd = ["storage_controller", "stop"]
if immediate:
cmd.extend(["-m", "immediate"])
if instance_id is not None:
cmd.append(f"--instance-id={instance_id}")
return self.raw_cli(cmd)
def pageserver_start(
@@ -2214,30 +2189,17 @@ class PageserverSchedulingPolicy(str, Enum):
PAUSE_FOR_RESTART = "PauseForRestart"
class StorageControllerLeadershipStatus(str, Enum):
LEADER = "leader"
STEPPED_DOWN = "stepped_down"
CANDIDATE = "candidate"
class NeonStorageController(MetricsGetter, LogUtils):
def __init__(self, env: NeonEnv, port: int, auth_enabled: bool):
def __init__(self, env: NeonEnv, auth_enabled: bool):
self.env = env
self.port: int = port
self.api: str = f"http://127.0.0.1:{port}"
self.running = False
self.auth_enabled = auth_enabled
self.allowed_errors: list[str] = DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS
self.logfile = self.env.repo_dir / "storage_controller_1" / "storage_controller.log"
self.logfile = self.workdir / "storage_controller.log"
def start(
self,
timeout_in_seconds: Optional[int] = None,
instance_id: Optional[int] = None,
base_port: Optional[int] = None,
):
def start(self, timeout_in_seconds: Optional[int] = None):
assert not self.running
self.env.neon_cli.storage_controller_start(timeout_in_seconds, instance_id, base_port)
self.env.neon_cli.storage_controller_start(timeout_in_seconds)
self.running = True
return self
@@ -2247,12 +2209,6 @@ class NeonStorageController(MetricsGetter, LogUtils):
self.running = False
return self
def upcall_api_endpoint(self) -> str:
return f"{self.api}/upcall/v1"
def api_root(self) -> str:
return self.api
@staticmethod
def retryable_node_operation(op, ps_id, max_attempts, backoff):
while max_attempts > 0:
@@ -2281,9 +2237,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
def assert_no_errors(self):
assert_no_errors(
self.logfile,
"storage_controller",
self.allowed_errors,
self.env.repo_dir / "storage_controller.log", "storage_controller", self.allowed_errors
)
def pageserver_api(self) -> PageserverHttpClient:
@@ -2295,7 +2249,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
auth_token = None
if self.auth_enabled:
auth_token = self.env.auth_keys.generate_token(scope=TokenScope.PAGE_SERVER_API)
return PageserverHttpClient(self.port, lambda: True, auth_token)
return PageserverHttpClient(self.env.storage_controller_port, lambda: True, auth_token)
def request(self, method, *args, **kwargs) -> requests.Response:
resp = requests.request(method, *args, **kwargs)
@@ -2312,13 +2266,13 @@ class NeonStorageController(MetricsGetter, LogUtils):
return headers
def get_metrics(self) -> Metrics:
res = self.request("GET", f"{self.api}/metrics")
res = self.request("GET", f"{self.env.storage_controller_api}/metrics")
return parse_metrics(res.text)
def ready(self) -> bool:
status = None
try:
resp = self.request("GET", f"{self.api}/ready")
resp = self.request("GET", f"{self.env.storage_controller_api}/ready")
status = resp.status_code
except StorageControllerApiException as e:
status = e.status_code
@@ -2351,7 +2305,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
response = self.request(
"POST",
f"{self.api}/debug/v1/attach-hook",
f"{self.env.storage_controller_api}/debug/v1/attach-hook",
json=body,
headers=self.headers(TokenScope.ADMIN),
)
@@ -2362,7 +2316,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
def attach_hook_drop(self, tenant_shard_id: Union[TenantId, TenantShardId]):
self.request(
"POST",
f"{self.api}/debug/v1/attach-hook",
f"{self.env.storage_controller_api}/debug/v1/attach-hook",
json={"tenant_shard_id": str(tenant_shard_id), "node_id": None},
headers=self.headers(TokenScope.ADMIN),
)
@@ -2373,7 +2327,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
"""
response = self.request(
"POST",
f"{self.api}/debug/v1/inspect",
f"{self.env.storage_controller_api}/debug/v1/inspect",
json={"tenant_shard_id": str(tenant_shard_id)},
headers=self.headers(TokenScope.ADMIN),
)
@@ -2396,7 +2350,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
log.info(f"node_register({body})")
self.request(
"POST",
f"{self.api}/control/v1/node",
f"{self.env.storage_controller_api}/control/v1/node",
json=body,
headers=self.headers(TokenScope.ADMIN),
)
@@ -2405,7 +2359,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
log.info(f"node_delete({node_id})")
self.request(
"DELETE",
f"{self.api}/control/v1/node/{node_id}",
f"{self.env.storage_controller_api}/control/v1/node/{node_id}",
headers=self.headers(TokenScope.ADMIN),
)
@@ -2413,7 +2367,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
log.info(f"node_drain({node_id})")
self.request(
"PUT",
f"{self.api}/control/v1/node/{node_id}/drain",
f"{self.env.storage_controller_api}/control/v1/node/{node_id}/drain",
headers=self.headers(TokenScope.ADMIN),
)
@@ -2421,7 +2375,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
log.info(f"cancel_node_drain({node_id})")
self.request(
"DELETE",
f"{self.api}/control/v1/node/{node_id}/drain",
f"{self.env.storage_controller_api}/control/v1/node/{node_id}/drain",
headers=self.headers(TokenScope.ADMIN),
)
@@ -2429,7 +2383,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
log.info(f"node_fill({node_id})")
self.request(
"PUT",
f"{self.api}/control/v1/node/{node_id}/fill",
f"{self.env.storage_controller_api}/control/v1/node/{node_id}/fill",
headers=self.headers(TokenScope.ADMIN),
)
@@ -2437,22 +2391,14 @@ class NeonStorageController(MetricsGetter, LogUtils):
log.info(f"cancel_node_fill({node_id})")
self.request(
"DELETE",
f"{self.api}/control/v1/node/{node_id}/fill",
f"{self.env.storage_controller_api}/control/v1/node/{node_id}/fill",
headers=self.headers(TokenScope.ADMIN),
)
def node_status(self, node_id):
response = self.request(
"GET",
f"{self.api}/control/v1/node/{node_id}",
headers=self.headers(TokenScope.ADMIN),
)
return response.json()
def get_leader(self):
response = self.request(
"GET",
f"{self.api}/control/v1/leader",
f"{self.env.storage_controller_api}/control/v1/node/{node_id}",
headers=self.headers(TokenScope.ADMIN),
)
return response.json()
@@ -2460,7 +2406,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
def node_list(self):
response = self.request(
"GET",
f"{self.api}/control/v1/node",
f"{self.env.storage_controller_api}/control/v1/node",
headers=self.headers(TokenScope.ADMIN),
)
return response.json()
@@ -2468,7 +2414,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
def tenant_list(self):
response = self.request(
"GET",
f"{self.api}/debug/v1/tenant",
f"{self.env.storage_controller_api}/debug/v1/tenant",
headers=self.headers(TokenScope.ADMIN),
)
return response.json()
@@ -2478,7 +2424,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
body["node_id"] = node_id
self.request(
"PUT",
f"{self.api}/control/v1/node/{node_id}/config",
f"{self.env.storage_controller_api}/control/v1/node/{node_id}/config",
json=body,
headers=self.headers(TokenScope.ADMIN),
)
@@ -2513,7 +2459,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
response = self.request(
"POST",
f"{self.api}/v1/tenant",
f"{self.env.storage_controller_api}/v1/tenant",
json=body,
headers=self.headers(TokenScope.PAGE_SERVER_API),
)
@@ -2526,7 +2472,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
"""
response = self.request(
"GET",
f"{self.api}/debug/v1/tenant/{tenant_id}/locate",
f"{self.env.storage_controller_api}/debug/v1/tenant/{tenant_id}/locate",
headers=self.headers(TokenScope.ADMIN),
)
body = response.json()
@@ -2539,7 +2485,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
"""
response = self.request(
"GET",
f"{self.api}/control/v1/tenant/{tenant_id}",
f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_id}",
headers=self.headers(TokenScope.ADMIN),
)
response.raise_for_status()
@@ -2550,7 +2496,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
) -> list[TenantShardId]:
response = self.request(
"PUT",
f"{self.api}/control/v1/tenant/{tenant_id}/shard_split",
f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_id}/shard_split",
json={"new_shard_count": shard_count, "new_stripe_size": shard_stripe_size},
headers=self.headers(TokenScope.ADMIN),
)
@@ -2562,7 +2508,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
def tenant_shard_migrate(self, tenant_shard_id: TenantShardId, dest_ps_id: int):
self.request(
"PUT",
f"{self.api}/control/v1/tenant/{tenant_shard_id}/migrate",
f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_shard_id}/migrate",
json={"tenant_shard_id": str(tenant_shard_id), "node_id": dest_ps_id},
headers=self.headers(TokenScope.ADMIN),
)
@@ -2573,7 +2519,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
log.info(f"tenant_policy_update({tenant_id}, {body})")
self.request(
"PUT",
f"{self.api}/control/v1/tenant/{tenant_id}/policy",
f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_id}/policy",
json=body,
headers=self.headers(TokenScope.ADMIN),
)
@@ -2581,14 +2527,14 @@ class NeonStorageController(MetricsGetter, LogUtils):
def tenant_import(self, tenant_id: TenantId):
self.request(
"POST",
f"{self.api}/debug/v1/tenant/{tenant_id}/import",
f"{self.env.storage_controller_api}/debug/v1/tenant/{tenant_id}/import",
headers=self.headers(TokenScope.ADMIN),
)
def reconcile_all(self):
r = self.request(
"POST",
f"{self.api}/debug/v1/reconcile_all",
f"{self.env.storage_controller_api}/debug/v1/reconcile_all",
headers=self.headers(TokenScope.ADMIN),
)
r.raise_for_status()
@@ -2621,7 +2567,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
"""
self.request(
"POST",
f"{self.api}/debug/v1/consistency_check",
f"{self.env.storage_controller_api}/debug/v1/consistency_check",
headers=self.headers(TokenScope.ADMIN),
)
log.info("storage controller passed consistency check")
@@ -2694,7 +2640,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
self.request(
"POST",
f"{self.api}/control/v1/metadata_health/update",
f"{self.env.storage_controller_api}/control/v1/metadata_health/update",
json=body,
headers=self.headers(TokenScope.SCRUBBER),
)
@@ -2702,7 +2648,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
def metadata_health_list_unhealthy(self):
response = self.request(
"GET",
f"{self.api}/control/v1/metadata_health/unhealthy",
f"{self.env.storage_controller_api}/control/v1/metadata_health/unhealthy",
headers=self.headers(TokenScope.ADMIN),
)
return response.json()
@@ -2712,7 +2658,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
response = self.request(
"POST",
f"{self.api}/control/v1/metadata_health/outdated",
f"{self.env.storage_controller_api}/control/v1/metadata_health/outdated",
json=body,
headers=self.headers(TokenScope.ADMIN),
)
@@ -2735,7 +2681,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
log.info("Asking storage controller to step down")
response = self.request(
"PUT",
f"{self.api}/control/v1/step_down",
f"{self.env.storage_controller_api}/control/v1/step_down",
headers=self.headers(TokenScope.ADMIN),
)
@@ -2752,7 +2698,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
res = self.request(
"PUT",
f"{self.api}/debug/v1/failpoints",
f"{self.env.storage_controller_api}/debug/v1/failpoints",
json=[{"name": name, "actions": actions} for name, actions in pairs],
headers=self.headers(TokenScope.ADMIN),
)
@@ -2822,21 +2768,9 @@ class NeonStorageController(MetricsGetter, LogUtils):
parsed_tid, wait_ms=250
)
def get_leadership_status(self) -> StorageControllerLeadershipStatus:
metric_values = {}
for status in StorageControllerLeadershipStatus:
metric_value = self.get_metric_value(
"storage_controller_leadership_status", filter={"status": status}
)
metric_values[status] = metric_value
assert list(metric_values.values()).count(1) == 1
for status, metric_value in metric_values.items():
if metric_value == 1:
return status
raise AssertionError("unreachable")
@property
def workdir(self) -> Path:
return self.env.repo_dir
def __enter__(self) -> "NeonStorageController":
return self
@@ -2850,59 +2784,6 @@ class NeonStorageController(MetricsGetter, LogUtils):
self.stop(immediate=True)
class NeonProxiedStorageController(NeonStorageController):
def __init__(self, env: NeonEnv, proxy_port: int, auth_enabled: bool):
super(NeonProxiedStorageController, self).__init__(env, proxy_port, auth_enabled)
self.instances: dict[int, dict[str, Any]] = {}
def start(
self,
timeout_in_seconds: Optional[int] = None,
instance_id: Optional[int] = None,
base_port: Optional[int] = None,
):
assert instance_id is not None and base_port is not None
self.env.neon_cli.storage_controller_start(timeout_in_seconds, instance_id, base_port)
self.instances[instance_id] = {"running": True}
self.running = True
return self
def stop_instance(
self, immediate: bool = False, instance_id: Optional[int] = None
) -> "NeonStorageController":
assert instance_id in self.instances
if self.instances[instance_id]["running"]:
self.env.neon_cli.storage_controller_stop(immediate, instance_id)
self.instances[instance_id]["running"] = False
self.running = any(meta["running"] for meta in self.instances.values())
return self
def stop(self, immediate: bool = False) -> "NeonStorageController":
for iid, details in self.instances.items():
if details["running"]:
self.env.neon_cli.storage_controller_stop(immediate, iid)
self.instances[iid]["running"] = False
self.running = False
return self
def assert_no_errors(self):
for instance_id in self.instances.keys():
assert_no_errors(
self.env.repo_dir / f"storage_controller_{instance_id}" / "storage_controller.log",
"storage_controller",
self.allowed_errors,
)
def log_contains(
self, pattern: str, offset: None | LogCursor = None
) -> Optional[Tuple[str, LogCursor]]:
raise NotImplementedError()
@dataclass
class LogCursor:
_line_no: int
@@ -4639,11 +4520,10 @@ class StorageScrubber:
base_args = [
str(self.env.neon_binpath / "storage_scrubber"),
f"--controller-api={self.env.storage_controller.api_root()}",
f"--controller-api={self.env.storage_controller_api}",
]
args = base_args + args
log.info(f"Invoking scrubber command {args} with env: {env}")
(output_path, stdout, status_code) = subprocess_capture(
self.log_dir,
args,

View File

@@ -1,7 +1,6 @@
import os
from typing import Any, Dict, Optional
import allure
import pytest
import toml
from _pytest.python import Metafunc
@@ -92,23 +91,3 @@ def pytest_generate_tests(metafunc: Metafunc):
and (platform := os.getenv("PLATFORM")) is not None
):
metafunc.parametrize("platform", [platform.lower()])
@pytest.hookimpl(hookwrapper=True, tryfirst=True)
def pytest_runtest_makereport(*args, **kwargs):
# Add test parameters to Allue report to distinguish the same tests with different parameters.
# Names has `__` prefix to avoid conflicts with `pytest.mark.parametrize` parameters
# A mapping between `uname -m` and `RUNNER_ARCH` values.
# `RUNNER_ARCH` environment variable is set on GitHub Runners,
# possible values are X86, X64, ARM, or ARM64.
# See https://docs.github.com/en/actions/learn-github-actions/variables#default-environment-variables
uname_m = {
"aarch64": "ARM64",
"arm64": "ARM64",
"x86_64": "X64",
}.get(os.uname().machine, "UNKNOWN")
arch = os.getenv("RUNNER_ARCH", uname_m)
allure.dynamic.parameter("__arch", arch)
yield

View File

@@ -3,6 +3,8 @@ import os
from typing import Optional
import pytest
from _pytest.config import Config
from _pytest.config.argparsing import Parser
"""
This fixture is used to determine which version of Postgres to use for tests.
@@ -50,7 +52,7 @@ class PgVersion(str, enum.Enum):
return None
DEFAULT_VERSION: PgVersion = PgVersion.V16
DEFAULT_VERSION: PgVersion = PgVersion.V15
def skip_on_postgres(version: PgVersion, reason: str):
@@ -67,8 +69,22 @@ def xfail_on_postgres(version: PgVersion, reason: str):
)
def pytest_addoption(parser: Parser):
parser.addoption(
"--pg-version",
action="store",
type=PgVersion,
help="DEPRECATED: Postgres version to use for tests",
)
def run_only_on_default_postgres(reason: str):
return pytest.mark.skipif(
PgVersion(os.environ.get("DEFAULT_PG_VERSION", DEFAULT_VERSION)) is not DEFAULT_VERSION,
reason=reason,
)
def pytest_configure(config: Config):
if config.getoption("--pg-version"):
raise Exception("--pg-version is deprecated, use DEFAULT_PG_VERSION env var instead")

View File

@@ -1,73 +0,0 @@
import re
from typing import Any, Optional
import pytest
import requests
from pytest_httpserver import HTTPServer
from werkzeug.datastructures import Headers
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
from fixtures.log_helper import log
class StorageControllerProxy:
def __init__(self, server: HTTPServer):
self.server: HTTPServer = server
self.listen: str = f"http://{server.host}:{server.port}"
self.routing_to: Optional[str] = None
def route_to(self, storage_controller_api: str):
self.routing_to = storage_controller_api
def port(self) -> int:
return self.server.port
def upcall_api_endpoint(self) -> str:
return f"{self.listen}/upcall/v1"
def proxy_request(method: str, url: str, **kwargs) -> requests.Response:
return requests.request(method, url, **kwargs)
@pytest.fixture(scope="function")
def storage_controller_proxy(make_httpserver):
"""
Proxies requests into the storage controller to the currently
selected storage controller instance via `StorageControllerProxy.route_to`.
This fixture is intended for tests that need to run multiple instances
of the storage controller at the same time.
"""
server = make_httpserver
self = StorageControllerProxy(server)
log.info(f"Storage controller proxy listening on {self.listen}")
def handler(request: Request):
if self.route_to is None:
log.info(f"Storage controller proxy has no routing configured for {request.url}")
return Response("Routing not configured", status=503)
route_to_url = f"{self.routing_to}{request.path}"
log.info(f"Routing {request.url} to {route_to_url}")
args: dict[str, Any] = {"headers": request.headers}
if request.is_json:
args["json"] = request.json
response = proxy_request(request.method, route_to_url, **args)
headers = Headers()
for key, value in response.headers.items():
headers.add(key, value)
return Response(response.content, headers=headers, status=response.status_code)
self.server.expect_request(re.compile(".*")).respond_with_handler(handler)
yield self
server.clear()

View File

@@ -403,7 +403,7 @@ def wait_until(
try:
res = func()
except Exception as e:
log.info("waiting for %s iteration %s failed: %s", func, i + 1, e)
log.info("waiting for %s iteration %s failed", func, i + 1)
last_exception = e
if show_intermediate_error:
log.info(e)

View File

@@ -7,7 +7,7 @@ easier to see if you have compile errors without scrolling up.
You may also need to run `./scripts/pysync`.
Then run the tests
`DEFAULT_PG_VERSION=16 NEON_BIN=./target/release poetry run pytest test_runner/performance`
`DEFAULT_PG_VERSION=15 NEON_BIN=./target/release poetry run pytest test_runner/performance`
Some handy pytest flags for local development:
- `-x` tells pytest to stop on first error

Some files were not shown because too many files have changed in this diff Show More