Compare commits

..

11 Commits

Author SHA1 Message Date
Alexander Bayandin
05a8ec269a wip 2024-09-12 21:12:44 +01:00
Stefan Radig
fcab61bdcd Prototype implementation for private access poc (#8976)
## Problem
For the Private Access POC we want users to be able to disable access
from the public proxy. To limit the number of changes, this can be done
by configuring an IP allowlist of [ "255.255.255.255" ]. For the Private
Access proxy, a new command-line flag allows disabling the IP allowlist
check completely.

See
https://www.notion.so/neondatabase/Neon-Private-Access-POC-Proposal-8f707754e1ab4190ad5709da7832f020?d=887495c15e884aa4973f973a8a0a582a#7ac6ec249b524a74adbeddc4b84b8f5f
for details about the POC.

## Summary of changes
- Adding the command-line flag is_private_access_proxy=true disables the
IP allowlist check (a minimal sketch follows below)
2024-09-12 15:55:12 +01:00
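
A minimal sketch of the gating logic described above, using the field and helper names from the diffs further down with simplified stand-in types:

    use std::net::IpAddr;

    // Simplified stand-in for the proxy's AuthenticationConfig.
    struct AuthenticationConfig {
        // Set from `!args.is_private_access_proxy` at startup (see the proxy diff below).
        ip_allowlist_check_enabled: bool,
    }

    // Simplified stand-in for check_peer_addr_is_in_list.
    fn check_peer_addr_is_in_list(peer: &IpAddr, allowed: &[IpAddr]) -> bool {
        allowed.contains(peer)
    }

    fn peer_allowed(config: &AuthenticationConfig, peer: &IpAddr, allowed: &[IpAddr]) -> bool {
        // A private-access proxy skips the allowlist check entirely; the public proxy
        // still rejects peers not on the list (e.g. an allowlist of ["255.255.255.255"]
        // effectively blocks public access).
        !config.ip_allowlist_check_enabled || check_peer_addr_is_in_list(peer, allowed)
    }

    fn main() {
        let private = AuthenticationConfig { ip_allowlist_check_enabled: false };
        let peer: IpAddr = "10.0.0.1".parse().unwrap();
        assert!(peer_allowed(&private, &peer, &[])); // allowlist is ignored entirely
    }
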
Tristan Partin
9e3ead3689 Collect the last of on-demand WAL download in CreateReplicationSlot reverts
Signed-off-by: Tristan Partin <tristan@neon.tech>
2024-09-12 11:31:38 +01:00
Heikki Linnakangas
8dc069037b Remove NeonEnvBuilder.start() function
It feels wrong to me to call start() on the builder object. Surely the
thing you start is the environment itself, not its configuration.
2024-09-12 01:28:56 +03:00
Heikki Linnakangas
0a363c3dce Add --timeline-id option to "neon_local timeline branch" command
Makes it consistent with the "timeline create" and "timeline import"
commands, which already allowed you to pass the timeline ID as an argument.
This also makes it unnecessary to parse the timeline ID from the output in
the Python function that calls it.
2024-09-12 01:28:56 +03:00
Heikki Linnakangas
aeca15008c Remove obsolete and misleading comment
The tenant ID was not actually generated here but in NeonEnvBuilder.
And the "neon_local init" command hasn't been able to generate the
initial tenant since 8712e1899e anyway.
2024-09-12 01:28:56 +03:00
Heikki Linnakangas
43846b72fa Remove unused "neon_local init --pg-version" arg
It has been unused since commit 8712e1899e, when it stopped creating
the initial timeline.
2024-09-12 01:28:56 +03:00
John Spray
cb060548fb libs: tweak PageserverUtilization::is_overloaded (#8946)
## Problem

Having run in production for a while, we see that nodes are generally
safely oversubscribed by about a factor of 2.

## Summary of changes

Tweak the is_overloaded method to check for utilization over 200%
rather than over 100% (see the sketch below).
2024-09-11 18:45:34 +01:00
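
A small sketch of the new threshold, assuming the utilization score is a percentage where UTILIZATION_FULL corresponds to 100% (the concrete RawScore type is not shown in this diff):

    // Illustrative constant; the real value lives in PageserverUtilization.
    const UTILIZATION_FULL: u64 = 100;

    // Mirrors the tweaked check below: overload now starts at 200%, not 100%.
    fn is_overloaded(score: u64) -> bool {
        score >= 2 * UTILIZATION_FULL
    }

    fn main() {
        assert!(!is_overloaded(150)); // oversubscribed, but still schedulable
        assert!(is_overloaded(210));  // past 200%: avoid adding work unless there is no alternative
    }
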
Folke Behrens
bae793ffcd proxy: Handle all let underscore instances (#8898)
* Most can be simply replaced (the sketch below shows the general pattern)
* One instance renamed to _rtchk (return-type check)
2024-09-10 15:36:08 +02:00
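
A minimal sketch of the replacement pattern used throughout the proxy diffs below: instead of discarding a Result with let _, the error is logged and then explicitly dropped with .ok() (eprintln! stands in for tracing's debug! here):

    fn send(value: u32) -> Result<(), String> {
        if value == 0 {
            Err("zero not allowed".into())
        } else {
            Ok(())
        }
    }

    fn main() {
        // Before: let _: Result<(), _> = send(0);
        // After: surface the error at debug level, then deliberately ignore it.
        send(0)
            .inspect_err(|e| eprintln!("tx send failed: {e}"))
            .ok();
    }
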
John Spray
26b5fcdc50 reinstate write-path key check (#8973)
## Problem

In https://github.com/neondatabase/neon/pull/8621, validation of keys
during ingest was removed because the places where we actually store
keys are now past the point where we have already converted them to
CompactKey (i128) representation.

## Summary of changes

Reinstate validation at an earlier stage in ingest. This doesn't cover
literally every place we write a key, but it covers most cases where
we're trusting postgres to give us a valid key (i.e. one that doesn't
try to use a custom spacenode). A sketch of the guard pattern appears below.
2024-09-10 12:54:25 +01:00
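
A hedged sketch of the reinstated guard, with simplified stand-ins for the pageserver's Key type and is_valid_key_on_write_path (the real check is shown in the pgdatadir_mapping diff below): keys are validated before they are converted to the compact i128 representation and stored.

    // Simplified stand-in for the pageserver Key.
    struct Key {
        spcnode: u32,
    }

    impl Key {
        // The real check rejects keys the pageserver cannot represent,
        // e.g. ones using a custom spacenode.
        fn is_valid_key_on_write_path(&self) -> bool {
            self.spcnode != u32::MAX
        }
    }

    fn put_image(key: &Key) -> Result<(), String> {
        // Validate before converting to the compact (i128) form and writing.
        if !key.is_valid_key_on_write_path() {
            return Err("the request contains data not supported by pageserver".to_string());
        }
        // ... store the page image at `key` ...
        Ok(())
    }

    fn main() {
        assert!(put_image(&Key { spcnode: u32::MAX }).is_err());
        assert!(put_image(&Key { spcnode: 1663 }).is_ok());
    }
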
Arpad Müller
97582178cb Remove async_trait from the Handler trait (#8958)
Newest attempt to remove `async_trait` from the Handler trait (see the
sketch below for the resulting pattern).

Earlier attempts were in #7301 and #8296.
2024-09-10 02:40:00 +02:00
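
A sketch of the resulting pattern, with simplified types: the trait method is declared as returning impl Future instead of being marked async under #[async_trait]. An implementor can keep an ordinary async fn (presumably what the pageserver handler does once the attribute is dropped) or return a boxed future explicitly, as the safekeeper handler's diff below shows.

    use std::future::Future;

    // Simplified version of the Handler trait after the change.
    trait Handler {
        fn process_query(&mut self, query: &str) -> impl Future<Output = Result<(), String>>;
    }

    struct PlainHandler;

    impl Handler for PlainHandler {
        // An async fn still satisfies the `impl Future` return type.
        async fn process_query(&mut self, query: &str) -> Result<(), String> {
            println!("got query {query:?}");
            Ok(())
        }
    }

    struct BoxingHandler;

    impl Handler for BoxingHandler {
        // Alternatively, build the future by hand, e.g. to keep the body boxed.
        fn process_query(&mut self, query: &str) -> impl Future<Output = Result<(), String>> {
            let query = query.to_owned();
            Box::pin(async move {
                println!("got query {query:?}");
                Ok::<(), String>(())
            })
        }
    }

    fn main() {
        // Driving these futures needs an async runtime (e.g. tokio); omitted here.
    }
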
53 changed files with 529 additions and 1024 deletions

View File

@@ -0,0 +1 @@
FROM neondatabase/build-tools:pinned

View File

@@ -0,0 +1,23 @@
// https://containers.dev/implementors/json_reference/
{
"name": "Neon",
"build": {
"context": "..",
"dockerfile": "Dockerfile.devcontainer"
},
"postCreateCommand": {
"build neon": "BUILD_TYPE=debug CARGO_BUILD_FLAGS='--features=testing' mold -run make -s -j`nproc`",
"install python deps": "./scripts/pysync"
},
"customizations": {
"vscode": {
"extensions": [
"charliermarsh.ruff",
"github.vscode-github-actions",
"rust-lang.rust-analyzer"
]
}
}
}

View File

@@ -640,6 +640,8 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
}
Some(("branch", branch_match)) => {
let tenant_id = get_tenant_id(branch_match, env)?;
let new_timeline_id =
parse_timeline_id(branch_match)?.unwrap_or(TimelineId::generate());
let new_branch_name = branch_match
.get_one::<String>("branch-name")
.ok_or_else(|| anyhow!("No branch name provided"))?;
@@ -658,7 +660,6 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
.map(|lsn_str| Lsn::from_str(lsn_str))
.transpose()
.context("Failed to parse ancestor start Lsn from the request")?;
let new_timeline_id = TimelineId::generate();
let storage_controller = StorageController::from_env(env);
let create_req = TimelineCreateRequest {
new_timeline_id,
@@ -1570,7 +1571,6 @@ fn cli() -> Command {
.value_parser(value_parser!(PathBuf))
.value_name("config")
)
.arg(pg_version_arg.clone())
.arg(force_arg)
)
.subcommand(
@@ -1583,6 +1583,7 @@ fn cli() -> Command {
.subcommand(Command::new("branch")
.about("Create a new timeline, using another timeline as a base, copying its data")
.arg(tenant_id_arg.clone())
.arg(timeline_id_arg.clone())
.arg(branch_name_arg.clone())
.arg(Arg::new("ancestor-branch-name").long("ancestor-branch-name")
.help("Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name.").required(false))

View File

@@ -89,8 +89,19 @@ impl PageserverUtilization {
/// If a node is currently hosting more work than it can comfortably handle. This does not indicate that
/// it will fail, but it is a strong signal that more work should not be added unless there is no alternative.
///
/// When a node is overloaded, we may override soft affinity preferences and do things like scheduling
/// into a node in a less desirable AZ, if all the nodes in the preferred AZ are overloaded.
pub fn is_overloaded(score: RawScore) -> bool {
score >= Self::UTILIZATION_FULL
// Why the factor of two? This is unscientific but reflects behavior of real systems:
// - In terms of shard counts, a node's preferred max count is a soft limit intended to keep
// startup and housekeeping jobs nice and responsive. We can go to double this limit if needed
// until some more nodes are deployed.
// - In terms of disk space, the node's utilization heuristic assumes every tenant needs to
// hold its biggest timeline fully on disk, which tends to be an overestimate when
// some tenants are very idle and have dropped layers from disk. In practice going up to
// double is generally better than giving up and scheduling in a sub-optimal AZ.
score >= 2 * Self::UTILIZATION_FULL
}
pub fn adjust_shard_count_max(&mut self, shard_count: u32) {

View File

@@ -81,17 +81,16 @@ pub fn is_expected_io_error(e: &io::Error) -> bool {
)
}
#[async_trait::async_trait]
pub trait Handler<IO> {
/// Handle single query.
/// postgres_backend will issue ReadyForQuery after calling this (this
/// might be not what we want after CopyData streaming, but currently we don't
/// care). It will also flush out the output buffer.
async fn process_query(
fn process_query(
&mut self,
pgb: &mut PostgresBackend<IO>,
query_string: &str,
) -> Result<(), QueryError>;
) -> impl Future<Output = Result<(), QueryError>>;
/// Called on startup packet receival, allows to process params.
///

View File

@@ -23,7 +23,6 @@ async fn make_tcp_pair() -> (TcpStream, TcpStream) {
struct TestHandler {}
#[async_trait::async_trait]
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> Handler<IO> for TestHandler {
// return single col 'hey' for any query
async fn process_query(

View File

@@ -1199,7 +1199,6 @@ impl PageServerHandler {
}
}
#[async_trait::async_trait]
impl<IO> postgres_backend::Handler<IO> for PageServerHandler
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,

View File

@@ -1205,6 +1205,13 @@ impl<'a> DatadirModification<'a> {
img: Bytes,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
let key = rel_block_to_key(rel, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver at {}",
key
);
}
self.put(rel_block_to_key(rel, blknum), Value::Image(img));
Ok(())
}
@@ -1216,14 +1223,34 @@ impl<'a> DatadirModification<'a> {
blknum: BlockNumber,
img: Bytes,
) -> anyhow::Result<()> {
self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
let key = slru_block_to_key(kind, segno, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver at {}",
key
);
}
self.put(key, Value::Image(img));
Ok(())
}
pub(crate) fn put_rel_page_image_zero(&mut self, rel: RelTag, blknum: BlockNumber) {
self.pending_zero_data_pages
.insert(rel_block_to_key(rel, blknum).to_compact());
pub(crate) fn put_rel_page_image_zero(
&mut self,
rel: RelTag,
blknum: BlockNumber,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
let key = rel_block_to_key(rel, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver: {} @ {}",
key,
self.lsn
);
}
self.pending_zero_data_pages.insert(key.to_compact());
self.pending_bytes += ZERO_PAGE.len();
Ok(())
}
pub(crate) fn put_slru_page_image_zero(
@@ -1231,10 +1258,18 @@ impl<'a> DatadirModification<'a> {
kind: SlruKind,
segno: u32,
blknum: BlockNumber,
) {
self.pending_zero_data_pages
.insert(slru_block_to_key(kind, segno, blknum).to_compact());
) -> anyhow::Result<()> {
let key = slru_block_to_key(kind, segno, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver: {} @ {}",
key,
self.lsn
);
}
self.pending_zero_data_pages.insert(key.to_compact());
self.pending_bytes += ZERO_PAGE.len();
Ok(())
}
/// Call this at the end of each WAL record.

View File

@@ -1222,7 +1222,7 @@ impl WalIngest {
if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
// Tail of last remaining FSM page has to be zeroed.
// We are not precise here and instead of digging in FSM bitmap format just clear the whole page.
modification.put_rel_page_image_zero(rel, fsm_physical_page_no);
modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
fsm_physical_page_no += 1;
}
let nblocks = get_relsize(modification, rel, ctx).await?;
@@ -1244,7 +1244,7 @@ impl WalIngest {
if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
// Tail of last remaining vm page has to be zeroed.
// We are not precise here and instead of digging in VM bitmap format just clear the whole page.
modification.put_rel_page_image_zero(rel, vm_page_no);
modification.put_rel_page_image_zero(rel, vm_page_no)?;
vm_page_no += 1;
}
let nblocks = get_relsize(modification, rel, ctx).await?;
@@ -1737,7 +1737,7 @@ impl WalIngest {
continue;
}
modification.put_rel_page_image_zero(rel, gap_blknum);
modification.put_rel_page_image_zero(rel, gap_blknum)?;
}
}
Ok(())
@@ -1803,7 +1803,7 @@ impl WalIngest {
// fill the gap with zeros
for gap_blknum in old_nblocks..blknum {
modification.put_slru_page_image_zero(kind, segno, gap_blknum);
modification.put_slru_page_image_zero(kind, segno, gap_blknum)?;
}
}
Ok(())

View File

@@ -311,7 +311,9 @@ async fn auth_quirks(
let (allowed_ips, maybe_secret) = api.get_allowed_ips_and_secret(ctx, &info).await?;
// check allowed list
if !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips) {
if config.ip_allowlist_check_enabled
&& !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips)
{
return Err(auth::AuthError::ip_address_not_allowed(ctx.peer_addr()));
}
@@ -603,6 +605,7 @@ mod tests {
rate_limiter_enabled: true,
rate_limiter: AuthRateLimiter::new(&RateBucketInfo::DEFAULT_AUTH_SET),
rate_limit_ip_subnet: 64,
ip_allowlist_check_enabled: true,
});
async fn read_message(r: &mut (impl AsyncRead + Unpin), b: &mut BytesMut) -> PgMessage {

View File

@@ -538,4 +538,17 @@ mod tests {
));
Ok(())
}
#[test]
fn test_connection_blocker() {
fn check(v: serde_json::Value) -> bool {
let peer_addr = IpAddr::from([127, 0, 0, 1]);
let ip_list: Vec<IpPattern> = serde_json::from_value(v).unwrap();
check_peer_addr_is_in_list(&peer_addr, &ip_list)
}
assert!(check(json!([])));
assert!(check(json!(["127.0.0.1"])));
assert!(!check(json!(["255.255.255.255"])));
}
}

View File

@@ -224,6 +224,7 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig
rate_limiter_enabled: false,
rate_limiter: BucketRateLimiter::new(vec![]),
rate_limit_ip_subnet: 64,
ip_allowlist_check_enabled: true,
},
require_client_ip: false,
handshake_timeout: Duration::from_secs(10),

View File

@@ -224,6 +224,10 @@ struct ProxyCliArgs {
/// Whether to retry the wake_compute request
#[clap(long, default_value = config::RetryConfig::WAKE_COMPUTE_DEFAULT_VALUES)]
wake_compute_retry: String,
/// Configure if this is a private access proxy for the POC: In that case the proxy will ignore the IP allowlist
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
is_private_access_proxy: bool,
}
#[derive(clap::Args, Clone, Copy, Debug)]
@@ -682,6 +686,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
rate_limiter_enabled: args.auth_rate_limit_enabled,
rate_limiter: AuthRateLimiter::new(args.auth_rate_limit.clone()),
rate_limit_ip_subnet: args.auth_rate_limit_ip_subnet,
ip_allowlist_check_enabled: !args.is_private_access_proxy,
};
let config = Box::leak(Box::new(ProxyConfig {

View File

@@ -242,6 +242,6 @@ mod tests {
#[test]
fn test() {
let s = "{\"branch_created\":null,\"endpoint_created\":{\"endpoint_id\":\"ep-rapid-thunder-w0qqw2q9\"},\"project_created\":null,\"type\":\"endpoint_created\"}";
let _: ControlPlaneEventKey = serde_json::from_str(s).unwrap();
serde_json::from_str::<ControlPlaneEventKey>(s).unwrap();
}
}

View File

@@ -64,6 +64,7 @@ pub struct AuthenticationConfig {
pub rate_limiter_enabled: bool,
pub rate_limiter: AuthRateLimiter,
pub rate_limit_ip_subnet: u8,
pub ip_allowlist_check_enabled: bool,
}
impl TlsConfig {

View File

@@ -395,7 +395,7 @@ mod tests {
}
}
});
let _: KickSession<'_> = serde_json::from_str(&json.to_string())?;
serde_json::from_str::<KickSession<'_>>(&json.to_string())?;
Ok(())
}
@@ -403,7 +403,7 @@ mod tests {
#[test]
fn parse_db_info() -> anyhow::Result<()> {
// with password
let _: DatabaseInfo = serde_json::from_value(json!({
serde_json::from_value::<DatabaseInfo>(json!({
"host": "localhost",
"port": 5432,
"dbname": "postgres",
@@ -413,7 +413,7 @@ mod tests {
}))?;
// without password
let _: DatabaseInfo = serde_json::from_value(json!({
serde_json::from_value::<DatabaseInfo>(json!({
"host": "localhost",
"port": 5432,
"dbname": "postgres",
@@ -422,7 +422,7 @@ mod tests {
}))?;
// new field (forward compatibility)
let _: DatabaseInfo = serde_json::from_value(json!({
serde_json::from_value::<DatabaseInfo>(json!({
"host": "localhost",
"port": 5432,
"dbname": "postgres",
@@ -441,7 +441,7 @@ mod tests {
"address": "0.0.0.0",
"aux": dummy_aux(),
});
let _: WakeCompute = serde_json::from_str(&json.to_string())?;
serde_json::from_str::<WakeCompute>(&json.to_string())?;
Ok(())
}
@@ -451,18 +451,18 @@ mod tests {
let json = json!({
"role_secret": "secret",
});
let _: GetRoleSecret = serde_json::from_str(&json.to_string())?;
serde_json::from_str::<GetRoleSecret>(&json.to_string())?;
let json = json!({
"role_secret": "secret",
"allowed_ips": ["8.8.8.8"],
});
let _: GetRoleSecret = serde_json::from_str(&json.to_string())?;
serde_json::from_str::<GetRoleSecret>(&json.to_string())?;
let json = json!({
"role_secret": "secret",
"allowed_ips": ["8.8.8.8"],
"project_id": "project",
});
let _: GetRoleSecret = serde_json::from_str(&json.to_string())?;
serde_json::from_str::<GetRoleSecret>(&json.to_string())?;
Ok(())
}

View File

@@ -78,7 +78,7 @@ pub(crate) type ComputeReady = DatabaseInfo;
// TODO: replace with an http-based protocol.
struct MgmtHandler;
#[async_trait::async_trait]
impl postgres_backend::Handler<tokio::net::TcpStream> for MgmtHandler {
async fn process_query(
&mut self,

View File

@@ -6,7 +6,7 @@ use pq_proto::StartupMessageParams;
use smol_str::SmolStr;
use std::net::IpAddr;
use tokio::sync::mpsc;
use tracing::{field::display, info, info_span, Span};
use tracing::{debug, field::display, info, info_span, Span};
use try_lock::TryLock;
use uuid::Uuid;
@@ -362,7 +362,9 @@ impl RequestMonitoringInner {
});
}
if let Some(tx) = self.sender.take() {
let _: Result<(), _> = tx.send(RequestData::from(&*self));
tx.send(RequestData::from(&*self))
.inspect_err(|e| debug!("tx send failed: {e}"))
.ok();
}
}
@@ -371,7 +373,9 @@ impl RequestMonitoringInner {
// Here we log the length of the session.
self.disconnect_timestamp = Some(Utc::now());
if let Some(tx) = self.disconnect_sender.take() {
let _: Result<(), _> = tx.send(RequestData::from(&*self));
tx.send(RequestData::from(&*self))
.inspect_err(|e| debug!("tx send failed: {e}"))
.ok();
}
}
}

View File

@@ -290,7 +290,7 @@ async fn worker_inner(
}
if !w.flushed_row_groups().is_empty() {
let _: Writer<BytesMut> = upload_parquet(w, len, &storage).await?;
let _rtchk: Writer<BytesMut> = upload_parquet(w, len, &storage).await?;
}
Ok(())

View File

@@ -3,7 +3,7 @@
#![deny(
deprecated,
future_incompatible,
// TODO: consider let_underscore
let_underscore,
nonstandard_style,
rust_2024_compatibility
)]

View File

@@ -268,7 +268,7 @@ async fn keepalive_is_inherited() -> anyhow::Result<()> {
anyhow::Ok(keepalive)
});
let _ = TcpStream::connect(("127.0.0.1", port)).await?;
TcpStream::connect(("127.0.0.1", port)).await?;
assert!(t.await??, "keepalive should be inherited");
Ok(())

View File

@@ -6,7 +6,7 @@ use redis::{
ConnectionInfo, IntoConnectionInfo, RedisConnectionInfo, RedisResult,
};
use tokio::task::JoinHandle;
use tracing::{error, info};
use tracing::{debug, error, info};
use super::elasticache::CredentialsProvider;
@@ -109,7 +109,10 @@ impl ConnectionWithCredentialsProvider {
let credentials_provider = credentials_provider.clone();
let con2 = con.clone();
let f = tokio::spawn(async move {
let _ = Self::keep_connection(con2, credentials_provider).await;
Self::keep_connection(con2, credentials_provider)
.await
.inspect_err(|e| debug!("keep_connection failed: {e}"))
.ok();
});
self.refresh_token_task = Some(f);
}

View File

@@ -50,7 +50,9 @@ impl PoolingBackend {
.as_ref()
.map(|()| user_info.clone());
let (allowed_ips, maybe_secret) = backend.get_allowed_ips_and_secret(ctx).await?;
if !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips) {
if config.ip_allowlist_check_enabled
&& !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips)
{
return Err(AuthError::ip_address_not_allowed(ctx.peer_addr()));
}
if !self

View File

@@ -12,6 +12,7 @@ use std::{io, task};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_rustls::server::TlsStream;
use tracing::debug;
/// Stream wrapper which implements libpq's protocol.
///
@@ -138,9 +139,10 @@ impl<S: AsyncWrite + Unpin> PqStream<S> {
);
// already error case, ignore client IO error
let _: Result<_, std::io::Error> = self
.write_message(&BeMessage::ErrorResponse(msg, None))
.await;
self.write_message(&BeMessage::ErrorResponse(msg, None))
.await
.inspect_err(|e| debug!("write_message failed: {e}"))
.ok();
Err(ReportedError {
source: anyhow::anyhow!(msg),
@@ -164,9 +166,10 @@ impl<S: AsyncWrite + Unpin> PqStream<S> {
);
// already error case, ignore client IO error
let _: Result<_, std::io::Error> = self
.write_message(&BeMessage::ErrorResponse(&msg, None))
.await;
self.write_message(&BeMessage::ErrorResponse(&msg, None))
.await
.inspect_err(|e| debug!("write_message failed: {e}"))
.ok();
Err(ReportedError {
source: anyhow::anyhow!(error),

View File

@@ -57,7 +57,7 @@ mod tests {
fn bad_url() {
let url = "test:foobar";
url.parse::<url::Url>().expect("unexpected parsing failure");
let _ = url.parse::<ApiUrl>().expect_err("should not parse");
url.parse::<ApiUrl>().expect_err("should not parse");
}
#[test]

View File

@@ -2,6 +2,7 @@
//! protocol commands.
use anyhow::Context;
use std::future::Future;
use std::str::{self, FromStr};
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
@@ -95,7 +96,6 @@ fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str {
}
}
#[async_trait::async_trait]
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
for SafekeeperPostgresHandler
{
@@ -197,49 +197,51 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
Ok(())
}
async fn process_query(
fn process_query(
&mut self,
pgb: &mut PostgresBackend<IO>,
query_string: &str,
) -> Result<(), QueryError> {
if query_string
.to_ascii_lowercase()
.starts_with("set datestyle to ")
{
// important for debug because psycopg2 executes "SET datestyle TO 'ISO'" on connect
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
return Ok(());
}
let cmd = parse_cmd(query_string)?;
let cmd_str = cmd_to_string(&cmd);
let _guard = PG_QUERIES_GAUGE.with_label_values(&[cmd_str]).guard();
info!("got query {:?}", query_string);
let tenant_id = self.tenant_id.context("tenantid is required")?;
let timeline_id = self.timeline_id.context("timelineid is required")?;
self.check_permission(Some(tenant_id))?;
self.ttid = TenantTimelineId::new(tenant_id, timeline_id);
match cmd {
SafekeeperPostgresCommand::StartWalPush => {
self.handle_start_wal_push(pgb)
.instrument(info_span!("WAL receiver"))
.await
) -> impl Future<Output = Result<(), QueryError>> {
Box::pin(async move {
if query_string
.to_ascii_lowercase()
.starts_with("set datestyle to ")
{
// important for debug because psycopg2 executes "SET datestyle TO 'ISO'" on connect
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
return Ok(());
}
SafekeeperPostgresCommand::StartReplication { start_lsn, term } => {
self.handle_start_replication(pgb, start_lsn, term)
.instrument(info_span!("WAL sender"))
.await
let cmd = parse_cmd(query_string)?;
let cmd_str = cmd_to_string(&cmd);
let _guard = PG_QUERIES_GAUGE.with_label_values(&[cmd_str]).guard();
info!("got query {:?}", query_string);
let tenant_id = self.tenant_id.context("tenantid is required")?;
let timeline_id = self.timeline_id.context("timelineid is required")?;
self.check_permission(Some(tenant_id))?;
self.ttid = TenantTimelineId::new(tenant_id, timeline_id);
match cmd {
SafekeeperPostgresCommand::StartWalPush => {
self.handle_start_wal_push(pgb)
.instrument(info_span!("WAL receiver"))
.await
}
SafekeeperPostgresCommand::StartReplication { start_lsn, term } => {
self.handle_start_replication(pgb, start_lsn, term)
.instrument(info_span!("WAL sender"))
.await
}
SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb).await,
SafekeeperPostgresCommand::TimelineStatus => self.handle_timeline_status(pgb).await,
SafekeeperPostgresCommand::JSONCtrl { ref cmd } => {
handle_json_ctrl(self, pgb, cmd).await
}
}
SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb).await,
SafekeeperPostgresCommand::TimelineStatus => self.handle_timeline_status(pgb).await,
SafekeeperPostgresCommand::JSONCtrl { ref cmd } => {
handle_json_ctrl(self, pgb, cmd).await
}
}
})
}
}

View File

@@ -10,8 +10,4 @@ pytest_plugins = (
"fixtures.compare_fixtures",
"fixtures.slow",
"fixtures.flaky",
"fixtures.shared_fixtures",
"fixtures.function.neon_storage",
"fixtures.session.neon_storage",
"fixtures.session.s3",
)

View File

@@ -1,48 +0,0 @@
from typing import Iterator, Optional, Dict, Any
import pytest
from _pytest.fixtures import FixtureRequest
from fixtures.neon_fixtures import NeonEnv
from fixtures.shared_fixtures import TTenant, TTimeline
@pytest.fixture(scope="function")
def tenant_config() -> Dict[str, Any]:
return dict()
@pytest.fixture(scope="function")
def tenant(
neon_shared_env: NeonEnv,
request: FixtureRequest,
tenant_config: Optional[Dict[str, Any]] = None,
) -> Iterator[TTenant]:
tenant = TTenant(env=neon_shared_env, name=request.node.name, config=tenant_config)
yield tenant
tenant.shutdown_resources()
@pytest.fixture(scope="function")
def timeline(
tenant: TTenant,
) -> Iterator[TTimeline]:
timeline = tenant.default_timeline
yield timeline
@pytest.fixture(scope="function")
def exclusive_tenant(
neon_env: NeonEnv,
request: FixtureRequest,
tenant_config: Optional[Dict[str, Any]] = None,
) -> Iterator[TTenant]:
tenant = TTenant(env=neon_env, name=request.node.name, config=tenant_config)
yield tenant
tenant.shutdown_resources()
@pytest.fixture(scope="function")
def exclusive_timeline(
exclusive_tenant: TTenant,
) -> Iterator[TTimeline]:
timeline = exclusive_tenant.default_timeline
yield timeline

View File

@@ -93,6 +93,7 @@ from fixtures.utils import (
allure_add_grafana_links,
allure_attach_from_dir,
assert_no_errors,
get_self_dir,
print_gc_result,
subprocess_capture,
wait_until,
@@ -125,6 +126,122 @@ Env = Dict[str, str]
DEFAULT_OUTPUT_DIR: str = "test_output"
DEFAULT_BRANCH_NAME: str = "main"
BASE_PORT: int = 15000
@pytest.fixture(scope="session")
def base_dir() -> Iterator[Path]:
# find the base directory (currently this is the git root)
base_dir = get_self_dir().parent.parent
log.info(f"base_dir is {base_dir}")
yield base_dir
@pytest.fixture(scope="function")
def neon_binpath(base_dir: Path, build_type: str) -> Iterator[Path]:
if os.getenv("REMOTE_ENV"):
# we are in remote env and do not have neon binaries locally
# this is the case for benchmarks run on self-hosted runner
return
# Find the neon binaries.
if env_neon_bin := os.environ.get("NEON_BIN"):
binpath = Path(env_neon_bin)
else:
binpath = base_dir / "target" / build_type
log.info(f"neon_binpath is {binpath}")
if not (binpath / "pageserver").exists():
raise Exception(f"neon binaries not found at '{binpath}'")
yield binpath
@pytest.fixture(scope="function")
def pg_distrib_dir(base_dir: Path) -> Iterator[Path]:
if env_postgres_bin := os.environ.get("POSTGRES_DISTRIB_DIR"):
distrib_dir = Path(env_postgres_bin).resolve()
else:
distrib_dir = base_dir / "pg_install"
log.info(f"pg_distrib_dir is {distrib_dir}")
yield distrib_dir
@pytest.fixture(scope="session")
def top_output_dir(base_dir: Path) -> Iterator[Path]:
# Compute the top-level directory for all tests.
if env_test_output := os.environ.get("TEST_OUTPUT"):
output_dir = Path(env_test_output).resolve()
else:
output_dir = base_dir / DEFAULT_OUTPUT_DIR
output_dir.mkdir(exist_ok=True)
log.info(f"top_output_dir is {output_dir}")
yield output_dir
@pytest.fixture(scope="function")
def versioned_pg_distrib_dir(pg_distrib_dir: Path, pg_version: PgVersion) -> Iterator[Path]:
versioned_dir = pg_distrib_dir / pg_version.v_prefixed
psql_bin_path = versioned_dir / "bin/psql"
postgres_bin_path = versioned_dir / "bin/postgres"
if os.getenv("REMOTE_ENV"):
# When testing against a remote server, we only need the client binary.
if not psql_bin_path.exists():
raise Exception(f"psql not found at '{psql_bin_path}'")
else:
if not postgres_bin_path.exists():
raise Exception(f"postgres not found at '{postgres_bin_path}'")
log.info(f"versioned_pg_distrib_dir is {versioned_dir}")
yield versioned_dir
@pytest.fixture(scope="session")
def neon_api_key() -> str:
api_key = os.getenv("NEON_API_KEY")
if not api_key:
raise AssertionError("Set the NEON_API_KEY environment variable")
return api_key
@pytest.fixture(scope="session")
def neon_api_base_url() -> str:
return os.getenv("NEON_API_BASE_URL", "https://console-stage.neon.build/api/v2")
@pytest.fixture(scope="session")
def neon_api(neon_api_key: str, neon_api_base_url: str) -> NeonAPI:
return NeonAPI(neon_api_key, neon_api_base_url)
@pytest.fixture(scope="session")
def worker_port_num():
return (32768 - BASE_PORT) // int(os.environ.get("PYTEST_XDIST_WORKER_COUNT", "1"))
@pytest.fixture(scope="session")
def worker_seq_no(worker_id: str) -> int:
# worker_id is a pytest-xdist fixture
# it can be master or gw<number>
# parse it to always get a number
if worker_id == "master":
return 0
assert worker_id.startswith("gw")
return int(worker_id[2:])
@pytest.fixture(scope="session")
def worker_base_port(worker_seq_no: int, worker_port_num: int) -> int:
# so we divide ports in ranges of ports
# so workers have disjoint set of ports for services
return BASE_PORT + worker_seq_no * worker_port_num
def get_dir_size(path: str) -> int:
"""Return size in bytes."""
@@ -136,6 +253,11 @@ def get_dir_size(path: str) -> int:
return totalbytes
@pytest.fixture(scope="session")
def port_distributor(worker_base_port: int, worker_port_num: int) -> PortDistributor:
return PortDistributor(base_port=worker_base_port, port_number=worker_port_num)
@pytest.fixture(scope="function")
def default_broker(
port_distributor: PortDistributor,
@@ -151,6 +273,18 @@ def default_broker(
broker.stop()
@pytest.fixture(scope="session")
def run_id() -> Iterator[uuid.UUID]:
yield uuid.uuid4()
@pytest.fixture(scope="session")
def mock_s3_server(port_distributor: PortDistributor) -> Iterator[MockS3Server]:
mock_s3_server = MockS3Server(port_distributor.get_port())
yield mock_s3_server
mock_s3_server.kill()
class PgProtocol:
"""Reusable connection logic"""
@@ -350,7 +484,6 @@ class NeonEnvBuilder:
safekeeper_extra_opts: Optional[list[str]] = None,
storage_controller_port_override: Optional[int] = None,
pageserver_io_buffer_alignment: Optional[int] = None,
shared: Optional[bool] = False,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
@@ -407,8 +540,8 @@ class NeonEnvBuilder:
self.pageserver_io_buffer_alignment = pageserver_io_buffer_alignment
assert (
test_name.startswith("test_") or shared
assert test_name.startswith(
"test_"
), "Unexpectedly instantiated from outside a test function"
self.test_name = test_name
@@ -420,10 +553,6 @@ class NeonEnvBuilder:
self.env = NeonEnv(self)
return self.env
def start(self):
assert self.env is not None, "environment is not already initialized, call init() first"
self.env.start()
def init_start(
self,
initial_tenant_conf: Optional[Dict[str, Any]] = None,
@@ -439,7 +568,7 @@ class NeonEnvBuilder:
Configuring pageserver with remote storage is now the default. There will be a warning if pageserver is created without one.
"""
env = self.init_configs(default_remote_storage_if_missing=default_remote_storage_if_missing)
self.start()
env.start()
# Prepare the default branch to start the postgres on later.
# Pageserver itself does not create tenants and timelines, until started first and asked via HTTP API.
@@ -940,9 +1069,6 @@ class NeonEnv:
self.pg_distrib_dir = config.pg_distrib_dir
self.endpoint_counter = 0
self.storage_controller_config = config.storage_controller_config
# generate initial tenant ID here instead of letting 'neon init' generate it,
# so that we don't need to dig it out of the config file afterwards.
self.initial_tenant = config.initial_tenant
self.initial_timeline = config.initial_timeline
@@ -1320,21 +1446,6 @@ def neon_simple_env(
yield env
@pytest.fixture(scope="function")
def neon_endpoint(request: FixtureRequest, neon_shared_env: NeonEnv) -> Endpoint:
neon_shared_env.neon_cli.create_branch(request.node.name)
ep = neon_shared_env.endpoints.create_start(request.node.name)
try:
yield ep
finally:
if ep.is_running():
try:
ep.stop()
except BaseException:
pass
@pytest.fixture(scope="function")
def neon_env_builder(
pytestconfig: Config,
@@ -1403,14 +1514,6 @@ class PageserverPort:
http: int
CREATE_TIMELINE_ID_EXTRACTOR: re.Pattern = re.compile( # type: ignore[type-arg]
r"^Created timeline '(?P<timeline_id>[^']+)'", re.MULTILINE
)
TIMELINE_DATA_EXTRACTOR: re.Pattern = re.compile( # type: ignore[type-arg]
r"\s?(?P<branch_name>[^\s]+)\s\[(?P<timeline_id>[^\]]+)\]", re.MULTILINE
)
class AbstractNeonCli(abc.ABC):
"""
A typed wrapper around an arbitrary Neon CLI tool.
@@ -1639,6 +1742,9 @@ class NeonCli(AbstractNeonCli):
tenant_id: Optional[TenantId] = None,
timeline_id: Optional[TimelineId] = None,
) -> TimelineId:
if timeline_id is None:
timeline_id = TimelineId.generate()
cmd = [
"timeline",
"create",
@@ -1646,23 +1752,16 @@ class NeonCli(AbstractNeonCli):
new_branch_name,
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
"--timeline-id",
str(timeline_id),
"--pg-version",
self.env.pg_version,
]
if timeline_id is not None:
cmd.extend(["--timeline-id", str(timeline_id)])
res = self.raw_cli(cmd)
res.check_returncode()
matches = CREATE_TIMELINE_ID_EXTRACTOR.search(res.stdout)
created_timeline_id = None
if matches is not None:
created_timeline_id = matches.group("timeline_id")
return TimelineId(str(created_timeline_id))
return timeline_id
def create_branch(
self,
@@ -1670,12 +1769,17 @@ class NeonCli(AbstractNeonCli):
ancestor_branch_name: Optional[str] = None,
tenant_id: Optional[TenantId] = None,
ancestor_start_lsn: Optional[Lsn] = None,
new_timeline_id: Optional[TimelineId] = None,
) -> TimelineId:
if new_timeline_id is None:
new_timeline_id = TimelineId.generate()
cmd = [
"timeline",
"branch",
"--branch-name",
new_branch_name,
"--timeline-id",
str(new_timeline_id),
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
]
@@ -1687,16 +1791,7 @@ class NeonCli(AbstractNeonCli):
res = self.raw_cli(cmd)
res.check_returncode()
matches = CREATE_TIMELINE_ID_EXTRACTOR.search(res.stdout)
created_timeline_id = None
if matches is not None:
created_timeline_id = matches.group("timeline_id")
if created_timeline_id is None:
raise Exception("could not find timeline id after `neon timeline create` invocation")
else:
return TimelineId(str(created_timeline_id))
return TimelineId(str(new_timeline_id))
def list_timelines(self, tenant_id: Optional[TenantId] = None) -> List[Tuple[str, TimelineId]]:
"""
@@ -1705,6 +1800,9 @@ class NeonCli(AbstractNeonCli):
# main [b49f7954224a0ad25cc0013ea107b54b]
# ┣━ @0/16B5A50: test_cli_branch_list_main [20f98c79111b9015d84452258b7d5540]
TIMELINE_DATA_EXTRACTOR: re.Pattern = re.compile( # type: ignore[type-arg]
r"\s?(?P<branch_name>[^\s]+)\s\[(?P<timeline_id>[^\]]+)\]", re.MULTILINE
)
res = self.raw_cli(
["timeline", "list", "--tenant-id", str(tenant_id or self.env.initial_tenant)]
)
@@ -4690,35 +4788,27 @@ def _get_test_dir(request: FixtureRequest, top_output_dir: Path, prefix: str) ->
return test_dir
def get_test_output_dir(
request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None
) -> Path:
def get_test_output_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
"""
The working directory for a test.
"""
return _get_test_dir(request, top_output_dir, prefix or "")
return _get_test_dir(request, top_output_dir, "")
def get_test_overlay_dir(
request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None
) -> Path:
def get_test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
"""
Directory that contains `upperdir` and `workdir` for overlayfs mounts
that a test creates. See `NeonEnvBuilder.overlay_mount`.
"""
return _get_test_dir(request, top_output_dir, f"overlay-{prefix or ''}")
return _get_test_dir(request, top_output_dir, "overlay-")
def get_shared_snapshot_dir_path(
top_output_dir: Path, snapshot_name: str, prefix: Optional[str] = None
) -> Path:
return top_output_dir / f"{prefix or ''}shared-snapshots" / snapshot_name
def get_shared_snapshot_dir_path(top_output_dir: Path, snapshot_name: str) -> Path:
return top_output_dir / "shared-snapshots" / snapshot_name
def get_test_repo_dir(
request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None
) -> Path:
return get_test_output_dir(request, top_output_dir, prefix or "") / "repo"
def get_test_repo_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
return get_test_output_dir(request, top_output_dir) / "repo"
def pytest_addoption(parser: Parser):
@@ -5083,7 +5173,7 @@ def tenant_get_shards(
return [(TenantShardId(tenant_id, 0, 0), override_pageserver or env.pageserver)]
def wait_replica_caughtup(primary: PgProtocol, secondary: PgProtocol):
def wait_replica_caughtup(primary: Endpoint, secondary: Endpoint):
primary_lsn = Lsn(
primary.safe_psql_scalar("SELECT pg_current_wal_flush_lsn()", log_query=False)
)

View File

@@ -14,60 +14,32 @@ Dynamically parametrize tests by different parameters
"""
def get_pgversions():
if (v := os.getenv("DEFAULT_PG_VERSION")) is None:
pg_versions = [version for version in PgVersion if version != PgVersion.NOT_SET]
else:
pg_versions = [PgVersion(v)]
return pg_versions
@pytest.fixture(scope="function", autouse=True)
def pg_version() -> Optional[PgVersion]:
return None
@pytest.fixture(
scope="session",
autouse=True,
params=get_pgversions(),
ids=lambda v: f"pg{v}",
)
def pg_version(request) -> Optional[PgVersion]:
return request.param
@pytest.fixture(scope="function", autouse=True)
def build_type() -> Optional[str]:
return None
def get_buildtypes():
if (bt := os.getenv("BUILD_TYPE")) is None:
build_types = ["debug", "release"]
else:
build_types = [bt.lower()]
return build_types
@pytest.fixture(
scope="session",
autouse=True,
params=get_buildtypes(),
ids=lambda t: f"{t}",
)
def build_type(request) -> Optional[str]:
return request.param
@pytest.fixture(scope="session", autouse=True)
@pytest.fixture(scope="function", autouse=True)
def platform() -> Optional[str]:
return None
@pytest.fixture(scope="session", autouse=True)
@pytest.fixture(scope="function", autouse=True)
def pageserver_virtual_file_io_engine() -> Optional[str]:
return os.getenv("PAGESERVER_VIRTUAL_FILE_IO_ENGINE")
@pytest.fixture(scope="session", autouse=True)
@pytest.fixture(scope="function", autouse=True)
def pageserver_io_buffer_alignment() -> Optional[int]:
return None
@pytest.fixture(scope="session", autouse=True)
@pytest.fixture(scope="function", autouse=True)
def pageserver_aux_file_policy() -> Optional[AuxFileStore]:
return None
@@ -81,12 +53,26 @@ def get_pageserver_default_tenant_config_compaction_algorithm() -> Optional[Dict
return v
@pytest.fixture(scope="session", autouse=True)
@pytest.fixture(scope="function", autouse=True)
def pageserver_default_tenant_config_compaction_algorithm() -> Optional[Dict[str, Any]]:
return get_pageserver_default_tenant_config_compaction_algorithm()
def pytest_generate_tests(metafunc: Metafunc):
if (bt := os.getenv("BUILD_TYPE")) is None:
build_types = ["debug", "release"]
else:
build_types = [bt.lower()]
metafunc.parametrize("build_type", build_types)
if (v := os.getenv("DEFAULT_PG_VERSION")) is None:
pg_versions = [version for version in PgVersion if version != PgVersion.NOT_SET]
else:
pg_versions = [PgVersion(v)]
metafunc.parametrize("pg_version", pg_versions, ids=map(lambda v: f"pg{v}", pg_versions))
# A hacky way to parametrize tests only for `pageserver_virtual_file_io_engine=std-fs`
# And do not change test name for default `pageserver_virtual_file_io_engine=tokio-epoll-uring` to keep tests statistics
if (io_engine := os.getenv("PAGESERVER_VIRTUAL_FILE_IO_ENGINE", "")) not in (
@@ -103,7 +89,6 @@ def pytest_generate_tests(metafunc: Metafunc):
"pageserver_default_tenant_config_compaction_algorithm",
[explicit_default],
ids=[explicit_default["kind"]],
scope="session",
)
# For performance tests, parametrize also by platform

View File

@@ -1,303 +0,0 @@
import os
import shutil
import subprocess
import uuid
from pathlib import Path
from typing import Any, Dict, Iterator, Optional, cast
import pytest
from _pytest.config import Config
from _pytest.fixtures import FixtureRequest
from fixtures import overlayfs
from fixtures.broker import NeonBroker
from fixtures.log_helper import log
from fixtures.neon_api import NeonAPI
from fixtures.neon_fixtures import (
DEFAULT_OUTPUT_DIR,
NeonEnv,
NeonEnvBuilder,
get_test_output_dir,
get_test_overlay_dir,
get_test_repo_dir,
)
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import MockS3Server
from fixtures.utils import AuxFileStore, allure_attach_from_dir, get_self_dir
BASE_PORT: int = 15000
@pytest.fixture(scope="session")
def base_dir() -> Iterator[Path]:
# find the base directory (currently this is the git root)
base_dir = get_self_dir().parent.parent
log.info(f"base_dir is {base_dir}")
yield base_dir
@pytest.fixture(scope="session")
def neon_binpath(base_dir: Path, build_type: str) -> Iterator[Path]:
if os.getenv("REMOTE_ENV"):
# we are in remote env and do not have neon binaries locally
# this is the case for benchmarks run on self-hosted runner
return
# Find the neon binaries.
if env_neon_bin := os.environ.get("NEON_BIN"):
binpath = Path(env_neon_bin)
else:
binpath = base_dir / "target" / build_type
log.info(f"neon_binpath is {binpath}")
if not (binpath / "pageserver").exists():
raise Exception(f"neon binaries not found at '{binpath}'")
yield binpath
@pytest.fixture(scope="session")
def pg_distrib_dir(base_dir: Path) -> Iterator[Path]:
if env_postgres_bin := os.environ.get("POSTGRES_DISTRIB_DIR"):
distrib_dir = Path(env_postgres_bin).resolve()
else:
distrib_dir = base_dir / "pg_install"
log.info(f"pg_distrib_dir is {distrib_dir}")
yield distrib_dir
@pytest.fixture(scope="session")
def top_output_dir(base_dir: Path) -> Iterator[Path]:
# Compute the top-level directory for all tests.
if env_test_output := os.environ.get("TEST_OUTPUT"):
output_dir = Path(env_test_output).resolve()
else:
output_dir = base_dir / DEFAULT_OUTPUT_DIR
output_dir.mkdir(exist_ok=True)
log.info(f"top_output_dir is {output_dir}")
yield output_dir
@pytest.fixture(scope="session")
def versioned_pg_distrib_dir(pg_distrib_dir: Path, pg_version: PgVersion) -> Iterator[Path]:
versioned_dir = pg_distrib_dir / pg_version.v_prefixed
psql_bin_path = versioned_dir / "bin/psql"
postgres_bin_path = versioned_dir / "bin/postgres"
if os.getenv("REMOTE_ENV"):
# When testing against a remote server, we only need the client binary.
if not psql_bin_path.exists():
raise Exception(f"psql not found at '{psql_bin_path}'")
else:
if not postgres_bin_path.exists():
raise Exception(f"postgres not found at '{postgres_bin_path}'")
log.info(f"versioned_pg_distrib_dir is {versioned_dir}")
yield versioned_dir
@pytest.fixture(scope="session")
def neon_api_key() -> str:
api_key = os.getenv("NEON_API_KEY")
if not api_key:
raise AssertionError("Set the NEON_API_KEY environment variable")
return api_key
@pytest.fixture(scope="session")
def neon_api_base_url() -> str:
return os.getenv("NEON_API_BASE_URL", "https://console-stage.neon.build/api/v2")
@pytest.fixture(scope="session")
def neon_api(neon_api_key: str, neon_api_base_url: str) -> NeonAPI:
return NeonAPI(neon_api_key, neon_api_base_url)
@pytest.fixture(scope="session")
def worker_port_num():
return (32768 - BASE_PORT) // int(os.environ.get("PYTEST_XDIST_WORKER_COUNT", "1"))
@pytest.fixture(scope="session")
def worker_seq_no(worker_id: str) -> int:
# worker_id is a pytest-xdist fixture
# it can be master or gw<number>
# parse it to always get a number
if worker_id == "master":
return 0
assert worker_id.startswith("gw")
return int(worker_id[2:])
@pytest.fixture(scope="session")
def worker_base_port(worker_seq_no: int, worker_port_num: int) -> int:
# so we divide ports in ranges of ports
# so workers have disjoint set of ports for services
return BASE_PORT + worker_seq_no * worker_port_num
@pytest.fixture(scope="session")
def port_distributor(worker_base_port: int, worker_port_num: int) -> PortDistributor:
return PortDistributor(base_port=worker_base_port, port_number=worker_port_num)
@pytest.fixture(scope="session")
def shared_broker(
port_distributor: PortDistributor,
shared_test_output_dir: Path,
neon_binpath: Path,
) -> Iterator[NeonBroker]:
# multiple pytest sessions could get launched in parallel, get them different ports/datadirs
client_port = port_distributor.get_port()
broker_logfile = shared_test_output_dir / "repo" / "storage_broker.log"
broker = NeonBroker(logfile=broker_logfile, port=client_port, neon_binpath=neon_binpath)
yield broker
broker.stop()
@pytest.fixture(scope="session")
def run_id() -> Iterator[uuid.UUID]:
yield uuid.uuid4()
@pytest.fixture(scope="session", autouse=True)
def neon_shared_env(
pytestconfig: Config,
port_distributor: PortDistributor,
mock_s3_server: MockS3Server,
shared_broker: NeonBroker,
run_id: uuid.UUID,
top_output_dir: Path,
shared_test_output_dir: Path,
neon_binpath: Path,
build_type: str,
pg_distrib_dir: Path,
pg_version: PgVersion,
pageserver_virtual_file_io_engine: str,
pageserver_aux_file_policy: Optional[AuxFileStore],
pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]],
pageserver_io_buffer_alignment: Optional[int],
request: FixtureRequest,
) -> Iterator[NeonEnv]:
"""
Simple Neon environment, with no authentication and no safekeepers.
This fixture will use RemoteStorageKind.LOCAL_FS with pageserver.
"""
prefix = f"shared[{build_type}-{pg_version.v_prefixed}]-"
# Create the environment in the per-test output directory
repo_dir = get_test_repo_dir(request, top_output_dir, prefix)
with NeonEnvBuilder(
top_output_dir=top_output_dir,
repo_dir=repo_dir,
port_distributor=port_distributor,
broker=shared_broker,
mock_s3_server=mock_s3_server,
neon_binpath=neon_binpath,
pg_distrib_dir=pg_distrib_dir,
pg_version=pg_version,
run_id=run_id,
preserve_database_files=cast(bool, pytestconfig.getoption("--preserve-database-files")),
test_name=f"{prefix}{request.node.name}",
test_output_dir=shared_test_output_dir,
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
pageserver_aux_file_policy=pageserver_aux_file_policy,
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
pageserver_io_buffer_alignment=pageserver_io_buffer_alignment,
shared=True,
) as builder:
env = builder.init_start()
yield env
# This is autouse, so the test output directory always gets created, even
# if a test doesn't put anything there.
#
# NB: we request the overlay dir fixture so the fixture does its cleanups
@pytest.fixture(scope="session")
def shared_test_output_dir(
request: FixtureRequest,
pg_version: PgVersion,
build_type: str,
top_output_dir: Path,
shared_test_overlay_dir: Path,
) -> Iterator[Path]:
"""Create the working directory for shared tests."""
prefix = f"shared[{build_type}-{pg_version.v_prefixed}]-"
# one directory per test
test_dir = get_test_output_dir(request, top_output_dir, prefix)
log.info(f"test_output_dir is {test_dir}")
shutil.rmtree(test_dir, ignore_errors=True)
test_dir.mkdir()
yield test_dir
# Allure artifacts creation might involve the creation of `.tar.zst` archives,
# which aren't going to be used if Allure results collection is not enabled
# (i.e. --alluredir is not set).
# Skip `allure_attach_from_dir` in this case
if not request.config.getoption("--alluredir"):
return
preserve_database_files = False
for k, v in request.node.user_properties:
# NB: the neon_env_builder fixture uses this fixture (test_output_dir).
# So, neon_env_builder's cleanup runs before here.
# The cleanup propagates NeonEnvBuilder.preserve_database_files into this user property.
if k == "preserve_database_files":
assert isinstance(v, bool)
preserve_database_files = v
allure_attach_from_dir(test_dir, preserve_database_files)
@pytest.fixture(scope="session")
def shared_test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Optional[Path]:
"""
Idempotently create a test's overlayfs mount state directory.
If the functionality isn't enabled via env var, returns None.
The procedure cleans up after previous runs that were aborted (e.g. due to Ctrl-C, OOM kills, etc).
"""
if os.getenv("NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS") is None:
return None
overlay_dir = get_test_overlay_dir(request, top_output_dir)
log.info(f"test_overlay_dir is {overlay_dir}")
overlay_dir.mkdir(exist_ok=True)
# unmount stale overlayfs mounts which use subdirectories of `overlay_dir/*` as the overlayfs `upperdir` and `workdir`
for mountpoint in overlayfs.iter_mounts_beneath(get_test_output_dir(request, top_output_dir)):
cmd = ["sudo", "umount", str(mountpoint)]
log.info(
f"Unmounting stale overlayfs mount probably created during earlier test run: {cmd}"
)
subprocess.run(cmd, capture_output=True, check=True)
# the overlayfs `workdir` is owned by `root`, shutil.rmtree won't work.
cmd = ["sudo", "rm", "-rf", str(overlay_dir)]
subprocess.run(cmd, capture_output=True, check=True)
overlay_dir.mkdir()
return overlay_dir
# no need to clean up anything: on clean shutdown,
# NeonEnvBuilder.overlay_cleanup_teardown takes care of cleanup
# and on unclean shutdown, this function will take care of it
# on the next test run

View File

@@ -1,13 +0,0 @@
from typing import Iterator
import pytest
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import MockS3Server
@pytest.fixture(scope="session")
def mock_s3_server(port_distributor: PortDistributor) -> Iterator[MockS3Server]:
mock_s3_server = MockS3Server(port_distributor.get_port())
yield mock_s3_server
mock_s3_server.kill()

View File

@@ -1,348 +0,0 @@
from functools import partial
from pathlib import Path
from typing import Any, Optional, cast, List, Dict
import pytest
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint, NeonEnv, PgProtocol, DEFAULT_BRANCH_NAME, tenant_get_shards, \
check_restored_datadir_content
from fixtures.pageserver.utils import wait_for_last_record_lsn
from fixtures.safekeeper.utils import are_walreceivers_absent
from fixtures.utils import wait_until
"""
In this file most important resources are exposed as function-level fixtures
that depend on session-level resources like pageservers and safekeepers.
The main rationale here is that we don't need to initialize a new SK/PS/etc
every time we want to test something that has branching: we can just as well
reuse still-available PageServers from other tests of the same kind.
"""
@pytest.fixture(scope="session")
def shared_storage_repo_path(build_type: str, base_dir: Path) -> Path:
return base_dir / f"shared[{build_type}]" / "repo"
class TEndpoint(PgProtocol):
def clear_shared_buffers(self, cursor: Optional[Any] = None):
"""
Best-effort way to clear postgres buffers. Pinned buffers will not be 'cleared.'
Might also clear LFC.
"""
if cursor is not None:
cursor.execute("select clear_buffer_cache()")
else:
self.safe_psql("select clear_buffer_cache()")
_TEndpoint = Endpoint
class TTimeline:
__primary: Optional[_TEndpoint]
__secondary: Optional[_TEndpoint]
def __init__(self, timeline_id: TimelineId, tenant: "TTenant", name: str):
self.id = timeline_id
self.tenant = tenant
self.name = name
self.__primary = None
self.__secondary = None
@property
def primary(self) -> TEndpoint:
if self.__primary is None:
self.__primary = cast(_TEndpoint, self.tenant.create_endpoint(
name=self._get_full_endpoint_name("primary"),
timeline_id=self.id,
primary=True,
running=True,
))
return cast(TEndpoint, self.__primary)
def primary_with_config(self, config_lines: List[str], reboot: bool = False):
if self.__primary is None:
self.__primary = cast(_TEndpoint, self.tenant.create_endpoint(
name=self._get_full_endpoint_name("primary"),
timeline_id=self.id,
primary=True,
running=True,
config_lines=config_lines,
))
else:
self.__primary.config(config_lines)
if reboot:
if self.__primary.is_running():
self.__primary.stop()
self.__primary.start()
return cast(TTimeline, self.__primary)
@property
def secondary(self) -> TEndpoint:
if self.__secondary is None:
self.__secondary = cast(_TEndpoint, self.tenant.create_endpoint(
name=self._get_full_endpoint_name("secondary"),
timeline_id=self.id,
primary=False,
running=True,
))
return cast(TEndpoint, self.__secondary)
def secondary_with_config(self, config_lines: List[str], reboot: bool = False) -> TEndpoint:
if self.__secondary is None:
self.__secondary = cast(_TEndpoint, self.tenant.create_endpoint(
name=self._get_full_endpoint_name("secondary"),
timeline_id=self.id,
primary=False,
running=True,
config_lines=config_lines,
))
else:
self.__secondary.config(config_lines)
if reboot:
if self.__secondary.is_running():
self.__secondary.stop()
self.__secondary.start()
return cast(TEndpoint, self.__secondary)
def _get_full_endpoint_name(self, name) -> str:
return f"{self.name}.{name}"
def create_branch(self, name: str, lsn: Optional[Lsn]) -> "TTimeline":
return self.tenant.create_timeline(
new_name=name,
parent_name=self.name,
branch_at=lsn,
)
def stop_and_flush(self, endpoint: TEndpoint):
self.tenant.stop_endpoint(endpoint)
self.tenant.flush_timeline_data(self)
def checkpoint(self, **kwargs):
self.tenant.checkpoint_timeline(self, **kwargs)
class TTenant:
"""
An object representing a single test case on a shared pageserver.
All operations here are practically safe.
"""
def __init__(
self,
env: NeonEnv,
name: str,
config: Optional[Dict[str, Any]] = None,
):
self.id = TenantId.generate()
self.timelines = []
self.timeline_names = {}
self.timeline_ids = {}
self.name = name
# publicly inaccessible stuff, used during shutdown
self.__endpoints: List[Endpoint] = []
self.__env: NeonEnv = env
env.neon_cli.create_tenant(
tenant_id=self.id,
set_default=False,
conf=config
)
self.first_timeline_id = env.neon_cli.create_branch(
new_branch_name=f"{self.name}:{DEFAULT_BRANCH_NAME}",
ancestor_branch_name=DEFAULT_BRANCH_NAME,
tenant_id=self.id,
)
self._add_timeline(
TTimeline(
timeline_id=self.first_timeline_id,
tenant=self,
name=DEFAULT_BRANCH_NAME,
)
)
def _add_timeline(self, timeline: TTimeline):
assert timeline.tenant == self
assert timeline not in self.timelines
assert timeline.id is not None
self.timelines.append(timeline)
self.timeline_ids[timeline.id] = timeline
if timeline.name is not None:
self.timeline_names[timeline.name] = timeline
def create_timeline(
self,
new_name: str,
parent_name: Optional[str] = None,
parent_id: Optional[TimelineId] = None,
branch_at: Optional[Lsn] = None,
) -> TTimeline:
if parent_name is not None:
pass
elif parent_id is not None:
assert parent_name is None
parent = self.timeline_ids[parent_id]
parent_name = parent.name
else:
raise LookupError("Timeline creation requires parent by either ID or name")
assert parent_name is not None
new_id = self.__env.neon_cli.create_branch(
new_branch_name=f"{self.name}:{new_name}",
ancestor_branch_name=f"{self.name}:{parent_name}",
tenant_id=self.id,
ancestor_start_lsn=branch_at,
)
new_tl = TTimeline(
timeline_id=new_id,
tenant=self,
name=new_name,
)
self._add_timeline(new_tl)
return new_tl
@property
def default_timeline(self) -> TTimeline:
return self.timeline_ids.get(self.first_timeline_id)
def start_endpoint(self, ep: TEndpoint):
if ep not in self.__endpoints:
return
ep = cast(Endpoint, ep)
if not ep.is_running():
ep.start()
def stop_endpoint(self, ep: TEndpoint, mode: str = "fast"):
if ep not in self.__endpoints:
return
ep = cast(Endpoint, ep)
if ep.is_running():
ep.stop(mode=mode)
def create_endpoint(
self,
name: str,
timeline_id: TimelineId,
primary: bool = True,
running: bool = False,
port: Optional[int] = None,
http_port: Optional[int] = None,
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
) -> TEndpoint:
endpoint: _TEndpoint
if port is None:
port = self.__env.port_distributor.get_port()
if http_port is None:
http_port = self.__env.port_distributor.get_port()
if running:
endpoint = self.__env.endpoints.create_start(
branch_name=self.timeline_ids[timeline_id].name,
endpoint_id=f"{self.name}.{name}",
tenant_id=self.id,
lsn=lsn,
hot_standby=not primary,
config_lines=config_lines,
)
else:
endpoint = self.__env.endpoints.create(
branch_name=self.timeline_ids[timeline_id].name,
endpoint_id=f"{self.name}.{name}",
tenant_id=self.id,
lsn=lsn,
hot_standby=not primary,
config_lines=config_lines,
)
self.__endpoints.append(endpoint)
return endpoint
def shutdown_resources(self):
for ep in self.__endpoints:
try:
ep.stop_and_destroy("fast")
except BaseException as e:
log.error("Error encountered while shutting down endpoint %s", ep.endpoint_id, exc_info=e)
def reconfigure(self, endpoint: TEndpoint, lines: List[str], restart: bool):
if endpoint not in self.__endpoints:
return
endpoint = cast(_TEndpoint, endpoint)
was_running = endpoint.is_running()
if restart and was_running:
endpoint.stop()
endpoint.config(lines)
if restart:
endpoint.start()
def flush_timeline_data(self, timeline: TTimeline) -> Lsn:
commit_lsn: Lsn = Lsn(0)
# In principle, in the absence of failures, polling a single sk would be enough.
for sk in self.__env.safekeepers:
cli = sk.http_client()
# wait until compute connections are gone
wait_until(30, 0.5, partial(are_walreceivers_absent, cli, self.id, timeline.id))
commit_lsn = max(cli.get_commit_lsn(self.id, timeline.id), commit_lsn)
# Note: depending on WAL filtering implementation, probably most shards
# won't be able to reach commit_lsn (unless gaps are also ack'ed), so this
# is broken in sharded case.
shards = tenant_get_shards(env=self.__env, tenant_id=self.id)
for tenant_shard_id, pageserver in shards:
log.info(
f"flush_ep_to_pageserver: waiting for {commit_lsn} on shard {tenant_shard_id} on pageserver {pageserver.id})"
)
waited = wait_for_last_record_lsn(
pageserver.http_client(), tenant_shard_id, timeline.id, commit_lsn
)
assert waited >= commit_lsn
return commit_lsn
def checkpoint_timeline(self, timeline: TTimeline, **kwargs):
self.__env.pageserver.http_client().timeline_checkpoint(
tenant_id=self.id,
timeline_id=timeline.id,
**kwargs,
)
def pgdatadir(self, endpoint: TEndpoint):
if endpoint not in self.__endpoints:
return None
return cast(Endpoint, endpoint).pgdata_dir
def check_restored_datadir_content(self, path, endpoint: TEndpoint, *args, **kwargs):
if endpoint not in self.__endpoints:
return
check_restored_datadir_content(path, self.__env, cast(Endpoint, endpoint), *args, **kwargs)
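A minimal sketch of how a test could drive this fixture, assuming a `tenant` fixture that yields one of these objects with its default timeline already registered; the test body and the `safe_psql` calls are illustrative, not part of the change:

def test_branch_and_flush(tenant):
    # Start a primary compute on the default timeline and write something.
    main_tl = tenant.default_timeline
    primary = tenant.create_endpoint("primary", timeline_id=main_tl.id, running=True)
    primary.safe_psql("CREATE TABLE t AS SELECT generate_series(1, 1000) AS x")

    # Branch off the default timeline by parent ID and start a compute there.
    branch = tenant.create_timeline("child", parent_id=main_tl.id)
    child_ep = tenant.create_endpoint("child_primary", timeline_id=branch.id, running=True)
    assert child_ep.safe_psql("SELECT count(*) FROM t") == [(1000,)]

    # Stop the computes, then wait until the pageserver has caught up on the main timeline.
    tenant.stop_endpoint(primary)
    tenant.stop_endpoint(child_ep)
    tenant.flush_timeline_data(main_tl)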

View File

@@ -84,7 +84,7 @@ def test_storage_controller_many_tenants(
compute_reconfigure_listener.register_on_notify(lambda body: time.sleep(0.01))
env = neon_env_builder.init_configs()
neon_env_builder.start()
env.start()
# We will intentionally stress reconciler concurrency, which triggers a warning when lots
# of shards are hitting the delayed path.

View File

@@ -15,7 +15,6 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import wait_until_tenant_active
from fixtures.shared_fixtures import TTimeline
from fixtures.utils import query_scalar
from performance.test_perf_pgbench import get_scales_matrix
from requests import RequestException
@@ -116,10 +115,13 @@ def test_branching_with_pgbench(
# This test checks that the pageserver is able to handle an "unnormalized" starting LSN.
#
# Related: see discussion in https://github.com/neondatabase/neon/pull/2143#issuecomment-1209092186
def test_branching_unnormalized_start_lsn(timeline: TTimeline, pg_bin: PgBin):
def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBin):
XLOG_BLCKSZ = 8192
endpoint0 = timeline.primary
env = neon_simple_env
env.neon_cli.create_branch("b0")
endpoint0 = env.endpoints.create_start("b0")
pg_bin.run_capture(["pgbench", "-i", endpoint0.connstr()])
@@ -131,8 +133,8 @@ def test_branching_unnormalized_start_lsn(timeline: TTimeline, pg_bin: PgBin):
start_lsn = Lsn((int(curr_lsn) - XLOG_BLCKSZ) // XLOG_BLCKSZ * XLOG_BLCKSZ)
log.info(f"Branching b1 from b0 starting at lsn {start_lsn}...")
tl2 = timeline.create_branch("branch", start_lsn)
endpoint1 = tl2.primary
env.neon_cli.create_branch("b1", "b0", ancestor_start_lsn=start_lsn)
endpoint1 = env.endpoints.create_start("b1")
pg_bin.run_capture(["pgbench", "-i", endpoint1.connstr()])

View File

@@ -1,13 +1,14 @@
from pathlib import Path
from fixtures.neon_fixtures import NeonEnvBuilder, flush_ep_to_pageserver, test_output_dir
from fixtures.shared_fixtures import TTimeline
from fixtures.neon_fixtures import NeonEnvBuilder, flush_ep_to_pageserver
def do_combocid_op(timeline: TTimeline, op):
endpoint = timeline.primary_with_config(config_lines=[
"shared_buffers='1MB'",
])
def do_combocid_op(neon_env_builder: NeonEnvBuilder, op):
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start(
"main",
config_lines=[
"shared_buffers='1MB'",
],
)
conn = endpoint.connect()
cur = conn.cursor()
@@ -42,26 +43,32 @@ def do_combocid_op(timeline: TTimeline, op):
assert len(rows) == 500
cur.execute("rollback")
timeline.stop_and_flush(endpoint)
timeline.checkpoint(compact=False, wait_until_uploaded=True)
flush_ep_to_pageserver(env, endpoint, env.initial_tenant, env.initial_timeline)
env.pageserver.http_client().timeline_checkpoint(
env.initial_tenant, env.initial_timeline, compact=False, wait_until_uploaded=True
)
def test_combocid_delete(timeline: TTimeline):
do_combocid_op(timeline, "delete from t")
def test_combocid_delete(neon_env_builder: NeonEnvBuilder):
do_combocid_op(neon_env_builder, "delete from t")
def test_combocid_update(timeline: TTimeline):
do_combocid_op(timeline, "update t set val=val+1")
def test_combocid_update(neon_env_builder: NeonEnvBuilder):
do_combocid_op(neon_env_builder, "update t set val=val+1")
def test_combocid_lock(timeline: TTimeline):
do_combocid_op(timeline, "select * from t for update")
def test_combocid_lock(neon_env_builder: NeonEnvBuilder):
do_combocid_op(neon_env_builder, "select * from t for update")
def test_combocid_multi_insert(timeline: TTimeline, test_output_dir: Path):
endpoint = timeline.primary_with_config(config_lines=[
"shared_buffers='1MB'",
])
def test_combocid_multi_insert(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start(
"main",
config_lines=[
"shared_buffers='1MB'",
],
)
conn = endpoint.connect()
cur = conn.cursor()
@@ -70,7 +77,7 @@ def test_combocid_multi_insert(timeline: TTimeline, test_output_dir: Path):
cur.execute("CREATE EXTENSION neon_test_utils")
cur.execute("create table t(id integer, val integer)")
file_path = str(test_output_dir / "t.csv")
file_path = f"{endpoint.pg_data_dir_path()}/t.csv"
cur.execute(f"insert into t select g, 0 from generate_series(1,{n_records}) g")
cur.execute(f"copy t to '{file_path}'")
cur.execute("truncate table t")
@@ -99,12 +106,15 @@ def test_combocid_multi_insert(timeline: TTimeline, test_output_dir: Path):
cur.execute("rollback")
timeline.stop_and_flush(endpoint)
timeline.checkpoint(compact=False, wait_until_uploaded=True)
flush_ep_to_pageserver(env, endpoint, env.initial_tenant, env.initial_timeline)
env.pageserver.http_client().timeline_checkpoint(
env.initial_tenant, env.initial_timeline, compact=False, wait_until_uploaded=True
)
def test_combocid(timeline: TTimeline):
endpoint = timeline.primary
def test_combocid(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main")
conn = endpoint.connect()
cur = conn.cursor()
@@ -137,5 +147,7 @@ def test_combocid(timeline: TTimeline):
cur.execute("rollback")
timeline.stop_and_flush(endpoint)
timeline.checkpoint(compact=False, wait_until_uploaded=True)
flush_ep_to_pageserver(env, endpoint, env.initial_tenant, env.initial_timeline)
env.pageserver.http_client().timeline_checkpoint(
env.initial_tenant, env.initial_timeline, compact=False, wait_until_uploaded=True
)
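The same two-step flush-then-checkpoint sequence now ends each of these tests. If it keeps spreading, it could be wrapped in a tiny helper along the lines of the sketch below (the helper name is made up; the two calls are exactly the ones used above):

def flush_and_checkpoint(env, endpoint):
    # Stop the endpoint and wait for the pageserver to ingest all of its WAL ...
    flush_ep_to_pageserver(env, endpoint, env.initial_tenant, env.initial_timeline)
    # ... then flush in-memory layers and wait until they are uploaded to remote
    # storage, skipping compaction so the freshly written layers are what gets read.
    env.pageserver.http_client().timeline_checkpoint(
        env.initial_tenant, env.initial_timeline, compact=False, wait_until_uploaded=True
    )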

View File

@@ -178,7 +178,7 @@ def test_backward_compatibility(
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.from_repo_dir(compatibility_snapshot_dir / "repo")
env.pageserver.allowed_errors.append(ingest_lag_log_line)
neon_env_builder.start()
env.start()
check_neon_works(
env,
@@ -265,7 +265,7 @@ def test_forward_compatibility(
# does not include logs from previous runs
assert not env.pageserver.log_contains("git-env:" + prev_pageserver_version)
neon_env_builder.start()
env.start()
# ensure the specified pageserver is running
assert env.pageserver.log_contains("git-env:" + prev_pageserver_version)

View File

@@ -5,7 +5,6 @@ import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
from fixtures.pg_version import PgVersion
from fixtures.shared_fixtures import TTimeline
from fixtures.utils import query_scalar
@@ -13,17 +12,18 @@ from fixtures.utils import query_scalar
# Test CREATE DATABASE when there have been relmapper changes
#
@pytest.mark.parametrize("strategy", ["file_copy", "wal_log"])
def test_createdb(timeline: TTimeline, pg_version: PgVersion, strategy: str):
if pg_version == PgVersion.V14 and strategy == "wal_log":
def test_createdb(neon_simple_env: NeonEnv, strategy: str):
env = neon_simple_env
if env.pg_version == PgVersion.V14 and strategy == "wal_log":
pytest.skip("wal_log strategy not supported on PostgreSQL 14")
endpoint = timeline.primary
endpoint = env.endpoints.create_start("main")
with endpoint.cursor() as cur:
# Cause a 'relmapper' change in the original branch
cur.execute("VACUUM FULL pg_class")
if pg_version == PgVersion.V14:
if env.pg_version == PgVersion.V14:
cur.execute("CREATE DATABASE foodb")
else:
cur.execute(f"CREATE DATABASE foodb STRATEGY={strategy}")
@@ -31,8 +31,8 @@ def test_createdb(timeline: TTimeline, pg_version: PgVersion, strategy: str):
lsn = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")
# Create a branch
tl2 = timeline.create_branch("createdb2", lsn=lsn)
endpoint2 = tl2.primary
env.neon_cli.create_branch("test_createdb2", "main", ancestor_start_lsn=lsn)
endpoint2 = env.endpoints.create_start("test_createdb2")
# Test that you can connect to the new database on both branches
for db in (endpoint, endpoint2):
@@ -58,8 +58,9 @@ def test_createdb(timeline: TTimeline, pg_version: PgVersion, strategy: str):
#
# Test DROP DATABASE
#
def test_dropdb(timeline: TTimeline, test_output_dir):
endpoint = timeline.primary
def test_dropdb(neon_simple_env: NeonEnv, test_output_dir):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
with endpoint.cursor() as cur:
cur.execute("CREATE DATABASE foodb")
@@ -76,28 +77,28 @@ def test_dropdb(timeline: TTimeline, test_output_dir):
lsn_after_drop = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")
# Create two branches before and after database drop.
tl_before = timeline.create_branch("test_before_dropdb", lsn=lsn_before_drop)
endpoint_before = tl_before.primary
env.neon_cli.create_branch("test_before_dropdb", "main", ancestor_start_lsn=lsn_before_drop)
endpoint_before = env.endpoints.create_start("test_before_dropdb")
tl_after = timeline.create_branch("test_after_dropdb", lsn=lsn_after_drop)
endpoint_after = tl_after.primary
env.neon_cli.create_branch("test_after_dropdb", "main", ancestor_start_lsn=lsn_after_drop)
endpoint_after = env.endpoints.create_start("test_after_dropdb")
# Test that database exists on the branch before drop
endpoint_before.connect(dbname="foodb").close()
# Test that database subdir exists on the branch before drop
assert timeline.tenant.pgdatadir(endpoint_before)
dbpath = pathlib.Path(timeline.tenant.pgdatadir(endpoint_before)) / "base" / str(dboid)
assert endpoint_before.pgdata_dir
dbpath = pathlib.Path(endpoint_before.pgdata_dir) / "base" / str(dboid)
log.info(dbpath)
assert os.path.isdir(dbpath) is True
# Test that database subdir doesn't exist on the branch after drop
assert timeline.tenant.pgdatadir(endpoint_after)
dbpath = pathlib.Path(timeline.tenant.pgdatadir(endpoint_after)) / "base" / str(dboid)
assert endpoint_after.pgdata_dir
dbpath = pathlib.Path(endpoint_after.pgdata_dir) / "base" / str(dboid)
log.info(dbpath)
assert os.path.isdir(dbpath) is False
# Check that we restore the content of the datadir correctly
timeline.tenant.check_restored_datadir_content(test_output_dir, endpoint)
check_restored_datadir_content(test_output_dir, env, endpoint)

View File

@@ -1,13 +1,13 @@
from fixtures.neon_fixtures import NeonEnv
from fixtures.shared_fixtures import TTimeline
from fixtures.utils import query_scalar
#
# Test CREATE USER to check shared catalog restore
#
def test_createuser(timeline: TTimeline):
endpoint = timeline.primary
def test_createuser(neon_simple_env: NeonEnv):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
with endpoint.cursor() as cur:
# Cause a 'relmapper' change in the original branch
@@ -18,8 +18,8 @@ def test_createuser(timeline: TTimeline):
lsn = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")
# Create a branch
branch = timeline.create_branch("test_createuser2", lsn=lsn)
endpoint2 = branch.primary
env.neon_cli.create_branch("test_createuser2", "main", ancestor_start_lsn=lsn)
endpoint2 = env.endpoints.create_start("test_createuser2")
# Test that you can connect to new branch as a new user
assert endpoint2.safe_psql("select current_user", user="testuser") == [("testuser",)]

View File

@@ -1,6 +1,5 @@
import pytest
from fixtures.neon_fixtures import Endpoint
from fixtures.shared_fixtures import TTimeline
from fixtures.neon_fixtures import NeonEnvBuilder
@pytest.mark.parametrize(
@@ -11,11 +10,14 @@ from fixtures.shared_fixtures import TTimeline
"💣", # calls `trigger_segfault` internally
],
)
def test_endpoint_crash(timeline: TTimeline, sql_func: str):
def test_endpoint_crash(neon_env_builder: NeonEnvBuilder, sql_func: str):
"""
Test that triggering crash from neon_test_utils crashes the endpoint
"""
endpoint = timeline.primary
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_endpoint_crash")
endpoint = env.endpoints.create_start("test_endpoint_crash")
endpoint.safe_psql("CREATE EXTENSION neon_test_utils;")
with pytest.raises(Exception, match="This probably means the server terminated abnormally"):
endpoint.safe_psql(f"SELECT {sql_func}();")

View File

@@ -1,8 +1,10 @@
from fixtures.shared_fixtures import TTimeline
from fixtures.neon_fixtures import NeonEnvBuilder
def test_fsm_truncate(timeline: TTimeline):
endpoint = timeline.primary
def test_fsm_truncate(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_fsm_truncate")
endpoint = env.endpoints.create_start("test_fsm_truncate")
endpoint.safe_psql(
"CREATE TABLE t1(key int); CREATE TABLE t2(key int); TRUNCATE TABLE t1; TRUNCATE TABLE t2;"
)

View File

@@ -1,15 +1,17 @@
import time
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
from fixtures.shared_fixtures import TTimeline
#
# Test that redo of XLOG_GIN_VACUUM_PAGE doesn't produce an error
#
def test_gin_redo(timeline: TTimeline):
primary = timeline.primary
secondary = timeline.secondary
def test_gin_redo(neon_simple_env: NeonEnv):
env = neon_simple_env
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
time.sleep(1)
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
con = primary.connect()
cur = con.cursor()
cur.execute("create table gin_test_tbl(id integer, i int4[])")

View File

@@ -14,7 +14,6 @@ from fixtures.neon_fixtures import (
tenant_get_shards,
wait_replica_caughtup,
)
from fixtures.shared_fixtures import TTimeline
from fixtures.utils import wait_until
@@ -222,20 +221,29 @@ def pgbench_accounts_initialized(ep):
#
# Without hs feedback enabled we'd see 'User query might have needed to see row
# versions that must be removed.' errors.
def test_hot_standby_feedback(timeline: TTimeline, pg_bin: PgBin):
with timeline.primary_with_config(config_lines=[
def test_hot_standby_feedback(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
env = neon_env_builder.init_start()
agressive_vacuum_conf = [
"log_autovacuum_min_duration = 0",
"autovacuum_naptime = 10s",
"autovacuum_vacuum_threshold = 25",
"autovacuum_vacuum_scale_factor = 0.1",
"autovacuum_vacuum_cost_delay = -1",
]) as primary:
]
with env.endpoints.create_start(
branch_name="main", endpoint_id="primary", config_lines=agressive_vacuum_conf
) as primary:
# It would be great to have a stricter max_standby_streaming_delay=0s here, but then sometimes it fails with
# 'User was holding shared buffer pin for too long.'.
with timeline.secondary_with_config(config_lines=[
"max_standby_streaming_delay=2s",
"hot_standby_feedback=true",
]) as secondary:
with env.endpoints.new_replica_start(
origin=primary,
endpoint_id="secondary",
config_lines=[
"max_standby_streaming_delay=2s",
"neon.protocol_version=2",
"hot_standby_feedback=true",
],
) as secondary:
log.info(
f"primary connstr is {primary.connstr()}, secondary connstr {secondary.connstr()}"
)
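A note on the hunk above: once hot_standby_feedback reaches the primary, the standby's xmin becomes visible in pg_stat_replication there, which is one way a test could assert that the feedback is actually flowing (a sketch only, not part of the change; it may need a short retry right after the standby connects):

# With feedback on, the walsender reports the standby's xmin; without it,
# backend_xmin stays NULL and vacuum is free to remove the row versions
# the standby still needs.
rows = primary.safe_psql("SELECT backend_xmin FROM pg_stat_replication")
assert rows and rows[0][0] is not None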
@@ -278,15 +286,20 @@ def test_hot_standby_feedback(timeline: TTimeline, pg_bin: PgBin):
# Test race condition between WAL replay and backends performing queries
# https://github.com/neondatabase/neon/issues/7791
def test_replica_query_race(timeline: TTimeline):
primary_ep = timeline.primary
def test_replica_query_race(neon_simple_env: NeonEnv):
env = neon_simple_env
primary_ep = env.endpoints.create_start(
branch_name="main",
endpoint_id="primary",
)
with primary_ep.connect() as p_con:
with p_con.cursor() as p_cur:
p_cur.execute("CREATE EXTENSION neon_test_utils")
p_cur.execute("CREATE TABLE test AS SELECT 0 AS counter")
standby_ep = timeline.secondary
standby_ep = env.endpoints.new_replica_start(origin=primary_ep, endpoint_id="standby")
wait_replica_caughtup(primary_ep, standby_ep)
# In primary, run a lot of UPDATEs on a single page

View File

@@ -8,19 +8,22 @@ import time
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, PgBin
from fixtures.shared_fixtures import TTimeline
#
# Test resizing the local file cache (LFC) while the endpoint is under load
#
@pytest.mark.timeout(600)
def test_lfc_resize(timeline: TTimeline, pg_bin: PgBin):
endpoint = timeline.primary_with_config(config_lines=[
"neon.file_cache_path='file.cache'",
"neon.max_file_cache_size=512MB",
"neon.file_cache_size_limit=512MB",
])
def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
env = neon_simple_env
endpoint = env.endpoints.create_start(
"main",
config_lines=[
"neon.file_cache_path='file.cache'",
"neon.max_file_cache_size=512MB",
"neon.file_cache_size_limit=512MB",
],
)
n_resize = 10
scale = 100
@@ -46,7 +49,7 @@ def test_lfc_resize(timeline: TTimeline, pg_bin: PgBin):
thread.join()
lfc_file_path = timeline.tenant.pgdatadir(endpoint) / "file.cache"
lfc_file_path = f"{endpoint.pg_data_dir_path()}/file.cache"
lfc_file_size = os.path.getsize(lfc_file_path)
res = subprocess.run(["ls", "-sk", lfc_file_path], check=True, text=True, capture_output=True)
lfc_file_blocks = re.findall("([0-9A-F]+)", res.stdout)[0]

View File

@@ -3,7 +3,6 @@ from pathlib import Path
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.shared_fixtures import TTimeline
from fixtures.utils import query_scalar
@@ -74,14 +73,18 @@ WITH (fillfactor='100');
assert blocks < 10
def test_sliding_working_set_approximation(timeline: TTimeline):
endpoint = timeline.primary_with_config(config_lines=[
"autovacuum = off",
"shared_buffers=1MB",
"neon.max_file_cache_size=256MB",
"neon.file_cache_size_limit=245MB",
])
def test_sliding_working_set_approximation(neon_simple_env: NeonEnv):
env = neon_simple_env
endpoint = env.endpoints.create_start(
branch_name="main",
config_lines=[
"autovacuum = off",
"shared_buffers=1MB",
"neon.max_file_cache_size=256MB",
"neon.file_cache_size_limit=245MB",
],
)
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create extension neon")

View File

@@ -1,5 +1,4 @@
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
from fixtures.shared_fixtures import TTimeline
from fixtures.utils import query_scalar
@@ -13,8 +12,9 @@ from fixtures.utils import query_scalar
# is enough to verify that the WAL records are handled correctly
# in the pageserver.
#
def test_multixact(timeline: TTimeline, test_output_dir):
endpoint = timeline.primary
def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
cur = endpoint.connect().cursor()
cur.execute(
@@ -72,8 +72,10 @@ def test_multixact(timeline: TTimeline, test_output_dir):
assert int(next_multixact_id) > int(next_multixact_id_old)
# Branch at this point
branch = timeline.create_branch("test_multixact_new", lsn=lsn)
endpoint_new = branch.primary
env.neon_cli.create_branch(
"test_multixact_new", ancestor_branch_name="main", ancestor_start_lsn=lsn
)
endpoint_new = env.endpoints.create_start("test_multixact_new")
next_multixact_id_new = endpoint_new.safe_psql(
"SELECT next_multixact_id FROM pg_control_checkpoint()"
@@ -83,4 +85,4 @@ def test_multixact(timeline: TTimeline, test_output_dir):
assert next_multixact_id_new == next_multixact_id
# Check that we can restore the content of the datadir correctly
timeline.tenant.check_restored_datadir_content(test_output_dir, endpoint)
check_restored_datadir_content(test_output_dir, env, endpoint)

View File

@@ -374,7 +374,7 @@ def test_sharding_split_smoke(
non_default_tenant_config = {"gc_horizon": 77 * 1024 * 1024}
env = neon_env_builder.init_configs(True)
neon_env_builder.start()
env.start()
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
env.neon_cli.create_tenant(
@@ -1436,7 +1436,7 @@ def test_sharding_unlogged_relation(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_configs()
neon_env_builder.start()
env.start()
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
@@ -1475,7 +1475,7 @@ def test_top_tenants(neon_env_builder: NeonEnvBuilder):
"""
env = neon_env_builder.init_configs()
neon_env_builder.start()
env.start()
tenants = []
n_tenants = 8

View File

@@ -2,7 +2,7 @@ import time
from contextlib import closing
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, fork_at_current_lsn
from fixtures.utils import query_scalar
@@ -57,7 +57,7 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
cur.execute("UPDATE vmtest_cold_update2 SET id = 5000, filler=repeat('x', 200) WHERE id = 1")
# Branch at this point, to test that later
# fork_at_current_lsn(env, endpoint, "test_vm_bit_clear_new", "main")
fork_at_current_lsn(env, endpoint, "test_vm_bit_clear_new", "main")
# Clear the buffer cache, to force the VM page to be re-fetched from
# the page server
@@ -91,7 +91,6 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
# a dirty VM page is evicted. If the VM bit was not correctly cleared by the
# earlier WAL record, the full-page image hides the problem. Starting a new
# server at the right point-in-time avoids that full-page image.
endpoint_new = env.endpoints.create_start("test_vm_bit_clear_new")
pg_new_conn = endpoint_new.connect()
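If this test ever needs debugging, the visibility-map state can be inspected directly with the contrib pg_visibility extension (purely an aside; the test does not use it, and `log` here is the fixtures' logger):

# Hypothetical debugging aid: dump VM bits for the table on the new endpoint.
# The page touched by the UPDATE above should report all_visible = False if the
# clearing WAL record was applied correctly.
with endpoint_new.cursor() as cur:
    cur.execute("CREATE EXTENSION IF NOT EXISTS pg_visibility")
    cur.execute("SELECT blkno, all_visible FROM pg_visibility_map('vmtest_cold_update2')")
    for blkno, all_visible in cur.fetchall():
        log.info("vm bit: block %s all_visible=%s", blkno, all_visible)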

View File

@@ -1,11 +1,11 @@
{
"v16": [
"16.4",
"6e9a4ff6249ac02b8175054b7b3f7dfb198be48b"
"0baa7346dfd42d61912eeca554c9bb0a190f0a1e"
],
"v15": [
"15.8",
"49d5e576a56e4cc59cd6a6a0791b2324b9fa675e"
"6f6d77fb5960602fcd3fd130aca9f99ecb1619c9"
],
"v14": [
"14.13",