Compare commits

..

2 Commits

Author SHA1 Message Date
Sergey Melnikov
b555686222 Add test domain for multi cert proxy 2023-04-05 21:43:39 +02:00
Stas Kelvich
c3ca48c62b Support extra domain names for proxy.
Make it possible to specify directory where proxy will look up for
extra certificates. Proxy will iterate through subdirs of that directory
and load `key.pem` and `cert.pem` files from each subdir. Certs directory
structure may look like that:

  certs
  |--example.com
  |  |--key.pem
  |  |--cert.pem
  |--foo.bar
     |--key.pem
     |--cert.pem

Actual domain names are taken from certs and key, subdir names are
ignored.
2023-04-05 20:06:48 +03:00
106 changed files with 1510 additions and 1896 deletions

View File

@@ -23,6 +23,7 @@ settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.build/management/api/v2"
domain: "*.us-east-2.aws.neon.build"
extraDomains: [ "*.us-east-2.postgres.zenith.tech" ]
sentryEnvironment: "staging"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events"

13
Cargo.lock generated
View File

@@ -841,18 +841,6 @@ dependencies = [
"unicode-width",
]
[[package]]
name = "compute_api"
version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"serde",
"serde_json",
"serde_with",
"workspace_hack",
]
[[package]]
name = "compute_tools"
version = "0.1.0"
@@ -860,7 +848,6 @@ dependencies = [
"anyhow",
"chrono",
"clap 4.1.4",
"compute_api",
"futures",
"hyper",
"notify",

View File

@@ -132,7 +132,6 @@ tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df6
heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending
## Local libraries
compute_api = { version = "0.1", path = "./libs/compute_api/" }
consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }
metrics = { version = "0.1", path = "./libs/metrics/" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }

View File

