mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-06 18:00:37 +00:00
Compare commits
1 Commits
release-47
...
sk-basic-b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cfd78950e2 |
2
.github/workflows/build_and_test.yml
vendored
2
.github/workflows/build_and_test.yml
vendored
@@ -508,7 +508,7 @@ jobs:
|
||||
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
|
||||
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
|
||||
TEST_RESULT_CONNSTR: "${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}"
|
||||
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: std-fs
|
||||
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
|
||||
# XXX: no coverage data handling here, since benchmarks are run on release builds,
|
||||
# while coverage is currently collected for the debug ones
|
||||
|
||||
|
||||
@@ -1363,22 +1363,16 @@ impl WalIngest {
|
||||
self.checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers;
|
||||
self.checkpoint_modified = true;
|
||||
}
|
||||
let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
|
||||
if let Some(max_xid) = acc {
|
||||
if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {
|
||||
Some(mbr.xid)
|
||||
} else {
|
||||
acc
|
||||
}
|
||||
let max_mbr_xid = xlrec.members.iter().fold(0u32, |acc, mbr| {
|
||||
if mbr.xid.wrapping_sub(acc) as i32 > 0 {
|
||||
mbr.xid
|
||||
} else {
|
||||
Some(mbr.xid)
|
||||
acc
|
||||
}
|
||||
});
|
||||
|
||||
if let Some(max_xid) = max_mbr_xid {
|
||||
if self.checkpoint.update_next_xid(max_xid) {
|
||||
self.checkpoint_modified = true;
|
||||
}
|
||||
if self.checkpoint.update_next_xid(max_mbr_xid) {
|
||||
self.checkpoint_modified = true;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -2,8 +2,7 @@
|
||||
|
||||
use crate::{
|
||||
auth::password_hack::parse_endpoint_param, context::RequestMonitoring, error::UserFacingError,
|
||||
metrics::NUM_CONNECTION_ACCEPTED_BY_SNI, proxy::NeonOptions, serverless::SERVERLESS_DRIVER_SNI,
|
||||
EndpointId, RoleName,
|
||||
metrics::NUM_CONNECTION_ACCEPTED_BY_SNI, proxy::NeonOptions, EndpointId, RoleName,
|
||||
};
|
||||
use itertools::Itertools;
|
||||
use pq_proto::StartupMessageParams;
|
||||
@@ -55,10 +54,10 @@ impl ComputeUserInfoMaybeEndpoint {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn endpoint_sni(
|
||||
sni: &str,
|
||||
pub fn endpoint_sni<'a>(
|
||||
sni: &'a str,
|
||||
common_names: &HashSet<String>,
|
||||
) -> Result<Option<EndpointId>, ComputeUserInfoParseError> {
|
||||
) -> Result<&'a str, ComputeUserInfoParseError> {
|
||||
let Some((subdomain, common_name)) = sni.split_once('.') else {
|
||||
return Err(ComputeUserInfoParseError::UnknownCommonName { cn: sni.into() });
|
||||
};
|
||||
@@ -67,10 +66,7 @@ pub fn endpoint_sni(
|
||||
cn: common_name.into(),
|
||||
});
|
||||
}
|
||||
if subdomain == SERVERLESS_DRIVER_SNI {
|
||||
return Ok(None);
|
||||
}
|
||||
Ok(Some(EndpointId::from(subdomain)))
|
||||
Ok(subdomain)
|
||||
}
|
||||
|
||||
impl ComputeUserInfoMaybeEndpoint {
|
||||
@@ -89,6 +85,7 @@ impl ComputeUserInfoMaybeEndpoint {
|
||||
// record the values if we have them
|
||||
ctx.set_application(params.get("application_name").map(SmolStr::from));
|
||||
ctx.set_user(user.clone());
|
||||
ctx.set_endpoint_id(sni.map(EndpointId::from));
|
||||
|
||||
// Project name might be passed via PG's command-line options.
|
||||
let endpoint_option = params
|
||||
@@ -106,7 +103,7 @@ impl ComputeUserInfoMaybeEndpoint {
|
||||
|
||||
let endpoint_from_domain = if let Some(sni_str) = sni {
|
||||
if let Some(cn) = common_names {
|
||||
endpoint_sni(sni_str, cn)?
|
||||
Some(EndpointId::from(endpoint_sni(sni_str, cn)?))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
@@ -120,13 +117,12 @@ impl ComputeUserInfoMaybeEndpoint {
|
||||
Some(Err(InconsistentProjectNames { domain, option }))
|
||||
}
|
||||
// Invariant: project name may not contain certain characters.
|
||||
(a, b) => a.or(b).map(|name| match project_name_valid(name.as_ref()) {
|
||||
(a, b) => a.or(b).map(|name| match project_name_valid(&name) {
|
||||
false => Err(MalformedProjectName(name)),
|
||||
true => Ok(name),
|
||||
}),
|
||||
}
|
||||
.transpose()?;
|
||||
ctx.set_endpoint_id(endpoint.clone());
|
||||
|
||||
info!(%user, project = endpoint.as_deref(), "credentials");
|
||||
if sni.is_some() {
|
||||
|
||||
@@ -41,8 +41,6 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, info_span, warn, Instrument};
|
||||
use utils::http::{error::ApiError, json::json_response};
|
||||
|
||||
pub const SERVERLESS_DRIVER_SNI: &str = "api";
|
||||
|
||||
pub async fn task_main(
|
||||
config: &'static ProxyConfig,
|
||||
ws_listener: TcpListener,
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::bail;
|
||||
use anyhow::Context;
|
||||
use futures::pin_mut;
|
||||
use futures::StreamExt;
|
||||
use hyper::body::HttpBody;
|
||||
@@ -36,11 +35,11 @@ use crate::config::TlsConfig;
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::metrics::NUM_CONNECTION_REQUESTS_GAUGE;
|
||||
use crate::proxy::NeonOptions;
|
||||
use crate::EndpointId;
|
||||
use crate::RoleName;
|
||||
|
||||
use super::conn_pool::ConnInfo;
|
||||
use super::conn_pool::GlobalConnPool;
|
||||
use super::SERVERLESS_DRIVER_SNI;
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
struct QueryData {
|
||||
@@ -62,6 +61,7 @@ enum Payload {
|
||||
|
||||
const MAX_RESPONSE_SIZE: usize = 10 * 1024 * 1024; // 10 MiB
|
||||
const MAX_REQUEST_SIZE: u64 = 10 * 1024 * 1024; // 10 MiB
|
||||
const SERVERLESS_DRIVER_SNI_HOSTNAME_FIRST_PART: &str = "api";
|
||||
|
||||
static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
|
||||
static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode");
|
||||
@@ -188,7 +188,9 @@ fn get_conn_info(
|
||||
}
|
||||
}
|
||||
|
||||
let endpoint = endpoint_sni(hostname, &tls.common_names)?.context("malformed endpoint")?;
|
||||
let endpoint = endpoint_sni(hostname, &tls.common_names)?;
|
||||
|
||||
let endpoint: EndpointId = endpoint.into();
|
||||
ctx.set_endpoint_id(Some(endpoint.clone()));
|
||||
|
||||
let pairs = connection_url.query_pairs();
|
||||
@@ -225,7 +227,8 @@ fn check_matches(sni_hostname: &str, hostname: &str) -> Result<bool, anyhow::Err
|
||||
let (_, hostname_rest) = hostname
|
||||
.split_once('.')
|
||||
.ok_or_else(|| anyhow::anyhow!("Unexpected hostname format."))?;
|
||||
Ok(sni_hostname_rest == hostname_rest && sni_hostname_first == SERVERLESS_DRIVER_SNI)
|
||||
Ok(sni_hostname_rest == hostname_rest
|
||||
&& sni_hostname_first == SERVERLESS_DRIVER_SNI_HOSTNAME_FIRST_PART)
|
||||
}
|
||||
|
||||
// TODO: return different http error codes
|
||||
|
||||
@@ -117,7 +117,10 @@ class NeonCompare(PgCompare):
|
||||
self.timeline = self.env.neon_cli.create_timeline(branch_name, tenant_id=self.tenant)
|
||||
|
||||
# Start pg
|
||||
self._pg = self.env.endpoints.create_start(branch_name, "main", self.tenant)
|
||||
config_lines = ["max_replication_write_lag=-1", "max_replication_flush_lag=-1"]
|
||||
self._pg = self.env.endpoints.create_start(
|
||||
branch_name, "main", self.tenant, config_lines=config_lines
|
||||
)
|
||||
|
||||
@property
|
||||
def pg(self) -> PgProtocol:
|
||||
@@ -294,7 +297,7 @@ def remote_compare(zenbenchmark: NeonBenchmarker, remote_pg: RemotePostgres) ->
|
||||
return RemoteCompare(zenbenchmark, remote_pg)
|
||||
|
||||
|
||||
@pytest.fixture(params=["vanilla_compare", "neon_compare"], ids=["vanilla", "neon"])
|
||||
@pytest.fixture(params=["neon_compare"], ids=["neon"])
|
||||
def neon_with_baseline(request: FixtureRequest) -> PgCompare:
|
||||
"""Parameterized fixture that helps compare neon against vanilla postgres.
|
||||
|
||||
|
||||
@@ -35,7 +35,7 @@ def init_pgbench(env: PgCompare, cmdline, password: None):
|
||||
t0 = timeit.default_timer()
|
||||
with env.record_pageserver_writes("init.pageserver_writes"):
|
||||
out = env.pg_bin.run_capture(cmdline, env=environ)
|
||||
env.flush()
|
||||
# env.flush()
|
||||
|
||||
duration = timeit.default_timer() - t0
|
||||
end_timestamp = utc_now_timestamp()
|
||||
@@ -94,9 +94,7 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: P
|
||||
|
||||
if workload_type == PgBenchLoadType.INIT:
|
||||
# Run initialize
|
||||
init_pgbench(
|
||||
env, ["pgbench", f"-s{scale}", "-i", "-I", "dtGvp", connstr], password=password
|
||||
)
|
||||
init_pgbench(env, ["pgbench", f"-s{scale}", "-i", "-I", "dtG", connstr], password=password)
|
||||
|
||||
if workload_type == PgBenchLoadType.SIMPLE_UPDATE:
|
||||
# Run simple-update workload
|
||||
@@ -151,7 +149,7 @@ def get_durations_matrix(default: int = 45) -> List[int]:
|
||||
return rv
|
||||
|
||||
|
||||
def get_scales_matrix(default: int = 10) -> List[int]:
|
||||
def get_scales_matrix(default: int = 100) -> List[int]:
|
||||
scales = os.getenv("TEST_PG_BENCH_SCALES_MATRIX", default=str(default))
|
||||
rv = []
|
||||
for s in scales.split(","):
|
||||
@@ -172,8 +170,8 @@ def get_scales_matrix(default: int = 10) -> List[int]:
|
||||
@pytest.mark.parametrize("duration", get_durations_matrix())
|
||||
def test_pgbench(neon_with_baseline: PgCompare, scale: int, duration: int):
|
||||
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.INIT)
|
||||
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SIMPLE_UPDATE)
|
||||
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SELECT_ONLY)
|
||||
# run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SIMPLE_UPDATE)
|
||||
# run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SELECT_ONLY)
|
||||
|
||||
|
||||
# The following 3 tests run on an existing database as it was set up by previous tests,
|
||||
|
||||
@@ -203,16 +203,6 @@ def test_import_at_2bil(
|
||||
$$;
|
||||
"""
|
||||
)
|
||||
|
||||
# Also create a multi-XID with members past the 2 billion mark
|
||||
conn2 = endpoint.connect()
|
||||
cur2 = conn2.cursor()
|
||||
cur.execute("INSERT INTO t VALUES ('x')")
|
||||
cur.execute("BEGIN; select * from t WHERE t = 'x' FOR SHARE;")
|
||||
cur2.execute("BEGIN; select * from t WHERE t = 'x' FOR SHARE;")
|
||||
cur.execute("COMMIT")
|
||||
cur2.execute("COMMIT")
|
||||
|
||||
# A checkpoint writes a WAL record with xl_xid=0. Many other WAL
|
||||
# records would have the same effect.
|
||||
cur.execute("checkpoint")
|
||||
@@ -227,4 +217,4 @@ def test_import_at_2bil(
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
cur.execute("SELECT count(*) from t")
|
||||
assert cur.fetchone() == (10000 + 1 + 1,)
|
||||
assert cur.fetchone() == (10000 + 1,)
|
||||
|
||||
Reference in New Issue
Block a user