Merge branch 'main' into communicator-rewrite

This commit is contained in:
Erik Grinaker
2025-07-14 18:47:23 +02:00
17 changed files with 590 additions and 119 deletions

View File

@@ -31,6 +31,7 @@ config-variables:
- NEON_PROD_AWS_ACCOUNT_ID
- PGREGRESS_PG16_PROJECT_ID
- PGREGRESS_PG17_PROJECT_ID
- PREWARM_PGBENCH_SIZE
- REMOTE_STORAGE_AZURE_CONTAINER
- REMOTE_STORAGE_AZURE_REGION
- SLACK_CICD_CHANNEL_ID

View File

@@ -219,6 +219,7 @@ jobs:
--ignore test_runner/performance/test_cumulative_statistics_persistence.py
--ignore test_runner/performance/test_perf_many_relations.py
--ignore test_runner/performance/test_perf_oltp_large_tenant.py
--ignore test_runner/performance/test_lfc_prewarm.py
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -410,6 +411,77 @@ jobs:
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
prewarm-test:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
permissions:
contents: write
statuses: write
id-token: write # aws-actions/configure-aws-credentials
env:
PGBENCH_SIZE: ${{ vars.PREWARM_PGBENCH_SIZE }}
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 17
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: "neon-staging"
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 18000 # 5 hours
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Run prewarm benchmark
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance/test_lfc_prewarm.py
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 5400
pg_version: ${{ env.DEFAULT_PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
NEON_API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
- name: Create Allure report
id: create-allure-report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
generate-matrices:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
# Create matrices for the benchmarking jobs, so we run benchmarks on rds only once a week (on Saturday)

1
Cargo.lock generated
View File

@@ -4436,6 +4436,7 @@ dependencies = [
"pageserver_client",
"pageserver_client_grpc",
"pageserver_page_api",
"pprof",
"rand 0.8.5",
"reqwest",
"serde",

View File

@@ -396,16 +396,11 @@ pub async fn log_slow<O>(
if !is_slow {
return;
}
let elapsed = elapsed_total.as_secs_f64();
if ready {
info!(
"slow {name} completed after {:.3}s",
elapsed_total.as_secs_f64()
);
info!("slow {name} completed after {elapsed:.3}s");
} else {
info!(
"slow {name} still running after {:.3}s",
elapsed_total.as_secs_f64()
);
info!("slow {name} still running after {elapsed:.3}s");
}
},
)
@@ -432,16 +427,11 @@ pub async fn warn_slow<O>(
if !is_slow {
return;
}
let elapsed = elapsed_total.as_secs_f64();
if ready {
warn!(
"slow {name} completed after {:.3}s",
elapsed_total.as_secs_f64()
);
warn!("slow {name} completed after {elapsed:.3}s");
} else {
warn!(
"slow {name} still running after {:.3}s",
elapsed_total.as_secs_f64()
);
warn!("slow {name} still running after {elapsed:.3}s");
}
},
)

View File

@@ -9,18 +9,22 @@
//!
//! * ChannelPool: manages gRPC channels (TCP connections) to a single Pageserver. Multiple clients
//! can acquire and use the same channel concurrently (via HTTP/2 stream multiplexing), up to a
//! per-channel client limit.
//! per-channel client limit. Channels are closed immediately when empty, and indirectly rely on
//! client/stream idle timeouts.
//!
//! * ClientPool: manages gRPC clients for a single tenant shard. Each client acquires a (shared)
//! channel from the ChannelPool for the client's lifetime. A client can only be acquired by a
//! single caller at a time, and is returned to the pool when dropped.
//! single caller at a time, and is returned to the pool when dropped. Idle clients are removed
//! from the pool after a while to free up resources.
//!
//! * StreamPool: manages bidirectional gRPC GetPage streams. Each stream acquires a client from the
//! ClientPool for the stream's lifetime. A stream can only be acquired by a single caller at a
//! time, and is returned to the pool when dropped. The stream only supports sending a single,
//! synchronous request at a time, and does not support pipelining multiple requests from
//! different callers onto the same stream -- instead, we scale out concurrent streams to improve
//! throughput. There are many reasons for this design choice:
//! time, and is returned to the pool when dropped. Idle streams are removed from the pool after
//! a while to free up resources.
//!
//! The stream only supports sending a single, synchronous request at a time, and does not support
//! pipelining multiple requests from different callers onto the same stream -- instead, we scale
//! out concurrent streams to improve throughput. There are many reasons for this design choice:
//!
//! * It (mostly) eliminates head-of-line blocking. A single stream is processed sequentially by
//! a single server task, which may block e.g. on layer downloads, LSN waits, etc.
@@ -29,16 +33,13 @@
//! (e.g. because of a timeout), the request would still be processed by the server and block
//! requests behind it in the stream. It might even block its own timeout retry.
//!
//! * Individual callers can use client-side batching for pipelining.
//!
//! * Stream scheduling becomes significantly simpler and cheaper.
//!
//! * Individual callers can still use client-side batching for pipelining.
//!
//! * Idle streams are cheap. Benchmarks show that an idle GetPage stream takes up about 26 KB
//! per stream (2.5 GB for 100,000 streams), so we can afford to scale out.
//!
//! Idle clients/streams are removed from the pools periodically, to free up server resources.
//! Channels are reaped immediately when unused, and indirectly rely on client/stream idle timeouts.
//!
//! Each channel corresponds to one TCP connection. Each client unary request and each stream
//! corresponds to one HTTP/2 stream and server task.
//!
@@ -435,7 +436,6 @@ struct BiStream {
/// enforced by `StreamGuard::send`.
sender: watch::Sender<page_api::GetPageRequest>,
/// Stream for receiving responses.
/// TODO: consider returning a concrete type from `Client::get_pages`.
receiver: Pin<Box<dyn Stream<Item = tonic::Result<page_api::GetPageResponse>> + Send>>,
}
@@ -465,7 +465,7 @@ impl StreamPool {
///
/// This is very performance-sensitive, as it is on the GetPage hot path.
///
/// TODO: is a Mutex<BTreeMap> performant enough? Will it become too contended? We can't
/// TODO: is a `Mutex<BTreeMap>` performant enough? Will it become too contended? We can't
/// trivially use e.g. DashMap or sharding, because we want to pop lower-ordered streams first
/// to free up higher-ordered channels.
pub async fn get(self: &Arc<Self>) -> tonic::Result<StreamGuard> {
@@ -480,7 +480,7 @@ impl StreamPool {
return Ok(StreamGuard {
pool: Arc::downgrade(self),
stream: Some(entry.stream),
active: false,
can_reuse: true,
permit,
});
}
@@ -500,7 +500,7 @@ impl StreamPool {
sender: req_tx,
receiver: Box::pin(resp_stream),
}),
active: false,
can_reuse: true,
permit,
})
}
@@ -521,7 +521,7 @@ impl Reapable for StreamPool {
pub struct StreamGuard {
pool: Weak<StreamPool>,
stream: Option<BiStream>, // Some until dropped
active: bool, // not returned to pool if true
can_reuse: bool, // returned to pool if true
permit: Option<OwnedSemaphorePermit>, // None if pool is unbounded
}
@@ -543,17 +543,17 @@ impl StreamGuard {
let req_id = req.request_id;
let stream = self.stream.as_mut().expect("not dropped");
// Mark the stream as active. We only allow one request at a time, to avoid HoL-blocking.
// We also don't allow reuse of the stream after an error.
// Mark the stream as not reusable while the request is in flight. We can't return the
// stream to the pool until we receive the response, to avoid head-of-line blocking and
// stale responses. Failed streams can't be reused either.
if !self.can_reuse {
return Err(tonic::Status::internal("stream can't be reused"));
}
self.can_reuse = false;
// Send the request and receive the response.
//
// NB: this uses a watch channel, so it's unsafe to change this code to pipeline requests.
if self.active {
return Err(tonic::Status::internal("stream already active"));
}
self.active = true;
// Send the request and receive the response. If the stream errors for whatever reason, we
// don't reset `active` such that the stream won't be returned to the pool.
stream
.sender
.send(req)
@@ -572,8 +572,8 @@ impl StreamGuard {
)));
}
// Success, mark the stream as inactive.
self.active = false;
// Success, mark the stream as reusable.
self.can_reuse = true;
Ok(resp)
}
@@ -585,10 +585,8 @@ impl Drop for StreamGuard {
return; // pool was dropped
};
// If the stream is still active, we can't return it to the pool. The next caller could be
// head-of-line blocked by an in-flight request, receive a stale response, or the stream may
// have failed.
if self.active {
// If the stream isn't reusable, it can't be returned to the pool.
if !self.can_reuse {
return;
}

View File

@@ -16,6 +16,7 @@ futures.workspace = true
hdrhistogram.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
pprof.workspace = true
rand.workspace = true
reqwest.workspace = true
serde.workspace = true

View File

@@ -0,0 +1,127 @@
use std::sync::Arc;
use anyhow::anyhow;
use futures::StreamExt;
use tonic::transport::Endpoint;
use tracing::info;
use pageserver_page_api::{GetPageClass, GetPageRequest, GetPageStatusCode, ReadLsn, RelTag};
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;
use utils::shard::ShardIndex;
/// Starts a large number of idle gRPC GetPage streams.
#[derive(clap::Parser)]
pub(crate) struct Args {
/// The Pageserver to connect to. Must use grpc://.
#[clap(long, default_value = "grpc://localhost:51051")]
server: String,
/// The Pageserver HTTP API.
#[clap(long, default_value = "http://localhost:9898")]
http_server: String,
/// The number of streams to open.
#[clap(long, default_value = "100000")]
count: usize,
/// Number of streams per connection.
#[clap(long, default_value = "100")]
per_connection: usize,
/// Send a single GetPage request on each stream.
#[clap(long, default_value_t = false)]
send_request: bool,
}
pub(crate) fn main(args: Args) -> anyhow::Result<()> {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
rt.block_on(main_impl(args))
}
async fn main_impl(args: Args) -> anyhow::Result<()> {
// Discover a tenant and timeline to use.
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
reqwest::Client::new(),
args.http_server.clone(),
None,
));
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
&mgmt_api_client,
crate::util::cli::targets::Spec {
limit_to_first_n_targets: Some(1),
targets: None,
},
)
.await?;
let ttid = timelines
.first()
.ok_or_else(|| anyhow!("no timelines found"))?;
// Set up the initial client.
let endpoint = Endpoint::from_shared(args.server.clone())?;
let connect = async || {
pageserver_page_api::Client::new(
endpoint.connect().await?,
ttid.tenant_id,
ttid.timeline_id,
ShardIndex::unsharded(),
None,
None,
)
};
let mut client = connect().await?;
let mut streams = Vec::with_capacity(args.count);
// Create streams.
for i in 0..args.count {
if i % 100 == 0 {
info!("opened {}/{} streams", i, args.count);
}
if i % args.per_connection == 0 && i > 0 {
client = connect().await?;
}
let (req_tx, req_rx) = tokio::sync::mpsc::unbounded_channel();
let req_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(req_rx);
let mut resp_stream = client.get_pages(req_stream).await?;
// Send request if specified.
if args.send_request {
req_tx.send(GetPageRequest {
request_id: 1.into(),
request_class: GetPageClass::Normal,
read_lsn: ReadLsn {
request_lsn: Lsn::MAX,
not_modified_since_lsn: Some(Lsn(1)),
},
rel: RelTag {
spcnode: 1664, // pg_global
dbnode: 0, // shared database
relnode: 1262, // pg_authid
forknum: 0, // init
},
block_numbers: vec![0],
})?;
let resp = resp_stream
.next()
.await
.transpose()?
.ok_or_else(|| anyhow!("no response"))?;
if resp.status_code != GetPageStatusCode::Ok {
return Err(anyhow!("{} response", resp.status_code));
}
}
// Hold onto streams to avoid closing them.
streams.push((req_tx, resp_stream));
}
info!("opened {} streams, sleeping", args.count);
// Block forever, to hold the idle streams open for inspection.
futures::future::pending::<()>().await;
Ok(())
}