@@ -147,15 +147,15 @@ Created an initial timeline 'de200bd42b49cc1814412c7e592dd6e9' at Lsn 0/16B5A50
Setting tenant 9ef87a5bf0d92544f6fafeeb3239695c as a default one
# start postgres compute node
> ./target/debug/neon_local endpoint start main
Starting new endpoint main (PostgreSQL v14) on timeline de200bd42b49cc1814412c7e592dd6e9 ...
> ./target/debug/neon_local pg start main
Starting new postgres (v14) main on timeline de200bd42b49cc1814412c7e592dd6e9 ...
Extracting base backup to create postgres instance: path=.neon/pgdatadirs/tenants/9ef87a5bf0d92544f6fafeeb3239695c/main port=55432
Starting postgres at 'host=127.0.0.1 port=55432 user=cloud_admin dbname=postgres'
Starting postgres node at 'host=127.0.0.1 port=55432 user=cloud_admin dbname=postgres'
# check list of running postgres instances
> ./target/debug/neon_local endpoint list
ENDPOINT ADDRESS TIMELINE BRANCH NAME LSN STATUS
main 127.0.0.1:55432 de200bd42b49cc1814412c7e592dd6e9 main 0/16B5BA8 running
> ./target/debug/neon_local pg list
NODE ADDRESS TIMELINE BRANCH NAME LSN STATUS
main 127.0.0.1:55432 de200bd42b49cc1814412c7e592dd6e9 main 0/16B5BA8 running
```
2. Now, it is possible to connect to postgres and run some queries:
@@ -184,14 +184,14 @@ Created timeline 'b3b863fa45fa9e57e615f9f2d944e601' at Lsn 0/16F9A00 for tenant:
(L) ┗━ @0/16F9A00: migration_check [b3b863fa45fa9e57e615f9f2d944e601]
# start postgres on that branch
> ./target/debug/neon_local endpoint start migration_check --branch-name migration_check
Starting new endpoint migration_check (PostgreSQL v14) on timeline b3b863fa45fa9e57e615f9f2d944e601 ...
> ./target/debug/neon_local pg start migration_check --branch-name migration_check
Starting new postgres migration_check on timeline b3b863fa45fa9e57e615f9f2d944e601 ...
Extracting base backup to create postgres instance: path=.neon/pgdatadirs/tenants/9ef87a5bf0d92544f6fafeeb3239695c/migration_check port=55433
Starting postgres at 'host=127.0.0.1 port=55433 user=cloud_admin dbname=postgres'
Starting postgres node at 'host=127.0.0.1 port=55433 user=cloud_admin dbname=postgres'
# check the new list of running postgres instances
> ./target/debug/neon_local endpoint list
ENDPOINT ADDRESS TIMELINE BRANCH NAME LSN STATUS
> ./target/debug/neon_local pg list
NODE ADDRESS TIMELINE BRANCH NAME LSN STATUS
main 127.0.0.1:55432 de200bd42b49cc1814412c7e592dd6e9 main 0/16F9A38 running
migration_check 127.0.0.1:55433 b3b863fa45fa9e57e615f9f2d944e601 migration_check 0/16F9A70 running

View File

@@ -27,5 +27,4 @@ tracing-subscriber.workspace = true
tracing-utils.workspace = true
url.workspace = true
compute_api.workspace = true
workspace_hack.workspace = true

View File

@@ -34,24 +34,22 @@ use std::fs::File;
use std::panic;
use std::path::Path;
use std::process::exit;
use std::sync::{Arc, Condvar, Mutex};
use std::sync::{Arc, RwLock};
use std::{thread, time::Duration};
use anyhow::{anyhow, Context, Result};
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Arg;
use tracing::{error, info};
use url::Url;
use compute_api::models::{ComputeMetrics, ComputeState, ComputeStatus};
use compute_tools::compute::{ComputeNode, ComputeNodeInner, ParsedSpec};
use compute_tools::configurator::launch_configurator;
use compute_tools::compute::{ComputeMetrics, ComputeNode, ComputeState, ComputeStatus};
use compute_tools::http::api::launch_http_server;
use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::spec::get_spec_from_control_plane;
use compute_tools::pg_helpers::*;
use compute_tools::spec::*;
use url::Url;
fn main() -> Result<()> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
@@ -64,7 +62,7 @@ fn main() -> Result<()> {
let connstr = matches
.get_one::<String>("connstr")
.expect("Postgres connection string is required");
let spec_json = matches.get_one::<String>("spec");
let spec = matches.get_one::<String>("spec");
let spec_path = matches.get_one::<String>("spec-path");
let compute_id = matches.get_one::<String>("compute-id");
@@ -73,97 +71,40 @@ fn main() -> Result<()> {
// Try to use just 'postgres' if no path is provided
let pgbin = matches.get_one::<String>("pgbin").unwrap();
let mut spec = None;
let mut live_config_allowed = false;
match spec_json {
let spec: ComputeSpec = match spec {
// First, try to get cluster spec from the cli argument
Some(json) => {
spec = Some(serde_json::from_str(json)?);
}
Some(json) => serde_json::from_str(json)?,
None => {
// Second, try to read it from the file if path is provided
if let Some(sp) = spec_path {
let path = Path::new(sp);
let file = File::open(path)?;
spec = Some(serde_json::from_reader(file)?);
serde_json::from_reader(file)?
} else if let Some(id) = compute_id {
if let Some(cp_base) = control_plane_uri {
live_config_allowed = true;
if let Ok(s) = get_spec_from_control_plane(cp_base, id) {
spec = Some(s);
}
let cp_uri = format!("{cp_base}/management/api/v1/{id}/spec");
let jwt: String = match std::env::var("NEON_CONSOLE_JWT") {
Ok(v) => v,
Err(_) => "".to_string(),
};
reqwest::blocking::Client::new()
.get(cp_uri)
.header("Authorization", jwt)
.send()?
.json()?
} else {
panic!("must specify both --control-plane-uri and --compute-id or none");
panic!(
"must specify --control-plane-uri \"{:#?}\" and --compute-id \"{:#?}\"",
control_plane_uri, compute_id
);
}
} else {
panic!(
"compute spec should be provided by one of the following ways: \
--spec OR --spec-path OR --control-plane-uri and --compute-id"
);
panic!("compute spec should be provided via --spec or --spec-path argument");
}
}
};
// Volatile compute state under mutex and condition variable to notify everyone
// who is interested in the state changes.
let compute_node = ComputeNode {
start_time: Utc::now(),
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
pgdata: pgdata.to_string(),
pgbin: pgbin.to_string(),
live_config_allowed,
inner: Mutex::new(ComputeNodeInner {
state: ComputeState {
status: ComputeStatus::Empty,
last_active: Utc::now(),
error: None,
},
spec: None,
metrics: ComputeMetrics::default(),
}),
state_changed: Condvar::new()
};
// If we have a spec already, go immediately into Init state.
let spec_set = spec.is_some();
if let Some(spec) = spec {
let mut inner = compute_node.inner.lock().unwrap();
let parsed_spec = ParsedSpec::try_from(spec)
.map_err(|msg| anyhow!("error parsing compute spec: {msg}"))?;
inner.spec = Some(parsed_spec);
inner.state.status = ComputeStatus::Init;
}
let compute = Arc::new(compute_node);
// Launch http service first, so we were able to serve control-plane
// requests, while configuration is still in progress.
let _http_handle = launch_http_server(&compute).expect("cannot launch http endpoint thread");
if !spec_set {
// No spec was provided earlier, hang waiting for it.
info!("no compute spec provided, waiting");
let mut inner = compute.inner.lock().unwrap();
while inner.state.status != ComputeStatus::ConfigurationPending {
inner = compute.state_changed.wait(inner).unwrap();
if inner.state.status == ComputeStatus::ConfigurationPending {
info!("got spec, continue configuration");
// Spec is already set by the http server handler.
inner.state.status = ComputeStatus::Init;
break;
}
}
};
// We got the spec. Start up
let startup_tracing_context = {
let inner = compute.inner.lock().unwrap();
inner.spec.as_ref().unwrap().spec.startup_tracing_context.clone()
};
// Extract OpenTelemetry context for the startup actions from the spec, and
// attach it to the current tracing context.
//
@@ -179,7 +120,7 @@ fn main() -> Result<()> {
// postgres is configured and up-and-running, we exit this span. Any other
// actions that are performed on incoming HTTP requests, for example, are
// performed in separate spans.
let startup_context_guard = if let Some(ref carrier) = startup_tracing_context {
let startup_context_guard = if let Some(ref carrier) = spec.startup_tracing_context {
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry::sdk::propagation::TraceContextPropagator;
Some(TraceContextPropagator::new().extract(carrier).attach())
@@ -187,10 +128,42 @@ fn main() -> Result<()> {
None
};
// Launch remaining service threads
let pageserver_connstr = spec
.cluster
.settings
.find("neon.pageserver_connstring")
.expect("pageserver connstr should be provided");
let storage_auth_token = spec.storage_auth_token.clone();
let tenant = spec
.cluster
.settings
.find("neon.tenant_id")
.expect("tenant id should be provided");
let timeline = spec
.cluster
.settings
.find("neon.timeline_id")
.expect("tenant id should be provided");
let compute_state = ComputeNode {
start_time: Utc::now(),
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
pgdata: pgdata.to_string(),
pgbin: pgbin.to_string(),
spec,
tenant,
timeline,
pageserver_connstr,
storage_auth_token,
metrics: ComputeMetrics::default(),
state: RwLock::new(ComputeState::new()),
};
let compute = Arc::new(compute_state);
// Launch service threads first, so we were able to serve availability
// requests, while configuration is still in progress.
let _http_handle = launch_http_server(&compute).expect("cannot launch http endpoint thread");
let _monitor_handle = launch_monitor(&compute).expect("cannot launch compute monitor thread");
let _configurator_handle =
launch_configurator(&compute).expect("cannot launch configurator thread");
// Start Postgres
let mut delay_exit = false;
@@ -199,10 +172,10 @@ fn main() -> Result<()> {
Ok(pg) => Some(pg),
Err(err) => {
error!("could not start the compute node: {:?}", err);
let mut inner = compute.inner.lock().unwrap();
inner.state.error = Some(format!("{:?}", err));
inner.state.status = ComputeStatus::Failed;
drop(inner);
let mut state = compute.state.write().unwrap();
state.error = Some(format!("{:?}", err));
state.status = ComputeStatus::Failed;
drop(state);
delay_exit = true;
None
}
@@ -289,7 +262,7 @@ fn cli() -> clap::Command {
Arg::new("control-plane-uri")
.short('p')
.long("control-plane-uri")
.value_name("CONTROL_PLANE_API_BASE_URI"),
.value_name("CONTROL_PLANE"),
)
}

View File

@@ -19,17 +19,16 @@ use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::{Condvar, Mutex};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use postgres::{Client, NoTls};
use serde::{Serialize, Serializer};
use tokio_postgres;
use tracing::{info, instrument, warn};
use compute_api::models::{ComputeMetrics, ComputeState, ComputeStatus};
use compute_api::spec::ComputeSpec;
use crate::checker::create_writability_check_data;
use crate::config;
use crate::pg_helpers::*;
@@ -42,92 +41,74 @@ pub struct ComputeNode {
pub connstr: url::Url,
pub pgdata: String,
pub pgbin: String,
// We only allow live re- / configuration of the compute node if
// it uses 'pull model', i.e. it can go to control-plane and fetch
// the latest configuration. Otherwise, there could be a case:
// - we start compute with some spec provided as argument
// - we push new spec and it does reconfiguration
// - but then something happens and compute pod / VM is destroyed,
// so k8s controller starts it again with the **old** spec
pub live_config_allowed: bool,
/// Volatile part of the `ComputeNode`, which should be used under `Mutex`.
/// Coupled with `Condvar` to allow notifying HTTP API and configurator
/// thread about state changes. To allow HTTP API server to serving status
/// requests, while configuration is in progress, lock should be held only
/// for short periods of time to do read/write, not the whole configuration
/// process.
pub inner: Mutex<ComputeNodeInner>,
pub state_changed: Condvar,
}
pub struct ComputeNodeInner {
pub state: ComputeState,
pub spec: Option<ParsedSpec>,
pub metrics: ComputeMetrics,
}
#[derive(Clone)]
pub struct ParsedSpec {
pub spec: ComputeSpec,
// extra fields extracted from 'spec'.
pub tenant: String,
pub timeline: String,
pub pageserver_connstr: String,
pub storage_auth_token: Option<String>,
pub metrics: ComputeMetrics,
/// Volatile part of the `ComputeNode` so should be used under `RwLock`
/// to allow HTTP API server to serve status requests, while configuration
/// is in progress.
pub state: RwLock<ComputeState>,
}
impl TryFrom<ComputeSpec> for ParsedSpec {
type Error = String;
fn rfc3339_serialize<S>(x: &DateTime<Utc>, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
x.to_rfc3339().serialize(s)
}
fn try_from(spec: ComputeSpec) -> Result<Self, String> {
let pageserver_connstr = spec
.cluster
.settings
.find("neon.pageserver_connstring")
.ok_or("pageserver connstr should be provided")?;
let storage_auth_token = spec.storage_auth_token.clone();
let tenant = spec
.cluster
.settings
.find("neon.tenant_id")
.ok_or("tenant id should be provided")?;
let timeline = spec
.cluster
.settings
.find("neon.timeline_id")
.ok_or("tenant id should be provided")?;
#[derive(Serialize)]
#[serde(rename_all = "snake_case")]
pub struct ComputeState {
pub status: ComputeStatus,
/// Timestamp of the last Postgres activity
#[serde(serialize_with = "rfc3339_serialize")]
pub last_active: DateTime<Utc>,
pub error: Option<String>,
}
Ok(ParsedSpec {
spec,
pageserver_connstr,
storage_auth_token,
tenant,
timeline,
})
impl ComputeState {
pub fn new() -> Self {
Self {
status: ComputeStatus::Init,
last_active: Utc::now(),
error: None,
}
}
}
impl Default for ComputeState {
fn default() -> Self {
Self::new()
}
}
#[derive(Serialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ComputeStatus {
Init,
Running,
Failed,
}
#[derive(Default, Serialize)]
pub struct ComputeMetrics {
pub sync_safekeepers_ms: AtomicU64,
pub basebackup_ms: AtomicU64,
pub config_ms: AtomicU64,
pub total_startup_ms: AtomicU64,
}
impl ComputeNode {
pub fn set_status(&self, status: ComputeStatus) {
let mut inner = self.inner.lock().unwrap();
inner.state.status = status;
self.state_changed.notify_all();
self.state.write().unwrap().status = status;
}
pub fn get_status(&self) -> ComputeStatus {
self.inner.lock().unwrap().state.status
}
pub fn get_state(&self) -> ComputeState {
self.inner.lock().unwrap().state.clone()
}
pub fn get_metrics(&self) -> ComputeMetrics {
self.inner.lock().unwrap().metrics.clone()
self.state.read().unwrap().status
}
// Remove `pgdata` directory and create it again with right permissions.
@@ -143,15 +124,15 @@ impl ComputeNode {
// Get basebackup from the libpq connection to pageserver using `connstr` and
// unarchive it to `pgdata` directory overriding all its previous content.
#[instrument(skip(self, spec))]
fn get_basebackup(&self, spec: &ParsedSpec, lsn: &str) -> Result<()> {
#[instrument(skip(self))]
fn get_basebackup(&self, lsn: &str) -> Result<()> {
let start_time = Utc::now();
let mut config = postgres::Config::from_str(&spec.pageserver_connstr)?;
let mut config = postgres::Config::from_str(&self.pageserver_connstr)?;
// Use the storage auth token from the config file, if given.
// Note: this overrides any password set in the connection string.
if let Some(storage_auth_token) = &spec.storage_auth_token {
if let Some(storage_auth_token) = &self.storage_auth_token {
info!("Got storage auth token from spec file");
config.password(storage_auth_token);
} else {
@@ -160,8 +141,8 @@ impl ComputeNode {
let mut client = config.connect(NoTls)?;
let basebackup_cmd = match lsn {
"0/0" => format!("basebackup {} {}", &spec.tenant, &spec.timeline), // First start of the compute
_ => format!("basebackup {} {} {}", &spec.tenant, &spec.timeline, lsn),
"0/0" => format!("basebackup {} {}", &self.tenant, &self.timeline), // First start of the compute
_ => format!("basebackup {} {} {}", &self.tenant, &self.timeline, lsn),
};
let copyreader = client.copy_out(basebackup_cmd.as_str())?;
@@ -174,24 +155,28 @@ impl ComputeNode {
ar.set_ignore_zeros(true);
ar.unpack(&self.pgdata)?;
self.inner.lock().unwrap().metrics.basebackup_ms = Utc::now()
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64;
self.metrics.basebackup_ms.store(
Utc::now()
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64,
Ordering::Relaxed,
);
Ok(())
}
// Run `postgres` in a special mode with `--sync-safekeepers` argument
// and return the reported LSN back to the caller.
#[instrument(skip(self, storage_auth_token))]
fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<String> {
#[instrument(skip(self))]
fn sync_safekeepers(&self) -> Result<String> {
let start_time = Utc::now();
let sync_handle = Command::new(&self.pgbin)
.args(["--sync-safekeepers"])
.env("PGDATA", &self.pgdata) // we cannot use -D in this mode
.envs(if let Some(storage_auth_token) = &storage_auth_token {
.envs(if let Some(storage_auth_token) = &self.storage_auth_token {
vec![("NEON_AUTH_TOKEN", storage_auth_token)]
} else {
vec![]
@@ -216,11 +201,14 @@ impl ComputeNode {
);
}
self.inner.lock().unwrap().metrics.sync_safekeepers_ms = Utc::now()
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64;
self.metrics.sync_safekeepers_ms.store(
Utc::now()
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64,
Ordering::Relaxed,
);
let lsn = String::from(String::from_utf8(sync_output.stdout)?.trim());
@@ -229,28 +217,29 @@ impl ComputeNode {
/// Do all the preparations like PGDATA directory creation, configuration,
/// safekeepers sync, basebackup, etc.
#[instrument(skip(self, spec))]
fn prepare_pgdata(&self, spec: &ParsedSpec) -> Result<()> {
#[instrument(skip(self))]
pub fn prepare_pgdata(&self) -> Result<()> {
let spec = &self.spec;
let pgdata_path = Path::new(&self.pgdata);
// Remove/create an empty pgdata directory and put configuration there.
self.create_pgdata()?;
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec.spec)?;
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec)?;
info!("starting safekeepers syncing");
let lsn = self
.sync_safekeepers(spec.storage_auth_token.clone())
.sync_safekeepers()
.with_context(|| "failed to sync safekeepers")?;
info!("safekeepers synced at LSN {}", lsn);
info!(
"getting basebackup@{} from pageserver {}",
lsn, &spec.pageserver_connstr
lsn, &self.pageserver_connstr
);
self.get_basebackup(spec, &lsn).with_context(|| {
self.get_basebackup(&lsn).with_context(|| {
format!(
"failed to get basebackup@{} from pageserver {}",
lsn, &spec.pageserver_connstr
lsn, &self.pageserver_connstr
)
})?;
@@ -263,16 +252,13 @@ impl ComputeNode {
/// Start Postgres as a child process and manage DBs/roles.
/// After that this will hang waiting on the postmaster process to exit.
#[instrument(skip(self))]
pub fn start_postgres(
&self,
storage_auth_token: Option<String>,
) -> Result<std::process::Child> {
pub fn start_postgres(&self) -> Result<std::process::Child> {
let pgdata_path = Path::new(&self.pgdata);
// Run postgres as a child process.
let mut pg = Command::new(&self.pgbin)
.args(["-D", &self.pgdata])
.envs(if let Some(storage_auth_token) = &storage_auth_token {
.envs(if let Some(storage_auth_token) = &self.storage_auth_token {
vec![("NEON_AUTH_TOKEN", storage_auth_token)]
} else {
vec![]
@@ -285,9 +271,8 @@ impl ComputeNode {
Ok(pg)
}
/// Do initial configuration of the already started Postgres.
#[instrument(skip(self, spec))]
fn apply_config(&self, spec: &ParsedSpec) -> Result<()> {
#[instrument(skip(self))]
pub fn apply_config(&self) -> Result<()> {
// If connection fails,
// it may be the old node with `zenith_admin` superuser.
//
@@ -318,64 +303,19 @@ impl ComputeNode {
};
// Proceed with post-startup configuration. Note, that order of operations is important.
handle_roles(&spec.spec, &mut client)?;
handle_databases(&spec.spec, &mut client)?;
handle_role_deletions(&spec.spec, self.connstr.as_str(), &mut client)?;
handle_grants(&spec.spec, self.connstr.as_str(), &mut client)?;
handle_roles(&self.spec, &mut client)?;
handle_databases(&self.spec, &mut client)?;
handle_role_deletions(self, &mut client)?;
handle_grants(self, &mut client)?;
create_writability_check_data(&mut client)?;
handle_extensions(&spec.spec, &mut client)?;
handle_extensions(&self.spec, &mut client)?;
// 'Close' connection
drop(client);
info!(
"finished configuration of compute for project {}",
spec.spec.cluster.cluster_id
);
Ok(())
}
// We could've wrapped this around `pg_ctl reload`, but right now we don't use
// `pg_ctl` for start / stop, so this just seems much easier to do as we already
// have opened connection to Postgres and superuser access.
#[instrument(skip(self, client))]
fn pg_reload_conf(&self, client: &mut Client) -> Result<()> {
client.simple_query("SELECT pg_reload_conf()")?;
Ok(())
}
/// Similar to `apply_config()`, but does a bit different sequence of operations,
/// as it's used to reconfigure a previously started and configured Postgres node.
#[instrument(skip(self))]
pub fn reconfigure(&self) -> Result<()> {
let spec = {
let inner = self.inner.lock().unwrap();
inner.spec.as_ref().expect("cannot start_compute without spec").spec.clone()
};
// Write new config
let pgdata_path = Path::new(&self.pgdata);
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec)?;
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
self.pg_reload_conf(&mut client)?;
// Proceed with post-startup configuration. Note, that order of operations is important.
handle_roles(&spec, &mut client)?;
handle_databases(&spec, &mut client)?;
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
handle_grants(&spec, self.connstr.as_str(), &mut client)?;
handle_extensions(&spec, &mut client)?;
// 'Close' connection
drop(client);
let unknown_op = "unknown".to_string();
let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op);
info!(
"finished reconfiguration of compute node for operation {}",
op_id
self.spec.cluster.cluster_id
);
Ok(())
@@ -383,44 +323,40 @@ impl ComputeNode {
#[instrument(skip(self))]
pub fn start_compute(&self) -> Result<std::process::Child> {
let spec = self
.inner
.lock()
.unwrap()
.spec
.as_ref()
.expect("cannot start_compute without spec")
.clone();
info!(
"starting compute for project {}, operation {}, tenant {}, timeline {}",
spec.spec.cluster.cluster_id,
spec.spec.operation_uuid.as_ref().unwrap(),
spec.tenant,
spec.timeline,
self.spec.cluster.cluster_id,
self.spec.operation_uuid.as_ref().unwrap(),
self.tenant,
self.timeline,
);
self.prepare_pgdata(&spec)?;
self.prepare_pgdata()?;
let start_time = Utc::now();
let pg = self.start_postgres(spec.storage_auth_token.clone())?;
let pg = self.start_postgres()?;
self.apply_config(&spec)?;
self.apply_config()?;
let startup_end_time = Utc::now();
{
let mut inner = self.inner.lock().unwrap();
inner.metrics.config_ms = startup_end_time
self.metrics.config_ms.store(
startup_end_time
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64;
inner.metrics.total_startup_ms = startup_end_time
.as_millis() as u64,
Ordering::Relaxed,
);
self.metrics.total_startup_ms.store(
startup_end_time
.signed_duration_since(self.start_time)
.to_std()
.unwrap()
.as_millis() as u64;
}
.as_millis() as u64,
Ordering::Relaxed,
);
self.set_status(ComputeStatus::Running);
Ok(pg)

View File

@@ -6,7 +6,7 @@ use std::path::Path;
use anyhow::Result;
use crate::pg_helpers::PgOptionsSerialize;
use compute_api::spec::ComputeSpec;
use crate::spec::ComputeSpec;
/// Check that `line` is inside a text file and put it there if it is not.
/// Create file if it doesn't exist.

View File

@@ -1,53 +0,0 @@
use std::sync::Arc;
use std::thread;
use anyhow::Result;
use tracing::{error, info, instrument};
use crate::compute::ComputeNode;
use compute_api::models::ComputeStatus;
#[instrument(skip(compute))]
fn configurator_main_loop(compute: &Arc<ComputeNode>) {
info!("waiting for reconfiguration requests");
loop {
let inner = compute.inner.lock().unwrap();
let mut inner = compute.state_changed.wait(inner).unwrap();
if inner.state.status == ComputeStatus::ConfigurationPending {
info!("got configuration request");
inner.state.status = ComputeStatus::Configuration;
compute.state_changed.notify_all();
drop(inner);
let mut new_status = ComputeStatus::Failed;
if let Err(e) = compute.reconfigure() {
error!("could not configure compute node: {}", e);
} else {
new_status = ComputeStatus::Running;
info!("compute node configured");
}
// XXX: used to test that API is blocking
// std::thread::sleep(std::time::Duration::from_millis(2000));
compute.set_status(new_status);
} else if inner.state.status == ComputeStatus::Failed {
info!("compute node is now in Failed state, exiting");
break;
} else {
info!("woken up for compute status: {:?}, sleeping", inner.state.status);
}
}
}
pub fn launch_configurator(compute: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
let compute = Arc::clone(compute);
Ok(thread::Builder::new()
.name("compute-configurator".into())
.spawn(move || {
configurator_main_loop(&compute);
info!("configurator thread is exited");
})?)
}

View File

@@ -3,10 +3,7 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::thread;
use crate::compute::{ComputeNode, ParsedSpec};
use crate::http::models::{ConfigurationRequest, GenericAPIError};
use compute_api::models::ComputeStatus;
use crate::compute::ComputeNode;
use anyhow::Result;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
@@ -15,44 +12,6 @@ use serde_json;
use tracing::{error, info};
use tracing_utils::http::OtelName;
async fn handle_spec_request(req: Request<Body>, compute: &Arc<ComputeNode>) -> Result<(), (String, StatusCode)> {
if !compute.live_config_allowed {
return Err(("live reconfiguration is not allowed for this compute node".to_string(), StatusCode::PRECONDITION_FAILED));
}
let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap();
let request = serde_json::from_str::<ConfigurationRequest>(&spec_raw)
.map_err(|err| (format!("could not parse request json: {err}"), StatusCode::BAD_REQUEST))?;
let spec = ParsedSpec::try_from(request.spec)
.map_err(|err| (format!("could not parse spec: {err}"), StatusCode::BAD_REQUEST))?;
let mut inner = compute.inner.lock().unwrap();
if !(inner.state.status == ComputeStatus::Empty
|| inner.state.status == ComputeStatus::Running)
{
return Err((format!(
"invalid compute status for reconfiguration request: {}",
serde_json::to_string(&inner.state).unwrap()
), StatusCode::PRECONDITION_FAILED));
}
inner.spec = Some(spec);
inner.state.status = ComputeStatus::ConfigurationPending;
compute.state_changed.notify_all();
info!("set new spec and notified configurator");
while inner.state.status != ComputeStatus::Running {
inner = compute.state_changed.wait(inner).unwrap();
info!(
"waiting for compute to become Running, current status: {:?}",
inner.state.status
);
}
drop(inner);
Ok(())
}
// Service function to handle all available routes.
async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body> {
//
@@ -64,44 +23,26 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
// Serialized compute state.
(&Method::GET, "/status") => {
info!("serving /status GET request");
let state = compute.get_state();
Response::new(Body::from(serde_json::to_string(&state).unwrap()))
let state = compute.state.read().unwrap();
Response::new(Body::from(serde_json::to_string(&*state).unwrap()))
}
// Startup metrics in JSON format. Keep /metrics reserved for a possible
// future use for Prometheus metrics format.
(&Method::GET, "/metrics.json") => {
info!("serving /metrics.json GET request");
let metrics = compute.get_metrics();
Response::new(Body::from(serde_json::to_string(&metrics).unwrap()))
Response::new(Body::from(serde_json::to_string(&compute.metrics).unwrap()))
}
// Collect Postgres current usage insights
(&Method::GET, "/insights") => {
info!("serving /insights GET request");
let status = compute.get_status();
if status != ComputeStatus::Running {
let msg = format!("compute is not running, current status: {:?}", status);
error!(msg);
return Response::new(Body::from(msg));
}
let insights = compute.collect_insights().await;
Response::new(Body::from(insights))
}
(&Method::POST, "/check_writability") => {
info!("serving /check_writability POST request");
let status = compute.get_status();
if status != ComputeStatus::Running {
let msg = format!(
"invalid compute status for check_writability request: {:?}",
status
);
error!(msg);
return Response::new(Body::from(msg));
}
let res = crate::checker::check_writability(compute).await;
match res {
Ok(_) => Response::new(Body::from("true")),
@@ -120,24 +61,6 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
))
}
// Accept spec in JSON format and request compute configuration from
// the configurator thread. If anything goes wrong after we set the
// compute state to `ConfigurationPending` and / or sent spec to the
// configurator thread, we basically leave compute in the potentially
// wrong state. That said, it's control-plane's responsibility to
// watch compute state after reconfiguration request and to clean
// restart in case of errors.
(&Method::POST, "/configure") => {
info!("serving /configure POST request");
match handle_spec_request(req, compute).await {
Ok(()) => Response::new(Body::from("ok")),
Err((msg, code) ) => {
error!("error handling /spec request: {msg}");
render_json_error(&msg, code)
}
}
}
// Return the `404 Not Found` for any other routes.
_ => {
let mut not_found = Response::new(Body::from("404 Not Found"));
@@ -147,16 +70,6 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}
fn render_json_error(e: &str, status: StatusCode) -> Response<Body> {
let error = GenericAPIError {
error: e.to_string(),
};
Response::builder()
.status(status)
.body(Body::from(serde_json::to_string(&error).unwrap()))
.unwrap()
}
// Main Hyper HTTP server function that runs it and blocks waiting on it forever.
#[tokio::main]
async fn serve(state: Arc<ComputeNode>) {
@@ -197,6 +110,7 @@ async fn serve(state: Arc<ComputeNode>) {
/// Launch a separate Hyper HTTP API server thread and return its `JoinHandle`.
pub fn launch_http_server(state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
let state = Arc::clone(state);
Ok(thread::Builder::new()
.name("http-endpoint".into())
.spawn(move || serve(state))?)

View File

@@ -1,2 +1 @@
pub mod api;
pub mod models;

View File

@@ -1,16 +0,0 @@
use serde::{Deserialize, Serialize};
use compute_api::spec::ComputeSpec;
/// We now pass only `spec` in the configuration request, but later we can
/// extend it and something like `restart: bool` or something else. So put
/// `spec` into a struct initially to be more flexible in the future.
#[derive(Deserialize, Debug)]
pub struct ConfigurationRequest {
pub spec: ComputeSpec,
}
#[derive(Serialize, Debug)]
pub struct GenericAPIError {
pub error: String,
}

View File

@@ -11,7 +11,7 @@ paths:
get:
tags:
- Info
summary: Get compute node internal status.
summary: Get compute node internal status
description: ""
operationId: getComputeStatus
responses:
@@ -26,7 +26,7 @@ paths:
get:
tags:
- Info
summary: Get compute node startup metrics in JSON format.
summary: Get compute node startup metrics in JSON format
description: ""
operationId: getComputeMetricsJSON
responses:
@@ -41,9 +41,9 @@ paths:
get:
tags:
- Info
summary: Get current compute insights in JSON format.
summary: Get current compute insights in JSON format
description: |
Note, that this doesn't include any historical data.
Note, that this doesn't include any historical data
operationId: getComputeInsights
responses:
200:
@@ -56,12 +56,12 @@ paths:
/info:
get:
tags:
- Info
summary: Get info about the compute pod / VM.
- "info"
summary: Get info about the compute Pod/VM
description: ""
operationId: getInfo
responses:
200:
"200":
description: Info
content:
application/json:
@@ -72,7 +72,7 @@ paths:
post:
tags:
- Check
summary: Check that we can write new data on this compute.
summary: Check that we can write new data on this compute
description: ""
operationId: checkComputeWritability
responses:
@@ -82,57 +82,9 @@ paths:
text/plain:
schema:
type: string
description: Error text or 'true' if check passed.
description: Error text or 'true' if check passed
example: "true"
/configure:
post:
tags:
- Configure
summary: Request compute node configuration.
description: |
This is a blocking API endpoint, i.e. it blocks waiting until
compute is finished configuration and is in `Running` state.
Optional non-blocking mode could be added later. Currently,
it's also assumed that reconfiguration doesn't require restart.
operationId: configureCompute
requestBody:
description: Configuration request.
required: true
content:
application/json:
schema:
type: object
required:
- spec
properties:
spec:
# XXX: I don't want to explain current spec in the OpenAPI format,
# as it could be changed really soon. Consider doing it later.
type: object
responses:
200:
description: Compute configuration finished.
content:
application/json:
schema:
$ref: "#/components/schemas/ComputeState"
400:
description: Provided spec is invalid.
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
412:
description: |
It's not possible to do live-configuration of the compute.
It's either in the wrong state, or compute doesn't use pull
mode of configuration.
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
components:
securitySchemes:
JWT:
@@ -143,7 +95,7 @@ components:
schemas:
ComputeMetrics:
type: object
description: Compute startup metrics.
description: Compute startup metrics
required:
- sync_safekeepers_ms
- basebackup_ms
@@ -161,7 +113,7 @@ components:
Info:
type: object
description: Information about VM/Pod.
description: Information about VM/Pod
required:
- num_cpus
properties:
@@ -178,26 +130,17 @@ components:
$ref: '#/components/schemas/ComputeStatus'
last_active:
type: string
description: The last detected compute activity timestamp in UTC and RFC3339 format.
description: The last detected compute activity timestamp in UTC and RFC3339 format
example: "2022-10-12T07:20:50.52Z"
error:
type: string
description: Text of the error during compute startup, if any.
example: ""
tenant:
type: string
description: Identifier of the current tenant served by compute node, if any.
example: c9269c359e9a199fad1ea0981246a78f
timeline:
type: string
description: Identifier of the current timeline served by compute node, if any.
example: ece7de74d4b8cbe5433a68ce4d1b97b4
description: Text of the error during compute startup, if any
ComputeInsights:
type: object
properties:
pg_stat_statements:
description: Contains raw output from pg_stat_statements in JSON format.
description: Contains raw output from pg_stat_statements in JSON format
type: array
items:
type: object
@@ -208,19 +151,6 @@ components:
- init
- failed
- running
example: running
#
# Errors
#
GenericError:
type: object
required:
- error
properties:
error:
type: string
security:
- JWT: []

View File

@@ -4,7 +4,6 @@
//!
pub mod checker;
pub mod config;
pub mod configurator;
pub mod http;
#[macro_use]
pub mod logger;

View File

@@ -46,7 +46,7 @@ fn watch_compute_activity(compute: &ComputeNode) {
AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors?
&[],
);
let mut last_active = compute.inner.lock().unwrap().state.last_active;
let mut last_active = compute.state.read().unwrap().last_active;
if let Ok(backs) = backends {
let mut idle_backs: Vec<DateTime<Utc>> = vec![];
@@ -87,9 +87,9 @@ fn watch_compute_activity(compute: &ComputeNode) {
}
// Update the last activity in the shared state if we got a more recent one.
let mut inner = compute.inner.lock().unwrap();
if last_active > inner.state.last_active {
inner.state.last_active = last_active;
let mut state = compute.state.write().unwrap();
if last_active > state.last_active {
state.last_active = last_active;
debug!("set the last compute activity time to: {}", last_active);
}
}

View File

@@ -10,12 +10,43 @@ use std::time::{Duration, Instant};
use anyhow::{bail, Result};
use notify::{RecursiveMode, Watcher};
use postgres::{Client, Transaction};
use serde::Deserialize;
use tracing::{debug, instrument};
use compute_api::spec::{Database, GenericOption, GenericOptions, PgIdent, Role};
const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds
/// Rust representation of Postgres role info with only those fields
/// that matter for us.
#[derive(Clone, Deserialize)]
pub struct Role {
pub name: PgIdent,
pub encrypted_password: Option<String>,
pub options: GenericOptions,
}
/// Rust representation of Postgres database info with only those fields
/// that matter for us.
#[derive(Clone, Deserialize)]
pub struct Database {
pub name: PgIdent,
pub owner: PgIdent,
pub options: GenericOptions,
}
/// Common type representing both SQL statement params with or without value,
/// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config
/// options like `wal_level = logical`.
#[derive(Clone, Deserialize)]
pub struct GenericOption {
pub name: String,
pub value: Option<String>,
pub vartype: String,
}
/// Optional collection of `GenericOption`'s. Type alias allows us to
/// declare a `trait` on it.
pub type GenericOptions = Option<Vec<GenericOption>>;
/// Escape a string for including it in a SQL literal
fn escape_literal(s: &str) -> String {
s.replace('\'', "''").replace('\\', "\\\\")
@@ -27,14 +58,9 @@ fn escape_conf_value(s: &str) -> String {
s.replace('\'', "''").replace('\\', "\\\\")
}
trait GenericOptionExt {
fn to_pg_option(&self) -> String;
fn to_pg_setting(&self) -> String;
}
impl GenericOptionExt for GenericOption {
impl GenericOption {
/// Represent `GenericOption` as SQL statement parameter.
fn to_pg_option(&self) -> String {
pub fn to_pg_option(&self) -> String {
if let Some(val) = &self.value {
match self.vartype.as_ref() {
"string" => format!("{} '{}'", self.name, escape_literal(val)),
@@ -46,7 +72,7 @@ impl GenericOptionExt for GenericOption {
}
/// Represent `GenericOption` as configuration option.
fn to_pg_setting(&self) -> String {
pub fn to_pg_setting(&self) -> String {
if let Some(val) = &self.value {
match self.vartype.as_ref() {
"string" => format!("{} = '{}'", self.name, escape_conf_value(val)),
@@ -105,14 +131,10 @@ impl GenericOptionsSearch for GenericOptions {
}
}
pub trait RoleExt {
fn to_pg_options(&self) -> String;
}
impl RoleExt for Role {
impl Role {
/// Serialize a list of role parameters into a Postgres-acceptable
/// string of arguments.
fn to_pg_options(&self) -> String {
pub fn to_pg_options(&self) -> String {
// XXX: consider putting LOGIN as a default option somewhere higher, e.g. in control-plane.
// For now, we do not use generic `options` for roles. Once used, add
// `self.options.as_pg_options()` somewhere here.
@@ -137,17 +159,21 @@ impl RoleExt for Role {
}
}
pub trait DatabaseExt {
fn to_pg_options(&self) -> String;
}
impl Database {
pub fn new(name: PgIdent, owner: PgIdent) -> Self {
Self {
name,
owner,
options: None,
}
}
impl DatabaseExt for Database {
/// Serialize a list of database parameters into a Postgres-acceptable
/// string of arguments.
/// NB: `TEMPLATE` is actually also an identifier, but so far we only need
/// to use `template0` and `template1`, so it is not a problem. Yet in the future
/// it may require a proper quoting too.
fn to_pg_options(&self) -> String {
pub fn to_pg_options(&self) -> String {
let mut params: String = self.options.as_pg_options();
write!(params, " OWNER {}", &self.owner.pg_quote())
.expect("String is documented to not to error during write operations");
@@ -156,6 +182,10 @@ impl DatabaseExt for Database {
}
}
/// String type alias representing Postgres identifier and
/// intended to be used for DB / role names.
pub type PgIdent = String;
/// Generic trait used to provide quoting / encoding for strings used in the
/// Postgres SQL queries and DATABASE_URL.
pub trait Escaping {
@@ -196,11 +226,7 @@ pub fn get_existing_dbs(client: &mut Client) -> Result<Vec<Database>> {
&[],
)?
.iter()
.map(|row| Database {
name: row.get("datname"),
owner: row.get("owner"),
options: None,
})
.map(|row| Database::new(row.get("datname"), row.get("owner")))
.collect();
Ok(postgres_dbs)

View File

@@ -1,38 +1,57 @@
use std::collections::HashMap;
use std::path::Path;
use std::str::FromStr;
use anyhow::Result;
use postgres::config::Config;
use postgres::{Client, NoTls};
use serde::Deserialize;
use tracing::{info, info_span, instrument, span_enabled, warn, Level};
use crate::compute::ComputeNode;
use crate::config;
use crate::params::PG_HBA_ALL_MD5;
use crate::pg_helpers::*;
use compute_api::spec::{ComputeSpec, Database, PgIdent, Role};
/// Cluster spec or configuration represented as an optional number of
/// delta operations + final cluster state description.
#[derive(Clone, Deserialize)]
pub struct ComputeSpec {
pub format_version: f32,
pub timestamp: String,
pub operation_uuid: Option<String>,
/// Expected cluster state at the end of transition process.
pub cluster: Cluster,
pub delta_operations: Option<Vec<DeltaOp>>,
/// Request spec from the control-plane by compute_id. If `NEON_CONSOLE_JWT`
/// env variable is set, it will be used for authorization.
pub fn get_spec_from_control_plane(base_uri: &str, compute_id: &str) -> Result<ComputeSpec> {
let cp_uri = format!("{base_uri}/management/api/v2/computes/{compute_id}/spec");
let jwt: String = match std::env::var("NEON_CONSOLE_JWT") {
Ok(v) => v,
Err(_) => "".to_string(),
};
info!("getting spec from control plane: {}", cp_uri);
pub storage_auth_token: Option<String>,
// TODO: check the response. We should distinguish cases when it's
// - network error, then retry
// - no spec for compute yet, then wait
// - compute id is unknown or any other error, then bail out
let spec = reqwest::blocking::Client::new()
.get(cp_uri)
.header("Authorization", jwt)
.send()?
.json()?;
pub startup_tracing_context: Option<HashMap<String, String>>,
}
Ok(spec)
/// Cluster state seen from the perspective of the external tools
/// like Rails web console.
#[derive(Clone, Deserialize)]
pub struct Cluster {
pub cluster_id: String,
pub name: String,
pub state: Option<String>,
pub roles: Vec<Role>,
pub databases: Vec<Database>,
pub settings: GenericOptions,
}
/// Single cluster state changing operation that could not be represented as
/// a static `Cluster` structure. For example:
/// - DROP DATABASE
/// - DROP ROLE
/// - ALTER ROLE name RENAME TO new_name
/// - ALTER DATABASE name RENAME TO new_name
#[derive(Clone, Deserialize)]
pub struct DeltaOp {
pub action: String,
pub name: PgIdent,
pub new_name: Option<PgIdent>,
}
/// It takes cluster specification and does the following:
@@ -207,8 +226,8 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
/// Reassign all dependent objects and delete requested roles.
#[instrument(skip_all)]
pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
if let Some(ops) = &spec.delta_operations {
pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result<()> {
if let Some(ops) = &node.spec.delta_operations {
// First, reassign all dependent objects to db owners.
info!("reassigning dependent objects of to-be-deleted roles");
@@ -225,7 +244,7 @@ pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Cli
// Check that role is still present in Postgres, as this could be a
// restart with the same spec after role deletion.
if op.action == "delete_role" && existing_roles.iter().any(|r| r.name == op.name) {
reassign_owned_objects(spec, connstr, &op.name)?;
reassign_owned_objects(node, &op.name)?;
}
}
@@ -249,10 +268,10 @@ pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Cli
}
// Reassign all owned objects in all databases to the owner of the database.
fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> {
for db in &spec.cluster.databases {
fn reassign_owned_objects(node: &ComputeNode, role_name: &PgIdent) -> Result<()> {
for db in &node.spec.cluster.databases {
if db.owner != *role_name {
let mut conf = Config::from_str(connstr)?;
let mut conf = Config::from_str(node.connstr.as_str())?;
conf.dbname(&db.name);
let mut client = conf.connect(NoTls)?;
@@ -397,7 +416,9 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
/// Grant CREATE ON DATABASE to the database owner and do some other alters and grants
/// to allow users creating trusted extensions and re-creating `public` schema, for example.
#[instrument(skip_all)]
pub fn handle_grants(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> {
let spec = &node.spec;
info!("cluster spec grants:");
// We now have a separate `web_access` role to connect to the database
@@ -429,8 +450,8 @@ pub fn handle_grants(spec: &ComputeSpec, connstr: &str, client: &mut Client) ->
// Do some per-database access adjustments. We'd better do this at db creation time,
// but CREATE DATABASE isn't transactional. So we cannot create db + do some grants
// atomically.
for db in &spec.cluster.databases {
let mut conf = Config::from_str(connstr)?;
for db in &node.spec.cluster.databases {
let mut conf = Config::from_str(node.connstr.as_str())?;
conf.dbname(&db.name);
let mut db_client = conf.connect(NoTls)?;

View File

@@ -1,13 +1,14 @@
#[cfg(test)]
mod pg_helpers_tests {
use std::fs::File;
use compute_api::spec::{ComputeSpec, GenericOption, GenericOptions, PgIdent};
use compute_tools::pg_helpers::*;
use compute_tools::spec::ComputeSpec;
#[test]
fn params_serialize() {
let file = File::open("../libs/compute_api/tests/cluster_spec.json").unwrap();
let file = File::open("tests/cluster_spec.json").unwrap();
let spec: ComputeSpec = serde_json::from_reader(file).unwrap();
assert_eq!(
@@ -22,7 +23,7 @@ mod pg_helpers_tests {
#[test]
fn settings_serialize() {
let file = File::open("../libs/compute_api/tests/cluster_spec.json").unwrap();
let file = File::open("tests/cluster_spec.json").unwrap();
let spec: ComputeSpec = serde_json::from_reader(file).unwrap();
assert_eq!(

View File

@@ -7,7 +7,7 @@
//!
use anyhow::{anyhow, bail, Context, Result};
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
use control_plane::endpoint::ComputeControlPlane;
use control_plane::compute::ComputeControlPlane;
use control_plane::local_env::LocalEnv;
use control_plane::pageserver::PageServerNode;
use control_plane::safekeeper::SafekeeperNode;
@@ -106,8 +106,8 @@ fn main() -> Result<()> {
"start" => handle_start_all(sub_args, &env),
"stop" => handle_stop_all(sub_args, &env),
"pageserver" => handle_pageserver(sub_args, &env),
"pg" => handle_pg(sub_args, &env),
"safekeeper" => handle_safekeeper(sub_args, &env),
"endpoint" => handle_endpoint(sub_args, &env),
_ => bail!("unexpected subcommand {sub_name}"),
};
@@ -470,10 +470,10 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
let mut cplane = ComputeControlPlane::load(env.clone())?;
println!("Importing timeline into pageserver ...");
pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version)?;
println!("Creating node for imported timeline ...");
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
println!("Creating endpoint for imported timeline ...");
cplane.new_endpoint(tenant_id, name, timeline_id, None, None, pg_version)?;
cplane.new_node(tenant_id, name, timeline_id, None, None, pg_version)?;
println!("Done");
}
Some(("branch", branch_match)) => {
@@ -521,10 +521,10 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
Ok(())
}
fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let (sub_name, sub_args) = match ep_match.subcommand() {
Some(ep_subcommand_data) => ep_subcommand_data,
None => bail!("no endpoint subcommand provided"),
fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let (sub_name, sub_args) = match pg_match.subcommand() {
Some(pg_subcommand_data) => pg_subcommand_data,
None => bail!("no pg subcommand provided"),
};
let mut cplane = ComputeControlPlane::load(env.clone())?;
@@ -546,7 +546,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
table.load_preset(comfy_table::presets::NOTHING);
table.set_header([
"ENDPOINT",
"NODE",
"ADDRESS",
"TIMELINE",
"BRANCH NAME",
@@ -554,39 +554,39 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
"STATUS",
]);
for (endpoint_id, endpoint) in cplane
.endpoints
for ((_, node_name), node) in cplane
.nodes
.iter()
.filter(|(_, endpoint)| endpoint.tenant_id == tenant_id)
.filter(|((node_tenant_id, _), _)| node_tenant_id == &tenant_id)
{
let lsn_str = match endpoint.lsn {
let lsn_str = match node.lsn {
None => {
// -> primary endpoint
// -> primary node
// Use the LSN at the end of the timeline.
timeline_infos
.get(&endpoint.timeline_id)
.get(&node.timeline_id)
.map(|bi| bi.last_record_lsn.to_string())
.unwrap_or_else(|| "?".to_string())
}
Some(lsn) => {
// -> read-only endpoint
// Use the endpoint's LSN.
// -> read-only node
// Use the node's LSN.
lsn.to_string()
}
};
let branch_name = timeline_name_mappings
.get(&TenantTimelineId::new(tenant_id, endpoint.timeline_id))
.get(&TenantTimelineId::new(tenant_id, node.timeline_id))
.map(|name| name.as_str())
.unwrap_or("?");
table.add_row([
endpoint_id.as_str(),
&endpoint.address.to_string(),
&endpoint.timeline_id.to_string(),
node_name.as_str(),
&node.address.to_string(),
&node.timeline_id.to_string(),
branch_name,
lsn_str.as_str(),
endpoint.status(),
node.status(),
]);
}
@@ -597,10 +597,10 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.get_one::<String>("branch-name")
.map(|s| s.as_str())
.unwrap_or(DEFAULT_BRANCH_NAME);
let endpoint_id = sub_args
.get_one::<String>("endpoint_id")
.map(String::to_string)
.unwrap_or_else(|| format!("ep-{branch_name}"));
let node_name = sub_args
.get_one::<String>("node")
.map(|node_name| node_name.to_string())
.unwrap_or_else(|| format!("{branch_name}_node"));
let lsn = sub_args
.get_one::<String>("lsn")
@@ -618,15 +618,15 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.copied()
.context("Failed to parse postgres version from the argument string")?;
cplane.new_endpoint(tenant_id, &endpoint_id, timeline_id, lsn, port, pg_version)?;
cplane.new_node(tenant_id, &node_name, timeline_id, lsn, port, pg_version)?;
}
"start" => {
let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
let endpoint_id = sub_args
.get_one::<String>("endpoint_id")
.ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
let node_name = sub_args
.get_one::<String>("node")
.ok_or_else(|| anyhow!("No node name was provided to start"))?;
let endpoint = cplane.endpoints.get(endpoint_id.as_str());
let node = cplane.nodes.get(&(tenant_id, node_name.to_string()));
let auth_token = if matches!(env.pageserver.pg_auth_type, AuthType::NeonJWT) {
let claims = Claims::new(Some(tenant_id), Scope::Tenant);
@@ -636,9 +636,9 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
None
};
if let Some(endpoint) = endpoint {
println!("Starting existing endpoint {endpoint_id}...");
endpoint.start(&auth_token)?;
if let Some(node) = node {
println!("Starting existing postgres {node_name}...");
node.start(&auth_token)?;
} else {
let branch_name = sub_args
.get_one::<String>("branch-name")
@@ -663,33 +663,27 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
// start --port X
// stop
// start <-- will also use port X even without explicit port argument
println!("Starting new endpoint {endpoint_id} (PostgreSQL v{pg_version}) on timeline {timeline_id} ...");
println!("Starting new postgres (v{pg_version}) {node_name} on timeline {timeline_id} ...");
let ep = cplane.new_endpoint(
tenant_id,
endpoint_id,
timeline_id,
lsn,
port,
pg_version,
)?;
ep.start(&auth_token)?;
let node =
cplane.new_node(tenant_id, node_name, timeline_id, lsn, port, pg_version)?;
node.start(&auth_token)?;
}
}
"stop" => {
let endpoint_id = sub_args
.get_one::<String>("endpoint_id")
.ok_or_else(|| anyhow!("No endpoint ID was provided to stop"))?;
let node_name = sub_args
.get_one::<String>("node")
.ok_or_else(|| anyhow!("No node name was provided to stop"))?;
let destroy = sub_args.get_flag("destroy");
let endpoint = cplane
.endpoints
.get(endpoint_id.as_str())
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
endpoint.stop(destroy)?;
let node = cplane
.nodes
.get(&(tenant_id, node_name.to_string()))
.with_context(|| format!("postgres {node_name} is not found"))?;
node.stop(destroy)?;
}
_ => bail!("Unexpected endpoint subcommand '{sub_name}'"),
_ => bail!("Unexpected pg subcommand '{sub_name}'"),
}
Ok(())
@@ -808,7 +802,7 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
}
fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<()> {
// Endpoints are not started automatically
// Postgres nodes are not started automatically
broker::start_broker_process(env)?;
@@ -842,10 +836,10 @@ fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<
fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
let pageserver = PageServerNode::from_env(env);
// Stop all endpoints
// Stop all compute nodes
match ComputeControlPlane::load(env.clone()) {
Ok(cplane) => {
for (_k, node) in cplane.endpoints {
for (_k, node) in cplane.nodes {
if let Err(e) = node.stop(false) {
eprintln!("postgres stop failed: {e:#}");
}
@@ -878,9 +872,7 @@ fn cli() -> Command {
.help("Name of the branch to be created or used as an alias for other services")
.required(false);
let endpoint_id_arg = Arg::new("endpoint_id")
.help("Postgres endpoint id")
.required(false);
let pg_node_arg = Arg::new("node").help("Postgres node name").required(false);
let safekeeper_id_arg = Arg::new("id").help("safekeeper id").required(false);
@@ -1034,27 +1026,27 @@ fn cli() -> Command {
)
)
.subcommand(
Command::new("endpoint")
Command::new("pg")
.arg_required_else_help(true)
.about("Manage postgres instances")
.subcommand(Command::new("list").arg(tenant_id_arg.clone()))
.subcommand(Command::new("create")
.about("Create a compute endpoint")
.arg(endpoint_id_arg.clone())
.about("Create a postgres compute node")
.arg(pg_node_arg.clone())
.arg(branch_name_arg.clone())
.arg(tenant_id_arg.clone())
.arg(lsn_arg.clone())
.arg(port_arg.clone())
.arg(
Arg::new("config-only")
.help("Don't do basebackup, create endpoint directory with only config files")
.help("Don't do basebackup, create compute node with only config files")
.long("config-only")
.required(false))
.arg(pg_version_arg.clone())
)
.subcommand(Command::new("start")
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
.arg(endpoint_id_arg.clone())
.about("Start a postgres compute node.\n This command actually creates new node from scratch, but preserves existing config files")
.arg(pg_node_arg.clone())
.arg(tenant_id_arg.clone())
.arg(branch_name_arg)
.arg(timeline_id_arg)
@@ -1064,7 +1056,7 @@ fn cli() -> Command {
)
.subcommand(
Command::new("stop")
.arg(endpoint_id_arg)
.arg(pg_node_arg)
.arg(tenant_id_arg)
.arg(
Arg::new("destroy")

View File

@@ -25,45 +25,54 @@ use crate::postgresql_conf::PostgresConf;
//
pub struct ComputeControlPlane {
base_port: u16,
// endpoint ID is the key
pub endpoints: BTreeMap<String, Arc<Endpoint>>,
env: LocalEnv,
pageserver: Arc<PageServerNode>,
pub nodes: BTreeMap<(TenantId, String), Arc<PostgresNode>>,
env: LocalEnv,
}
impl ComputeControlPlane {
// Load current endpoints from the endpoints/ subdirectories
// Load current nodes with ports from data directories on disk
// Directory structure has the following layout:
// pgdatadirs
// |- tenants
// | |- <tenant_id>
// | | |- <node name>
pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
let pageserver = Arc::new(PageServerNode::from_env(&env));
let mut endpoints = BTreeMap::default();
for endpoint_dir in fs::read_dir(env.endpoints_path())
.with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
let mut nodes = BTreeMap::default();
let pgdatadirspath = &env.pg_data_dirs_path();
for tenant_dir in fs::read_dir(pgdatadirspath)
.with_context(|| format!("failed to list {}", pgdatadirspath.display()))?
{
let ep = Endpoint::from_dir_entry(endpoint_dir?, &env, &pageserver)?;
endpoints.insert(ep.name.clone(), Arc::new(ep));
let tenant_dir = tenant_dir?;
for timeline_dir in fs::read_dir(tenant_dir.path())
.with_context(|| format!("failed to list {}", tenant_dir.path().display()))?
{
let node = PostgresNode::from_dir_entry(timeline_dir?, &env, &pageserver)?;
nodes.insert((node.tenant_id, node.name.clone()), Arc::new(node));
}
}
Ok(ComputeControlPlane {
base_port: 55431,
endpoints,
env,
pageserver,
nodes,
env,
})
}
fn get_port(&mut self) -> u16 {
1 + self
.endpoints
.nodes
.values()
.map(|ep| ep.address.port())
.map(|node| node.address.port())
.max()
.unwrap_or(self.base_port)
}
pub fn new_endpoint(
pub fn new_node(
&mut self,
tenant_id: TenantId,
name: &str,
@@ -71,9 +80,9 @@ impl ComputeControlPlane {
lsn: Option<Lsn>,
port: Option<u16>,
pg_version: u32,
) -> Result<Arc<Endpoint>> {
) -> Result<Arc<PostgresNode>> {
let port = port.unwrap_or_else(|| self.get_port());
let ep = Arc::new(Endpoint {
let node = Arc::new(PostgresNode {
name: name.to_owned(),
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
env: self.env.clone(),
@@ -84,45 +93,39 @@ impl ComputeControlPlane {
pg_version,
});
ep.create_pgdata()?;
ep.setup_pg_conf()?;
node.create_pgdata()?;
node.setup_pg_conf()?;
self.endpoints.insert(ep.name.clone(), Arc::clone(&ep));
self.nodes
.insert((tenant_id, node.name.clone()), Arc::clone(&node));
Ok(ep)
Ok(node)
}
}
///////////////////////////////////////////////////////////////////////////////
#[derive(Debug)]
pub struct Endpoint {
/// used as the directory name
name: String,
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
// Some(lsn) if this is a read-only endpoint anchored at 'lsn'. None for the primary.
pub lsn: Option<Lsn>,
// port and address of the Postgres server
pub struct PostgresNode {
pub address: SocketAddr,
pg_version: u32,
// These are not part of the endpoint as such, but the environment
// the endpoint runs in.
name: String,
pub env: LocalEnv,
pageserver: Arc<PageServerNode>,
pub timeline_id: TimelineId,
pub lsn: Option<Lsn>, // if it's a read-only node. None for primary
pub tenant_id: TenantId,
pg_version: u32,
}
impl Endpoint {
impl PostgresNode {
fn from_dir_entry(
entry: std::fs::DirEntry,
env: &LocalEnv,
pageserver: &Arc<PageServerNode>,
) -> Result<Endpoint> {
) -> Result<PostgresNode> {
if !entry.file_type()?.is_dir() {
anyhow::bail!(
"Endpoint::from_dir_entry failed: '{}' is not a directory",
"PostgresNode::from_dir_entry failed: '{}' is not a directory",
entry.path().display()
);
}
@@ -132,7 +135,7 @@ impl Endpoint {
let name = fname.to_str().unwrap().to_string();
// Read config file into memory
let cfg_path = entry.path().join("pgdata").join("postgresql.conf");
let cfg_path = entry.path().join("postgresql.conf");
let cfg_path_str = cfg_path.to_string_lossy();
let mut conf_file = File::open(&cfg_path)
.with_context(|| format!("failed to open config file in {}", cfg_path_str))?;
@@ -158,7 +161,7 @@ impl Endpoint {
conf.parse_field_optional("recovery_target_lsn", &context)?;
// ok now
Ok(Endpoint {
Ok(PostgresNode {
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
name,
env: env.clone(),
@@ -266,7 +269,7 @@ impl Endpoint {
}
// Write postgresql.conf with default configuration
// and PG_VERSION file to the data directory of a new endpoint.
// and PG_VERSION file to the data directory of a new node.
fn setup_pg_conf(&self) -> Result<()> {
let mut conf = PostgresConf::new();
conf.append("max_wal_senders", "10");
@@ -286,7 +289,7 @@ impl Endpoint {
// walproposer panics when basebackup is invalid, it is pointless to restart in this case.
conf.append("restart_after_crash", "off");
// Configure the Neon Postgres extension to fetch pages from pageserver
// Configure the node to fetch pages from pageserver
let pageserver_connstr = {
let config = &self.pageserver.pg_connection_config;
let (host, port) = (config.host(), config.port());
@@ -322,7 +325,7 @@ impl Endpoint {
conf.append("max_replication_flush_lag", "10GB");
if !self.env.safekeepers.is_empty() {
// Configure Postgres to connect to the safekeepers
// Configure the node to connect to the safekeepers
conf.append("synchronous_standby_names", "walproposer");
let safekeepers = self
@@ -377,12 +380,8 @@ impl Endpoint {
Ok(())
}
pub fn endpoint_path(&self) -> PathBuf {
self.env.endpoints_path().join(&self.name)
}
pub fn pgdata(&self) -> PathBuf {
self.endpoint_path().join("pgdata")
self.env.pg_data_dir(&self.tenant_id, &self.name)
}
pub fn status(&self) -> &str {
@@ -444,11 +443,12 @@ impl Endpoint {
}
pub fn start(&self, auth_token: &Option<String>) -> Result<()> {
// Bail if the node already running.
if self.status() == "running" {
anyhow::bail!("The endpoint is already running");
anyhow::bail!("The node is already running");
}
// 1. We always start Postgres from scratch, so
// 1. We always start compute node from scratch, so
// if old dir exists, preserve 'postgresql.conf' and drop the directory
let postgresql_conf_path = self.pgdata().join("postgresql.conf");
let postgresql_conf = fs::read(&postgresql_conf_path).with_context(|| {
@@ -470,8 +470,8 @@ impl Endpoint {
File::create(self.pgdata().join("standby.signal"))?;
}
// 4. Finally start postgres
println!("Starting postgres at '{}'", self.connstr());
// 4. Finally start the compute node postgres
println!("Starting postgres node at '{}'", self.connstr());
self.pg_ctl(&["start"], auth_token)
}
@@ -480,7 +480,7 @@ impl Endpoint {
// use immediate shutdown mode, otherwise,
// shutdown gracefully to leave the data directory sane.
//
// Postgres is always started from scratch, so stop
// Compute node always starts from scratch, so stop
// without destroy only used for testing and debugging.
//
if destroy {
@@ -489,7 +489,7 @@ impl Endpoint {
"Destroying postgres data directory '{}'",
self.pgdata().to_str().unwrap()
);
fs::remove_dir_all(self.endpoint_path())?;
fs::remove_dir_all(self.pgdata())?;
} else {
self.pg_ctl(&["stop"], &None)?;
}

View File

@@ -9,7 +9,7 @@
mod background_process;
pub mod broker;
pub mod endpoint;
pub mod compute;
pub mod local_env;
pub mod pageserver;
pub mod postgresql_conf;

View File

@@ -200,8 +200,14 @@ impl LocalEnv {
self.neon_distrib_dir.join("storage_broker")
}
pub fn endpoints_path(&self) -> PathBuf {
self.base_data_dir.join("endpoints")
pub fn pg_data_dirs_path(&self) -> PathBuf {
self.base_data_dir.join("pgdatadirs").join("tenants")
}
pub fn pg_data_dir(&self, tenant_id: &TenantId, branch_name: &str) -> PathBuf {
self.pg_data_dirs_path()
.join(tenant_id.to_string())
.join(branch_name)
}
// TODO: move pageserver files into ./pageserver
@@ -421,7 +427,7 @@ impl LocalEnv {
}
}
fs::create_dir_all(self.endpoints_path())?;
fs::create_dir_all(self.pg_data_dirs_path())?;
for safekeeper in &self.safekeepers {
fs::create_dir_all(SafekeeperNode::datadir_path_by_id(self, safekeeper.id))?;

View File

@@ -1,14 +0,0 @@
[package]
name = "compute_api"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
anyhow.workspace = true
chrono.workspace = true
serde.workspace = true
serde_with.workspace = true
serde_json.workspace = true
workspace_hack.workspace = true

View File

@@ -1,2 +0,0 @@
pub mod models;
pub mod spec;

View File

@@ -1,52 +0,0 @@
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
use chrono::{DateTime, Utc};
use serde::{Serialize, Serializer};
/// Response of the /status API
///
#[derive(Clone, Serialize)]
#[serde(rename_all = "snake_case")]
pub struct ComputeState {
pub status: ComputeStatus,
/// Timestamp of the last Postgres activity
#[serde(serialize_with = "rfc3339_serialize")]
pub last_active: DateTime<Utc>,
pub error: Option<String>,
}
#[derive(Serialize, Clone, Copy, PartialEq, Eq, Debug)]
#[serde(rename_all = "snake_case")]
pub enum ComputeStatus {
// Spec wasn't provided as start, waiting for it to be
// provided by control-plane.
Empty,
// Compute node has spec and initial startup and
// configuration is in progress.
Init,
// Compute is configured and running.
Running,
// Either startup or configuration failed,
// compute will exit soon or is waiting for
// control-plane to terminate it.
Failed,
// Control-plane requested reconfiguration.
ConfigurationPending,
// New spec is being applied.
Configuration,
}
fn rfc3339_serialize<S>(x: &DateTime<Utc>, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
x.to_rfc3339().serialize(s)
}
/// Response of the /metrics.json API
#[derive(Clone, Default, Serialize)]
pub struct ComputeMetrics {
pub sync_safekeepers_ms: u64,
pub basebackup_ms: u64,
pub config_ms: u64,
pub total_startup_ms: u64,
}

View File

@@ -1,94 +0,0 @@
//! `ComputeSpec` represents the contents of the spec.json file.
//!
//! The spec.json file is used to pass information to 'compute_ctl'. It contains
//! all the information needed to start up the right version of PostgreSQL,
//! and connect it to the storage nodes.
use serde::Deserialize;
use std::collections::HashMap;
/// String type alias representing Postgres identifier and
/// intended to be used for DB / role names.
pub type PgIdent = String;
/// Cluster spec or configuration represented as an optional number of
/// delta operations + final cluster state description.
#[derive(Clone, Debug, Deserialize)]
pub struct ComputeSpec {
pub format_version: f32,
pub timestamp: String,
pub operation_uuid: Option<String>,
/// Expected cluster state at the end of transition process.
pub cluster: Cluster,
pub delta_operations: Option<Vec<DeltaOp>>,
pub storage_auth_token: Option<String>,
pub startup_tracing_context: Option<HashMap<String, String>>,
}
#[derive(Clone, Debug, Deserialize)]
pub struct Cluster {
pub cluster_id: String,
pub name: String,
pub state: Option<String>,
pub roles: Vec<Role>,
pub databases: Vec<Database>,
pub settings: GenericOptions,
}
/// Single cluster state changing operation that could not be represented as
/// a static `Cluster` structure. For example:
/// - DROP DATABASE
/// - DROP ROLE
/// - ALTER ROLE name RENAME TO new_name
/// - ALTER DATABASE name RENAME TO new_name
#[derive(Clone, Debug, Deserialize)]
pub struct DeltaOp {
pub action: String,
pub name: PgIdent,
pub new_name: Option<PgIdent>,
}
/// Rust representation of Postgres role info with only those fields
/// that matter for us.
#[derive(Clone, Debug, Deserialize)]
pub struct Role {
pub name: PgIdent,
pub encrypted_password: Option<String>,
pub options: GenericOptions,
}
/// Rust representation of Postgres database info with only those fields
/// that matter for us.
#[derive(Clone, Debug, Deserialize)]
pub struct Database {
pub name: PgIdent,
pub owner: PgIdent,
pub options: GenericOptions,
}
/// Common type representing both SQL statement params with or without value,
/// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config
/// options like `wal_level = logical`.
#[derive(Clone, Debug, Deserialize)]
pub struct GenericOption {
pub name: String,
pub value: Option<String>,
pub vartype: String,
}
/// Optional collection of `GenericOption`'s. Type alias allows us to
/// declare a `trait` on it.
pub type GenericOptions = Option<Vec<GenericOption>>;
#[cfg(test)]
mod tests {
use super::*;
use std::fs::File;
#[test]
fn parse_spec_file() {
let file = File::open("tests/cluster_spec.json").unwrap();
let _spec: ComputeSpec = serde_json::from_reader(file).unwrap();
}
}

View File

@@ -53,7 +53,7 @@ pub async fn password_hack(
.await?;
info!(project = &payload.project, "received missing parameter");
creds.project = Some(payload.project.into());
creds.project = Some(payload.project);
let mut node = api.wake_compute(extra, creds).await?;
node.config.password(payload.password);

View File

@@ -2,7 +2,7 @@
use crate::error::UserFacingError;
use pq_proto::StartupMessageParams;
use std::borrow::Cow;
use std::collections::HashSet;
use thiserror::Error;
use tracing::info;
@@ -19,11 +19,10 @@ pub enum ClientCredsParseError {
InconsistentProjectNames { domain: String, option: String },
#[error(
"SNI ('{}') inconsistently formatted with respect to common name ('{}'). \
SNI should be formatted as '<project-name>.{}'.",
.sni, .cn, .cn,
"Common name inferred from SNI ('{}') is not known",
.cn,
)]
InconsistentSni { sni: String, cn: String },
UnknownCommonName { cn: String },
#[error("Project name ('{0}') must contain only alphanumeric characters and hyphen.")]
MalformedProjectName(String),
@@ -37,7 +36,7 @@ impl UserFacingError for ClientCredsParseError {}
pub struct ClientCredentials<'a> {
pub user: &'a str,
// TODO: this is a severe misnomer! We should think of a new name ASAP.
pub project: Option<Cow<'a, str>>,
pub project: Option<String>,
}
impl ClientCredentials<'_> {
@@ -51,7 +50,7 @@ impl<'a> ClientCredentials<'a> {
pub fn parse(
params: &'a StartupMessageParams,
sni: Option<&str>,
common_name: Option<&str>,
common_names: Option<HashSet<String>>,
) -> Result<Self, ClientCredsParseError> {
use ClientCredsParseError::*;
@@ -60,37 +59,43 @@ impl<'a> ClientCredentials<'a> {
let user = get_param("user")?;
// Project name might be passed via PG's command-line options.
let project_option = params.options_raw().and_then(|mut options| {
options
.find_map(|opt| opt.strip_prefix("project="))
.map(Cow::Borrowed)
});
let project_option = params
.options_raw()
.and_then(|mut options| options.find_map(|opt| opt.strip_prefix("project=")))
.map(|name| name.to_string());
// Alternative project name is in fact a subdomain from SNI.
// NOTE: we do not consider SNI if `common_name` is missing.
let project_domain = sni
.zip(common_name)
.map(|(sni, cn)| {
subdomain_from_sni(sni, cn)
.ok_or_else(|| InconsistentSni {
sni: sni.into(),
cn: cn.into(),
let project_from_domain = if let Some(sni_str) = sni {
if let Some(cn) = common_names {
let common_name_from_sni = sni_str.split_once('.').map(|(_, domain)| domain);
let project = common_name_from_sni
.and_then(|domain| {
if cn.contains(domain) {
subdomain_from_sni(sni_str, domain)
} else {
None
}
})
.map(Cow::<'static, str>::Owned)
})
.transpose()?;
.ok_or_else(|| UnknownCommonName {
cn: common_name_from_sni.unwrap_or("").into(),
})?;
let project = match (project_option, project_domain) {
Some(project)
} else {
None
}
} else {
None
};
let project = match (project_option, project_from_domain) {
// Invariant: if we have both project name variants, they should match.
(Some(option), Some(domain)) if option != domain => {
Some(Err(InconsistentProjectNames {
domain: domain.into(),
option: option.into(),
}))
Some(Err(InconsistentProjectNames { domain, option }))
}
// Invariant: project name may not contain certain characters.
(a, b) => a.or(b).map(|name| match project_name_valid(&name) {
false => Err(MalformedProjectName(name.into())),
false => Err(MalformedProjectName(name)),
true => Ok(name),
}),
}
@@ -149,9 +154,9 @@ mod tests {
let options = StartupMessageParams::new([("user", "john_doe")]);
let sni = Some("foo.localhost");
let common_name = Some("localhost");
let common_names = Some(["localhost".into()].into());
let creds = ClientCredentials::parse(&options, sni, common_name)?;
let creds = ClientCredentials::parse(&options, sni, common_names)?;
assert_eq!(creds.user, "john_doe");
assert_eq!(creds.project.as_deref(), Some("foo"));
@@ -177,24 +182,41 @@ mod tests {
let options = StartupMessageParams::new([("user", "john_doe"), ("options", "project=baz")]);
let sni = Some("baz.localhost");
let common_name = Some("localhost");
let common_names = Some(["localhost".into()].into());
let creds = ClientCredentials::parse(&options, sni, common_name)?;
let creds = ClientCredentials::parse(&options, sni, common_names)?;
assert_eq!(creds.user, "john_doe");
assert_eq!(creds.project.as_deref(), Some("baz"));
Ok(())
}
#[test]
fn parse_multi_common_names() -> anyhow::Result<()> {
let options = StartupMessageParams::new([("user", "john_doe")]);
let common_names = Some(["a.com".into(), "b.com".into()].into());
let sni = Some("p1.a.com");
let creds = ClientCredentials::parse(&options, sni, common_names)?;
assert_eq!(creds.project.as_deref(), Some("p1"));
let common_names = Some(["a.com".into(), "b.com".into()].into());
let sni = Some("p1.b.com");
let creds = ClientCredentials::parse(&options, sni, common_names)?;
assert_eq!(creds.project.as_deref(), Some("p1"));
Ok(())
}
#[test]
fn parse_projects_different() {
let options =
StartupMessageParams::new([("user", "john_doe"), ("options", "project=first")]);
let sni = Some("second.localhost");
let common_name = Some("localhost");
let common_names = Some(["localhost".into()].into());
let err = ClientCredentials::parse(&options, sni, common_name).expect_err("should fail");
let err = ClientCredentials::parse(&options, sni, common_names).expect_err("should fail");
match err {
InconsistentProjectNames { domain, option } => {
assert_eq!(option, "first");
@@ -209,13 +231,12 @@ mod tests {
let options = StartupMessageParams::new([("user", "john_doe")]);
let sni = Some("project.localhost");
let common_name = Some("example.com");
let common_names = Some(["example.com".into()].into());
let err = ClientCredentials::parse(&options, sni, common_name).expect_err("should fail");
let err = ClientCredentials::parse(&options, sni, common_names).expect_err("should fail");
match err {
InconsistentSni { sni, cn } => {
assert_eq!(sni, "project.localhost");
assert_eq!(cn, "example.com");
UnknownCommonName { cn } => {
assert_eq!(cn, "localhost");
}
_ => panic!("bad error: {err:?}"),
}

View File

@@ -1,6 +1,12 @@
use crate::auth;
use anyhow::{bail, ensure, Context};
use std::{str::FromStr, sync::Arc, time::Duration};
use anyhow::{bail, ensure, Context, Ok};
use rustls::sign;
use std::{
collections::{HashMap, HashSet},
str::FromStr,
sync::Arc,
time::Duration,
};
pub struct ProxyConfig {
pub tls_config: Option<TlsConfig>,
@@ -16,7 +22,7 @@ pub struct MetricCollectionConfig {
pub struct TlsConfig {
pub config: Arc<rustls::ServerConfig>,
pub common_name: Option<String>,
pub common_names: Option<HashSet<String>>,
}
impl TlsConfig {
@@ -26,28 +32,33 @@ impl TlsConfig {
}
/// Configure TLS for the main endpoint.
pub fn configure_tls(key_path: &str, cert_path: &str) -> anyhow::Result<TlsConfig> {
let key = {
let key_bytes = std::fs::read(key_path).context("TLS key file")?;
let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..])
.context(format!("Failed to read TLS keys at '{key_path}'"))?;
pub fn configure_tls(
key_path: &str,
cert_path: &str,
certs_dir: Option<&String>,
) -> anyhow::Result<TlsConfig> {
let mut cert_resolver = CertResolver::new();
ensure!(keys.len() == 1, "keys.len() = {} (should be 1)", keys.len());
keys.pop().map(rustls::PrivateKey).unwrap()
};
// add default certificate
cert_resolver.add_cert(key_path, cert_path)?;
let cert_chain_bytes = std::fs::read(cert_path)
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
// add extra certificates
if let Some(certs_dir) = certs_dir {
for entry in std::fs::read_dir(certs_dir)? {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
let key_path = path.join("key.pem");
let cert_path = path.join("cert.pem");
if key_path.exists() && cert_path.exists() {
cert_resolver
.add_cert(&key_path.to_string_lossy(), &cert_path.to_string_lossy())?;
}
}
}
}
let cert_chain = {
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
.context(format!(
"Failed to read TLS certificate chain from bytes from file at '{cert_path}'."
))?
.into_iter()
.map(rustls::Certificate)
.collect()
};
let common_names = cert_resolver.get_common_names();
let config = rustls::ServerConfig::builder()
.with_safe_default_cipher_suites()
@@ -55,27 +66,105 @@ pub fn configure_tls(key_path: &str, cert_path: &str) -> anyhow::Result<TlsConfi
// allow TLS 1.2 to be compatible with older client libraries
.with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])?
.with_no_client_auth()
.with_single_cert(cert_chain, key)?
.with_cert_resolver(Arc::new(cert_resolver))
.into();
// determine common name from tls-cert (-c server.crt param).
// used in asserting project name formatting invariant.
let common_name = {
let pem = x509_parser::pem::parse_x509_pem(&cert_chain_bytes)
.context(format!(
"Failed to parse PEM object from bytes from file at '{cert_path}'."
))?
.1;
let common_name = pem.parse_x509()?.subject().to_string();
common_name.strip_prefix("CN=*.").map(|s| s.to_string())
};
Ok(TlsConfig {
config,
common_name,
common_names: Some(common_names),
})
}
struct CertResolver {
certs: HashMap<String, Arc<rustls::sign::CertifiedKey>>,
}
impl CertResolver {
fn new() -> Self {
Self {
certs: HashMap::new(),
}
}
fn add_cert(&mut self, key_path: &str, cert_path: &str) -> anyhow::Result<()> {
let priv_key = {
let key_bytes = std::fs::read(key_path).context("TLS key file")?;
let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..])
.context(format!("Failed to read TLS keys at '{key_path}'"))?;
ensure!(keys.len() == 1, "keys.len() = {} (should be 1)", keys.len());
keys.pop().map(rustls::PrivateKey).unwrap()
};
let key = sign::any_supported_type(&priv_key).context("invalid private key")?;
let cert_chain_bytes = std::fs::read(cert_path)
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
let cert_chain = {
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
.context(format!(
"Failed to read TLS certificate chain from bytes from file at '{cert_path}'."
))?
.into_iter()
.map(rustls::Certificate)
.collect()
};
let common_name = {
let pem = x509_parser::pem::parse_x509_pem(&cert_chain_bytes)
.context(format!(
"Failed to parse PEM object from bytes from file at '{cert_path}'."
))?
.1;
let common_name = pem.parse_x509()?.subject().to_string();
common_name.strip_prefix("CN=*.").map(|s| s.to_string())
}
.context(format!(
"Failed to parse common name from certificate at '{cert_path}'."
))?;
self.certs.insert(
common_name,
Arc::new(rustls::sign::CertifiedKey::new(cert_chain, key)),
);
Ok(())
}
fn get_common_names(&self) -> HashSet<String> {
self.certs.keys().map(|s| s.to_string()).collect()
}
}
impl rustls::server::ResolvesServerCert for CertResolver {
fn resolve(
&self,
_client_hello: rustls::server::ClientHello,
) -> Option<Arc<rustls::sign::CertifiedKey>> {
// loop here and cut off more and more subdomains until we find
// a match to get a proper wildcard support. OTOH, we now do not
// use nested domains, so keep this simple for now.
//
// With the current coding foo.com will match *.foo.com and that
// repeats behavior of the old code.
if let Some(mut sni_name) = _client_hello.server_name() {
loop {
if let Some(cert) = self.certs.get(sni_name) {
return Some(cert.clone());
}
if let Some((_, rest)) = sni_name.split_once('.') {
sni_name = rest;
} else {
return None;
}
}
} else {
None
}
}
}
/// Helper for cmdline cache options parsing.
pub struct CacheOptions {
/// Max number of entries.

View File

@@ -132,7 +132,11 @@ fn build_config(args: &clap::ArgMatches) -> anyhow::Result<&'static ProxyConfig>
args.get_one::<String>("tls-key"),
args.get_one::<String>("tls-cert"),
) {
(Some(key_path), Some(cert_path)) => Some(config::configure_tls(key_path, cert_path)?),
(Some(key_path), Some(cert_path)) => Some(config::configure_tls(
key_path,
cert_path,
args.get_one::<String>("certs-dir"),
)?),
(None, None) => None,
_ => bail!("either both or neither tls-key and tls-cert must be specified"),
};
@@ -254,6 +258,12 @@ fn cli() -> clap::Command {
.alias("ssl-cert") // backwards compatibility
.help("path to TLS cert for client postgres connections"),
)
// tls-key and tls-cert are for backwards compatibility, we can put all certs in one dir
.arg(
Arg::new("certs-dir")
.long("certs-dir")
.help("path to directory with TLS certificates for client postgres connections"),
)
.arg(
Arg::new("metric-collection-endpoint")
.long("metric-collection-endpoint")

View File

@@ -124,11 +124,11 @@ pub async fn handle_ws_client(
// Extract credentials which we're going to use for auth.
let creds = {
let common_name = tls.and_then(|tls| tls.common_name.as_deref());
let common_names = tls.and_then(|tls| tls.common_names.clone());
let result = config
.auth_backend
.as_ref()
.map(|_| auth::ClientCredentials::parse(&params, hostname, common_name))
.map(|_| auth::ClientCredentials::parse(&params, hostname, common_names))
.transpose();
async { result }.or_else(|e| stream.throw_error(e)).await?
@@ -163,11 +163,11 @@ async fn handle_client(
// Extract credentials which we're going to use for auth.
let creds = {
let sni = stream.get_ref().sni_hostname();
let common_name = tls.and_then(|tls| tls.common_name.as_deref());
let common_names = tls.and_then(|tls| tls.common_names.clone());
let result = config
.auth_backend
.as_ref()
.map(|_| auth::ClientCredentials::parse(&params, sni, common_name))
.map(|_| auth::ClientCredentials::parse(&params, sni, common_names))
.transpose();
async { result }.or_else(|e| stream.throw_error(e)).await?

View File

@@ -54,9 +54,11 @@ fn generate_tls_config<'a>(
.with_single_cert(vec![cert], key)?
.into();
let common_names = Some([common_name.to_owned()].iter().cloned().collect());
TlsConfig {
config,
common_name: Some(common_name.to_string()),
common_names,
}
};

View File

@@ -114,7 +114,7 @@ class NeonCompare(PgCompare):
self.timeline = self.env.neon_cli.create_timeline(branch_name, tenant_id=self.tenant)
# Start pg
self._pg = self.env.endpoints.create_start(branch_name, "main", self.tenant)
self._pg = self.env.postgres.create_start(branch_name, "main", self.tenant)
@property
def pg(self) -> PgProtocol:

View File

@@ -829,7 +829,7 @@ class NeonEnvBuilder:
# Stop all the nodes.
if self.env:
log.info("Cleaning up all storage and compute nodes")
self.env.endpoints.stop_all()
self.env.postgres.stop_all()
for sk in self.env.safekeepers:
sk.stop(immediate=True)
self.env.pageserver.stop(immediate=True)
@@ -893,7 +893,7 @@ class NeonEnv:
self.port_distributor = config.port_distributor
self.s3_mock_server = config.mock_s3_server
self.neon_cli = NeonCli(env=self)
self.endpoints = EndpointFactory(self)
self.postgres = PostgresFactory(self)
self.safekeepers: List[Safekeeper] = []
self.broker = config.broker
self.remote_storage = config.remote_storage
@@ -901,7 +901,6 @@ class NeonEnv:
self.pg_version = config.pg_version
self.neon_binpath = config.neon_binpath
self.pg_distrib_dir = config.pg_distrib_dir
self.endpoint_counter = 0
# generate initial tenant ID here instead of letting 'neon init' generate it,
# so that we don't need to dig it out of the config file afterwards.
@@ -1015,13 +1014,6 @@ class NeonEnv:
priv = (Path(self.repo_dir) / "auth_private_key.pem").read_text()
return AuthKeys(pub=pub, priv=priv)
def generate_endpoint_id(self) -> str:
"""
Generate a unique endpoint ID
"""
self.endpoint_counter += 1
return "ep-" + str(self.endpoint_counter)
@pytest.fixture(scope=shareable_scope)
def _shared_simple_env(
@@ -1080,7 +1072,7 @@ def neon_simple_env(_shared_simple_env: NeonEnv) -> Iterator[NeonEnv]:
"""
yield _shared_simple_env
_shared_simple_env.endpoints.stop_all()
_shared_simple_env.postgres.stop_all()
@pytest.fixture(scope="function")
@@ -1104,7 +1096,7 @@ def neon_env_builder(
neon_env_builder.init_start().
After the initialization, you can launch compute nodes by calling
the functions in the 'env.endpoints' factory object, stop/start the
the functions in the 'env.postgres' factory object, stop/start the
nodes, etc.
"""
@@ -1977,16 +1969,16 @@ class NeonCli(AbstractNeonCli):
args.extend(["-m", "immediate"])
return self.raw_cli(args)
def endpoint_create(
def pg_create(
self,
branch_name: str,
endpoint_id: Optional[str] = None,
node_name: Optional[str] = None,
tenant_id: Optional[TenantId] = None,
lsn: Optional[Lsn] = None,
port: Optional[int] = None,
) -> "subprocess.CompletedProcess[str]":
args = [
"endpoint",
"pg",
"create",
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
@@ -1999,22 +1991,22 @@ class NeonCli(AbstractNeonCli):
args.extend(["--lsn", str(lsn)])
if port is not None:
args.extend(["--port", str(port)])
if endpoint_id is not None:
args.append(endpoint_id)
if node_name is not None:
args.append(node_name)
res = self.raw_cli(args)
res.check_returncode()
return res
def endpoint_start(
def pg_start(
self,
endpoint_id: str,
node_name: str,
tenant_id: Optional[TenantId] = None,
lsn: Optional[Lsn] = None,
port: Optional[int] = None,
) -> "subprocess.CompletedProcess[str]":
args = [
"endpoint",
"pg",
"start",
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
@@ -2025,30 +2017,30 @@ class NeonCli(AbstractNeonCli):
args.append(f"--lsn={lsn}")
if port is not None:
args.append(f"--port={port}")
if endpoint_id is not None:
args.append(endpoint_id)
if node_name is not None:
args.append(node_name)
res = self.raw_cli(args)
res.check_returncode()
return res
def endpoint_stop(
def pg_stop(
self,
endpoint_id: str,
node_name: str,
tenant_id: Optional[TenantId] = None,
destroy=False,
check_return_code=True,
) -> "subprocess.CompletedProcess[str]":
args = [
"endpoint",
"pg",
"stop",
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
]
if destroy:
args.append("--destroy")
if endpoint_id is not None:
args.append(endpoint_id)
if node_name is not None:
args.append(node_name)
return self.raw_cli(args, check_return_code=check_return_code)
@@ -2700,8 +2692,8 @@ def static_proxy(
yield proxy
class Endpoint(PgProtocol):
"""An object representing a Postgres compute endpoint managed by the control plane."""
class Postgres(PgProtocol):
"""An object representing a running postgres daemon."""
def __init__(
self, env: NeonEnv, tenant_id: TenantId, port: int, check_stop_result: bool = True
@@ -2709,40 +2701,33 @@ class Endpoint(PgProtocol):
super().__init__(host="localhost", port=port, user="cloud_admin", dbname="postgres")
self.env = env
self.running = False
self.endpoint_id: Optional[str] = None # dubious, see asserts below
self.node_name: Optional[str] = None # dubious, see asserts below
self.pgdata_dir: Optional[str] = None # Path to computenode PGDATA
self.tenant_id = tenant_id
self.port = port
self.check_stop_result = check_stop_result
# path to conf is <repo_dir>/endpoints/<endpoint_id>/pgdata/postgresql.conf
# path to conf is <repo_dir>/pgdatadirs/tenants/<tenant_id>/<node_name>/postgresql.conf
def create(
self,
branch_name: str,
endpoint_id: Optional[str] = None,
node_name: Optional[str] = None,
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
) -> "Endpoint":
) -> "Postgres":
"""
Create a new Postgres endpoint.
Create the pg data directory.
Returns self.
"""
if not config_lines:
config_lines = []
if endpoint_id is None:
endpoint_id = self.env.generate_endpoint_id()
self.endpoint_id = endpoint_id
self.env.neon_cli.endpoint_create(
branch_name,
endpoint_id=self.endpoint_id,
tenant_id=self.tenant_id,
lsn=lsn,
port=self.port,
self.node_name = node_name or f"{branch_name}_pg_node"
self.env.neon_cli.pg_create(
branch_name, node_name=self.node_name, tenant_id=self.tenant_id, lsn=lsn, port=self.port
)
path = Path("endpoints") / self.endpoint_id / "pgdata"
path = Path("pgdatadirs") / "tenants" / str(self.tenant_id) / self.node_name
self.pgdata_dir = os.path.join(self.env.repo_dir, path)
if config_lines is None:
@@ -2755,30 +2740,26 @@ class Endpoint(PgProtocol):
return self
def start(self) -> "Endpoint":
def start(self) -> "Postgres":
"""
Start the Postgres instance.
Returns self.
"""
assert self.endpoint_id is not None
assert self.node_name is not None
log.info(f"Starting postgres endpoint {self.endpoint_id}")
log.info(f"Starting postgres node {self.node_name}")
self.env.neon_cli.endpoint_start(self.endpoint_id, tenant_id=self.tenant_id, port=self.port)
self.env.neon_cli.pg_start(self.node_name, tenant_id=self.tenant_id, port=self.port)
self.running = True
return self
def endpoint_path(self) -> Path:
"""Path to endpoint directory"""
assert self.endpoint_id
path = Path("endpoints") / self.endpoint_id
return self.env.repo_dir / path
def pg_data_dir_path(self) -> str:
"""Path to Postgres data directory"""
return os.path.join(self.endpoint_path(), "pgdata")
"""Path to data directory"""
assert self.node_name
path = Path("pgdatadirs") / "tenants" / str(self.tenant_id) / self.node_name
return os.path.join(self.env.repo_dir, path)
def pg_xact_dir_path(self) -> str:
"""Path to pg_xact dir"""
@@ -2792,7 +2773,7 @@ class Endpoint(PgProtocol):
"""Path to postgresql.conf"""
return os.path.join(self.pg_data_dir_path(), "postgresql.conf")
def adjust_for_safekeepers(self, safekeepers: str) -> "Endpoint":
def adjust_for_safekeepers(self, safekeepers: str) -> "Postgres":
"""
Adjust instance config for working with wal acceptors instead of
pageserver (pre-configured by CLI) directly.
@@ -2816,7 +2797,7 @@ class Endpoint(PgProtocol):
f.write("neon.safekeepers = '{}'\n".format(safekeepers))
return self
def config(self, lines: List[str]) -> "Endpoint":
def config(self, lines: List[str]) -> "Postgres":
"""
Add lines to postgresql.conf.
Lines should be an array of valid postgresql.conf rows.
@@ -2830,32 +2811,32 @@ class Endpoint(PgProtocol):
return self
def stop(self) -> "Endpoint":
def stop(self) -> "Postgres":
"""
Stop the Postgres instance if it's running.
Returns self.
"""
if self.running:
assert self.endpoint_id is not None
self.env.neon_cli.endpoint_stop(
self.endpoint_id, self.tenant_id, check_return_code=self.check_stop_result
assert self.node_name is not None
self.env.neon_cli.pg_stop(
self.node_name, self.tenant_id, check_return_code=self.check_stop_result
)
self.running = False
return self
def stop_and_destroy(self) -> "Endpoint":
def stop_and_destroy(self) -> "Postgres":
"""
Stop the Postgres instance, then destroy the endpoint.
Stop the Postgres instance, then destroy it.
Returns self.
"""
assert self.endpoint_id is not None
self.env.neon_cli.endpoint_stop(
self.endpoint_id, self.tenant_id, True, check_return_code=self.check_stop_result
assert self.node_name is not None
self.env.neon_cli.pg_stop(
self.node_name, self.tenant_id, True, check_return_code=self.check_stop_result
)
self.endpoint_id = None
self.node_name = None
self.running = False
return self
@@ -2863,12 +2844,13 @@ class Endpoint(PgProtocol):
def create_start(
self,
branch_name: str,
endpoint_id: Optional[str] = None,
node_name: Optional[str] = None,
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
) -> "Endpoint":
) -> "Postgres":
"""
Create an endpoint, apply config, and start Postgres.
Create a Postgres instance, apply config
and then start it.
Returns self.
"""
@@ -2876,7 +2858,7 @@ class Endpoint(PgProtocol):
self.create(
branch_name=branch_name,
endpoint_id=endpoint_id,
node_name=node_name,
config_lines=config_lines,
lsn=lsn,
).start()
@@ -2885,7 +2867,7 @@ class Endpoint(PgProtocol):
return self
def __enter__(self) -> "Endpoint":
def __enter__(self) -> "Postgres":
return self
def __exit__(
@@ -2897,33 +2879,33 @@ class Endpoint(PgProtocol):
self.stop()
class EndpointFactory:
"""An object representing multiple compute endpoints."""
class PostgresFactory:
"""An object representing multiple running postgres daemons."""
def __init__(self, env: NeonEnv):
self.env = env
self.num_instances: int = 0
self.endpoints: List[Endpoint] = []
self.instances: List[Postgres] = []
def create_start(
self,
branch_name: str,
endpoint_id: Optional[str] = None,
node_name: Optional[str] = None,
tenant_id: Optional[TenantId] = None,
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
) -> Endpoint:
ep = Endpoint(
) -> Postgres:
pg = Postgres(
self.env,
tenant_id=tenant_id or self.env.initial_tenant,
port=self.env.port_distributor.get_port(),
)
self.num_instances += 1
self.endpoints.append(ep)
self.instances.append(pg)
return ep.create_start(
return pg.create_start(
branch_name=branch_name,
endpoint_id=endpoint_id,
node_name=node_name,
config_lines=config_lines,
lsn=lsn,
)
@@ -2931,33 +2913,30 @@ class EndpointFactory:
def create(
self,
branch_name: str,
endpoint_id: Optional[str] = None,
node_name: Optional[str] = None,
tenant_id: Optional[TenantId] = None,
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
) -> Endpoint:
ep = Endpoint(
) -> Postgres:
pg = Postgres(
self.env,
tenant_id=tenant_id or self.env.initial_tenant,
port=self.env.port_distributor.get_port(),
)
if endpoint_id is None:
endpoint_id = self.env.generate_endpoint_id()
self.num_instances += 1
self.endpoints.append(ep)
self.instances.append(pg)
return ep.create(
return pg.create(
branch_name=branch_name,
endpoint_id=endpoint_id,
node_name=node_name,
lsn=lsn,
config_lines=config_lines,
)
def stop_all(self) -> "EndpointFactory":
for ep in self.endpoints:
ep.stop()
def stop_all(self) -> "PostgresFactory":
for pg in self.instances:
pg.stop()
return self
@@ -3332,16 +3311,16 @@ def list_files_to_compare(pgdata_dir: Path) -> List[str]:
def check_restored_datadir_content(
test_output_dir: Path,
env: NeonEnv,
endpoint: Endpoint,
pg: Postgres,
):
# Get the timeline ID. We need it for the 'basebackup' command
timeline = TimelineId(endpoint.safe_psql("SHOW neon.timeline_id")[0][0])
timeline = TimelineId(pg.safe_psql("SHOW neon.timeline_id")[0][0])
# stop postgres to ensure that files won't change
endpoint.stop()
pg.stop()
# Take a basebackup from pageserver
restored_dir_path = env.repo_dir / f"{endpoint.endpoint_id}_restored_datadir"
restored_dir_path = env.repo_dir / f"{pg.node_name}_restored_datadir"
restored_dir_path.mkdir(exist_ok=True)
pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version)
@@ -3351,7 +3330,7 @@ def check_restored_datadir_content(
{psql_path} \
--no-psqlrc \
postgres://localhost:{env.pageserver.service_port.pg} \
-c 'basebackup {endpoint.tenant_id} {timeline}' \
-c 'basebackup {pg.tenant_id} {timeline}' \
| tar -x -C {restored_dir_path}
"""
@@ -3368,8 +3347,8 @@ def check_restored_datadir_content(
assert result.returncode == 0
# list files we're going to compare
assert endpoint.pgdata_dir
pgdata_files = list_files_to_compare(Path(endpoint.pgdata_dir))
assert pg.pgdata_dir
pgdata_files = list_files_to_compare(Path(pg.pgdata_dir))
restored_files = list_files_to_compare(restored_dir_path)
# check that file sets are equal
@@ -3380,12 +3359,12 @@ def check_restored_datadir_content(
# We've already filtered all mismatching files in list_files_to_compare(),
# so here expect that the content is identical
(match, mismatch, error) = filecmp.cmpfiles(
endpoint.pgdata_dir, restored_dir_path, pgdata_files, shallow=False
pg.pgdata_dir, restored_dir_path, pgdata_files, shallow=False
)
log.info(f"filecmp result mismatch and error lists:\n\t mismatch={mismatch}\n\t error={error}")
for f in mismatch:
f1 = os.path.join(endpoint.pgdata_dir, f)
f1 = os.path.join(pg.pgdata_dir, f)
f2 = os.path.join(restored_dir_path, f)
stdout_filename = "{}.filediff".format(f2)
@@ -3545,24 +3524,24 @@ def wait_for_last_record_lsn(
def wait_for_last_flush_lsn(
env: NeonEnv, endpoint: Endpoint, tenant: TenantId, timeline: TimelineId
env: NeonEnv, pg: Postgres, tenant: TenantId, timeline: TimelineId
) -> Lsn:
"""Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn."""
last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
last_flush_lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
return wait_for_last_record_lsn(env.pageserver.http_client(), tenant, timeline, last_flush_lsn)
def wait_for_wal_insert_lsn(
env: NeonEnv, endpoint: Endpoint, tenant: TenantId, timeline: TimelineId
env: NeonEnv, pg: Postgres, tenant: TenantId, timeline: TimelineId
) -> Lsn:
"""Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn."""
last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_insert_lsn()")[0][0])
last_flush_lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_insert_lsn()")[0][0])
return wait_for_last_record_lsn(env.pageserver.http_client(), tenant, timeline, last_flush_lsn)
def fork_at_current_lsn(
env: NeonEnv,
endpoint: Endpoint,
pg: Postgres,
new_branch_name: str,
ancestor_branch_name: str,
tenant_id: Optional[TenantId] = None,
@@ -3572,7 +3551,7 @@ def fork_at_current_lsn(
The "last LSN" is taken from the given Postgres instance. The pageserver will wait for all the
the WAL up to that LSN to arrive in the pageserver before creating the branch.
"""
current_lsn = endpoint.safe_psql("SELECT pg_current_wal_lsn()")[0][0]
current_lsn = pg.safe_psql("SELECT pg_current_wal_lsn()")[0][0]
return env.neon_cli.create_branch(new_branch_name, ancestor_branch_name, tenant_id, current_lsn)

View File

@@ -52,13 +52,13 @@ def test_branch_creation_heavy_write(neon_compare: NeonCompare, n_branches: int)
def run_pgbench(branch: str):
log.info(f"Start a pgbench workload on branch {branch}")
endpoint = env.endpoints.create_start(branch, tenant_id=tenant)
connstr = endpoint.connstr()
pg = env.postgres.create_start(branch, tenant_id=tenant)
connstr = pg.connstr()
pg_bin.run_capture(["pgbench", "-i", connstr])
pg_bin.run_capture(["pgbench", "-c10", "-T10", connstr])
endpoint.stop()
pg.stop()
env.neon_cli.create_branch("b0", tenant_id=tenant)
@@ -96,8 +96,8 @@ def test_branch_creation_many(neon_compare: NeonCompare, n_branches: int):
env.neon_cli.create_branch("b0")
endpoint = env.endpoints.create_start("b0")
neon_compare.pg_bin.run_capture(["pgbench", "-i", "-s10", endpoint.connstr()])
pg = env.postgres.create_start("b0")
neon_compare.pg_bin.run_capture(["pgbench", "-i", "-s10", pg.connstr()])
branch_creation_durations = []
@@ -124,15 +124,15 @@ def test_branch_creation_many_relations(neon_compare: NeonCompare):
timeline_id = env.neon_cli.create_branch("root")
endpoint = env.endpoints.create_start("root")
with closing(endpoint.connect()) as conn:
pg = env.postgres.create_start("root")
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
for i in range(10000):
cur.execute(f"CREATE TABLE t{i} as SELECT g FROM generate_series(1, 1000) g")
# Wait for the pageserver to finish processing all the pending WALs,
# as we don't want the LSN wait time to be included during the branch creation
flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
flush_lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
wait_for_last_record_lsn(
env.pageserver.http_client(), env.initial_tenant, timeline_id, flush_lsn
)
@@ -142,7 +142,7 @@ def test_branch_creation_many_relations(neon_compare: NeonCompare):
# run a concurrent insertion to make the ancestor "busy" during the branch creation
thread = threading.Thread(
target=endpoint.safe_psql, args=("INSERT INTO t0 VALUES (generate_series(1, 100000))",)
target=pg.safe_psql, args=("INSERT INTO t0 VALUES (generate_series(1, 100000))",)
)
thread.start()

View File

@@ -42,41 +42,41 @@ def test_compare_child_and_root_pgbench_perf(neon_compare: NeonCompare):
neon_compare.zenbenchmark.record_pg_bench_result(branch, res)
env.neon_cli.create_branch("root")
endpoint_root = env.endpoints.create_start("root")
pg_bin.run_capture(["pgbench", "-i", endpoint_root.connstr(), "-s10"])
pg_root = env.postgres.create_start("root")
pg_bin.run_capture(["pgbench", "-i", pg_root.connstr(), "-s10"])
fork_at_current_lsn(env, endpoint_root, "child", "root")
fork_at_current_lsn(env, pg_root, "child", "root")
endpoint_child = env.endpoints.create_start("child")
pg_child = env.postgres.create_start("child")
run_pgbench_on_branch("root", ["pgbench", "-c10", "-T10", endpoint_root.connstr()])
run_pgbench_on_branch("child", ["pgbench", "-c10", "-T10", endpoint_child.connstr()])
run_pgbench_on_branch("root", ["pgbench", "-c10", "-T10", pg_root.connstr()])
run_pgbench_on_branch("child", ["pgbench", "-c10", "-T10", pg_child.connstr()])
def test_compare_child_and_root_write_perf(neon_compare: NeonCompare):
env = neon_compare.env
env.neon_cli.create_branch("root")
endpoint_root = env.endpoints.create_start("root")
pg_root = env.postgres.create_start("root")
endpoint_root.safe_psql(
pg_root.safe_psql(
"CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')",
)
env.neon_cli.create_branch("child", "root")
endpoint_child = env.endpoints.create_start("child")
pg_child = env.postgres.create_start("child")
with neon_compare.record_duration("root_run_duration"):
endpoint_root.safe_psql("INSERT INTO foo SELECT FROM generate_series(1,1000000)")
pg_root.safe_psql("INSERT INTO foo SELECT FROM generate_series(1,1000000)")
with neon_compare.record_duration("child_run_duration"):
endpoint_child.safe_psql("INSERT INTO foo SELECT FROM generate_series(1,1000000)")
pg_child.safe_psql("INSERT INTO foo SELECT FROM generate_series(1,1000000)")
def test_compare_child_and_root_read_perf(neon_compare: NeonCompare):
env = neon_compare.env
env.neon_cli.create_branch("root")
endpoint_root = env.endpoints.create_start("root")
pg_root = env.postgres.create_start("root")
endpoint_root.safe_psql_many(
pg_root.safe_psql_many(
[
"CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')",
"INSERT INTO foo SELECT FROM generate_series(1,1000000)",
@@ -84,12 +84,12 @@ def test_compare_child_and_root_read_perf(neon_compare: NeonCompare):
)
env.neon_cli.create_branch("child", "root")
endpoint_child = env.endpoints.create_start("child")
pg_child = env.postgres.create_start("child")
with neon_compare.record_duration("root_run_duration"):
endpoint_root.safe_psql("SELECT count(*) from foo")
pg_root.safe_psql("SELECT count(*) from foo")
with neon_compare.record_duration("child_run_duration"):
endpoint_child.safe_psql("SELECT count(*) from foo")
pg_child.safe_psql("SELECT count(*) from foo")
# -----------------------------------------------------------------------

View File

@@ -35,14 +35,14 @@ def test_bulk_tenant_create(
# if use_safekeepers == 'with_sa':
# wa_factory.start_n_new(3)
endpoint_tenant = env.endpoints.create_start(
pg_tenant = env.postgres.create_start(
f"test_bulk_tenant_create_{tenants_count}_{i}", tenant_id=tenant
)
end = timeit.default_timer()
time_slices.append(end - start)
endpoint_tenant.stop()
pg_tenant.stop()
zenbenchmark.record(
"tenant_creation_time",

View File

@@ -18,8 +18,8 @@ def test_bulk_update(neon_env_builder: NeonEnvBuilder, zenbenchmark, fillfactor)
timeline_id = env.neon_cli.create_branch("test_bulk_update")
tenant_id = env.initial_tenant
endpoint = env.endpoints.create_start("test_bulk_update")
cur = endpoint.connect().cursor()
pg = env.postgres.create_start("test_bulk_update")
cur = pg.connect().cursor()
cur.execute("set statement_timeout=0")
cur.execute(f"create table t(x integer) WITH (fillfactor={fillfactor})")
@@ -28,13 +28,13 @@ def test_bulk_update(neon_env_builder: NeonEnvBuilder, zenbenchmark, fillfactor)
cur.execute(f"insert into t values (generate_series(1,{n_records}))")
cur.execute("vacuum t")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
with zenbenchmark.record_duration("update-no-prefetch"):
cur.execute("update t set x=x+1")
cur.execute("vacuum t")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
with zenbenchmark.record_duration("delete-no-prefetch"):
cur.execute("delete from t")
@@ -50,13 +50,13 @@ def test_bulk_update(neon_env_builder: NeonEnvBuilder, zenbenchmark, fillfactor)
cur.execute(f"insert into t2 values (generate_series(1,{n_records}))")
cur.execute("vacuum t2")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
with zenbenchmark.record_duration("update-with-prefetch"):
cur.execute("update t2 set x=x+1")
cur.execute("vacuum t2")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
with zenbenchmark.record_duration("delete-with-prefetch"):
cur.execute("delete from t2")

View File

@@ -33,11 +33,11 @@ def test_compaction(neon_compare: NeonCompare):
# Create some tables, and run a bunch of INSERTs and UPDATes on them,
# to generate WAL and layers
endpoint = env.endpoints.create_start(
pg = env.postgres.create_start(
"main", tenant_id=tenant_id, config_lines=["shared_buffers=512MB"]
)
with closing(endpoint.connect()) as conn:
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
for i in range(100):
cur.execute(f"create table tbl{i} (i int, j int);")
@@ -45,7 +45,7 @@ def test_compaction(neon_compare: NeonCompare):
for j in range(100):
cur.execute(f"update tbl{i} set j = {j};")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
# First compaction generates L1 layers
with neon_compare.zenbenchmark.record_duration("compaction"):

View File

@@ -2,13 +2,13 @@ import threading
import pytest
from fixtures.compare_fixtures import PgCompare
from fixtures.neon_fixtures import PgProtocol
from fixtures.neon_fixtures import Postgres
from performance.test_perf_pgbench import get_scales_matrix
from performance.test_wal_backpressure import record_read_latency
def start_write_workload(pg: PgProtocol, scale: int = 10):
def start_write_workload(pg: Postgres, scale: int = 10):
with pg.connect().cursor() as cur:
cur.execute(f"create table big as select generate_series(1,{scale*100_000})")

View File

@@ -25,8 +25,8 @@ def test_layer_map(neon_env_builder: NeonEnvBuilder, zenbenchmark):
)
env.neon_cli.create_timeline("test_layer_map", tenant_id=tenant)
endpoint = env.endpoints.create_start("test_layer_map", tenant_id=tenant)
cur = endpoint.connect().cursor()
pg = env.postgres.create_start("test_layer_map", tenant_id=tenant)
cur = pg.connect().cursor()
cur.execute("create table t(x integer)")
for i in range(n_iters):
cur.execute(f"insert into t values (generate_series(1,{n_records}))")

View File

@@ -14,19 +14,19 @@ def test_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker
# Start
env.neon_cli.create_branch("test_startup")
with zenbenchmark.record_duration("startup_time"):
endpoint = env.endpoints.create_start("test_startup")
endpoint.safe_psql("select 1;")
pg = env.postgres.create_start("test_startup")
pg.safe_psql("select 1;")
# Restart
endpoint.stop_and_destroy()
pg.stop_and_destroy()
with zenbenchmark.record_duration("restart_time"):
endpoint.create_start("test_startup")
endpoint.safe_psql("select 1;")
pg.create_start("test_startup")
pg.safe_psql("select 1;")
# Fill up
num_rows = 1000000 # 30 MB
num_tables = 100
with closing(endpoint.connect()) as conn:
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
for i in range(num_tables):
cur.execute(f"create table t_{i} (i integer);")
@@ -34,18 +34,18 @@ def test_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker
# Read
with zenbenchmark.record_duration("read_time"):
endpoint.safe_psql("select * from t_0;")
pg.safe_psql("select * from t_0;")
# Read again
with zenbenchmark.record_duration("second_read_time"):
endpoint.safe_psql("select * from t_0;")
pg.safe_psql("select * from t_0;")
# Restart
endpoint.stop_and_destroy()
pg.stop_and_destroy()
with zenbenchmark.record_duration("restart_with_data"):
endpoint.create_start("test_startup")
endpoint.safe_psql("select 1;")
pg.create_start("test_startup")
pg.safe_psql("select 1;")
# Read
with zenbenchmark.record_duration("read_after_restart"):
endpoint.safe_psql("select * from t_0;")
pg.safe_psql("select * from t_0;")

View File

@@ -22,8 +22,8 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
pageserver_http.configure_failpoints(("flush-frozen-before-sync", "sleep(10000)"))
endpoint_branch0 = env.endpoints.create_start("main", tenant_id=tenant)
branch0_cur = endpoint_branch0.connect().cursor()
pg_branch0 = env.postgres.create_start("main", tenant_id=tenant)
branch0_cur = pg_branch0.connect().cursor()
branch0_timeline = TimelineId(query_scalar(branch0_cur, "SHOW neon.timeline_id"))
log.info(f"b0 timeline {branch0_timeline}")
@@ -44,10 +44,10 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
# Create branch1.
env.neon_cli.create_branch("branch1", "main", tenant_id=tenant, ancestor_start_lsn=lsn_100)
endpoint_branch1 = env.endpoints.create_start("branch1", tenant_id=tenant)
pg_branch1 = env.postgres.create_start("branch1", tenant_id=tenant)
log.info("postgres is running on 'branch1' branch")
branch1_cur = endpoint_branch1.connect().cursor()
branch1_cur = pg_branch1.connect().cursor()
branch1_timeline = TimelineId(query_scalar(branch1_cur, "SHOW neon.timeline_id"))
log.info(f"b1 timeline {branch1_timeline}")
@@ -67,9 +67,9 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
# Create branch2.
env.neon_cli.create_branch("branch2", "branch1", tenant_id=tenant, ancestor_start_lsn=lsn_200)
endpoint_branch2 = env.endpoints.create_start("branch2", tenant_id=tenant)
pg_branch2 = env.postgres.create_start("branch2", tenant_id=tenant)
log.info("postgres is running on 'branch2' branch")
branch2_cur = endpoint_branch2.connect().cursor()
branch2_cur = pg_branch2.connect().cursor()
branch2_timeline = TimelineId(query_scalar(branch2_cur, "SHOW neon.timeline_id"))
log.info(f"b2 timeline {branch2_timeline}")

View File

@@ -63,9 +63,9 @@ def test_compute_auth_to_pageserver(neon_env_builder: NeonEnvBuilder):
branch = "test_compute_auth_to_pageserver"
env.neon_cli.create_branch(branch)
endpoint = env.endpoints.create_start(branch)
pg = env.postgres.create_start(branch)
with closing(endpoint.connect()) as conn:
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
@@ -82,7 +82,7 @@ def test_auth_failures(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
branch = f"test_auth_failures_auth_enabled_{auth_enabled}"
timeline_id = env.neon_cli.create_branch(branch)
env.endpoints.create_start(branch)
env.postgres.create_start(branch)
tenant_token = env.auth_keys.generate_tenant_token(env.initial_tenant)
invalid_tenant_token = env.auth_keys.generate_tenant_token(TenantId.generate())

View File

@@ -5,7 +5,7 @@ from contextlib import closing, contextmanager
import psycopg2.extras
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint, NeonEnvBuilder
from fixtures.neon_fixtures import NeonEnvBuilder, Postgres
pytest_plugins = "fixtures.neon_fixtures"
@@ -20,10 +20,10 @@ def pg_cur(pg):
# Periodically check that all backpressure lags are below the configured threshold,
# assert if they are not.
# If the check query fails, stop the thread. Main thread should notice that and stop the test.
def check_backpressure(endpoint: Endpoint, stop_event: threading.Event, polling_interval=5):
def check_backpressure(pg: Postgres, stop_event: threading.Event, polling_interval=5):
log.info("checks started")
with pg_cur(endpoint) as cur:
with pg_cur(pg) as cur:
cur.execute("CREATE EXTENSION neon") # TODO move it to neon_fixtures?
cur.execute("select pg_size_bytes(current_setting('max_replication_write_lag'))")
@@ -41,7 +41,7 @@ def check_backpressure(endpoint: Endpoint, stop_event: threading.Event, polling_
max_replication_apply_lag_bytes = res[0]
log.info(f"max_replication_apply_lag: {max_replication_apply_lag_bytes} bytes")
with pg_cur(endpoint) as cur:
with pg_cur(pg) as cur:
while not stop_event.is_set():
try:
cur.execute(
@@ -102,14 +102,14 @@ def test_backpressure_received_lsn_lag(neon_env_builder: NeonEnvBuilder):
# Create a branch for us
env.neon_cli.create_branch("test_backpressure")
endpoint = env.endpoints.create_start(
pg = env.postgres.create_start(
"test_backpressure", config_lines=["max_replication_write_lag=30MB"]
)
log.info("postgres is running on 'test_backpressure' branch")
# setup check thread
check_stop_event = threading.Event()
check_thread = threading.Thread(target=check_backpressure, args=(endpoint, check_stop_event))
check_thread = threading.Thread(target=check_backpressure, args=(pg, check_stop_event))
check_thread.start()
# Configure failpoint to slow down walreceiver ingest
@@ -125,7 +125,7 @@ def test_backpressure_received_lsn_lag(neon_env_builder: NeonEnvBuilder):
# because of the lag and waiting for lsn to replay to arrive.
time.sleep(2)
with pg_cur(endpoint) as cur:
with pg_cur(pg) as cur:
# Create and initialize test table
cur.execute("CREATE TABLE foo(x bigint)")

View File

@@ -15,4 +15,4 @@ def test_basebackup_error(neon_simple_env: NeonEnv):
pageserver_http.configure_failpoints(("basebackup-before-control-file", "return"))
with pytest.raises(Exception, match="basebackup-before-control-file"):
env.endpoints.create_start("test_basebackup_error")
env.postgres.create_start("test_basebackup_error")

View File

@@ -67,9 +67,9 @@ def test_branch_and_gc(neon_simple_env: NeonEnv):
)
timeline_main = env.neon_cli.create_timeline("test_main", tenant_id=tenant)
endpoint_main = env.endpoints.create_start("test_main", tenant_id=tenant)
pg_main = env.postgres.create_start("test_main", tenant_id=tenant)
main_cur = endpoint_main.connect().cursor()
main_cur = pg_main.connect().cursor()
main_cur.execute(
"CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')"
@@ -90,9 +90,9 @@ def test_branch_and_gc(neon_simple_env: NeonEnv):
env.neon_cli.create_branch(
"test_branch", "test_main", tenant_id=tenant, ancestor_start_lsn=lsn1
)
endpoint_branch = env.endpoints.create_start("test_branch", tenant_id=tenant)
pg_branch = env.postgres.create_start("test_branch", tenant_id=tenant)
branch_cur = endpoint_branch.connect().cursor()
branch_cur = pg_branch.connect().cursor()
branch_cur.execute("INSERT INTO foo SELECT FROM generate_series(1, 100000)")
assert query_scalar(branch_cur, "SELECT count(*) FROM foo") == 200000
@@ -142,8 +142,8 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
)
b0 = env.neon_cli.create_branch("b0", tenant_id=tenant)
endpoint0 = env.endpoints.create_start("b0", tenant_id=tenant)
res = endpoint0.safe_psql_many(
pg0 = env.postgres.create_start("b0", tenant_id=tenant)
res = pg0.safe_psql_many(
queries=[
"CREATE TABLE t(key serial primary key)",
"INSERT INTO t SELECT FROM generate_series(1, 100000)",

View File

@@ -18,10 +18,10 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
# Branch at the point where only 100 rows were inserted
env.neon_cli.create_branch("test_branch_behind")
endpoint_main = env.endpoints.create_start("test_branch_behind")
pgmain = env.postgres.create_start("test_branch_behind")
log.info("postgres is running on 'test_branch_behind' branch")
main_cur = endpoint_main.connect().cursor()
main_cur = pgmain.connect().cursor()
timeline = TimelineId(query_scalar(main_cur, "SHOW neon.timeline_id"))
@@ -74,15 +74,15 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
"test_branch_behind_more", "test_branch_behind", ancestor_start_lsn=lsn_b
)
endpoint_hundred = env.endpoints.create_start("test_branch_behind_hundred")
endpoint_more = env.endpoints.create_start("test_branch_behind_more")
pg_hundred = env.postgres.create_start("test_branch_behind_hundred")
pg_more = env.postgres.create_start("test_branch_behind_more")
# On the 'hundred' branch, we should see only 100 rows
hundred_cur = endpoint_hundred.connect().cursor()
hundred_cur = pg_hundred.connect().cursor()
assert query_scalar(hundred_cur, "SELECT count(*) FROM foo") == 100
# On the 'more' branch, we should see 100200 rows
more_cur = endpoint_more.connect().cursor()
more_cur = pg_more.connect().cursor()
assert query_scalar(more_cur, "SELECT count(*) FROM foo") == 200100
# All the rows are visible on the main branch
@@ -94,8 +94,8 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
env.neon_cli.create_branch(
"test_branch_segment_boundary", "test_branch_behind", ancestor_start_lsn=Lsn("0/3000000")
)
endpoint = env.endpoints.create_start("test_branch_segment_boundary")
assert endpoint.safe_psql("SELECT 1")[0][0] == 1
pg = env.postgres.create_start("test_branch_segment_boundary")
assert pg.safe_psql("SELECT 1")[0][0] == 1
# branch at pre-initdb lsn
with pytest.raises(Exception, match="invalid branch start lsn: .*"):

View File

@@ -5,7 +5,7 @@ from typing import List
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint, NeonEnv, PgBin
from fixtures.neon_fixtures import NeonEnv, PgBin, Postgres
from fixtures.types import Lsn
from fixtures.utils import query_scalar
from performance.test_perf_pgbench import get_scales_matrix
@@ -40,20 +40,20 @@ def test_branching_with_pgbench(
}
)
def run_pgbench(connstr: str):
def run_pgbench(pg: Postgres):
connstr = pg.connstr()
log.info(f"Start a pgbench workload on pg {connstr}")
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
pg_bin.run_capture(["pgbench", "-T15", connstr])
env.neon_cli.create_branch("b0", tenant_id=tenant)
endpoints: List[Endpoint] = []
endpoints.append(env.endpoints.create_start("b0", tenant_id=tenant))
pgs: List[Postgres] = []
pgs.append(env.postgres.create_start("b0", tenant_id=tenant))
threads: List[threading.Thread] = []
threads.append(
threading.Thread(target=run_pgbench, args=(endpoints[0].connstr(),), daemon=True)
)
threads.append(threading.Thread(target=run_pgbench, args=(pgs[0],), daemon=True))
threads[-1].start()
thread_limit = 4
@@ -79,18 +79,16 @@ def test_branching_with_pgbench(
else:
env.neon_cli.create_branch("b{}".format(i + 1), "b0", tenant_id=tenant)
endpoints.append(env.endpoints.create_start("b{}".format(i + 1), tenant_id=tenant))
pgs.append(env.postgres.create_start("b{}".format(i + 1), tenant_id=tenant))
threads.append(
threading.Thread(target=run_pgbench, args=(endpoints[-1].connstr(),), daemon=True)
)
threads.append(threading.Thread(target=run_pgbench, args=(pgs[-1],), daemon=True))
threads[-1].start()
for thread in threads:
thread.join()
for ep in endpoints:
res = ep.safe_psql("SELECT count(*) from pgbench_accounts")
for pg in pgs:
res = pg.safe_psql("SELECT count(*) from pgbench_accounts")
assert res[0] == (100000 * scale,)
@@ -112,11 +110,11 @@ def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBi
env = neon_simple_env
env.neon_cli.create_branch("b0")
endpoint0 = env.endpoints.create_start("b0")
pg0 = env.postgres.create_start("b0")
pg_bin.run_capture(["pgbench", "-i", endpoint0.connstr()])
pg_bin.run_capture(["pgbench", "-i", pg0.connstr()])
with endpoint0.cursor() as cur:
with pg0.cursor() as cur:
curr_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
# Specify the `start_lsn` as a number that is divided by `XLOG_BLCKSZ`
@@ -125,6 +123,6 @@ def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBi
log.info(f"Branching b1 from b0 starting at lsn {start_lsn}...")
env.neon_cli.create_branch("b1", "b0", ancestor_start_lsn=start_lsn)
endpoint1 = env.endpoints.create_start("b1")
pg1 = env.postgres.create_start("b1")
pg_bin.run_capture(["pgbench", "-i", endpoint1.connstr()])
pg_bin.run_capture(["pgbench", "-i", pg1.connstr()])

View File

@@ -4,7 +4,7 @@ from typing import List, Tuple
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres
from fixtures.types import TenantId, TimelineId
@@ -24,17 +24,17 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
]
)
tenant_timelines: List[Tuple[TenantId, TimelineId, Endpoint]] = []
tenant_timelines: List[Tuple[TenantId, TimelineId, Postgres]] = []
for n in range(4):
tenant_id, timeline_id = env.neon_cli.create_tenant()
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
with endpoint.cursor() as cur:
pg = env.postgres.create_start("main", tenant_id=tenant_id)
with pg.cursor() as cur:
cur.execute("CREATE TABLE t(key int primary key, value text)")
cur.execute("INSERT INTO t SELECT generate_series(1,100), 'payload'")
endpoint.stop()
tenant_timelines.append((tenant_id, timeline_id, endpoint))
pg.stop()
tenant_timelines.append((tenant_id, timeline_id, pg))
# Stop the pageserver
env.pageserver.stop()

View File

@@ -24,14 +24,14 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
"autovacuum_freeze_max_age=100000",
]
endpoint = env.endpoints.create_start("test_clog_truncate", config_lines=config)
pg = env.postgres.create_start("test_clog_truncate", config_lines=config)
log.info("postgres is running on test_clog_truncate branch")
# Install extension containing function needed for test
endpoint.safe_psql("CREATE EXTENSION neon_test_utils")
pg.safe_psql("CREATE EXTENSION neon_test_utils")
# Consume many xids to advance clog
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute("select test_consume_xids(1000*1000*10);")
log.info("xids consumed")
@@ -44,7 +44,7 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
# wait for autovacuum to truncate the pg_xact
# XXX Is it worth to add a timeout here?
pg_xact_0000_path = os.path.join(endpoint.pg_xact_dir_path(), "0000")
pg_xact_0000_path = os.path.join(pg.pg_xact_dir_path(), "0000")
log.info(f"pg_xact_0000_path = {pg_xact_0000_path}")
while os.path.isfile(pg_xact_0000_path):
@@ -52,7 +52,7 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
time.sleep(5)
# checkpoint to advance latest lsn
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute("CHECKPOINT;")
lsn_after_truncation = query_scalar(cur, "select pg_current_wal_insert_lsn()")
@@ -61,10 +61,10 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
env.neon_cli.create_branch(
"test_clog_truncate_new", "test_clog_truncate", ancestor_start_lsn=lsn_after_truncation
)
endpoint2 = env.endpoints.create_start("test_clog_truncate_new")
pg2 = env.postgres.create_start("test_clog_truncate_new")
log.info("postgres is running on test_clog_truncate_new branch")
# check that new node doesn't contain truncated segment
pg_xact_0000_path_new = os.path.join(endpoint2.pg_xact_dir_path(), "0000")
pg_xact_0000_path_new = os.path.join(pg2.pg_xact_dir_path(), "0000")
log.info(f"pg_xact_0000_path_new = {pg_xact_0000_path_new}")
assert os.path.isfile(pg_xact_0000_path_new) is False

View File

@@ -24,8 +24,8 @@ def test_lsof_pageserver_pid(neon_simple_env: NeonEnv):
def start_workload():
env.neon_cli.create_branch("test_lsof_pageserver_pid")
endpoint = env.endpoints.create_start("test_lsof_pageserver_pid")
with closing(endpoint.connect()) as conn:
pg = env.postgres.create_start("test_lsof_pageserver_pid")
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE TABLE foo as SELECT x FROM generate_series(1,100000) x")
cur.execute("update foo set x=x+1")

View File

@@ -1,4 +1,3 @@
import copy
import os
import shutil
import subprocess
@@ -51,31 +50,29 @@ def test_create_snapshot(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, test_o
neon_env_builder.preserve_database_files = True
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main")
pg = env.postgres.create_start("main")
# FIXME: Is this expected?
env.pageserver.allowed_errors.append(
".*init_tenant_mgr: marking .* as locally complete, while it doesnt exist in remote index.*"
)
pg_bin.run(["pgbench", "--initialize", "--scale=10", endpoint.connstr()])
pg_bin.run(["pgbench", "--time=60", "--progress=2", endpoint.connstr()])
pg_bin.run(
["pg_dumpall", f"--dbname={endpoint.connstr()}", f"--file={test_output_dir / 'dump.sql'}"]
)
pg_bin.run(["pgbench", "--initialize", "--scale=10", pg.connstr()])
pg_bin.run(["pgbench", "--time=60", "--progress=2", pg.connstr()])
pg_bin.run(["pg_dumpall", f"--dbname={pg.connstr()}", f"--file={test_output_dir / 'dump.sql'}"])
snapshot_config = toml.load(test_output_dir / "repo" / "config")
tenant_id = snapshot_config["default_tenant_id"]
timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id]
pageserver_http = env.pageserver.http_client()
lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id, lsn)
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(pageserver_http, tenant_id, timeline_id, lsn)
env.endpoints.stop_all()
env.postgres.stop_all()
for sk in env.safekeepers:
sk.stop()
env.pageserver.stop()
@@ -95,9 +92,6 @@ def test_backward_compatibility(
pg_version: str,
request: FixtureRequest,
):
"""
Test that the new binaries can read old data
"""
compatibility_snapshot_dir_env = os.environ.get("COMPATIBILITY_SNAPSHOT_DIR")
assert (
compatibility_snapshot_dir_env is not None
@@ -120,7 +114,6 @@ def test_backward_compatibility(
check_neon_works(
test_output_dir / "compatibility_snapshot" / "repo",
neon_binpath,
neon_binpath,
pg_distrib_dir,
pg_version,
port_distributor,
@@ -148,11 +141,7 @@ def test_forward_compatibility(
port_distributor: PortDistributor,
pg_version: str,
request: FixtureRequest,
neon_binpath: Path,
):
"""
Test that the old binaries can read new data
"""
compatibility_neon_bin_env = os.environ.get("COMPATIBILITY_NEON_BIN")
assert compatibility_neon_bin_env is not None, (
"COMPATIBILITY_NEON_BIN is not set. It should be set to a path with Neon binaries "
@@ -187,7 +176,6 @@ def test_forward_compatibility(
check_neon_works(
test_output_dir / "compatibility_snapshot" / "repo",
compatibility_neon_bin,
neon_binpath,
compatibility_postgres_distrib_dir,
pg_version,
port_distributor,
@@ -228,13 +216,9 @@ def prepare_snapshot(
for logfile in repo_dir.glob("**/*.log"):
logfile.unlink()
# Remove old computes in 'endpoints'. Old versions of the control plane used a directory
# called "pgdatadirs". Delete it, too.
if (repo_dir / "endpoints").exists():
shutil.rmtree(repo_dir / "endpoints")
if (repo_dir / "pgdatadirs").exists():
shutil.rmtree(repo_dir / "pgdatadirs")
os.mkdir(repo_dir / "endpoints")
# Remove tenants data for compute
for tenant in (repo_dir / "pgdatadirs" / "tenants").glob("*"):
shutil.rmtree(tenant)
# Remove wal-redo temp directory if it exists. Newer pageserver versions don't create
# them anymore, but old versions did.
@@ -335,8 +319,7 @@ def get_neon_version(neon_binpath: Path):
def check_neon_works(
repo_dir: Path,
neon_target_binpath: Path,
neon_current_binpath: Path,
neon_binpath: Path,
pg_distrib_dir: Path,
pg_version: str,
port_distributor: PortDistributor,
@@ -346,7 +329,7 @@ def check_neon_works(
):
snapshot_config_toml = repo_dir / "config"
snapshot_config = toml.load(snapshot_config_toml)
snapshot_config["neon_distrib_dir"] = str(neon_target_binpath)
snapshot_config["neon_distrib_dir"] = str(neon_binpath)
snapshot_config["postgres_distrib_dir"] = str(pg_distrib_dir)
with (snapshot_config_toml).open("w") as f:
toml.dump(snapshot_config, f)
@@ -357,25 +340,17 @@ def check_neon_works(
config.repo_dir = repo_dir
config.pg_version = pg_version
config.initial_tenant = snapshot_config["default_tenant_id"]
config.neon_binpath = neon_binpath
config.pg_distrib_dir = pg_distrib_dir
config.preserve_database_files = True
# Use the "target" binaries to launch the storage nodes
config_target = config
config_target.neon_binpath = neon_target_binpath
cli_target = NeonCli(config_target)
# And the current binaries to launch computes
config_current = copy.copy(config)
config_current.neon_binpath = neon_current_binpath
cli_current = NeonCli(config_current)
cli_target.raw_cli(["start"])
request.addfinalizer(lambda: cli_target.raw_cli(["stop"]))
cli = NeonCli(config)
cli.raw_cli(["start"])
request.addfinalizer(lambda: cli.raw_cli(["stop"]))
pg_port = port_distributor.get_port()
cli_current.endpoint_start("main", port=pg_port)
request.addfinalizer(lambda: cli_current.endpoint_stop("main"))
cli.pg_start("main", port=pg_port)
request.addfinalizer(lambda: cli.pg_stop("main"))
connstr = f"host=127.0.0.1 port={pg_port} user=cloud_admin dbname=postgres"
pg_bin.run(["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump.sql'}"])

View File

@@ -13,10 +13,10 @@ def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
ctl = ComputeCtl(env)
env.neon_cli.create_branch("test_compute_ctl", "main")
endpoint = env.endpoints.create_start("test_compute_ctl")
endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)")
pg = env.postgres.create_start("test_compute_ctl")
pg.safe_psql("CREATE TABLE t(key int primary key, value text)")
with open(endpoint.config_file_path(), "r") as f:
with open(pg.config_file_path(), "r") as f:
cfg_lines = f.readlines()
cfg_map = {}
for line in cfg_lines:
@@ -24,13 +24,10 @@ def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
k, v = line.split("=")
cfg_map[k] = v.strip("\n '\"")
log.info(f"postgres config: {cfg_map}")
pgdata = endpoint.pg_data_dir_path()
pgdata = pg.pg_data_dir_path()
pg_bin_path = os.path.join(pg_bin.pg_bin_path, "postgres")
endpoint.stop_and_destroy()
# stop_and_destroy removes the whole endpoint directory. Recreate it.
Path(pgdata).mkdir(parents=True)
pg.stop_and_destroy()
spec = (
"""

View File

@@ -12,10 +12,10 @@ def test_config(neon_simple_env: NeonEnv):
env.neon_cli.create_branch("test_config", "empty")
# change config
endpoint = env.endpoints.create_start("test_config", config_lines=["log_min_messages=debug1"])
pg = env.postgres.create_start("test_config", config_lines=["log_min_messages=debug1"])
log.info("postgres is running on test_config branch")
with closing(endpoint.connect()) as conn:
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute(
"""

View File

@@ -21,11 +21,11 @@ def test_crafted_wal_end(neon_env_builder: NeonEnvBuilder, wal_type: str):
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_crafted_wal_end")
endpoint = env.endpoints.create("test_crafted_wal_end")
pg = env.postgres.create("test_crafted_wal_end")
wal_craft = WalCraft(env)
endpoint.config(wal_craft.postgres_config())
endpoint.start()
res = endpoint.safe_psql_many(
pg.config(wal_craft.postgres_config())
pg.start()
res = pg.safe_psql_many(
queries=[
"CREATE TABLE keys(key int primary key)",
"INSERT INTO keys SELECT generate_series(1, 100)",
@@ -34,7 +34,7 @@ def test_crafted_wal_end(neon_env_builder: NeonEnvBuilder, wal_type: str):
)
assert res[-1][0] == (5050,)
wal_craft.in_existing(wal_type, endpoint.connstr())
wal_craft.in_existing(wal_type, pg.connstr())
log.info("Restarting all safekeepers and pageservers")
env.pageserver.stop()
@@ -43,7 +43,7 @@ def test_crafted_wal_end(neon_env_builder: NeonEnvBuilder, wal_type: str):
env.pageserver.start()
log.info("Trying more queries")
res = endpoint.safe_psql_many(
res = pg.safe_psql_many(
queries=[
"SELECT SUM(key) FROM keys",
"INSERT INTO keys SELECT generate_series(101, 200)",
@@ -60,7 +60,7 @@ def test_crafted_wal_end(neon_env_builder: NeonEnvBuilder, wal_type: str):
env.pageserver.start()
log.info("Trying more queries (again)")
res = endpoint.safe_psql_many(
res = pg.safe_psql_many(
queries=[
"SELECT SUM(key) FROM keys",
"INSERT INTO keys SELECT generate_series(201, 300)",

View File

@@ -13,10 +13,10 @@ def test_createdb(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_createdb", "empty")
endpoint = env.endpoints.create_start("test_createdb")
pg = env.postgres.create_start("test_createdb")
log.info("postgres is running on 'test_createdb' branch")
with endpoint.cursor() as cur:
with pg.cursor() as cur:
# Cause a 'relmapper' change in the original branch
cur.execute("VACUUM FULL pg_class")
@@ -26,10 +26,10 @@ def test_createdb(neon_simple_env: NeonEnv):
# Create a branch
env.neon_cli.create_branch("test_createdb2", "test_createdb", ancestor_start_lsn=lsn)
endpoint2 = env.endpoints.create_start("test_createdb2")
pg2 = env.postgres.create_start("test_createdb2")
# Test that you can connect to the new database on both branches
for db in (endpoint, endpoint2):
for db in (pg, pg2):
with db.cursor(dbname="foodb") as cur:
# Check database size in both branches
cur.execute(
@@ -55,17 +55,17 @@ def test_createdb(neon_simple_env: NeonEnv):
def test_dropdb(neon_simple_env: NeonEnv, test_output_dir):
env = neon_simple_env
env.neon_cli.create_branch("test_dropdb", "empty")
endpoint = env.endpoints.create_start("test_dropdb")
pg = env.postgres.create_start("test_dropdb")
log.info("postgres is running on 'test_dropdb' branch")
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute("CREATE DATABASE foodb")
lsn_before_drop = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")
dboid = query_scalar(cur, "SELECT oid FROM pg_database WHERE datname='foodb';")
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute("DROP DATABASE foodb")
cur.execute("CHECKPOINT")
@@ -76,29 +76,29 @@ def test_dropdb(neon_simple_env: NeonEnv, test_output_dir):
env.neon_cli.create_branch(
"test_before_dropdb", "test_dropdb", ancestor_start_lsn=lsn_before_drop
)
endpoint_before = env.endpoints.create_start("test_before_dropdb")
pg_before = env.postgres.create_start("test_before_dropdb")
env.neon_cli.create_branch(
"test_after_dropdb", "test_dropdb", ancestor_start_lsn=lsn_after_drop
)
endpoint_after = env.endpoints.create_start("test_after_dropdb")
pg_after = env.postgres.create_start("test_after_dropdb")
# Test that database exists on the branch before drop
endpoint_before.connect(dbname="foodb").close()
pg_before.connect(dbname="foodb").close()
# Test that database subdir exists on the branch before drop
assert endpoint_before.pgdata_dir
dbpath = pathlib.Path(endpoint_before.pgdata_dir) / "base" / str(dboid)
assert pg_before.pgdata_dir
dbpath = pathlib.Path(pg_before.pgdata_dir) / "base" / str(dboid)
log.info(dbpath)
assert os.path.isdir(dbpath) is True
# Test that database subdir doesn't exist on the branch after drop
assert endpoint_after.pgdata_dir
dbpath = pathlib.Path(endpoint_after.pgdata_dir) / "base" / str(dboid)
assert pg_after.pgdata_dir
dbpath = pathlib.Path(pg_after.pgdata_dir) / "base" / str(dboid)
log.info(dbpath)
assert os.path.isdir(dbpath) is False
# Check that we restore the content of the datadir correctly
check_restored_datadir_content(test_output_dir, env, endpoint)
check_restored_datadir_content(test_output_dir, env, pg)

View File

@@ -9,10 +9,10 @@ from fixtures.utils import query_scalar
def test_createuser(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_createuser", "empty")
endpoint = env.endpoints.create_start("test_createuser")
pg = env.postgres.create_start("test_createuser")
log.info("postgres is running on 'test_createuser' branch")
with endpoint.cursor() as cur:
with pg.cursor() as cur:
# Cause a 'relmapper' change in the original branch
cur.execute("CREATE USER testuser with password %s", ("testpwd",))
@@ -22,7 +22,7 @@ def test_createuser(neon_simple_env: NeonEnv):
# Create a branch
env.neon_cli.create_branch("test_createuser2", "test_createuser", ancestor_start_lsn=lsn)
endpoint2 = env.endpoints.create_start("test_createuser2")
pg2 = env.postgres.create_start("test_createuser2")
# Test that you can connect to new branch as a new user
assert endpoint2.safe_psql("select current_user", user="testuser") == [("testuser",)]
assert pg2.safe_psql("select current_user", user="testuser") == [("testuser",)]

View File

@@ -91,8 +91,8 @@ class EvictionEnv:
This assumes that the tenant is still at the state after pbench -i.
"""
lsn = self.pgbench_init_lsns[tenant_id]
with self.neon_env.endpoints.create_start("main", tenant_id=tenant_id, lsn=lsn) as endpoint:
self.pg_bin.run(["pgbench", "-S", endpoint.connstr()])
with self.neon_env.postgres.create_start("main", tenant_id=tenant_id, lsn=lsn) as pg:
self.pg_bin.run(["pgbench", "-S", pg.connstr()])
def pageserver_start_with_disk_usage_eviction(
self, period, max_usage_pct, min_avail_bytes, mock_behavior
@@ -168,9 +168,9 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> Ev
}
)
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
pg_bin.run(["pgbench", "-i", f"-s{scale}", endpoint.connstr()])
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
with env.postgres.create_start("main", tenant_id=tenant_id) as pg:
pg_bin.run(["pgbench", "-i", f"-s{scale}", pg.connstr()])
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
timelines.append((tenant_id, timeline_id))

View File

@@ -4,7 +4,7 @@ from fixtures.neon_fixtures import NeonEnvBuilder
def test_fsm_truncate(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_fsm_truncate")
endpoint = env.endpoints.create_start("test_fsm_truncate")
endpoint.safe_psql(
pg = env.postgres.create_start("test_fsm_truncate")
pg.safe_psql(
"CREATE TABLE t1(key int); CREATE TABLE t2(key int); TRUNCATE TABLE t1; TRUNCATE TABLE t2;"
)

View File

@@ -24,10 +24,10 @@ def test_fullbackup(
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_fullbackup")
endpoint_main = env.endpoints.create_start("test_fullbackup")
pgmain = env.postgres.create_start("test_fullbackup")
log.info("postgres is running on 'test_fullbackup' branch")
with endpoint_main.cursor() as cur:
with pgmain.cursor() as cur:
timeline = TimelineId(query_scalar(cur, "SHOW neon.timeline_id"))
# data loading may take a while, so increase statement timeout

View File

@@ -5,9 +5,9 @@ import random
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
NeonEnvBuilder,
Postgres,
RemoteStorageKind,
wait_for_last_flush_lsn,
)
@@ -26,9 +26,9 @@ updates_performed = 0
# Run random UPDATEs on test table
async def update_table(endpoint: Endpoint):
async def update_table(pg: Postgres):
global updates_performed
pg_conn = await endpoint.connect_async()
pg_conn = await pg.connect_async()
while updates_performed < updates_to_perform:
updates_performed += 1
@@ -52,10 +52,10 @@ async def gc(env: NeonEnv, timeline: TimelineId):
# At the same time, run UPDATEs and GC
async def update_and_gc(env: NeonEnv, endpoint: Endpoint, timeline: TimelineId):
async def update_and_gc(env: NeonEnv, pg: Postgres, timeline: TimelineId):
workers = []
for worker_id in range(num_connections):
workers.append(asyncio.create_task(update_table(endpoint)))
workers.append(asyncio.create_task(update_table(pg)))
workers.append(asyncio.create_task(gc(env, timeline)))
# await all workers
@@ -72,10 +72,10 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
neon_env_builder.pageserver_config_override = "tenant_config={pitr_interval = '0 sec'}"
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_gc_aggressive", "main")
endpoint = env.endpoints.create_start("test_gc_aggressive")
pg = env.postgres.create_start("test_gc_aggressive")
log.info("postgres is running on test_gc_aggressive branch")
with endpoint.cursor() as cur:
with pg.cursor() as cur:
timeline = TimelineId(query_scalar(cur, "SHOW neon.timeline_id"))
# Create table, and insert the first 100 rows
@@ -89,7 +89,7 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
)
cur.execute("CREATE INDEX ON foo(id)")
asyncio.run(update_and_gc(env, endpoint, timeline))
asyncio.run(update_and_gc(env, pg, timeline))
cur.execute("SELECT COUNT(*), SUM(counter) FROM foo")
r = cur.fetchone()
@@ -110,11 +110,11 @@ def test_gc_index_upload(neon_env_builder: NeonEnvBuilder, remote_storage_kind:
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_gc_index_upload", "main")
endpoint = env.endpoints.create_start("test_gc_index_upload")
pg = env.postgres.create_start("test_gc_index_upload")
pageserver_http = env.pageserver.http_client()
pg_conn = endpoint.connect()
pg_conn = pg.connect()
cur = pg_conn.cursor()
tenant_id = TenantId(query_scalar(cur, "SHOW neon.tenant_id"))
@@ -146,7 +146,7 @@ def test_gc_index_upload(neon_env_builder: NeonEnvBuilder, remote_storage_kind:
return int(total)
# Sanity check that the metric works
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
pageserver_http.timeline_gc(tenant_id, timeline_id, 10000)
before = get_num_remote_ops("index", "upload")

View File

@@ -31,8 +31,8 @@ def test_gc_cutoff(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
"image_creation_threshold": "2",
}
)
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
connstr = endpoint.connstr(options="-csynchronous_commit=off")
pg = env.postgres.create_start("main", tenant_id=tenant_id)
connstr = pg.connstr(options="-csynchronous_commit=off")
pg_bin.run_capture(["pgbench", "-i", "-s10", connstr])
pageserver_http.configure_failpoints(("after-timeline-gc-removed-layers", "exit"))

View File

@@ -9,10 +9,10 @@ from pathlib import Path
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
NeonEnvBuilder,
PgBin,
Postgres,
wait_for_last_record_lsn,
wait_for_upload,
)
@@ -73,7 +73,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
start_lsn = manifest["WAL-Ranges"][0]["Start-LSN"]
end_lsn = manifest["WAL-Ranges"][0]["End-LSN"]
endpoint_id = "ep-import_from_vanilla"
node_name = "import_from_vanilla"
tenant = TenantId.generate()
timeline = TimelineId.generate()
@@ -114,7 +114,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
"--timeline-id",
str(timeline),
"--node-name",
endpoint_id,
node_name,
"--base-lsn",
start_lsn,
"--base-tarfile",
@@ -154,8 +154,8 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
wait_for_upload(client, tenant, timeline, Lsn(end_lsn))
# Check it worked
endpoint = env.endpoints.create_start(endpoint_id, tenant_id=tenant)
assert endpoint.safe_psql("select count(*) from t") == [(300000,)]
pg = env.postgres.create_start(node_name, tenant_id=tenant)
assert pg.safe_psql("select count(*) from t") == [(300000,)]
@pytest.mark.timeout(600)
@@ -169,10 +169,10 @@ def test_import_from_pageserver_small(pg_bin: PgBin, neon_env_builder: NeonEnvBu
)
timeline = env.neon_cli.create_branch("test_import_from_pageserver_small")
endpoint = env.endpoints.create_start("test_import_from_pageserver_small")
pg = env.postgres.create_start("test_import_from_pageserver_small")
num_rows = 3000
lsn = _generate_data(num_rows, endpoint)
lsn = _generate_data(num_rows, pg)
_import(num_rows, lsn, env, pg_bin, timeline, env.pg_distrib_dir)
@@ -186,14 +186,14 @@ def test_import_from_pageserver_multisegment(pg_bin: PgBin, neon_env_builder: Ne
env = neon_env_builder.init_start()
timeline = env.neon_cli.create_branch("test_import_from_pageserver_multisegment")
endpoint = env.endpoints.create_start("test_import_from_pageserver_multisegment")
pg = env.postgres.create_start("test_import_from_pageserver_multisegment")
# For `test_import_from_pageserver_multisegment`, we want to make sure that the data
# is large enough to create multi-segment files. Typically, a segment file's size is
# at most 1GB. A large number of inserted rows (`30000000`) is used to increase the
# DB size to above 1GB. Related: https://github.com/neondatabase/neon/issues/2097.
num_rows = 30000000
lsn = _generate_data(num_rows, endpoint)
lsn = _generate_data(num_rows, pg)
logical_size = env.pageserver.http_client().timeline_detail(env.initial_tenant, timeline)[
"current_logical_size"
@@ -214,12 +214,12 @@ def test_import_from_pageserver_multisegment(pg_bin: PgBin, neon_env_builder: Ne
assert cnt_seg_files > 0
def _generate_data(num_rows: int, endpoint: Endpoint) -> Lsn:
def _generate_data(num_rows: int, pg: Postgres) -> Lsn:
"""Generate a table with `num_rows` rows.
Returns:
the latest insert WAL's LSN"""
with closing(endpoint.connect()) as conn:
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# data loading may take a while, so increase statement timeout
cur.execute("SET statement_timeout='300s'")
@@ -264,7 +264,7 @@ def _import(
tar_output_file = result_basepath + ".stdout"
# Stop the first pageserver instance, erase all its data
env.endpoints.stop_all()
env.postgres.stop_all()
env.pageserver.stop()
dir_to_clear = Path(env.repo_dir) / "tenants"
@@ -279,7 +279,7 @@ def _import(
tenant = TenantId.generate()
# Import to pageserver
endpoint_id = "ep-import_from_pageserver"
node_name = "import_from_pageserver"
client = env.pageserver.http_client()
client.tenant_create(tenant)
env.neon_cli.raw_cli(
@@ -291,7 +291,7 @@ def _import(
"--timeline-id",
str(timeline),
"--node-name",
endpoint_id,
node_name,
"--base-lsn",
str(lsn),
"--base-tarfile",
@@ -306,8 +306,8 @@ def _import(
wait_for_upload(client, tenant, timeline, lsn)
# Check it worked
endpoint = env.endpoints.create_start(endpoint_id, tenant_id=tenant)
assert endpoint.safe_psql("select count(*) from tbl") == [(expected_num_rows,)]
pg = env.postgres.create_start(node_name, tenant_id=tenant)
assert pg.safe_psql("select count(*) from tbl") == [(expected_num_rows,)]
# Take another fullbackup
query = f"fullbackup { tenant} {timeline} {lsn}"

View File

@@ -15,9 +15,9 @@ from fixtures.neon_fixtures import NeonEnvBuilder
def test_large_schema(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main")
pg = env.postgres.create_start("main")
conn = endpoint.connect()
conn = pg.connect()
cur = conn.cursor()
tables = 2 # 10 is too much for debug build
@@ -27,18 +27,18 @@ def test_large_schema(neon_env_builder: NeonEnvBuilder):
# Restart compute. Restart is actually not strictly needed.
# It is done mostly because this test originally tries to model the problem reported by Ketteq.
endpoint.stop()
pg.stop()
# Kill and restart the pageserver.
# env.pageserver.stop(immediate=True)
# env.pageserver.start()
endpoint.start()
pg.start()
retry_sleep = 0.5
max_retries = 200
retries = 0
while True:
try:
conn = endpoint.connect()
conn = pg.connect()
cur = conn.cursor()
cur.execute(f"CREATE TABLE if not exists t_{i}(pk integer) partition by range (pk)")
for j in range(1, partitions + 1):
@@ -63,7 +63,7 @@ def test_large_schema(neon_env_builder: NeonEnvBuilder):
raise
break
conn = endpoint.connect()
conn = pg.connect()
cur = conn.cursor()
for i in range(1, tables + 1):
@@ -74,8 +74,8 @@ def test_large_schema(neon_env_builder: NeonEnvBuilder):
cur.execute("select * from pg_depend order by refclassid, refobjid, refobjsubid")
# Check layer file sizes
tenant_id = endpoint.safe_psql("show neon.tenant_id")[0][0]
timeline_id = endpoint.safe_psql("show neon.timeline_id")[0][0]
tenant_id = pg.safe_psql("show neon.tenant_id")[0][0]
timeline_id = pg.safe_psql("show neon.timeline_id")[0][0]
timeline_path = "{}/tenants/{}/timelines/{}/".format(env.repo_dir, tenant_id, timeline_id)
for filename in os.listdir(timeline_path):
if filename.startswith("00000"):

View File

@@ -28,13 +28,13 @@ def test_basic_eviction(
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
pg = env.postgres.create_start("main")
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
# Create a number of layers in the tenant
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute("CREATE TABLE foo (t text)")
cur.execute(
"""
@@ -173,15 +173,15 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
env.initial_tenant = tenant_id # update_and_gc relies on this
ps_http = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
pg = env.postgres.create_start("main")
log.info("fill with data, creating delta & image layers, some of which are GC'able after")
# no particular reason to create the layers like this, but we are sure
# not to hit the image_creation_threshold here.
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute("create table a (id bigserial primary key, some_value bigint not null)")
cur.execute("insert into a(some_value) select i from generate_series(1, 10000) s(i)")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
ps_http.timeline_checkpoint(tenant_id, timeline_id)
# Create delta layers, then turn them into image layers.
@@ -192,19 +192,19 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
for i in range(0, 2):
for j in range(0, 3):
# create a minimal amount of "delta difficulty" for this table
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute("update a set some_value = -some_value + %s", (j,))
with endpoint.cursor() as cur:
with pg.cursor() as cur:
# vacuuming should aid to reuse keys, though it's not really important
# with image_creation_threshold=1 which we will use on the last compaction
cur.execute("vacuum")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
if i == 1 and j == 2 and k == 1:
# last iteration; stop before checkpoint to avoid leaving an inmemory layer
endpoint.stop_and_destroy()
pg.stop_and_destroy()
ps_http.timeline_checkpoint(tenant_id, timeline_id)

View File

@@ -20,7 +20,7 @@ def test_image_layer_writer_fail_before_finish(neon_simple_env: NeonEnv):
}
)
pg = env.endpoints.create_start("main", tenant_id=tenant_id)
pg = env.postgres.create_start("main", tenant_id=tenant_id)
pg.safe_psql_many(
[
"CREATE TABLE foo (t text) WITH (autovacuum_enabled = off)",
@@ -64,8 +64,8 @@ def test_delta_layer_writer_fail_before_finish(neon_simple_env: NeonEnv):
}
)
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
endpoint.safe_psql_many(
pg = env.postgres.create_start("main", tenant_id=tenant_id)
pg.safe_psql_many(
[
"CREATE TABLE foo (t text) WITH (autovacuum_enabled = off)",
"""INSERT INTO foo

View File

@@ -12,10 +12,10 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
new_timeline_id = env.neon_cli.create_branch("test_lsn_mapping")
endpoint_main = env.endpoints.create_start("test_lsn_mapping")
pgmain = env.postgres.create_start("test_lsn_mapping")
log.info("postgres is running on 'test_lsn_mapping' branch")
cur = endpoint_main.connect().cursor()
cur = pgmain.connect().cursor()
# Create table, and insert rows, each in a separate transaction
# Disable synchronous_commit to make this initialization go faster.
#
@@ -35,7 +35,7 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
cur.execute("INSERT INTO foo VALUES (-1)")
# Wait until WAL is received by pageserver
wait_for_last_flush_lsn(env, endpoint_main, env.initial_tenant, new_timeline_id)
wait_for_last_flush_lsn(env, pgmain, env.initial_tenant, new_timeline_id)
with env.pageserver.http_client() as client:
# Check edge cases: timestamp in the future
@@ -61,9 +61,9 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
# Call get_lsn_by_timestamp to get the LSN
# Launch a new read-only node at that LSN, and check that only the rows
# that were supposed to be committed at that point in time are visible.
endpoint_here = env.endpoints.create_start(
branch_name="test_lsn_mapping", endpoint_id="ep-lsn_mapping_read", lsn=lsn
pg_here = env.postgres.create_start(
branch_name="test_lsn_mapping", node_name="test_lsn_mapping_read", lsn=lsn
)
assert endpoint_here.safe_psql("SELECT max(x) FROM foo")[0][0] == i
assert pg_here.safe_psql("SELECT max(x) FROM foo")[0][0] == i
endpoint_here.stop_and_destroy()
pg_here.stop_and_destroy()

View File

@@ -123,9 +123,9 @@ def test_metric_collection(
# before pageserver, pageserver log might contain such errors in the end.
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
env.neon_cli.create_branch("test_metric_collection")
endpoint = env.endpoints.create_start("test_metric_collection")
pg = env.postgres.create_start("test_metric_collection")
pg_conn = endpoint.connect()
pg_conn = pg.connect()
cur = pg_conn.cursor()
tenant_id = TenantId(query_scalar(cur, "SHOW neon.tenant_id"))
@@ -158,7 +158,7 @@ def test_metric_collection(
# upload some data to remote storage
if remote_storage_kind == RemoteStorageKind.LOCAL_FS:
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
pageserver_http = env.pageserver.http_client()
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
pageserver_http.timeline_gc(tenant_id, timeline_id, 10000)

View File

@@ -12,10 +12,10 @@ from fixtures.utils import query_scalar
def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
env = neon_simple_env
env.neon_cli.create_branch("test_multixact", "empty")
endpoint = env.endpoints.create_start("test_multixact")
pg = env.postgres.create_start("test_multixact")
log.info("postgres is running on 'test_multixact' branch")
cur = endpoint.connect().cursor()
cur = pg.connect().cursor()
cur.execute(
"""
CREATE TABLE t1(i int primary key);
@@ -32,7 +32,7 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
connections = []
for i in range(nclients):
# Do not turn on autocommit. We want to hold the key-share locks.
conn = endpoint.connect(autocommit=False)
conn = pg.connect(autocommit=False)
connections.append(conn)
# On each iteration, we commit the previous transaction on a connection,
@@ -65,10 +65,10 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
# Branch at this point
env.neon_cli.create_branch("test_multixact_new", "test_multixact", ancestor_start_lsn=lsn)
endpoint_new = env.endpoints.create_start("test_multixact_new")
pg_new = env.postgres.create_start("test_multixact_new")
log.info("postgres is running on 'test_multixact_new' branch")
next_multixact_id_new = endpoint_new.safe_psql(
next_multixact_id_new = pg_new.safe_psql(
"SELECT next_multixact_id FROM pg_control_checkpoint()"
)[0][0]
@@ -76,4 +76,4 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
assert next_multixact_id_new == next_multixact_id
# Check that we can restore the content of the datadir correctly
check_restored_datadir_content(test_output_dir, env, endpoint)
check_restored_datadir_content(test_output_dir, env, pg)

View File

@@ -9,11 +9,9 @@ def test_neon_cli_basics(neon_env_builder: NeonEnvBuilder, port_distributor: Por
try:
env.neon_cli.start()
env.neon_cli.create_tenant(tenant_id=env.initial_tenant, set_default=True)
env.neon_cli.endpoint_start(endpoint_id="ep-main", port=port_distributor.get_port())
env.neon_cli.pg_start(node_name="main", port=port_distributor.get_port())
env.neon_cli.create_branch(new_branch_name="migration_check")
env.neon_cli.endpoint_start(
endpoint_id="ep-migration_check", port=port_distributor.get_port()
)
env.neon_cli.pg_start(node_name="migration_check", port=port_distributor.get_port())
finally:
env.neon_cli.stop()

View File

@@ -8,9 +8,9 @@ from fixtures.neon_fixtures import NeonEnvBuilder
def test_next_xid(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main")
pg = env.postgres.create_start("main")
conn = endpoint.connect()
conn = pg.connect()
cur = conn.cursor()
cur.execute("CREATE TABLE t(x integer)")
@@ -19,17 +19,17 @@ def test_next_xid(neon_env_builder: NeonEnvBuilder):
print(f"iteration {i} / {iterations}")
# Kill and restart the pageserver.
endpoint.stop()
pg.stop()
env.pageserver.stop(immediate=True)
env.pageserver.start()
endpoint.start()
pg.start()
retry_sleep = 0.5
max_retries = 200
retries = 0
while True:
try:
conn = endpoint.connect()
conn = pg.connect()
cur = conn.cursor()
cur.execute(f"INSERT INTO t values({i})")
conn.close()
@@ -48,7 +48,7 @@ def test_next_xid(neon_env_builder: NeonEnvBuilder):
raise
break
conn = endpoint.connect()
conn = pg.connect()
cur = conn.cursor()
cur.execute("SELECT count(*) FROM t")
assert cur.fetchone() == (iterations,)

View File

@@ -5,9 +5,9 @@ from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PageserverHttpClient
def check_tenant(env: NeonEnv, pageserver_http: PageserverHttpClient):
tenant_id, timeline_id = env.neon_cli.create_tenant()
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
pg = env.postgres.create_start("main", tenant_id=tenant_id)
# we rely upon autocommit after each statement
res_1 = endpoint.safe_psql_many(
res_1 = pg.safe_psql_many(
queries=[
"CREATE TABLE t(key int primary key, value text)",
"INSERT INTO t SELECT generate_series(1,100000), 'payload'",
@@ -18,14 +18,14 @@ def check_tenant(env: NeonEnv, pageserver_http: PageserverHttpClient):
assert res_1[-1][0] == (5000050000,)
# TODO check detach on live instance
log.info("stopping compute")
endpoint.stop()
pg.stop()
log.info("compute stopped")
endpoint.start()
res_2 = endpoint.safe_psql("SELECT sum(key) FROM t")
pg.start()
res_2 = pg.safe_psql("SELECT sum(key) FROM t")
assert res_2[0] == (5000050000,)
endpoint.stop()
pg.stop()
pageserver_http.tenant_detach(tenant_id)

View File

@@ -19,10 +19,10 @@ def test_old_request_lsn(neon_env_builder: NeonEnvBuilder):
neon_env_builder.pageserver_config_override = "tenant_config={pitr_interval = '0 sec'}"
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_old_request_lsn", "main")
endpoint = env.endpoints.create_start("test_old_request_lsn")
pg = env.postgres.create_start("test_old_request_lsn")
log.info("postgres is running on test_old_request_lsn branch")
pg_conn = endpoint.connect()
pg_conn = pg.connect()
cur = pg_conn.cursor()
# Get the timeline ID of our branch. We need it for the 'do_gc' command

View File

@@ -73,17 +73,17 @@ def test_ondemand_download_large_rel(
)
env.initial_tenant = tenant
endpoint = env.endpoints.create_start("main")
pg = env.postgres.create_start("main")
client = env.pageserver.http_client()
tenant_id = endpoint.safe_psql("show neon.tenant_id")[0][0]
timeline_id = endpoint.safe_psql("show neon.timeline_id")[0][0]
tenant_id = pg.safe_psql("show neon.tenant_id")[0][0]
timeline_id = pg.safe_psql("show neon.timeline_id")[0][0]
# We want to make sure that the data is large enough that the keyspace is partitioned.
num_rows = 1000000
with endpoint.cursor() as cur:
with pg.cursor() as cur:
# data loading may take a while, so increase statement timeout
cur.execute("SET statement_timeout='300s'")
cur.execute(
@@ -106,7 +106,7 @@ def test_ondemand_download_large_rel(
log.info("uploads have finished")
##### Stop the first pageserver instance, erase all its data
endpoint.stop()
pg.stop()
env.pageserver.stop()
# remove all the layer files
@@ -117,7 +117,7 @@ def test_ondemand_download_large_rel(
##### Second start, restore the data and ensure it's the same
env.pageserver.start()
endpoint.start()
pg.start()
before_downloads = get_num_downloaded_layers(client, tenant_id, timeline_id)
# Probe in the middle of the table. There's a high chance that the beginning
@@ -125,7 +125,7 @@ def test_ondemand_download_large_rel(
# from other tables, and with the entry that stores the size of the
# relation, so they are likely already downloaded. But the middle of the
# table should not have been needed by anything yet.
with endpoint.cursor() as cur:
with pg.cursor() as cur:
assert query_scalar(cur, "select count(*) from tbl where id = 500000") == 1
after_downloads = get_num_downloaded_layers(client, tenant_id, timeline_id)
@@ -167,17 +167,17 @@ def test_ondemand_download_timetravel(
)
env.initial_tenant = tenant
endpoint = env.endpoints.create_start("main")
pg = env.postgres.create_start("main")
client = env.pageserver.http_client()
tenant_id = endpoint.safe_psql("show neon.tenant_id")[0][0]
timeline_id = endpoint.safe_psql("show neon.timeline_id")[0][0]
tenant_id = pg.safe_psql("show neon.tenant_id")[0][0]
timeline_id = pg.safe_psql("show neon.timeline_id")[0][0]
lsns = []
table_len = 10000
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute(
f"""
CREATE TABLE testtab(id serial primary key, checkpoint_number int, data text);
@@ -192,7 +192,7 @@ def test_ondemand_download_timetravel(
lsns.append((0, current_lsn))
for checkpoint_number in range(1, 20):
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute(f"UPDATE testtab SET checkpoint_number = {checkpoint_number}")
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
lsns.append((checkpoint_number, current_lsn))
@@ -204,7 +204,7 @@ def test_ondemand_download_timetravel(
client.timeline_checkpoint(tenant_id, timeline_id)
##### Stop the first pageserver instance, erase all its data
env.endpoints.stop_all()
env.postgres.stop_all()
# wait until pageserver has successfully uploaded all the data to remote storage
wait_for_sk_commit_lsn_to_reach_remote_storage(
@@ -251,10 +251,10 @@ def test_ondemand_download_timetravel(
num_layers_downloaded = [0]
resident_size = [get_resident_physical_size()]
for checkpoint_number, lsn in lsns:
endpoint_old = env.endpoints.create_start(
branch_name="main", endpoint_id=f"ep-old_lsn_{checkpoint_number}", lsn=lsn
pg_old = env.postgres.create_start(
branch_name="main", node_name=f"test_old_lsn_{checkpoint_number}", lsn=lsn
)
with endpoint_old.cursor() as cur:
with pg_old.cursor() as cur:
# assert query_scalar(cur, f"select count(*) from testtab where checkpoint_number={checkpoint_number}") == 100000
assert (
query_scalar(
@@ -331,15 +331,15 @@ def test_download_remote_layers_api(
)
env.initial_tenant = tenant
endpoint = env.endpoints.create_start("main")
pg = env.postgres.create_start("main")
client = env.pageserver.http_client()
tenant_id = endpoint.safe_psql("show neon.tenant_id")[0][0]
timeline_id = endpoint.safe_psql("show neon.timeline_id")[0][0]
tenant_id = pg.safe_psql("show neon.tenant_id")[0][0]
timeline_id = pg.safe_psql("show neon.timeline_id")[0][0]
table_len = 10000
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute(
f"""
CREATE TABLE testtab(id serial primary key, checkpoint_number int, data text);
@@ -347,7 +347,7 @@ def test_download_remote_layers_api(
"""
)
env.endpoints.stop_all()
env.postgres.stop_all()
wait_for_sk_commit_lsn_to_reach_remote_storage(
tenant_id, timeline_id, env.safekeepers, env.pageserver
@@ -463,8 +463,8 @@ def test_download_remote_layers_api(
sk.start()
# ensure that all the data is back
endpoint_old = env.endpoints.create_start(branch_name="main")
with endpoint_old.cursor() as cur:
pg_old = env.postgres.create_start(branch_name="main")
with pg_old.cursor() as cur:
assert query_scalar(cur, "select count(*) from testtab") == table_len
@@ -513,17 +513,17 @@ def test_compaction_downloads_on_demand_without_image_creation(
env.initial_tenant = tenant_id
pageserver_http = env.pageserver.http_client()
with env.endpoints.create_start("main") as endpoint:
with env.postgres.create_start("main") as pg:
# no particular reason to create the layers like this, but we are sure
# not to hit the image_creation_threshold here.
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute("create table a as select id::bigint from generate_series(1, 204800) s(id)")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute("update a set id = -id")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
layers = pageserver_http.layer_map_info(tenant_id, timeline_id)
@@ -589,32 +589,32 @@ def test_compaction_downloads_on_demand_with_image_creation(
env.initial_tenant = tenant_id
pageserver_http = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
pg = env.postgres.create_start("main")
# no particular reason to create the layers like this, but we are sure
# not to hit the image_creation_threshold here.
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute("create table a (id bigserial primary key, some_value bigint not null)")
cur.execute("insert into a(some_value) select i from generate_series(1, 10000) s(i)")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
for i in range(0, 2):
for j in range(0, 3):
# create a minimal amount of "delta difficulty" for this table
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute("update a set some_value = -some_value + %s", (j,))
with endpoint.cursor() as cur:
with pg.cursor() as cur:
# vacuuming should aid to reuse keys, though it's not really important
# with image_creation_threshold=1 which we will use on the last compaction
cur.execute("vacuum")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
if i == 1 and j == 2:
# last iteration; stop before checkpoint to avoid leaving an inmemory layer
endpoint.stop_and_destroy()
pg.stop_and_destroy()
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)

View File

@@ -150,7 +150,7 @@ def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv):
env = neon_simple_env
with env.pageserver.http_client() as client:
tenant_id, timeline_id = env.neon_cli.create_tenant()
endpoint = env.endpoints.create_start(DEFAULT_BRANCH_NAME, tenant_id=tenant_id)
pg = env.postgres.create_start(DEFAULT_BRANCH_NAME, tenant_id=tenant_id)
# Wait to make sure that we get a latest WAL receiver data.
# We need to wait here because it's possible that we don't have access to
@@ -163,7 +163,7 @@ def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv):
)
# Make a DB modification then expect getting a new WAL receiver's data.
endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)")
pg.safe_psql("CREATE TABLE t(key int primary key, value text)")
wait_until(
number_of_iterations=5,
interval=1,

View File

@@ -11,11 +11,11 @@ def test_pageserver_catchup_while_compute_down(neon_env_builder: NeonEnvBuilder)
env.neon_cli.create_branch("test_pageserver_catchup_while_compute_down")
# Make shared_buffers large to ensure we won't query pageserver while it is down.
endpoint = env.endpoints.create_start(
pg = env.postgres.create_start(
"test_pageserver_catchup_while_compute_down", config_lines=["shared_buffers=512MB"]
)
pg_conn = endpoint.connect()
pg_conn = pg.connect()
cur = pg_conn.cursor()
# Create table, and insert some rows.
@@ -59,10 +59,10 @@ def test_pageserver_catchup_while_compute_down(neon_env_builder: NeonEnvBuilder)
env.safekeepers[2].start()
# restart compute node
endpoint.stop_and_destroy().create_start("test_pageserver_catchup_while_compute_down")
pg.stop_and_destroy().create_start("test_pageserver_catchup_while_compute_down")
# Ensure that basebackup went correct and pageserver returned all data
pg_conn = endpoint.connect()
pg_conn = pg.connect()
cur = pg_conn.cursor()
cur.execute("SELECT count(*) FROM foo")

View File

@@ -11,9 +11,9 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_pageserver_restart")
endpoint = env.endpoints.create_start("test_pageserver_restart")
pg = env.postgres.create_start("test_pageserver_restart")
pg_conn = endpoint.connect()
pg_conn = pg.connect()
cur = pg_conn.cursor()
# Create table, and insert some rows. Make it big enough that it doesn't fit in
@@ -84,13 +84,13 @@ def test_pageserver_chaos(neon_env_builder: NeonEnvBuilder):
}
)
env.neon_cli.create_timeline("test_pageserver_chaos", tenant_id=tenant)
endpoint = env.endpoints.create_start("test_pageserver_chaos", tenant_id=tenant)
pg = env.postgres.create_start("test_pageserver_chaos", tenant_id=tenant)
# Create table, and insert some rows. Make it big enough that it doesn't fit in
# shared_buffers, otherwise the SELECT after restart will just return answer
# from shared_buffers without hitting the page server, which defeats the point
# of this test.
with closing(endpoint.connect()) as conn:
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE TABLE foo (id int, t text, updates int)")
cur.execute("CREATE INDEX ON foo (id)")
@@ -116,12 +116,12 @@ def test_pageserver_chaos(neon_env_builder: NeonEnvBuilder):
# Update the whole table, then immediately kill and restart the pageserver
for i in range(1, 15):
endpoint.safe_psql("UPDATE foo set updates = updates + 1")
pg.safe_psql("UPDATE foo set updates = updates + 1")
# This kills the pageserver immediately, to simulate a crash
env.pageserver.stop(immediate=True)
env.pageserver.start()
# Check that all the updates are visible
num_updates = endpoint.safe_psql("SELECT sum(updates) FROM foo")[0][0]
num_updates = pg.safe_psql("SELECT sum(updates) FROM foo")[0][0]
assert num_updates == i * 100000

View File

@@ -5,7 +5,7 @@ import threading
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, PgBin
from fixtures.neon_fixtures import NeonEnv, PgBin, Postgres
# Test restarting page server, while safekeeper and compute node keep
@@ -13,7 +13,7 @@ from fixtures.neon_fixtures import NeonEnv, PgBin
def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgBin):
env = neon_simple_env
env.neon_cli.create_branch("test_pageserver_restarts")
endpoint = env.endpoints.create_start("test_pageserver_restarts")
pg = env.postgres.create_start("test_pageserver_restarts")
n_restarts = 10
scale = 10
@@ -23,12 +23,13 @@ def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgB
r".*Gc failed, retrying in \S+: Cannot run GC iteration on inactive tenant"
)
def run_pgbench(connstr: str):
def run_pgbench(pg: Postgres):
connstr = pg.connstr()
log.info(f"Start a pgbench workload on pg {connstr}")
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
pg_bin.run_capture(["pgbench", f"-T{n_restarts}", connstr])
thread = threading.Thread(target=run_pgbench, args=(endpoint.connstr(),), daemon=True)
thread = threading.Thread(target=run_pgbench, args=(pg,), daemon=True)
thread.start()
for i in range(n_restarts):

View File

@@ -2,7 +2,7 @@ import asyncio
from io import BytesIO
from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint, NeonEnv
from fixtures.neon_fixtures import NeonEnv, Postgres
async def repeat_bytes(buf, repetitions: int):
@@ -10,7 +10,7 @@ async def repeat_bytes(buf, repetitions: int):
yield buf
async def copy_test_data_to_table(endpoint: Endpoint, worker_id: int, table_name: str):
async def copy_test_data_to_table(pg: Postgres, worker_id: int, table_name: str):
buf = BytesIO()
for i in range(1000):
buf.write(
@@ -20,7 +20,7 @@ async def copy_test_data_to_table(endpoint: Endpoint, worker_id: int, table_name
copy_input = repeat_bytes(buf.read(), 5000)
pg_conn = await endpoint.connect_async()
pg_conn = await pg.connect_async()
# PgProtocol.connect_async sets statement_timeout to 2 minutes.
# That's not enough for this test, on a slow system in debug mode.
@@ -29,10 +29,10 @@ async def copy_test_data_to_table(endpoint: Endpoint, worker_id: int, table_name
await pg_conn.copy_to_table(table_name, source=copy_input)
async def parallel_load_same_table(endpoint: Endpoint, n_parallel: int):
async def parallel_load_same_table(pg: Postgres, n_parallel: int):
workers = []
for worker_id in range(n_parallel):
worker = copy_test_data_to_table(endpoint, worker_id, "copytest")
worker = copy_test_data_to_table(pg, worker_id, "copytest")
workers.append(asyncio.create_task(worker))
# await all workers
@@ -43,13 +43,13 @@ async def parallel_load_same_table(endpoint: Endpoint, n_parallel: int):
def test_parallel_copy(neon_simple_env: NeonEnv, n_parallel=5):
env = neon_simple_env
env.neon_cli.create_branch("test_parallel_copy", "empty")
endpoint = env.endpoints.create_start("test_parallel_copy")
pg = env.postgres.create_start("test_parallel_copy")
log.info("postgres is running on 'test_parallel_copy' branch")
# Create test table
conn = endpoint.connect()
conn = pg.connect()
cur = conn.cursor()
cur.execute("CREATE TABLE copytest (i int, t text)")
# Run COPY TO to load the table with parallel connections.
asyncio.run(parallel_load_same_table(endpoint, n_parallel))
asyncio.run(parallel_load_same_table(pg, n_parallel))

View File

@@ -24,8 +24,8 @@ def test_pg_regress(
env.neon_cli.create_branch("test_pg_regress", "empty")
# Connect to postgres and create a database called "regression".
endpoint = env.endpoints.create_start("test_pg_regress")
endpoint.safe_psql("CREATE DATABASE regression")
pg = env.postgres.create_start("test_pg_regress")
pg.safe_psql("CREATE DATABASE regression")
# Create some local directories for pg_regress to run in.
runpath = test_output_dir / "regress"
@@ -49,9 +49,9 @@ def test_pg_regress(
]
env_vars = {
"PGPORT": str(endpoint.default_options["port"]),
"PGUSER": endpoint.default_options["user"],
"PGHOST": endpoint.default_options["host"],
"PGPORT": str(pg.default_options["port"]),
"PGUSER": pg.default_options["user"],
"PGHOST": pg.default_options["host"],
}
# Run the command.
@@ -61,10 +61,10 @@ def test_pg_regress(
pg_bin.run(pg_regress_command, env=env_vars, cwd=runpath)
# checkpoint one more time to ensure that the lsn we get is the latest one
endpoint.safe_psql("CHECKPOINT")
pg.safe_psql("CHECKPOINT")
# Check that we restore the content of the datadir correctly
check_restored_datadir_content(test_output_dir, env, endpoint)
check_restored_datadir_content(test_output_dir, env, pg)
# Run the PostgreSQL "isolation" tests, in src/test/isolation.
@@ -85,10 +85,8 @@ def test_isolation(
env.neon_cli.create_branch("test_isolation", "empty")
# Connect to postgres and create a database called "regression".
# isolation tests use prepared transactions, so enable them
endpoint = env.endpoints.create_start(
"test_isolation", config_lines=["max_prepared_transactions=100"]
)
endpoint.safe_psql("CREATE DATABASE isolation_regression")
pg = env.postgres.create_start("test_isolation", config_lines=["max_prepared_transactions=100"])
pg.safe_psql("CREATE DATABASE isolation_regression")
# Create some local directories for pg_isolation_regress to run in.
runpath = test_output_dir / "regress"
@@ -111,9 +109,9 @@ def test_isolation(
]
env_vars = {
"PGPORT": str(endpoint.default_options["port"]),
"PGUSER": endpoint.default_options["user"],
"PGHOST": endpoint.default_options["host"],
"PGPORT": str(pg.default_options["port"]),
"PGUSER": pg.default_options["user"],
"PGHOST": pg.default_options["host"],
}
# Run the command.
@@ -137,8 +135,8 @@ def test_sql_regress(
env.neon_cli.create_branch("test_sql_regress", "empty")
# Connect to postgres and create a database called "regression".
endpoint = env.endpoints.create_start("test_sql_regress")
endpoint.safe_psql("CREATE DATABASE regression")
pg = env.postgres.create_start("test_sql_regress")
pg.safe_psql("CREATE DATABASE regression")
# Create some local directories for pg_regress to run in.
runpath = test_output_dir / "regress"
@@ -162,9 +160,9 @@ def test_sql_regress(
]
env_vars = {
"PGPORT": str(endpoint.default_options["port"]),
"PGUSER": endpoint.default_options["user"],
"PGHOST": endpoint.default_options["host"],
"PGPORT": str(pg.default_options["port"]),
"PGUSER": pg.default_options["user"],
"PGHOST": pg.default_options["host"],
}
# Run the command.
@@ -174,8 +172,8 @@ def test_sql_regress(
pg_bin.run(pg_regress_command, env=env_vars, cwd=runpath)
# checkpoint one more time to ensure that the lsn we get is the latest one
endpoint.safe_psql("CHECKPOINT")
endpoint.safe_psql("select pg_current_wal_insert_lsn()")[0][0]
pg.safe_psql("CHECKPOINT")
pg.safe_psql("select pg_current_wal_insert_lsn()")[0][0]
# Check that we restore the content of the datadir correctly
check_restored_datadir_content(test_output_dir, env, endpoint)
check_restored_datadir_content(test_output_dir, env, pg)

View File

@@ -15,10 +15,10 @@ def test_pitr_gc(neon_env_builder: NeonEnvBuilder):
)
env = neon_env_builder.init_start()
endpoint_main = env.endpoints.create_start("main")
pgmain = env.postgres.create_start("main")
log.info("postgres is running on 'main' branch")
main_pg_conn = endpoint_main.connect()
main_pg_conn = pgmain.connect()
main_cur = main_pg_conn.cursor()
timeline = TimelineId(query_scalar(main_cur, "SHOW neon.timeline_id"))
@@ -62,10 +62,10 @@ def test_pitr_gc(neon_env_builder: NeonEnvBuilder):
# It must have been preserved by PITR setting
env.neon_cli.create_branch("test_pitr_gc_hundred", "main", ancestor_start_lsn=lsn_a)
endpoint_hundred = env.endpoints.create_start("test_pitr_gc_hundred")
pg_hundred = env.postgres.create_start("test_pitr_gc_hundred")
# On the 'hundred' branch, we should see only 100 rows
hundred_pg_conn = endpoint_hundred.connect()
hundred_pg_conn = pg_hundred.connect()
hundred_cur = hundred_pg_conn.cursor()
hundred_cur.execute("SELECT count(*) FROM foo")
assert hundred_cur.fetchone() == (100,)

View File

@@ -20,22 +20,22 @@ def test_read_request_tracing(neon_env_builder: NeonEnvBuilder):
)
timeline = env.neon_cli.create_timeline("test_trace_replay", tenant_id=tenant)
endpoint = env.endpoints.create_start("test_trace_replay", "main", tenant)
pg = env.postgres.create_start("test_trace_replay", "main", tenant)
with closing(endpoint.connect()) as conn:
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("create table t (i integer);")
cur.execute(f"insert into t values (generate_series(1,{10000}));")
cur.execute("select count(*) from t;")
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
# wait until pageserver receives that data
pageserver_http = env.pageserver.http_client()
wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id, current_lsn)
# Stop postgres so we drop the connection and flush the traces
endpoint.stop()
# Stop pg so we drop the connection and flush the traces
pg.stop()
trace_path = env.repo_dir / "traces" / str(tenant) / str(timeline)
assert trace_path.exists()

View File

@@ -17,10 +17,10 @@ def test_read_validation(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_read_validation", "empty")
endpoint = env.endpoints.create_start("test_read_validation")
pg = env.postgres.create_start("test_read_validation")
log.info("postgres is running on 'test_read_validation' branch")
with closing(endpoint.connect()) as con:
with closing(pg.connect()) as con:
with con.cursor() as c:
for e in extensions:
c.execute("create extension if not exists {};".format(e))
@@ -144,10 +144,10 @@ def test_read_validation_neg(neon_simple_env: NeonEnv):
env.pageserver.allowed_errors.append(".*invalid LSN\\(0\\) in request.*")
endpoint = env.endpoints.create_start("test_read_validation_neg")
pg = env.postgres.create_start("test_read_validation_neg")
log.info("postgres is running on 'test_read_validation_neg' branch")
with closing(endpoint.connect()) as con:
with closing(pg.connect()) as con:
with con.cursor() as c:
for e in extensions:
c.execute("create extension if not exists {};".format(e))

View File

@@ -14,12 +14,12 @@ from fixtures.utils import query_scalar
def test_readonly_node(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_readonly_node", "empty")
endpoint_main = env.endpoints.create_start("test_readonly_node")
pgmain = env.postgres.create_start("test_readonly_node")
log.info("postgres is running on 'test_readonly_node' branch")
env.pageserver.allowed_errors.append(".*basebackup .* failed: invalid basebackup lsn.*")
main_pg_conn = endpoint_main.connect()
main_pg_conn = pgmain.connect()
main_cur = main_pg_conn.cursor()
# Create table, and insert the first 100 rows
@@ -60,23 +60,23 @@ def test_readonly_node(neon_simple_env: NeonEnv):
log.info("LSN after 400100 rows: " + lsn_c)
# Create first read-only node at the point where only 100 rows were inserted
endpoint_hundred = env.endpoints.create_start(
branch_name="test_readonly_node", endpoint_id="ep-readonly_node_hundred", lsn=lsn_a
pg_hundred = env.postgres.create_start(
branch_name="test_readonly_node", node_name="test_readonly_node_hundred", lsn=lsn_a
)
# And another at the point where 200100 rows were inserted
endpoint_more = env.endpoints.create_start(
branch_name="test_readonly_node", endpoint_id="ep-readonly_node_more", lsn=lsn_b
pg_more = env.postgres.create_start(
branch_name="test_readonly_node", node_name="test_readonly_node_more", lsn=lsn_b
)
# On the 'hundred' node, we should see only 100 rows
hundred_pg_conn = endpoint_hundred.connect()
hundred_pg_conn = pg_hundred.connect()
hundred_cur = hundred_pg_conn.cursor()
hundred_cur.execute("SELECT count(*) FROM foo")
assert hundred_cur.fetchone() == (100,)
# On the 'more' node, we should see 100200 rows
more_pg_conn = endpoint_more.connect()
more_pg_conn = pg_more.connect()
more_cur = more_pg_conn.cursor()
more_cur.execute("SELECT count(*) FROM foo")
assert more_cur.fetchone() == (200100,)
@@ -86,21 +86,21 @@ def test_readonly_node(neon_simple_env: NeonEnv):
assert main_cur.fetchone() == (400100,)
# Check creating a node at segment boundary
endpoint = env.endpoints.create_start(
pg = env.postgres.create_start(
branch_name="test_readonly_node",
endpoint_id="ep-branch_segment_boundary",
node_name="test_branch_segment_boundary",
lsn=Lsn("0/3000000"),
)
cur = endpoint.connect().cursor()
cur = pg.connect().cursor()
cur.execute("SELECT 1")
assert cur.fetchone() == (1,)
# Create node at pre-initdb lsn
with pytest.raises(Exception, match="invalid basebackup lsn"):
# compute node startup with invalid LSN should fail
env.endpoints.create_start(
env.postgres.create_start(
branch_name="test_readonly_node",
endpoint_id="ep-readonly_node_preinitdb",
node_name="test_readonly_node_preinitdb",
lsn=Lsn("0/42"),
)
@@ -110,16 +110,16 @@ def test_timetravel(neon_simple_env: NeonEnv):
env = neon_simple_env
pageserver_http_client = env.pageserver.http_client()
env.neon_cli.create_branch("test_timetravel", "empty")
endpoint = env.endpoints.create_start("test_timetravel")
pg = env.postgres.create_start("test_timetravel")
client = env.pageserver.http_client()
tenant_id = endpoint.safe_psql("show neon.tenant_id")[0][0]
timeline_id = endpoint.safe_psql("show neon.timeline_id")[0][0]
tenant_id = pg.safe_psql("show neon.tenant_id")[0][0]
timeline_id = pg.safe_psql("show neon.timeline_id")[0][0]
lsns = []
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute(
"""
CREATE TABLE testtab(id serial primary key, iteration int, data text);
@@ -130,7 +130,7 @@ def test_timetravel(neon_simple_env: NeonEnv):
lsns.append((0, current_lsn))
for i in range(1, 5):
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute(f"UPDATE testtab SET iteration = {i}")
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
lsns.append((i, current_lsn))
@@ -142,14 +142,14 @@ def test_timetravel(neon_simple_env: NeonEnv):
pageserver_http_client.timeline_checkpoint(tenant_id, timeline_id)
##### Restart pageserver
env.endpoints.stop_all()
env.postgres.stop_all()
env.pageserver.stop()
env.pageserver.start()
for i, lsn in lsns:
endpoint_old = env.endpoints.create_start(
branch_name="test_timetravel", endpoint_id=f"ep-old_lsn_{i}", lsn=lsn
pg_old = env.postgres.create_start(
branch_name="test_timetravel", node_name=f"test_old_lsn_{i}", lsn=lsn
)
with endpoint_old.cursor() as cur:
with pg_old.cursor() as cur:
assert query_scalar(cur, f"select count(*) from testtab where iteration={i}") == 100000
assert query_scalar(cur, f"select count(*) from testtab where iteration<>{i}") == 0

View File

@@ -22,10 +22,10 @@ def test_pageserver_recovery(neon_env_builder: NeonEnvBuilder):
# Create a branch for us
env.neon_cli.create_branch("test_pageserver_recovery", "main")
endpoint = env.endpoints.create_start("test_pageserver_recovery")
pg = env.postgres.create_start("test_pageserver_recovery")
log.info("postgres is running on 'test_pageserver_recovery' branch")
with closing(endpoint.connect()) as conn:
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
with env.pageserver.http_client() as pageserver_http:
# Create and initialize test table
@@ -54,7 +54,7 @@ def test_pageserver_recovery(neon_env_builder: NeonEnvBuilder):
env.pageserver.stop()
env.pageserver.start()
with closing(endpoint.connect()) as conn:
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("select count(*) from foo")
assert cur.fetchone() == (100000,)

View File

@@ -85,17 +85,17 @@ def test_remote_storage_backup_and_restore(
env.pageserver.allowed_errors.append(".*simulated failure of remote operation.*")
pageserver_http = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
pg = env.postgres.create_start("main")
client = env.pageserver.http_client()
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
checkpoint_numbers = range(1, 3)
for checkpoint_number in checkpoint_numbers:
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute(
f"""
CREATE TABLE t{checkpoint_number}(id int primary key, data text);
@@ -124,7 +124,7 @@ def test_remote_storage_backup_and_restore(
)
##### Stop the first pageserver instance, erase all its data
env.endpoints.stop_all()
env.postgres.stop_all()
env.pageserver.stop()
dir_to_clear = Path(env.repo_dir) / "tenants"
@@ -190,8 +190,8 @@ def test_remote_storage_backup_and_restore(
), "current db Lsn should should not be less than the one stored on remote storage"
log.info("select some data, this will cause layers to be downloaded")
endpoint = env.endpoints.create_start("main")
with endpoint.cursor() as cur:
pg = env.postgres.create_start("main")
with pg.cursor() as cur:
for checkpoint_number in checkpoint_numbers:
assert (
query_scalar(cur, f"SELECT data FROM t{checkpoint_number} WHERE id = {data_id};")
@@ -241,9 +241,9 @@ def test_remote_storage_upload_queue_retries(
client = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
pg = env.postgres.create_start("main", tenant_id=tenant_id)
endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
pg.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
def configure_storage_sync_failpoints(action):
client.configure_failpoints(
@@ -256,7 +256,7 @@ def test_remote_storage_upload_queue_retries(
def overwrite_data_and_wait_for_it_to_arrive_at_pageserver(data):
# create initial set of layers & upload them with failpoints configured
endpoint.safe_psql_many(
pg.safe_psql_many(
[
f"""
INSERT INTO foo (id, val)
@@ -269,7 +269,7 @@ def test_remote_storage_upload_queue_retries(
"VACUUM foo",
]
)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
def get_queued_count(file_kind, op_kind):
val = client.get_remote_timeline_client_metric(
@@ -346,7 +346,7 @@ def test_remote_storage_upload_queue_retries(
# but how do we validate the result after restore?
env.pageserver.stop(immediate=True)
env.endpoints.stop_all()
env.postgres.stop_all()
dir_to_clear = Path(env.repo_dir) / "tenants"
shutil.rmtree(dir_to_clear)
@@ -365,8 +365,8 @@ def test_remote_storage_upload_queue_retries(
wait_until(30, 1, tenant_active)
log.info("restarting postgres to validate")
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
with endpoint.cursor() as cur:
pg = env.postgres.create_start("main", tenant_id=tenant_id)
with pg.cursor() as cur:
assert query_scalar(cur, "SELECT COUNT(*) FROM foo WHERE val = 'd'") == 10000
@@ -402,13 +402,13 @@ def test_remote_timeline_client_calls_started_metric(
client = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
pg = env.postgres.create_start("main", tenant_id=tenant_id)
endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
pg.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
def overwrite_data_and_wait_for_it_to_arrive_at_pageserver(data):
# create initial set of layers & upload them with failpoints configured
endpoint.safe_psql_many(
pg.safe_psql_many(
[
f"""
INSERT INTO foo (id, val)
@@ -421,7 +421,7 @@ def test_remote_timeline_client_calls_started_metric(
"VACUUM foo",
]
)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
calls_started: Dict[Tuple[str, str], List[int]] = {
("layer", "upload"): [0],
@@ -486,7 +486,7 @@ def test_remote_timeline_client_calls_started_metric(
)
env.pageserver.stop(immediate=True)
env.endpoints.stop_all()
env.postgres.stop_all()
dir_to_clear = Path(env.repo_dir) / "tenants"
shutil.rmtree(dir_to_clear)
@@ -505,8 +505,8 @@ def test_remote_timeline_client_calls_started_metric(
wait_until(30, 1, tenant_active)
log.info("restarting postgres to validate")
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
with endpoint.cursor() as cur:
pg = env.postgres.create_start("main", tenant_id=tenant_id)
with pg.cursor() as cur:
assert query_scalar(cur, "SELECT COUNT(*) FROM foo WHERE val = 'd'") == 10000
# ensure that we updated the calls_started download metric
@@ -556,17 +556,17 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
)
return int(val) if val is not None else val
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
pg = env.postgres.create_start("main", tenant_id=tenant_id)
client.configure_failpoints(("before-upload-layer", "return"))
endpoint.safe_psql_many(
pg.safe_psql_many(
[
"CREATE TABLE foo (x INTEGER)",
"INSERT INTO foo SELECT g FROM generate_series(1, 10000) g",
]
)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
# Kick off a checkpoint operation.
# It will get stuck in remote_client.wait_completion(), since the select query will have
@@ -640,8 +640,8 @@ def test_empty_branch_remote_storage_upload(
new_branch_name = "new_branch"
new_branch_timeline_id = env.neon_cli.create_branch(new_branch_name, "main", env.initial_tenant)
with env.endpoints.create_start(new_branch_name, tenant_id=env.initial_tenant) as endpoint:
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, new_branch_timeline_id)
with env.postgres.create_start(new_branch_name, tenant_id=env.initial_tenant) as pg:
wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_branch_timeline_id)
wait_upload_queue_empty(client, env.initial_tenant, new_branch_timeline_id)
timelines_before_detach = set(
@@ -689,8 +689,8 @@ def test_empty_branch_remote_storage_upload_on_restart(
new_branch_name = "new_branch"
new_branch_timeline_id = env.neon_cli.create_branch(new_branch_name, "main", env.initial_tenant)
with env.endpoints.create_start(new_branch_name, tenant_id=env.initial_tenant) as endpoint:
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, new_branch_timeline_id)
with env.postgres.create_start(new_branch_name, tenant_id=env.initial_tenant) as pg:
wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_branch_timeline_id)
wait_upload_queue_empty(client, env.initial_tenant, new_branch_timeline_id)
env.pageserver.stop()

View File

@@ -11,10 +11,10 @@ from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
def test_subxacts(neon_simple_env: NeonEnv, test_output_dir):
env = neon_simple_env
env.neon_cli.create_branch("test_subxacts", "empty")
endpoint = env.endpoints.create_start("test_subxacts")
pg = env.postgres.create_start("test_subxacts")
log.info("postgres is running on 'test_subxacts' branch")
pg_conn = endpoint.connect()
pg_conn = pg.connect()
cur = pg_conn.cursor()
cur.execute(
@@ -37,4 +37,4 @@ def test_subxacts(neon_simple_env: NeonEnv, test_output_dir):
cur.execute("checkpoint")
# Check that we can restore the content of the datadir correctly
check_restored_datadir_content(test_output_dir, env, endpoint)
check_restored_datadir_content(test_output_dir, env, pg)

View File

@@ -44,7 +44,11 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
tenant, _ = env.neon_cli.create_tenant(conf=new_conf)
env.neon_cli.create_timeline("test_tenant_conf", tenant_id=tenant)
env.endpoints.create_start("test_tenant_conf", "main", tenant)
env.postgres.create_start(
"test_tenant_conf",
"main",
tenant,
)
# check the configuration of the default tenant
# it should match global configuration

View File

@@ -7,11 +7,11 @@ import asyncpg
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
NeonEnvBuilder,
PageserverApiException,
PageserverHttpClient,
Postgres,
RemoteStorageKind,
available_remote_storages,
wait_for_last_record_lsn,
@@ -59,8 +59,8 @@ def test_tenant_reattach(
# create new nenant
tenant_id, timeline_id = env.neon_cli.create_tenant()
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
with endpoint.cursor() as cur:
with env.postgres.create_start("main", tenant_id=tenant_id) as pg:
with pg.cursor() as cur:
cur.execute("CREATE TABLE t(key int primary key, value text)")
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
@@ -99,8 +99,8 @@ def test_tenant_reattach(
assert pageserver_last_record_lsn_before_detach == pageserver_last_record_lsn
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
with endpoint.cursor() as cur:
with env.postgres.create_start("main", tenant_id=tenant_id) as pg:
with pg.cursor() as cur:
assert query_scalar(cur, "SELECT count(*) FROM t") == 100000
# Check that we had to retry the downloads
@@ -157,11 +157,11 @@ async def sleep_and_reattach(pageserver_http: PageserverHttpClient, tenant_id: T
# async guts of test_tenant_reattach_while_bysy test
async def reattach_while_busy(
env: NeonEnv, endpoint: Endpoint, pageserver_http: PageserverHttpClient, tenant_id: TenantId
env: NeonEnv, pg: Postgres, pageserver_http: PageserverHttpClient, tenant_id: TenantId
):
workers = []
for worker_id in range(num_connections):
pg_conn = await endpoint.connect_async()
pg_conn = await pg.connect_async()
workers.append(asyncio.create_task(update_table(pg_conn)))
workers.append(asyncio.create_task(sleep_and_reattach(pageserver_http, tenant_id)))
@@ -238,15 +238,15 @@ def test_tenant_reattach_while_busy(
conf={"checkpoint_distance": "100000"}
)
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
pg = env.postgres.create_start("main", tenant_id=tenant_id)
cur = endpoint.connect().cursor()
cur = pg.connect().cursor()
cur.execute("CREATE TABLE t(id int primary key, counter int)")
cur.execute(f"INSERT INTO t SELECT generate_series(1,{num_rows}), 0")
# Run the test
asyncio.run(reattach_while_busy(env, endpoint, pageserver_http, tenant_id))
asyncio.run(reattach_while_busy(env, pg, pageserver_http, tenant_id))
# Verify table contents
assert query_scalar(cur, "SELECT count(*) FROM t") == num_rows
@@ -278,9 +278,9 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
# assert tenant exists on disk
assert (env.repo_dir / "tenants" / str(tenant_id)).exists()
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
pg = env.postgres.create_start("main", tenant_id=tenant_id)
# we rely upon autocommit after each statement
endpoint.safe_psql_many(
pg.safe_psql_many(
queries=[
"CREATE TABLE t(key int primary key, value text)",
"INSERT INTO t SELECT generate_series(1,100000), 'payload'",
@@ -339,9 +339,9 @@ def test_tenant_detach_ignored_tenant(neon_simple_env: NeonEnv):
# assert tenant exists on disk
assert (env.repo_dir / "tenants" / str(tenant_id)).exists()
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
pg = env.postgres.create_start("main", tenant_id=tenant_id)
# we rely upon autocommit after each statement
endpoint.safe_psql_many(
pg.safe_psql_many(
queries=[
"CREATE TABLE t(key int primary key, value text)",
"INSERT INTO t SELECT generate_series(1,100000), 'payload'",
@@ -388,9 +388,9 @@ def test_tenant_detach_regular_tenant(neon_simple_env: NeonEnv):
# assert tenant exists on disk
assert (env.repo_dir / "tenants" / str(tenant_id)).exists()
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
pg = env.postgres.create_start("main", tenant_id=tenant_id)
# we rely upon autocommit after each statement
endpoint.safe_psql_many(
pg.safe_psql_many(
queries=[
"CREATE TABLE t(key int primary key, value text)",
"INSERT INTO t SELECT generate_series(1,100000), 'payload'",
@@ -425,18 +425,18 @@ def test_detach_while_attaching(
##### First start, insert secret data and upload it to the remote storage
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
pg = env.postgres.create_start("main")
client = env.pageserver.http_client()
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
# Create table, and insert some rows. Make it big enough that it doesn't fit in
# shared_buffers, otherwise the SELECT after restart will just return answer
# from shared_buffers without hitting the page server, which defeats the point
# of this test.
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute("CREATE TABLE foo (t text)")
cur.execute(
"""
@@ -477,7 +477,7 @@ def test_detach_while_attaching(
# cycle are still running, things could get really confusing..
pageserver_http.tenant_attach(tenant_id)
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute("SELECT COUNT(*) FROM foo")
@@ -572,14 +572,14 @@ def test_ignored_tenant_download_missing_layers(
)
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
pg = env.postgres.create_start("main")
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
data_id = 1
data_secret = "very secret secret"
insert_test_data(pageserver_http, tenant_id, timeline_id, data_id, data_secret, endpoint)
insert_test_data(pageserver_http, tenant_id, timeline_id, data_id, data_secret, pg)
tenants_before_ignore = [tenant["id"] for tenant in pageserver_http.tenant_list()]
tenants_before_ignore.sort()
@@ -611,9 +611,9 @@ def test_ignored_tenant_download_missing_layers(
]
assert timelines_before_ignore == timelines_after_ignore, "Should have all timelines back"
endpoint.stop()
endpoint.start()
ensure_test_data(data_id, data_secret, endpoint)
pg.stop()
pg.start()
ensure_test_data(data_id, data_secret, pg)
# Tests that it's possible to `load` broken tenants:
@@ -631,10 +631,10 @@ def test_ignored_tenant_stays_broken_without_metadata(
)
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
pg = env.postgres.create_start("main")
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
# ignore the tenant and remove its metadata
pageserver_http.tenant_ignore(tenant_id)
@@ -666,9 +666,9 @@ def test_load_attach_negatives(
)
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
pg = env.postgres.create_start("main")
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
env.pageserver.allowed_errors.append(".*tenant .*? already exists, state:.*")
with pytest.raises(
@@ -707,16 +707,16 @@ def test_ignore_while_attaching(
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
pg = env.postgres.create_start("main")
pageserver_http = env.pageserver.http_client()
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
data_id = 1
data_secret = "very secret secret"
insert_test_data(pageserver_http, tenant_id, timeline_id, data_id, data_secret, endpoint)
insert_test_data(pageserver_http, tenant_id, timeline_id, data_id, data_secret, pg)
tenants_before_ignore = [tenant["id"] for tenant in pageserver_http.tenant_list()]
@@ -754,9 +754,9 @@ def test_ignore_while_attaching(
wait_until_tenant_state(pageserver_http, tenant_id, "Active", 5)
endpoint.stop()
endpoint.start()
ensure_test_data(data_id, data_secret, endpoint)
pg.stop()
pg.start()
ensure_test_data(data_id, data_secret, pg)
def insert_test_data(
@@ -765,9 +765,9 @@ def insert_test_data(
timeline_id: TimelineId,
data_id: int,
data: str,
endpoint: Endpoint,
pg: Postgres,
):
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute(
f"""
CREATE TABLE test(id int primary key, secret text);
@@ -787,8 +787,8 @@ def insert_test_data(
wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn)
def ensure_test_data(data_id: int, data: str, endpoint: Endpoint):
with endpoint.cursor() as cur:
def ensure_test_data(data_id: int, data: str, pg: Postgres):
with pg.cursor() as cur:
assert (
query_scalar(cur, f"SELECT secret FROM test WHERE id = {data_id};") == data
), "Should have timeline data back"

View File

@@ -7,12 +7,12 @@ from typing import Any, Dict, Optional, Tuple
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
NeonBroker,
NeonEnv,
NeonEnvBuilder,
PageserverHttpClient,
PortDistributor,
Postgres,
assert_tenant_status,
tenant_exists,
wait_for_last_record_lsn,
@@ -81,20 +81,20 @@ def new_pageserver_service(
@contextmanager
def pg_cur(endpoint):
with closing(endpoint.connect()) as conn:
def pg_cur(pg):
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
yield cur
def load(endpoint: Endpoint, stop_event: threading.Event, load_ok_event: threading.Event):
def load(pg: Postgres, stop_event: threading.Event, load_ok_event: threading.Event):
log.info("load started")
inserted_ctr = 0
failed = False
while not stop_event.is_set():
try:
with pg_cur(endpoint) as cur:
with pg_cur(pg) as cur:
cur.execute("INSERT INTO load VALUES ('some payload')")
inserted_ctr += 1
except: # noqa: E722
@@ -104,7 +104,7 @@ def load(endpoint: Endpoint, stop_event: threading.Event, load_ok_event: threadi
load_ok_event.clear()
else:
if failed:
with pg_cur(endpoint) as cur:
with pg_cur(pg) as cur:
# if we recovered after failure verify that we have correct number of rows
log.info("recovering at %s", inserted_ctr)
cur.execute("SELECT count(*) FROM load")
@@ -118,14 +118,14 @@ def load(endpoint: Endpoint, stop_event: threading.Event, load_ok_event: threadi
def populate_branch(
endpoint: Endpoint,
pg: Postgres,
tenant_id: TenantId,
ps_http: PageserverHttpClient,
create_table: bool,
expected_sum: Optional[int],
) -> Tuple[TimelineId, Lsn]:
# insert some data
with pg_cur(endpoint) as cur:
with pg_cur(pg) as cur:
cur.execute("SHOW neon.timeline_id")
timeline_id = TimelineId(cur.fetchone()[0])
log.info("timeline to relocate %s", timeline_id)
@@ -190,19 +190,19 @@ def check_timeline_attached(
def switch_pg_to_new_pageserver(
env: NeonEnv,
endpoint: Endpoint,
pg: Postgres,
new_pageserver_port: int,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Path:
endpoint.stop()
pg.stop()
pg_config_file_path = Path(endpoint.config_file_path())
pg_config_file_path = Path(pg.config_file_path())
pg_config_file_path.open("a").write(
f"\nneon.pageserver_connstring = 'postgresql://no_user:@localhost:{new_pageserver_port}'"
)
endpoint.start()
pg.start()
timeline_to_detach_local_path = (
env.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
@@ -220,8 +220,8 @@ def switch_pg_to_new_pageserver(
return timeline_to_detach_local_path
def post_migration_check(endpoint: Endpoint, sum_before_migration: int, old_local_path: Path):
with pg_cur(endpoint) as cur:
def post_migration_check(pg: Postgres, sum_before_migration: int, old_local_path: Path):
with pg_cur(pg) as cur:
# check that data is still there
cur.execute("SELECT sum(key) FROM t")
assert cur.fetchone() == (sum_before_migration,)
@@ -282,12 +282,12 @@ def test_tenant_relocation(
log.info("tenant to relocate %s initial_timeline_id %s", tenant_id, initial_timeline_id)
env.neon_cli.create_branch("test_tenant_relocation_main", tenant_id=tenant_id)
ep_main = env.endpoints.create_start(
pg_main = env.postgres.create_start(
branch_name="test_tenant_relocation_main", tenant_id=tenant_id
)
timeline_id_main, current_lsn_main = populate_branch(
ep_main,
pg_main,
tenant_id=tenant_id,
ps_http=pageserver_http,
create_table=True,
@@ -300,12 +300,12 @@ def test_tenant_relocation(
ancestor_start_lsn=current_lsn_main,
tenant_id=tenant_id,
)
ep_second = env.endpoints.create_start(
pg_second = env.postgres.create_start(
branch_name="test_tenant_relocation_second", tenant_id=tenant_id
)
timeline_id_second, current_lsn_second = populate_branch(
ep_second,
pg_second,
tenant_id=tenant_id,
ps_http=pageserver_http,
create_table=False,
@@ -321,14 +321,14 @@ def test_tenant_relocation(
if with_load == "with_load":
# create load table
with pg_cur(ep_main) as cur:
with pg_cur(pg_main) as cur:
cur.execute("CREATE TABLE load(value text)")
load_stop_event = threading.Event()
load_ok_event = threading.Event()
load_thread = threading.Thread(
target=load,
args=(ep_main, load_stop_event, load_ok_event),
args=(pg_main, load_stop_event, load_ok_event),
daemon=True, # To make sure the child dies when the parent errors
)
load_thread.start()
@@ -444,7 +444,7 @@ def test_tenant_relocation(
old_local_path_main = switch_pg_to_new_pageserver(
env,
ep_main,
pg_main,
new_pageserver_pg_port,
tenant_id,
timeline_id_main,
@@ -452,7 +452,7 @@ def test_tenant_relocation(
old_local_path_second = switch_pg_to_new_pageserver(
env,
ep_second,
pg_second,
new_pageserver_pg_port,
tenant_id,
timeline_id_second,
@@ -469,11 +469,11 @@ def test_tenant_relocation(
interval=1,
func=lambda: tenant_exists(pageserver_http, tenant_id),
)
post_migration_check(ep_main, 500500, old_local_path_main)
post_migration_check(ep_second, 1001000, old_local_path_second)
post_migration_check(pg_main, 500500, old_local_path_main)
post_migration_check(pg_second, 1001000, old_local_path_second)
# ensure that we can successfully read all relations on the new pageserver
with pg_cur(ep_second) as cur:
with pg_cur(pg_second) as cur:
cur.execute(
"""
DO $$

View File

@@ -4,10 +4,10 @@ from typing import List, Tuple
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
NeonEnvBuilder,
PageserverHttpClient,
Postgres,
wait_for_last_flush_lsn,
wait_for_wal_insert_lsn,
)
@@ -28,12 +28,12 @@ def test_empty_tenant_size(neon_simple_env: NeonEnv, test_output_dir: Path):
branch_name, main_timeline_id = env.neon_cli.list_timelines(tenant_id)[0]
assert branch_name == main_branch_name
with env.endpoints.create_start(
with env.postgres.create_start(
main_branch_name,
tenant_id=tenant_id,
config_lines=["autovacuum=off", "checkpoint_timeout=10min"],
) as endpoint:
with endpoint.cursor() as cur:
) as pg:
with pg.cursor() as cur:
cur.execute("SELECT 1")
row = cur.fetchone()
assert row is not None
@@ -105,12 +105,12 @@ def test_branched_empty_timeline_size(neon_simple_env: NeonEnv, test_output_dir:
first_branch_timeline_id = env.neon_cli.create_branch("first-branch", tenant_id=tenant_id)
with env.endpoints.create_start("first-branch", tenant_id=tenant_id) as endpoint:
with endpoint.cursor() as cur:
with env.postgres.create_start("first-branch", tenant_id=tenant_id) as pg:
with pg.cursor() as cur:
cur.execute(
"CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, 1000000) s(i)"
)
wait_for_last_flush_lsn(env, endpoint, tenant_id, first_branch_timeline_id)
wait_for_last_flush_lsn(env, pg, tenant_id, first_branch_timeline_id)
size_after_branching = http_client.tenant_size(tenant_id)
log.info(f"size_after_branching: {size_after_branching}")
@@ -164,12 +164,12 @@ def test_branched_from_many_empty_parents_size(neon_simple_env: NeonEnv, test_ou
assert last_branch is not None
with env.endpoints.create_start(last_branch_name, tenant_id=tenant_id) as endpoint:
with endpoint.cursor() as cur:
with env.postgres.create_start(last_branch_name, tenant_id=tenant_id) as pg:
with pg.cursor() as cur:
cur.execute(
"CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, 1000000) s(i)"
)
wait_for_last_flush_lsn(env, endpoint, tenant_id, last_branch)
wait_for_last_flush_lsn(env, pg, tenant_id, last_branch)
size_after_writes = http_client.tenant_size(tenant_id)
assert size_after_writes > initial_size
@@ -194,11 +194,11 @@ def test_branch_point_within_horizon(neon_simple_env: NeonEnv, test_output_dir:
(tenant_id, main_id) = env.neon_cli.create_tenant(conf={"gc_horizon": str(gc_horizon)})
http_client = env.pageserver.http_client()
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
initdb_lsn = wait_for_last_flush_lsn(env, endpoint, tenant_id, main_id)
with endpoint.cursor() as cur:
with env.postgres.create_start("main", tenant_id=tenant_id) as pg:
initdb_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, main_id)
with pg.cursor() as cur:
cur.execute("CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, 1000) s(i)")
flushed_lsn = wait_for_last_flush_lsn(env, endpoint, tenant_id, main_id)
flushed_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, main_id)
size_before_branching = http_client.tenant_size(tenant_id)
@@ -208,10 +208,10 @@ def test_branch_point_within_horizon(neon_simple_env: NeonEnv, test_output_dir:
"branch", tenant_id=tenant_id, ancestor_start_lsn=flushed_lsn
)
with env.endpoints.create_start("branch", tenant_id=tenant_id) as endpoint:
with endpoint.cursor() as cur:
with env.postgres.create_start("branch", tenant_id=tenant_id) as pg:
with pg.cursor() as cur:
cur.execute("CREATE TABLE t1 AS SELECT i::bigint n FROM generate_series(0, 1000) s(i)")
wait_for_last_flush_lsn(env, endpoint, tenant_id, branch_id)
wait_for_last_flush_lsn(env, pg, tenant_id, branch_id)
size_after = http_client.tenant_size(tenant_id)
@@ -237,17 +237,17 @@ def test_parent_within_horizon(neon_simple_env: NeonEnv, test_output_dir: Path):
(tenant_id, main_id) = env.neon_cli.create_tenant(conf={"gc_horizon": str(gc_horizon)})
http_client = env.pageserver.http_client()
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
initdb_lsn = wait_for_last_flush_lsn(env, endpoint, tenant_id, main_id)
with endpoint.cursor() as cur:
with env.postgres.create_start("main", tenant_id=tenant_id) as pg:
initdb_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, main_id)
with pg.cursor() as cur:
cur.execute("CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, 1000) s(i)")
flushed_lsn = wait_for_last_flush_lsn(env, endpoint, tenant_id, main_id)
flushed_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, main_id)
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute("CREATE TABLE t00 AS SELECT i::bigint n FROM generate_series(0, 2000) s(i)")
wait_for_last_flush_lsn(env, endpoint, tenant_id, main_id)
wait_for_last_flush_lsn(env, pg, tenant_id, main_id)
size_before_branching = http_client.tenant_size(tenant_id)
@@ -257,10 +257,10 @@ def test_parent_within_horizon(neon_simple_env: NeonEnv, test_output_dir: Path):
"branch", tenant_id=tenant_id, ancestor_start_lsn=flushed_lsn
)
with env.endpoints.create_start("branch", tenant_id=tenant_id) as endpoint:
with endpoint.cursor() as cur:
with env.postgres.create_start("branch", tenant_id=tenant_id) as pg:
with pg.cursor() as cur:
cur.execute("CREATE TABLE t1 AS SELECT i::bigint n FROM generate_series(0, 10000) s(i)")
wait_for_last_flush_lsn(env, endpoint, tenant_id, branch_id)
wait_for_last_flush_lsn(env, pg, tenant_id, branch_id)
size_after = http_client.tenant_size(tenant_id)
@@ -297,12 +297,12 @@ def test_only_heads_within_horizon(neon_simple_env: NeonEnv, test_output_dir: Pa
# gc is not expected to change the results
for branch_name, amount in [("main", 2000), ("first", 15000), ("second", 3000)]:
with env.endpoints.create_start(branch_name, tenant_id=tenant_id) as endpoint:
with endpoint.cursor() as cur:
with env.postgres.create_start(branch_name, tenant_id=tenant_id) as pg:
with pg.cursor() as cur:
cur.execute(
f"CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, {amount}) s(i)"
)
wait_for_last_flush_lsn(env, endpoint, tenant_id, ids[branch_name])
wait_for_last_flush_lsn(env, pg, tenant_id, ids[branch_name])
size_now = http_client.tenant_size(tenant_id)
if latest_size is not None:
assert size_now > latest_size
@@ -359,7 +359,7 @@ def test_single_branch_get_tenant_size_grows(
def get_current_consistent_size(
env: NeonEnv,
endpoint: Endpoint,
pg: Postgres,
size_debug_file, # apparently there is no public signature for open()...
http_client: PageserverHttpClient,
tenant_id: TenantId,
@@ -368,7 +368,7 @@ def test_single_branch_get_tenant_size_grows(
consistent = False
size_debug = None
current_lsn = wait_for_wal_insert_lsn(env, endpoint, tenant_id, timeline_id)
current_lsn = wait_for_wal_insert_lsn(env, pg, tenant_id, timeline_id)
# We want to make sure we have a self-consistent set of values.
# Size changes with WAL, so only if both before and after getting
# the size of the tenant reports the same WAL insert LSN, we're OK
@@ -382,35 +382,35 @@ def test_single_branch_get_tenant_size_grows(
size, sizes = http_client.tenant_size_and_modelinputs(tenant_id)
size_debug = http_client.tenant_size_debug(tenant_id)
after_lsn = wait_for_wal_insert_lsn(env, endpoint, tenant_id, timeline_id)
after_lsn = wait_for_wal_insert_lsn(env, pg, tenant_id, timeline_id)
consistent = current_lsn == after_lsn
current_lsn = after_lsn
size_debug_file.write(size_debug)
return (current_lsn, size)
with env.endpoints.create_start(
with env.postgres.create_start(
branch_name,
tenant_id=tenant_id,
### autovacuum is disabled to limit WAL logging.
config_lines=["autovacuum=off"],
) as endpoint:
) as pg:
(initdb_lsn, size) = get_current_consistent_size(
env, endpoint, size_debug_file, http_client, tenant_id, timeline_id
env, pg, size_debug_file, http_client, tenant_id, timeline_id
)
collected_responses.append(("INITDB", initdb_lsn, size))
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute("CREATE TABLE t0 (i BIGINT NOT NULL) WITH (fillfactor = 40)")
(current_lsn, size) = get_current_consistent_size(
env, endpoint, size_debug_file, http_client, tenant_id, timeline_id
env, pg, size_debug_file, http_client, tenant_id, timeline_id
)
collected_responses.append(("CREATE", current_lsn, size))
batch_size = 100
for i in range(3):
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute(
f"INSERT INTO t0(i) SELECT i FROM generate_series({batch_size} * %s, ({batch_size} * (%s + 1)) - 1) s(i)",
(i, i),
@@ -419,7 +419,7 @@ def test_single_branch_get_tenant_size_grows(
i += 1
(current_lsn, size) = get_current_consistent_size(
env, endpoint, size_debug_file, http_client, tenant_id, timeline_id
env, pg, size_debug_file, http_client, tenant_id, timeline_id
)
prev_size = collected_responses[-1][2]
@@ -438,7 +438,7 @@ def test_single_branch_get_tenant_size_grows(
collected_responses.append(("INSERT", current_lsn, size))
while True:
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute(
f"UPDATE t0 SET i = -i WHERE i IN (SELECT i FROM t0 WHERE i > 0 LIMIT {batch_size})"
)
@@ -448,7 +448,7 @@ def test_single_branch_get_tenant_size_grows(
break
(current_lsn, size) = get_current_consistent_size(
env, endpoint, size_debug_file, http_client, tenant_id, timeline_id
env, pg, size_debug_file, http_client, tenant_id, timeline_id
)
prev_size = collected_responses[-1][2]
@@ -458,7 +458,7 @@ def test_single_branch_get_tenant_size_grows(
collected_responses.append(("UPDATE", current_lsn, size))
while True:
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute(f"DELETE FROM t0 WHERE i IN (SELECT i FROM t0 LIMIT {batch_size})")
deleted = cur.rowcount
@@ -466,7 +466,7 @@ def test_single_branch_get_tenant_size_grows(
break
(current_lsn, size) = get_current_consistent_size(
env, endpoint, size_debug_file, http_client, tenant_id, timeline_id
env, pg, size_debug_file, http_client, tenant_id, timeline_id
)
prev_size = collected_responses[-1][2]
@@ -475,14 +475,14 @@ def test_single_branch_get_tenant_size_grows(
collected_responses.append(("DELETE", current_lsn, size))
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute("DROP TABLE t0")
# The size of the tenant should still be as large as before we dropped
# the table, because the drop operation can still be undone in the PITR
# defined by gc_horizon.
(current_lsn, size) = get_current_consistent_size(
env, endpoint, size_debug_file, http_client, tenant_id, timeline_id
env, pg, size_debug_file, http_client, tenant_id, timeline_id
)
prev_size = collected_responses[-1][2]
@@ -532,16 +532,16 @@ def test_get_tenant_size_with_multiple_branches(
http_client = env.pageserver.http_client()
main_endpoint = env.endpoints.create_start(main_branch_name, tenant_id=tenant_id)
main_pg = env.postgres.create_start(main_branch_name, tenant_id=tenant_id)
batch_size = 10000
with main_endpoint.cursor() as cur:
with main_pg.cursor() as cur:
cur.execute(
f"CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, {batch_size}) s(i)"
)
wait_for_last_flush_lsn(env, main_endpoint, tenant_id, main_timeline_id)
wait_for_last_flush_lsn(env, main_pg, tenant_id, main_timeline_id)
size_at_branch = http_client.tenant_size(tenant_id)
assert size_at_branch > 0
@@ -552,23 +552,23 @@ def test_get_tenant_size_with_multiple_branches(
size_after_first_branch = http_client.tenant_size(tenant_id)
assert size_after_first_branch == size_at_branch
first_branch_endpoint = env.endpoints.create_start("first-branch", tenant_id=tenant_id)
first_branch_pg = env.postgres.create_start("first-branch", tenant_id=tenant_id)
with first_branch_endpoint.cursor() as cur:
with first_branch_pg.cursor() as cur:
cur.execute(
f"CREATE TABLE t1 AS SELECT i::bigint n FROM generate_series(0, {batch_size}) s(i)"
)
wait_for_last_flush_lsn(env, first_branch_endpoint, tenant_id, first_branch_timeline_id)
wait_for_last_flush_lsn(env, first_branch_pg, tenant_id, first_branch_timeline_id)
size_after_growing_first_branch = http_client.tenant_size(tenant_id)
assert size_after_growing_first_branch > size_after_first_branch
with main_endpoint.cursor() as cur:
with main_pg.cursor() as cur:
cur.execute(
f"CREATE TABLE t1 AS SELECT i::bigint n FROM generate_series(0, 2*{batch_size}) s(i)"
)
wait_for_last_flush_lsn(env, main_endpoint, tenant_id, main_timeline_id)
wait_for_last_flush_lsn(env, main_pg, tenant_id, main_timeline_id)
size_after_continuing_on_main = http_client.tenant_size(tenant_id)
assert size_after_continuing_on_main > size_after_growing_first_branch
@@ -578,31 +578,31 @@ def test_get_tenant_size_with_multiple_branches(
size_after_second_branch = http_client.tenant_size(tenant_id)
assert size_after_second_branch == size_after_continuing_on_main
second_branch_endpoint = env.endpoints.create_start("second-branch", tenant_id=tenant_id)
second_branch_pg = env.postgres.create_start("second-branch", tenant_id=tenant_id)
with second_branch_endpoint.cursor() as cur:
with second_branch_pg.cursor() as cur:
cur.execute(
f"CREATE TABLE t2 AS SELECT i::bigint n FROM generate_series(0, 3*{batch_size}) s(i)"
)
wait_for_last_flush_lsn(env, second_branch_endpoint, tenant_id, second_branch_timeline_id)
wait_for_last_flush_lsn(env, second_branch_pg, tenant_id, second_branch_timeline_id)
size_after_growing_second_branch = http_client.tenant_size(tenant_id)
assert size_after_growing_second_branch > size_after_second_branch
with second_branch_endpoint.cursor() as cur:
with second_branch_pg.cursor() as cur:
cur.execute("DROP TABLE t0")
cur.execute("DROP TABLE t1")
cur.execute("VACUUM FULL")
wait_for_last_flush_lsn(env, second_branch_endpoint, tenant_id, second_branch_timeline_id)
wait_for_last_flush_lsn(env, second_branch_pg, tenant_id, second_branch_timeline_id)
size_after_thinning_branch = http_client.tenant_size(tenant_id)
assert (
size_after_thinning_branch > size_after_growing_second_branch
), "tenant_size should grow with dropped tables and full vacuum"
first_branch_endpoint.stop_and_destroy()
second_branch_endpoint.stop_and_destroy()
main_endpoint.stop()
first_branch_pg.stop_and_destroy()
second_branch_pg.stop_and_destroy()
main_pg.stop()
env.pageserver.stop()
env.pageserver.start()

View File

@@ -31,13 +31,13 @@ def test_tenant_tasks(neon_env_builder: NeonEnvBuilder):
# Create tenant, start compute
tenant, _ = env.neon_cli.create_tenant()
env.neon_cli.create_timeline(name, tenant_id=tenant)
endpoint = env.endpoints.create_start(name, tenant_id=tenant)
pg = env.postgres.create_start(name, tenant_id=tenant)
assert (
get_state(tenant) == "Active"
), "Pageserver should activate a tenant and start background jobs if timelines are loaded"
# Stop compute
endpoint.stop()
pg.stop()
# Delete all timelines on all tenants.
#

View File

@@ -66,17 +66,17 @@ def test_tenants_normal_work(neon_env_builder: NeonEnvBuilder):
env.neon_cli.create_timeline("test_tenants_normal_work", tenant_id=tenant_1)
env.neon_cli.create_timeline("test_tenants_normal_work", tenant_id=tenant_2)
endpoint_tenant1 = env.endpoints.create_start(
pg_tenant1 = env.postgres.create_start(
"test_tenants_normal_work",
tenant_id=tenant_1,
)
endpoint_tenant2 = env.endpoints.create_start(
pg_tenant2 = env.postgres.create_start(
"test_tenants_normal_work",
tenant_id=tenant_2,
)
for endpoint in [endpoint_tenant1, endpoint_tenant2]:
with closing(endpoint.connect()) as conn:
for pg in [pg_tenant1, pg_tenant2]:
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
@@ -97,11 +97,11 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
timeline_1 = env.neon_cli.create_timeline("test_metrics_normal_work", tenant_id=tenant_1)
timeline_2 = env.neon_cli.create_timeline("test_metrics_normal_work", tenant_id=tenant_2)
endpoint_tenant1 = env.endpoints.create_start("test_metrics_normal_work", tenant_id=tenant_1)
endpoint_tenant2 = env.endpoints.create_start("test_metrics_normal_work", tenant_id=tenant_2)
pg_tenant1 = env.postgres.create_start("test_metrics_normal_work", tenant_id=tenant_1)
pg_tenant2 = env.postgres.create_start("test_metrics_normal_work", tenant_id=tenant_2)
for endpoint in [endpoint_tenant1, endpoint_tenant2]:
with closing(endpoint.connect()) as conn:
for pg in [pg_tenant1, pg_tenant2]:
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE TABLE t(key int primary key, value text)")
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
@@ -242,15 +242,11 @@ def test_pageserver_metrics_removed_after_detach(
env.neon_cli.create_timeline("test_metrics_removed_after_detach", tenant_id=tenant_1)
env.neon_cli.create_timeline("test_metrics_removed_after_detach", tenant_id=tenant_2)
endpoint_tenant1 = env.endpoints.create_start(
"test_metrics_removed_after_detach", tenant_id=tenant_1
)
endpoint_tenant2 = env.endpoints.create_start(
"test_metrics_removed_after_detach", tenant_id=tenant_2
)
pg_tenant1 = env.postgres.create_start("test_metrics_removed_after_detach", tenant_id=tenant_1)
pg_tenant2 = env.postgres.create_start("test_metrics_removed_after_detach", tenant_id=tenant_2)
for endpoint in [endpoint_tenant1, endpoint_tenant2]:
with closing(endpoint.connect()) as conn:
for pg in [pg_tenant1, pg_tenant2]:
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE TABLE t(key int primary key, value text)")
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
@@ -321,7 +317,7 @@ def test_pageserver_with_empty_tenants(
), f"Tenant {tenant_with_empty_timelines_dir} should have an empty timelines/ directory"
# Trigger timeline re-initialization after pageserver restart
env.endpoints.stop_all()
env.postgres.stop_all()
env.pageserver.stop()
tenant_without_timelines_dir = env.initial_tenant

View File

@@ -15,10 +15,10 @@ from typing import List, Tuple
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
LocalFsStorage,
NeonEnv,
NeonEnvBuilder,
Postgres,
RemoteStorageKind,
assert_tenant_status,
available_remote_storages,
@@ -30,10 +30,10 @@ from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar, wait_until
async def tenant_workload(env: NeonEnv, endpoint: Endpoint):
async def tenant_workload(env: NeonEnv, pg: Postgres):
await env.pageserver.connect_async()
pg_conn = await endpoint.connect_async()
pg_conn = await pg.connect_async()
await pg_conn.execute("CREATE TABLE t(key int primary key, value text)")
for i in range(1, 100):
@@ -47,10 +47,10 @@ async def tenant_workload(env: NeonEnv, endpoint: Endpoint):
assert res == i * 1000
async def all_tenants_workload(env: NeonEnv, tenants_endpoints):
async def all_tenants_workload(env: NeonEnv, tenants_pgs):
workers = []
for _, endpoint in tenants_endpoints:
worker = tenant_workload(env, endpoint)
for _, pg in tenants_pgs:
worker = tenant_workload(env, pg)
workers.append(asyncio.create_task(worker))
# await all workers
@@ -71,7 +71,7 @@ def test_tenants_many(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Rem
".*init_tenant_mgr: marking .* as locally complete, while it doesnt exist in remote index.*"
)
tenants_endpoints: List[Tuple[TenantId, Endpoint]] = []
tenants_pgs: List[Tuple[TenantId, Postgres]] = []
for _ in range(1, 5):
# Use a tiny checkpoint distance, to create a lot of layers quickly
@@ -82,18 +82,18 @@ def test_tenants_many(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Rem
)
env.neon_cli.create_timeline("test_tenants_many", tenant_id=tenant)
endpoint = env.endpoints.create_start(
pg = env.postgres.create_start(
"test_tenants_many",
tenant_id=tenant,
)
tenants_endpoints.append((tenant, endpoint))
tenants_pgs.append((tenant, pg))
asyncio.run(all_tenants_workload(env, tenants_endpoints))
asyncio.run(all_tenants_workload(env, tenants_pgs))
# Wait for the remote storage uploads to finish
pageserver_http = env.pageserver.http_client()
for tenant, endpoint in tenants_endpoints:
res = endpoint.safe_psql_many(
for tenant, pg in tenants_pgs:
res = pg.safe_psql_many(
["SHOW neon.tenant_id", "SHOW neon.timeline_id", "SELECT pg_current_wal_flush_lsn()"]
)
tenant_id = TenantId(res[0][0][0])
@@ -135,15 +135,15 @@ def test_tenants_attached_after_download(
)
pageserver_http = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
pg = env.postgres.create_start("main")
client = env.pageserver.http_client()
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
for checkpoint_number in range(1, 3):
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute(
f"""
CREATE TABLE t{checkpoint_number}(id int primary key, secret text);
@@ -172,7 +172,7 @@ def test_tenants_attached_after_download(
)
##### Stop the pageserver, erase its layer file to force it being downloaded from S3
env.endpoints.stop_all()
env.postgres.stop_all()
wait_for_sk_commit_lsn_to_reach_remote_storage(
tenant_id, timeline_id, env.safekeepers, env.pageserver
@@ -242,12 +242,12 @@ def test_tenant_redownloads_truncated_file_on_startup(
env.pageserver.allowed_errors.append(".*No timelines to attach received.*")
pageserver_http = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
pg = env.postgres.create_start("main")
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute("CREATE TABLE t1 AS VALUES (123, 'foobar');")
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
@@ -255,7 +255,7 @@ def test_tenant_redownloads_truncated_file_on_startup(
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn)
env.endpoints.stop_all()
env.postgres.stop_all()
env.pageserver.stop()
timeline_dir = Path(env.repo_dir) / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
@@ -311,9 +311,9 @@ def test_tenant_redownloads_truncated_file_on_startup(
os.stat(remote_layer_path).st_size == expected_size
), "truncated file should not had been uploaded around re-download"
endpoint = env.endpoints.create_start("main")
pg = env.postgres.create_start("main")
with endpoint.cursor() as cur:
with pg.cursor() as cur:
cur.execute("INSERT INTO t1 VALUES (234, 'test data');")
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))

View File

@@ -12,13 +12,13 @@ import psycopg2.extras
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
NeonEnvBuilder,
PageserverApiException,
PageserverHttpClient,
PgBin,
PortDistributor,
Postgres,
RemoteStorageKind,
VanillaPostgres,
assert_tenant_status,
@@ -37,10 +37,10 @@ def test_timeline_size(neon_simple_env: NeonEnv):
client = env.pageserver.http_client()
wait_for_timeline_size_init(client, tenant=env.initial_tenant, timeline=new_timeline_id)
endpoint_main = env.endpoints.create_start("test_timeline_size")
pgmain = env.postgres.create_start("test_timeline_size")
log.info("postgres is running on 'test_timeline_size' branch")
with closing(endpoint_main.connect()) as conn:
with closing(pgmain.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE TABLE foo (t text)")
cur.execute(
@@ -73,10 +73,10 @@ def test_timeline_size_createdropdb(neon_simple_env: NeonEnv):
env.initial_tenant, new_timeline_id, include_non_incremental_logical_size=True
)
endpoint_main = env.endpoints.create_start("test_timeline_size_createdropdb")
pgmain = env.postgres.create_start("test_timeline_size_createdropdb")
log.info("postgres is running on 'test_timeline_size_createdropdb' branch")
with closing(endpoint_main.connect()) as conn:
with closing(pgmain.connect()) as conn:
with conn.cursor() as cur:
res = client.timeline_detail(
env.initial_tenant, new_timeline_id, include_non_incremental_logical_size=True
@@ -88,7 +88,7 @@ def test_timeline_size_createdropdb(neon_simple_env: NeonEnv):
), "no writes should not change the incremental logical size"
cur.execute("CREATE DATABASE foodb")
with closing(endpoint_main.connect(dbname="foodb")) as conn:
with closing(pgmain.connect(dbname="foodb")) as conn:
with conn.cursor() as cur2:
cur2.execute("CREATE TABLE foo (t text)")
cur2.execute(
@@ -117,7 +117,7 @@ def test_timeline_size_createdropdb(neon_simple_env: NeonEnv):
# wait until received_lsn_lag is 0
def wait_for_pageserver_catchup(endpoint_main: Endpoint, polling_interval=1, timeout=60):
def wait_for_pageserver_catchup(pgmain: Postgres, polling_interval=1, timeout=60):
started_at = time.time()
received_lsn_lag = 1
@@ -128,7 +128,7 @@ def wait_for_pageserver_catchup(endpoint_main: Endpoint, polling_interval=1, tim
"timed out waiting for pageserver to reach pg_current_wal_flush_lsn()"
)
res = endpoint_main.safe_psql(
res = pgmain.safe_psql(
"""
SELECT
pg_size_pretty(pg_cluster_size()),
@@ -149,20 +149,20 @@ def test_timeline_size_quota(neon_env_builder: NeonEnvBuilder):
wait_for_timeline_size_init(client, tenant=env.initial_tenant, timeline=new_timeline_id)
endpoint_main = env.endpoints.create_start(
pgmain = env.postgres.create_start(
"test_timeline_size_quota",
# Set small limit for the test
config_lines=["neon.max_cluster_size=30MB"],
)
log.info("postgres is running on 'test_timeline_size_quota' branch")
with closing(endpoint_main.connect()) as conn:
with closing(pgmain.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE EXTENSION neon") # TODO move it to neon_fixtures?
cur.execute("CREATE TABLE foo (t text)")
wait_for_pageserver_catchup(endpoint_main)
wait_for_pageserver_catchup(pgmain)
# Insert many rows. This query must fail because of space limit
try:
@@ -174,7 +174,7 @@ def test_timeline_size_quota(neon_env_builder: NeonEnvBuilder):
"""
)
wait_for_pageserver_catchup(endpoint_main)
wait_for_pageserver_catchup(pgmain)
cur.execute(
"""
@@ -194,7 +194,7 @@ def test_timeline_size_quota(neon_env_builder: NeonEnvBuilder):
# drop table to free space
cur.execute("DROP TABLE foo")
wait_for_pageserver_catchup(endpoint_main)
wait_for_pageserver_catchup(pgmain)
# create it again and insert some rows. This query must succeed
cur.execute("CREATE TABLE foo (t text)")
@@ -206,7 +206,7 @@ def test_timeline_size_quota(neon_env_builder: NeonEnvBuilder):
"""
)
wait_for_pageserver_catchup(endpoint_main)
wait_for_pageserver_catchup(pgmain)
cur.execute("SELECT * from pg_size_pretty(pg_cluster_size())")
pg_cluster_size = cur.fetchone()
@@ -230,15 +230,15 @@ def test_timeline_initial_logical_size_calculation_cancellation(
tenant_id, timeline_id = env.neon_cli.create_tenant()
# load in some data
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
endpoint.safe_psql_many(
pg = env.postgres.create_start("main", tenant_id=tenant_id)
pg.safe_psql_many(
[
"CREATE TABLE foo (x INTEGER)",
"INSERT INTO foo SELECT g FROM generate_series(1, 10000) g",
]
)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
endpoint.stop()
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
pg.stop()
# restart with failpoint inside initial size calculation task
env.pageserver.stop()
@@ -315,9 +315,9 @@ def test_timeline_physical_size_init(
env = neon_env_builder.init_start()
new_timeline_id = env.neon_cli.create_branch("test_timeline_physical_size_init")
endpoint = env.endpoints.create_start("test_timeline_physical_size_init")
pg = env.postgres.create_start("test_timeline_physical_size_init")
endpoint.safe_psql_many(
pg.safe_psql_many(
[
"CREATE TABLE foo (t text)",
"""INSERT INTO foo
@@ -326,7 +326,7 @@ def test_timeline_physical_size_init(
]
)
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, new_timeline_id)
wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
# restart the pageserer to force calculating timeline's initial physical size
env.pageserver.stop()
@@ -359,9 +359,9 @@ def test_timeline_physical_size_post_checkpoint(
pageserver_http = env.pageserver.http_client()
new_timeline_id = env.neon_cli.create_branch("test_timeline_physical_size_post_checkpoint")
endpoint = env.endpoints.create_start("test_timeline_physical_size_post_checkpoint")
pg = env.postgres.create_start("test_timeline_physical_size_post_checkpoint")
endpoint.safe_psql_many(
pg.safe_psql_many(
[
"CREATE TABLE foo (t text)",
"""INSERT INTO foo
@@ -370,7 +370,7 @@ def test_timeline_physical_size_post_checkpoint(
]
)
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, new_timeline_id)
wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
pageserver_http.timeline_checkpoint(env.initial_tenant, new_timeline_id)
assert_physical_size_invariants(
@@ -398,7 +398,7 @@ def test_timeline_physical_size_post_compaction(
pageserver_http = env.pageserver.http_client()
new_timeline_id = env.neon_cli.create_branch("test_timeline_physical_size_post_compaction")
endpoint = env.endpoints.create_start("test_timeline_physical_size_post_compaction")
pg = env.postgres.create_start("test_timeline_physical_size_post_compaction")
# We don't want autovacuum to run on the table, while we are calculating the
# physical size, because that could cause a new layer to be created and a
@@ -406,7 +406,7 @@ def test_timeline_physical_size_post_compaction(
# happens, because of some other background activity or autovacuum on other
# tables, we could simply retry the size calculations. It's unlikely that
# that would happen more than once.)
endpoint.safe_psql_many(
pg.safe_psql_many(
[
"CREATE TABLE foo (t text) WITH (autovacuum_enabled = off)",
"""INSERT INTO foo
@@ -415,7 +415,7 @@ def test_timeline_physical_size_post_compaction(
]
)
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, new_timeline_id)
wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
# shutdown safekeepers to prevent new data from coming in
for sk in env.safekeepers:
@@ -450,10 +450,10 @@ def test_timeline_physical_size_post_gc(
pageserver_http = env.pageserver.http_client()
new_timeline_id = env.neon_cli.create_branch("test_timeline_physical_size_post_gc")
endpoint = env.endpoints.create_start("test_timeline_physical_size_post_gc")
pg = env.postgres.create_start("test_timeline_physical_size_post_gc")
# Like in test_timeline_physical_size_post_compaction, disable autovacuum
endpoint.safe_psql_many(
pg.safe_psql_many(
[
"CREATE TABLE foo (t text) WITH (autovacuum_enabled = off)",
"""INSERT INTO foo
@@ -462,10 +462,10 @@ def test_timeline_physical_size_post_gc(
]
)
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, new_timeline_id)
wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
pageserver_http.timeline_checkpoint(env.initial_tenant, new_timeline_id)
endpoint.safe_psql(
pg.safe_psql(
"""
INSERT INTO foo
SELECT 'long string to consume some space' || g
@@ -473,7 +473,7 @@ def test_timeline_physical_size_post_gc(
"""
)
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, new_timeline_id)
wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
pageserver_http.timeline_checkpoint(env.initial_tenant, new_timeline_id)
pageserver_http.timeline_gc(env.initial_tenant, new_timeline_id, gc_horizon=None)
@@ -499,9 +499,9 @@ def test_timeline_size_metrics(
pageserver_http = env.pageserver.http_client()
new_timeline_id = env.neon_cli.create_branch("test_timeline_size_metrics")
endpoint = env.endpoints.create_start("test_timeline_size_metrics")
pg = env.postgres.create_start("test_timeline_size_metrics")
endpoint.safe_psql_many(
pg.safe_psql_many(
[
"CREATE TABLE foo (t text)",
"""INSERT INTO foo
@@ -510,7 +510,7 @@ def test_timeline_size_metrics(
]
)
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, new_timeline_id)
wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
pageserver_http.timeline_checkpoint(env.initial_tenant, new_timeline_id)
# get the metrics and parse the metric for the current timeline's physical size
@@ -562,7 +562,7 @@ def test_timeline_size_metrics(
# The sum of the sizes of all databases, as seen by pg_database_size(), should also
# be close. Again allow some slack, the logical size metric includes some things like
# the SLRUs that are not included in pg_database_size().
dbsize_sum = endpoint.safe_psql("select sum(pg_database_size(oid)) from pg_database")[0][0]
dbsize_sum = pg.safe_psql("select sum(pg_database_size(oid)) from pg_database")[0][0]
assert math.isclose(dbsize_sum, tl_logical_size_metric, abs_tol=2 * 1024 * 1024)
@@ -596,16 +596,16 @@ def test_tenant_physical_size(
n_rows = random.randint(100, 1000)
timeline = env.neon_cli.create_branch(f"test_tenant_physical_size_{i}", tenant_id=tenant)
endpoint = env.endpoints.create_start(f"test_tenant_physical_size_{i}", tenant_id=tenant)
pg = env.postgres.create_start(f"test_tenant_physical_size_{i}", tenant_id=tenant)
endpoint.safe_psql_many(
pg.safe_psql_many(
[
"CREATE TABLE foo (t text)",
f"INSERT INTO foo SELECT 'long string to consume some space' || g FROM generate_series(1, {n_rows}) g",
]
)
wait_for_last_flush_lsn(env, endpoint, tenant, timeline)
wait_for_last_flush_lsn(env, pg, tenant, timeline)
pageserver_http.timeline_checkpoint(tenant, timeline)
if remote_storage_kind is not None:
@@ -613,7 +613,7 @@ def test_tenant_physical_size(
timeline_total_resident_physical_size += get_timeline_resident_physical_size(timeline)
endpoint.stop()
pg.stop()
# ensure that tenant_status current_physical size reports sum of timeline current_physical_size
tenant_current_physical_size = int(

View File

@@ -27,8 +27,8 @@ def test_truncate(neon_env_builder: NeonEnvBuilder, zenbenchmark):
)
env.neon_cli.create_timeline("test_truncate", tenant_id=tenant)
endpoint = env.endpoints.create_start("test_truncate", tenant_id=tenant)
cur = endpoint.connect().cursor()
pg = env.postgres.create_start("test_truncate", tenant_id=tenant)
cur = pg.connect().cursor()
cur.execute("create table t1(x integer)")
cur.execute(f"insert into t1 values (generate_series(1,{n_records}))")
cur.execute("vacuum t1")

View File

@@ -10,12 +10,10 @@ from fixtures.neon_fixtures import NeonEnv, fork_at_current_lsn
def test_twophase(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_twophase", "empty")
endpoint = env.endpoints.create_start(
"test_twophase", config_lines=["max_prepared_transactions=5"]
)
pg = env.postgres.create_start("test_twophase", config_lines=["max_prepared_transactions=5"])
log.info("postgres is running on 'test_twophase' branch")
conn = endpoint.connect()
conn = pg.connect()
cur = conn.cursor()
cur.execute("CREATE TABLE foo (t text)")
@@ -44,7 +42,7 @@ def test_twophase(neon_simple_env: NeonEnv):
# pg_twophase directory and fsynced
cur.execute("CHECKPOINT")
twophase_files = os.listdir(endpoint.pg_twophase_dir_path())
twophase_files = os.listdir(pg.pg_twophase_dir_path())
log.info(twophase_files)
assert len(twophase_files) == 4
@@ -52,25 +50,25 @@ def test_twophase(neon_simple_env: NeonEnv):
cur.execute("ROLLBACK PREPARED 'insert_four'")
cur.execute("CHECKPOINT")
twophase_files = os.listdir(endpoint.pg_twophase_dir_path())
twophase_files = os.listdir(pg.pg_twophase_dir_path())
log.info(twophase_files)
assert len(twophase_files) == 2
# Create a branch with the transaction in prepared state
fork_at_current_lsn(env, endpoint, "test_twophase_prepared", "test_twophase")
fork_at_current_lsn(env, pg, "test_twophase_prepared", "test_twophase")
# Start compute on the new branch
endpoint2 = env.endpoints.create_start(
pg2 = env.postgres.create_start(
"test_twophase_prepared",
config_lines=["max_prepared_transactions=5"],
)
# Check that we restored only needed twophase files
twophase_files2 = os.listdir(endpoint2.pg_twophase_dir_path())
twophase_files2 = os.listdir(pg2.pg_twophase_dir_path())
log.info(twophase_files2)
assert twophase_files2.sort() == twophase_files.sort()
conn2 = endpoint2.connect()
conn2 = pg2.connect()
cur2 = conn2.cursor()
# On the new branch, commit one of the prepared transactions,

View File

@@ -9,9 +9,9 @@ from fixtures.neon_fixtures import NeonEnv, fork_at_current_lsn
def test_unlogged(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_unlogged", "empty")
endpoint = env.endpoints.create_start("test_unlogged")
pg = env.postgres.create_start("test_unlogged")
conn = endpoint.connect()
conn = pg.connect()
cur = conn.cursor()
cur.execute("CREATE UNLOGGED TABLE iut (id int);")
@@ -20,10 +20,12 @@ def test_unlogged(neon_simple_env: NeonEnv):
cur.execute("INSERT INTO iut values (42);")
# create another compute to fetch inital empty contents from pageserver
fork_at_current_lsn(env, endpoint, "test_unlogged_basebackup", "test_unlogged")
endpoint2 = env.endpoints.create_start("test_unlogged_basebackup")
fork_at_current_lsn(env, pg, "test_unlogged_basebackup", "test_unlogged")
pg2 = env.postgres.create_start(
"test_unlogged_basebackup",
)
conn2 = endpoint2.connect()
conn2 = pg2.connect()
cur2 = conn2.cursor()
# after restart table should be empty but valid
cur2.execute("PREPARE iut_plan (int) AS INSERT INTO iut VALUES ($1)")

Some files were not shown because too many files have changed in this diff Show More