Compare commits

..

4 Commits

Author SHA1 Message Date
Matthias van de Meent
209fd611ac Some work on reducing test setup overhead
With the introduction of test-local tenants and timelines, we can now
allow tests to share a pageserver and/or safekeeper, removing the overhead
of setting up those components for each test.

Future work can add wrappers for Pageserver, Safekeepers, and other APIs
that expose tenant-level configuration options that shouldn't impact
concurrent or sequential tests that reuse the same PS/SK resources.

Important note: This doesn't yet work consistently for all updated tests.

[skip ci]
2024-09-19 09:59:24 +02:00
Matthias van de Meent
31192ee655 Intermediate state for process-local Endpoints when accessing shared state 2024-09-13 02:20:31 +01:00
Matthias van de Meent
8ecbe975cd Intermediate state for process-local Endpoints when accessing shared state 2024-09-12 23:34:20 +01:00
Matthias van de Meent
1e62eb9ba0 Great Success! with shared fixtures
Not everything is done yet, but a good amount of this is done.
2024-09-12 22:05:05 +01:00
53 changed files with 1024 additions and 529 deletions

View File

@@ -1 +0,0 @@
FROM neondatabase/build-tools:pinned

View File

@@ -1,23 +0,0 @@
// https://containers.dev/implementors/json_reference/
{
"name": "Neon",
"build": {
"context": "..",
"dockerfile": "Dockerfile.devcontainer"
},
"postCreateCommand": {
"build neon": "BUILD_TYPE=debug CARGO_BUILD_FLAGS='--features=testing' mold -run make -s -j`nproc`",
"install python deps": "./scripts/pysync"
},
"customizations": {
"vscode": {
"extensions": [
"charliermarsh.ruff",
"github.vscode-github-actions",
"rust-lang.rust-analyzer"
]
}
}
}

View File

@@ -640,8 +640,6 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
}
Some(("branch", branch_match)) => {
let tenant_id = get_tenant_id(branch_match, env)?;
let new_timeline_id =
parse_timeline_id(branch_match)?.unwrap_or(TimelineId::generate());
let new_branch_name = branch_match
.get_one::<String>("branch-name")
.ok_or_else(|| anyhow!("No branch name provided"))?;
@@ -660,6 +658,7 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
.map(|lsn_str| Lsn::from_str(lsn_str))
.transpose()
.context("Failed to parse ancestor start Lsn from the request")?;
let new_timeline_id = TimelineId::generate();
let storage_controller = StorageController::from_env(env);
let create_req = TimelineCreateRequest {
new_timeline_id,
@@ -1571,6 +1570,7 @@ fn cli() -> Command {
.value_parser(value_parser!(PathBuf))
.value_name("config")
)
.arg(pg_version_arg.clone())
.arg(force_arg)
)
.subcommand(
@@ -1583,7 +1583,6 @@ fn cli() -> Command {
.subcommand(Command::new("branch")
.about("Create a new timeline, using another timeline as a base, copying its data")
.arg(tenant_id_arg.clone())
.arg(timeline_id_arg.clone())
.arg(branch_name_arg.clone())
.arg(Arg::new("ancestor-branch-name").long("ancestor-branch-name")
.help("Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name.").required(false))

View File

@@ -89,19 +89,8 @@ impl PageserverUtilization {
/// If a node is currently hosting more work than it can comfortably handle. This does not indicate that
/// it will fail, but it is a strong signal that more work should not be added unless there is no alternative.
///
/// When a node is overloaded, we may override soft affinity preferences and do things like scheduling
/// into a node in a less desirable AZ, if all the nodes in the preferred AZ are overloaded.
pub fn is_overloaded(score: RawScore) -> bool {
// Why the factor of two? This is unscientific but reflects behavior of real systems:
// - In terms of shard counts, a node's preferred max count is a soft limit intended to keep
// startup and housekeeping jobs nice and responsive. We can go to double this limit if needed
// until some more nodes are deployed.
// - In terms of disk space, the node's utilization heuristic assumes every tenant needs to
// hold its biggest timeline fully on disk, which tends to be an overestimate when
// some tenants are very idle and have dropped layers from disk. In practice going up to
// double is generally better than giving up and scheduling in a sub-optimal AZ.
score >= 2 * Self::UTILIZATION_FULL
score >= Self::UTILIZATION_FULL
}
pub fn adjust_shard_count_max(&mut self, shard_count: u32) {

View File

@@ -81,16 +81,17 @@ pub fn is_expected_io_error(e: &io::Error) -> bool {
)
}
#[async_trait::async_trait]
pub trait Handler<IO> {
/// Handle a single query.
/// postgres_backend will issue ReadyForQuery after calling this (this
/// might not be what we want after CopyData streaming, but currently we don't
/// care). It will also flush out the output buffer.
fn process_query(
async fn process_query(
&mut self,
pgb: &mut PostgresBackend<IO>,
query_string: &str,
) -> impl Future<Output = Result<(), QueryError>>;
) -> Result<(), QueryError>;
/// Called on startup packet receipt; allows processing of params.
///

View File

@@ -23,6 +23,7 @@ async fn make_tcp_pair() -> (TcpStream, TcpStream) {
struct TestHandler {}
#[async_trait::async_trait]
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> Handler<IO> for TestHandler {
// return single col 'hey' for any query
async fn process_query(

View File

@@ -1199,6 +1199,7 @@ impl PageServerHandler {
}
}
#[async_trait::async_trait]
impl<IO> postgres_backend::Handler<IO> for PageServerHandler
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,

View File

@@ -1205,13 +1205,6 @@ impl<'a> DatadirModification<'a> {
img: Bytes,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
let key = rel_block_to_key(rel, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver at {}",
key
);
}
self.put(rel_block_to_key(rel, blknum), Value::Image(img));
Ok(())
}
@@ -1223,34 +1216,14 @@ impl<'a> DatadirModification<'a> {
blknum: BlockNumber,
img: Bytes,
) -> anyhow::Result<()> {
let key = slru_block_to_key(kind, segno, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver at {}",
key
);
}
self.put(key, Value::Image(img));
self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
Ok(())
}
pub(crate) fn put_rel_page_image_zero(
&mut self,
rel: RelTag,
blknum: BlockNumber,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
let key = rel_block_to_key(rel, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver: {} @ {}",
key,
self.lsn
);
}
self.pending_zero_data_pages.insert(key.to_compact());
pub(crate) fn put_rel_page_image_zero(&mut self, rel: RelTag, blknum: BlockNumber) {
self.pending_zero_data_pages
.insert(rel_block_to_key(rel, blknum).to_compact());
self.pending_bytes += ZERO_PAGE.len();
Ok(())
}
pub(crate) fn put_slru_page_image_zero(
@@ -1258,18 +1231,10 @@ impl<'a> DatadirModification<'a> {
kind: SlruKind,
segno: u32,
blknum: BlockNumber,
) -> anyhow::Result<()> {
let key = slru_block_to_key(kind, segno, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver: {} @ {}",
key,
self.lsn
);
}
self.pending_zero_data_pages.insert(key.to_compact());
) {
self.pending_zero_data_pages
.insert(slru_block_to_key(kind, segno, blknum).to_compact());
self.pending_bytes += ZERO_PAGE.len();
Ok(())
}
/// Call this at the end of each WAL record.

View File

@@ -1222,7 +1222,7 @@ impl WalIngest {
if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
// Tail of last remaining FSM page has to be zeroed.
// We are not precise here and instead of digging in FSM bitmap format just clear the whole page.
modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
modification.put_rel_page_image_zero(rel, fsm_physical_page_no);
fsm_physical_page_no += 1;
}
let nblocks = get_relsize(modification, rel, ctx).await?;
@@ -1244,7 +1244,7 @@ impl WalIngest {
if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
// Tail of last remaining vm page has to be zeroed.
// We are not precise here and instead of digging in VM bitmap format just clear the whole page.
modification.put_rel_page_image_zero(rel, vm_page_no)?;
modification.put_rel_page_image_zero(rel, vm_page_no);
vm_page_no += 1;
}
let nblocks = get_relsize(modification, rel, ctx).await?;
@@ -1737,7 +1737,7 @@ impl WalIngest {
continue;
}
modification.put_rel_page_image_zero(rel, gap_blknum)?;
modification.put_rel_page_image_zero(rel, gap_blknum);
}
}
Ok(())
@@ -1803,7 +1803,7 @@ impl WalIngest {
// fill the gap with zeros
for gap_blknum in old_nblocks..blknum {
modification.put_slru_page_image_zero(kind, segno, gap_blknum)?;
modification.put_slru_page_image_zero(kind, segno, gap_blknum);
}
}
Ok(())

View File

@@ -311,9 +311,7 @@ async fn auth_quirks(
let (allowed_ips, maybe_secret) = api.get_allowed_ips_and_secret(ctx, &info).await?;
// check allowed list
if config.ip_allowlist_check_enabled
&& !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips)
{
if !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips) {
return Err(auth::AuthError::ip_address_not_allowed(ctx.peer_addr()));
}
@@ -605,7 +603,6 @@ mod tests {
rate_limiter_enabled: true,
rate_limiter: AuthRateLimiter::new(&RateBucketInfo::DEFAULT_AUTH_SET),
rate_limit_ip_subnet: 64,
ip_allowlist_check_enabled: true,
});
async fn read_message(r: &mut (impl AsyncRead + Unpin), b: &mut BytesMut) -> PgMessage {

View File

@@ -538,17 +538,4 @@ mod tests {
));
Ok(())
}
#[test]
fn test_connection_blocker() {
fn check(v: serde_json::Value) -> bool {
let peer_addr = IpAddr::from([127, 0, 0, 1]);
let ip_list: Vec<IpPattern> = serde_json::from_value(v).unwrap();
check_peer_addr_is_in_list(&peer_addr, &ip_list)
}
assert!(check(json!([])));
assert!(check(json!(["127.0.0.1"])));
assert!(!check(json!(["255.255.255.255"])));
}
}

View File

@@ -224,7 +224,6 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig
rate_limiter_enabled: false,
rate_limiter: BucketRateLimiter::new(vec![]),
rate_limit_ip_subnet: 64,
ip_allowlist_check_enabled: true,
},
require_client_ip: false,
handshake_timeout: Duration::from_secs(10),

View File

@@ -224,10 +224,6 @@ struct ProxyCliArgs {
/// Whether to retry the wake_compute request
#[clap(long, default_value = config::RetryConfig::WAKE_COMPUTE_DEFAULT_VALUES)]
wake_compute_retry: String,
/// Configure if this is a private access proxy for the POC: In that case the proxy will ignore the IP allowlist
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
is_private_access_proxy: bool,
}
#[derive(clap::Args, Clone, Copy, Debug)]
@@ -686,7 +682,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
rate_limiter_enabled: args.auth_rate_limit_enabled,
rate_limiter: AuthRateLimiter::new(args.auth_rate_limit.clone()),
rate_limit_ip_subnet: args.auth_rate_limit_ip_subnet,
ip_allowlist_check_enabled: !args.is_private_access_proxy,
};
let config = Box::leak(Box::new(ProxyConfig {

View File

@@ -242,6 +242,6 @@ mod tests {
#[test]
fn test() {
let s = "{\"branch_created\":null,\"endpoint_created\":{\"endpoint_id\":\"ep-rapid-thunder-w0qqw2q9\"},\"project_created\":null,\"type\":\"endpoint_created\"}";
serde_json::from_str::<ControlPlaneEventKey>(s).unwrap();
let _: ControlPlaneEventKey = serde_json::from_str(s).unwrap();
}
}

View File

@@ -64,7 +64,6 @@ pub struct AuthenticationConfig {
pub rate_limiter_enabled: bool,
pub rate_limiter: AuthRateLimiter,
pub rate_limit_ip_subnet: u8,
pub ip_allowlist_check_enabled: bool,
}
impl TlsConfig {

View File

@@ -395,7 +395,7 @@ mod tests {
}
}
});
serde_json::from_str::<KickSession<'_>>(&json.to_string())?;
let _: KickSession<'_> = serde_json::from_str(&json.to_string())?;
Ok(())
}
@@ -403,7 +403,7 @@ mod tests {
#[test]
fn parse_db_info() -> anyhow::Result<()> {
// with password
serde_json::from_value::<DatabaseInfo>(json!({
let _: DatabaseInfo = serde_json::from_value(json!({
"host": "localhost",
"port": 5432,
"dbname": "postgres",
@@ -413,7 +413,7 @@ mod tests {
}))?;
// without password
serde_json::from_value::<DatabaseInfo>(json!({
let _: DatabaseInfo = serde_json::from_value(json!({
"host": "localhost",
"port": 5432,
"dbname": "postgres",
@@ -422,7 +422,7 @@ mod tests {
}))?;
// new field (forward compatibility)
serde_json::from_value::<DatabaseInfo>(json!({
let _: DatabaseInfo = serde_json::from_value(json!({
"host": "localhost",
"port": 5432,
"dbname": "postgres",
@@ -441,7 +441,7 @@ mod tests {
"address": "0.0.0.0",
"aux": dummy_aux(),
});
serde_json::from_str::<WakeCompute>(&json.to_string())?;
let _: WakeCompute = serde_json::from_str(&json.to_string())?;
Ok(())
}
@@ -451,18 +451,18 @@ mod tests {
let json = json!({
"role_secret": "secret",
});
serde_json::from_str::<GetRoleSecret>(&json.to_string())?;
let _: GetRoleSecret = serde_json::from_str(&json.to_string())?;
let json = json!({
"role_secret": "secret",
"allowed_ips": ["8.8.8.8"],
});
serde_json::from_str::<GetRoleSecret>(&json.to_string())?;
let _: GetRoleSecret = serde_json::from_str(&json.to_string())?;
let json = json!({
"role_secret": "secret",
"allowed_ips": ["8.8.8.8"],
"project_id": "project",
});
serde_json::from_str::<GetRoleSecret>(&json.to_string())?;
let _: GetRoleSecret = serde_json::from_str(&json.to_string())?;
Ok(())
}

View File

@@ -78,7 +78,7 @@ pub(crate) type ComputeReady = DatabaseInfo;
// TODO: replace with an http-based protocol.
struct MgmtHandler;
#[async_trait::async_trait]
impl postgres_backend::Handler<tokio::net::TcpStream> for MgmtHandler {
async fn process_query(
&mut self,

View File

@@ -6,7 +6,7 @@ use pq_proto::StartupMessageParams;
use smol_str::SmolStr;
use std::net::IpAddr;
use tokio::sync::mpsc;
use tracing::{debug, field::display, info, info_span, Span};
use tracing::{field::display, info, info_span, Span};
use try_lock::TryLock;
use uuid::Uuid;
@@ -362,9 +362,7 @@ impl RequestMonitoringInner {
});
}
if let Some(tx) = self.sender.take() {
tx.send(RequestData::from(&*self))
.inspect_err(|e| debug!("tx send failed: {e}"))
.ok();
let _: Result<(), _> = tx.send(RequestData::from(&*self));
}
}
@@ -373,9 +371,7 @@ impl RequestMonitoringInner {
// Here we log the length of the session.
self.disconnect_timestamp = Some(Utc::now());
if let Some(tx) = self.disconnect_sender.take() {
tx.send(RequestData::from(&*self))
.inspect_err(|e| debug!("tx send failed: {e}"))
.ok();
let _: Result<(), _> = tx.send(RequestData::from(&*self));
}
}
}

View File

@@ -290,7 +290,7 @@ async fn worker_inner(
}
if !w.flushed_row_groups().is_empty() {
let _rtchk: Writer<BytesMut> = upload_parquet(w, len, &storage).await?;
let _: Writer<BytesMut> = upload_parquet(w, len, &storage).await?;
}
Ok(())

View File

@@ -3,7 +3,7 @@
#![deny(
deprecated,
future_incompatible,
let_underscore,
// TODO: consider let_underscore
nonstandard_style,
rust_2024_compatibility
)]

View File

@@ -268,7 +268,7 @@ async fn keepalive_is_inherited() -> anyhow::Result<()> {
anyhow::Ok(keepalive)
});
TcpStream::connect(("127.0.0.1", port)).await?;
let _ = TcpStream::connect(("127.0.0.1", port)).await?;
assert!(t.await??, "keepalive should be inherited");
Ok(())

View File

@@ -6,7 +6,7 @@ use redis::{
ConnectionInfo, IntoConnectionInfo, RedisConnectionInfo, RedisResult,
};
use tokio::task::JoinHandle;
use tracing::{debug, error, info};
use tracing::{error, info};
use super::elasticache::CredentialsProvider;
@@ -109,10 +109,7 @@ impl ConnectionWithCredentialsProvider {
let credentials_provider = credentials_provider.clone();
let con2 = con.clone();
let f = tokio::spawn(async move {
Self::keep_connection(con2, credentials_provider)
.await
.inspect_err(|e| debug!("keep_connection failed: {e}"))
.ok();
let _ = Self::keep_connection(con2, credentials_provider).await;
});
self.refresh_token_task = Some(f);
}

View File

@@ -50,9 +50,7 @@ impl PoolingBackend {
.as_ref()
.map(|()| user_info.clone());
let (allowed_ips, maybe_secret) = backend.get_allowed_ips_and_secret(ctx).await?;
if config.ip_allowlist_check_enabled
&& !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips)
{
if !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips) {
return Err(AuthError::ip_address_not_allowed(ctx.peer_addr()));
}
if !self

View File

@@ -12,7 +12,6 @@ use std::{io, task};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_rustls::server::TlsStream;
use tracing::debug;
/// Stream wrapper which implements libpq's protocol.
///
@@ -139,10 +138,9 @@ impl<S: AsyncWrite + Unpin> PqStream<S> {
);
// already error case, ignore client IO error
self.write_message(&BeMessage::ErrorResponse(msg, None))
.await
.inspect_err(|e| debug!("write_message failed: {e}"))
.ok();
let _: Result<_, std::io::Error> = self
.write_message(&BeMessage::ErrorResponse(msg, None))
.await;
Err(ReportedError {
source: anyhow::anyhow!(msg),
@@ -166,10 +164,9 @@ impl<S: AsyncWrite + Unpin> PqStream<S> {
);
// already error case, ignore client IO error
self.write_message(&BeMessage::ErrorResponse(&msg, None))
.await
.inspect_err(|e| debug!("write_message failed: {e}"))
.ok();
let _: Result<_, std::io::Error> = self
.write_message(&BeMessage::ErrorResponse(&msg, None))
.await;
Err(ReportedError {
source: anyhow::anyhow!(error),

View File

@@ -57,7 +57,7 @@ mod tests {
fn bad_url() {
let url = "test:foobar";
url.parse::<url::Url>().expect("unexpected parsing failure");
url.parse::<ApiUrl>().expect_err("should not parse");
let _ = url.parse::<ApiUrl>().expect_err("should not parse");
}
#[test]

View File

@@ -2,7 +2,6 @@
//! protocol commands.
use anyhow::Context;
use std::future::Future;
use std::str::{self, FromStr};
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
@@ -96,6 +95,7 @@ fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str {
}
}
#[async_trait::async_trait]
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
for SafekeeperPostgresHandler
{
@@ -197,51 +197,49 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
Ok(())
}
fn process_query(
async fn process_query(
&mut self,
pgb: &mut PostgresBackend<IO>,
query_string: &str,
) -> impl Future<Output = Result<(), QueryError>> {
Box::pin(async move {
if query_string
.to_ascii_lowercase()
.starts_with("set datestyle to ")
{
// important for debug because psycopg2 executes "SET datestyle TO 'ISO'" on connect
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
return Ok(());
) -> Result<(), QueryError> {
if query_string
.to_ascii_lowercase()
.starts_with("set datestyle to ")
{
// important for debug because psycopg2 executes "SET datestyle TO 'ISO'" on connect
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
return Ok(());
}
let cmd = parse_cmd(query_string)?;
let cmd_str = cmd_to_string(&cmd);
let _guard = PG_QUERIES_GAUGE.with_label_values(&[cmd_str]).guard();
info!("got query {:?}", query_string);
let tenant_id = self.tenant_id.context("tenantid is required")?;
let timeline_id = self.timeline_id.context("timelineid is required")?;
self.check_permission(Some(tenant_id))?;
self.ttid = TenantTimelineId::new(tenant_id, timeline_id);
match cmd {
SafekeeperPostgresCommand::StartWalPush => {
self.handle_start_wal_push(pgb)
.instrument(info_span!("WAL receiver"))
.await
}
let cmd = parse_cmd(query_string)?;
let cmd_str = cmd_to_string(&cmd);
let _guard = PG_QUERIES_GAUGE.with_label_values(&[cmd_str]).guard();
info!("got query {:?}", query_string);
let tenant_id = self.tenant_id.context("tenantid is required")?;
let timeline_id = self.timeline_id.context("timelineid is required")?;
self.check_permission(Some(tenant_id))?;
self.ttid = TenantTimelineId::new(tenant_id, timeline_id);
match cmd {
SafekeeperPostgresCommand::StartWalPush => {
self.handle_start_wal_push(pgb)
.instrument(info_span!("WAL receiver"))
.await
}
SafekeeperPostgresCommand::StartReplication { start_lsn, term } => {
self.handle_start_replication(pgb, start_lsn, term)
.instrument(info_span!("WAL sender"))
.await
}
SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb).await,
SafekeeperPostgresCommand::TimelineStatus => self.handle_timeline_status(pgb).await,
SafekeeperPostgresCommand::JSONCtrl { ref cmd } => {
handle_json_ctrl(self, pgb, cmd).await
}
SafekeeperPostgresCommand::StartReplication { start_lsn, term } => {
self.handle_start_replication(pgb, start_lsn, term)
.instrument(info_span!("WAL sender"))
.await
}
})
SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb).await,
SafekeeperPostgresCommand::TimelineStatus => self.handle_timeline_status(pgb).await,
SafekeeperPostgresCommand::JSONCtrl { ref cmd } => {
handle_json_ctrl(self, pgb, cmd).await
}
}
}
}

View File

@@ -10,4 +10,8 @@ pytest_plugins = (
"fixtures.compare_fixtures",
"fixtures.slow",
"fixtures.flaky",
"fixtures.shared_fixtures",
"fixtures.function.neon_storage",
"fixtures.session.neon_storage",
"fixtures.session.s3",
)

View File

@@ -0,0 +1,48 @@
from typing import Iterator, Optional, Dict, Any
import pytest
from _pytest.fixtures import FixtureRequest
from fixtures.neon_fixtures import NeonEnv
from fixtures.shared_fixtures import TTenant, TTimeline
@pytest.fixture(scope="function")
def tenant_config() -> Dict[str, Any]:
return dict()
@pytest.fixture(scope="function")
def tenant(
neon_shared_env: NeonEnv,
request: FixtureRequest,
tenant_config: Optional[Dict[str, Any]] = None,
) -> Iterator[TTenant]:
tenant = TTenant(env=neon_shared_env, name=request.node.name, config=tenant_config)
yield tenant
tenant.shutdown_resources()
@pytest.fixture(scope="function")
def timeline(
tenant: TTenant,
) -> Iterator[TTimeline]:
timeline = tenant.default_timeline
yield timeline
@pytest.fixture(scope="function")
def exclusive_tenant(
neon_env: NeonEnv,
request: FixtureRequest,
tenant_config: Optional[Dict[str, Any]] = None,
) -> Iterator[TTenant]:
tenant = TTenant(env=neon_env, name=request.node.name, config=tenant_config)
yield tenant
tenant.shutdown_resources()
@pytest.fixture(scope="function")
def exclusive_timeline(
exclusive_tenant: TTenant,
) -> Iterator[TTimeline]:
timeline = exclusive_tenant.default_timeline
yield timeline
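
A minimal usage sketch of these function-scoped fixtures (a hedged example: the test name, table, and row count are illustrative, and it assumes the `timeline` fixture and `TTimeline.primary` behave as defined in this branch):

from fixtures.shared_fixtures import TTimeline

def test_shared_tenant_smoke(timeline: TTimeline):
    # The pageserver and safekeeper are shared across tests; only the tenant
    # and timeline are created per test, which keeps setup overhead low.
    endpoint = timeline.primary
    conn = endpoint.connect()
    cur = conn.cursor()
    cur.execute("create table t(x int)")
    cur.execute("insert into t select generate_series(1, 100)")
    cur.execute("select count(*) from t")
    assert cur.fetchone()[0] == 100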

View File

@@ -93,7 +93,6 @@ from fixtures.utils import (
allure_add_grafana_links,
allure_attach_from_dir,
assert_no_errors,
get_self_dir,
print_gc_result,
subprocess_capture,
wait_until,
@@ -126,122 +125,6 @@ Env = Dict[str, str]
DEFAULT_OUTPUT_DIR: str = "test_output"
DEFAULT_BRANCH_NAME: str = "main"
BASE_PORT: int = 15000
@pytest.fixture(scope="session")
def base_dir() -> Iterator[Path]:
# find the base directory (currently this is the git root)
base_dir = get_self_dir().parent.parent
log.info(f"base_dir is {base_dir}")
yield base_dir
@pytest.fixture(scope="function")
def neon_binpath(base_dir: Path, build_type: str) -> Iterator[Path]:
if os.getenv("REMOTE_ENV"):
# we are in remote env and do not have neon binaries locally
# this is the case for benchmarks run on self-hosted runner
return
# Find the neon binaries.
if env_neon_bin := os.environ.get("NEON_BIN"):
binpath = Path(env_neon_bin)
else:
binpath = base_dir / "target" / build_type
log.info(f"neon_binpath is {binpath}")
if not (binpath / "pageserver").exists():
raise Exception(f"neon binaries not found at '{binpath}'")
yield binpath
@pytest.fixture(scope="function")
def pg_distrib_dir(base_dir: Path) -> Iterator[Path]:
if env_postgres_bin := os.environ.get("POSTGRES_DISTRIB_DIR"):
distrib_dir = Path(env_postgres_bin).resolve()
else:
distrib_dir = base_dir / "pg_install"
log.info(f"pg_distrib_dir is {distrib_dir}")
yield distrib_dir
@pytest.fixture(scope="session")
def top_output_dir(base_dir: Path) -> Iterator[Path]:
# Compute the top-level directory for all tests.
if env_test_output := os.environ.get("TEST_OUTPUT"):
output_dir = Path(env_test_output).resolve()
else:
output_dir = base_dir / DEFAULT_OUTPUT_DIR
output_dir.mkdir(exist_ok=True)
log.info(f"top_output_dir is {output_dir}")
yield output_dir
@pytest.fixture(scope="function")
def versioned_pg_distrib_dir(pg_distrib_dir: Path, pg_version: PgVersion) -> Iterator[Path]:
versioned_dir = pg_distrib_dir / pg_version.v_prefixed
psql_bin_path = versioned_dir / "bin/psql"
postgres_bin_path = versioned_dir / "bin/postgres"
if os.getenv("REMOTE_ENV"):
# When testing against a remote server, we only need the client binary.
if not psql_bin_path.exists():
raise Exception(f"psql not found at '{psql_bin_path}'")
else:
if not postgres_bin_path.exists():
raise Exception(f"postgres not found at '{postgres_bin_path}'")
log.info(f"versioned_pg_distrib_dir is {versioned_dir}")
yield versioned_dir
@pytest.fixture(scope="session")
def neon_api_key() -> str:
api_key = os.getenv("NEON_API_KEY")
if not api_key:
raise AssertionError("Set the NEON_API_KEY environment variable")
return api_key
@pytest.fixture(scope="session")
def neon_api_base_url() -> str:
return os.getenv("NEON_API_BASE_URL", "https://console-stage.neon.build/api/v2")
@pytest.fixture(scope="session")
def neon_api(neon_api_key: str, neon_api_base_url: str) -> NeonAPI:
return NeonAPI(neon_api_key, neon_api_base_url)
@pytest.fixture(scope="session")
def worker_port_num():
return (32768 - BASE_PORT) // int(os.environ.get("PYTEST_XDIST_WORKER_COUNT", "1"))
@pytest.fixture(scope="session")
def worker_seq_no(worker_id: str) -> int:
# worker_id is a pytest-xdist fixture
# it can be master or gw<number>
# parse it to always get a number
if worker_id == "master":
return 0
assert worker_id.startswith("gw")
return int(worker_id[2:])
@pytest.fixture(scope="session")
def worker_base_port(worker_seq_no: int, worker_port_num: int) -> int:
# divide the port space into per-worker ranges
# so that workers get disjoint sets of ports for their services
return BASE_PORT + worker_seq_no * worker_port_num
def get_dir_size(path: str) -> int:
"""Return size in bytes."""
@@ -253,11 +136,6 @@ def get_dir_size(path: str) -> int:
return totalbytes
@pytest.fixture(scope="session")
def port_distributor(worker_base_port: int, worker_port_num: int) -> PortDistributor:
return PortDistributor(base_port=worker_base_port, port_number=worker_port_num)
@pytest.fixture(scope="function")
def default_broker(
port_distributor: PortDistributor,
@@ -273,18 +151,6 @@ def default_broker(
broker.stop()
@pytest.fixture(scope="session")
def run_id() -> Iterator[uuid.UUID]:
yield uuid.uuid4()
@pytest.fixture(scope="session")
def mock_s3_server(port_distributor: PortDistributor) -> Iterator[MockS3Server]:
mock_s3_server = MockS3Server(port_distributor.get_port())
yield mock_s3_server
mock_s3_server.kill()
class PgProtocol:
"""Reusable connection logic"""
@@ -484,6 +350,7 @@ class NeonEnvBuilder:
safekeeper_extra_opts: Optional[list[str]] = None,
storage_controller_port_override: Optional[int] = None,
pageserver_io_buffer_alignment: Optional[int] = None,
shared: Optional[bool] = False,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
@@ -540,8 +407,8 @@ class NeonEnvBuilder:
self.pageserver_io_buffer_alignment = pageserver_io_buffer_alignment
assert test_name.startswith(
"test_"
assert (
test_name.startswith("test_") or shared
), "Unexpectedly instantiated from outside a test function"
self.test_name = test_name
@@ -553,6 +420,10 @@ class NeonEnvBuilder:
self.env = NeonEnv(self)
return self.env
def start(self):
assert self.env is not None, "environment is not already initialized, call init() first"
self.env.start()
def init_start(
self,
initial_tenant_conf: Optional[Dict[str, Any]] = None,
@@ -568,7 +439,7 @@ class NeonEnvBuilder:
Configuring pageserver with remote storage is now the default. There will be a warning if pageserver is created without one.
"""
env = self.init_configs(default_remote_storage_if_missing=default_remote_storage_if_missing)
env.start()
self.start()
# Prepare the default branch to start the postgres on later.
# Pageserver itself does not create tenants and timelines, until started first and asked via HTTP API.
@@ -1069,6 +940,9 @@ class NeonEnv:
self.pg_distrib_dir = config.pg_distrib_dir
self.endpoint_counter = 0
self.storage_controller_config = config.storage_controller_config
# generate initial tenant ID here instead of letting 'neon init' generate it,
# so that we don't need to dig it out of the config file afterwards.
self.initial_tenant = config.initial_tenant
self.initial_timeline = config.initial_timeline
@@ -1446,6 +1320,21 @@ def neon_simple_env(
yield env
@pytest.fixture(scope="function")
def neon_endpoint(request: FixtureRequest, neon_shared_env: NeonEnv) -> Endpoint:
neon_shared_env.neon_cli.create_branch(request.node.name)
ep = neon_shared_env.endpoints.create_start(request.node.name)
try:
yield ep
finally:
if ep.is_running():
try:
ep.stop()
except BaseException:
pass
@pytest.fixture(scope="function")
def neon_env_builder(
pytestconfig: Config,
@@ -1514,6 +1403,14 @@ class PageserverPort:
http: int
CREATE_TIMELINE_ID_EXTRACTOR: re.Pattern = re.compile( # type: ignore[type-arg]
r"^Created timeline '(?P<timeline_id>[^']+)'", re.MULTILINE
)
TIMELINE_DATA_EXTRACTOR: re.Pattern = re.compile( # type: ignore[type-arg]
r"\s?(?P<branch_name>[^\s]+)\s\[(?P<timeline_id>[^\]]+)\]", re.MULTILINE
)
class AbstractNeonCli(abc.ABC):
"""
A typed wrapper around an arbitrary Neon CLI tool.
@@ -1742,9 +1639,6 @@ class NeonCli(AbstractNeonCli):
tenant_id: Optional[TenantId] = None,
timeline_id: Optional[TimelineId] = None,
) -> TimelineId:
if timeline_id is None:
timeline_id = TimelineId.generate()
cmd = [
"timeline",
"create",
@@ -1752,16 +1646,23 @@ class NeonCli(AbstractNeonCli):
new_branch_name,
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
"--timeline-id",
str(timeline_id),
"--pg-version",
self.env.pg_version,
]
if timeline_id is not None:
cmd.extend(["--timeline-id", str(timeline_id)])
res = self.raw_cli(cmd)
res.check_returncode()
return timeline_id
matches = CREATE_TIMELINE_ID_EXTRACTOR.search(res.stdout)
created_timeline_id = None
if matches is not None:
created_timeline_id = matches.group("timeline_id")
return TimelineId(str(created_timeline_id))
def create_branch(
self,
@@ -1769,17 +1670,12 @@ class NeonCli(AbstractNeonCli):
ancestor_branch_name: Optional[str] = None,
tenant_id: Optional[TenantId] = None,
ancestor_start_lsn: Optional[Lsn] = None,
new_timeline_id: Optional[TimelineId] = None,
) -> TimelineId:
if new_timeline_id is None:
new_timeline_id = TimelineId.generate()
cmd = [
"timeline",
"branch",
"--branch-name",
new_branch_name,
"--timeline-id",
str(new_timeline_id),
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
]
@@ -1791,7 +1687,16 @@ class NeonCli(AbstractNeonCli):
res = self.raw_cli(cmd)
res.check_returncode()
return TimelineId(str(new_timeline_id))
matches = CREATE_TIMELINE_ID_EXTRACTOR.search(res.stdout)
created_timeline_id = None
if matches is not None:
created_timeline_id = matches.group("timeline_id")
if created_timeline_id is None:
raise Exception("could not find timeline id after `neon timeline create` invocation")
else:
return TimelineId(str(created_timeline_id))
def list_timelines(self, tenant_id: Optional[TenantId] = None) -> List[Tuple[str, TimelineId]]:
"""
@@ -1800,9 +1705,6 @@ class NeonCli(AbstractNeonCli):
# main [b49f7954224a0ad25cc0013ea107b54b]
# ┣━ @0/16B5A50: test_cli_branch_list_main [20f98c79111b9015d84452258b7d5540]
TIMELINE_DATA_EXTRACTOR: re.Pattern = re.compile( # type: ignore[type-arg]
r"\s?(?P<branch_name>[^\s]+)\s\[(?P<timeline_id>[^\]]+)\]", re.MULTILINE
)
res = self.raw_cli(
["timeline", "list", "--tenant-id", str(tenant_id or self.env.initial_tenant)]
)
@@ -4788,27 +4690,35 @@ def _get_test_dir(request: FixtureRequest, top_output_dir: Path, prefix: str) ->
return test_dir
def get_test_output_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
def get_test_output_dir(
request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None
) -> Path:
"""
The working directory for a test.
"""
return _get_test_dir(request, top_output_dir, "")
return _get_test_dir(request, top_output_dir, prefix or "")
def get_test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
def get_test_overlay_dir(
request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None
) -> Path:
"""
Directory that contains `upperdir` and `workdir` for overlayfs mounts
that a test creates. See `NeonEnvBuilder.overlay_mount`.
"""
return _get_test_dir(request, top_output_dir, "overlay-")
return _get_test_dir(request, top_output_dir, f"overlay-{prefix or ''}")
def get_shared_snapshot_dir_path(top_output_dir: Path, snapshot_name: str) -> Path:
return top_output_dir / "shared-snapshots" / snapshot_name
def get_shared_snapshot_dir_path(
top_output_dir: Path, snapshot_name: str, prefix: Optional[str] = None
) -> Path:
return top_output_dir / f"{prefix or ''}shared-snapshots" / snapshot_name
def get_test_repo_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
return get_test_output_dir(request, top_output_dir) / "repo"
def get_test_repo_dir(
request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None
) -> Path:
return get_test_output_dir(request, top_output_dir, prefix or "") / "repo"
def pytest_addoption(parser: Parser):
@@ -5173,7 +5083,7 @@ def tenant_get_shards(
return [(TenantShardId(tenant_id, 0, 0), override_pageserver or env.pageserver)]
def wait_replica_caughtup(primary: Endpoint, secondary: Endpoint):
def wait_replica_caughtup(primary: PgProtocol, secondary: PgProtocol):
primary_lsn = Lsn(
primary.safe_psql_scalar("SELECT pg_current_wal_flush_lsn()", log_query=False)
)

View File

@@ -14,32 +14,60 @@ Dynamically parametrize tests by different parameters
"""
@pytest.fixture(scope="function", autouse=True)
def pg_version() -> Optional[PgVersion]:
return None
def get_pgversions():
if (v := os.getenv("DEFAULT_PG_VERSION")) is None:
pg_versions = [version for version in PgVersion if version != PgVersion.NOT_SET]
else:
pg_versions = [PgVersion(v)]
return pg_versions
@pytest.fixture(scope="function", autouse=True)
def build_type() -> Optional[str]:
return None
@pytest.fixture(
scope="session",
autouse=True,
params=get_pgversions(),
ids=lambda v: f"pg{v}",
)
def pg_version(request) -> Optional[PgVersion]:
return request.param
@pytest.fixture(scope="function", autouse=True)
def get_buildtypes():
if (bt := os.getenv("BUILD_TYPE")) is None:
build_types = ["debug", "release"]
else:
build_types = [bt.lower()]
return build_types
@pytest.fixture(
scope="session",
autouse=True,
params=get_buildtypes(),
ids=lambda t: f"{t}",
)
def build_type(request) -> Optional[str]:
return request.param
@pytest.fixture(scope="session", autouse=True)
def platform() -> Optional[str]:
return None
@pytest.fixture(scope="function", autouse=True)
@pytest.fixture(scope="session", autouse=True)
def pageserver_virtual_file_io_engine() -> Optional[str]:
return os.getenv("PAGESERVER_VIRTUAL_FILE_IO_ENGINE")
@pytest.fixture(scope="function", autouse=True)
@pytest.fixture(scope="session", autouse=True)
def pageserver_io_buffer_alignment() -> Optional[int]:
return None
@pytest.fixture(scope="function", autouse=True)
@pytest.fixture(scope="session", autouse=True)
def pageserver_aux_file_policy() -> Optional[AuxFileStore]:
return None
@@ -53,26 +81,12 @@ def get_pageserver_default_tenant_config_compaction_algorithm() -> Optional[Dict
return v
@pytest.fixture(scope="function", autouse=True)
@pytest.fixture(scope="session", autouse=True)
def pageserver_default_tenant_config_compaction_algorithm() -> Optional[Dict[str, Any]]:
return get_pageserver_default_tenant_config_compaction_algorithm()
def pytest_generate_tests(metafunc: Metafunc):
if (bt := os.getenv("BUILD_TYPE")) is None:
build_types = ["debug", "release"]
else:
build_types = [bt.lower()]
metafunc.parametrize("build_type", build_types)
if (v := os.getenv("DEFAULT_PG_VERSION")) is None:
pg_versions = [version for version in PgVersion if version != PgVersion.NOT_SET]
else:
pg_versions = [PgVersion(v)]
metafunc.parametrize("pg_version", pg_versions, ids=map(lambda v: f"pg{v}", pg_versions))
# A hacky way to parametrize tests only for `pageserver_virtual_file_io_engine=std-fs`
# And do not change test name for default `pageserver_virtual_file_io_engine=tokio-epoll-uring` to keep tests statistics
if (io_engine := os.getenv("PAGESERVER_VIRTUAL_FILE_IO_ENGINE", "")) not in (
@@ -89,6 +103,7 @@ def pytest_generate_tests(metafunc: Metafunc):
"pageserver_default_tenant_config_compaction_algorithm",
[explicit_default],
ids=[explicit_default["kind"]],
scope="session",
)
# For performance tests, parametrize also by platform
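
As a generic illustration of the pattern this hunk switches to (a standalone pytest sketch, not Neon code; the env-var handling mirrors the fixtures above), a session-scoped, autouse, parametrized fixture replaces pytest_generate_tests so that session-scoped resources can depend on the parameter without a scope mismatch:

import os
import pytest

def _build_types():
    # Same BUILD_TYPE handling as above: default to both flavors.
    bt = os.getenv("BUILD_TYPE")
    return ["debug", "release"] if bt is None else [bt.lower()]

@pytest.fixture(scope="session", autouse=True, params=_build_types(), ids=str)
def build_type(request) -> str:
    # Every test in the session is parametrized by this value, and session-scoped
    # fixtures (e.g. a shared environment) may depend on it; parametrizing the same
    # name per-function via pytest_generate_tests would raise a ScopeMismatch for
    # those fixtures.
    return request.param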

View File

View File

@@ -0,0 +1,303 @@
import os
import shutil
import subprocess
import uuid
from pathlib import Path
from typing import Any, Dict, Iterator, Optional, cast
import pytest
from _pytest.config import Config
from _pytest.fixtures import FixtureRequest
from fixtures import overlayfs
from fixtures.broker import NeonBroker
from fixtures.log_helper import log
from fixtures.neon_api import NeonAPI
from fixtures.neon_fixtures import (
DEFAULT_OUTPUT_DIR,
NeonEnv,
NeonEnvBuilder,
get_test_output_dir,
get_test_overlay_dir,
get_test_repo_dir,
)
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import MockS3Server
from fixtures.utils import AuxFileStore, allure_attach_from_dir, get_self_dir
BASE_PORT: int = 15000
@pytest.fixture(scope="session")
def base_dir() -> Iterator[Path]:
# find the base directory (currently this is the git root)
base_dir = get_self_dir().parent.parent
log.info(f"base_dir is {base_dir}")
yield base_dir
@pytest.fixture(scope="session")
def neon_binpath(base_dir: Path, build_type: str) -> Iterator[Path]:
if os.getenv("REMOTE_ENV"):
# we are in remote env and do not have neon binaries locally
# this is the case for benchmarks run on self-hosted runner
return
# Find the neon binaries.
if env_neon_bin := os.environ.get("NEON_BIN"):
binpath = Path(env_neon_bin)
else:
binpath = base_dir / "target" / build_type
log.info(f"neon_binpath is {binpath}")
if not (binpath / "pageserver").exists():
raise Exception(f"neon binaries not found at '{binpath}'")
yield binpath
@pytest.fixture(scope="session")
def pg_distrib_dir(base_dir: Path) -> Iterator[Path]:
if env_postgres_bin := os.environ.get("POSTGRES_DISTRIB_DIR"):
distrib_dir = Path(env_postgres_bin).resolve()
else:
distrib_dir = base_dir / "pg_install"
log.info(f"pg_distrib_dir is {distrib_dir}")
yield distrib_dir
@pytest.fixture(scope="session")
def top_output_dir(base_dir: Path) -> Iterator[Path]:
# Compute the top-level directory for all tests.
if env_test_output := os.environ.get("TEST_OUTPUT"):
output_dir = Path(env_test_output).resolve()
else:
output_dir = base_dir / DEFAULT_OUTPUT_DIR
output_dir.mkdir(exist_ok=True)
log.info(f"top_output_dir is {output_dir}")
yield output_dir
@pytest.fixture(scope="session")
def versioned_pg_distrib_dir(pg_distrib_dir: Path, pg_version: PgVersion) -> Iterator[Path]:
versioned_dir = pg_distrib_dir / pg_version.v_prefixed
psql_bin_path = versioned_dir / "bin/psql"
postgres_bin_path = versioned_dir / "bin/postgres"
if os.getenv("REMOTE_ENV"):
# When testing against a remote server, we only need the client binary.
if not psql_bin_path.exists():
raise Exception(f"psql not found at '{psql_bin_path}'")
else:
if not postgres_bin_path.exists():
raise Exception(f"postgres not found at '{postgres_bin_path}'")
log.info(f"versioned_pg_distrib_dir is {versioned_dir}")
yield versioned_dir
@pytest.fixture(scope="session")
def neon_api_key() -> str:
api_key = os.getenv("NEON_API_KEY")
if not api_key:
raise AssertionError("Set the NEON_API_KEY environment variable")
return api_key
@pytest.fixture(scope="session")
def neon_api_base_url() -> str:
return os.getenv("NEON_API_BASE_URL", "https://console-stage.neon.build/api/v2")
@pytest.fixture(scope="session")
def neon_api(neon_api_key: str, neon_api_base_url: str) -> NeonAPI:
return NeonAPI(neon_api_key, neon_api_base_url)
@pytest.fixture(scope="session")
def worker_port_num():
return (32768 - BASE_PORT) // int(os.environ.get("PYTEST_XDIST_WORKER_COUNT", "1"))
@pytest.fixture(scope="session")
def worker_seq_no(worker_id: str) -> int:
# worker_id is a pytest-xdist fixture
# it can be master or gw<number>
# parse it to always get a number
if worker_id == "master":
return 0
assert worker_id.startswith("gw")
return int(worker_id[2:])
@pytest.fixture(scope="session")
def worker_base_port(worker_seq_no: int, worker_port_num: int) -> int:
# divide the port space into per-worker ranges
# so that workers get disjoint sets of ports for their services
return BASE_PORT + worker_seq_no * worker_port_num
@pytest.fixture(scope="session")
def port_distributor(worker_base_port: int, worker_port_num: int) -> PortDistributor:
return PortDistributor(base_port=worker_base_port, port_number=worker_port_num)
@pytest.fixture(scope="session")
def shared_broker(
port_distributor: PortDistributor,
shared_test_output_dir: Path,
neon_binpath: Path,
) -> Iterator[NeonBroker]:
# multiple pytest sessions could get launched in parallel, get them different ports/datadirs
client_port = port_distributor.get_port()
broker_logfile = shared_test_output_dir / "repo" / "storage_broker.log"
broker = NeonBroker(logfile=broker_logfile, port=client_port, neon_binpath=neon_binpath)
yield broker
broker.stop()
@pytest.fixture(scope="session")
def run_id() -> Iterator[uuid.UUID]:
yield uuid.uuid4()
@pytest.fixture(scope="session", autouse=True)
def neon_shared_env(
pytestconfig: Config,
port_distributor: PortDistributor,
mock_s3_server: MockS3Server,
shared_broker: NeonBroker,
run_id: uuid.UUID,
top_output_dir: Path,
shared_test_output_dir: Path,
neon_binpath: Path,
build_type: str,
pg_distrib_dir: Path,
pg_version: PgVersion,
pageserver_virtual_file_io_engine: str,
pageserver_aux_file_policy: Optional[AuxFileStore],
pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]],
pageserver_io_buffer_alignment: Optional[int],
request: FixtureRequest,
) -> Iterator[NeonEnv]:
"""
Simple Neon environment, with no authentication and no safekeepers.
This fixture will use RemoteStorageKind.LOCAL_FS with pageserver.
"""
prefix = f"shared[{build_type}-{pg_version.v_prefixed}]-"
# Create the environment in the per-test output directory
repo_dir = get_test_repo_dir(request, top_output_dir, prefix)
with NeonEnvBuilder(
top_output_dir=top_output_dir,
repo_dir=repo_dir,
port_distributor=port_distributor,
broker=shared_broker,
mock_s3_server=mock_s3_server,
neon_binpath=neon_binpath,
pg_distrib_dir=pg_distrib_dir,
pg_version=pg_version,
run_id=run_id,
preserve_database_files=cast(bool, pytestconfig.getoption("--preserve-database-files")),
test_name=f"{prefix}{request.node.name}",
test_output_dir=shared_test_output_dir,
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
pageserver_aux_file_policy=pageserver_aux_file_policy,
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
pageserver_io_buffer_alignment=pageserver_io_buffer_alignment,
shared=True,
) as builder:
env = builder.init_start()
yield env
# This is autouse, so the test output directory always gets created, even
# if a test doesn't put anything there.
#
# NB: we request the overlay dir fixture so the fixture does its cleanups
@pytest.fixture(scope="session")
def shared_test_output_dir(
request: FixtureRequest,
pg_version: PgVersion,
build_type: str,
top_output_dir: Path,
shared_test_overlay_dir: Path,
) -> Iterator[Path]:
"""Create the working directory for shared tests."""
prefix = f"shared[{build_type}-{pg_version.v_prefixed}]-"
# one directory per test
test_dir = get_test_output_dir(request, top_output_dir, prefix)
log.info(f"test_output_dir is {test_dir}")
shutil.rmtree(test_dir, ignore_errors=True)
test_dir.mkdir()
yield test_dir
# Allure artifacts creation might involve the creation of `.tar.zst` archives,
# which aren't going to be used if Allure results collection is not enabled
# (i.e. --alluredir is not set).
# Skip `allure_attach_from_dir` in this case
if not request.config.getoption("--alluredir"):
return
preserve_database_files = False
for k, v in request.node.user_properties:
# NB: the neon_env_builder fixture uses this fixture (test_output_dir).
# So, neon_env_builder's cleanup runs before here.
# The cleanup propagates NeonEnvBuilder.preserve_database_files into this user property.
if k == "preserve_database_files":
assert isinstance(v, bool)
preserve_database_files = v
allure_attach_from_dir(test_dir, preserve_database_files)
@pytest.fixture(scope="session")
def shared_test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Optional[Path]:
"""
Idempotently create a test's overlayfs mount state directory.
If the functionality isn't enabled via env var, returns None.
The procedure cleans up after previous runs that were aborted (e.g. due to Ctrl-C, OOM kills, etc).
"""
if os.getenv("NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS") is None:
return None
overlay_dir = get_test_overlay_dir(request, top_output_dir)
log.info(f"test_overlay_dir is {overlay_dir}")
overlay_dir.mkdir(exist_ok=True)
# unmount stale overlayfs mounts which use subdirectories of `overlay_dir/*` as the overlayfs `upperdir` and `workdir`
for mountpoint in overlayfs.iter_mounts_beneath(get_test_output_dir(request, top_output_dir)):
cmd = ["sudo", "umount", str(mountpoint)]
log.info(
f"Unmounting stale overlayfs mount probably created during earlier test run: {cmd}"
)
subprocess.run(cmd, capture_output=True, check=True)
# the overlayfs `workdir` is owned by `root`, so shutil.rmtree won't work.
cmd = ["sudo", "rm", "-rf", str(overlay_dir)]
subprocess.run(cmd, capture_output=True, check=True)
overlay_dir.mkdir()
return overlay_dir
# no need to clean up anything: on clean shutdown,
# NeonEnvBuilder.overlay_cleanup_teardown takes care of cleanup
# and on unclean shutdown, this function will take care of it
# on the next test run

View File

@@ -0,0 +1,13 @@
from typing import Iterator
import pytest
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import MockS3Server
@pytest.fixture(scope="session")
def mock_s3_server(port_distributor: PortDistributor) -> Iterator[MockS3Server]:
mock_s3_server = MockS3Server(port_distributor.get_port())
yield mock_s3_server
mock_s3_server.kill()

View File

@@ -0,0 +1,348 @@
from functools import partial
from pathlib import Path
from typing import Any, Optional, cast, List, Dict
import pytest
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
PgProtocol,
DEFAULT_BRANCH_NAME,
tenant_get_shards,
check_restored_datadir_content,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn
from fixtures.safekeeper.utils import are_walreceivers_absent
from fixtures.utils import wait_until
"""
In this file, the most important resources are exposed as function-level fixtures
that depend on session-level resources like pageservers and safekeepers.
The main rationale here is that we don't need to initialize a new SK/PS/etc
every time we want to test something that has branching: we can just as well
reuse still-available PageServers from other tests of the same kind.
"""
@pytest.fixture(scope="session")
def shared_storage_repo_path(build_type: str, base_dir: Path) -> Path:
return base_dir / f"shared[{build_type}]" / "repo"
class TEndpoint(PgProtocol):
def clear_shared_buffers(self, cursor: Optional[Any] = None):
"""
Best-effort way to clear postgres buffers. Pinned buffers will not be 'cleared.'
Might also clear LFC.
"""
if cursor is not None:
cursor.execute("select clear_buffer_cache()")
else:
self.safe_psql("select clear_buffer_cache()")
_TEndpoint = Endpoint
class TTimeline:
__primary: Optional[_TEndpoint]
__secondary: Optional[_TEndpoint]
def __init__(self, timeline_id: TimelineId, tenant: "TTenant", name: str):
self.id = timeline_id
self.tenant = tenant
self.name = name
self.__primary = None
self.__secondary = None
@property
def primary(self) -> TEndpoint:
if self.__primary is None:
self.__primary = cast(_TEndpoint, self.tenant.create_endpoint(
name=self._get_full_endpoint_name("primary"),
timeline_id=self.id,
primary=True,
running=True,
))
return cast(TEndpoint, self.__primary)
def primary_with_config(self, config_lines: List[str], reboot: bool = False) -> TEndpoint:
if self.__primary is None:
self.__primary = cast(_TEndpoint, self.tenant.create_endpoint(
name=self._get_full_endpoint_name("primary"),
timeline_id=self.id,
primary=True,
running=True,
config_lines=config_lines,
))
else:
self.__primary.config(config_lines)
if reboot:
if self.__primary.is_running():
self.__primary.stop()
self.__primary.start()
return cast(TEndpoint, self.__primary)
@property
def secondary(self) -> TEndpoint:
if self.__secondary is None:
self.__secondary = cast(_TEndpoint, self.tenant.create_endpoint(
name=self._get_full_endpoint_name("secondary"),
timeline_id=self.id,
primary=False,
running=True,
))
return cast(TEndpoint, self.__secondary)
def secondary_with_config(self, config_lines: List[str], reboot: bool = False) -> TEndpoint:
if self.__secondary is None:
self.__secondary = cast(_TEndpoint, self.tenant.create_endpoint(
name=self._get_full_endpoint_name("secondary"),
timeline_id=self.id,
primary=False,
running=True,
config_lines=config_lines,
))
else:
self.__secondary.config(config_lines)
if reboot:
if self.__secondary.is_running():
self.__secondary.stop()
self.__secondary.start()
return cast(TEndpoint, self.__secondary)
def _get_full_endpoint_name(self, name) -> str:
return f"{self.name}.{name}"
def create_branch(self, name: str, lsn: Optional[Lsn]) -> "TTimeline":
return self.tenant.create_timeline(
new_name=name,
parent_name=self.name,
branch_at=lsn,
)
def stop_and_flush(self, endpoint: TEndpoint):
self.tenant.stop_endpoint(endpoint)
self.tenant.flush_timeline_data(self)
def checkpoint(self, **kwargs):
self.tenant.checkpoint_timeline(self, **kwargs)
class TTenant:
"""
An object representing a single test case on a shared pageserver.
All operations here are practically safe.
"""
def __init__(
self,
env: NeonEnv,
name: str,
config: Optional[Dict[str, Any]] = None,
):
self.id = TenantId.generate()
self.timelines = []
self.timeline_names = {}
self.timeline_ids = {}
self.name = name
# publicly inaccessible stuff, used during shutdown
self.__endpoints: List[Endpoint] = []
self.__env: NeonEnv = env
env.neon_cli.create_tenant(
tenant_id=self.id,
set_default=False,
conf=config
)
self.first_timeline_id = env.neon_cli.create_branch(
new_branch_name=f"{self.name}:{DEFAULT_BRANCH_NAME}",
ancestor_branch_name=DEFAULT_BRANCH_NAME,
tenant_id=self.id,
)
self._add_timeline(
TTimeline(
timeline_id=self.first_timeline_id,
tenant=self,
name=DEFAULT_BRANCH_NAME,
)
)
def _add_timeline(self, timeline: TTimeline):
assert timeline.tenant == self
assert timeline not in self.timelines
assert timeline.id is not None
self.timelines.append(timeline)
self.timeline_ids[timeline.id] = timeline
if timeline.name is not None:
self.timeline_names[timeline.name] = timeline
def create_timeline(
self,
new_name: str,
parent_name: Optional[str] = None,
parent_id: Optional[TimelineId] = None,
branch_at: Optional[Lsn] = None,
) -> TTimeline:
if parent_name is not None:
pass
elif parent_id is not None:
assert parent_name is None
parent = self.timeline_ids[parent_id]
parent_name = parent.name
else:
raise LookupError("Timeline creation requires parent by either ID or name")
assert parent_name is not None
new_id = self.__env.neon_cli.create_branch(
new_branch_name=f"{self.name}:{new_name}",
ancestor_branch_name=f"{self.name}:{parent_name}",
tenant_id=self.id,
ancestor_start_lsn=branch_at,
)
new_tl = TTimeline(
timeline_id=new_id,
tenant=self,
name=new_name,
)
self._add_timeline(new_tl)
return new_tl
@property
def default_timeline(self) -> TTimeline:
return self.timeline_ids[self.first_timeline_id]
def start_endpoint(self, ep: TEndpoint):
if ep not in self.__endpoints:
return
ep = cast(Endpoint, ep)
if not ep.is_running():
ep.start()
def stop_endpoint(self, ep: TEndpoint, mode: str = "fast"):
if ep not in self.__endpoints:
return
ep = cast(Endpoint, ep)
if ep.is_running():
ep.stop(mode=mode)
def create_endpoint(
self,
name: str,
timeline_id: TimelineId,
primary: bool = True,
running: bool = False,
port: Optional[int] = None,
http_port: Optional[int] = None,
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
) -> TEndpoint:
endpoint: _TEndpoint
if port is None:
port = self.__env.port_distributor.get_port()
if http_port is None:
http_port = self.__env.port_distributor.get_port()
if running:
endpoint = self.__env.endpoints.create_start(
branch_name=self.timeline_ids[timeline_id].name,
endpoint_id=f"{self.name}.{name}",
tenant_id=self.id,
lsn=lsn,
hot_standby=not primary,
config_lines=config_lines,
)
else:
endpoint = self.__env.endpoints.create(
branch_name=self.timeline_ids[timeline_id].name,
endpoint_id=f"{self.name}.{name}",
tenant_id=self.id,
lsn=lsn,
hot_standby=not primary,
config_lines=config_lines,
)
self.__endpoints.append(endpoint)
return endpoint
def shutdown_resources(self):
for ep in self.__endpoints:
try:
ep.stop_and_destroy("fast")
except BaseException as e:
log.error("Error encountered while shutting down endpoint %s", ep.endpoint_id, exc_info=e)
def reconfigure(self, endpoint: TEndpoint, lines: List[str], restart: bool):
if endpoint not in self.__endpoints:
return
endpoint = cast(_TEndpoint, endpoint)
was_running = endpoint.is_running()
if restart and was_running:
endpoint.stop()
endpoint.config(lines)
if restart:
endpoint.start()
def flush_timeline_data(self, timeline: TTimeline) -> Lsn:
commit_lsn: Lsn = Lsn(0)
# In principle, in the absence of failures, polling a single sk would be enough.
for sk in self.__env.safekeepers:
cli = sk.http_client()
# wait until compute connections are gone
wait_until(30, 0.5, partial(are_walreceivers_absent, cli, self.id, timeline.id))
commit_lsn = max(cli.get_commit_lsn(self.id, timeline.id), commit_lsn)
# Note: depending on WAL filtering implementation, probably most shards
# won't be able to reach commit_lsn (unless gaps are also ack'ed), so this
# is broken in sharded case.
shards = tenant_get_shards(env=self.__env, tenant_id=self.id)
for tenant_shard_id, pageserver in shards:
log.info(
f"flush_ep_to_pageserver: waiting for {commit_lsn} on shard {tenant_shard_id} on pageserver {pageserver.id})"
)
waited = wait_for_last_record_lsn(
pageserver.http_client(), tenant_shard_id, timeline.id, commit_lsn
)
assert waited >= commit_lsn
return commit_lsn
def checkpoint_timeline(self, timeline: TTimeline, **kwargs):
self.__env.pageserver.http_client().timeline_checkpoint(
tenant_id=self.id,
timeline_id=timeline.id,
**kwargs,
)
def pgdatadir(self, endpoint: TEndpoint):
if endpoint not in self.__endpoints:
return None
return cast(Endpoint, endpoint).pgdata_dir
def check_restored_datadir_content(self, path, endpoint: TEndpoint, *args, **kwargs):
if endpoint not in self.__endpoints:
return
check_restored_datadir_content(path, self.__env, cast(Endpoint, endpoint), *args, **kwargs)
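
A hedged end-to-end sketch of driving TTenant/TTimeline from a test (the `tenant` fixture is the one defined in fixtures/function/neon_storage.py above; the table, branch name, and row count are illustrative):

from fixtures.shared_fixtures import TTenant

def test_branch_and_flush(tenant: TTenant):
    # Work against the tenant's default timeline on the shared pageserver.
    main = tenant.default_timeline
    primary = main.primary
    primary.safe_psql("create table t as select generate_series(1, 1000) g")

    # Make sure the writes reach the pageserver, then force a checkpoint.
    main.stop_and_flush(primary)
    main.checkpoint(compact=False, wait_until_uploaded=True)

    # Branch from the flushed state and check the data is visible there too.
    child = main.create_branch("child", lsn=None)
    assert child.primary.safe_psql_scalar("select count(*) from t") == 1000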

View File

@@ -84,7 +84,7 @@ def test_storage_controller_many_tenants(
compute_reconfigure_listener.register_on_notify(lambda body: time.sleep(0.01))
env = neon_env_builder.init_configs()
env.start()
neon_env_builder.start()
# We will intentionally stress reconciler concurrency, which triggers a warning when lots
# of shards are hitting the delayed path.

View File

@@ -15,6 +15,7 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import wait_until_tenant_active
from fixtures.shared_fixtures import TTimeline
from fixtures.utils import query_scalar
from performance.test_perf_pgbench import get_scales_matrix
from requests import RequestException
@@ -115,13 +116,10 @@ def test_branching_with_pgbench(
# This test checks if the pageserver is able to handle an "unnormalized" starting LSN.
#
# Related: see discussion in https://github.com/neondatabase/neon/pull/2143#issuecomment-1209092186
def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBin):
def test_branching_unnormalized_start_lsn(timeline: TTimeline, pg_bin: PgBin):
XLOG_BLCKSZ = 8192
env = neon_simple_env
env.neon_cli.create_branch("b0")
endpoint0 = env.endpoints.create_start("b0")
endpoint0 = timeline.primary
pg_bin.run_capture(["pgbench", "-i", endpoint0.connstr()])
@@ -133,8 +131,8 @@ def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBi
start_lsn = Lsn((int(curr_lsn) - XLOG_BLCKSZ) // XLOG_BLCKSZ * XLOG_BLCKSZ)
log.info(f"Branching b1 from b0 starting at lsn {start_lsn}...")
env.neon_cli.create_branch("b1", "b0", ancestor_start_lsn=start_lsn)
endpoint1 = env.endpoints.create_start("b1")
tl2 = timeline.create_branch("branch", start_lsn)
endpoint1 = tl2.primary
pg_bin.run_capture(["pgbench", "-i", endpoint1.connstr()])

View File

@@ -1,14 +1,13 @@
from fixtures.neon_fixtures import NeonEnvBuilder, flush_ep_to_pageserver
from pathlib import Path
from fixtures.neon_fixtures import NeonEnvBuilder, flush_ep_to_pageserver, test_output_dir
from fixtures.shared_fixtures import TTimeline
def do_combocid_op(neon_env_builder: NeonEnvBuilder, op):
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start(
"main",
config_lines=[
"shared_buffers='1MB'",
],
)
def do_combocid_op(timeline: TTimeline, op):
endpoint = timeline.primary_with_config(config_lines=[
"shared_buffers='1MB'",
])
conn = endpoint.connect()
cur = conn.cursor()
@@ -43,32 +42,26 @@ def do_combocid_op(neon_env_builder: NeonEnvBuilder, op):
assert len(rows) == 500
cur.execute("rollback")
flush_ep_to_pageserver(env, endpoint, env.initial_tenant, env.initial_timeline)
env.pageserver.http_client().timeline_checkpoint(
env.initial_tenant, env.initial_timeline, compact=False, wait_until_uploaded=True
)
timeline.stop_and_flush(endpoint)
timeline.checkpoint(compact=False, wait_until_uploaded=True)
def test_combocid_delete(neon_env_builder: NeonEnvBuilder):
do_combocid_op(neon_env_builder, "delete from t")
def test_combocid_delete(timeline: TTimeline):
do_combocid_op(timeline, "delete from t")
def test_combocid_update(neon_env_builder: NeonEnvBuilder):
do_combocid_op(neon_env_builder, "update t set val=val+1")
def test_combocid_update(timeline: TTimeline):
do_combocid_op(timeline, "update t set val=val+1")
def test_combocid_lock(neon_env_builder: NeonEnvBuilder):
do_combocid_op(neon_env_builder, "select * from t for update")
def test_combocid_lock(timeline: TTimeline):
do_combocid_op(timeline, "select * from t for update")
def test_combocid_multi_insert(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start(
"main",
config_lines=[
"shared_buffers='1MB'",
],
)
def test_combocid_multi_insert(timeline: TTimeline, test_output_dir: Path):
endpoint = timeline.primary_with_config(config_lines=[
"shared_buffers='1MB'",
])
conn = endpoint.connect()
cur = conn.cursor()
@@ -77,7 +70,7 @@ def test_combocid_multi_insert(neon_env_builder: NeonEnvBuilder):
cur.execute("CREATE EXTENSION neon_test_utils")
cur.execute("create table t(id integer, val integer)")
file_path = f"{endpoint.pg_data_dir_path()}/t.csv"
file_path = str(test_output_dir / "t.csv")
cur.execute(f"insert into t select g, 0 from generate_series(1,{n_records}) g")
cur.execute(f"copy t to '{file_path}'")
cur.execute("truncate table t")
@@ -106,15 +99,12 @@ def test_combocid_multi_insert(neon_env_builder: NeonEnvBuilder):
cur.execute("rollback")
flush_ep_to_pageserver(env, endpoint, env.initial_tenant, env.initial_timeline)
env.pageserver.http_client().timeline_checkpoint(
env.initial_tenant, env.initial_timeline, compact=False, wait_until_uploaded=True
)
timeline.stop_and_flush(endpoint)
timeline.checkpoint(compact=False, wait_until_uploaded=True)
def test_combocid(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main")
def test_combocid(timeline: TTimeline):
endpoint = timeline.primary
conn = endpoint.connect()
cur = conn.cursor()
@@ -147,7 +137,5 @@ def test_combocid(neon_env_builder: NeonEnvBuilder):
cur.execute("rollback")
flush_ep_to_pageserver(env, endpoint, env.initial_tenant, env.initial_timeline)
env.pageserver.http_client().timeline_checkpoint(
env.initial_tenant, env.initial_timeline, compact=False, wait_until_uploaded=True
)
timeline.stop_and_flush(endpoint)
timeline.checkpoint(compact=False, wait_until_uploaded=True)
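
For context, these tests exercise combo command IDs: when a row inserted earlier in a transaction is then deleted, updated, or locked FOR UPDATE by a later command (exactly the three operations parametrised above), Postgres may have to fold cmin and cmax into a single combo CID, and with shared_buffers at 1MB that state must survive eviction and a round trip through the pageserver. A minimal standalone illustration of the Postgres-level behaviour (hypothetical snippet, not part of the fixtures; the connection string is a placeholder):

import psycopg2

conn = psycopg2.connect("dbname=postgres")  # any running endpoint
cur = conn.cursor()
cur.execute("create table combocid_demo(id int)")
cur.execute("insert into combocid_demo select generate_series(1, 10)")
# Same transaction, later command: the deleted rows need a combo CID so that
# visibility within this transaction still accounts for both the insert and the delete.
cur.execute("delete from combocid_demo where id <= 5")
cur.execute("select count(*) from combocid_demo")
assert cur.fetchone()[0] == 5
conn.rollback()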

View File

@@ -178,7 +178,7 @@ def test_backward_compatibility(
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.from_repo_dir(compatibility_snapshot_dir / "repo")
env.pageserver.allowed_errors.append(ingest_lag_log_line)
env.start()
neon_env_builder.start()
check_neon_works(
env,
@@ -265,7 +265,7 @@ def test_forward_compatibility(
# does not include logs from previous runs
assert not env.pageserver.log_contains("git-env:" + prev_pageserver_version)
env.start()
neon_env_builder.start()
# ensure the specified pageserver is running
assert env.pageserver.log_contains("git-env:" + prev_pageserver_version)

View File

@@ -5,6 +5,7 @@ import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
from fixtures.pg_version import PgVersion
from fixtures.shared_fixtures import TTimeline
from fixtures.utils import query_scalar
@@ -12,18 +13,17 @@ from fixtures.utils import query_scalar
# Test CREATE DATABASE when there have been relmapper changes
#
@pytest.mark.parametrize("strategy", ["file_copy", "wal_log"])
def test_createdb(neon_simple_env: NeonEnv, strategy: str):
env = neon_simple_env
if env.pg_version == PgVersion.V14 and strategy == "wal_log":
def test_createdb(timeline: TTimeline, pg_version: PgVersion, strategy: str):
if pg_version == PgVersion.V14 and strategy == "wal_log":
pytest.skip("wal_log strategy not supported on PostgreSQL 14")
endpoint = env.endpoints.create_start("main")
endpoint = timeline.primary
with endpoint.cursor() as cur:
# Cause a 'relmapper' change in the original branch
cur.execute("VACUUM FULL pg_class")
if env.pg_version == PgVersion.V14:
if pg_version == PgVersion.V14:
cur.execute("CREATE DATABASE foodb")
else:
cur.execute(f"CREATE DATABASE foodb STRATEGY={strategy}")
@@ -31,8 +31,8 @@ def test_createdb(neon_simple_env: NeonEnv, strategy: str):
lsn = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")
# Create a branch
env.neon_cli.create_branch("test_createdb2", "main", ancestor_start_lsn=lsn)
endpoint2 = env.endpoints.create_start("test_createdb2")
tl2 = timeline.create_branch("createdb2", lsn=lsn)
endpoint2 = tl2.primary
# Test that you can connect to the new database on both branches
for db in (endpoint, endpoint2):
@@ -58,9 +58,8 @@ def test_createdb(neon_simple_env: NeonEnv, strategy: str):
#
# Test DROP DATABASE
#
def test_dropdb(neon_simple_env: NeonEnv, test_output_dir):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
def test_dropdb(timeline: TTimeline, test_output_dir):
endpoint = timeline.primary
with endpoint.cursor() as cur:
cur.execute("CREATE DATABASE foodb")
@@ -77,28 +76,28 @@ def test_dropdb(neon_simple_env: NeonEnv, test_output_dir):
lsn_after_drop = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")
# Create two branches before and after database drop.
env.neon_cli.create_branch("test_before_dropdb", "main", ancestor_start_lsn=lsn_before_drop)
endpoint_before = env.endpoints.create_start("test_before_dropdb")
tl_before = timeline.create_branch("test_before_dropdb", lsn=lsn_before_drop)
endpoint_before = tl_before.primary
env.neon_cli.create_branch("test_after_dropdb", "main", ancestor_start_lsn=lsn_after_drop)
endpoint_after = env.endpoints.create_start("test_after_dropdb")
tl_after = timeline.create_branch("test_after_dropdb", lsn=lsn_after_drop)
endpoint_after = tl_after.primary
# Test that database exists on the branch before drop
endpoint_before.connect(dbname="foodb").close()
# Test that database subdir exists on the branch before drop
assert endpoint_before.pgdata_dir
dbpath = pathlib.Path(endpoint_before.pgdata_dir) / "base" / str(dboid)
assert timeline.tenant.pgdatadir(endpoint_before)
dbpath = pathlib.Path(timeline.tenant.pgdatadir(endpoint_before)) / "base" / str(dboid)
log.info(dbpath)
assert os.path.isdir(dbpath) is True
# Test that database subdir doesn't exist on the branch after drop
assert endpoint_after.pgdata_dir
dbpath = pathlib.Path(endpoint_after.pgdata_dir) / "base" / str(dboid)
assert timeline.tenant.pgdatadir(endpoint_after)
dbpath = pathlib.Path(timeline.tenant.pgdatadir(endpoint_after)) / "base" / str(dboid)
log.info(dbpath)
assert os.path.isdir(dbpath) is False
# Check that we restore the content of the datadir correctly
check_restored_datadir_content(test_output_dir, env, endpoint)
timeline.tenant.check_restored_datadir_content(test_output_dir, endpoint)

View File

@@ -1,13 +1,13 @@
from fixtures.neon_fixtures import NeonEnv
from fixtures.shared_fixtures import TTimeline
from fixtures.utils import query_scalar
#
# Test CREATE USER to check shared catalog restore
#
def test_createuser(neon_simple_env: NeonEnv):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
def test_createuser(timeline: TTimeline):
endpoint = timeline.primary
with endpoint.cursor() as cur:
# Cause a 'relmapper' change in the original branch
@@ -18,8 +18,8 @@ def test_createuser(neon_simple_env: NeonEnv):
lsn = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")
# Create a branch
env.neon_cli.create_branch("test_createuser2", "main", ancestor_start_lsn=lsn)
endpoint2 = env.endpoints.create_start("test_createuser2")
branch = timeline.create_branch("test_createuser2", lsn=lsn)
endpoint2 = branch.primary
# Test that you can connect to new branch as a new user
assert endpoint2.safe_psql("select current_user", user="testuser") == [("testuser",)]

View File

@@ -1,5 +1,6 @@
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.neon_fixtures import Endpoint
from fixtures.shared_fixtures import TTimeline
@pytest.mark.parametrize(
@@ -10,14 +11,11 @@ from fixtures.neon_fixtures import NeonEnvBuilder
"💣", # calls `trigger_segfault` internally
],
)
def test_endpoint_crash(neon_env_builder: NeonEnvBuilder, sql_func: str):
def test_endpoint_crash(timeline: TTimeline, sql_func: str):
"""
Test that triggering a crash from neon_test_utils crashes the endpoint
"""
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_endpoint_crash")
endpoint = env.endpoints.create_start("test_endpoint_crash")
endpoint = timeline.primary
endpoint.safe_psql("CREATE EXTENSION neon_test_utils;")
with pytest.raises(Exception, match="This probably means the server terminated abnormally"):
endpoint.safe_psql(f"SELECT {sql_func}();")

View File

@@ -1,10 +1,8 @@
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.shared_fixtures import TTimeline
def test_fsm_truncate(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_fsm_truncate")
endpoint = env.endpoints.create_start("test_fsm_truncate")
def test_fsm_truncate(timeline: TTimeline):
endpoint = timeline.primary
endpoint.safe_psql(
"CREATE TABLE t1(key int); CREATE TABLE t2(key int); TRUNCATE TABLE t1; TRUNCATE TABLE t2;"
)

View File

@@ -1,17 +1,15 @@
import time
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
from fixtures.shared_fixtures import TTimeline
#
# Test that redo of XLOG_GIN_VACUUM_PAGE doesn't produce error
#
def test_gin_redo(neon_simple_env: NeonEnv):
env = neon_simple_env
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
time.sleep(1)
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
def test_gin_redo(timeline: TTimeline):
primary = timeline.primary
secondary = timeline.secondary
con = primary.connect()
cur = con.cursor()
cur.execute("create table gin_test_tbl(id integer, i int4[])")

View File

@@ -14,6 +14,7 @@ from fixtures.neon_fixtures import (
tenant_get_shards,
wait_replica_caughtup,
)
from fixtures.shared_fixtures import TTimeline
from fixtures.utils import wait_until
@@ -221,29 +222,20 @@ def pgbench_accounts_initialized(ep):
#
# Without hs feedback enabled we'd see 'User query might have needed to see row
# versions that must be removed.' errors.
def test_hot_standby_feedback(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
env = neon_env_builder.init_start()
agressive_vacuum_conf = [
def test_hot_standby_feedback(timeline: TTimeline, pg_bin: PgBin):
with timeline.primary_with_config(config_lines=[
"log_autovacuum_min_duration = 0",
"autovacuum_naptime = 10s",
"autovacuum_vacuum_threshold = 25",
"autovacuum_vacuum_scale_factor = 0.1",
"autovacuum_vacuum_cost_delay = -1",
]
with env.endpoints.create_start(
branch_name="main", endpoint_id="primary", config_lines=agressive_vacuum_conf
) as primary:
]) as primary:
# It would be great to have more strict max_standby_streaming_delay=0s here, but then sometimes it fails with
# 'User was holding shared buffer pin for too long.'.
with env.endpoints.new_replica_start(
origin=primary,
endpoint_id="secondary",
config_lines=[
"max_standby_streaming_delay=2s",
"neon.protocol_version=2",
"hot_standby_feedback=true",
],
) as secondary:
with timeline.secondary_with_config(config_lines=[
"max_standby_streaming_delay=2s",
"hot_standby_feedback=true",
]) as secondary:
log.info(
f"primary connstr is {primary.connstr()}, secondary connstr {secondary.connstr()}"
)
@@ -286,20 +278,15 @@ def test_hot_standby_feedback(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
# Test race condition between WAL replay and backends performing queries
# https://github.com/neondatabase/neon/issues/7791
def test_replica_query_race(neon_simple_env: NeonEnv):
env = neon_simple_env
primary_ep = env.endpoints.create_start(
branch_name="main",
endpoint_id="primary",
)
def test_replica_query_race(timeline: TTimeline):
primary_ep = timeline.primary
with primary_ep.connect() as p_con:
with p_con.cursor() as p_cur:
p_cur.execute("CREATE EXTENSION neon_test_utils")
p_cur.execute("CREATE TABLE test AS SELECT 0 AS counter")
standby_ep = env.endpoints.new_replica_start(origin=primary_ep, endpoint_id="standby")
standby_ep = timeline.secondary
wait_replica_caughtup(primary_ep, standby_ep)
# In primary, run a lot of UPDATEs on a single page

View File

@@ -8,22 +8,19 @@ import time
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, PgBin
from fixtures.shared_fixtures import TTimeline
#
# Test resizing the Local File Cache (LFC) while pgbench is running
#
@pytest.mark.timeout(600)
def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
env = neon_simple_env
endpoint = env.endpoints.create_start(
"main",
config_lines=[
"neon.file_cache_path='file.cache'",
"neon.max_file_cache_size=512MB",
"neon.file_cache_size_limit=512MB",
],
)
def test_lfc_resize(timeline: TTimeline, pg_bin: PgBin):
endpoint = timeline.primary_with_config(config_lines=[
"neon.file_cache_path='file.cache'",
"neon.max_file_cache_size=512MB",
"neon.file_cache_size_limit=512MB",
])
n_resize = 10
scale = 100
@@ -49,7 +46,7 @@ def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
thread.join()
lfc_file_path = f"{endpoint.pg_data_dir_path()}/file.cache"
lfc_file_path = timeline.tenant.pgdatadir(endpoint) / "file.cache"
lfc_file_size = os.path.getsize(lfc_file_path)
res = subprocess.run(["ls", "-sk", lfc_file_path], check=True, text=True, capture_output=True)
lfc_file_blocks = re.findall("([0-9A-F]+)", res.stdout)[0]

View File

@@ -3,6 +3,7 @@ from pathlib import Path
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.shared_fixtures import TTimeline
from fixtures.utils import query_scalar
@@ -73,18 +74,14 @@ WITH (fillfactor='100');
assert blocks < 10
def test_sliding_working_set_approximation(neon_simple_env: NeonEnv):
env = neon_simple_env
def test_sliding_working_set_approximation(timeline: TTimeline):
endpoint = timeline.primary_with_config(config_lines=[
"autovacuum = off",
"shared_buffers=1MB",
"neon.max_file_cache_size=256MB",
"neon.file_cache_size_limit=245MB",
])
endpoint = env.endpoints.create_start(
branch_name="main",
config_lines=[
"autovacuum = off",
"shared_buffers=1MB",
"neon.max_file_cache_size=256MB",
"neon.file_cache_size_limit=245MB",
],
)
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create extension neon")

View File

@@ -1,4 +1,5 @@
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
from fixtures.shared_fixtures import TTimeline
from fixtures.utils import query_scalar
@@ -12,9 +13,8 @@ from fixtures.utils import query_scalar
# is enough to verify that the WAL records are handled correctly
# in the pageserver.
#
def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
def test_multixact(timeline: TTimeline, test_output_dir):
endpoint = timeline.primary
cur = endpoint.connect().cursor()
cur.execute(
@@ -72,10 +72,8 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
assert int(next_multixact_id) > int(next_multixact_id_old)
# Branch at this point
env.neon_cli.create_branch(
"test_multixact_new", ancestor_branch_name="main", ancestor_start_lsn=lsn
)
endpoint_new = env.endpoints.create_start("test_multixact_new")
branch = timeline.create_branch("test_multixact_new", lsn=lsn)
endpoint_new = branch.primary
next_multixact_id_new = endpoint_new.safe_psql(
"SELECT next_multixact_id FROM pg_control_checkpoint()"
@@ -85,4 +83,4 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
assert next_multixact_id_new == next_multixact_id
# Check that we can restore the content of the datadir correctly
check_restored_datadir_content(test_output_dir, env, endpoint)
timeline.tenant.check_restored_datadir_content(test_output_dir, endpoint)

View File

@@ -374,7 +374,7 @@ def test_sharding_split_smoke(
non_default_tenant_config = {"gc_horizon": 77 * 1024 * 1024}
env = neon_env_builder.init_configs(True)
env.start()
neon_env_builder.start()
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
env.neon_cli.create_tenant(
@@ -1436,7 +1436,7 @@ def test_sharding_unlogged_relation(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_configs()
env.start()
neon_env_builder.start()
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
@@ -1475,7 +1475,7 @@ def test_top_tenants(neon_env_builder: NeonEnvBuilder):
"""
env = neon_env_builder.init_configs()
env.start()
neon_env_builder.start()
tenants = []
n_tenants = 8

View File

@@ -2,7 +2,7 @@ import time
from contextlib import closing
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, fork_at_current_lsn
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.utils import query_scalar
@@ -57,7 +57,7 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
cur.execute("UPDATE vmtest_cold_update2 SET id = 5000, filler=repeat('x', 200) WHERE id = 1")
# Branch at this point, to test that later
fork_at_current_lsn(env, endpoint, "test_vm_bit_clear_new", "main")
# fork_at_current_lsn(env, endpoint, "test_vm_bit_clear_new", "main")
# Clear the buffer cache, to force the VM page to be re-fetched from
# the page server
@@ -91,6 +91,7 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
# a dirty VM page is evicted. If the VM bit was not correctly cleared by the
# earlier WAL record, the full-page image hides the problem. Starting a new
# server at the right point-in-time avoids that full-page image.
endpoint_new = env.endpoints.create_start("test_vm_bit_clear_new")
pg_new_conn = endpoint_new.connect()

View File

@@ -1,11 +1,11 @@
{
"v16": [
"16.4",
"0baa7346dfd42d61912eeca554c9bb0a190f0a1e"
"6e9a4ff6249ac02b8175054b7b3f7dfb198be48b"
],
"v15": [
"15.8",
"6f6d77fb5960602fcd3fd130aca9f99ecb1619c9"
"49d5e576a56e4cc59cd6a6a0791b2324b9fa675e"
],
"v14": [
"14.13",