View File

@@ -1,4 +1,7 @@
use std::fs::File;
use clap::Parser;
use tracing::info;
use utils::logging;
/// Re-usable pieces of code that aren't CLI-specific.
@@ -17,38 +20,73 @@ mod cmd {
pub(super) mod aux_files;
pub(super) mod basebackup;
pub(super) mod getpage_latest_lsn;
pub(super) mod idle_streams;
pub(super) mod ondemand_download_churn;
pub(super) mod trigger_initial_size_calculation;
}
/// Component-level performance test for pageserver.
#[derive(clap::Parser)]
enum Args {
struct Args {
/// Takes a client CPU profile into profile.svg. The benchmark must exit cleanly before it's
/// written, e.g. via --runtime.
#[arg(long)]
profile: bool,
#[command(subcommand)]
subcommand: Subcommand,
}
#[derive(clap::Subcommand)]
enum Subcommand {
Basebackup(cmd::basebackup::Args),
GetPageLatestLsn(cmd::getpage_latest_lsn::Args),
TriggerInitialSizeCalculation(cmd::trigger_initial_size_calculation::Args),
OndemandDownloadChurn(cmd::ondemand_download_churn::Args),
AuxFiles(cmd::aux_files::Args),
IdleStreams(cmd::idle_streams::Args),
}
fn main() {
fn main() -> anyhow::Result<()> {
logging::init(
logging::LogFormat::Plain,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stderr,
)
.unwrap();
)?;
logging::replace_panic_hook_with_tracing_panic_hook().forget();
let args = Args::parse();
match args {
Args::Basebackup(args) => cmd::basebackup::main(args),
Args::GetPageLatestLsn(args) => cmd::getpage_latest_lsn::main(args),
Args::TriggerInitialSizeCalculation(args) => {
// Start a CPU profile if requested.
let mut profiler = None;
if args.profile {
profiler = Some(
pprof::ProfilerGuardBuilder::default()
.frequency(1000)
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
.build()?,
);
}
match args.subcommand {
Subcommand::Basebackup(args) => cmd::basebackup::main(args),
Subcommand::GetPageLatestLsn(args) => cmd::getpage_latest_lsn::main(args),
Subcommand::TriggerInitialSizeCalculation(args) => {
cmd::trigger_initial_size_calculation::main(args)
}
Args::OndemandDownloadChurn(args) => cmd::ondemand_download_churn::main(args),
Args::AuxFiles(args) => cmd::aux_files::main(args),
Subcommand::OndemandDownloadChurn(args) => cmd::ondemand_download_churn::main(args),
Subcommand::AuxFiles(args) => cmd::aux_files::main(args),
Subcommand::IdleStreams(args) => cmd::idle_streams::main(args),
}?;
// Generate a CPU flamegraph if requested.
if let Some(profiler) = profiler {
let report = profiler.report().build()?;
drop(profiler); // stop profiling
let file = File::create("profile.svg")?;
report.flamegraph(file)?;
info!("wrote CPU profile flamegraph to profile.svg")
}
.unwrap()
Ok(())
}

View File

@@ -3393,7 +3393,13 @@ impl TenantShard {
.collect_vec();
for timeline in timelines {
timeline.maybe_freeze_ephemeral_layer().await;
// Include a span with the timeline ID. The parent span already has the tenant ID.
let span =
info_span!("maybe_freeze_ephemeral_layer", timeline_id = %timeline.timeline_id);
timeline
.maybe_freeze_ephemeral_layer()
.instrument(span)
.await;
}
}

View File

@@ -1893,6 +1893,8 @@ impl Timeline {
// an ephemeral layer open forever when idle. It also freezes layers if the global limit on
// ephemeral layer bytes has been breached.
pub(super) async fn maybe_freeze_ephemeral_layer(&self) {
debug_assert_current_span_has_tenant_and_timeline_id();
let Ok(mut write_guard) = self.write_lock.try_lock() else {
// If the write lock is held, there is an active wal receiver: rolling open layers
// is their responsibility while they hold this lock.

View File

@@ -28,23 +28,28 @@ use crate::pqproto::CancelKeyData;
use crate::rate_limiter::LeakyBucketRateLimiter;
use crate::redis::keys::KeyPrefix;
use crate::redis::kv_ops::{RedisKVClient, RedisKVClientError};
use crate::util::run_until;
type IpSubnetKey = IpNet;
const CANCEL_KEY_TTL: std::time::Duration = std::time::Duration::from_secs(600);
const CANCEL_KEY_REFRESH: std::time::Duration = std::time::Duration::from_secs(570);
const CANCEL_KEY_TTL: Duration = Duration::from_secs(600);
const CANCEL_KEY_REFRESH: Duration = Duration::from_secs(570);
// Message types for sending through mpsc channel
pub enum CancelKeyOp {
StoreCancelKey {
Store {
key: CancelKeyData,
value: Box<str>,
expire: std::time::Duration,
expire: Duration,
},
GetCancelData {
Refresh {
key: CancelKeyData,
expire: Duration,
},
Get {
key: CancelKeyData,
},
GetCancelDataOld {
GetOld {
key: CancelKeyData,
},
}
@@ -107,7 +112,7 @@ impl Pipeline {
impl CancelKeyOp {
fn register(&self, pipe: &mut Pipeline) {
match self {
CancelKeyOp::StoreCancelKey { key, value, expire } => {
CancelKeyOp::Store { key, value, expire } => {
let key = KeyPrefix::Cancel(*key).build_redis_key();
pipe.add_command(Cmd::set_options(
&key,
@@ -115,11 +120,15 @@ impl CancelKeyOp {
SetOptions::default().with_expiration(SetExpiry::EX(expire.as_secs())),
));
}
CancelKeyOp::GetCancelDataOld { key } => {
CancelKeyOp::Refresh { key, expire } => {
let key = KeyPrefix::Cancel(*key).build_redis_key();
pipe.add_command(Cmd::expire(&key, expire.as_secs() as i64));
}
CancelKeyOp::GetOld { key } => {
let key = KeyPrefix::Cancel(*key).build_redis_key();
pipe.add_command(Cmd::hget(key, "data"));
}
CancelKeyOp::GetCancelData { key } => {
CancelKeyOp::Get { key } => {
let key = KeyPrefix::Cancel(*key).build_redis_key();
pipe.add_command(Cmd::get(key));
}
@@ -263,7 +272,7 @@ impl CancellationHandler {
.proxy
.cancel_channel_size
.guard(RedisMsgKind::Get);
let op = CancelKeyOp::GetCancelData { key };
let op = CancelKeyOp::Get { key };
let result = timeout(
TIMEOUT,
tx.call((guard, op), std::future::pending::<Infallible>()),
@@ -288,7 +297,7 @@ impl CancellationHandler {
.proxy
.cancel_channel_size
.guard(RedisMsgKind::HGet);
let op = CancelKeyOp::GetCancelDataOld { key };
let op = CancelKeyOp::GetOld { key };
timeout(
TIMEOUT,
tx.call((guard, op), std::future::pending::<Infallible>()),
@@ -473,40 +482,95 @@ impl Session {
let mut cancel = pin!(cancel);
enum State {
Set,
Refresh,
}
let mut state = State::Set;
loop {
let guard = Metrics::get()
.proxy
.cancel_channel_size
.guard(RedisMsgKind::Set);
let op = CancelKeyOp::StoreCancelKey {
key: self.key,
value: closure_json.clone(),
expire: CANCEL_KEY_TTL,
let guard_op = match state {
State::Set => {
let guard = Metrics::get()
.proxy
.cancel_channel_size
.guard(RedisMsgKind::Set);
let op = CancelKeyOp::Store {
key: self.key,
value: closure_json.clone(),
expire: CANCEL_KEY_TTL,
};
tracing::debug!(
src=%self.key,
dest=?cancel_closure.cancel_token,
"registering cancellation key"
);
(guard, op)
}
State::Refresh => {
let guard = Metrics::get()
.proxy
.cancel_channel_size
.guard(RedisMsgKind::Expire);
let op = CancelKeyOp::Refresh {
key: self.key,
expire: CANCEL_KEY_TTL,
};
tracing::debug!(
src=%self.key,
dest=?cancel_closure.cancel_token,
"refreshing cancellation key"
);
(guard, op)
}
};
tracing::debug!(
src=%self.key,
dest=?cancel_closure.cancel_token,
"registering cancellation key"
);
match tx.call((guard, op), cancel.as_mut()).await {
Ok(_) => {
match tx.call(guard_op, cancel.as_mut()).await {
// SET returns OK
Ok(Value::Okay) => {
tracing::debug!(
src=%self.key,
dest=?cancel_closure.cancel_token,
"registered cancellation key"
);
// wait before continuing.
tokio::time::sleep(CANCEL_KEY_REFRESH).await;
state = State::Refresh;
}
// EXPIRE returns 1
Ok(Value::Int(1)) => {
tracing::debug!(
src=%self.key,
dest=?cancel_closure.cancel_token,
"refreshed cancellation key"
);
}
Ok(_) => {
// Any other response likely means the key expired.
tracing::warn!(src=%self.key, "refreshing cancellation key failed");
// Re-enter the SET loop to repush full data.
state = State::Set;
}
// retry immediately.
Err(BatchQueueError::Result(error)) => {
tracing::warn!(?error, "error registering cancellation key");
tracing::warn!(?error, "error refreshing cancellation key");
// Small delay to prevent busy loop with high cpu and logging.
tokio::time::sleep(Duration::from_millis(10)).await;
continue;
}
Err(BatchQueueError::Cancelled(Err(_cancelled))) => break,
}
// wait before continuing. break immediately if cancelled.
if run_until(tokio::time::sleep(CANCEL_KEY_REFRESH), cancel.as_mut())
.await
.is_err()
{
break;
}
}
if let Err(err) = cancel_closure

View File

@@ -376,6 +376,7 @@ pub enum Waiting {
pub enum RedisMsgKind {
Set,
Get,
Expire,
HGet,
}

View File

@@ -7,8 +7,16 @@ pub async fn run_until_cancelled<F: Future>(
f: F,
cancellation_token: &CancellationToken,
) -> Option<F::Output> {
match select(pin!(f), pin!(cancellation_token.cancelled())).await {
Either::Left((f, _)) => Some(f),
Either::Right(((), _)) => None,
run_until(f, cancellation_token.cancelled()).await.ok()
}
/// Runs the future `f` unless interrupted by future `condition`.
pub async fn run_until<F1: Future, F2: Future>(
f: F1,
condition: F2,
) -> Result<F1::Output, F2::Output> {
match select(pin!(f), pin!(condition)).await {
Either::Left((f1, _)) => Ok(f1),
Either::Right((f2, _)) => Err(f2),
}
}

View File

@@ -34,7 +34,9 @@ class NeonAPI:
self.retries524 = 0
self.retries4xx = 0
def __request(self, method: str | bytes, endpoint: str, **kwargs: Any) -> requests.Response:
def __request(
self, method: str | bytes, endpoint: str, retry404: bool = False, **kwargs: Any
) -> requests.Response:
kwargs["headers"] = kwargs.get("headers", {})
kwargs["headers"]["Authorization"] = f"Bearer {self.__neon_api_key}"
@@ -55,10 +57,12 @@ class NeonAPI:
resp.raise_for_status()
break
elif resp.status_code >= 400:
if resp.status_code == 422:
if resp.json()["message"] == "branch not ready yet":
retry = True
self.retries4xx += 1
if resp.status_code == 404 and retry404:
retry = True
self.retries4xx += 1
elif resp.status_code == 422 and resp.json()["message"] == "branch not ready yet":
retry = True
self.retries4xx += 1
elif resp.status_code == 423 and resp.json()["message"] in {
"endpoint is in some transitive state, could not suspend",
"project already has running conflicting operations, scheduling of new ones is prohibited",
@@ -66,7 +70,7 @@ class NeonAPI:
retry = True
self.retries4xx += 1
elif resp.status_code == 524:
log.info("The request was timed out, trying to get operations")
log.info("The request was timed out")
retry = True
self.retries524 += 1
if retry:
@@ -203,6 +207,9 @@ class NeonAPI:
resp = self.__request(
"GET",
f"/projects/{project_id}/branches/{branch_id}",
# XXX Retry get parent details to work around the issue
# https://databricks.atlassian.net/browse/LKB-279
retry404=True,
headers={
"Accept": "application/json",
},
@@ -307,6 +314,10 @@ class NeonAPI:
if endpoint_type:
data["endpoint"]["type"] = endpoint_type
if settings:
# otherwise we get 400 "settings must not be nil"
# TODO(myrrc): fix on cplane side
if "pg_settings" not in settings:
settings["pg_settings"] = {}
data["endpoint"]["settings"] = settings
resp = self.__request(

View File

@@ -0,0 +1,167 @@
from __future__ import annotations
import os
import timeit
import traceback
from concurrent.futures import ThreadPoolExecutor as Exec
from pathlib import Path
from time import sleep
from typing import TYPE_CHECKING, Any, cast
import pytest
from fixtures.benchmark_fixture import NeonBenchmarker, PgBenchRunResult
from fixtures.log_helper import log
from fixtures.neon_api import NeonAPI, connection_parameters_to_env
if TYPE_CHECKING:
from fixtures.compare_fixtures import NeonCompare
from fixtures.neon_fixtures import Endpoint, PgBin
from fixtures.pg_version import PgVersion
from performance.test_perf_pgbench import utc_now_timestamp
# These tests compare performance for a write-heavy and read-heavy workloads of an ordinary endpoint
# compared to the endpoint which saves its LFC and prewarms using it on startup.
def test_compare_prewarmed_pgbench_perf(neon_compare: NeonCompare):
env = neon_compare.env
env.create_branch("normal")
env.create_branch("prewarmed")
pg_bin = neon_compare.pg_bin
ep_normal: Endpoint = env.endpoints.create_start("normal")
ep_prewarmed: Endpoint = env.endpoints.create_start("prewarmed", autoprewarm=True)
for ep in [ep_normal, ep_prewarmed]:
connstr: str = ep.connstr()
pg_bin.run(["pgbench", "-i", "-I", "dtGvp", connstr, "-s100"])
ep.safe_psql("CREATE EXTENSION neon")
client = ep.http_client()
client.offload_lfc()
ep.stop()
ep.start()
client.prewarm_lfc_wait()
run_start_timestamp = utc_now_timestamp()
t0 = timeit.default_timer()
out = pg_bin.run_capture(["pgbench", "-c10", "-T10", connstr])
run_duration = timeit.default_timer() - t0
run_end_timestamp = utc_now_timestamp()
stdout = Path(f"{out}.stdout").read_text()
res = PgBenchRunResult.parse_from_stdout(
stdout=stdout,
run_duration=run_duration,
run_start_timestamp=run_start_timestamp,
run_end_timestamp=run_end_timestamp,
)
name: str = cast("str", ep.branch_name)
neon_compare.zenbenchmark.record_pg_bench_result(name, res)
@pytest.mark.remote_cluster
@pytest.mark.timeout(30 * 60)
def test_compare_prewarmed_pgbench_perf_benchmark(
pg_bin: PgBin,
neon_api: NeonAPI,
pg_version: PgVersion,
zenbenchmark: NeonBenchmarker,
):
name = f"Test prewarmed pgbench performance, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
project = neon_api.create_project(pg_version, name)
project_id = project["project"]["id"]
neon_api.wait_for_operation_to_finish(project_id)
err = False
try:
benchmark_impl(pg_bin, neon_api, project, zenbenchmark)
except Exception as e:
err = True
log.error(f"Caught exception: {e}")
log.error(traceback.format_exc())
finally:
assert not err
neon_api.delete_project(project_id)
def benchmark_impl(
pg_bin: PgBin, neon_api: NeonAPI, project: dict[str, Any], zenbenchmark: NeonBenchmarker
):
pgbench_size = int(os.getenv("PGBENCH_SIZE") or "3424") # 50GB
offload_secs = 20
test_duration_min = 5
pgbench_duration = f"-T{test_duration_min * 60}"
# prewarm API is not publicly exposed. In order to test performance of a
# fully prewarmed endpoint, wait after it restarts
prewarmed_sleep_secs = 30
branch_id = project["branch"]["id"]
project_id = project["project"]["id"]
normal_env = connection_parameters_to_env(
project["connection_uris"][0]["connection_parameters"]
)
normal_id = project["endpoints"][0]["id"]
prewarmed_branch_id = neon_api.create_branch(
project_id, "prewarmed", parent_id=branch_id, add_endpoint=False
)["branch"]["id"]
neon_api.wait_for_operation_to_finish(project_id)
ep_prewarmed = neon_api.create_endpoint(
project_id,
prewarmed_branch_id,
endpoint_type="read_write",
settings={"autoprewarm": True, "offload_lfc_interval_seconds": offload_secs},
)
neon_api.wait_for_operation_to_finish(project_id)
prewarmed_env = normal_env.copy()
prewarmed_env["PGHOST"] = ep_prewarmed["endpoint"]["host"]
prewarmed_id = ep_prewarmed["endpoint"]["id"]
def bench(endpoint_name, endpoint_id, env):
pg_bin.run(["pgbench", "-i", "-I", "dtGvp", f"-s{pgbench_size}"], env)
sleep(offload_secs * 2) # ensure LFC is offloaded after pgbench finishes
neon_api.restart_endpoint(project_id, endpoint_id)
sleep(prewarmed_sleep_secs)
run_start_timestamp = utc_now_timestamp()
t0 = timeit.default_timer()
out = pg_bin.run_capture(["pgbench", "-c10", pgbench_duration, "-Mprepared"], env)
run_duration = timeit.default_timer() - t0
run_end_timestamp = utc_now_timestamp()
stdout = Path(f"{out}.stdout").read_text()
res = PgBenchRunResult.parse_from_stdout(
stdout=stdout,
run_duration=run_duration,
run_start_timestamp=run_start_timestamp,
run_end_timestamp=run_end_timestamp,
)
zenbenchmark.record_pg_bench_result(endpoint_name, res)
with Exec(max_workers=2) as exe:
exe.submit(bench, "normal", normal_id, normal_env)
exe.submit(bench, "prewarmed", prewarmed_id, prewarmed_env)
def test_compare_prewarmed_read_perf(neon_compare: NeonCompare):
env = neon_compare.env
env.create_branch("normal")
env.create_branch("prewarmed")
ep_normal: Endpoint = env.endpoints.create_start("normal")
ep_prewarmed: Endpoint = env.endpoints.create_start("prewarmed", autoprewarm=True)
sql = [
"CREATE EXTENSION neon",
"CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')",
"INSERT INTO foo SELECT FROM generate_series(1,1000000)",
]
for ep in [ep_normal, ep_prewarmed]:
ep.safe_psql_many(sql)
client = ep.http_client()
client.offload_lfc()
ep.stop()
ep.start()
client.prewarm_lfc_wait()
with neon_compare.record_duration(f"{ep.branch_name}_run_duration"):
ep.safe_psql("SELECT count(*) from foo")

View File

@@ -13,7 +13,6 @@ from typing import TYPE_CHECKING, Any
import pytest
from fixtures.log_helper import log
from requests import HTTPError
if TYPE_CHECKING:
from pathlib import Path
@@ -153,26 +152,11 @@ class NeonBranch:
return
self.updated_at = datetime.fromisoformat(res["branch"]["updated_at"])
parent_id: str = res["branch"]["parent_id"]
# XXX Retry get parent details to work around the issue
# https://databricks.atlassian.net/browse/LKB-279
target_time = datetime.now() + timedelta(seconds=30)
while datetime.now() < target_time:
try:
parent_def = self.neon_api.get_branch_details(self.project_id, parent_id)
except HTTPError as he:
if he.response.status_code == 404:
log.info("Branch not found, waiting...")
time.sleep(1)
else:
raise HTTPError(he) from he
else:
break
else:
raise RuntimeError(f"Branch {parent_id} not found")
# Creates an object for the parent branch
# After the reset operation a new parent branch is created
parent = NeonBranch(self.project, parent_def, True)
parent = NeonBranch(
self.project, self.neon_api.get_branch_details(self.project_id, parent_id), True
)
self.project.branches[parent_id] = parent
self.parent = parent
parent.children[self.id] = self

View File

@@ -98,7 +98,7 @@ tikv-jemalloc-sys = { version = "0.6", features = ["profiling", "stats", "unpref
time = { version = "0.3", features = ["macros", "serde-well-known"] }
tokio = { version = "1", features = ["full", "test-util"] }
tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring", "tls12"] }
tokio-stream = { version = "0.1", features = ["net"] }
tokio-stream = { version = "0.1", features = ["net", "sync"] }
tokio-util = { version = "0.7", features = ["codec", "compat", "io-util", "rt"] }
toml_edit = { version = "0.22", features = ["serde"] }
tower = { version = "0.5", default-features = false, features = ["balance", "buffer", "limit", "log"] }