Mirror of https://github.com/neondatabase/neon.git (synced 2026-02-05 19:50:36 +00:00)

Compare commits: 12 commits (heavier_on...jcsp/compu)
| Author | SHA1 | Date |
|---|---|---|
| | 978e55f746 | |
| | 090a789408 | |
| | 3d4fe205ba | |
| | f7516df6c1 | |
| | f3d7d23805 | |
| | 9f75da7c0a | |
| | f4cc7cae14 | |
| | 4f57dc6cc6 | |
| | dc811d1923 | |
| | e65f0fe874 | |
| | bb92721168 | |
| | d7b29aace7 | |
```diff
@@ -179,6 +179,12 @@ runs:
         aws s3 rm "s3://${BUCKET}/${LOCK_FILE}"
       fi

+  - name: Cache poetry deps
+    uses: actions/cache@v3
+    with:
+      path: ~/.cache/pypoetry/virtualenvs
+      key: v2-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
+
   - name: Store Allure test stat in the DB (new)
     if: ${{ !cancelled() && inputs.store-test-results-into-db == 'true' }}
     shell: bash -euxo pipefail {0}
```

```diff
@@ -44,6 +44,10 @@ inputs:
     description: 'Postgres version to use for tests'
     required: false
     default: 'v14'
+  benchmark_durations:
+    description: 'benchmark durations JSON'
+    required: false
+    default: '{}'

 runs:
   using: "composite"
@@ -82,11 +86,10 @@ runs:
         fetch-depth: 1

     - name: Cache poetry deps
-      id: cache_poetry
       uses: actions/cache@v3
       with:
         path: ~/.cache/pypoetry/virtualenvs
-        key: v1-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
+        key: v2-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}

     - name: Install Python deps
       shell: bash -euxo pipefail {0}
@@ -160,7 +163,7 @@ runs:
       # We use pytest-split plugin to run benchmarks in parallel on different CI runners
       if [ "${TEST_SELECTION}" = "test_runner/performance" ] && [ "${{ inputs.build_type }}" != "remote" ]; then
         mkdir -p $TEST_OUTPUT
-        poetry run ./scripts/benchmark_durations.py "${TEST_RESULT_CONNSTR}" --days 10 --output "$TEST_OUTPUT/benchmark_durations.json"
+        echo '${{ inputs.benchmark_durations || '{}' }}' > $TEST_OUTPUT/benchmark_durations.json

         EXTRA_PARAMS="--durations-path $TEST_OUTPUT/benchmark_durations.json $EXTRA_PARAMS"
       fi
```
.github/workflows/build_and_test.yml (vendored, 44 lines changed)

```diff
@@ -112,11 +112,10 @@ jobs:
           fetch-depth: 1

       - name: Cache poetry deps
-        id: cache_poetry
         uses: actions/cache@v3
         with:
           path: ~/.cache/pypoetry/virtualenvs
-          key: v1-codestyle-python-deps-${{ hashFiles('poetry.lock') }}
+          key: v2-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}

       - name: Install Python deps
         run: ./scripts/pysync
@@ -132,7 +131,7 @@ jobs:

   check-codestyle-rust:
     needs: [ check-permissions, build-buildtools-image ]
-    runs-on: [ self-hosted, gen3, large ]
+    runs-on: [ self-hosted, gen3, small ]
     container:
       image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:${{ needs.build-buildtools-image.outputs.build-tools-tag }}
       options: --init
@@ -478,8 +477,40 @@ jobs:
         if: matrix.build_type == 'debug' && matrix.pg_version == 'v14'
         uses: ./.github/actions/save-coverage-data

+  get-benchmarks-durations:
+    outputs:
+      json: ${{ steps.get-benchmark-durations.outputs.json }}
+    needs: [ check-permissions, build-buildtools-image ]
+    runs-on: [ self-hosted, gen3, small ]
+    container:
+      image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:${{ needs.build-buildtools-image.outputs.build-tools-tag }}
+      options: --init
+    if: github.ref_name == 'main' || contains(github.event.pull_request.labels.*.name, 'run-benchmarks')
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v3
+
+      - name: Cache poetry deps
+        uses: actions/cache@v3
+        with:
+          path: ~/.cache/pypoetry/virtualenvs
+          key: v1-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
+
+      - name: Install Python deps
+        run: ./scripts/pysync
+
+      - name: get benchmark durations
+        id: get-benchmark-durations
+        env:
+          TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
+        run: |
+          poetry run ./scripts/benchmark_durations.py "${TEST_RESULT_CONNSTR}" \
+            --days 10 \
+            --output /tmp/benchmark_durations.json
+          echo "json=$(jq --compact-output '.' /tmp/benchmark_durations.json)" >> $GITHUB_OUTPUT
+
   benchmarks:
-    needs: [ check-permissions, build-neon, build-buildtools-image ]
+    needs: [ check-permissions, build-neon, build-buildtools-image, get-benchmarks-durations ]
     runs-on: [ self-hosted, gen3, small ]
     container:
       image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:${{ needs.build-buildtools-image.outputs.build-tools-tag }}
@@ -490,7 +521,7 @@ jobs:
       fail-fast: false
       matrix:
         # the amount of groups (N) should be reflected in `extra_params: --splits N ...`
-        pytest_split_group: [ 1, 2, 3, 4 ]
+        pytest_split_group: [ 1, 2, 3, 4, 5 ]
         build_type: [ release ]
     steps:
       - name: Checkout
@@ -503,7 +534,8 @@ jobs:
           test_selection: performance
           run_in_parallel: false
           save_perf_report: ${{ github.ref_name == 'main' }}
-          extra_params: --splits 4 --group ${{ matrix.pytest_split_group }}
+          extra_params: --splits 5 --group ${{ matrix.pytest_split_group }}
+          benchmark_durations: ${{ needs.get-benchmarks-durations.outputs.json }}
         env:
           VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
           PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
```
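The new `get-benchmarks-durations` job feeds measured runtimes into pytest-split, which balances the `--splits 5` groups by duration rather than by test count. A minimal sketch of that duration-balanced partitioning idea (greedy longest-processing-time; the inputs and function are hypothetical, and pytest-split's real algorithm differs in detail):

```rust
// Assign the longest-running tests first, each to the currently lightest
// group, so all CI runners finish at roughly the same time.
fn split_by_duration(mut tests: Vec<(&str, f64)>, groups: usize) -> Vec<Vec<&str>> {
    tests.sort_by(|a, b| b.1.total_cmp(&a.1)); // longest first
    let mut buckets: Vec<(f64, Vec<&str>)> = vec![(0.0, Vec::new()); groups];
    for (name, secs) in tests {
        // pick the bucket with the smallest accumulated runtime
        let lightest = buckets
            .iter_mut()
            .min_by(|a, b| a.0.total_cmp(&b.0))
            .unwrap();
        lightest.0 += secs;
        lightest.1.push(name);
    }
    buckets.into_iter().map(|(_, names)| names).collect()
}

fn main() {
    // durations taken from the fallback table later in this diff
    let tests = vec![
        ("test_gc_feedback", 590.157),
        ("test_compaction", 110.222),
        ("test_copy[neon]", 16.579),
    ];
    for (i, group) in split_by_duration(tests, 2).into_iter().enumerate() {
        println!("group {}: {:?}", i + 1, group);
    }
}
```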
.github/workflows/pg_clients.yml (vendored, 3 lines changed)

```diff
@@ -38,11 +38,10 @@ jobs:
         uses: snok/install-poetry@v1

       - name: Cache poetry deps
-        id: cache_poetry
         uses: actions/cache@v3
         with:
           path: ~/.cache/pypoetry/virtualenvs
-          key: v1-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
+          key: v2-${{ runner.os }}-python-deps-ubunutu-latest-${{ hashFiles('poetry.lock') }}

       - name: Install Python deps
         shell: bash -euxo pipefail {0}
```
Cargo.lock (generated, 23 lines changed)

```diff
@@ -289,6 +289,7 @@ dependencies = [
  "pageserver_api",
  "pageserver_client",
  "postgres_connection",
+ "r2d2",
  "reqwest",
  "serde",
  "serde_json",
@@ -1651,6 +1652,7 @@ dependencies = [
  "diesel_derives",
  "itoa",
  "pq-sys",
+ "r2d2",
  "serde_json",
 ]

@@ -4166,6 +4168,17 @@ dependencies = [
  "proc-macro2",
 ]

+[[package]]
+name = "r2d2"
+version = "0.8.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
+dependencies = [
+ "log",
+ "parking_lot 0.12.1",
+ "scheduled-thread-pool",
+]
+
 [[package]]
 name = "rand"
 version = "0.7.3"
@@ -4879,6 +4892,15 @@ dependencies = [
  "windows-sys 0.42.0",
 ]

+[[package]]
+name = "scheduled-thread-pool"
+version = "0.2.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
+dependencies = [
+ "parking_lot 0.12.1",
+]
+
 [[package]]
 name = "scopeguard"
 version = "1.1.0"
@@ -6807,6 +6829,7 @@ dependencies = [
  "clap_builder",
  "crossbeam-utils",
  "diesel",
+ "diesel_derives",
  "either",
  "fail",
  "futures-channel",
```
```diff
@@ -111,7 +111,7 @@ USER nonroot:nonroot
 WORKDIR /home/nonroot

 # Python
-ENV PYTHON_VERSION=3.9.2 \
+ENV PYTHON_VERSION=3.9.18 \
     PYENV_ROOT=/home/nonroot/.pyenv \
     PATH=/home/nonroot/.pyenv/shims:/home/nonroot/.pyenv/bin:/home/nonroot/.poetry/bin:$PATH
 RUN set -e \
```
```diff
@@ -207,6 +207,7 @@ fn maybe_cgexec(cmd: &str) -> Command {

 /// Create special neon_superuser role, that's a slightly nerfed version of a real superuser
 /// that we give to customers
+#[instrument(skip_all)]
 fn create_neon_superuser(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
     let roles = spec
         .cluster
```
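For context on the attribute being added: `tracing::instrument` wraps each call to the function in a span, and `skip_all` keeps the arguments (here a whole `ComputeSpec` and a live client) out of the recorded fields. A small illustration, not taken from the repo:

```rust
use tracing::instrument;

// `skip_all` omits every argument from the span; fields can still be
// added explicitly where they are cheap and meaningful to record.
#[instrument(skip_all, fields(role = %role_name))]
fn create_role(role_name: &str) {
    tracing::info!("creating role");
}
```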
```diff
@@ -24,8 +24,9 @@ tokio.workspace = true
 tokio-util.workspace = true
 tracing.workspace = true

-diesel = { version = "2.1.4", features = ["serde_json", "postgres"] }
+diesel = { version = "2.1.4", features = ["serde_json", "postgres", "r2d2"] }
 diesel_migrations = { version = "2.1.0" }
+r2d2 = { version = "0.8.10" }

 utils = { path = "../../libs/utils/" }
 metrics = { path = "../../libs/metrics/" }
```
```diff
@@ -170,14 +170,14 @@ impl ComputeHook {
         reconfigure_request: &ComputeHookNotifyRequest,
         cancel: &CancellationToken,
     ) -> Result<(), NotifyError> {
-        let req = client.request(Method::POST, url);
+        let req = client.request(Method::PUT, url);
         let req = if let Some(value) = &self.authorization_header {
             req.header(reqwest::header::AUTHORIZATION, value)
         } else {
             req
         };

-        tracing::debug!(
+        tracing::info!(
             "Sending notify request to {} ({:?})",
             url,
             reconfigure_request
```
```diff
@@ -34,9 +34,9 @@ struct Cli {
     #[arg(short, long)]
     listen: std::net::SocketAddr,

-    /// Path to public key for JWT authentication of clients
+    /// Public key for JWT authentication of clients
     #[arg(long)]
-    public_key: Option<camino::Utf8PathBuf>,
+    public_key: Option<String>,

     /// Token for authenticating this service with the pageservers it controls
     #[arg(long)]
@@ -159,7 +159,7 @@ impl Secrets {
     fn load_cli(database_url: &str, args: &Cli) -> anyhow::Result<Self> {
         let public_key = match &args.public_key {
             None => None,
-            Some(key_path) => Some(JwtAuth::from_key_path(key_path)?),
+            Some(key) => Some(JwtAuth::from_key(key.clone()).context("Loading public key")?),
         };
         Ok(Self {
             database_url: database_url.to_owned(),
@@ -170,6 +170,7 @@ impl Secrets {
     }
 }

+/// Execute the diesel migrations that are built into this binary
 async fn migration_run(database_url: &str) -> anyhow::Result<()> {
     use diesel::PgConnection;
     use diesel_migrations::{HarnessWithOutput, MigrationHarness};
@@ -183,8 +184,18 @@ async fn migration_run(database_url: &str) -> anyhow::Result<()> {
     Ok(())
 }

-#[tokio::main]
-async fn main() -> anyhow::Result<()> {
+fn main() -> anyhow::Result<()> {
+    tokio::runtime::Builder::new_current_thread()
+        // We use spawn_blocking for database operations, so require approximately
+        // as many blocking threads as we will open database connections.
+        .max_blocking_threads(Persistence::MAX_CONNECTIONS as usize)
+        .enable_all()
+        .build()
+        .unwrap()
+        .block_on(async_main())
+}
+
+async fn async_main() -> anyhow::Result<()> {
     let launch_ts = Box::leak(Box::new(LaunchTimestamp::generate()));

     logging::init(
```
```diff
@@ -1,5 +1,6 @@
 use std::collections::HashMap;
 use std::str::FromStr;
+use std::time::Duration;

 use camino::Utf8Path;
 use camino::Utf8PathBuf;
@@ -44,7 +45,7 @@ use crate::PlacementPolicy;
 /// updated, and reads of nodes are always from memory, not the database. We only require that
 /// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline.
 pub struct Persistence {
-    database_url: String,
+    connection_pool: diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<PgConnection>>,

     // In test environments, we support loading+saving a JSON file. This is temporary, for the benefit of
     // test_compatibility.py, so that we don't have to commit to making the database contents fully backward/forward
@@ -64,6 +65,8 @@ pub(crate) enum DatabaseError {
     Query(#[from] diesel::result::Error),
     #[error(transparent)]
     Connection(#[from] diesel::result::ConnectionError),
+    #[error(transparent)]
+    ConnectionPool(#[from] r2d2::Error),
     #[error("Logical error: {0}")]
     Logical(String),
 }
@@ -71,9 +74,31 @@ pub(crate) enum DatabaseError {
 pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;

 impl Persistence {
+    // The default postgres connection limit is 100.  We use up to 99, to leave one free for a human admin under
+    // normal circumstances.  This assumes we have exclusive use of the database cluster to which we connect.
+    pub const MAX_CONNECTIONS: u32 = 99;
+
+    // We don't want to keep a lot of connections alive: close them down promptly if they aren't being used.
+    const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
+    const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(60);
+
     pub fn new(database_url: String, json_path: Option<Utf8PathBuf>) -> Self {
+        let manager = diesel::r2d2::ConnectionManager::<PgConnection>::new(database_url);
+
+        // We will use a connection pool: this is primarily to _limit_ our connection count, rather than to optimize time
+        // to execute queries (database queries are not generally on latency-sensitive paths).
+        let connection_pool = diesel::r2d2::Pool::builder()
+            .max_size(Self::MAX_CONNECTIONS)
+            .max_lifetime(Some(Self::MAX_CONNECTION_LIFETIME))
+            .idle_timeout(Some(Self::IDLE_CONNECTION_TIMEOUT))
+            // Always keep at least one connection ready to go
+            .min_idle(Some(1))
+            .test_on_check_out(true)
+            .build(manager)
+            .expect("Could not build connection pool");
+
         Self {
-            database_url,
+            connection_pool,
             json_path,
         }
     }
@@ -84,14 +109,10 @@ impl Persistence {
         F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
         R: Send + 'static,
     {
-        let database_url = self.database_url.clone();
-        tokio::task::spawn_blocking(move || -> DatabaseResult<R> {
-            // TODO: connection pooling, such as via diesel::r2d2
-            let mut conn = PgConnection::establish(&database_url)?;
-            func(&mut conn)
-        })
-        .await
-        .expect("Task panic")
+        let mut conn = self.connection_pool.get()?;
+        tokio::task::spawn_blocking(move || -> DatabaseResult<R> { func(&mut conn) })
+            .await
+            .expect("Task panic")
     }

     /// When a node is first registered, persist it before using it for anything
```
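The shape of the new helper generalizes to any r2d2 manager: check a connection out of the pool (so errors stay typed), then ship the synchronous database work to tokio's blocking thread pool. A sketch under those assumptions — generic, not the repo's exact code, and assuming `r2d2` and `tokio` as dependencies:

```rust
// Checkout happens on the async thread (it may block briefly, up to the
// pool's checkout timeout); the actual query work runs on a blocking
// thread, which is why main() above sizes max_blocking_threads to
// MAX_CONNECTIONS.
async fn with_conn<M, F, R>(pool: &r2d2::Pool<M>, func: F) -> Result<R, r2d2::Error>
where
    M: r2d2::ManageConnection,
    F: FnOnce(&mut M::Connection) -> R + Send + 'static,
    R: Send + 'static,
{
    let mut conn = pool.get()?;
    Ok(tokio::task::spawn_blocking(move || func(&mut *conn))
        .await
        .expect("blocking task panicked"))
}
```

Moving the pooled connection into the closure means it is returned to the pool when the blocking task finishes, never held across an `.await` point.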
```diff
@@ -103,7 +103,9 @@ impl From<DatabaseError> for ApiError {
         match err {
             DatabaseError::Query(e) => ApiError::InternalServerError(e.into()),
             // FIXME: ApiError doesn't have an Unavailable variant, but ShuttingDown maps to 503.
-            DatabaseError::Connection(_e) => ApiError::ShuttingDown,
+            DatabaseError::Connection(_) | DatabaseError::ConnectionPool(_) => {
+                ApiError::ShuttingDown
+            }
             DatabaseError::Logical(reason) => {
                 ApiError::InternalServerError(anyhow::anyhow!(reason))
             }
```
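The added `ConnectionPool` variant slots into this funnel because thiserror's `#[from]` derives the `From` impl that `?` needs, so pool checkout errors convert into `DatabaseError` automatically and then map to a 503 here. A self-contained illustration of the mechanism (names are hypothetical):

```rust
use thiserror::Error;

#[derive(Debug, Error)]
enum DatabaseError {
    #[error(transparent)]
    ConnectionPool(#[from] r2d2::Error),
    #[error("Logical error: {0}")]
    Logical(String),
}

// `pool.get()?` compiles because #[from] generated
// `impl From<r2d2::Error> for DatabaseError`.
fn checkout<M: r2d2::ManageConnection>(
    pool: &r2d2::Pool<M>,
) -> Result<r2d2::PooledConnection<M>, DatabaseError> {
    Ok(pool.get()?)
}
```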
```diff
@@ -28,7 +28,7 @@ pub struct AttachmentService {
     listen: String,
     path: Utf8PathBuf,
     jwt_token: Option<String>,
-    public_key_path: Option<Utf8PathBuf>,
+    public_key: Option<String>,
     postgres_port: u16,
     client: reqwest::Client,
 }
@@ -207,7 +207,7 @@ impl AttachmentService {
             .pageservers
             .first()
             .expect("Config is validated to contain at least one pageserver");
-        let (jwt_token, public_key_path) = match ps_conf.http_auth_type {
+        let (jwt_token, public_key) = match ps_conf.http_auth_type {
             AuthType::Trust => (None, None),
             AuthType::NeonJWT => {
                 let jwt_token = env
@@ -219,7 +219,26 @@ impl AttachmentService {
                 let public_key_path =
                     camino::Utf8PathBuf::try_from(env.base_data_dir.join("auth_public_key.pem"))
                         .unwrap();
-                (Some(jwt_token), Some(public_key_path))
+
+                // This service takes keys as a string rather than as a path to a file/dir: read the key into memory.
+                let public_key = if std::fs::metadata(&public_key_path)
+                    .expect("Can't stat public key")
+                    .is_dir()
+                {
+                    // Our config may specify a directory: this is for the pageserver's ability to handle multiple
+                    // keys.  We only use one key at a time, so, arbitrarily load the first one in the directory.
+                    let mut dir =
+                        std::fs::read_dir(&public_key_path).expect("Can't readdir public key path");
+                    let dent = dir
+                        .next()
+                        .expect("Empty key dir")
+                        .expect("Error reading key dir");
+
+                    std::fs::read_to_string(dent.path()).expect("Can't read public key")
+                } else {
+                    std::fs::read_to_string(&public_key_path).expect("Can't read public key")
+                };
+                (Some(jwt_token), Some(public_key))
             }
         };

@@ -228,7 +247,7 @@ impl AttachmentService {
             path,
             listen,
             jwt_token,
-            public_key_path,
+            public_key,
             postgres_port,
             client: reqwest::ClientBuilder::new()
                 .build()
@@ -453,8 +472,8 @@ impl AttachmentService {
             args.push(format!("--jwt-token={jwt_token}"));
         }

-        if let Some(public_key_path) = &self.public_key_path {
-            args.push(format!("--public-key={public_key_path}"));
+        if let Some(public_key) = &self.public_key {
+            args.push(format!("--public-key=\"{public_key}\""));
         }

         if let Some(control_plane_compute_hook_api) = &self.env.control_plane_compute_hook_api {
```
```diff
@@ -1,5 +1,5 @@
 use std::sync::{
-    atomic::{AtomicBool, AtomicUsize, Ordering},
+    atomic::{AtomicUsize, Ordering},
     Arc,
 };
 use tokio::sync::Semaphore;
@@ -14,18 +14,6 @@ use tokio::sync::Semaphore;
 pub struct OnceCell<T> {
     inner: tokio::sync::RwLock<Inner<T>>,
     initializers: AtomicUsize,
-    /// Do we have one permit or `u32::MAX` permits?
-    ///
-    /// Having one permit means the cell is not initialized, and one winning future could
-    /// initialize it. The act of initializing the cell adds `u32::MAX` permits and set this to
-    /// `false`.
-    ///
-    /// Deinitializing an initialized cell will first take `u32::MAX` permits handing one of them
-    /// out, then set this back to `true`.
-    ///
-    /// Because we need to see all changes to this variable, always use Acquire to read, AcqRel to
-    /// compare_exchange.
-    has_one_permit: AtomicBool,
 }

 impl<T> Default for OnceCell<T> {
@@ -34,7 +22,6 @@ impl<T> Default for OnceCell<T> {
         Self {
             inner: Default::default(),
             initializers: AtomicUsize::new(0),
-            has_one_permit: AtomicBool::new(true),
         }
     }
 }
@@ -60,14 +47,14 @@ impl<T> Default for Inner<T> {
 impl<T> OnceCell<T> {
     /// Creates an already initialized `OnceCell` with the given value.
     pub fn new(value: T) -> Self {
-        let sem = Semaphore::new(u32::MAX as usize);
+        let sem = Semaphore::new(1);
+        sem.close();
         Self {
             inner: tokio::sync::RwLock::new(Inner {
                 init_semaphore: Arc::new(sem),
                 value: Some(value),
             }),
             initializers: AtomicUsize::new(0),
-            has_one_permit: AtomicBool::new(false),
         }
     }

@@ -77,47 +64,42 @@ impl<T> OnceCell<T> {
     /// Initializing might wait on any existing [`GuardMut::take_and_deinit`] deinitialization.
     ///
     /// Initialization is panic-safe and cancellation-safe.
-    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
     pub async fn get_mut_or_init<F, Fut, E>(&self, factory: F) -> Result<GuardMut<'_, T>, E>
     where
         F: FnOnce(InitPermit) -> Fut,
         Fut: std::future::Future<Output = Result<(T, InitPermit), E>>,
     {
-        loop {
-            let sem = {
-                let guard = self.inner.write().await;
-                if guard.value.is_some() {
-                    tracing::debug!("returning GuardMut over existing value");
-                    return Ok(GuardMut(guard));
-                }
-                guard.init_semaphore.clone()
-            };
-
-            {
-                let permit = {
-                    let _guard = CountWaitingInitializers::start(self);
-                    sem.acquire().await
-                };
-
-                let permit = permit.expect("semaphore is never closed");
-
-                if !self.has_one_permit.load(Ordering::Acquire) {
-                    // it is important that the permit is dropped here otherwise there would be a
-                    // deadlock with `take_and_deinit` happening at the same time.
-                    tracing::trace!("seems initialization happened already, trying again");
-                    continue;
-                }
-
-                permit.forget();
-            }
-
-            tracing::trace!("calling factory");
-            let permit = InitPermit::from(sem);
-            let (value, permit) = factory(permit).await?;
-
-            return Ok(self.set0(value, guard, permit));
-        }
+        let sem = {
+            let guard = self.inner.write().await;
+            if guard.value.is_some() {
+                return Ok(GuardMut(guard));
+            }
+            guard.init_semaphore.clone()
+        };
+
+        let permit = {
+            // increment the count for the duration of queued
+            let _guard = CountWaitingInitializers::start(self);
+            sem.acquire_owned().await
+        };
+
+        match permit {
+            Ok(permit) => {
+                let permit = InitPermit(permit);
+                let (value, _permit) = factory(permit).await?;
+
+                let guard = self.inner.write().await;
+
+                Ok(Self::set0(value, guard))
+            }
+            Err(_closed) => {
+                let guard = self.inner.write().await;
+                assert!(
+                    guard.value.is_some(),
+                    "semaphore got closed, must be initialized"
+                );
+                return Ok(GuardMut(guard));
+            }
+        }
     }
@@ -125,48 +107,42 @@ impl<T> OnceCell<T> {
     /// returning the guard.
     ///
     /// Initialization is panic-safe and cancellation-safe.
-    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
     pub async fn get_or_init<F, Fut, E>(&self, factory: F) -> Result<GuardRef<'_, T>, E>
     where
         F: FnOnce(InitPermit) -> Fut,
         Fut: std::future::Future<Output = Result<(T, InitPermit), E>>,
     {
-        loop {
-            let sem = {
-                let guard = self.inner.read().await;
-                if guard.value.is_some() {
-                    tracing::debug!("returning GuardRef over existing value");
-                    return Ok(GuardRef(guard));
-                }
-                guard.init_semaphore.clone()
-            };
-
-            {
-                let permit = {
-                    // increment the count for the duration of queued
-                    let _guard = CountWaitingInitializers::start(self);
-                    sem.acquire().await
-                };
-
-                let permit = permit.expect("semaphore is never closed");
-
-                if !self.has_one_permit.load(Ordering::Acquire) {
-                    tracing::trace!("seems initialization happened already, trying again");
-                    continue;
-                } else {
-                    // it is our turn to initialize for sure
-                }
-
-                permit.forget();
-            }
-
-            tracing::trace!("calling factory");
-            let permit = InitPermit::from(sem);
-            let (value, permit) = factory(permit).await?;
-
-            let guard = self.inner.write().await;
-
-            return Ok(self.set0(value, guard, permit).downgrade());
-        }
+        let sem = {
+            let guard = self.inner.read().await;
+            if guard.value.is_some() {
+                return Ok(GuardRef(guard));
+            }
+            guard.init_semaphore.clone()
+        };
+
+        let permit = {
+            // increment the count for the duration of queued
+            let _guard = CountWaitingInitializers::start(self);
+            sem.acquire_owned().await
+        };
+
+        match permit {
+            Ok(permit) => {
+                let permit = InitPermit(permit);
+                let (value, _permit) = factory(permit).await?;
+
+                let guard = self.inner.write().await;
+
+                Ok(Self::set0(value, guard).downgrade())
+            }
+            Err(_closed) => {
+                let guard = self.inner.read().await;
+                assert!(
+                    guard.value.is_some(),
+                    "semaphore got closed, must be initialized"
+                );
+                return Ok(GuardRef(guard));
+            }
+        }
     }
```
```diff
@@ -176,47 +152,26 @@ impl<T> OnceCell<T> {
     /// # Panics
     ///
     /// If the inner has already been initialized.
-    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
-    pub async fn set(&self, value: T, permit: InitPermit) -> GuardMut<'_, T> {
+    pub async fn set(&self, value: T, _permit: InitPermit) -> GuardMut<'_, T> {
         let guard = self.inner.write().await;

-        assert!(
-            self.has_one_permit.load(Ordering::Acquire),
-            "cannot set when there are multiple permits"
-        );
-
         // cannot assert that this permit is for self.inner.semaphore, but we can assert it cannot
         // give more permits right now.
         if guard.init_semaphore.try_acquire().is_ok() {
-            let available = guard.init_semaphore.available_permits();
             drop(guard);
-            panic!("permit is of wrong origin: {available}");
+            panic!("permit is of wrong origin");
         }

-        self.set0(value, guard, permit)
+        Self::set0(value, guard)
     }

-    fn set0<'a>(
-        &'a self,
-        value: T,
-        mut guard: tokio::sync::RwLockWriteGuard<'a, Inner<T>>,
-        permit: InitPermit,
-    ) -> GuardMut<'a, T> {
+    fn set0(value: T, mut guard: tokio::sync::RwLockWriteGuard<'_, Inner<T>>) -> GuardMut<'_, T> {
         if guard.value.is_some() {
             drop(guard);
             unreachable!("we won permit, must not be initialized");
         }
         guard.value = Some(value);
-        assert!(
-            self.has_one_permit
-                .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
-                .is_ok(),
-            "should had only had one permit"
-        );
-        permit.forget();
-        guard.init_semaphore.add_permits(u32::MAX as usize);
-
-        tracing::debug!("value initialized");
+        guard.init_semaphore.close();
         GuardMut(guard)
     }
```
```diff
@@ -244,48 +199,6 @@ impl<T> OnceCell<T> {
     pub fn initializer_count(&self) -> usize {
         self.initializers.load(Ordering::Relaxed)
     }
-
-    /// Take the current value, and a new permit for it's deinitialization.
-    ///
-    /// The permit will be on a semaphore part of the new internal value, and any following
-    /// [`OnceCell::get_or_init`] will wait on it to complete.
-    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
-    pub async fn take_and_deinit(&self, mut guard: GuardMut<'_, T>) -> (T, InitPermit) {
-        // guard exists => we have been initialized
-        assert!(
-            !self.has_one_permit.load(Ordering::Acquire),
-            "has to have all permits after initializing"
-        );
-        assert!(guard.0.value.is_some(), "guard exists => initialized");
-
-        // we must first drain out all "waiting to initialize" stragglers
-        tracing::trace!("draining other initializers");
-        let all_permits = guard
-            .0
-            .init_semaphore
-            .acquire_many(u32::MAX)
-            .await
-            .expect("never closed");
-        all_permits.forget();
-        tracing::debug!("other initializers drained");
-
-        assert_eq!(guard.0.init_semaphore.available_permits(), 0);
-
-        // now that the permits have been drained, switch the state
-        assert!(
-            self.has_one_permit
-                .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
-                .is_ok(),
-            "there should be only one GuardMut attempting take_and_deinit"
-        );
-
-        let value = guard.0.value.take().unwrap();
-
-        // act of creating an init_permit is the same as "adding back one when this is dropped"
-        let init_permit = InitPermit::from(guard.0.init_semaphore.clone());
-
-        (value, init_permit)
-    }
 }

 /// DropGuard counter for queued tasks waiting to initialize, mainly accessible for the
```
```diff
@@ -331,6 +244,24 @@ impl<T> std::ops::DerefMut for GuardMut<'_, T> {
 }

 impl<'a, T> GuardMut<'a, T> {
+    /// Take the current value, and a new permit for it's deinitialization.
+    ///
+    /// The permit will be on a semaphore part of the new internal value, and any following
+    /// [`OnceCell::get_or_init`] will wait on it to complete.
+    pub fn take_and_deinit(&mut self) -> (T, InitPermit) {
+        let mut swapped = Inner::default();
+        let permit = swapped
+            .init_semaphore
+            .clone()
+            .try_acquire_owned()
+            .expect("we just created this");
+        std::mem::swap(&mut *self.0, &mut swapped);
+        swapped
+            .value
+            .map(|v| (v, InitPermit(permit)))
+            .expect("guard is not created unless value has been initialized")
+    }
+
     pub fn downgrade(self) -> GuardRef<'a, T> {
         GuardRef(self.0.downgrade())
     }
```
```diff
@@ -351,39 +282,13 @@ impl<T> std::ops::Deref for GuardRef<'_, T> {
 }

 /// Type held by OnceCell (de)initializing task.
-pub struct InitPermit(Option<Arc<tokio::sync::Semaphore>>);
-
-impl From<Arc<tokio::sync::Semaphore>> for InitPermit {
-    fn from(value: Arc<tokio::sync::Semaphore>) -> Self {
-        InitPermit(Some(value))
-    }
-}
-
-impl InitPermit {
-    fn forget(mut self) {
-        self.0
-            .take()
-            .expect("unable to forget twice, created with None?");
-    }
-}
-
-impl Drop for InitPermit {
-    fn drop(&mut self) {
-        if let Some(sem) = self.0.take() {
-            debug_assert_eq!(sem.available_permits(), 0);
-            sem.add_permits(1);
-        }
-    }
-}
+pub struct InitPermit(tokio::sync::OwnedSemaphorePermit);
```
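With `InitPermit` now just an `OwnedSemaphorePermit`, the whole state machine reduces to one rule: the init semaphore holds a single permit while the cell is uninitialized, and is closed once a value is set. The race logic in isolation (a sketch of the trick, not the cell itself):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // One available permit <=> "not yet initialized"; closed <=> "initialized".
    let sem = Arc::new(Semaphore::new(1));

    match sem.clone().acquire_owned().await {
        Ok(_permit) => {
            // We won the race: initialize, then close the semaphore so every
            // future (and currently queued) acquirer fails fast.
            sem.close();
            println!("initialized");
        }
        // Err means the semaphore was closed: a value must already exist.
        Err(_closed) => println!("someone else initialized"),
    }
}
```

Dropping the permit without closing (e.g. a panicking factory) automatically returns it, which is what makes initialization panic- and cancellation-safe without the old `has_one_permit` bookkeeping.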
```diff
 #[cfg(test)]
 mod tests {
+    use futures::Future;
+
     use super::*;
     use std::{
+        convert::Infallible,
+        pin::{pin, Pin},
         sync::atomic::{AtomicUsize, Ordering},
         time::Duration,
     };
@@ -461,8 +366,11 @@ mod tests {
             let cell = cell.clone();
             let deinitialization_started = deinitialization_started.clone();
             async move {
-                let guard = cell.get_mut().await.unwrap();
-                let (answer, _permit) = cell.take_and_deinit(guard).await;
+                let (answer, _permit) = cell
+                    .get_mut()
+                    .await
+                    .expect("initialized to value")
+                    .take_and_deinit();
                 assert_eq!(answer, initial);

                 deinitialization_started.wait().await;
@@ -491,31 +399,12 @@ mod tests {
     #[tokio::test]
     async fn reinit_with_deinit_permit() {
         let cell = Arc::new(OnceCell::new(42));
-        assert!(!cell.has_one_permit.load(Ordering::Acquire));
-        assert_eq!(
-            cell.inner.read().await.init_semaphore.available_permits(),
-            u32::MAX as usize
-        );
-
-        let guard = cell.get_mut().await.unwrap();
-        assert!(!cell.has_one_permit.load(Ordering::Acquire));
-        assert_eq!(
-            guard.0.init_semaphore.available_permits(),
-            u32::MAX as usize
-        );
-
-        let (mol, permit) = cell.take_and_deinit(guard).await;
-        assert!(cell.has_one_permit.load(Ordering::Acquire));
-        assert_eq!(
-            cell.inner.read().await.init_semaphore.available_permits(),
-            0
-        );
-
+        let (mol, permit) = cell.get_mut().await.unwrap().take_and_deinit();
         cell.set(5, permit).await;
         assert_eq!(*cell.get_mut().await.unwrap(), 5);

-        let guard = cell.get_mut().await.unwrap();
-        let (five, permit) = cell.take_and_deinit(guard).await;
+        let (five, permit) = cell.get_mut().await.unwrap().take_and_deinit();
         assert_eq!(5, five);
         cell.set(mol, permit).await;
         assert_eq!(*cell.get_mut().await.unwrap(), 42);
@@ -566,110 +455,4 @@ mod tests {
             .unwrap();
         assert_eq!(*g, "now initialized");
     }
-
-    #[tokio::test(start_paused = true)]
-    async fn reproduce_init_take_deinit_race_ref() {
-        init_take_deinit_scenario(|cell, factory| {
-            Box::pin(async {
-                cell.get_or_init(factory).await.unwrap();
-            })
-        })
-        .await;
-    }
-
-    #[tokio::test(start_paused = true)]
-    async fn reproduce_init_take_deinit_race_mut() {
-        init_take_deinit_scenario(|cell, factory| {
-            Box::pin(async {
-                cell.get_mut_or_init(factory).await.unwrap();
-            })
-        })
-        .await;
-    }
-
-    type BoxedInitFuture<T, E> = Pin<Box<dyn Future<Output = Result<(T, InitPermit), E>>>>;
-    type BoxedInitFunction<T, E> = Box<dyn Fn(InitPermit) -> BoxedInitFuture<T, E>>;
-
-    /// Reproduce an assertion failure with both initialization methods.
-    ///
-    /// This has interesting generics to be generic between `get_or_init` and `get_mut_or_init`.
-    /// Alternative would be a macro_rules! but that is the last resort.
-    async fn init_take_deinit_scenario<F>(init_way: F)
-    where
-        F: for<'a> Fn(
-            &'a OnceCell<&'static str>,
-            BoxedInitFunction<&'static str, Infallible>,
-        ) -> Pin<Box<dyn Future<Output = ()> + 'a>>,
-    {
-        use tracing::Instrument;
-
-        let cell = OnceCell::default();
-
-        // acquire the init_semaphore only permit to drive initializing tasks in order to waiting
-        // on the same semaphore.
-        let permit = cell
-            .inner
-            .read()
-            .await
-            .init_semaphore
-            .clone()
-            .try_acquire_owned()
-            .unwrap();
-
-        let mut t1 = pin!(init_way(
-            &cell,
-            Box::new(|permit| Box::pin(async move { Ok(("t1", permit)) })),
-        )
-        .instrument(tracing::info_span!("t1")));
-
-        let mut t2 = pin!(init_way(
-            &cell,
-            Box::new(|permit| Box::pin(async move { Ok(("t2", permit)) })),
-        )
-        .instrument(tracing::info_span!("t2")));
-
-        // drive t2 first to the init_semaphore
-        tokio::select! {
-            _ = &mut t2 => unreachable!("it cannot get permit"),
-            _ = tokio::time::sleep(Duration::from_secs(3600 * 24 * 7 * 365)) => {}
-        }
-
-        // followed by t1 in the init_semaphore
-        tokio::select! {
-            _ = &mut t1 => unreachable!("it cannot get permit"),
-            _ = tokio::time::sleep(Duration::from_secs(3600 * 24 * 7 * 365)) => {}
-        }
-
-        // now let t2 proceed and initialize
-        drop(permit);
-        t2.await;
-
-        // in original implementation which did closing and re-creation of the semaphore, t1 was
-        // still stuck on the first semaphore, but now that t1 and deinit are using the same
-        // semaphore, deinit will have to wait for t1.
-        let mut deinit = pin!(async {
-            let guard = cell.get_mut().await.unwrap();
-            cell.take_and_deinit(guard).await
-        }
-        .instrument(tracing::info_span!("deinit")));
-
-        tokio::select! {
-            _ = &mut deinit => unreachable!("deinit must not make progress before t1 is complete"),
-            _ = tokio::time::sleep(Duration::from_secs(3600 * 24 * 7 * 365)) => {}
-        }
-
-        // now originally t1 would see the semaphore it has as closed. it cannot yet get a permit from
-        // the new one.
-        tokio::select! {
-            _ = &mut t1 => unreachable!("it cannot get permit"),
-            _ = tokio::time::sleep(Duration::from_secs(3600 * 24 * 7 * 365)) => {}
-        }
-
-        let (s, _) = deinit.await;
-        assert_eq!("t2", s);
-
-        t1.await;
-
-        assert_eq!("t1", *cell.get().await.unwrap());
-    }
 }
```
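Usage of the reworked cell, as exercised by the remaining tests: the factory receives an `InitPermit` and must hand it back alongside the value, which ties initialization to actually holding the permit. A minimal sketch against the API shown in this diff (assumes the `OnceCell` above is in scope):

```rust
use std::convert::Infallible;

async fn get_config(cell: &OnceCell<String>) -> String {
    let guard = cell
        .get_or_init(|permit| async move {
            // expensive, run-once initialization goes here
            Ok::<_, Infallible>(("loaded".to_string(), permit))
        })
        .await
        .expect("factory is infallible");
    guard.clone() // GuardRef derefs to the value
}
```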
```diff
@@ -278,7 +278,7 @@ pub struct Tenant {
     // with timelines, which in turn may cause dropping replication connection, expiration of wait_for_lsn
     // timeout...
     gc_cs: tokio::sync::Mutex<()>,
-    walredo_mgr: Arc<WalRedoManager>,
+    walredo_mgr: Option<Arc<WalRedoManager>>,

     // provides access to timeline data sitting in the remote storage
     pub(crate) remote_storage: Option<GenericRemoteStorage>,
@@ -635,7 +635,7 @@ impl Tenant {
             conf,
             attached_conf,
             shard_identity,
-            wal_redo_manager,
+            Some(wal_redo_manager),
             tenant_shard_id,
             remote_storage.clone(),
             deletion_queue_client,
@@ -1195,10 +1195,6 @@ impl Tenant {
         tenant_shard_id: TenantShardId,
         reason: String,
     ) -> Arc<Tenant> {
-        let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
-            conf,
-            tenant_shard_id,
-        )));
         Arc::new(Tenant::new(
             TenantState::Broken {
                 reason,
@@ -1209,7 +1205,7 @@ impl Tenant {
             // Shard identity isn't meaningful for a broken tenant: it's just a placeholder
             // to occupy the slot for this TenantShardId.
             ShardIdentity::broken(tenant_shard_id.shard_number, tenant_shard_id.shard_count),
-            wal_redo_manager,
+            None,
             tenant_shard_id,
             None,
             DeletionQueueClient::broken(),
@@ -1978,7 +1974,7 @@ impl Tenant {
     }

     pub(crate) fn wal_redo_manager_status(&self) -> Option<WalRedoManagerStatus> {
-        self.walredo_mgr.status()
+        self.walredo_mgr.as_ref().and_then(|mgr| mgr.status())
     }

     /// Changes tenant status to active, unless shutdown was already requested.
@@ -2613,7 +2609,7 @@ impl Tenant {
             self.tenant_shard_id,
             self.generation,
             self.shard_identity,
-            Arc::clone(&self.walredo_mgr),
+            self.walredo_mgr.as_ref().map(Arc::clone),
             resources,
             pg_version,
             state,
@@ -2631,7 +2627,7 @@ impl Tenant {
         conf: &'static PageServerConf,
         attached_conf: AttachedTenantConf,
         shard_identity: ShardIdentity,
-        walredo_mgr: Arc<WalRedoManager>,
+        walredo_mgr: Option<Arc<WalRedoManager>>,
         tenant_shard_id: TenantShardId,
         remote_storage: Option<GenericRemoteStorage>,
         deletion_queue_client: DeletionQueueClient,
@@ -4055,7 +4051,7 @@ pub(crate) mod harness {
                 .unwrap(),
                 // This is a legacy/test code path: sharding isn't supported here.
                 ShardIdentity::unsharded(),
-                walredo_mgr,
+                Some(walredo_mgr),
                 self.tenant_shard_id,
                 Some(self.remote_storage.clone()),
                 self.deletion_queue.new_client(),
```
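The mechanical part of this change is the pair of `Option<Arc<_>>` access idioms it swaps in, worth seeing side by side (a sketch with placeholder types, not the pageserver's):

```rust
use std::sync::Arc;

struct Mgr;
impl Mgr {
    fn status(&self) -> Option<u32> { Some(0) }
}

struct Tenant { walredo_mgr: Option<Arc<Mgr>> }

impl Tenant {
    fn status(&self) -> Option<u32> {
        // was: self.walredo_mgr.status()
        self.walredo_mgr.as_ref().and_then(|mgr| mgr.status())
    }
    fn clone_mgr(&self) -> Option<Arc<Mgr>> {
        // was: Arc::clone(&self.walredo_mgr)
        self.walredo_mgr.as_ref().map(Arc::clone)
    }
}
```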
```diff
@@ -199,7 +199,9 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {

             // Perhaps we did no work and the walredo process has been idle for some time:
             // give it a chance to shut down to avoid leaving walredo process running indefinitely.
-            tenant.walredo_mgr.maybe_quiesce(period * 10);
+            if let Some(walredo_mgr) = &tenant.walredo_mgr {
+                walredo_mgr.maybe_quiesce(period * 10);
+            }

             // Sleep
             if tokio::time::timeout(sleep_duration, cancel.cancelled())
```
```diff
@@ -215,8 +215,8 @@ pub struct Timeline {
     // Atomic would be more appropriate here.
     last_freeze_ts: RwLock<Instant>,

-    // WAL redo manager
-    walredo_mgr: Arc<super::WalRedoManager>,
+    // WAL redo manager. `None` only for broken tenants.
+    walredo_mgr: Option<Arc<super::WalRedoManager>>,

     /// Remote storage client.
     /// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
@@ -1427,7 +1427,7 @@ impl Timeline {
         tenant_shard_id: TenantShardId,
         generation: Generation,
         shard_identity: ShardIdentity,
-        walredo_mgr: Arc<super::WalRedoManager>,
+        walredo_mgr: Option<Arc<super::WalRedoManager>>,
         resources: TimelineResources,
         pg_version: u32,
         state: TimelineState,
@@ -4457,6 +4457,9 @@ impl Timeline {

         let img = match self
             .walredo_mgr
+            .as_ref()
+            .context("timeline has no walredo manager")
+            .map_err(PageReconstructError::WalRedo)?
             .request_redo(key, request_lsn, data.img, data.records, self.pg_version)
             .await
             .context("reconstruct a page image")
```
```diff
@@ -343,6 +343,23 @@ pub(super) async fn handle_walreceiver_connection(
                 modification.commit(&ctx).await?;
                 uncommitted_records = 0;
                 filtered_records = 0;
+
+                //
+                // We should check checkpoint distance after appending each ingest_batch_size bytes because otherwise
+                // layer size can become much larger than `checkpoint_distance`.
+                // It can happen because the walsender is sending WAL in 125kB chunks and some WAL records can cause
+                // writing a large amount of data to key-value storage. So performing this check only after processing
+                // all WAL records in the chunk can cause huge L0 layer files.
+                //
+                timeline
+                    .check_checkpoint_distance()
+                    .await
+                    .with_context(|| {
+                        format!(
+                            "Failed to check checkpoint distance for timeline {}",
+                            timeline.timeline_id
+                        )
+                    })?;
             }
         }
```
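The comment above describes a byte-budget rule: re-check the checkpoint distance every `ingest_batch_size` bytes rather than once per received chunk. The control flow in isolation (hypothetical names, synchronous for brevity):

```rust
// After every `batch_size` ingested bytes, run the size check; a single
// oversized network chunk then cannot overshoot checkpoint_distance by
// more than one batch.
fn ingest_chunk(records: &[Vec<u8>], batch_size: usize, mut check: impl FnMut()) {
    let mut bytes_since_check = 0;
    for rec in records {
        bytes_since_check += rec.len();
        if bytes_since_check >= batch_size {
            check(); // e.g. timeline.check_checkpoint_distance() in the diff
            bytes_since_check = 0;
        }
    }
}
```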
@@ -20,7 +20,7 @@ BENCHMARKS_DURATION_QUERY = """
|
||||
FROM results
|
||||
WHERE
|
||||
started_at > CURRENT_DATE - INTERVAL '%s' day
|
||||
AND parent_suite = 'test_runner.performance'
|
||||
AND starts_with(parent_suite, 'test_runner.performance')
|
||||
AND status = 'passed'
|
||||
GROUP BY
|
||||
parent_suite, suite, name
|
||||
@@ -31,68 +31,75 @@ BENCHMARKS_DURATION_QUERY = """
|
||||
# the total duration varies from 8 to 40 minutes.
|
||||
# We use some pre-collected durations as a fallback to have a better distribution.
|
||||
FALLBACK_DURATION = {
|
||||
"test_runner/performance/test_branch_creation.py::test_branch_creation_heavy_write[20]": 62.144,
|
||||
"test_runner/performance/test_branch_creation.py::test_branch_creation_many[1024]": 90.941,
|
||||
"test_runner/performance/test_branch_creation.py::test_branch_creation_many_relations": 26.053,
|
||||
"test_runner/performance/test_branching.py::test_compare_child_and_root_pgbench_perf": 25.67,
|
||||
"test_runner/performance/test_branching.py::test_compare_child_and_root_read_perf": 14.497,
|
||||
"test_runner/performance/test_branching.py::test_compare_child_and_root_write_perf": 18.852,
|
||||
"test_runner/performance/test_bulk_insert.py::test_bulk_insert[neon]": 26.572,
|
||||
"test_runner/performance/test_bulk_insert.py::test_bulk_insert[vanilla]": 6.259,
|
||||
"test_runner/performance/test_bulk_tenant_create.py::test_bulk_tenant_create[10]": 21.206,
|
||||
"test_runner/performance/test_bulk_tenant_create.py::test_bulk_tenant_create[1]": 3.474,
|
||||
"test_runner/performance/test_bulk_tenant_create.py::test_bulk_tenant_create[5]": 11.262,
|
||||
"test_runner/performance/test_bulk_update.py::test_bulk_update[100]": 94.225,
|
||||
"test_runner/performance/test_bulk_update.py::test_bulk_update[10]": 68.159,
|
||||
"test_runner/performance/test_bulk_update.py::test_bulk_update[50]": 76.719,
|
||||
"test_runner/performance/test_compaction.py::test_compaction": 110.222,
|
||||
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_ro_with_pgbench_select_only[neon-5-10-100]": 10.743,
|
||||
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_ro_with_pgbench_select_only[vanilla-5-10-100]": 16.541,
|
||||
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_rw_with_pgbench_default[neon-5-10-100]": 11.109,
|
||||
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_rw_with_pgbench_default[vanilla-5-10-100]": 18.121,
|
||||
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wal_with_pgbench_default[neon-5-10-100]": 11.3,
|
||||
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wal_with_pgbench_default[vanilla-5-10-100]": 16.086,
|
||||
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_heavy_write[neon-10-10]": 12.024,
|
||||
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_heavy_write[neon-10-1]": 11.14,
|
||||
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_heavy_write[vanilla-10-10]": 10.375,
|
||||
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_heavy_write[vanilla-10-1]": 10.075,
|
||||
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_pgbench_simple_update[neon-5-10-100]": 11.147,
|
||||
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_pgbench_simple_update[vanilla-5-10-100]": 16.321,
|
||||
"test_runner/performance/test_copy.py::test_copy[neon]": 16.579,
|
||||
"test_runner/performance/test_copy.py::test_copy[vanilla]": 10.094,
|
||||
"test_runner/performance/test_gc_feedback.py::test_gc_feedback": 590.157,
|
||||
"test_runner/performance/test_gist_build.py::test_gist_buffering_build[neon]": 14.102,
|
||||
"test_runner/performance/test_gist_build.py::test_gist_buffering_build[vanilla]": 8.677,
|
||||
"test_runner/performance/test_latency.py::test_measure_read_latency_heavy_write_workload[neon-1]": 31.079,
|
||||
"test_runner/performance/test_latency.py::test_measure_read_latency_heavy_write_workload[vanilla-1]": 38.119,
|
||||
"test_runner/performance/test_layer_map.py::test_layer_map": 24.784,
|
||||
"test_runner/performance/test_logical_replication.py::test_logical_replication": 117.707,
|
||||
"test_runner/performance/test_parallel_copy_to.py::test_parallel_copy_different_tables[neon]": 21.194,
|
||||
"test_runner/performance/test_parallel_copy_to.py::test_parallel_copy_different_tables[vanilla]": 59.068,
|
||||
"test_runner/performance/test_parallel_copy_to.py::test_parallel_copy_same_table[neon]": 73.235,
|
||||
"test_runner/performance/test_parallel_copy_to.py::test_parallel_copy_same_table[vanilla]": 82.586,
|
||||
"test_runner/performance/test_perf_pgbench.py::test_pgbench[neon-45-10]": 106.536,
|
||||
"test_runner/performance/test_perf_pgbench.py::test_pgbench[vanilla-45-10]": 98.753,
|
||||
"test_runner/performance/test_random_writes.py::test_random_writes[neon]": 6.975,
|
||||
"test_runner/performance/test_random_writes.py::test_random_writes[vanilla]": 3.69,
|
||||
"test_runner/performance/test_seqscans.py::test_seqscans[neon-100000-100-0]": 3.529,
|
||||
"test_runner/performance/test_seqscans.py::test_seqscans[neon-10000000-1-0]": 64.522,
|
||||
"test_runner/performance/test_seqscans.py::test_seqscans[neon-10000000-1-4]": 40.964,
|
||||
"test_runner/performance/test_seqscans.py::test_seqscans[vanilla-100000-100-0]": 0.55,
|
||||
"test_runner/performance/test_seqscans.py::test_seqscans[vanilla-10000000-1-0]": 12.189,
|
||||
"test_runner/performance/test_seqscans.py::test_seqscans[vanilla-10000000-1-4]": 13.899,
|
||||
"test_runner/performance/test_startup.py::test_startup_simple": 2.51,
|
||||
"test_runner/performance/test_wal_backpressure.py::test_heavy_write_workload[neon_off-10-5-5]": 527.245,
|
||||
"test_runner/performance/test_wal_backpressure.py::test_heavy_write_workload[neon_on-10-5-5]": 583.46,
|
||||
"test_runner/performance/test_wal_backpressure.py::test_heavy_write_workload[vanilla-10-5-5]": 113.653,
|
||||
"test_runner/performance/test_wal_backpressure.py::test_pgbench_intensive_init_workload[neon_off-1000]": 233.728,
|
||||
"test_runner/performance/test_wal_backpressure.py::test_pgbench_intensive_init_workload[neon_on-1000]": 419.093,
|
||||
"test_runner/performance/test_wal_backpressure.py::test_pgbench_intensive_init_workload[vanilla-1000]": 982.461,
|
||||
"test_runner/performance/test_wal_backpressure.py::test_pgbench_simple_update_workload[neon_off-45-100]": 116.522,
|
||||
"test_runner/performance/test_wal_backpressure.py::test_pgbench_simple_update_workload[neon_on-45-100]": 115.583,
|
||||
"test_runner/performance/test_wal_backpressure.py::test_pgbench_simple_update_workload[vanilla-45-100]": 155.282,
|
||||
"test_runner/performance/test_write_amplification.py::test_write_amplification[neon]": 26.704,
|
||||
"test_runner/performance/test_write_amplification.py::test_write_amplification[vanilla]": 16.088,
|
||||
"test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py::test_pageserver_max_throughput_getpage_at_latest_lsn[1-13-30]": 400.15,
|
||||
"test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py::test_pageserver_max_throughput_getpage_at_latest_lsn[1-6-30]": 372.521,
|
||||
"test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py::test_pageserver_max_throughput_getpage_at_latest_lsn[10-13-30]": 420.017,
|
||||
"test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py::test_pageserver_max_throughput_getpage_at_latest_lsn[10-6-30]": 373.769,
|
||||
"test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py::test_pageserver_max_throughput_getpage_at_latest_lsn[100-13-30]": 678.742,
|
||||
"test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py::test_pageserver_max_throughput_getpage_at_latest_lsn[100-6-30]": 512.135,
|
||||
"test_runner/performance/test_branch_creation.py::test_branch_creation_heavy_write[20]": 58.036,
|
||||
"test_runner/performance/test_branch_creation.py::test_branch_creation_many_relations": 22.104,
|
||||
"test_runner/performance/test_branch_creation.py::test_branch_creation_many[1024]": 126.073,
|
||||
"test_runner/performance/test_branching.py::test_compare_child_and_root_pgbench_perf": 25.759,
|
||||
"test_runner/performance/test_branching.py::test_compare_child_and_root_read_perf": 6.885,
|
||||
"test_runner/performance/test_branching.py::test_compare_child_and_root_write_perf": 8.758,
|
||||
"test_runner/performance/test_bulk_insert.py::test_bulk_insert[neon]": 18.275,
|
||||
"test_runner/performance/test_bulk_insert.py::test_bulk_insert[vanilla]": 9.533,
|
||||
"test_runner/performance/test_bulk_tenant_create.py::test_bulk_tenant_create[1]": 12.09,
|
||||
"test_runner/performance/test_bulk_tenant_create.py::test_bulk_tenant_create[10]": 35.145,
|
||||
"test_runner/performance/test_bulk_tenant_create.py::test_bulk_tenant_create[5]": 22.28,
|
||||
"test_runner/performance/test_bulk_update.py::test_bulk_update[10]": 66.353,
|
||||
"test_runner/performance/test_bulk_update.py::test_bulk_update[100]": 75.487,
|
||||
"test_runner/performance/test_bulk_update.py::test_bulk_update[50]": 54.142,
|
||||
"test_runner/performance/test_compaction.py::test_compaction": 110.715,
|
||||
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_ro_with_pgbench_select_only[neon-5-10-100]": 11.68,
|
||||
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_ro_with_pgbench_select_only[vanilla-5-10-100]": 16.384,
|
||||
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_rw_with_pgbench_default[neon-5-10-100]": 11.315,
|
||||
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_rw_with_pgbench_default[vanilla-5-10-100]": 18.783,
|
||||
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wal_with_pgbench_default[neon-5-10-100]": 11.647,
|
||||
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wal_with_pgbench_default[vanilla-5-10-100]": 17.04,
|
||||
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_heavy_write[neon-10-1]": 11.01,
|
||||
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_heavy_write[neon-10-10]": 11.902,
|
||||
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_heavy_write[vanilla-10-1]": 10.077,
|
||||
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_heavy_write[vanilla-10-10]": 10.4,
|
||||
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_pgbench_simple_update[neon-5-10-100]": 11.33,
|
||||
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_pgbench_simple_update[vanilla-5-10-100]": 16.434,
|
||||
"test_runner/performance/test_copy.py::test_copy[neon]": 13.817,
|
||||
"test_runner/performance/test_copy.py::test_copy[vanilla]": 11.736,
|
||||
"test_runner/performance/test_gc_feedback.py::test_gc_feedback": 575.735,
|
||||
"test_runner/performance/test_gist_build.py::test_gist_buffering_build[neon]": 14.868,
|
||||
"test_runner/performance/test_gist_build.py::test_gist_buffering_build[vanilla]": 14.393,
|
||||
"test_runner/performance/test_latency.py::test_measure_read_latency_heavy_write_workload[neon-1]": 20.588,
|
||||
"test_runner/performance/test_latency.py::test_measure_read_latency_heavy_write_workload[vanilla-1]": 30.849,
|
||||
"test_runner/performance/test_layer_map.py::test_layer_map": 39.378,
|
||||
"test_runner/performance/test_lazy_startup.py::test_lazy_startup": 2848.938,
|
||||
"test_runner/performance/test_logical_replication.py::test_logical_replication": 120.952,
|
||||
"test_runner/performance/test_parallel_copy_to.py::test_parallel_copy_different_tables[neon]": 35.552,
|
||||
"test_runner/performance/test_parallel_copy_to.py::test_parallel_copy_different_tables[vanilla]": 66.762,
|
||||
"test_runner/performance/test_parallel_copy_to.py::test_parallel_copy_same_table[neon]": 85.177,
|
||||
"test_runner/performance/test_parallel_copy_to.py::test_parallel_copy_same_table[vanilla]": 92.12,
|
||||
"test_runner/performance/test_perf_pgbench.py::test_pgbench[neon-45-10]": 107.009,
|
||||
"test_runner/performance/test_perf_pgbench.py::test_pgbench[vanilla-45-10]": 99.582,
|
||||
"test_runner/performance/test_random_writes.py::test_random_writes[neon]": 4.737,
|
||||
"test_runner/performance/test_random_writes.py::test_random_writes[vanilla]": 2.686,
|
||||
"test_runner/performance/test_seqscans.py::test_seqscans[neon-100000-100-0]": 3.271,
|
||||
"test_runner/performance/test_seqscans.py::test_seqscans[neon-10000000-1-0]": 50.719,
|
||||
"test_runner/performance/test_seqscans.py::test_seqscans[neon-10000000-1-4]": 15.992,
|
||||
"test_runner/performance/test_seqscans.py::test_seqscans[vanilla-100000-100-0]": 0.566,
|
||||
"test_runner/performance/test_seqscans.py::test_seqscans[vanilla-10000000-1-0]": 13.542,
|
||||
"test_runner/performance/test_seqscans.py::test_seqscans[vanilla-10000000-1-4]": 13.35,
|
||||
"test_runner/performance/test_startup.py::test_startup_simple": 13.043,
|
||||
"test_runner/performance/test_wal_backpressure.py::test_heavy_write_workload[neon_off-10-5-5]": 194.841,
|
||||
"test_runner/performance/test_wal_backpressure.py::test_heavy_write_workload[neon_on-10-5-5]": 286.667,
|
||||
"test_runner/performance/test_wal_backpressure.py::test_heavy_write_workload[vanilla-10-5-5]": 85.577,
|
||||
"test_runner/performance/test_wal_backpressure.py::test_pgbench_intensive_init_workload[neon_off-1000]": 297.626,
|
||||
"test_runner/performance/test_wal_backpressure.py::test_pgbench_intensive_init_workload[neon_on-1000]": 646.187,
|
||||
"test_runner/performance/test_wal_backpressure.py::test_pgbench_intensive_init_workload[vanilla-1000]": 989.776,
|
||||
"test_runner/performance/test_wal_backpressure.py::test_pgbench_simple_update_workload[neon_off-45-100]": 125.638,
|
||||
"test_runner/performance/test_wal_backpressure.py::test_pgbench_simple_update_workload[neon_on-45-100]": 123.554,
|
||||
"test_runner/performance/test_wal_backpressure.py::test_pgbench_simple_update_workload[vanilla-45-100]": 190.083,
|
||||
"test_runner/performance/test_write_amplification.py::test_write_amplification[neon]": 21.016,
|
||||
"test_runner/performance/test_write_amplification.py::test_write_amplification[vanilla]": 23.028,
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -563,13 +563,13 @@ class PageserverHttpClient(requests.Session):
         self,
         tenant_id: Union[TenantId, TenantShardId],
         timeline_id: TimelineId,
-        timestamp,
+        timestamp: datetime,
     ):
         log.info(
            f"Requesting lsn by timestamp {timestamp}, tenant {tenant_id}, timeline {timeline_id}"
         )
         res = self.get(
-            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp}",
+            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp.isoformat()}Z",
         )
         self.verbose_error(res)
         res_json = res.json()
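The new signature pushes timestamp formatting into the client: callers now pass a datetime and the client renders the query parameter itself. Note that the "Z" suffix is appended unconditionally after isoformat(), so callers are expected to pass a naive UTC timestamp. A small illustration with a made-up value:

from datetime import datetime

# Hypothetical timestamp; the client embeds this in the query string.
ts = datetime(2023, 11, 1, 12, 30, 45)
print(f"{ts.isoformat()}Z")  # -> "2023-11-01T12:30:45Z", the format the endpoint parses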
@@ -26,86 +26,81 @@ from fixtures.neon_fixtures import NeonEnvBuilder
 # apply during config step, like more users, databases, or extensions. By default
 # we load extensions 'neon,pg_stat_statements,timescaledb,pg_cron', but in this
 # test we only load neon.
-@pytest.mark.timeout(1000)
-def test_lazy_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
+@pytest.mark.timeout(1800)
+@pytest.mark.parametrize("slru", ["lazy", "eager"])
+def test_lazy_startup(slru: str, neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
     neon_env_builder.num_safekeepers = 3
     env = neon_env_builder.init_start()
 
-    lazy_tenant, _ = env.neon_cli.create_tenant(
+    lazy_slru_download = "true" if slru == "lazy" else "false"
+    tenant, _ = env.neon_cli.create_tenant(
         conf={
-            "lazy_slru_download": "true",
+            "lazy_slru_download": lazy_slru_download,
         }
     )
-    eager_tenant, _ = env.neon_cli.create_tenant(
-        conf={
-            "lazy_slru_download": "false",
-        }
-    )
-    tenants = [lazy_tenant, eager_tenant]
-    slru = "lazy"
-    for tenant in tenants:
-        endpoint = env.endpoints.create_start("main", tenant_id=tenant)
-        endpoint.safe_psql("CREATE TABLE t (pk integer PRIMARY KEY, x integer)")
-        endpoint.safe_psql("ALTER TABLE t SET (autovacuum_enabled = false)")
-        endpoint.safe_psql("INSERT INTO t VALUES (1, 0)")
-        endpoint.safe_psql(
-            """
-            CREATE PROCEDURE updating() as
-            $$
-            DECLARE
-                i integer;
-            BEGIN
-                FOR i IN 1..10000000 LOOP
-                    UPDATE t SET x = x + 1 WHERE pk=1;
-                    COMMIT;
-                END LOOP;
-            END
-            $$ LANGUAGE plpgsql
-            """
-        )
-        endpoint.safe_psql("SET statement_timeout=0")
-        endpoint.safe_psql("call updating()")
-
-        endpoint.stop()
-
-        # We do two iterations so we can see if the second startup is faster. It should
-        # be because the compute node should already be configured with roles, databases,
-        # extensions, etc from the first run.
-        for i in range(2):
-            # Start
-            with zenbenchmark.record_duration(f"{slru}_{i}_start"):
-                endpoint.start()
-
-            with zenbenchmark.record_duration(f"{slru}_{i}_select"):
-                sum = endpoint.safe_psql("select sum(x) from t")[0][0]
-                assert sum == 10000000
-
-            # Get metrics
-            metrics = requests.get(f"http://localhost:{endpoint.http_port}/metrics.json").json()
-            durations = {
-                "wait_for_spec_ms": f"{slru}_{i}_wait_for_spec",
-                "sync_safekeepers_ms": f"{slru}_{i}_sync_safekeepers",
-                "sync_sk_check_ms": f"{slru}_{i}_sync_sk_check",
-                "basebackup_ms": f"{slru}_{i}_basebackup",
-                "start_postgres_ms": f"{slru}_{i}_start_postgres",
-                "config_ms": f"{slru}_{i}_config",
-                "total_startup_ms": f"{slru}_{i}_total_startup",
-            }
-            for key, name in durations.items():
-                value = metrics[key]
-                zenbenchmark.record(name, value, "ms", report=MetricReport.LOWER_IS_BETTER)
-
-            basebackup_bytes = metrics["basebackup_bytes"]
-            zenbenchmark.record(
-                f"{slru}_{i}_basebackup_bytes",
-                basebackup_bytes,
-                "bytes",
-                report=MetricReport.LOWER_IS_BETTER,
-            )
-
-            # Stop so we can restart
-            endpoint.stop()
-
-            # Imitate optimizations that console would do for the second start
-            endpoint.respec(skip_pg_catalog_updates=True)
-        slru = "eager"
+    endpoint = env.endpoints.create_start("main", tenant_id=tenant)
+    with endpoint.cursor() as cur:
+        cur.execute("CREATE TABLE t (pk integer PRIMARY KEY, x integer)")
+        cur.execute("ALTER TABLE t SET (autovacuum_enabled = false)")
+        cur.execute("INSERT INTO t VALUES (1, 0)")
+        cur.execute(
+            """
+            CREATE PROCEDURE updating() as
+            $$
+            DECLARE
+                i integer;
+            BEGIN
+                FOR i IN 1..1000000 LOOP
+                    UPDATE t SET x = x + 1 WHERE pk=1;
+                    COMMIT;
+                END LOOP;
+            END
+            $$ LANGUAGE plpgsql
+            """
+        )
+        cur.execute("SET statement_timeout=0")
+        cur.execute("call updating()")
+
+    endpoint.stop()
+
+    # We do two iterations so we can see if the second startup is faster. It should
+    # be because the compute node should already be configured with roles, databases,
+    # extensions, etc from the first run.
+    for i in range(2):
+        # Start
+        with zenbenchmark.record_duration(f"{slru}_{i}_start"):
+            endpoint.start()
+
+        with zenbenchmark.record_duration(f"{slru}_{i}_select"):
+            sum = endpoint.safe_psql("select sum(x) from t")[0][0]
+            assert sum == 1000000
+
+        # Get metrics
+        metrics = requests.get(f"http://localhost:{endpoint.http_port}/metrics.json").json()
+        durations = {
+            "wait_for_spec_ms": f"{slru}_{i}_wait_for_spec",
+            "sync_safekeepers_ms": f"{slru}_{i}_sync_safekeepers",
+            "sync_sk_check_ms": f"{slru}_{i}_sync_sk_check",
+            "basebackup_ms": f"{slru}_{i}_basebackup",
+            "start_postgres_ms": f"{slru}_{i}_start_postgres",
+            "config_ms": f"{slru}_{i}_config",
+            "total_startup_ms": f"{slru}_{i}_total_startup",
+        }
+        for key, name in durations.items():
+            value = metrics[key]
+            zenbenchmark.record(name, value, "ms", report=MetricReport.LOWER_IS_BETTER)
+
+        basebackup_bytes = metrics["basebackup_bytes"]
+        zenbenchmark.record(
+            f"{slru}_{i}_basebackup_bytes",
+            basebackup_bytes,
+            "bytes",
+            report=MetricReport.LOWER_IS_BETTER,
+        )
+
+        # Stop so we can restart
+        endpoint.stop()
+
+        # Imitate optimizations that console would do for the second start
+        endpoint.respec(skip_pg_catalog_updates=True)
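Each startup phase above is wrapped in zenbenchmark.record_duration, a timing context manager from the test fixtures. A minimal sketch of what such a helper might look like; the name and the print-out are stand-ins, not the fixture's actual implementation:

import time
from contextlib import contextmanager

@contextmanager
def record_duration(name: str):
    # Time the enclosed block and report the elapsed wall-clock duration
    # under the given metric name (the real fixture records a benchmark metric).
    start = time.monotonic()
    try:
        yield
    finally:
        elapsed = time.monotonic() - start
        print(f"{name}: {elapsed:.3f} s")

# Usage mirroring the test above:
with record_duration("lazy_0_start"):
    pass  # start the endpoint here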
test_runner/regress/test_layer_bloating.py (new file, 66 lines)
@@ -0,0 +1,66 @@
+import os
+import time
+
+import pytest
+from fixtures.log_helper import log
+from fixtures.neon_fixtures import (
+    NeonEnv,
+    logical_replication_sync,
+)
+from fixtures.pg_version import PgVersion
+
+
+def test_layer_bloating(neon_simple_env: NeonEnv, vanilla_pg):
+    env = neon_simple_env
+
+    if env.pg_version != PgVersion.V16:
+        pytest.skip("pg_log_standby_snapshot() function is available only in PG16")
+
+    timeline = env.neon_cli.create_branch("test_logical_replication", "empty")
+    endpoint = env.endpoints.create_start(
+        "test_logical_replication", config_lines=["log_statement=all"]
+    )
+
+    log.info("postgres is running on 'test_logical_replication' branch")
+    pg_conn = endpoint.connect()
+    cur = pg_conn.cursor()
+
+    # create table...
+    cur.execute("create table t(pk integer primary key)")
+    cur.execute("create publication pub1 for table t")
+    # Create slot to hold WAL
+    cur.execute("select pg_create_logical_replication_slot('my_slot', 'pgoutput')")
+
+    # now start subscriber
+    vanilla_pg.start()
+    vanilla_pg.safe_psql("create table t(pk integer primary key)")
+
+    connstr = endpoint.connstr().replace("'", "''")
+    log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}")
+    vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
+
+    cur.execute(
+        """create or replace function create_snapshots(n integer) returns void as $$
+        declare
+            i integer;
+        begin
+            for i in 1..n loop
+                perform pg_log_standby_snapshot();
+            end loop;
+        end; $$ language plpgsql"""
+    )
+    cur.execute("set statement_timeout=0")
+    cur.execute("select create_snapshots(10000)")
+    # Wait logical replication to sync
+    logical_replication_sync(vanilla_pg, endpoint)
+    time.sleep(10)
+
+    # Check layer file sizes
+    timeline_path = "{}/tenants/{}/timelines/{}/".format(
+        env.pageserver.workdir, env.initial_tenant, timeline
+    )
+    log.info(f"Check {timeline_path}")
+    for filename in os.listdir(timeline_path):
+        if filename.startswith("00000"):
+            log.info(f"layer {filename} size is {os.path.getsize(timeline_path + filename)}")
+            assert os.path.getsize(timeline_path + filename) < 512_000_000
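The test blocks on logical_replication_sync from the shared fixtures before inspecting layer sizes. A hedged sketch of what such a sync helper typically does, assuming hypothetical names and that the LSN comparison happens server-side; this is not the fixture's actual code:

import time

def wait_logical_sync(publisher, subscriber, timeout_s: float = 60.0) -> None:
    # Poll until the subscriber has replayed everything the publisher
    # has written so far.
    target = publisher.safe_psql("SELECT pg_current_wal_insert_lsn()")[0][0]
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        caught_up = subscriber.safe_psql(
            f"SELECT COALESCE(bool_and(latest_end_lsn >= '{target}'::pg_lsn), false)"
            " FROM pg_stat_subscription"
        )[0][0]
        if caught_up:
            return
        time.sleep(0.5)
    raise TimeoutError("logical replication did not catch up")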
@@ -64,18 +64,14 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
     # Check edge cases
     # Timestamp is in the future
     probe_timestamp = tbl[-1][1] + timedelta(hours=1)
-    result = client.timeline_get_lsn_by_timestamp(
-        tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z"
-    )
+    result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id, probe_timestamp)
     assert result["kind"] == "future"
     # make sure that we return a well advanced lsn here
     assert Lsn(result["lsn"]) > start_lsn
 
     # Timestamp is in the unreachable past
     probe_timestamp = tbl[0][1] - timedelta(hours=10)
-    result = client.timeline_get_lsn_by_timestamp(
-        tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z"
-    )
+    result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id, probe_timestamp)
     assert result["kind"] == "past"
     # make sure that we return the minimum lsn here at the start of the range
     assert Lsn(result["lsn"]) < start_lsn
@@ -83,9 +79,7 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
     # Probe a bunch of timestamps in the valid range
     for i in range(1, len(tbl), 100):
         probe_timestamp = tbl[i][1]
-        result = client.timeline_get_lsn_by_timestamp(
-            tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z"
-        )
+        result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id, probe_timestamp)
         assert result["kind"] not in ["past", "nodata"]
         lsn = result["lsn"]
         # Call get_lsn_by_timestamp to get the LSN
@@ -108,9 +102,7 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
 
     # Timestamp is in the unreachable past
     probe_timestamp = tbl[0][1] - timedelta(hours=10)
-    result = client.timeline_get_lsn_by_timestamp(
-        tenant_id, timeline_id_child, f"{probe_timestamp.isoformat()}Z"
-    )
+    result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id_child, probe_timestamp)
     assert result["kind"] == "past"
     # make sure that we return the minimum lsn here at the start of the range
     assert Lsn(result["lsn"]) >= last_flush_lsn
@@ -310,7 +310,7 @@ def test_sharding_service_compute_hook(
         notifications.append(request.json)
         return Response(status=200)
 
-    httpserver.expect_request("/notify", method="POST").respond_with_handler(handler)
+    httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)
 
     # Start running
     env = neon_env_builder.init_start()
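With the stub now registered for PUT instead of POST, the caller side changes method accordingly. A hedged sketch of what such a notification request could look like; the URL and payload shape here are assumptions for illustration, not the controller's real schema:

import requests

resp = requests.put(
    "http://127.0.0.1:8080/notify",  # assumed test-server address
    json={"tenant_id": "1f359dd625e519a1a4e8d7509690f6fc", "shards": []},  # made-up payload
    timeout=10,
)
assert resp.status_code == 200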
@@ -29,7 +29,7 @@ chrono = { version = "0.4", default-features = false, features = ["clock", "serd
|
||||
clap = { version = "4", features = ["derive", "string"] }
|
||||
clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "string", "suggestions", "usage"] }
|
||||
crossbeam-utils = { version = "0.8" }
|
||||
diesel = { version = "2", features = ["postgres", "serde_json"] }
|
||||
diesel = { version = "2", features = ["postgres", "r2d2", "serde_json"] }
|
||||
either = { version = "1" }
|
||||
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
|
||||
futures-channel = { version = "0.3", features = ["sink"] }
|
||||
@@ -90,6 +90,7 @@ anyhow = { version = "1", features = ["backtrace"] }
|
||||
bytes = { version = "1", features = ["serde"] }
|
||||
cc = { version = "1", default-features = false, features = ["parallel"] }
|
||||
chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
|
||||
diesel_derives = { version = "2", features = ["32-column-tables", "postgres", "r2d2", "with-deprecated"] }
|
||||
either = { version = "1" }
|
||||
getrandom = { version = "0.2", default-features = false, features = ["std"] }
|
||||
hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", default-features = false, features = ["raw"] }
|
||||
|
||||