Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-22 04:42:56 +00:00)

Compare commits: faster-ci...hackathon/ (11 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 05a8ec269a | |
| | fcab61bdcd | |
| | 9e3ead3689 | |
| | 8dc069037b | |
| | 0a363c3dce | |
| | aeca15008c | |
| | 43846b72fa | |
| | cb060548fb | |
| | bae793ffcd | |
| | 26b5fcdc50 | |
| | 97582178cb | |
`.devcontainer/Dockerfile.devcontainer` (new file, 1 line)

```diff
@@ -0,0 +1 @@
+FROM neondatabase/build-tools:pinned
```
`.devcontainer/devcontainer.json` (new file, 23 lines)

```diff
@@ -0,0 +1,23 @@
+// https://containers.dev/implementors/json_reference/
+{
+    "name": "Neon",
+    "build": {
+        "context": "..",
+        "dockerfile": "Dockerfile.devcontainer"
+    },
+
+    "postCreateCommand": {
+        "build neon": "BUILD_TYPE=debug CARGO_BUILD_FLAGS='--features=testing' mold -run make -s -j`nproc`",
+        "install python deps": "./scripts/pysync"
+    },
+
+    "customizations": {
+        "vscode": {
+            "extensions": [
+                "charliermarsh.ruff",
+                "github.vscode-github-actions",
+                "rust-lang.rust-analyzer"
+            ]
+        }
+    }
+}
```
```diff
@@ -640,6 +640,8 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
         }
         Some(("branch", branch_match)) => {
             let tenant_id = get_tenant_id(branch_match, env)?;
+            let new_timeline_id =
+                parse_timeline_id(branch_match)?.unwrap_or(TimelineId::generate());
             let new_branch_name = branch_match
                 .get_one::<String>("branch-name")
                 .ok_or_else(|| anyhow!("No branch name provided"))?;
@@ -658,7 +660,6 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
                 .map(|lsn_str| Lsn::from_str(lsn_str))
                 .transpose()
                 .context("Failed to parse ancestor start Lsn from the request")?;
-            let new_timeline_id = TimelineId::generate();
             let storage_controller = StorageController::from_env(env);
             let create_req = TimelineCreateRequest {
                 new_timeline_id,
@@ -1570,7 +1571,6 @@ fn cli() -> Command {
                 .value_parser(value_parser!(PathBuf))
                 .value_name("config")
             )
-            .arg(pg_version_arg.clone())
            .arg(force_arg)
        )
        .subcommand(
@@ -1583,6 +1583,7 @@ fn cli() -> Command {
        .subcommand(Command::new("branch")
            .about("Create a new timeline, using another timeline as a base, copying its data")
            .arg(tenant_id_arg.clone())
+           .arg(timeline_id_arg.clone())
            .arg(branch_name_arg.clone())
            .arg(Arg::new("ancestor-branch-name").long("ancestor-branch-name")
                .help("Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name.").required(false))
```
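These hunks pair with the test-fixture rework later in this diff: `timeline branch` now accepts an explicit `--timeline-id`, and the caller generates one up front when it is absent, so scripts no longer need to scrape the created ID back out of stdout. A minimal clap 4 sketch of that flow; the argument wiring and the `generate_timeline_id` helper are illustrative stand-ins, not the actual neon_local code:

```rust
use clap::{Arg, ArgMatches, Command};

fn cli() -> Command {
    Command::new("neon_local").subcommand(
        Command::new("branch").arg(Arg::new("timeline-id").long("timeline-id").required(false)),
    )
}

// Stand-in for TimelineId::generate(); the real type is a 128-bit hex ID.
fn generate_timeline_id() -> String {
    "de200bd42b49cc1814412c7e592dd6e9".to_string()
}

fn resolve_timeline_id(matches: &ArgMatches) -> String {
    // Use the caller-provided ID if present, otherwise generate one here,
    // before the create request is sent.
    matches
        .get_one::<String>("timeline-id")
        .cloned()
        .unwrap_or_else(generate_timeline_id)
}

fn main() {
    let m = cli().get_matches_from(["neon_local", "branch", "--timeline-id", "abc123"]);
    if let Some(("branch", bm)) = m.subcommand() {
        println!("using timeline id {}", resolve_timeline_id(bm));
    }
}
```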
```diff
@@ -89,8 +89,19 @@ impl PageserverUtilization {
     /// If a node is currently hosting more work than it can comfortably handle. This does not indicate that
     /// it will fail, but it is a strong signal that more work should not be added unless there is no alternative.
     ///
+    /// When a node is overloaded, we may override soft affinity preferences and do things like scheduling
+    /// into a node in a less desirable AZ, if all the nodes in the preferred AZ are overloaded.
     pub fn is_overloaded(score: RawScore) -> bool {
-        score >= Self::UTILIZATION_FULL
+        // Why the factor of two? This is unscientific but reflects behavior of real systems:
+        // - In terms of shard counts, a node's preferred max count is a soft limit intended to keep
+        //   startup and housekeeping jobs nice and responsive. We can go to double this limit if needed
+        //   until some more nodes are deployed.
+        // - In terms of disk space, the node's utilization heuristic assumes every tenant needs to
+        //   hold its biggest timeline fully on disk, which tends to be an overestimate when
+        //   some tenants are very idle and have dropped layers from disk. In practice going up to
+        //   double is generally better than giving up and scheduling in a sub-optimal AZ.
+        score >= 2 * Self::UTILIZATION_FULL
     }

     pub fn adjust_shard_count_max(&mut self, shard_count: u32) {
```
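The effect of the hunk above is that a pageserver stops being a scheduling candidate only at twice its nominal capacity. A compilable sketch of just that predicate, with `RawScore` and `UTILIZATION_FULL` as assumed stand-ins for the real definitions:

```rust
type RawScore = u64;

struct PageserverUtilization;

impl PageserverUtilization {
    // Assumption: 100 represents "nominally full" in this sketch.
    const UTILIZATION_FULL: RawScore = 100;

    // Only twice the nominal capacity counts as overloaded, so the scheduler
    // keeps packing the preferred AZ instead of spilling to a worse one early.
    fn is_overloaded(score: RawScore) -> bool {
        score >= 2 * Self::UTILIZATION_FULL
    }
}

fn main() {
    assert!(!PageserverUtilization::is_overloaded(150)); // past the soft limit, still schedulable
    assert!(PageserverUtilization::is_overloaded(200)); // hard stop at 2x
}
```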
```diff
@@ -81,17 +81,16 @@ pub fn is_expected_io_error(e: &io::Error) -> bool {
     )
 }

-#[async_trait::async_trait]
 pub trait Handler<IO> {
     /// Handle single query.
     /// postgres_backend will issue ReadyForQuery after calling this (this
     /// might be not what we want after CopyData streaming, but currently we don't
     /// care). It will also flush out the output buffer.
-    async fn process_query(
+    fn process_query(
         &mut self,
         pgb: &mut PostgresBackend<IO>,
         query_string: &str,
-    ) -> Result<(), QueryError>;
+    ) -> impl Future<Output = Result<(), QueryError>>;

     /// Called on startup packet receival, allows to process params.
     ///
```
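This hunk drops the `#[async_trait]` macro in favor of a return-position `impl Future` in the trait (stable since Rust 1.75), which avoids the per-call boxing the macro inserts. A self-contained sketch of the same shape using stand-in types; the trade-off is that the trait is no longer object-safe:

```rust
use std::future::Future;
use std::marker::PhantomData;

// Stand-ins for the real types in the diff.
struct PostgresBackend<IO>(PhantomData<IO>);
#[derive(Debug)]
struct QueryError;

trait Handler<IO> {
    // Desugared form: callers get an anonymous future type, no boxing by default.
    fn process_query(
        &mut self,
        pgb: &mut PostgresBackend<IO>,
        query_string: &str,
    ) -> impl Future<Output = Result<(), QueryError>>;
}

struct EchoHandler;

impl<IO> Handler<IO> for EchoHandler {
    // Implementors may still write `async fn`; it satisfies the
    // `-> impl Future` declaration.
    async fn process_query(
        &mut self,
        _pgb: &mut PostgresBackend<IO>,
        query_string: &str,
    ) -> Result<(), QueryError> {
        println!("got query {query_string:?}");
        Ok(())
    }
}

fn main() {
    let mut handler = EchoHandler;
    let mut pgb: PostgresBackend<()> = PostgresBackend(PhantomData);
    // Futures are lazy; an executor such as tokio would `.await` this.
    let _fut = handler.process_query(&mut pgb, "SELECT 1");
}
```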
```diff
@@ -23,7 +23,6 @@ async fn make_tcp_pair() -> (TcpStream, TcpStream) {

 struct TestHandler {}

-#[async_trait::async_trait]
 impl<IO: AsyncRead + AsyncWrite + Unpin + Send> Handler<IO> for TestHandler {
     // return single col 'hey' for any query
     async fn process_query(
```
```diff
@@ -1199,7 +1199,6 @@ impl PageServerHandler {
     }
 }

-#[async_trait::async_trait]
 impl<IO> postgres_backend::Handler<IO> for PageServerHandler
 where
     IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
```
```diff
@@ -1205,6 +1205,13 @@ impl<'a> DatadirModification<'a> {
         img: Bytes,
     ) -> anyhow::Result<()> {
         anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
+        let key = rel_block_to_key(rel, blknum);
+        if !key.is_valid_key_on_write_path() {
+            anyhow::bail!(
+                "the request contains data not supported by pageserver at {}",
+                key
+            );
+        }
         self.put(rel_block_to_key(rel, blknum), Value::Image(img));
         Ok(())
     }
@@ -1216,14 +1223,34 @@ impl<'a> DatadirModification<'a> {
         blknum: BlockNumber,
         img: Bytes,
     ) -> anyhow::Result<()> {
-        self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
+        let key = slru_block_to_key(kind, segno, blknum);
+        if !key.is_valid_key_on_write_path() {
+            anyhow::bail!(
+                "the request contains data not supported by pageserver at {}",
+                key
+            );
+        }
+        self.put(key, Value::Image(img));
         Ok(())
     }

-    pub(crate) fn put_rel_page_image_zero(&mut self, rel: RelTag, blknum: BlockNumber) {
-        self.pending_zero_data_pages
-            .insert(rel_block_to_key(rel, blknum).to_compact());
+    pub(crate) fn put_rel_page_image_zero(
+        &mut self,
+        rel: RelTag,
+        blknum: BlockNumber,
+    ) -> anyhow::Result<()> {
+        anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
+        let key = rel_block_to_key(rel, blknum);
+        if !key.is_valid_key_on_write_path() {
+            anyhow::bail!(
+                "the request contains data not supported by pageserver: {} @ {}",
+                key,
+                self.lsn
+            );
+        }
+        self.pending_zero_data_pages.insert(key.to_compact());
         self.pending_bytes += ZERO_PAGE.len();
+        Ok(())
     }

     pub(crate) fn put_slru_page_image_zero(
@@ -1231,10 +1258,18 @@ impl<'a> DatadirModification<'a> {
         kind: SlruKind,
         segno: u32,
         blknum: BlockNumber,
-    ) {
-        self.pending_zero_data_pages
-            .insert(slru_block_to_key(kind, segno, blknum).to_compact());
+    ) -> anyhow::Result<()> {
+        let key = slru_block_to_key(kind, segno, blknum);
+        if !key.is_valid_key_on_write_path() {
+            anyhow::bail!(
+                "the request contains data not supported by pageserver: {} @ {}",
+                key,
+                self.lsn
+            );
+        }
+        self.pending_zero_data_pages.insert(key.to_compact());
         self.pending_bytes += ZERO_PAGE.len();
+        Ok(())
     }

     /// Call this at the end of each WAL record.
```
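The pattern added in each of these methods is the same: derive the key once, reject it before any state is mutated, and only then buffer the write, so bad WAL surfaces as an early error on the write path instead of a failure much later at read time. A small self-contained sketch of that guard, with `Key` and the validity rule as assumed stand-ins:

```rust
use anyhow::bail;

#[derive(Debug, Clone, Copy)]
struct Key(u64);

impl Key {
    fn is_valid_key_on_write_path(&self) -> bool {
        self.0 != u64::MAX // assumed sentinel for an unsupported key
    }
}

fn put_image(key: Key) -> anyhow::Result<()> {
    // Validate before buffering anything.
    if !key.is_valid_key_on_write_path() {
        bail!("the request contains data not supported by pageserver at {key:?}");
    }
    // ... buffer the write here ...
    Ok(())
}

fn main() {
    assert!(put_image(Key(7)).is_ok());
    assert!(put_image(Key(u64::MAX)).is_err());
}
```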
```diff
@@ -1222,7 +1222,7 @@ impl WalIngest {
     if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
         // Tail of last remaining FSM page has to be zeroed.
         // We are not precise here and instead of digging in FSM bitmap format just clear the whole page.
-        modification.put_rel_page_image_zero(rel, fsm_physical_page_no);
+        modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
         fsm_physical_page_no += 1;
     }
     let nblocks = get_relsize(modification, rel, ctx).await?;
@@ -1244,7 +1244,7 @@ impl WalIngest {
     if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
         // Tail of last remaining vm page has to be zeroed.
         // We are not precise here and instead of digging in VM bitmap format just clear the whole page.
-        modification.put_rel_page_image_zero(rel, vm_page_no);
+        modification.put_rel_page_image_zero(rel, vm_page_no)?;
         vm_page_no += 1;
     }
     let nblocks = get_relsize(modification, rel, ctx).await?;
@@ -1737,7 +1737,7 @@ impl WalIngest {
             continue;
         }

-        modification.put_rel_page_image_zero(rel, gap_blknum);
+        modification.put_rel_page_image_zero(rel, gap_blknum)?;
     }
 }
 Ok(())
@@ -1803,7 +1803,7 @@ impl WalIngest {

     // fill the gap with zeros
     for gap_blknum in old_nblocks..blknum {
-        modification.put_slru_page_image_zero(kind, segno, gap_blknum);
+        modification.put_slru_page_image_zero(kind, segno, gap_blknum)?;
     }
 }
 Ok(())
```
```diff
@@ -311,7 +311,9 @@ async fn auth_quirks(
     let (allowed_ips, maybe_secret) = api.get_allowed_ips_and_secret(ctx, &info).await?;

     // check allowed list
-    if !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips) {
+    if config.ip_allowlist_check_enabled
+        && !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips)
+    {
         return Err(auth::AuthError::ip_address_not_allowed(ctx.peer_addr()));
     }
@@ -603,6 +605,7 @@ mod tests {
         rate_limiter_enabled: true,
         rate_limiter: AuthRateLimiter::new(&RateBucketInfo::DEFAULT_AUTH_SET),
         rate_limit_ip_subnet: 64,
+        ip_allowlist_check_enabled: true,
     });

     async fn read_message(r: &mut (impl AsyncRead + Unpin), b: &mut BytesMut) -> PgMessage {
```
```diff
@@ -538,4 +538,17 @@ mod tests {
         ));
         Ok(())
     }
+
+    #[test]
+    fn test_connection_blocker() {
+        fn check(v: serde_json::Value) -> bool {
+            let peer_addr = IpAddr::from([127, 0, 0, 1]);
+            let ip_list: Vec<IpPattern> = serde_json::from_value(v).unwrap();
+            check_peer_addr_is_in_list(&peer_addr, &ip_list)
+        }
+
+        assert!(check(json!([])));
+        assert!(check(json!(["127.0.0.1"])));
+        assert!(!check(json!(["255.255.255.255"])));
+    }
 }
```
```diff
@@ -224,6 +224,7 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig
         rate_limiter_enabled: false,
         rate_limiter: BucketRateLimiter::new(vec![]),
         rate_limit_ip_subnet: 64,
+        ip_allowlist_check_enabled: true,
     },
     require_client_ip: false,
     handshake_timeout: Duration::from_secs(10),
```
```diff
@@ -224,6 +224,10 @@ struct ProxyCliArgs {
     /// Whether to retry the wake_compute request
     #[clap(long, default_value = config::RetryConfig::WAKE_COMPUTE_DEFAULT_VALUES)]
     wake_compute_retry: String,
+
+    /// Configure if this is a private access proxy for the POC: In that case the proxy will ignore the IP allowlist
+    #[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
+    is_private_access_proxy: bool,
 }

 #[derive(clap::Args, Clone, Copy, Debug)]
@@ -682,6 +686,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
         rate_limiter_enabled: args.auth_rate_limit_enabled,
         rate_limiter: AuthRateLimiter::new(args.auth_rate_limit.clone()),
         rate_limit_ip_subnet: args.auth_rate_limit_ip_subnet,
+        ip_allowlist_check_enabled: !args.is_private_access_proxy,
     };

     let config = Box::leak(Box::new(ProxyConfig {
```
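Taken together, the proxy hunks thread one boolean from the CLI (`--is-private-access-proxy`) through `AuthenticationConfig` to every allowlist check. A runnable sketch of that gating with simplified stand-in types (the real `IpPattern` matching is richer than exact-address comparison):

```rust
use std::net::IpAddr;

struct AuthenticationConfig {
    ip_allowlist_check_enabled: bool,
}

// Mirrors the semantics shown in the tests above: an empty list admits everyone.
fn check_peer_addr_is_in_list(peer: &IpAddr, allowed: &[IpAddr]) -> bool {
    allowed.is_empty() || allowed.contains(peer)
}

fn enforce_allowlist(
    config: &AuthenticationConfig,
    peer: &IpAddr,
    allowed: &[IpAddr],
) -> Result<(), String> {
    // A private access proxy is built with ip_allowlist_check_enabled = false,
    // so the whole check short-circuits before the list is consulted.
    if config.ip_allowlist_check_enabled && !check_peer_addr_is_in_list(peer, allowed) {
        return Err(format!("ip address not allowed: {peer}"));
    }
    Ok(())
}

fn main() {
    let peer: IpAddr = "10.0.0.7".parse().unwrap();
    let allowed: Vec<IpAddr> = vec!["127.0.0.1".parse().unwrap()];
    let public = AuthenticationConfig { ip_allowlist_check_enabled: true };
    let private = AuthenticationConfig { ip_allowlist_check_enabled: false };
    assert!(enforce_allowlist(&public, &peer, &allowed).is_err());
    assert!(enforce_allowlist(&private, &peer, &allowed).is_ok());
}
```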
`proxy/src/cache/endpoints.rs` (vendored, 2 lines changed)

```diff
@@ -242,6 +242,6 @@ mod tests {
     #[test]
     fn test() {
         let s = "{\"branch_created\":null,\"endpoint_created\":{\"endpoint_id\":\"ep-rapid-thunder-w0qqw2q9\"},\"project_created\":null,\"type\":\"endpoint_created\"}";
-        let _: ControlPlaneEventKey = serde_json::from_str(s).unwrap();
+        serde_json::from_str::<ControlPlaneEventKey>(s).unwrap();
     }
 }
```
```diff
@@ -64,6 +64,7 @@ pub struct AuthenticationConfig {
     pub rate_limiter_enabled: bool,
     pub rate_limiter: AuthRateLimiter,
     pub rate_limit_ip_subnet: u8,
+    pub ip_allowlist_check_enabled: bool,
 }

 impl TlsConfig {
```
```diff
@@ -395,7 +395,7 @@ mod tests {
             }
         }
     });
-    let _: KickSession<'_> = serde_json::from_str(&json.to_string())?;
+    serde_json::from_str::<KickSession<'_>>(&json.to_string())?;

     Ok(())
 }
@@ -403,7 +403,7 @@ mod tests {
 #[test]
 fn parse_db_info() -> anyhow::Result<()> {
     // with password
-    let _: DatabaseInfo = serde_json::from_value(json!({
+    serde_json::from_value::<DatabaseInfo>(json!({
         "host": "localhost",
         "port": 5432,
         "dbname": "postgres",
@@ -413,7 +413,7 @@ mod tests {
     }))?;

     // without password
-    let _: DatabaseInfo = serde_json::from_value(json!({
+    serde_json::from_value::<DatabaseInfo>(json!({
         "host": "localhost",
         "port": 5432,
         "dbname": "postgres",
@@ -422,7 +422,7 @@ mod tests {
     }))?;

     // new field (forward compatibility)
-    let _: DatabaseInfo = serde_json::from_value(json!({
+    serde_json::from_value::<DatabaseInfo>(json!({
         "host": "localhost",
         "port": 5432,
         "dbname": "postgres",
@@ -441,7 +441,7 @@ mod tests {
         "address": "0.0.0.0",
         "aux": dummy_aux(),
     });
-    let _: WakeCompute = serde_json::from_str(&json.to_string())?;
+    serde_json::from_str::<WakeCompute>(&json.to_string())?;
     Ok(())
 }
@@ -451,18 +451,18 @@ mod tests {
     let json = json!({
         "role_secret": "secret",
     });
-    let _: GetRoleSecret = serde_json::from_str(&json.to_string())?;
+    serde_json::from_str::<GetRoleSecret>(&json.to_string())?;
     let json = json!({
         "role_secret": "secret",
         "allowed_ips": ["8.8.8.8"],
     });
-    let _: GetRoleSecret = serde_json::from_str(&json.to_string())?;
+    serde_json::from_str::<GetRoleSecret>(&json.to_string())?;
     let json = json!({
         "role_secret": "secret",
         "allowed_ips": ["8.8.8.8"],
         "project_id": "project",
     });
-    let _: GetRoleSecret = serde_json::from_str(&json.to_string())?;
+    serde_json::from_str::<GetRoleSecret>(&json.to_string())?;

     Ok(())
 }
```
```diff
@@ -78,7 +78,7 @@ pub(crate) type ComputeReady = DatabaseInfo;

 // TODO: replace with an http-based protocol.
 struct MgmtHandler;
-#[async_trait::async_trait]
+
 impl postgres_backend::Handler<tokio::net::TcpStream> for MgmtHandler {
     async fn process_query(
         &mut self,
```
```diff
@@ -6,7 +6,7 @@ use pq_proto::StartupMessageParams;
 use smol_str::SmolStr;
 use std::net::IpAddr;
 use tokio::sync::mpsc;
-use tracing::{field::display, info, info_span, Span};
+use tracing::{debug, field::display, info, info_span, Span};
 use try_lock::TryLock;
 use uuid::Uuid;

@@ -362,7 +362,9 @@ impl RequestMonitoringInner {
         });
     }
     if let Some(tx) = self.sender.take() {
-        let _: Result<(), _> = tx.send(RequestData::from(&*self));
+        tx.send(RequestData::from(&*self))
+            .inspect_err(|e| debug!("tx send failed: {e}"))
+            .ok();
     }
 }

@@ -371,7 +373,9 @@ impl RequestMonitoringInner {
     // Here we log the length of the session.
     self.disconnect_timestamp = Some(Utc::now());
     if let Some(tx) = self.disconnect_sender.take() {
-        let _: Result<(), _> = tx.send(RequestData::from(&*self));
+        tx.send(RequestData::from(&*self))
+            .inspect_err(|e| debug!("tx send failed: {e}"))
+            .ok();
     }
 }
```
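The rewrite keeps the send non-fatal but stops discarding the error silently: `inspect_err` logs it, then `.ok()` drops the `Result` explicitly. A runnable sketch of the same pattern on a std channel; the diff itself logs via `tracing::debug!` rather than `eprintln!`:

```rust
use std::sync::mpsc;

fn send_best_effort(tx: &mpsc::Sender<u32>, value: u32) {
    tx.send(value)
        .inspect_err(|e| eprintln!("tx send failed: {e}")) // tracing::debug! in the diff
        .ok(); // discard the Result on purpose, without a `let _` binding
}

fn main() {
    let (tx, rx) = mpsc::channel();
    drop(rx); // closed channel: send now fails
    send_best_effort(&tx, 42); // logs the failure, then carries on
}
```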
```diff
@@ -290,7 +290,7 @@ async fn worker_inner(
     }

     if !w.flushed_row_groups().is_empty() {
-        let _: Writer<BytesMut> = upload_parquet(w, len, &storage).await?;
+        let _rtchk: Writer<BytesMut> = upload_parquet(w, len, &storage).await?;
     }

     Ok(())
```
```diff
@@ -3,7 +3,7 @@
 #![deny(
     deprecated,
     future_incompatible,
-    // TODO: consider let_underscore
+    let_underscore,
     nonstandard_style,
     rust_2024_compatibility
 )]
```
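This lint change is what motivates the many `let _:` removals throughout the diff: the `let_underscore` lint group (which includes `let_underscore_drop`) is now denied crate-wide. A sketch of the before/after, assuming a serde_json dependency:

```rust
#![deny(let_underscore_drop)] // one member of the `let_underscore` group

fn main() -> Result<(), serde_json::Error> {
    let s = r#"{"x": 1}"#;

    // The shape the lint flags: a droppable value bound to `_`.
    // let _: serde_json::Value = serde_json::from_str(s)?;

    // The lint-clean rewrite used across this branch: a turbofish call,
    // so the value is parsed for effect and dropped without an underscore let.
    serde_json::from_str::<serde_json::Value>(s)?;
    Ok(())
}
```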
```diff
@@ -268,7 +268,7 @@ async fn keepalive_is_inherited() -> anyhow::Result<()> {
         anyhow::Ok(keepalive)
     });

-    let _ = TcpStream::connect(("127.0.0.1", port)).await?;
+    TcpStream::connect(("127.0.0.1", port)).await?;
     assert!(t.await??, "keepalive should be inherited");

     Ok(())
```
```diff
@@ -6,7 +6,7 @@ use redis::{
     ConnectionInfo, IntoConnectionInfo, RedisConnectionInfo, RedisResult,
 };
 use tokio::task::JoinHandle;
-use tracing::{error, info};
+use tracing::{debug, error, info};

 use super::elasticache::CredentialsProvider;

@@ -109,7 +109,10 @@ impl ConnectionWithCredentialsProvider {
         let credentials_provider = credentials_provider.clone();
         let con2 = con.clone();
         let f = tokio::spawn(async move {
-            let _ = Self::keep_connection(con2, credentials_provider).await;
+            Self::keep_connection(con2, credentials_provider)
+                .await
+                .inspect_err(|e| debug!("keep_connection failed: {e}"))
+                .ok();
         });
         self.refresh_token_task = Some(f);
     }
```
```diff
@@ -50,7 +50,9 @@ impl PoolingBackend {
             .as_ref()
             .map(|()| user_info.clone());
         let (allowed_ips, maybe_secret) = backend.get_allowed_ips_and_secret(ctx).await?;
-        if !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips) {
+        if config.ip_allowlist_check_enabled
+            && !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips)
+        {
             return Err(AuthError::ip_address_not_allowed(ctx.peer_addr()));
         }
         if !self
```
```diff
@@ -12,6 +12,7 @@ use std::{io, task};
 use thiserror::Error;
 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
 use tokio_rustls::server::TlsStream;
+use tracing::debug;

 /// Stream wrapper which implements libpq's protocol.
 ///
@@ -138,9 +139,10 @@ impl<S: AsyncWrite + Unpin> PqStream<S> {
         );

         // already error case, ignore client IO error
-        let _: Result<_, std::io::Error> = self
-            .write_message(&BeMessage::ErrorResponse(msg, None))
-            .await;
+        self.write_message(&BeMessage::ErrorResponse(msg, None))
+            .await
+            .inspect_err(|e| debug!("write_message failed: {e}"))
+            .ok();

         Err(ReportedError {
             source: anyhow::anyhow!(msg),
@@ -164,9 +166,10 @@ impl<S: AsyncWrite + Unpin> PqStream<S> {
         );

         // already error case, ignore client IO error
-        let _: Result<_, std::io::Error> = self
-            .write_message(&BeMessage::ErrorResponse(&msg, None))
-            .await;
+        self.write_message(&BeMessage::ErrorResponse(&msg, None))
+            .await
+            .inspect_err(|e| debug!("write_message failed: {e}"))
+            .ok();

         Err(ReportedError {
             source: anyhow::anyhow!(error),
```
```diff
@@ -57,7 +57,7 @@ mod tests {
     fn bad_url() {
         let url = "test:foobar";
         url.parse::<url::Url>().expect("unexpected parsing failure");
-        let _ = url.parse::<ApiUrl>().expect_err("should not parse");
+        url.parse::<ApiUrl>().expect_err("should not parse");
     }

     #[test]
```
```diff
@@ -2,6 +2,7 @@
 //! protocol commands.

 use anyhow::Context;
+use std::future::Future;
 use std::str::{self, FromStr};
 use std::sync::Arc;
 use tokio::io::{AsyncRead, AsyncWrite};
@@ -95,7 +96,6 @@ fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str {
     }
 }

-#[async_trait::async_trait]
 impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
     for SafekeeperPostgresHandler
 {
@@ -197,49 +197,51 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
         Ok(())
     }

-    async fn process_query(
+    fn process_query(
         &mut self,
         pgb: &mut PostgresBackend<IO>,
         query_string: &str,
-    ) -> Result<(), QueryError> {
-        if query_string
-            .to_ascii_lowercase()
-            .starts_with("set datestyle to ")
-        {
-            // important for debug because psycopg2 executes "SET datestyle TO 'ISO'" on connect
-            pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
-            return Ok(());
-        }
-
-        let cmd = parse_cmd(query_string)?;
-        let cmd_str = cmd_to_string(&cmd);
-
-        let _guard = PG_QUERIES_GAUGE.with_label_values(&[cmd_str]).guard();
-
-        info!("got query {:?}", query_string);
-
-        let tenant_id = self.tenant_id.context("tenantid is required")?;
-        let timeline_id = self.timeline_id.context("timelineid is required")?;
-        self.check_permission(Some(tenant_id))?;
-        self.ttid = TenantTimelineId::new(tenant_id, timeline_id);
-
-        match cmd {
-            SafekeeperPostgresCommand::StartWalPush => {
-                self.handle_start_wal_push(pgb)
-                    .instrument(info_span!("WAL receiver"))
-                    .await
-            }
-            SafekeeperPostgresCommand::StartReplication { start_lsn, term } => {
-                self.handle_start_replication(pgb, start_lsn, term)
-                    .instrument(info_span!("WAL sender"))
-                    .await
-            }
-            SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb).await,
-            SafekeeperPostgresCommand::TimelineStatus => self.handle_timeline_status(pgb).await,
-            SafekeeperPostgresCommand::JSONCtrl { ref cmd } => {
-                handle_json_ctrl(self, pgb, cmd).await
-            }
-        }
+    ) -> impl Future<Output = Result<(), QueryError>> {
+        Box::pin(async move {
+            if query_string
+                .to_ascii_lowercase()
+                .starts_with("set datestyle to ")
+            {
+                // important for debug because psycopg2 executes "SET datestyle TO 'ISO'" on connect
+                pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
+                return Ok(());
+            }
+
+            let cmd = parse_cmd(query_string)?;
+            let cmd_str = cmd_to_string(&cmd);
+
+            let _guard = PG_QUERIES_GAUGE.with_label_values(&[cmd_str]).guard();
+
+            info!("got query {:?}", query_string);
+
+            let tenant_id = self.tenant_id.context("tenantid is required")?;
+            let timeline_id = self.timeline_id.context("timelineid is required")?;
+            self.check_permission(Some(tenant_id))?;
+            self.ttid = TenantTimelineId::new(tenant_id, timeline_id);
+
+            match cmd {
+                SafekeeperPostgresCommand::StartWalPush => {
+                    self.handle_start_wal_push(pgb)
+                        .instrument(info_span!("WAL receiver"))
+                        .await
+                }
+                SafekeeperPostgresCommand::StartReplication { start_lsn, term } => {
+                    self.handle_start_replication(pgb, start_lsn, term)
+                        .instrument(info_span!("WAL sender"))
+                        .await
+                }
+                SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb).await,
+                SafekeeperPostgresCommand::TimelineStatus => self.handle_timeline_status(pgb).await,
+                SafekeeperPostgresCommand::JSONCtrl { ref cmd } => {
+                    handle_json_ctrl(self, pgb, cmd).await
+                }
+            }
+        })
    }
 }
```
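Returning `Box::pin(async move { ... })` from the new `impl Future` signature is one way to satisfy it without writing `async fn`; one plausible reason to box here is keeping this large handler future type-erased and off the caller's stack. A minimal sketch under stand-in types:

```rust
use std::future::Future;

#[derive(Debug)]
struct QueryError;

trait Handler {
    fn process_query(&mut self, query: &str) -> impl Future<Output = Result<(), QueryError>>;
}

struct Safekeeper;

impl Handler for Safekeeper {
    fn process_query(&mut self, query: &str) -> impl Future<Output = Result<(), QueryError>> {
        // Box::pin erases the async block's concrete type; Pin<Box<F>> itself
        // implements Future, so callers just `.await` as before.
        Box::pin(async move {
            println!("got query {query:?}");
            Ok::<(), QueryError>(())
        })
    }
}

fn main() {
    let mut sk = Safekeeper;
    let _fut = sk.process_query("IDENTIFY_SYSTEM"); // lazy; an executor would await it
}
```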
```diff
@@ -10,8 +10,4 @@ pytest_plugins = (
     "fixtures.compare_fixtures",
     "fixtures.slow",
     "fixtures.flaky",
-    "fixtures.shared_fixtures",
-    "fixtures.function.neon_storage",
-    "fixtures.session.neon_storage",
-    "fixtures.session.s3",
 )
```
Deleted file (@@ -1,48 +0,0 @@), previously containing the function-scoped tenant/timeline fixtures:

```python
from typing import Iterator, Optional, Dict, Any

import pytest
from _pytest.fixtures import FixtureRequest

from fixtures.neon_fixtures import NeonEnv
from fixtures.shared_fixtures import TTenant, TTimeline

@pytest.fixture(scope="function")
def tenant_config() -> Dict[str, Any]:
    return dict()

@pytest.fixture(scope="function")
def tenant(
    neon_shared_env: NeonEnv,
    request: FixtureRequest,
    tenant_config: Optional[Dict[str, Any]] = None,
) -> Iterator[TTenant]:
    tenant = TTenant(env=neon_shared_env, name=request.node.name, config=tenant_config)
    yield tenant
    tenant.shutdown_resources()


@pytest.fixture(scope="function")
def timeline(
    tenant: TTenant,
) -> Iterator[TTimeline]:
    timeline = tenant.default_timeline
    yield timeline


@pytest.fixture(scope="function")
def exclusive_tenant(
    neon_env: NeonEnv,
    request: FixtureRequest,
    tenant_config: Optional[Dict[str, Any]] = None,
) -> Iterator[TTenant]:
    tenant = TTenant(env=neon_env, name=request.node.name, config=tenant_config)
    yield tenant
    tenant.shutdown_resources()


@pytest.fixture(scope="function")
def exclusive_timeline(
    exclusive_tenant: TTenant,
) -> Iterator[TTimeline]:
    timeline = exclusive_tenant.default_timeline
    yield timeline
```
```diff
@@ -93,6 +93,7 @@ from fixtures.utils import (
     allure_add_grafana_links,
     allure_attach_from_dir,
     assert_no_errors,
+    get_self_dir,
     print_gc_result,
     subprocess_capture,
     wait_until,
```
```diff
@@ -125,6 +126,122 @@
 Env = Dict[str, str]
 DEFAULT_OUTPUT_DIR: str = "test_output"
 DEFAULT_BRANCH_NAME: str = "main"
+
+BASE_PORT: int = 15000
+
+
+@pytest.fixture(scope="session")
+def base_dir() -> Iterator[Path]:
+    # find the base directory (currently this is the git root)
+    base_dir = get_self_dir().parent.parent
+    log.info(f"base_dir is {base_dir}")
+
+    yield base_dir
+
+
+@pytest.fixture(scope="function")
+def neon_binpath(base_dir: Path, build_type: str) -> Iterator[Path]:
+    if os.getenv("REMOTE_ENV"):
+        # we are in remote env and do not have neon binaries locally
+        # this is the case for benchmarks run on self-hosted runner
+        return
+
+    # Find the neon binaries.
+    if env_neon_bin := os.environ.get("NEON_BIN"):
+        binpath = Path(env_neon_bin)
+    else:
+        binpath = base_dir / "target" / build_type
+    log.info(f"neon_binpath is {binpath}")
+
+    if not (binpath / "pageserver").exists():
+        raise Exception(f"neon binaries not found at '{binpath}'")
+
+    yield binpath
+
+
+@pytest.fixture(scope="function")
+def pg_distrib_dir(base_dir: Path) -> Iterator[Path]:
+    if env_postgres_bin := os.environ.get("POSTGRES_DISTRIB_DIR"):
+        distrib_dir = Path(env_postgres_bin).resolve()
+    else:
+        distrib_dir = base_dir / "pg_install"
+
+    log.info(f"pg_distrib_dir is {distrib_dir}")
+    yield distrib_dir
+
+
+@pytest.fixture(scope="session")
+def top_output_dir(base_dir: Path) -> Iterator[Path]:
+    # Compute the top-level directory for all tests.
+    if env_test_output := os.environ.get("TEST_OUTPUT"):
+        output_dir = Path(env_test_output).resolve()
+    else:
+        output_dir = base_dir / DEFAULT_OUTPUT_DIR
+    output_dir.mkdir(exist_ok=True)
+
+    log.info(f"top_output_dir is {output_dir}")
+    yield output_dir
+
+
+@pytest.fixture(scope="function")
+def versioned_pg_distrib_dir(pg_distrib_dir: Path, pg_version: PgVersion) -> Iterator[Path]:
+    versioned_dir = pg_distrib_dir / pg_version.v_prefixed
+
+    psql_bin_path = versioned_dir / "bin/psql"
+    postgres_bin_path = versioned_dir / "bin/postgres"
+
+    if os.getenv("REMOTE_ENV"):
+        # When testing against a remote server, we only need the client binary.
+        if not psql_bin_path.exists():
+            raise Exception(f"psql not found at '{psql_bin_path}'")
+    else:
+        if not postgres_bin_path.exists():
+            raise Exception(f"postgres not found at '{postgres_bin_path}'")
+
+    log.info(f"versioned_pg_distrib_dir is {versioned_dir}")
+    yield versioned_dir
+
+
+@pytest.fixture(scope="session")
+def neon_api_key() -> str:
+    api_key = os.getenv("NEON_API_KEY")
+    if not api_key:
+        raise AssertionError("Set the NEON_API_KEY environment variable")
+
+    return api_key
+
+
+@pytest.fixture(scope="session")
+def neon_api_base_url() -> str:
+    return os.getenv("NEON_API_BASE_URL", "https://console-stage.neon.build/api/v2")
+
+
+@pytest.fixture(scope="session")
+def neon_api(neon_api_key: str, neon_api_base_url: str) -> NeonAPI:
+    return NeonAPI(neon_api_key, neon_api_base_url)
+
+
+@pytest.fixture(scope="session")
+def worker_port_num():
+    return (32768 - BASE_PORT) // int(os.environ.get("PYTEST_XDIST_WORKER_COUNT", "1"))
+
+
+@pytest.fixture(scope="session")
+def worker_seq_no(worker_id: str) -> int:
+    # worker_id is a pytest-xdist fixture
+    # it can be master or gw<number>
+    # parse it to always get a number
+    if worker_id == "master":
+        return 0
+    assert worker_id.startswith("gw")
+    return int(worker_id[2:])
+
+
+@pytest.fixture(scope="session")
+def worker_base_port(worker_seq_no: int, worker_port_num: int) -> int:
+    # so we divide ports in ranges of ports
+    # so workers have disjoint set of ports for services
+    return BASE_PORT + worker_seq_no * worker_port_num


 def get_dir_size(path: str) -> int:
     """Return size in bytes."""
```
```diff
@@ -136,6 +253,11 @@ def get_dir_size(path: str) -> int:
     return totalbytes


+@pytest.fixture(scope="session")
+def port_distributor(worker_base_port: int, worker_port_num: int) -> PortDistributor:
+    return PortDistributor(base_port=worker_base_port, port_number=worker_port_num)
+
+
 @pytest.fixture(scope="function")
 def default_broker(
     port_distributor: PortDistributor,
```
```diff
@@ -151,6 +273,18 @@ def default_broker(
     broker.stop()


+@pytest.fixture(scope="session")
+def run_id() -> Iterator[uuid.UUID]:
+    yield uuid.uuid4()
+
+
+@pytest.fixture(scope="session")
+def mock_s3_server(port_distributor: PortDistributor) -> Iterator[MockS3Server]:
+    mock_s3_server = MockS3Server(port_distributor.get_port())
+    yield mock_s3_server
+    mock_s3_server.kill()
+
+
 class PgProtocol:
     """Reusable connection logic"""
```
```diff
@@ -350,7 +484,6 @@ class NeonEnvBuilder:
         safekeeper_extra_opts: Optional[list[str]] = None,
         storage_controller_port_override: Optional[int] = None,
         pageserver_io_buffer_alignment: Optional[int] = None,
-        shared: Optional[bool] = False,
     ):
         self.repo_dir = repo_dir
         self.rust_log_override = rust_log_override
```
```diff
@@ -407,8 +540,8 @@ class NeonEnvBuilder:

         self.pageserver_io_buffer_alignment = pageserver_io_buffer_alignment

-        assert (
-            test_name.startswith("test_") or shared
+        assert test_name.startswith(
+            "test_"
         ), "Unexpectedly instantiated from outside a test function"
         self.test_name = test_name
```
```diff
@@ -420,10 +553,6 @@ class NeonEnvBuilder:
         self.env = NeonEnv(self)
         return self.env

-    def start(self):
-        assert self.env is not None, "environment is not already initialized, call init() first"
-        self.env.start()
-
     def init_start(
         self,
         initial_tenant_conf: Optional[Dict[str, Any]] = None,
```
```diff
@@ -439,7 +568,7 @@ class NeonEnvBuilder:
         Configuring pageserver with remote storage is now the default. There will be a warning if pageserver is created without one.
         """
         env = self.init_configs(default_remote_storage_if_missing=default_remote_storage_if_missing)
-        self.start()
+        env.start()

         # Prepare the default branch to start the postgres on later.
         # Pageserver itself does not create tenants and timelines, until started first and asked via HTTP API.
```
```diff
@@ -940,9 +1069,6 @@ class NeonEnv:
         self.pg_distrib_dir = config.pg_distrib_dir
         self.endpoint_counter = 0
         self.storage_controller_config = config.storage_controller_config
-
-        # generate initial tenant ID here instead of letting 'neon init' generate it,
-        # so that we don't need to dig it out of the config file afterwards.
         self.initial_tenant = config.initial_tenant
         self.initial_timeline = config.initial_timeline
```
```diff
@@ -1320,21 +1446,6 @@ def neon_simple_env(
     yield env


-@pytest.fixture(scope="function")
-def neon_endpoint(request: FixtureRequest, neon_shared_env: NeonEnv) -> Endpoint:
-    neon_shared_env.neon_cli.create_branch(request.node.name)
-    ep = neon_shared_env.endpoints.create_start(request.node.name)
-
-    try:
-        yield ep
-    finally:
-        if ep.is_running():
-            try:
-                ep.stop()
-            except BaseException:
-                pass
-
-
 @pytest.fixture(scope="function")
 def neon_env_builder(
     pytestconfig: Config,
```
```diff
@@ -1403,14 +1514,6 @@ class PageserverPort:
     http: int


-CREATE_TIMELINE_ID_EXTRACTOR: re.Pattern = re.compile(  # type: ignore[type-arg]
-    r"^Created timeline '(?P<timeline_id>[^']+)'", re.MULTILINE
-)
-TIMELINE_DATA_EXTRACTOR: re.Pattern = re.compile(  # type: ignore[type-arg]
-    r"\s?(?P<branch_name>[^\s]+)\s\[(?P<timeline_id>[^\]]+)\]", re.MULTILINE
-)
-
-
 class AbstractNeonCli(abc.ABC):
     """
     A typed wrapper around an arbitrary Neon CLI tool.
```
```diff
@@ -1639,6 +1742,9 @@ class NeonCli(AbstractNeonCli):
         tenant_id: Optional[TenantId] = None,
         timeline_id: Optional[TimelineId] = None,
     ) -> TimelineId:
+        if timeline_id is None:
+            timeline_id = TimelineId.generate()
+
         cmd = [
             "timeline",
             "create",
```
```diff
@@ -1646,23 +1752,16 @@ class NeonCli(AbstractNeonCli):
             new_branch_name,
             "--tenant-id",
             str(tenant_id or self.env.initial_tenant),
+            "--timeline-id",
+            str(timeline_id),
             "--pg-version",
             self.env.pg_version,
         ]

-        if timeline_id is not None:
-            cmd.extend(["--timeline-id", str(timeline_id)])
-
         res = self.raw_cli(cmd)
         res.check_returncode()

-        matches = CREATE_TIMELINE_ID_EXTRACTOR.search(res.stdout)
-
-        created_timeline_id = None
-        if matches is not None:
-            created_timeline_id = matches.group("timeline_id")
-
-        return TimelineId(str(created_timeline_id))
+        return timeline_id

     def create_branch(
         self,
```
```diff
@@ -1670,12 +1769,17 @@ class NeonCli(AbstractNeonCli):
         ancestor_branch_name: Optional[str] = None,
         tenant_id: Optional[TenantId] = None,
         ancestor_start_lsn: Optional[Lsn] = None,
+        new_timeline_id: Optional[TimelineId] = None,
     ) -> TimelineId:
+        if new_timeline_id is None:
+            new_timeline_id = TimelineId.generate()
         cmd = [
             "timeline",
             "branch",
             "--branch-name",
             new_branch_name,
+            "--timeline-id",
+            str(new_timeline_id),
             "--tenant-id",
             str(tenant_id or self.env.initial_tenant),
         ]
```
```diff
@@ -1687,16 +1791,7 @@ class NeonCli(AbstractNeonCli):
         res = self.raw_cli(cmd)
         res.check_returncode()

-        matches = CREATE_TIMELINE_ID_EXTRACTOR.search(res.stdout)
-
-        created_timeline_id = None
-        if matches is not None:
-            created_timeline_id = matches.group("timeline_id")
-
-        if created_timeline_id is None:
-            raise Exception("could not find timeline id after `neon timeline create` invocation")
-        else:
-            return TimelineId(str(created_timeline_id))
+        return TimelineId(str(new_timeline_id))

     def list_timelines(self, tenant_id: Optional[TenantId] = None) -> List[Tuple[str, TimelineId]]:
         """
```
```diff
@@ -1705,6 +1800,9 @@ class NeonCli(AbstractNeonCli):

         # main [b49f7954224a0ad25cc0013ea107b54b]
         # ┣━ @0/16B5A50: test_cli_branch_list_main [20f98c79111b9015d84452258b7d5540]
+        TIMELINE_DATA_EXTRACTOR: re.Pattern = re.compile(  # type: ignore[type-arg]
+            r"\s?(?P<branch_name>[^\s]+)\s\[(?P<timeline_id>[^\]]+)\]", re.MULTILINE
+        )
         res = self.raw_cli(
             ["timeline", "list", "--tenant-id", str(tenant_id or self.env.initial_tenant)]
         )
```
```diff
@@ -4690,35 +4788,27 @@ def _get_test_dir(request: FixtureRequest, top_output_dir: Path, prefix: str) ->
     return test_dir


-def get_test_output_dir(
-    request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None
-) -> Path:
+def get_test_output_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
     """
     The working directory for a test.
     """
-    return _get_test_dir(request, top_output_dir, prefix or "")
+    return _get_test_dir(request, top_output_dir, "")


-def get_test_overlay_dir(
-    request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None
-) -> Path:
+def get_test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
     """
     Directory that contains `upperdir` and `workdir` for overlayfs mounts
     that a test creates. See `NeonEnvBuilder.overlay_mount`.
     """
-    return _get_test_dir(request, top_output_dir, f"overlay-{prefix or ''}")
+    return _get_test_dir(request, top_output_dir, "overlay-")


-def get_shared_snapshot_dir_path(
-    top_output_dir: Path, snapshot_name: str, prefix: Optional[str] = None
-) -> Path:
-    return top_output_dir / f"{prefix or ''}shared-snapshots" / snapshot_name
+def get_shared_snapshot_dir_path(top_output_dir: Path, snapshot_name: str) -> Path:
+    return top_output_dir / "shared-snapshots" / snapshot_name


-def get_test_repo_dir(
-    request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None
-) -> Path:
-    return get_test_output_dir(request, top_output_dir, prefix or "") / "repo"
+def get_test_repo_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
+    return get_test_output_dir(request, top_output_dir) / "repo"


 def pytest_addoption(parser: Parser):
```
```diff
@@ -5083,7 +5173,7 @@ def tenant_get_shards(
     return [(TenantShardId(tenant_id, 0, 0), override_pageserver or env.pageserver)]


-def wait_replica_caughtup(primary: PgProtocol, secondary: PgProtocol):
+def wait_replica_caughtup(primary: Endpoint, secondary: Endpoint):
     primary_lsn = Lsn(
         primary.safe_psql_scalar("SELECT pg_current_wal_flush_lsn()", log_query=False)
     )
```
```diff
@@ -14,60 +14,32 @@ Dynamically parametrize tests by different parameters
 """


-def get_pgversions():
-    if (v := os.getenv("DEFAULT_PG_VERSION")) is None:
-        pg_versions = [version for version in PgVersion if version != PgVersion.NOT_SET]
-    else:
-        pg_versions = [PgVersion(v)]
-
-    return pg_versions
-
-
-@pytest.fixture(
-    scope="session",
-    autouse=True,
-    params=get_pgversions(),
-    ids=lambda v: f"pg{v}",
-)
-def pg_version(request) -> Optional[PgVersion]:
-    return request.param
-
-
-def get_buildtypes():
-    if (bt := os.getenv("BUILD_TYPE")) is None:
-        build_types = ["debug", "release"]
-    else:
-        build_types = [bt.lower()]
-
-    return build_types
-
-
-@pytest.fixture(
-    scope="session",
-    autouse=True,
-    params=get_buildtypes(),
-    ids=lambda t: f"{t}",
-)
-def build_type(request) -> Optional[str]:
-    return request.param
+@pytest.fixture(scope="function", autouse=True)
+def pg_version() -> Optional[PgVersion]:
+    return None


-@pytest.fixture(scope="session", autouse=True)
+@pytest.fixture(scope="function", autouse=True)
+def build_type() -> Optional[str]:
+    return None
+
+
+@pytest.fixture(scope="function", autouse=True)
 def platform() -> Optional[str]:
     return None


-@pytest.fixture(scope="session", autouse=True)
+@pytest.fixture(scope="function", autouse=True)
 def pageserver_virtual_file_io_engine() -> Optional[str]:
     return os.getenv("PAGESERVER_VIRTUAL_FILE_IO_ENGINE")


-@pytest.fixture(scope="session", autouse=True)
+@pytest.fixture(scope="function", autouse=True)
 def pageserver_io_buffer_alignment() -> Optional[int]:
     return None


-@pytest.fixture(scope="session", autouse=True)
+@pytest.fixture(scope="function", autouse=True)
 def pageserver_aux_file_policy() -> Optional[AuxFileStore]:
     return None
```
```diff
@@ -81,12 +53,26 @@ def get_pageserver_default_tenant_config_compaction_algorithm() -> Optional[Dict
     return v


-@pytest.fixture(scope="session", autouse=True)
+@pytest.fixture(scope="function", autouse=True)
 def pageserver_default_tenant_config_compaction_algorithm() -> Optional[Dict[str, Any]]:
     return get_pageserver_default_tenant_config_compaction_algorithm()


 def pytest_generate_tests(metafunc: Metafunc):
+    if (bt := os.getenv("BUILD_TYPE")) is None:
+        build_types = ["debug", "release"]
+    else:
+        build_types = [bt.lower()]
+
+    metafunc.parametrize("build_type", build_types)
+
+    if (v := os.getenv("DEFAULT_PG_VERSION")) is None:
+        pg_versions = [version for version in PgVersion if version != PgVersion.NOT_SET]
+    else:
+        pg_versions = [PgVersion(v)]
+
+    metafunc.parametrize("pg_version", pg_versions, ids=map(lambda v: f"pg{v}", pg_versions))
+
     # A hacky way to parametrize tests only for `pageserver_virtual_file_io_engine=std-fs`
     # And do not change test name for default `pageserver_virtual_file_io_engine=tokio-epoll-uring` to keep tests statistics
     if (io_engine := os.getenv("PAGESERVER_VIRTUAL_FILE_IO_ENGINE", "")) not in (
```
```diff
@@ -103,7 +89,6 @@ def pytest_generate_tests(metafunc: Metafunc):
         "pageserver_default_tenant_config_compaction_algorithm",
         [explicit_default],
         ids=[explicit_default["kind"]],
-        scope="session",
     )

     # For performance tests, parametrize also by platform
```
Deleted file (@@ -1,303 +0,0 @@), previously containing the session-scoped environment fixtures:

```python
import os
import shutil
import subprocess
import uuid
from pathlib import Path
from typing import Any, Dict, Iterator, Optional, cast

import pytest
from _pytest.config import Config
from _pytest.fixtures import FixtureRequest

from fixtures import overlayfs
from fixtures.broker import NeonBroker
from fixtures.log_helper import log
from fixtures.neon_api import NeonAPI
from fixtures.neon_fixtures import (
    DEFAULT_OUTPUT_DIR,
    NeonEnv,
    NeonEnvBuilder,
    get_test_output_dir,
    get_test_overlay_dir,
    get_test_repo_dir,
)
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import MockS3Server
from fixtures.utils import AuxFileStore, allure_attach_from_dir, get_self_dir

BASE_PORT: int = 15000


@pytest.fixture(scope="session")
def base_dir() -> Iterator[Path]:
    # find the base directory (currently this is the git root)
    base_dir = get_self_dir().parent.parent
    log.info(f"base_dir is {base_dir}")

    yield base_dir


@pytest.fixture(scope="session")
def neon_binpath(base_dir: Path, build_type: str) -> Iterator[Path]:
    if os.getenv("REMOTE_ENV"):
        # we are in remote env and do not have neon binaries locally
        # this is the case for benchmarks run on self-hosted runner
        return

    # Find the neon binaries.
    if env_neon_bin := os.environ.get("NEON_BIN"):
        binpath = Path(env_neon_bin)
    else:
        binpath = base_dir / "target" / build_type
    log.info(f"neon_binpath is {binpath}")

    if not (binpath / "pageserver").exists():
        raise Exception(f"neon binaries not found at '{binpath}'")

    yield binpath


@pytest.fixture(scope="session")
def pg_distrib_dir(base_dir: Path) -> Iterator[Path]:
    if env_postgres_bin := os.environ.get("POSTGRES_DISTRIB_DIR"):
        distrib_dir = Path(env_postgres_bin).resolve()
    else:
        distrib_dir = base_dir / "pg_install"

    log.info(f"pg_distrib_dir is {distrib_dir}")
    yield distrib_dir


@pytest.fixture(scope="session")
def top_output_dir(base_dir: Path) -> Iterator[Path]:
    # Compute the top-level directory for all tests.
    if env_test_output := os.environ.get("TEST_OUTPUT"):
        output_dir = Path(env_test_output).resolve()
    else:
        output_dir = base_dir / DEFAULT_OUTPUT_DIR
    output_dir.mkdir(exist_ok=True)

    log.info(f"top_output_dir is {output_dir}")
    yield output_dir


@pytest.fixture(scope="session")
def versioned_pg_distrib_dir(pg_distrib_dir: Path, pg_version: PgVersion) -> Iterator[Path]:
    versioned_dir = pg_distrib_dir / pg_version.v_prefixed

    psql_bin_path = versioned_dir / "bin/psql"
    postgres_bin_path = versioned_dir / "bin/postgres"

    if os.getenv("REMOTE_ENV"):
        # When testing against a remote server, we only need the client binary.
        if not psql_bin_path.exists():
            raise Exception(f"psql not found at '{psql_bin_path}'")
    else:
        if not postgres_bin_path.exists():
            raise Exception(f"postgres not found at '{postgres_bin_path}'")

    log.info(f"versioned_pg_distrib_dir is {versioned_dir}")
    yield versioned_dir


@pytest.fixture(scope="session")
def neon_api_key() -> str:
    api_key = os.getenv("NEON_API_KEY")
    if not api_key:
        raise AssertionError("Set the NEON_API_KEY environment variable")

    return api_key


@pytest.fixture(scope="session")
def neon_api_base_url() -> str:
    return os.getenv("NEON_API_BASE_URL", "https://console-stage.neon.build/api/v2")


@pytest.fixture(scope="session")
def neon_api(neon_api_key: str, neon_api_base_url: str) -> NeonAPI:
    return NeonAPI(neon_api_key, neon_api_base_url)


@pytest.fixture(scope="session")
def worker_port_num():
    return (32768 - BASE_PORT) // int(os.environ.get("PYTEST_XDIST_WORKER_COUNT", "1"))


@pytest.fixture(scope="session")
def worker_seq_no(worker_id: str) -> int:
    # worker_id is a pytest-xdist fixture
    # it can be master or gw<number>
    # parse it to always get a number
    if worker_id == "master":
        return 0
    assert worker_id.startswith("gw")
    return int(worker_id[2:])


@pytest.fixture(scope="session")
def worker_base_port(worker_seq_no: int, worker_port_num: int) -> int:
    # so we divide ports in ranges of ports
    # so workers have disjoint set of ports for services
    return BASE_PORT + worker_seq_no * worker_port_num


@pytest.fixture(scope="session")
def port_distributor(worker_base_port: int, worker_port_num: int) -> PortDistributor:
    return PortDistributor(base_port=worker_base_port, port_number=worker_port_num)


@pytest.fixture(scope="session")
def shared_broker(
    port_distributor: PortDistributor,
    shared_test_output_dir: Path,
    neon_binpath: Path,
) -> Iterator[NeonBroker]:
    # multiple pytest sessions could get launched in parallel, get them different ports/datadirs
    client_port = port_distributor.get_port()
    broker_logfile = shared_test_output_dir / "repo" / "storage_broker.log"

    broker = NeonBroker(logfile=broker_logfile, port=client_port, neon_binpath=neon_binpath)
    yield broker
    broker.stop()


@pytest.fixture(scope="session")
def run_id() -> Iterator[uuid.UUID]:
    yield uuid.uuid4()


@pytest.fixture(scope="session", autouse=True)
def neon_shared_env(
    pytestconfig: Config,
    port_distributor: PortDistributor,
    mock_s3_server: MockS3Server,
    shared_broker: NeonBroker,
    run_id: uuid.UUID,
    top_output_dir: Path,
    shared_test_output_dir: Path,
    neon_binpath: Path,
    build_type: str,
    pg_distrib_dir: Path,
    pg_version: PgVersion,
    pageserver_virtual_file_io_engine: str,
    pageserver_aux_file_policy: Optional[AuxFileStore],
    pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]],
    pageserver_io_buffer_alignment: Optional[int],
    request: FixtureRequest,
) -> Iterator[NeonEnv]:
    """
    Simple Neon environment, with no authentication and no safekeepers.

    This fixture will use RemoteStorageKind.LOCAL_FS with pageserver.
    """

    prefix = f"shared[{build_type}-{pg_version.v_prefixed}]-"

    # Create the environment in the per-test output directory
    repo_dir = get_test_repo_dir(request, top_output_dir, prefix)

    with NeonEnvBuilder(
        top_output_dir=top_output_dir,
        repo_dir=repo_dir,
        port_distributor=port_distributor,
        broker=shared_broker,
        mock_s3_server=mock_s3_server,
        neon_binpath=neon_binpath,
        pg_distrib_dir=pg_distrib_dir,
        pg_version=pg_version,
        run_id=run_id,
        preserve_database_files=cast(bool, pytestconfig.getoption("--preserve-database-files")),
        test_name=f"{prefix}{request.node.name}",
        test_output_dir=shared_test_output_dir,
        pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
        pageserver_aux_file_policy=pageserver_aux_file_policy,
        pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
        pageserver_io_buffer_alignment=pageserver_io_buffer_alignment,
        shared=True,
    ) as builder:
        env = builder.init_start()

        yield env


# This is autouse, so the test output directory always gets created, even
# if a test doesn't put anything there.
#
# NB: we request the overlay dir fixture so the fixture does its cleanups
@pytest.fixture(scope="session")
def shared_test_output_dir(
    request: FixtureRequest,
    pg_version: PgVersion,
    build_type: str,
    top_output_dir: Path,
    shared_test_overlay_dir: Path,
) -> Iterator[Path]:
    """Create the working directory for shared tests."""

    prefix = f"shared[{build_type}-{pg_version.v_prefixed}]-"

    # one directory per test
    test_dir = get_test_output_dir(request, top_output_dir, prefix)

    log.info(f"test_output_dir is {test_dir}")
    shutil.rmtree(test_dir, ignore_errors=True)
    test_dir.mkdir()

    yield test_dir

    # Allure artifacts creation might involve the creation of `.tar.zst` archives,
    # which aren't going to be used if Allure results collection is not enabled
    # (i.e. --alluredir is not set).
    # Skip `allure_attach_from_dir` in this case
    if not request.config.getoption("--alluredir"):
        return

    preserve_database_files = False
    for k, v in request.node.user_properties:
        # NB: the neon_env_builder fixture uses this fixture (test_output_dir).
        # So, neon_env_builder's cleanup runs before here.
        # The cleanup propagates NeonEnvBuilder.preserve_database_files into this user property.
        if k == "preserve_database_files":
            assert isinstance(v, bool)
            preserve_database_files = v

    allure_attach_from_dir(test_dir, preserve_database_files)


@pytest.fixture(scope="session")
def shared_test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Optional[Path]:
    """
    Idempotently create a test's overlayfs mount state directory.
    If the functionality isn't enabled via env var, returns None.

    The procedure cleans up after previous runs that were aborted (e.g. due to Ctrl-C, OOM kills, etc).
    """

    if os.getenv("NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS") is None:
        return None

    overlay_dir = get_test_overlay_dir(request, top_output_dir)
    log.info(f"test_overlay_dir is {overlay_dir}")

    overlay_dir.mkdir(exist_ok=True)
    # unmount stale overlayfs mounts which use subdirectories of `overlay_dir/*` as the overlayfs `upperdir` and `workdir`
    for mountpoint in overlayfs.iter_mounts_beneath(get_test_output_dir(request, top_output_dir)):
        cmd = ["sudo", "umount", str(mountpoint)]
        log.info(
            f"Unmounting stale overlayfs mount probably created during earlier test run: {cmd}"
        )
        subprocess.run(cmd, capture_output=True, check=True)
    # the overlayfs `workdir` is owned by `root`, shutil.rmtree won't work.
    cmd = ["sudo", "rm", "-rf", str(overlay_dir)]
    subprocess.run(cmd, capture_output=True, check=True)

    overlay_dir.mkdir()

    return overlay_dir

    # no need to clean up anything: on clean shutdown,
    # NeonEnvBuilder.overlay_cleanup_teardown takes care of cleanup
    # and on unclean shutdown, this function will take care of it
    # on the next test run
```
Deleted file (@@ -1,13 +0,0 @@), previously containing the session-scoped mock S3 fixture:

```python
from typing import Iterator

import pytest

from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import MockS3Server


@pytest.fixture(scope="session")
def mock_s3_server(port_distributor: PortDistributor) -> Iterator[MockS3Server]:
    mock_s3_server = MockS3Server(port_distributor.get_port())
    yield mock_s3_server
    mock_s3_server.kill()
```
@@ -1,348 +0,0 @@
from functools import partial
from pathlib import Path
from typing import Any, Dict, List, Optional, cast

import pytest

from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
    DEFAULT_BRANCH_NAME,
    Endpoint,
    NeonEnv,
    PgProtocol,
    check_restored_datadir_content,
    tenant_get_shards,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn
from fixtures.safekeeper.utils import are_walreceivers_absent
from fixtures.utils import wait_until

"""
In this file, the most important resources are exposed as function-level fixtures
that depend on session-level resources like pageservers and safekeepers.

The main rationale here is that we don't need to initialize a new SK/PS/etc.
every time we want to test something that has branching: we can just as well
reuse still-available PageServers from other tests of the same kind.
"""

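# Illustrative sketch (not part of the diff) of how a test consumes these
# shared fixtures; it assumes a function-level `timeline: TTimeline` fixture is
# wired up elsewhere in this module (the fixture name is an assumption):
#
#   def test_example(timeline: TTimeline):
#       endpoint = timeline.primary
#       endpoint.safe_psql("CREATE TABLE t AS SELECT generate_series(1, 100) AS x")
#       child = timeline.create_branch("child", lsn=None)
#       assert child.primary.safe_psql("SELECT count(*) FROM t") == [(100,)]
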
@pytest.fixture(scope="session")
def shared_storage_repo_path(build_type: str, base_dir: Path) -> Path:
    return base_dir / f"shared[{build_type}]" / "repo"


class TEndpoint(PgProtocol):
    def clear_shared_buffers(self, cursor: Optional[Any] = None):
        """
        Best-effort way to clear postgres buffers. Pinned buffers will not be 'cleared.'

        Might also clear LFC.
        """
        if cursor is not None:
            cursor.execute("select clear_buffer_cache()")
        else:
            self.safe_psql("select clear_buffer_cache()")


_TEndpoint = Endpoint

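# Illustrative only (not part of the diff): `clear_buffer_cache()` is provided
# by the `neon_test_utils` extension, so an endpoint needs it installed first:
#
#   endpoint.safe_psql("CREATE EXTENSION IF NOT EXISTS neon_test_utils")
#   endpoint.safe_psql("SELECT count(*) FROM pg_class")  # warm: served from shared buffers
#   endpoint.safe_psql("select clear_buffer_cache()")    # what clear_shared_buffers() runs
#   endpoint.safe_psql("SELECT count(*) FROM pg_class")  # cold: pages re-fetched from the pageserver
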
class TTimeline:
    __primary: Optional[_TEndpoint]
    __secondary: Optional[_TEndpoint]

    def __init__(self, timeline_id: TimelineId, tenant: "TTenant", name: str):
        self.id = timeline_id
        self.tenant = tenant
        self.name = name
        self.__primary = None
        self.__secondary = None

    @property
    def primary(self) -> TEndpoint:
        if self.__primary is None:
            self.__primary = cast(_TEndpoint, self.tenant.create_endpoint(
                name=self._get_full_endpoint_name("primary"),
                timeline_id=self.id,
                primary=True,
                running=True,
            ))

        return cast(TEndpoint, self.__primary)

    def primary_with_config(self, config_lines: List[str], reboot: bool = False) -> TEndpoint:
        if self.__primary is None:
            self.__primary = cast(_TEndpoint, self.tenant.create_endpoint(
                name=self._get_full_endpoint_name("primary"),
                timeline_id=self.id,
                primary=True,
                running=True,
                config_lines=config_lines,
            ))
        else:
            self.__primary.config(config_lines)
            if reboot:
                if self.__primary.is_running():
                    self.__primary.stop()
                self.__primary.start()

        return cast(TEndpoint, self.__primary)

    @property
    def secondary(self) -> TEndpoint:
        if self.__secondary is None:
            self.__secondary = cast(_TEndpoint, self.tenant.create_endpoint(
                name=self._get_full_endpoint_name("secondary"),
                timeline_id=self.id,
                primary=False,
                running=True,
            ))

        return cast(TEndpoint, self.__secondary)

    def secondary_with_config(self, config_lines: List[str], reboot: bool = False) -> TEndpoint:
        if self.__secondary is None:
            self.__secondary = cast(_TEndpoint, self.tenant.create_endpoint(
                name=self._get_full_endpoint_name("secondary"),
                timeline_id=self.id,
                primary=False,
                running=True,
                config_lines=config_lines,
            ))
        else:
            self.__secondary.config(config_lines)
            if reboot:
                if self.__secondary.is_running():
                    self.__secondary.stop()
                self.__secondary.start()

        return cast(TEndpoint, self.__secondary)

    def _get_full_endpoint_name(self, name) -> str:
        return f"{self.name}.{name}"

    def create_branch(self, name: str, lsn: Optional[Lsn]) -> "TTimeline":
        return self.tenant.create_timeline(
            new_name=name,
            parent_name=self.name,
            branch_at=lsn,
        )

    def stop_and_flush(self, endpoint: TEndpoint):
        self.tenant.stop_endpoint(endpoint)
        self.tenant.flush_timeline_data(self)

    def checkpoint(self, **kwargs):
        self.tenant.checkpoint_timeline(self, **kwargs)

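# Illustrative only (not part of the diff): the *_with_config variants both
# create and reconfigure, e.g. starting a primary with a tiny buffer pool and
# later rebooting it into a bigger one:
#
#   endpoint = timeline.primary_with_config(config_lines=["shared_buffers='1MB'"])
#   ...
#   timeline.primary_with_config(config_lines=["shared_buffers='8MB'"], reboot=True)
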
class TTenant:
    """
    An object representing a single test case on a shared pageserver.
    All operations here are practically safe.
    """

    def __init__(
        self,
        env: NeonEnv,
        name: str,
        config: Optional[Dict[str, Any]] = None,
    ):
        self.id = TenantId.generate()
        self.timelines: List[TTimeline] = []
        self.timeline_names: Dict[str, TTimeline] = {}
        self.timeline_ids: Dict[TimelineId, TTimeline] = {}
        self.name = name

        # publicly inaccessible stuff, used during shutdown
        self.__endpoints: List[Endpoint] = []
        self.__env: NeonEnv = env

        env.neon_cli.create_tenant(
            tenant_id=self.id,
            set_default=False,
            conf=config,
        )

        self.first_timeline_id = env.neon_cli.create_branch(
            new_branch_name=f"{self.name}:{DEFAULT_BRANCH_NAME}",
            ancestor_branch_name=DEFAULT_BRANCH_NAME,
            tenant_id=self.id,
        )

        self._add_timeline(
            TTimeline(
                timeline_id=self.first_timeline_id,
                tenant=self,
                name=DEFAULT_BRANCH_NAME,
            )
        )

    def _add_timeline(self, timeline: TTimeline):
        assert timeline.tenant == self
        assert timeline not in self.timelines
        assert timeline.id is not None

        self.timelines.append(timeline)
        self.timeline_ids[timeline.id] = timeline

        if timeline.name is not None:
            self.timeline_names[timeline.name] = timeline

    def create_timeline(
        self,
        new_name: str,
        parent_name: Optional[str] = None,
        parent_id: Optional[TimelineId] = None,
        branch_at: Optional[Lsn] = None,
    ) -> TTimeline:
        if parent_name is not None:
            pass
        elif parent_id is not None:
            assert parent_name is None
            parent = self.timeline_ids[parent_id]
            parent_name = parent.name
        else:
            raise LookupError("Timeline creation requires a parent, by either ID or name")

        assert parent_name is not None

        new_id = self.__env.neon_cli.create_branch(
            new_branch_name=f"{self.name}:{new_name}",
            ancestor_branch_name=f"{self.name}:{parent_name}",
            tenant_id=self.id,
            ancestor_start_lsn=branch_at,
        )

        new_tl = TTimeline(
            timeline_id=new_id,
            tenant=self,
            name=new_name,
        )
        self._add_timeline(new_tl)

        return new_tl

    @property
    def default_timeline(self) -> TTimeline:
        return self.timeline_ids[self.first_timeline_id]

    def start_endpoint(self, ep: TEndpoint):
        if ep not in self.__endpoints:
            return

        ep = cast(Endpoint, ep)

        if not ep.is_running():
            ep.start()

    def stop_endpoint(self, ep: TEndpoint, mode: str = "fast"):
        if ep not in self.__endpoints:
            return

        ep = cast(Endpoint, ep)

        if ep.is_running():
            ep.stop(mode=mode)

    def create_endpoint(
        self,
        name: str,
        timeline_id: TimelineId,
        primary: bool = True,
        running: bool = False,
        port: Optional[int] = None,
        http_port: Optional[int] = None,
        lsn: Optional[Lsn] = None,
        config_lines: Optional[List[str]] = None,
    ) -> TEndpoint:
        endpoint: _TEndpoint

        if port is None:
            port = self.__env.port_distributor.get_port()

        if http_port is None:
            http_port = self.__env.port_distributor.get_port()

        if running:
            endpoint = self.__env.endpoints.create_start(
                branch_name=self.timeline_ids[timeline_id].name,
                endpoint_id=f"{self.name}.{name}",
                tenant_id=self.id,
                lsn=lsn,
                hot_standby=not primary,
                config_lines=config_lines,
            )
        else:
            endpoint = self.__env.endpoints.create(
                branch_name=self.timeline_ids[timeline_id].name,
                endpoint_id=f"{self.name}.{name}",
                tenant_id=self.id,
                lsn=lsn,
                hot_standby=not primary,
                config_lines=config_lines,
            )

        self.__endpoints.append(endpoint)

        return endpoint

    def shutdown_resources(self):
        for ep in self.__endpoints:
            try:
                ep.stop_and_destroy("fast")
            except BaseException as e:
                log.error(
                    "Error encountered while shutting down endpoint %s",
                    ep.endpoint_id,
                    exc_info=e,
                )

    def reconfigure(self, endpoint: TEndpoint, lines: List[str], restart: bool):
        if endpoint not in self.__endpoints:
            return

        endpoint = cast(_TEndpoint, endpoint)
        was_running = endpoint.is_running()
        if restart and was_running:
            endpoint.stop()

        endpoint.config(lines)

        if restart:
            endpoint.start()

    def flush_timeline_data(self, timeline: TTimeline) -> Lsn:
        commit_lsn: Lsn = Lsn(0)
        # In principle, in the absence of failures, polling a single sk would be enough.
        for sk in self.__env.safekeepers:
            cli = sk.http_client()
            # wait until compute connections are gone
            wait_until(30, 0.5, partial(are_walreceivers_absent, cli, self.id, timeline.id))
            commit_lsn = max(cli.get_commit_lsn(self.id, timeline.id), commit_lsn)

        # Note: depending on the WAL filtering implementation, most shards probably
        # won't be able to reach commit_lsn (unless gaps are also ack'ed), so this
        # is broken in the sharded case.
        shards = tenant_get_shards(env=self.__env, tenant_id=self.id)
        for tenant_shard_id, pageserver in shards:
            log.info(
                f"flush_ep_to_pageserver: waiting for {commit_lsn} on shard {tenant_shard_id} on pageserver {pageserver.id}"
            )
            waited = wait_for_last_record_lsn(
                pageserver.http_client(), tenant_shard_id, timeline.id, commit_lsn
            )

            assert waited >= commit_lsn

        return commit_lsn

    def checkpoint_timeline(self, timeline: TTimeline, **kwargs):
        self.__env.pageserver.http_client().timeline_checkpoint(
            tenant_id=self.id,
            timeline_id=timeline.id,
            **kwargs,
        )

    def pgdatadir(self, endpoint: TEndpoint):
        if endpoint not in self.__endpoints:
            return None

        return cast(Endpoint, endpoint).pgdata_dir

    def check_restored_datadir_content(self, path, endpoint: TEndpoint, *args, **kwargs):
        if endpoint not in self.__endpoints:
            return

        check_restored_datadir_content(path, self.__env, cast(Endpoint, endpoint), *args, **kwargs)
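# Illustrative sketch (not part of the diff) of how TTenant was presumably
# meant to be wired into pytest; the `neon_shared_env` fixture name is an
# assumption:
#
#   @pytest.fixture
#   def tenant(neon_shared_env: NeonEnv, request) -> Iterator[TTenant]:
#       t = TTenant(neon_shared_env, name=request.node.name)
#       yield t
#       t.shutdown_resources()
#
#   def test_example(tenant: TTenant):
#       tl = tenant.default_timeline
#       tl.primary.safe_psql("CREATE TABLE t AS SELECT 1 AS a")
#       tenant.flush_timeline_data(tl)  # wait until the data is on the pageserver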
@@ -84,7 +84,7 @@ def test_storage_controller_many_tenants(
    compute_reconfigure_listener.register_on_notify(lambda body: time.sleep(0.01))

    env = neon_env_builder.init_configs()
    neon_env_builder.start()
    env.start()

    # We will intentionally stress reconciler concurrency, which triggers a warning when lots
    # of shards are hitting the delayed path.

@@ -15,7 +15,6 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import wait_until_tenant_active
from fixtures.shared_fixtures import TTimeline
from fixtures.utils import query_scalar
from performance.test_perf_pgbench import get_scales_matrix
from requests import RequestException
@@ -116,10 +115,13 @@ def test_branching_with_pgbench(
# This test checks if the pageserver is able to handle an "unnormalized" starting LSN.
#
# Related: see discussion in https://github.com/neondatabase/neon/pull/2143#issuecomment-1209092186
def test_branching_unnormalized_start_lsn(timeline: TTimeline, pg_bin: PgBin):
def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBin):
    XLOG_BLCKSZ = 8192

    endpoint0 = timeline.primary
    env = neon_simple_env

    env.neon_cli.create_branch("b0")
    endpoint0 = env.endpoints.create_start("b0")

    pg_bin.run_capture(["pgbench", "-i", endpoint0.connstr()])

@@ -131,8 +133,8 @@ def test_branching_unnormalized_start_lsn(timeline: TTimeline, pg_bin: PgBin):
    start_lsn = Lsn((int(curr_lsn) - XLOG_BLCKSZ) // XLOG_BLCKSZ * XLOG_BLCKSZ)

    log.info(f"Branching b1 from b0 starting at lsn {start_lsn}...")
    tl2 = timeline.create_branch("branch", start_lsn)
    endpoint1 = tl2.primary
    env.neon_cli.create_branch("b1", "b0", ancestor_start_lsn=start_lsn)
    endpoint1 = env.endpoints.create_start("b1")

    pg_bin.run_capture(["pgbench", "-i", endpoint1.connstr()])

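# Worked example, added for clarity (not in the diff): with XLOG_BLCKSZ = 8192
# and, say, int(curr_lsn) = 100_000, the start_lsn expression above gives
# (100_000 - 8192) // 8192 * 8192 = 11 * 8192 = 90_112, i.e. the LSN is rounded
# down to the previous WAL block boundary, which is the kind of "unnormalized"
# position (pointing at a page boundary rather than at a record) the test wants.
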
@@ -1,13 +1,14 @@
from pathlib import Path

from fixtures.neon_fixtures import NeonEnvBuilder, flush_ep_to_pageserver, test_output_dir
from fixtures.shared_fixtures import TTimeline
from fixtures.neon_fixtures import NeonEnvBuilder, flush_ep_to_pageserver


def do_combocid_op(timeline: TTimeline, op):
    endpoint = timeline.primary_with_config(config_lines=[
        "shared_buffers='1MB'",
    ])
def do_combocid_op(neon_env_builder: NeonEnvBuilder, op):
    env = neon_env_builder.init_start()
    endpoint = env.endpoints.create_start(
        "main",
        config_lines=[
            "shared_buffers='1MB'",
        ],
    )

    conn = endpoint.connect()
    cur = conn.cursor()
@@ -42,26 +43,32 @@ def do_combocid_op(timeline: TTimeline, op):
    assert len(rows) == 500

    cur.execute("rollback")
    timeline.stop_and_flush(endpoint)
    timeline.checkpoint(compact=False, wait_until_uploaded=True)
    flush_ep_to_pageserver(env, endpoint, env.initial_tenant, env.initial_timeline)
    env.pageserver.http_client().timeline_checkpoint(
        env.initial_tenant, env.initial_timeline, compact=False, wait_until_uploaded=True
    )


def test_combocid_delete(timeline: TTimeline):
    do_combocid_op(timeline, "delete from t")
def test_combocid_delete(neon_env_builder: NeonEnvBuilder):
    do_combocid_op(neon_env_builder, "delete from t")


def test_combocid_update(timeline: TTimeline):
    do_combocid_op(timeline, "update t set val=val+1")
def test_combocid_update(neon_env_builder: NeonEnvBuilder):
    do_combocid_op(neon_env_builder, "update t set val=val+1")


def test_combocid_lock(timeline: TTimeline):
    do_combocid_op(timeline, "select * from t for update")
def test_combocid_lock(neon_env_builder: NeonEnvBuilder):
    do_combocid_op(neon_env_builder, "select * from t for update")


def test_combocid_multi_insert(timeline: TTimeline, test_output_dir: Path):
    endpoint = timeline.primary_with_config(config_lines=[
        "shared_buffers='1MB'",
    ])
def test_combocid_multi_insert(neon_env_builder: NeonEnvBuilder):
    env = neon_env_builder.init_start()
    endpoint = env.endpoints.create_start(
        "main",
        config_lines=[
            "shared_buffers='1MB'",
        ],
    )

    conn = endpoint.connect()
    cur = conn.cursor()
@@ -70,7 +77,7 @@ def test_combocid_multi_insert(timeline: TTimeline, test_output_dir: Path):
    cur.execute("CREATE EXTENSION neon_test_utils")

    cur.execute("create table t(id integer, val integer)")
    file_path = str(test_output_dir / "t.csv")
    file_path = f"{endpoint.pg_data_dir_path()}/t.csv"
    cur.execute(f"insert into t select g, 0 from generate_series(1,{n_records}) g")
    cur.execute(f"copy t to '{file_path}'")
    cur.execute("truncate table t")
@@ -99,12 +106,15 @@ def test_combocid_multi_insert(timeline: TTimeline, test_output_dir: Path):

    cur.execute("rollback")

    timeline.stop_and_flush(endpoint)
    timeline.checkpoint(compact=False, wait_until_uploaded=True)
    flush_ep_to_pageserver(env, endpoint, env.initial_tenant, env.initial_timeline)
    env.pageserver.http_client().timeline_checkpoint(
        env.initial_tenant, env.initial_timeline, compact=False, wait_until_uploaded=True
    )


def test_combocid(timeline: TTimeline):
    endpoint = timeline.primary
def test_combocid(neon_env_builder: NeonEnvBuilder):
    env = neon_env_builder.init_start()
    endpoint = env.endpoints.create_start("main")

    conn = endpoint.connect()
    cur = conn.cursor()
@@ -137,5 +147,7 @@ def test_combocid(timeline: TTimeline):

    cur.execute("rollback")

    timeline.stop_and_flush(endpoint)
    timeline.checkpoint(compact=False, wait_until_uploaded=True)
    flush_ep_to_pageserver(env, endpoint, env.initial_tenant, env.initial_timeline)
    env.pageserver.http_client().timeline_checkpoint(
        env.initial_tenant, env.initial_timeline, compact=False, wait_until_uploaded=True
    )

@@ -178,7 +178,7 @@ def test_backward_compatibility(
    neon_env_builder.num_safekeepers = 3
    env = neon_env_builder.from_repo_dir(compatibility_snapshot_dir / "repo")
    env.pageserver.allowed_errors.append(ingest_lag_log_line)
    neon_env_builder.start()
    env.start()

    check_neon_works(
        env,
@@ -265,7 +265,7 @@ def test_forward_compatibility(
    # does not include logs from previous runs
    assert not env.pageserver.log_contains("git-env:" + prev_pageserver_version)

    neon_env_builder.start()
    env.start()

    # ensure the specified pageserver is running
    assert env.pageserver.log_contains("git-env:" + prev_pageserver_version)

@@ -5,7 +5,6 @@ import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
from fixtures.pg_version import PgVersion
from fixtures.shared_fixtures import TTimeline
from fixtures.utils import query_scalar


@@ -13,17 +12,18 @@ from fixtures.utils import query_scalar
# Test CREATE DATABASE when there have been relmapper changes
#
@pytest.mark.parametrize("strategy", ["file_copy", "wal_log"])
def test_createdb(timeline: TTimeline, pg_version: PgVersion, strategy: str):
    if pg_version == PgVersion.V14 and strategy == "wal_log":
def test_createdb(neon_simple_env: NeonEnv, strategy: str):
    env = neon_simple_env
    if env.pg_version == PgVersion.V14 and strategy == "wal_log":
        pytest.skip("wal_log strategy not supported on PostgreSQL 14")

    endpoint = timeline.primary
    endpoint = env.endpoints.create_start("main")

    with endpoint.cursor() as cur:
        # Cause a 'relmapper' change in the original branch
        cur.execute("VACUUM FULL pg_class")

        if pg_version == PgVersion.V14:
        if env.pg_version == PgVersion.V14:
            cur.execute("CREATE DATABASE foodb")
        else:
            cur.execute(f"CREATE DATABASE foodb STRATEGY={strategy}")
@@ -31,8 +31,8 @@ def test_createdb(timeline: TTimeline, pg_version: PgVersion, strategy: str):
        lsn = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")

    # Create a branch
    tl2 = timeline.create_branch("createdb2", lsn=lsn)
    endpoint2 = tl2.primary
    env.neon_cli.create_branch("test_createdb2", "main", ancestor_start_lsn=lsn)
    endpoint2 = env.endpoints.create_start("test_createdb2")

    # Test that you can connect to the new database on both branches
    for db in (endpoint, endpoint2):
@@ -58,8 +58,9 @@ def test_createdb(timeline: TTimeline, pg_version: PgVersion, strategy: str):
#
# Test DROP DATABASE
#
def test_dropdb(timeline: TTimeline, test_output_dir):
    endpoint = timeline.primary
def test_dropdb(neon_simple_env: NeonEnv, test_output_dir):
    env = neon_simple_env
    endpoint = env.endpoints.create_start("main")

    with endpoint.cursor() as cur:
        cur.execute("CREATE DATABASE foodb")
@@ -76,28 +77,28 @@ def test_dropdb(timeline: TTimeline, test_output_dir):
        lsn_after_drop = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")

    # Create two branches before and after database drop.
    tl_before = timeline.create_branch("test_before_dropdb", lsn=lsn_before_drop)
    endpoint_before = tl_before.primary
    env.neon_cli.create_branch("test_before_dropdb", "main", ancestor_start_lsn=lsn_before_drop)
    endpoint_before = env.endpoints.create_start("test_before_dropdb")

    tl_after = timeline.create_branch("test_after_dropdb", lsn=lsn_after_drop)
    endpoint_after = tl_after.primary
    env.neon_cli.create_branch("test_after_dropdb", "main", ancestor_start_lsn=lsn_after_drop)
    endpoint_after = env.endpoints.create_start("test_after_dropdb")

    # Test that database exists on the branch before drop
    endpoint_before.connect(dbname="foodb").close()

    # Test that database subdir exists on the branch before drop
    assert timeline.tenant.pgdatadir(endpoint_before)
    dbpath = pathlib.Path(timeline.tenant.pgdatadir(endpoint_before)) / "base" / str(dboid)
    assert endpoint_before.pgdata_dir
    dbpath = pathlib.Path(endpoint_before.pgdata_dir) / "base" / str(dboid)
    log.info(dbpath)

    assert os.path.isdir(dbpath) is True

    # Test that database subdir doesn't exist on the branch after drop
    assert timeline.tenant.pgdatadir(endpoint_after)
    dbpath = pathlib.Path(timeline.tenant.pgdatadir(endpoint_after)) / "base" / str(dboid)
    assert endpoint_after.pgdata_dir
    dbpath = pathlib.Path(endpoint_after.pgdata_dir) / "base" / str(dboid)
    log.info(dbpath)

    assert os.path.isdir(dbpath) is False

    # Check that we restore the content of the datadir correctly
    timeline.tenant.check_restored_datadir_content(test_output_dir, endpoint)
    check_restored_datadir_content(test_output_dir, env, endpoint)

@@ -1,13 +1,13 @@
from fixtures.neon_fixtures import NeonEnv
from fixtures.shared_fixtures import TTimeline
from fixtures.utils import query_scalar


#
# Test CREATE USER to check shared catalog restore
#
def test_createuser(timeline: TTimeline):
    endpoint = timeline.primary
def test_createuser(neon_simple_env: NeonEnv):
    env = neon_simple_env
    endpoint = env.endpoints.create_start("main")

    with endpoint.cursor() as cur:
        # Cause a 'relmapper' change in the original branch
@@ -18,8 +18,8 @@ def test_createuser(timeline: TTimeline):
        lsn = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")

    # Create a branch
    branch = timeline.create_branch("test_createuser2", lsn=lsn)
    endpoint2 = branch.primary
    env.neon_cli.create_branch("test_createuser2", "main", ancestor_start_lsn=lsn)
    endpoint2 = env.endpoints.create_start("test_createuser2")

    # Test that you can connect to the new branch as a new user
    assert endpoint2.safe_psql("select current_user", user="testuser") == [("testuser",)]

@@ -1,6 +1,5 @@
import pytest
from fixtures.neon_fixtures import Endpoint
from fixtures.shared_fixtures import TTimeline
from fixtures.neon_fixtures import NeonEnvBuilder


@pytest.mark.parametrize(
@@ -11,11 +10,14 @@ from fixtures.shared_fixtures import TTimeline
        "💣",  # calls `trigger_segfault` internally
    ],
)
def test_endpoint_crash(timeline: TTimeline, sql_func: str):
def test_endpoint_crash(neon_env_builder: NeonEnvBuilder, sql_func: str):
    """
    Test that triggering crash from neon_test_utils crashes the endpoint
    """
    endpoint = timeline.primary
    env = neon_env_builder.init_start()
    env.neon_cli.create_branch("test_endpoint_crash")
    endpoint = env.endpoints.create_start("test_endpoint_crash")

    endpoint.safe_psql("CREATE EXTENSION neon_test_utils;")
    with pytest.raises(Exception, match="This probably means the server terminated abnormally"):
        endpoint.safe_psql(f"SELECT {sql_func}();")

@@ -1,8 +1,10 @@
from fixtures.shared_fixtures import TTimeline
from fixtures.neon_fixtures import NeonEnvBuilder


def test_fsm_truncate(timeline: TTimeline):
    endpoint = timeline.primary
def test_fsm_truncate(neon_env_builder: NeonEnvBuilder):
    env = neon_env_builder.init_start()
    env.neon_cli.create_branch("test_fsm_truncate")
    endpoint = env.endpoints.create_start("test_fsm_truncate")
    endpoint.safe_psql(
        "CREATE TABLE t1(key int); CREATE TABLE t2(key int); TRUNCATE TABLE t1; TRUNCATE TABLE t2;"
    )

@@ -1,15 +1,17 @@
import time

from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
from fixtures.shared_fixtures import TTimeline


#
# Test that redo of XLOG_GIN_VACUUM_PAGE doesn't produce an error
#
def test_gin_redo(timeline: TTimeline):
    primary = timeline.primary
    secondary = timeline.secondary
def test_gin_redo(neon_simple_env: NeonEnv):
    env = neon_simple_env

    primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
    time.sleep(1)
    secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
    con = primary.connect()
    cur = con.cursor()
    cur.execute("create table gin_test_tbl(id integer, i int4[])")

@@ -14,7 +14,6 @@ from fixtures.neon_fixtures import (
    tenant_get_shards,
    wait_replica_caughtup,
)
from fixtures.shared_fixtures import TTimeline
from fixtures.utils import wait_until


@@ -222,20 +221,29 @@ def pgbench_accounts_initialized(ep):
#
# Without hs feedback enabled we'd see 'User query might have needed to see row
# versions that must be removed.' errors.
def test_hot_standby_feedback(timeline: TTimeline, pg_bin: PgBin):
    with timeline.primary_with_config(config_lines=[
def test_hot_standby_feedback(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
    env = neon_env_builder.init_start()
    agressive_vacuum_conf = [
        "log_autovacuum_min_duration = 0",
        "autovacuum_naptime = 10s",
        "autovacuum_vacuum_threshold = 25",
        "autovacuum_vacuum_scale_factor = 0.1",
        "autovacuum_vacuum_cost_delay = -1",
    ]) as primary:
    ]
    with env.endpoints.create_start(
        branch_name="main", endpoint_id="primary", config_lines=agressive_vacuum_conf
    ) as primary:
        # It would be great to have a stricter max_standby_streaming_delay=0s here, but then sometimes it fails with
        # 'User was holding shared buffer pin for too long.'.
        with timeline.secondary_with_config(config_lines=[
            "max_standby_streaming_delay=2s",
            "hot_standby_feedback=true",
        ]) as secondary:
        with env.endpoints.new_replica_start(
            origin=primary,
            endpoint_id="secondary",
            config_lines=[
                "max_standby_streaming_delay=2s",
                "neon.protocol_version=2",
                "hot_standby_feedback=true",
            ],
        ) as secondary:
            log.info(
                f"primary connstr is {primary.connstr()}, secondary connstr {secondary.connstr()}"
            )
@@ -278,15 +286,20 @@ def test_hot_standby_feedback(timeline: TTimeline, pg_bin: PgBin):

# Test race condition between WAL replay and backends performing queries
# https://github.com/neondatabase/neon/issues/7791
def test_replica_query_race(timeline: TTimeline):
    primary_ep = timeline.primary
def test_replica_query_race(neon_simple_env: NeonEnv):
    env = neon_simple_env

    primary_ep = env.endpoints.create_start(
        branch_name="main",
        endpoint_id="primary",
    )

    with primary_ep.connect() as p_con:
        with p_con.cursor() as p_cur:
            p_cur.execute("CREATE EXTENSION neon_test_utils")
            p_cur.execute("CREATE TABLE test AS SELECT 0 AS counter")

    standby_ep = timeline.secondary
    standby_ep = env.endpoints.new_replica_start(origin=primary_ep, endpoint_id="standby")
    wait_replica_caughtup(primary_ep, standby_ep)

    # In primary, run a lot of UPDATEs on a single page

@@ -8,19 +8,22 @@ import time
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, PgBin
from fixtures.shared_fixtures import TTimeline


#
# Test branching, when a transaction is in prepared state
#
@pytest.mark.timeout(600)
def test_lfc_resize(timeline: TTimeline, pg_bin: PgBin):
    endpoint = timeline.primary_with_config(config_lines=[
        "neon.file_cache_path='file.cache'",
        "neon.max_file_cache_size=512MB",
        "neon.file_cache_size_limit=512MB",
    ])
def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
    env = neon_simple_env
    endpoint = env.endpoints.create_start(
        "main",
        config_lines=[
            "neon.file_cache_path='file.cache'",
            "neon.max_file_cache_size=512MB",
            "neon.file_cache_size_limit=512MB",
        ],
    )
    n_resize = 10
    scale = 100

@@ -46,7 +49,7 @@ def test_lfc_resize(timeline: TTimeline, pg_bin: PgBin):

    thread.join()

    lfc_file_path = timeline.tenant.pgdatadir(endpoint) / "file.cache"
    lfc_file_path = f"{endpoint.pg_data_dir_path()}/file.cache"
    lfc_file_size = os.path.getsize(lfc_file_path)
    res = subprocess.run(["ls", "-sk", lfc_file_path], check=True, text=True, capture_output=True)
    lfc_file_blocks = re.findall("([0-9A-F]+)", res.stdout)[0]

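# Note added for clarity (not part of the diff): the LFC file is sparse, so
# os.path.getsize() reports the apparent size while `ls -sk` reports the
# kilobytes actually allocated; comparing the two is presumably how the test
# verifies that shrinking neon.file_cache_size_limit really returns disk space.
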
@@ -3,7 +3,6 @@ from pathlib import Path

from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.shared_fixtures import TTimeline
from fixtures.utils import query_scalar


@@ -74,14 +73,18 @@ WITH (fillfactor='100');
    assert blocks < 10


def test_sliding_working_set_approximation(timeline: TTimeline):
    endpoint = timeline.primary_with_config(config_lines=[
        "autovacuum = off",
        "shared_buffers=1MB",
        "neon.max_file_cache_size=256MB",
        "neon.file_cache_size_limit=245MB",
    ])
def test_sliding_working_set_approximation(neon_simple_env: NeonEnv):
    env = neon_simple_env

    endpoint = env.endpoints.create_start(
        branch_name="main",
        config_lines=[
            "autovacuum = off",
            "shared_buffers=1MB",
            "neon.max_file_cache_size=256MB",
            "neon.file_cache_size_limit=245MB",
        ],
    )
    conn = endpoint.connect()
    cur = conn.cursor()
    cur.execute("create extension neon")

@@ -1,5 +1,4 @@
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
from fixtures.shared_fixtures import TTimeline
from fixtures.utils import query_scalar


@@ -13,8 +12,9 @@ from fixtures.utils import query_scalar
# is enough to verify that the WAL records are handled correctly
# in the pageserver.
#
def test_multixact(timeline: TTimeline, test_output_dir):
    endpoint = timeline.primary
def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
    env = neon_simple_env
    endpoint = env.endpoints.create_start("main")

    cur = endpoint.connect().cursor()
    cur.execute(
@@ -72,8 +72,10 @@ def test_multixact(timeline: TTimeline, test_output_dir):
    assert int(next_multixact_id) > int(next_multixact_id_old)

    # Branch at this point
    branch = timeline.create_branch("test_multixact_new", lsn=lsn)
    endpoint_new = branch.primary
    env.neon_cli.create_branch(
        "test_multixact_new", ancestor_branch_name="main", ancestor_start_lsn=lsn
    )
    endpoint_new = env.endpoints.create_start("test_multixact_new")

    next_multixact_id_new = endpoint_new.safe_psql(
        "SELECT next_multixact_id FROM pg_control_checkpoint()"
@@ -83,4 +85,4 @@ def test_multixact(timeline: TTimeline, test_output_dir):
    assert next_multixact_id_new == next_multixact_id

    # Check that we can restore the content of the datadir correctly
    timeline.tenant.check_restored_datadir_content(test_output_dir, endpoint)
    check_restored_datadir_content(test_output_dir, env, endpoint)

@@ -374,7 +374,7 @@ def test_sharding_split_smoke(
    non_default_tenant_config = {"gc_horizon": 77 * 1024 * 1024}

    env = neon_env_builder.init_configs(True)
    neon_env_builder.start()
    env.start()
    tenant_id = TenantId.generate()
    timeline_id = TimelineId.generate()
    env.neon_cli.create_tenant(
@@ -1436,7 +1436,7 @@ def test_sharding_unlogged_relation(neon_env_builder: NeonEnvBuilder):

    neon_env_builder.num_pageservers = 2
    env = neon_env_builder.init_configs()
    neon_env_builder.start()
    env.start()

    tenant_id = TenantId.generate()
    timeline_id = TimelineId.generate()
@@ -1475,7 +1475,7 @@ def test_top_tenants(neon_env_builder: NeonEnvBuilder):
    """

    env = neon_env_builder.init_configs()
    neon_env_builder.start()
    env.start()

    tenants = []
    n_tenants = 8

@@ -2,7 +2,7 @@ import time
from contextlib import closing

from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, fork_at_current_lsn
from fixtures.utils import query_scalar


@@ -57,7 +57,7 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
    cur.execute("UPDATE vmtest_cold_update2 SET id = 5000, filler=repeat('x', 200) WHERE id = 1")

    # Branch at this point, to test that later
    # fork_at_current_lsn(env, endpoint, "test_vm_bit_clear_new", "main")
    fork_at_current_lsn(env, endpoint, "test_vm_bit_clear_new", "main")

    # Clear the buffer cache, to force the VM page to be re-fetched from
    # the page server
@@ -91,7 +91,6 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
    # a dirty VM page is evicted. If the VM bit was not correctly cleared by the
    # earlier WAL record, the full-page image hides the problem. Starting a new
    # server at the right point-in-time avoids that full-page image.

    endpoint_new = env.endpoints.create_start("test_vm_bit_clear_new")

    pg_new_conn = endpoint_new.connect()

Submodule vendor/postgres-v15 updated: 0353a8f5f7...6f6d77fb59
Submodule vendor/postgres-v16 updated: 49386938eb...0baa7346df
vendor/revisions.json (vendored, 4 lines changed)
@@ -1,11 +1,11 @@
 {
   "v16": [
     "16.4",
-    "6e9a4ff6249ac02b8175054b7b3f7dfb198be48b"
+    "0baa7346dfd42d61912eeca554c9bb0a190f0a1e"
   ],
   "v15": [
     "15.8",
-    "49d5e576a56e4cc59cd6a6a0791b2324b9fa675e"
+    "6f6d77fb5960602fcd3fd130aca9f99ecb1619c9"
   ],
   "v14": [
     "14.13",