Compare commits

..

1 Commits

Author SHA1 Message Date
Arseny Sher
cfd78950e2 basic sk bench of pgbench init with perf fixtures 2024-01-30 14:24:27 +03:00
8 changed files with 33 additions and 51 deletions

View File

@@ -508,7 +508,7 @@ jobs:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
TEST_RESULT_CONNSTR: "${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}"
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: std-fs
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
# XXX: no coverage data handling here, since benchmarks are run on release builds,
# while coverage is currently collected for the debug ones

View File

@@ -1363,22 +1363,16 @@ impl WalIngest {
self.checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers;
self.checkpoint_modified = true;
}
let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
if let Some(max_xid) = acc {
if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {
Some(mbr.xid)
} else {
acc
}
let max_mbr_xid = xlrec.members.iter().fold(0u32, |acc, mbr| {
if mbr.xid.wrapping_sub(acc) as i32 > 0 {
mbr.xid
} else {
Some(mbr.xid)
acc
}
});
if let Some(max_xid) = max_mbr_xid {
if self.checkpoint.update_next_xid(max_xid) {
self.checkpoint_modified = true;
}
if self.checkpoint.update_next_xid(max_mbr_xid) {
self.checkpoint_modified = true;
}
Ok(())
}

View File

@@ -2,8 +2,7 @@
use crate::{
auth::password_hack::parse_endpoint_param, context::RequestMonitoring, error::UserFacingError,
metrics::NUM_CONNECTION_ACCEPTED_BY_SNI, proxy::NeonOptions, serverless::SERVERLESS_DRIVER_SNI,
EndpointId, RoleName,
metrics::NUM_CONNECTION_ACCEPTED_BY_SNI, proxy::NeonOptions, EndpointId, RoleName,
};
use itertools::Itertools;
use pq_proto::StartupMessageParams;
@@ -55,10 +54,10 @@ impl ComputeUserInfoMaybeEndpoint {
}
}
pub fn endpoint_sni(
sni: &str,
pub fn endpoint_sni<'a>(
sni: &'a str,
common_names: &HashSet<String>,
) -> Result<Option<EndpointId>, ComputeUserInfoParseError> {
) -> Result<&'a str, ComputeUserInfoParseError> {
let Some((subdomain, common_name)) = sni.split_once('.') else {
return Err(ComputeUserInfoParseError::UnknownCommonName { cn: sni.into() });
};
@@ -67,10 +66,7 @@ pub fn endpoint_sni(
cn: common_name.into(),
});
}
if subdomain == SERVERLESS_DRIVER_SNI {
return Ok(None);
}
Ok(Some(EndpointId::from(subdomain)))
Ok(subdomain)
}
impl ComputeUserInfoMaybeEndpoint {
@@ -89,6 +85,7 @@ impl ComputeUserInfoMaybeEndpoint {
// record the values if we have them
ctx.set_application(params.get("application_name").map(SmolStr::from));
ctx.set_user(user.clone());
ctx.set_endpoint_id(sni.map(EndpointId::from));
// Project name might be passed via PG's command-line options.
let endpoint_option = params
@@ -106,7 +103,7 @@ impl ComputeUserInfoMaybeEndpoint {
let endpoint_from_domain = if let Some(sni_str) = sni {
if let Some(cn) = common_names {
endpoint_sni(sni_str, cn)?
Some(EndpointId::from(endpoint_sni(sni_str, cn)?))
} else {
None
}
@@ -120,13 +117,12 @@ impl ComputeUserInfoMaybeEndpoint {
Some(Err(InconsistentProjectNames { domain, option }))
}
// Invariant: project name may not contain certain characters.
(a, b) => a.or(b).map(|name| match project_name_valid(name.as_ref()) {
(a, b) => a.or(b).map(|name| match project_name_valid(&name) {
false => Err(MalformedProjectName(name)),
true => Ok(name),
}),
}
.transpose()?;
ctx.set_endpoint_id(endpoint.clone());
info!(%user, project = endpoint.as_deref(), "credentials");
if sni.is_some() {

View File

@@ -41,8 +41,6 @@ use tokio_util::sync::CancellationToken;
use tracing::{error, info, info_span, warn, Instrument};
use utils::http::{error::ApiError, json::json_response};
pub const SERVERLESS_DRIVER_SNI: &str = "api";
pub async fn task_main(
config: &'static ProxyConfig,
ws_listener: TcpListener,

View File

@@ -1,7 +1,6 @@
use std::sync::Arc;
use anyhow::bail;
use anyhow::Context;
use futures::pin_mut;
use futures::StreamExt;
use hyper::body::HttpBody;
@@ -36,11 +35,11 @@ use crate::config::TlsConfig;
use crate::context::RequestMonitoring;
use crate::metrics::NUM_CONNECTION_REQUESTS_GAUGE;
use crate::proxy::NeonOptions;
use crate::EndpointId;
use crate::RoleName;
use super::conn_pool::ConnInfo;
use super::conn_pool::GlobalConnPool;
use super::SERVERLESS_DRIVER_SNI;
#[derive(serde::Deserialize)]
struct QueryData {
@@ -62,6 +61,7 @@ enum Payload {
const MAX_RESPONSE_SIZE: usize = 10 * 1024 * 1024; // 10 MiB
const MAX_REQUEST_SIZE: u64 = 10 * 1024 * 1024; // 10 MiB
const SERVERLESS_DRIVER_SNI_HOSTNAME_FIRST_PART: &str = "api";
static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode");
@@ -188,7 +188,9 @@ fn get_conn_info(
}
}
let endpoint = endpoint_sni(hostname, &tls.common_names)?.context("malformed endpoint")?;
let endpoint = endpoint_sni(hostname, &tls.common_names)?;
let endpoint: EndpointId = endpoint.into();
ctx.set_endpoint_id(Some(endpoint.clone()));
let pairs = connection_url.query_pairs();
@@ -225,7 +227,8 @@ fn check_matches(sni_hostname: &str, hostname: &str) -> Result<bool, anyhow::Err
let (_, hostname_rest) = hostname
.split_once('.')
.ok_or_else(|| anyhow::anyhow!("Unexpected hostname format."))?;
Ok(sni_hostname_rest == hostname_rest && sni_hostname_first == SERVERLESS_DRIVER_SNI)
Ok(sni_hostname_rest == hostname_rest
&& sni_hostname_first == SERVERLESS_DRIVER_SNI_HOSTNAME_FIRST_PART)
}
// TODO: return different http error codes

View File

@@ -117,7 +117,10 @@ class NeonCompare(PgCompare):
self.timeline = self.env.neon_cli.create_timeline(branch_name, tenant_id=self.tenant)
# Start pg
self._pg = self.env.endpoints.create_start(branch_name, "main", self.tenant)
config_lines = ["max_replication_write_lag=-1", "max_replication_flush_lag=-1"]
self._pg = self.env.endpoints.create_start(
branch_name, "main", self.tenant, config_lines=config_lines
)
@property
def pg(self) -> PgProtocol:
@@ -294,7 +297,7 @@ def remote_compare(zenbenchmark: NeonBenchmarker, remote_pg: RemotePostgres) ->
return RemoteCompare(zenbenchmark, remote_pg)
@pytest.fixture(params=["vanilla_compare", "neon_compare"], ids=["vanilla", "neon"])
@pytest.fixture(params=["neon_compare"], ids=["neon"])
def neon_with_baseline(request: FixtureRequest) -> PgCompare:
"""Parameterized fixture that helps compare neon against vanilla postgres.

View File

@@ -35,7 +35,7 @@ def init_pgbench(env: PgCompare, cmdline, password: None):
t0 = timeit.default_timer()
with env.record_pageserver_writes("init.pageserver_writes"):
out = env.pg_bin.run_capture(cmdline, env=environ)
env.flush()
# env.flush()
duration = timeit.default_timer() - t0
end_timestamp = utc_now_timestamp()
@@ -94,9 +94,7 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: P
if workload_type == PgBenchLoadType.INIT:
# Run initialize
init_pgbench(
env, ["pgbench", f"-s{scale}", "-i", "-I", "dtGvp", connstr], password=password
)
init_pgbench(env, ["pgbench", f"-s{scale}", "-i", "-I", "dtG", connstr], password=password)
if workload_type == PgBenchLoadType.SIMPLE_UPDATE:
# Run simple-update workload
@@ -151,7 +149,7 @@ def get_durations_matrix(default: int = 45) -> List[int]:
return rv
def get_scales_matrix(default: int = 10) -> List[int]:
def get_scales_matrix(default: int = 100) -> List[int]:
scales = os.getenv("TEST_PG_BENCH_SCALES_MATRIX", default=str(default))
rv = []
for s in scales.split(","):
@@ -172,8 +170,8 @@ def get_scales_matrix(default: int = 10) -> List[int]:
@pytest.mark.parametrize("duration", get_durations_matrix())
def test_pgbench(neon_with_baseline: PgCompare, scale: int, duration: int):
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.INIT)
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SIMPLE_UPDATE)
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SELECT_ONLY)
# run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SIMPLE_UPDATE)
# run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SELECT_ONLY)
# The following 3 tests run on an existing database as it was set up by previous tests,

View File

@@ -203,16 +203,6 @@ def test_import_at_2bil(
$$;
"""
)
# Also create a multi-XID with members past the 2 billion mark
conn2 = endpoint.connect()
cur2 = conn2.cursor()
cur.execute("INSERT INTO t VALUES ('x')")
cur.execute("BEGIN; select * from t WHERE t = 'x' FOR SHARE;")
cur2.execute("BEGIN; select * from t WHERE t = 'x' FOR SHARE;")
cur.execute("COMMIT")
cur2.execute("COMMIT")
# A checkpoint writes a WAL record with xl_xid=0. Many other WAL
# records would have the same effect.
cur.execute("checkpoint")
@@ -227,4 +217,4 @@ def test_import_at_2bil(
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("SELECT count(*) from t")
assert cur.fetchone() == (10000 + 1 + 1,)
assert cur.fetchone() == (10000 + 1,)