Merge pull request #3411 from neondatabase/release_2023_01_23

Fix Release 2023 01 23
This commit is contained in:
Anastasia Lubennikova
2023-01-23 20:10:03 +02:00
committed by GitHub
33 changed files with 773 additions and 350 deletions

View File

@@ -7,7 +7,7 @@ storage:
broker_endpoint: http://storage-broker.prod.local:50051
pageserver_config_stub:
pg_distrib_dir: /usr/local
metric_collection_endpoint: http://console-staging.local/billing/api/v1/usage_events
metric_collection_endpoint: http://console-release.local/billing/api/v1/usage_events
metric_collection_interval: 10min
remote_storage:
bucket_name: "{{ bucket_name }}"

View File

@@ -18,7 +18,7 @@ storage:
ansible_aws_ssm_region: eu-west-1
ansible_aws_ssm_bucket_name: neon-dev-storage-eu-west-1
console_region_id: aws-eu-west-1
sentry_environment: development
sentry_environment: staging
children:
pageservers:

View File

@@ -18,7 +18,7 @@ storage:
ansible_aws_ssm_region: us-east-2
ansible_aws_ssm_bucket_name: neon-staging-storage-us-east-2
console_region_id: aws-us-east-2
sentry_environment: development
sentry_environment: staging
children:
pageservers:
@@ -29,6 +29,8 @@ storage:
ansible_host: i-0565a8b4008aa3f40
pageserver-2.us-east-2.aws.neon.build:
ansible_host: i-01e31cdf7e970586a
pageserver-3.us-east-2.aws.neon.build:
ansible_host: i-0602a0291365ef7cc
safekeepers:
hosts:

View File

@@ -8,7 +8,7 @@ settings:
authBackend: "console"
authEndpoint: "http://console-staging.local/management/api/v2"
domain: "*.eu-west-1.aws.neon.build"
sentryEnvironment: "development"
sentryEnvironment: "staging"
wssPort: 8443
metricCollectionEndpoint: "http://console-staging.local/billing/api/v1/usage_events"
metricCollectionInterval: "1min"

View File

@@ -49,4 +49,4 @@ extraManifests:
- "{{ .Release.Namespace }}"
settings:
sentryEnvironment: "development"
sentryEnvironment: "staging"

View File

@@ -8,7 +8,7 @@ settings:
authBackend: "link"
authEndpoint: "https://console.stage.neon.tech/authenticate_proxy_request/"
uri: "https://console.stage.neon.tech/psql_session/"
sentryEnvironment: "development"
sentryEnvironment: "staging"
metricCollectionEndpoint: "http://console-staging.local/billing/api/v1/usage_events"
metricCollectionInterval: "1min"

View File

@@ -8,7 +8,7 @@ settings:
authBackend: "console"
authEndpoint: "http://console-staging.local/management/api/v2"
domain: "*.cloud.stage.neon.tech"
sentryEnvironment: "development"
sentryEnvironment: "staging"
wssPort: 8443
metricCollectionEndpoint: "http://console-staging.local/billing/api/v1/usage_events"
metricCollectionInterval: "1min"

View File

@@ -8,7 +8,7 @@ settings:
authBackend: "console"
authEndpoint: "http://console-staging.local/management/api/v2"
domain: "*.us-east-2.aws.neon.build"
sentryEnvironment: "development"
sentryEnvironment: "staging"
wssPort: 8443
metricCollectionEndpoint: "http://console-staging.local/billing/api/v1/usage_events"
metricCollectionInterval: "1min"

View File

@@ -49,4 +49,4 @@ extraManifests:
- "{{ .Release.Namespace }}"
settings:
sentryEnvironment: "development"
sentryEnvironment: "staging"

6
Cargo.lock generated
View File

@@ -833,10 +833,8 @@ dependencies = [
"anyhow",
"chrono",
"clap 4.0.32",
"env_logger",
"futures",
"hyper",
"log",
"notify",
"postgres",
"regex",
@@ -845,6 +843,8 @@ dependencies = [
"tar",
"tokio",
"tokio-postgres",
"tracing",
"tracing-subscriber",
"url",
"workspace_hack",
]
@@ -1954,7 +1954,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e"
dependencies = [
"cfg-if",
"serde",
]
[[package]]
@@ -4565,6 +4564,7 @@ dependencies = [
"tower",
"tracing",
"tracing-core",
"tracing-subscriber",
"url",
]

View File

@@ -8,10 +8,8 @@ license.workspace = true
anyhow.workspace = true
chrono.workspace = true
clap.workspace = true
env_logger.workspace = true
futures.workspace = true
hyper = { workspace = true, features = ["full"] }
log = { workspace = true, features = ["std", "serde"] }
notify.workspace = true
postgres.workspace = true
regex.workspace = true
@@ -20,6 +18,8 @@ serde_json.workspace = true
tar.workspace = true
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
tokio-postgres.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
url.workspace = true
workspace_hack.workspace = true

View File

@@ -40,7 +40,7 @@ use std::{thread, time::Duration};
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Arg;
use log::{error, info};
use tracing::{error, info};
use compute_tools::compute::{ComputeMetrics, ComputeNode, ComputeState, ComputeStatus};
use compute_tools::http::api::launch_http_server;
@@ -53,7 +53,6 @@ use compute_tools::spec::*;
use url::Url;
fn main() -> Result<()> {
// TODO: re-use `utils::logging` later
init_logger(DEFAULT_LOG_LEVEL)?;
let matches = cli().get_matches();
@@ -122,29 +121,45 @@ fn main() -> Result<()> {
// Also spawn the thread responsible for handling the VM informant -- if it's present
let _vm_informant_handle = spawn_vm_informant_if_present().expect("cannot launch VM informant");
// Run compute (Postgres) and hang waiting on it.
match compute.prepare_and_run() {
Ok(ec) => {
let code = ec.code().unwrap_or(1);
info!("Postgres exited with code {}, shutting down", code);
exit(code)
}
Err(error) => {
error!("could not start the compute node: {:?}", error);
// Start Postgres
let mut delay_exit = false;
let mut exit_code = None;
let pg = match compute.start_compute() {
Ok(pg) => Some(pg),
Err(err) => {
error!("could not start the compute node: {:?}", err);
let mut state = compute.state.write().unwrap();
state.error = Some(format!("{:?}", error));
state.error = Some(format!("{:?}", err));
state.status = ComputeStatus::Failed;
drop(state);
// Keep serving HTTP requests, so the cloud control plane was able to
// get the actual error.
info!("giving control plane 30s to collect the error before shutdown");
thread::sleep(Duration::from_secs(30));
info!("shutting down");
Err(error)
delay_exit = true;
None
}
};
// Wait for the child Postgres process forever. In this state Ctrl+C will
// propagate to Postgres and it will be shut down as well.
if let Some(mut pg) = pg {
let ecode = pg
.wait()
.expect("failed to start waiting on Postgres process");
info!("Postgres exited with code {}, shutting down", ecode);
exit_code = ecode.code()
}
if let Err(err) = compute.check_for_core_dumps() {
error!("error while checking for core dumps: {err:?}");
}
// If launch failed, keep serving HTTP requests for a while, so the cloud
// control plane can get the actual error.
if delay_exit {
info!("giving control plane 30s to collect the error before shutdown");
thread::sleep(Duration::from_secs(30));
info!("shutting down");
}
exit(exit_code.unwrap_or(1))
}
fn cli() -> clap::Command {

View File

@@ -1,10 +1,11 @@
use anyhow::{anyhow, Result};
use log::error;
use postgres::Client;
use tokio_postgres::NoTls;
use tracing::{error, instrument};
use crate::compute::ComputeNode;
#[instrument(skip_all)]
pub fn create_writability_check_data(client: &mut Client) -> Result<()> {
let query = "
CREATE TABLE IF NOT EXISTS health_check (
@@ -21,6 +22,7 @@ pub fn create_writability_check_data(client: &mut Client) -> Result<()> {
Ok(())
}
#[instrument(skip_all)]
pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
let (client, connection) = tokio_postgres::connect(compute.connstr.as_str(), NoTls).await?;
if client.is_closed() {

View File

@@ -17,15 +17,15 @@
use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::process::{Command, ExitStatus, Stdio};
use std::process::{Command, Stdio};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use log::{info, warn};
use postgres::{Client, NoTls};
use serde::{Serialize, Serializer};
use tracing::{info, instrument, warn};
use crate::checker::create_writability_check_data;
use crate::config;
@@ -121,6 +121,7 @@ impl ComputeNode {
// Get basebackup from the libpq connection to pageserver using `connstr` and
// unarchive it to `pgdata` directory overriding all its previous content.
#[instrument(skip(self))]
fn get_basebackup(&self, lsn: &str) -> Result<()> {
let start_time = Utc::now();
@@ -154,6 +155,7 @@ impl ComputeNode {
// Run `postgres` in a special mode with `--sync-safekeepers` argument
// and return the reported LSN back to the caller.
#[instrument(skip(self))]
fn sync_safekeepers(&self) -> Result<String> {
let start_time = Utc::now();
@@ -196,6 +198,7 @@ impl ComputeNode {
/// Do all the preparations like PGDATA directory creation, configuration,
/// safekeepers sync, basebackup, etc.
#[instrument(skip(self))]
pub fn prepare_pgdata(&self) -> Result<()> {
let spec = &self.spec;
let pgdata_path = Path::new(&self.pgdata);
@@ -229,9 +232,8 @@ impl ComputeNode {
/// Start Postgres as a child process and manage DBs/roles.
/// After that this will hang waiting on the postmaster process to exit.
pub fn run(&self) -> Result<ExitStatus> {
let start_time = Utc::now();
#[instrument(skip(self))]
pub fn start_postgres(&self) -> Result<std::process::Child> {
let pgdata_path = Path::new(&self.pgdata);
// Run postgres as a child process.
@@ -242,10 +244,15 @@ impl ComputeNode {
wait_for_postgres(&mut pg, pgdata_path)?;
Ok(pg)
}
#[instrument(skip(self))]
pub fn apply_config(&self) -> Result<()> {
// If connection fails,
// it may be the old node with `zenith_admin` superuser.
//
// In this case we need to connect with old `zenith_admin`name
// In this case we need to connect with old `zenith_admin` name
// and create new user. We cannot simply rename connected user,
// but we can create a new one and grant it all privileges.
let mut client = match Client::connect(self.connstr.as_str(), NoTls) {
@@ -271,6 +278,7 @@ impl ComputeNode {
Ok(client) => client,
};
// Proceed with post-startup configuration. Note, that order of operations is important.
handle_roles(&self.spec, &mut client)?;
handle_databases(&self.spec, &mut client)?;
handle_role_deletions(self, &mut client)?;
@@ -279,8 +287,34 @@ impl ComputeNode {
// 'Close' connection
drop(client);
let startup_end_time = Utc::now();
info!(
"finished configuration of compute for project {}",
self.spec.cluster.cluster_id
);
Ok(())
}
#[instrument(skip(self))]
pub fn start_compute(&self) -> Result<std::process::Child> {
info!(
"starting compute for project {}, operation {}, tenant {}, timeline {}",
self.spec.cluster.cluster_id,
self.spec.operation_uuid.as_ref().unwrap(),
self.tenant,
self.timeline,
);
self.prepare_pgdata()?;
let start_time = Utc::now();
let pg = self.start_postgres()?;
self.apply_config()?;
let startup_end_time = Utc::now();
self.metrics.config_ms.store(
startup_end_time
.signed_duration_since(start_time)
@@ -300,34 +334,7 @@ impl ComputeNode {
self.set_status(ComputeStatus::Running);
info!(
"finished configuration of compute for project {}",
self.spec.cluster.cluster_id
);
// Wait for child Postgres process basically forever. In this state Ctrl+C
// will propagate to Postgres and it will be shut down as well.
let ecode = pg
.wait()
.expect("failed to start waiting on Postgres process");
self.check_for_core_dumps()
.expect("failed to check for core dumps");
Ok(ecode)
}
pub fn prepare_and_run(&self) -> Result<ExitStatus> {
info!(
"starting compute for project {}, operation {}, tenant {}, timeline {}",
self.spec.cluster.cluster_id,
self.spec.operation_uuid.as_ref().unwrap(),
self.tenant,
self.timeline,
);
self.prepare_pgdata()?;
self.run()
Ok(pg)
}
// Look for core dumps and collect backtraces.
@@ -340,7 +347,7 @@ impl ComputeNode {
//
// Use that as a default location and pattern, except macos where core dumps are written
// to /cores/ directory by default.
fn check_for_core_dumps(&self) -> Result<()> {
pub fn check_for_core_dumps(&self) -> Result<()> {
let core_dump_dir = match std::env::consts::OS {
"macos" => Path::new("/cores/"),
_ => Path::new(&self.pgdata),

View File

@@ -6,8 +6,8 @@ use std::thread;
use anyhow::Result;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use log::{error, info};
use serde_json;
use tracing::{error, info};
use crate::compute::ComputeNode;

View File

@@ -1,8 +1,8 @@
use log::{info, warn};
use std::path::Path;
use std::process;
use std::thread;
use std::time::Duration;
use tracing::{info, warn};
use anyhow::{Context, Result};

View File

@@ -1,42 +1,20 @@
use std::io::Write;
use anyhow::Result;
use chrono::Utc;
use env_logger::{Builder, Env};
macro_rules! info_println {
($($tts:tt)*) => {
if log_enabled!(Level::Info) {
println!($($tts)*);
}
}
}
macro_rules! info_print {
($($tts:tt)*) => {
if log_enabled!(Level::Info) {
print!($($tts)*);
}
}
}
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::prelude::*;
/// Initialize `env_logger` using either `default_level` or
/// `RUST_LOG` environment variable as default log level.
pub fn init_logger(default_level: &str) -> Result<()> {
let env = Env::default().filter_or("RUST_LOG", default_level);
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_level));
Builder::from_env(env)
.format(|buf, record| {
let thread_handle = std::thread::current();
writeln!(
buf,
"{} [{}] {}: {}",
Utc::now().format("%Y-%m-%d %H:%M:%S%.3f %Z"),
thread_handle.name().unwrap_or("main"),
record.level(),
record.args()
)
})
let fmt_layer = tracing_subscriber::fmt::layer()
.with_target(false)
.with_writer(std::io::stderr);
tracing_subscriber::registry()
.with(env_filter)
.with(fmt_layer)
.init();
Ok(())

View File

@@ -3,8 +3,8 @@ use std::{thread, time};
use anyhow::Result;
use chrono::{DateTime, Utc};
use log::{debug, info};
use postgres::{Client, NoTls};
use tracing::{debug, info};
use crate::compute::ComputeNode;

View File

@@ -11,6 +11,7 @@ use anyhow::{bail, Result};
use notify::{RecursiveMode, Watcher};
use postgres::{Client, Transaction};
use serde::Deserialize;
use tracing::{debug, instrument};
const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds
@@ -129,8 +130,8 @@ impl Role {
/// Serialize a list of role parameters into a Postgres-acceptable
/// string of arguments.
pub fn to_pg_options(&self) -> String {
// XXX: consider putting LOGIN as a default option somewhere higher, e.g. in Rails.
// For now we do not use generic `options` for roles. Once used, add
// XXX: consider putting LOGIN as a default option somewhere higher, e.g. in control-plane.
// For now, we do not use generic `options` for roles. Once used, add
// `self.options.as_pg_options()` somewhere here.
let mut params: String = "LOGIN".to_string();
@@ -229,6 +230,7 @@ pub fn get_existing_dbs(client: &mut Client) -> Result<Vec<Database>> {
/// Wait for Postgres to become ready to accept connections. It's ready to
/// accept connections when the state-field in `pgdata/postmaster.pid` says
/// 'ready'.
#[instrument(skip(pg))]
pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
let pid_path = pgdata.join("postmaster.pid");
@@ -287,18 +289,18 @@ pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
}
let res = rx.recv_timeout(Duration::from_millis(100));
log::debug!("woken up by notify: {res:?}");
debug!("woken up by notify: {res:?}");
// If there are multiple events in the channel already, we only need to be
// check once. Swallow the extra events before we go ahead to check the
// pid file.
while let Ok(res) = rx.try_recv() {
log::debug!("swallowing extra event: {res:?}");
debug!("swallowing extra event: {res:?}");
}
// Check that we can open pid file first.
if let Ok(file) = File::open(&pid_path) {
if !postmaster_pid_seen {
log::debug!("postmaster.pid appeared");
debug!("postmaster.pid appeared");
watcher
.unwatch(pgdata)
.expect("Failed to remove pgdata dir watch");
@@ -314,7 +316,7 @@ pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
// Pid file could be there and we could read it, but it could be empty, for example.
if let Some(Ok(line)) = last_line {
let status = line.trim();
log::debug!("last line of postmaster.pid: {status:?}");
debug!("last line of postmaster.pid: {status:?}");
// Now Postgres is ready to accept connections
if status == "ready" {
@@ -330,7 +332,7 @@ pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
}
}
log::info!("PostgreSQL is now running, continuing to configure it");
tracing::info!("PostgreSQL is now running, continuing to configure it");
Ok(())
}

View File

@@ -1,12 +1,11 @@
use std::path::Path;
use std::str::FromStr;
use std::time::Instant;
use anyhow::Result;
use log::{info, log_enabled, warn, Level};
use postgres::config::Config;
use postgres::{Client, NoTls};
use serde::Deserialize;
use tracing::{info, info_span, instrument, span_enabled, warn, Level};
use crate::compute::ComputeNode;
use crate::config;
@@ -80,23 +79,25 @@ pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
/// Given a cluster spec json and open transaction it handles roles creation,
/// deletion and update.
#[instrument(skip_all)]
pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
let mut xact = client.transaction()?;
let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
// Print a list of existing Postgres roles (only in debug mode)
info!("postgres roles:");
for r in &existing_roles {
info_println!(
"{} - {}:{}",
" ".repeat(27 + 5),
r.name,
if r.encrypted_password.is_some() {
"[FILTERED]"
} else {
"(null)"
}
);
if span_enabled!(Level::INFO) {
info!("postgres roles:");
for r in &existing_roles {
info!(
" - {}:{}",
r.name,
if r.encrypted_password.is_some() {
"[FILTERED]"
} else {
"(null)"
}
);
}
}
// Process delta operations first
@@ -137,58 +138,68 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
info!("cluster spec roles:");
for role in &spec.cluster.roles {
let name = &role.name;
info_print!(
"{} - {}:{}",
" ".repeat(27 + 5),
name,
if role.encrypted_password.is_some() {
"[FILTERED]"
} else {
"(null)"
}
);
// XXX: with a limited number of roles it is fine, but consider making it a HashMap
let pg_role = existing_roles.iter().find(|r| r.name == *name);
if let Some(r) = pg_role {
let mut update_role = false;
enum RoleAction {
None,
Update,
Create,
}
let action = if let Some(r) = pg_role {
if (r.encrypted_password.is_none() && role.encrypted_password.is_some())
|| (r.encrypted_password.is_some() && role.encrypted_password.is_none())
{
update_role = true;
RoleAction::Update
} else if let Some(pg_pwd) = &r.encrypted_password {
// Check whether password changed or not (trim 'md5:' prefix first)
update_role = pg_pwd[3..] != *role.encrypted_password.as_ref().unwrap();
if pg_pwd[3..] != *role.encrypted_password.as_ref().unwrap() {
RoleAction::Update
} else {
RoleAction::None
}
} else {
RoleAction::None
}
} else {
RoleAction::Create
};
if update_role {
match action {
RoleAction::None => {}
RoleAction::Update => {
let mut query: String = format!("ALTER ROLE {} ", name.pg_quote());
info_print!(" -> update");
query.push_str(&role.to_pg_options());
xact.execute(query.as_str(), &[])?;
}
} else {
info!("role name: '{}'", &name);
let mut query: String = format!("CREATE ROLE {} ", name.pg_quote());
info!("role create query: '{}'", &query);
info_print!(" -> create");
RoleAction::Create => {
let mut query: String = format!("CREATE ROLE {} ", name.pg_quote());
info!("role create query: '{}'", &query);
query.push_str(&role.to_pg_options());
xact.execute(query.as_str(), &[])?;
query.push_str(&role.to_pg_options());
xact.execute(query.as_str(), &[])?;
let grant_query = format!(
"GRANT pg_read_all_data, pg_write_all_data TO {}",
name.pg_quote()
);
xact.execute(grant_query.as_str(), &[])?;
info!("role grant query: '{}'", &grant_query);
let grant_query = format!(
"GRANT pg_read_all_data, pg_write_all_data TO {}",
name.pg_quote()
);
xact.execute(grant_query.as_str(), &[])?;
info!("role grant query: '{}'", &grant_query);
}
}
info_print!("\n");
if span_enabled!(Level::INFO) {
let pwd = if role.encrypted_password.is_some() {
"[FILTERED]"
} else {
"(null)"
};
let action_str = match action {
RoleAction::None => "",
RoleAction::Create => " -> create",
RoleAction::Update => " -> update",
};
info!(" - {}:{}{}", name, pwd, action_str);
}
}
xact.commit()?;
@@ -197,12 +208,25 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
}
/// Reassign all dependent objects and delete requested roles.
#[instrument(skip_all)]
pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result<()> {
if let Some(ops) = &node.spec.delta_operations {
// First, reassign all dependent objects to db owners.
info!("reassigning dependent objects of to-be-deleted roles");
// Fetch existing roles. We could've exported and used `existing_roles` from
// `handle_roles()`, but we only make this list there before creating new roles.
// Which is probably fine as we never create to-be-deleted roles, but that'd
// just look a bit untidy. Anyway, the entire `pg_roles` should be in shared
// buffers already, so this shouldn't be a big deal.
let mut xact = client.transaction()?;
let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
xact.commit()?;
for op in ops {
if op.action == "delete_role" {
// Check that role is still present in Postgres, as this could be a
// restart with the same spec after role deletion.
if op.action == "delete_role" && existing_roles.iter().any(|r| r.name == op.name) {
reassign_owned_objects(node, &op.name)?;
}
}
@@ -261,13 +285,16 @@ fn reassign_owned_objects(node: &ComputeNode, role_name: &PgIdent) -> Result<()>
/// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level
/// atomicity should be enough here due to the order of operations and various checks,
/// which together provide us idempotency.
#[instrument(skip_all)]
pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
let existing_dbs: Vec<Database> = get_existing_dbs(client)?;
// Print a list of existing Postgres databases (only in debug mode)
info!("postgres databases:");
for r in &existing_dbs {
info_println!("{} - {}:{}", " ".repeat(27 + 5), r.name, r.owner);
if span_enabled!(Level::INFO) {
info!("postgres databases:");
for r in &existing_dbs {
info!(" {}:{}", r.name, r.owner);
}
}
// Process delta operations first
@@ -310,13 +337,15 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
for db in &spec.cluster.databases {
let name = &db.name;
info_print!("{} - {}:{}", " ".repeat(27 + 5), db.name, db.owner);
// XXX: with a limited number of databases it is fine, but consider making it a HashMap
let pg_db = existing_dbs.iter().find(|r| r.name == *name);
let start_time = Instant::now();
if let Some(r) = pg_db {
enum DatabaseAction {
None,
Update,
Create,
}
let action = if let Some(r) = pg_db {
// XXX: db owner name is returned as quoted string from Postgres,
// when quoting is needed.
let new_owner = if r.owner.starts_with('"') {
@@ -326,29 +355,42 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
};
if new_owner != r.owner {
// Update the owner
DatabaseAction::Update
} else {
DatabaseAction::None
}
} else {
DatabaseAction::Create
};
match action {
DatabaseAction::None => {}
DatabaseAction::Update => {
let query: String = format!(
"ALTER DATABASE {} OWNER TO {}",
name.pg_quote(),
db.owner.pg_quote()
);
info_print!(" -> update");
let _ = info_span!("executing", query).entered();
client.execute(query.as_str(), &[])?;
let elapsed = start_time.elapsed().as_millis();
info_print!(" ({} ms)", elapsed);
}
} else {
let mut query: String = format!("CREATE DATABASE {} ", name.pg_quote());
info_print!(" -> create");
DatabaseAction::Create => {
let mut query: String = format!("CREATE DATABASE {} ", name.pg_quote());
query.push_str(&db.to_pg_options());
let _ = info_span!("executing", query).entered();
client.execute(query.as_str(), &[])?;
}
};
query.push_str(&db.to_pg_options());
client.execute(query.as_str(), &[])?;
let elapsed = start_time.elapsed().as_millis();
info_print!(" ({} ms)", elapsed);
if span_enabled!(Level::INFO) {
let action_str = match action {
DatabaseAction::None => "",
DatabaseAction::Create => " -> create",
DatabaseAction::Update => " -> update",
};
info!(" - {}:{}{}", db.name, db.owner, action_str);
}
info_print!("\n");
}
Ok(())
@@ -356,6 +398,7 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
/// Grant CREATE ON DATABASE to the database owner and do some other alters and grants
/// to allow users creating trusted extensions and re-creating `public` schema, for example.
#[instrument(skip_all)]
pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> {
let spec = &node.spec;

View File

@@ -134,22 +134,25 @@ impl<K: std::hash::Hash + Eq + 'static> Storage<K> {
op: Cow<'static, str>,
lsn: u64,
size: Option<u64>,
) where
) -> anyhow::Result<()>
where
K: std::borrow::Borrow<Q>,
Q: std::hash::Hash + Eq + std::fmt::Debug,
{
let lastseg_id = *self.branches.get(branch).unwrap();
let Some(lastseg_id) = self.branches.get(branch).copied() else { anyhow::bail!("branch not found: {branch:?}") };
let newseg_id = self.segments.len();
let lastseg = &mut self.segments[lastseg_id];
assert!(lsn > lastseg.end_lsn);
let Some(start_size) = lastseg.end_size else { anyhow::bail!("no end_size on latest segment for {branch:?}") };
let newseg = Segment {
op,
parent: Some(lastseg_id),
start_lsn: lastseg.end_lsn,
end_lsn: lsn,
start_size: lastseg.end_size.unwrap(),
start_size,
end_size: size,
children_after: Vec::new(),
needed: false,
@@ -158,6 +161,8 @@ impl<K: std::hash::Hash + Eq + 'static> Storage<K> {
self.segments.push(newseg);
*self.branches.get_mut(branch).expect("read already") = newseg_id;
Ok(())
}
/// Advances the branch with the named operation, by the relative LSN and logical size bytes.
@@ -167,21 +172,24 @@ impl<K: std::hash::Hash + Eq + 'static> Storage<K> {
op: Cow<'static, str>,
lsn_bytes: u64,
size_bytes: i64,
) where
) -> anyhow::Result<()>
where
K: std::borrow::Borrow<Q>,
Q: std::hash::Hash + Eq,
Q: std::hash::Hash + Eq + std::fmt::Debug,
{
let lastseg_id = *self.branches.get(branch).unwrap();
let Some(lastseg_id) = self.branches.get(branch).copied() else { anyhow::bail!("branch not found: {branch:?}") };
let newseg_id = self.segments.len();
let lastseg = &mut self.segments[lastseg_id];
let Some(last_end_size) = lastseg.end_size else { anyhow::bail!("no end_size on latest segment for {branch:?}") };
let newseg = Segment {
op,
parent: Some(lastseg_id),
start_lsn: lastseg.end_lsn,
end_lsn: lastseg.end_lsn + lsn_bytes,
start_size: lastseg.end_size.unwrap(),
end_size: Some((lastseg.end_size.unwrap() as i64 + size_bytes) as u64),
start_size: last_end_size,
end_size: Some((last_end_size as i64 + size_bytes) as u64),
children_after: Vec::new(),
needed: false,
};
@@ -189,33 +197,33 @@ impl<K: std::hash::Hash + Eq + 'static> Storage<K> {
self.segments.push(newseg);
*self.branches.get_mut(branch).expect("read already") = newseg_id;
Ok(())
}
pub fn insert<Q: ?Sized>(&mut self, branch: &Q, bytes: u64)
pub fn insert<Q: ?Sized>(&mut self, branch: &Q, bytes: u64) -> anyhow::Result<()>
where
K: std::borrow::Borrow<Q>,
Q: std::hash::Hash + Eq,
Q: std::hash::Hash + Eq + std::fmt::Debug,
{
self.modify_branch(branch, "insert".into(), bytes, bytes as i64);
self.modify_branch(branch, "insert".into(), bytes, bytes as i64)
}
pub fn update<Q: ?Sized>(&mut self, branch: &Q, bytes: u64)
pub fn update<Q: ?Sized>(&mut self, branch: &Q, bytes: u64) -> anyhow::Result<()>
where
K: std::borrow::Borrow<Q>,
Q: std::hash::Hash + Eq,
Q: std::hash::Hash + Eq + std::fmt::Debug,
{
self.modify_branch(branch, "update".into(), bytes, 0i64);
self.modify_branch(branch, "update".into(), bytes, 0i64)
}
pub fn delete<Q: ?Sized>(&mut self, branch: &Q, bytes: u64)
pub fn delete<Q: ?Sized>(&mut self, branch: &Q, bytes: u64) -> anyhow::Result<()>
where
K: std::borrow::Borrow<Q>,
Q: std::hash::Hash + Eq,
Q: std::hash::Hash + Eq + std::fmt::Debug,
{
self.modify_branch(branch, "delete".into(), bytes, -(bytes as i64));
self.modify_branch(branch, "delete".into(), bytes, -(bytes as i64))
}
/// Panics if the parent branch cannot be found.
pub fn branch<Q: ?Sized>(&mut self, parent: &Q, name: K) -> anyhow::Result<()>
where
K: std::borrow::Borrow<Q> + std::fmt::Debug,
@@ -236,7 +244,7 @@ impl<K: std::hash::Hash + Eq + 'static> Storage<K> {
Ok(())
}
pub fn calculate(&mut self, retention_period: u64) -> SegmentSize {
pub fn calculate(&mut self, retention_period: u64) -> anyhow::Result<SegmentSize> {
// Phase 1: Mark all the segments that need to be retained
for (_branch, &last_seg_id) in self.branches.iter() {
let last_seg = &self.segments[last_seg_id];
@@ -261,7 +269,7 @@ impl<K: std::hash::Hash + Eq + 'static> Storage<K> {
self.size_from_snapshot_later(0)
}
fn size_from_wal(&self, seg_id: usize) -> SegmentSize {
fn size_from_wal(&self, seg_id: usize) -> anyhow::Result<SegmentSize> {
let seg = &self.segments[seg_id];
let this_size = seg.end_lsn - seg.start_lsn;
@@ -272,10 +280,10 @@ impl<K: std::hash::Hash + Eq + 'static> Storage<K> {
for &child_id in seg.children_after.iter() {
// try each child both ways
let child = &self.segments[child_id];
let p1 = self.size_from_wal(child_id);
let p1 = self.size_from_wal(child_id)?;
let p = if !child.needed {
let p2 = self.size_from_snapshot_later(child_id);
let p2 = self.size_from_snapshot_later(child_id)?;
if p1.total() < p2.total() {
p1
} else {
@@ -286,15 +294,15 @@ impl<K: std::hash::Hash + Eq + 'static> Storage<K> {
};
children.push(p);
}
SegmentSize {
Ok(SegmentSize {
seg_id,
method: if seg.needed { WalNeeded } else { Wal },
this_size,
children,
}
})
}
fn size_from_snapshot_later(&self, seg_id: usize) -> SegmentSize {
fn size_from_snapshot_later(&self, seg_id: usize) -> anyhow::Result<SegmentSize> {
// If this is needed, then it's time to do the snapshot and continue
// with wal method.
let seg = &self.segments[seg_id];
@@ -305,10 +313,10 @@ impl<K: std::hash::Hash + Eq + 'static> Storage<K> {
for &child_id in seg.children_after.iter() {
// try each child both ways
let child = &self.segments[child_id];
let p1 = self.size_from_wal(child_id);
let p1 = self.size_from_wal(child_id)?;
let p = if !child.needed {
let p2 = self.size_from_snapshot_later(child_id);
let p2 = self.size_from_snapshot_later(child_id)?;
if p1.total() < p2.total() {
p1
} else {
@@ -319,12 +327,12 @@ impl<K: std::hash::Hash + Eq + 'static> Storage<K> {
};
children.push(p);
}
SegmentSize {
Ok(SegmentSize {
seg_id,
method: WalNeeded,
this_size: seg.start_size,
children,
}
})
} else {
// If any of the direct children are "needed", need to be able to reconstruct here
let mut children_needed = false;
@@ -339,7 +347,7 @@ impl<K: std::hash::Hash + Eq + 'static> Storage<K> {
let method1 = if !children_needed {
let mut children = Vec::new();
for child in seg.children_after.iter() {
children.push(self.size_from_snapshot_later(*child));
children.push(self.size_from_snapshot_later(*child)?);
}
Some(SegmentSize {
seg_id,
@@ -355,20 +363,25 @@ impl<K: std::hash::Hash + Eq + 'static> Storage<K> {
let method2 = if children_needed || seg.children_after.len() >= 2 {
let mut children = Vec::new();
for child in seg.children_after.iter() {
children.push(self.size_from_wal(*child));
children.push(self.size_from_wal(*child)?);
}
let Some(this_size) = seg.end_size else { anyhow::bail!("no end_size at junction {seg_id}") };
Some(SegmentSize {
seg_id,
method: SnapshotAfter,
this_size: seg.end_size.unwrap(),
this_size,
children,
})
} else {
None
};
match (method1, method2) {
(None, None) => panic!(),
Ok(match (method1, method2) {
(None, None) => anyhow::bail!(
"neither method was applicable: children_after={}, children_needed={}",
seg.children_after.len(),
children_needed
),
(Some(method), None) => method,
(None, Some(method)) => method,
(Some(method1), Some(method2)) => {
@@ -378,7 +391,7 @@ impl<K: std::hash::Hash + Eq + 'static> Storage<K> {
method2
}
}
}
})
}
}

View File

@@ -7,118 +7,118 @@
use tenant_size_model::{Segment, SegmentSize, Storage};
// Main branch only. Some updates on it.
fn scenario_1() -> (Vec<Segment>, SegmentSize) {
fn scenario_1() -> anyhow::Result<(Vec<Segment>, SegmentSize)> {
// Create main branch
let mut storage = Storage::new("main");
// Bulk load 5 GB of data to it
storage.insert("main", 5_000);
storage.insert("main", 5_000)?;
// Stream of updates
for _ in 0..5 {
storage.update("main", 1_000);
storage.update("main", 1_000)?;
}
let size = storage.calculate(1000);
let size = storage.calculate(1000)?;
(storage.into_segments(), size)
Ok((storage.into_segments(), size))
}
// Main branch only. Some updates on it.
fn scenario_2() -> (Vec<Segment>, SegmentSize) {
fn scenario_2() -> anyhow::Result<(Vec<Segment>, SegmentSize)> {
// Create main branch
let mut storage = Storage::new("main");
// Bulk load 5 GB of data to it
storage.insert("main", 5_000);
storage.insert("main", 5_000)?;
// Stream of updates
for _ in 0..5 {
storage.update("main", 1_000);
storage.update("main", 1_000)?;
}
// Branch
storage.branch("main", "child").unwrap();
storage.update("child", 1_000);
storage.branch("main", "child")?;
storage.update("child", 1_000)?;
// More updates on parent
storage.update("main", 1_000);
storage.update("main", 1_000)?;
let size = storage.calculate(1000);
let size = storage.calculate(1000)?;
(storage.into_segments(), size)
Ok((storage.into_segments(), size))
}
// Like 2, but more updates on main
fn scenario_3() -> (Vec<Segment>, SegmentSize) {
fn scenario_3() -> anyhow::Result<(Vec<Segment>, SegmentSize)> {
// Create main branch
let mut storage = Storage::new("main");
// Bulk load 5 GB of data to it
storage.insert("main", 5_000);
storage.insert("main", 5_000)?;
// Stream of updates
for _ in 0..5 {
storage.update("main", 1_000);
storage.update("main", 1_000)?;
}
// Branch
storage.branch("main", "child").unwrap();
storage.update("child", 1_000);
storage.branch("main", "child")?;
storage.update("child", 1_000)?;
// More updates on parent
for _ in 0..5 {
storage.update("main", 1_000);
storage.update("main", 1_000)?;
}
let size = storage.calculate(1000);
let size = storage.calculate(1000)?;
(storage.into_segments(), size)
Ok((storage.into_segments(), size))
}
// Diverged branches
fn scenario_4() -> (Vec<Segment>, SegmentSize) {
fn scenario_4() -> anyhow::Result<(Vec<Segment>, SegmentSize)> {
// Create main branch
let mut storage = Storage::new("main");
// Bulk load 5 GB of data to it
storage.insert("main", 5_000);
storage.insert("main", 5_000)?;
// Stream of updates
for _ in 0..5 {
storage.update("main", 1_000);
storage.update("main", 1_000)?;
}
// Branch
storage.branch("main", "child").unwrap();
storage.update("child", 1_000);
storage.branch("main", "child")?;
storage.update("child", 1_000)?;
// More updates on parent
for _ in 0..8 {
storage.update("main", 1_000);
storage.update("main", 1_000)?;
}
let size = storage.calculate(1000);
let size = storage.calculate(1000)?;
(storage.into_segments(), size)
Ok((storage.into_segments(), size))
}
fn scenario_5() -> (Vec<Segment>, SegmentSize) {
fn scenario_5() -> anyhow::Result<(Vec<Segment>, SegmentSize)> {
let mut storage = Storage::new("a");
storage.insert("a", 5000);
storage.branch("a", "b").unwrap();
storage.update("b", 4000);
storage.update("a", 2000);
storage.branch("a", "c").unwrap();
storage.insert("c", 4000);
storage.insert("a", 2000);
storage.insert("a", 5000)?;
storage.branch("a", "b")?;
storage.update("b", 4000)?;
storage.update("a", 2000)?;
storage.branch("a", "c")?;
storage.insert("c", 4000)?;
storage.insert("a", 2000)?;
let size = storage.calculate(5000);
let size = storage.calculate(5000)?;
(storage.into_segments(), size)
Ok((storage.into_segments(), size))
}
fn scenario_6() -> (Vec<Segment>, SegmentSize) {
fn scenario_6() -> anyhow::Result<(Vec<Segment>, SegmentSize)> {
use std::borrow::Cow;
const NO_OP: Cow<'static, str> = Cow::Borrowed("");
@@ -133,18 +133,18 @@ fn scenario_6() -> (Vec<Segment>, SegmentSize) {
let mut storage = Storage::new(None);
storage.branch(&None, branches[0]).unwrap(); // at 0
storage.modify_branch(&branches[0], NO_OP, 108951064, 43696128); // at 108951064
storage.branch(&branches[0], branches[1]).unwrap(); // at 108951064
storage.modify_branch(&branches[1], NO_OP, 15560408, -1851392); // at 124511472
storage.modify_branch(&branches[0], NO_OP, 174464360, -1531904); // at 283415424
storage.branch(&branches[0], branches[2]).unwrap(); // at 283415424
storage.modify_branch(&branches[2], NO_OP, 15906192, 8192); // at 299321616
storage.modify_branch(&branches[0], NO_OP, 18909976, 32768); // at 302325400
storage.branch(&None, branches[0])?; // at 0
storage.modify_branch(&branches[0], NO_OP, 108951064, 43696128)?; // at 108951064
storage.branch(&branches[0], branches[1])?; // at 108951064
storage.modify_branch(&branches[1], NO_OP, 15560408, -1851392)?; // at 124511472
storage.modify_branch(&branches[0], NO_OP, 174464360, -1531904)?; // at 283415424
storage.branch(&branches[0], branches[2])?; // at 283415424
storage.modify_branch(&branches[2], NO_OP, 15906192, 8192)?; // at 299321616
storage.modify_branch(&branches[0], NO_OP, 18909976, 32768)?; // at 302325400
let size = storage.calculate(100_000);
let size = storage.calculate(100_000)?;
(storage.into_segments(), size)
Ok((storage.into_segments(), size))
}
fn main() {
@@ -163,7 +163,8 @@ fn main() {
eprintln!("invalid scenario {}", other);
std::process::exit(1);
}
};
}
.unwrap();
graphviz_tree(&segments, &size);
}
@@ -251,7 +252,7 @@ fn graphviz_tree(segments: &[Segment], tree: &SegmentSize) {
#[test]
fn scenarios_return_same_size() {
type ScenarioFn = fn() -> (Vec<Segment>, SegmentSize);
type ScenarioFn = fn() -> anyhow::Result<(Vec<Segment>, SegmentSize)>;
let truths: &[(u32, ScenarioFn, _)] = &[
(line!(), scenario_1, 8000),
(line!(), scenario_2, 9000),
@@ -262,7 +263,7 @@ fn scenarios_return_same_size() {
];
for (line, scenario, expected) in truths {
let (_, size) = scenario();
let (_, size) = scenario().unwrap();
assert_eq!(*expected, size.total_children(), "scenario on line {line}");
}
}

View File

@@ -8,6 +8,7 @@ use strum_macros::{EnumString, EnumVariantNames};
pub enum LogFormat {
Plain,
Json,
Test,
}
impl LogFormat {
@@ -39,6 +40,7 @@ pub fn init(log_format: LogFormat) -> anyhow::Result<()> {
match log_format {
LogFormat::Json => base_logger.json().init(),
LogFormat::Plain => base_logger.init(),
LogFormat::Test => base_logger.with_test_writer().init(),
}
Ok(())

View File

@@ -59,7 +59,7 @@ pub async fn collect_metrics(
None,
None,
"synthetic size calculation",
true,
false,
async move {
calculate_synthetic_size_worker(synthetic_size_calculation_interval)
.instrument(info_span!("synthetic_size_worker"))

View File

@@ -1405,15 +1405,15 @@ fn slru_segment_key_range(kind: SlruKind, segno: u32) -> Range<Key> {
Key {
field1: 0x01,
field2,
field3: segno,
field4: 0,
field3: 1,
field4: segno,
field5: 0,
field6: 0,
}..Key {
field1: 0x01,
field2,
field3: segno,
field4: 0,
field3: 1,
field4: segno,
field5: 1,
field6: 0,
}

View File

@@ -2627,8 +2627,10 @@ where
pub mod harness {
use bytes::{Bytes, BytesMut};
use once_cell::sync::Lazy;
use once_cell::sync::OnceCell;
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::{fs, path::PathBuf};
use utils::logging;
use utils::lsn::Lsn;
use crate::{
@@ -2692,6 +2694,8 @@ pub mod harness {
),
}
static LOG_HANDLE: OnceCell<()> = OnceCell::new();
impl<'a> TenantHarness<'a> {
pub fn create(test_name: &'static str) -> anyhow::Result<Self> {
Self::create_internal(test_name, false)
@@ -2706,6 +2710,10 @@ pub mod harness {
(Some(LOCK.read().unwrap()), None)
};
LOG_HANDLE.get_or_init(|| {
logging::init(logging::LogFormat::Test).expect("Failed to init test logging")
});
let repo_dir = PageServerConf::test_repo_dir(test_name);
let _ = fs::remove_dir_all(&repo_dir);
fs::create_dir_all(&repo_dir)?;

View File

@@ -250,15 +250,32 @@ where
L: ?Sized + Layer,
{
///
/// Find the latest layer that covers the given 'key', with lsn <
/// 'end_lsn'.
/// Find the latest layer (by lsn.end) that covers the given
/// 'key', with lsn.start < 'end_lsn'.
///
/// Returns the layer, if any, and an 'lsn_floor' value that
/// indicates which portion of the layer the caller should
/// check. 'lsn_floor' is normally the start-LSN of the layer, but
/// can be greater if there is an overlapping layer that might
/// contain the version, even if it's missing from the returned
/// layer.
/// The caller of this function is the page reconstruction
/// algorithm looking for the next relevant delta layer, or
/// the terminal image layer. The caller will pass the lsn_floor
/// value as end_lsn in the next call to search.
///
/// If there's an image layer exactly below the given end_lsn,
/// search should return that layer regardless if there are
/// overlapping deltas.
///
/// If the latest layer is a delta and there is an overlapping
/// image with it below, the lsn_floor returned should be right
/// above that image so we don't skip it in the search. Otherwise
/// the lsn_floor returned should be the bottom of the delta layer
/// because we should make as much progress down the lsn axis
/// as possible. It's fine if this way we skip some overlapping
/// deltas, because the delta we returned would contain the same
/// wal content.
///
/// TODO: This API is convoluted and inefficient. If the caller
/// makes N search calls, we'll end up finding the same latest
/// image layer N times. We should either cache the latest image
/// layer result, or simplify the api to `get_latest_image` and
/// `get_latest_delta`, and only call `get_latest_image` once.
///
/// NOTE: This only searches the 'historic' layers, *not* the
/// 'open' and 'frozen' layers!
@@ -401,7 +418,9 @@ where
NUM_ONDISK_LAYERS.dec();
}
/// Is there a newer image layer for given key- and LSN-range?
/// Is there a newer image layer for given key- and LSN-range? Or a set
/// of image layers within the specified lsn range that cover the entire
/// specified key range?
///
/// This is used for garbage collection, to determine if an old layer can
/// be deleted.
@@ -488,8 +507,8 @@ where
///
/// Divide the whole given range of keys into sub-ranges based on the latest
/// image layer that covers each range. (This is used when creating new
/// image layers)
/// image layer that covers each range at the specified lsn (inclusive).
/// This is used when creating new image layers.
///
// FIXME: clippy complains that the result type is very complex. She's probably
// right...
@@ -541,8 +560,15 @@ where
Ok(ranges)
}
/// Count how many L1 delta layers there are that overlap with the
/// given key and LSN range.
/// Count the height of the tallest stack of deltas in this 2d region.
///
/// This number is used to compute the largest number of deltas that
/// we'll need to visit for any page reconstruction in this region.
/// We use this heuristic to decide whether to create an image layer.
///
/// TODO currently we just return the total number of deltas in the
/// region, no matter if they're stacked on top of each other
/// or next to each other.
pub fn count_deltas(&self, key_range: &Range<Key>, lsn_range: &Range<Lsn>) -> Result<usize> {
let mut result = 0;
if lsn_range.start >= lsn_range.end {

View File

@@ -23,7 +23,13 @@ use tracing::*;
pub struct ModelInputs {
updates: Vec<Update>,
retention_period: u64,
/// Relevant lsns per timeline.
///
/// This field is not required for deserialization purposes, which is mostly used in tests. The
/// LSNs explain the outcome (updates) but are not needed in size calculation.
#[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
#[serde(default)]
timeline_inputs: HashMap<TimelineId, TimelineInputs>,
}
@@ -32,6 +38,8 @@ pub struct ModelInputs {
#[serde_with::serde_as]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct TimelineInputs {
#[serde_as(as = "serde_with::DisplayFromStr")]
ancestor_lsn: Lsn,
#[serde_as(as = "serde_with::DisplayFromStr")]
last_record: Lsn,
#[serde_as(as = "serde_with::DisplayFromStr")]
@@ -178,19 +186,20 @@ pub(super) async fn gather_inputs(
// our advantage with `?` error handling.
let mut joinset = tokio::task::JoinSet::new();
let timelines = tenant
// refresh is needed to update gc related pitr_cutoff and horizon_cutoff
tenant
.refresh_gc_info()
.await
.context("Failed to refresh gc_info before gathering inputs")?;
let timelines = tenant.list_timelines();
if timelines.is_empty() {
// All timelines are below tenant's gc_horizon; alternative would be to use
// Tenant::list_timelines but then those gc_info's would not be updated yet, possibly
// missing GcInfo::retain_lsns or having obsolete values for cutoff's.
// perhaps the tenant has just been created, and as such doesn't have any data yet
return Ok(ModelInputs {
updates: vec![],
retention_period: 0,
timeline_inputs: HashMap::new(),
timeline_inputs: HashMap::default(),
});
}
@@ -201,13 +210,25 @@ pub(super) async fn gather_inputs(
let mut updates = Vec::new();
// record the per timline values used to determine `retention_period`
// record the per timeline values useful to debug the model inputs, also used to track
// ancestor_lsn without keeping a hold of Timeline
let mut timeline_inputs = HashMap::with_capacity(timelines.len());
// used to determine the `retention_period` for the size model
let mut max_cutoff_distance = None;
// mapping from (TimelineId, Lsn) => if this branch point has been handled already via
// GcInfo::retain_lsns or if it needs to have its logical_size calculated.
let mut referenced_branch_froms = HashMap::<(TimelineId, Lsn), bool>::new();
for timeline in timelines {
if !timeline.is_active() {
anyhow::bail!(
"timeline {} is not active, cannot calculate tenant_size now",
timeline.timeline_id
);
}
let last_record_lsn = timeline.get_last_record_lsn();
let (interesting_lsns, horizon_cutoff, pitr_cutoff, next_gc_cutoff) = {
@@ -273,13 +294,30 @@ pub(super) async fn gather_inputs(
// all timelines branch from something, because it might be impossible to pinpoint
// which is the tenant_size_model's "default" branch.
let ancestor_lsn = timeline.get_ancestor_lsn();
updates.push(Update {
lsn: timeline.get_ancestor_lsn(),
lsn: ancestor_lsn,
command: Command::BranchFrom(timeline.get_ancestor_timeline_id()),
timeline_id: timeline.timeline_id,
});
if let Some(parent_timeline_id) = timeline.get_ancestor_timeline_id() {
// refresh_gc_info will update branchpoints and pitr_cutoff but only do it for branches
// which are over gc_horizon. for example, a "main" branch which never received any
// updates apart from initdb not have branch points recorded.
referenced_branch_froms
.entry((parent_timeline_id, timeline.get_ancestor_lsn()))
.or_default();
}
for (lsn, _kind) in &interesting_lsns {
// mark this visited so don't need to re-process this parent
*referenced_branch_froms
.entry((timeline.timeline_id, *lsn))
.or_default() = true;
if let Some(size) = logical_size_cache.get(&(timeline.timeline_id, *lsn)) {
updates.push(Update {
lsn: *lsn,
@@ -295,22 +333,10 @@ pub(super) async fn gather_inputs(
}
}
// all timelines also have an end point if they have made any progress
if last_record_lsn > timeline.get_ancestor_lsn()
&& !interesting_lsns
.iter()
.any(|(lsn, _)| lsn == &last_record_lsn)
{
updates.push(Update {
lsn: last_record_lsn,
command: Command::EndOfBranch,
timeline_id: timeline.timeline_id,
});
}
timeline_inputs.insert(
timeline.timeline_id,
TimelineInputs {
ancestor_lsn,
last_record: last_record_lsn,
// this is not used above, because it might not have updated recently enough
latest_gc_cutoff: *timeline.get_latest_gc_cutoff_lsn(),
@@ -321,6 +347,80 @@ pub(super) async fn gather_inputs(
);
}
// iterate over discovered branch points and make sure we are getting logical sizes at those
// points.
for ((timeline_id, lsn), handled) in referenced_branch_froms.iter() {
if *handled {
continue;
}
let timeline_id = *timeline_id;
let lsn = *lsn;
match timeline_inputs.get(&timeline_id) {
Some(inputs) if inputs.ancestor_lsn == lsn => {
// we don't need an update at this branch point which is also point where
// timeline_id branch was branched from.
continue;
}
Some(_) => {}
None => {
// we should have this because we have iterated through all of the timelines
anyhow::bail!("missing timeline_input for {timeline_id}")
}
}
if let Some(size) = logical_size_cache.get(&(timeline_id, lsn)) {
updates.push(Update {
lsn,
timeline_id,
command: Command::Update(*size),
});
needed_cache.insert((timeline_id, lsn));
} else {
let timeline = tenant
.get_timeline(timeline_id, false)
.context("find referenced ancestor timeline")?;
let parallel_size_calcs = Arc::clone(limit);
joinset.spawn(calculate_logical_size(
parallel_size_calcs,
timeline.clone(),
lsn,
));
if let Some(parent_id) = timeline.get_ancestor_timeline_id() {
// we should not find new ones because we iterated tenants all timelines
anyhow::ensure!(
timeline_inputs.contains_key(&parent_id),
"discovered new timeline {parent_id} (parent of {timeline_id})"
);
}
};
}
// finally add in EndOfBranch for all timelines where their last_record_lsn is not a branch
// point. this is needed by the model.
for (timeline_id, inputs) in timeline_inputs.iter() {
let lsn = inputs.last_record;
if referenced_branch_froms.contains_key(&(*timeline_id, lsn)) {
// this means that the (timeline_id, last_record_lsn) represents a branch point
// we do not want to add EndOfBranch updates for these points because it doesn't fit
// into the current tenant_size_model.
continue;
}
if lsn > inputs.ancestor_lsn {
// all timelines also have an end point if they have made any progress
updates.push(Update {
lsn,
command: Command::EndOfBranch,
timeline_id: *timeline_id,
});
}
}
let mut have_any_error = false;
while let Some(res) = joinset.join_next().await {
@@ -379,6 +479,7 @@ pub(super) async fn gather_inputs(
// handled by the variant order in `Command`.
//
updates.sort_unstable();
// And another sort to handle Command::BranchFrom ordering
// in case when there are multiple branches at the same LSN.
let sorted_updates = sort_updates_in_tree_order(updates)?;
@@ -413,10 +514,10 @@ impl ModelInputs {
let Lsn(now) = *lsn;
match op {
Command::Update(sz) => {
storage.insert_point(&Some(*timeline_id), "".into(), now, Some(*sz));
storage.insert_point(&Some(*timeline_id), "".into(), now, Some(*sz))?;
}
Command::EndOfBranch => {
storage.insert_point(&Some(*timeline_id), "".into(), now, None);
storage.insert_point(&Some(*timeline_id), "".into(), now, None)?;
}
Command::BranchFrom(parent) => {
// This branch command may fail if it cannot find a parent to branch from.
@@ -425,7 +526,7 @@ impl ModelInputs {
}
}
Ok(storage.calculate(self.retention_period).total_children())
Ok(storage.calculate(self.retention_period)?.total_children())
}
}
@@ -574,7 +675,10 @@ fn updates_sort() {
fn verify_size_for_multiple_branches() {
// this is generated from integration test test_tenant_size_with_multiple_branches, but this way
// it has the stable lsn's
let doc = r#"{"updates":[{"lsn":"0/0","command":{"branch_from":null},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/176FA40","command":{"update":25763840},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/176FA40","command":{"branch_from":"cd9d9409c216e64bf580904facedb01b"},"timeline_id":"10b532a550540bc15385eac4edde416a"},{"lsn":"0/1819818","command":{"update":26075136},"timeline_id":"10b532a550540bc15385eac4edde416a"},{"lsn":"0/18B5E40","command":{"update":26427392},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/18D3DF0","command":{"update":26492928},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/18D3DF0","command":{"branch_from":"cd9d9409c216e64bf580904facedb01b"},"timeline_id":"230fc9d756f7363574c0d66533564dcc"},{"lsn":"0/220F438","command":{"update":25239552},"timeline_id":"230fc9d756f7363574c0d66533564dcc"}],"retention_period":131072,"timeline_inputs":{"cd9d9409c216e64bf580904facedb01b":{"last_record":"0/18D5E40","latest_gc_cutoff":"0/169ACF0","horizon_cutoff":"0/18B5E40","pitr_cutoff":"0/18B5E40","next_gc_cutoff":"0/18B5E40"},"10b532a550540bc15385eac4edde416a":{"last_record":"0/1839818","latest_gc_cutoff":"0/169ACF0","horizon_cutoff":"0/1819818","pitr_cutoff":"0/1819818","next_gc_cutoff":"0/1819818"},"230fc9d756f7363574c0d66533564dcc":{"last_record":"0/222F438","latest_gc_cutoff":"0/169ACF0","horizon_cutoff":"0/220F438","pitr_cutoff":"0/220F438","next_gc_cutoff":"0/220F438"}}}"#;
//
// timelineinputs have been left out, because those explain the inputs, but don't participate
// in further size calculations.
let doc = r#"{"updates":[{"lsn":"0/0","command":{"branch_from":null},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/176FA40","command":{"update":25763840},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/176FA40","command":{"branch_from":"cd9d9409c216e64bf580904facedb01b"},"timeline_id":"10b532a550540bc15385eac4edde416a"},{"lsn":"0/1819818","command":{"update":26075136},"timeline_id":"10b532a550540bc15385eac4edde416a"},{"lsn":"0/18B5E40","command":{"update":26427392},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/18D3DF0","command":{"update":26492928},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/18D3DF0","command":{"branch_from":"cd9d9409c216e64bf580904facedb01b"},"timeline_id":"230fc9d756f7363574c0d66533564dcc"},{"lsn":"0/220F438","command":{"update":25239552},"timeline_id":"230fc9d756f7363574c0d66533564dcc"}],"retention_period":131072}"#;
let inputs: ModelInputs = serde_json::from_str(doc).unwrap();

View File

@@ -626,24 +626,20 @@ impl PostgresRedoProcess {
// Create empty data directory for wal-redo postgres, deleting old one first.
if datadir.exists() {
info!(
"old temporary datadir {} exists, removing",
datadir.display()
);
fs::remove_dir_all(&datadir)?;
info!("old temporary datadir {datadir:?} exists, removing");
fs::remove_dir_all(&datadir).map_err(|e| {
Error::new(
e.kind(),
format!("Old temporary dir {datadir:?} removal failure: {e}"),
)
})?;
}
let pg_bin_dir_path = conf.pg_bin_dir(pg_version).map_err(|e| {
Error::new(
ErrorKind::Other,
format!("incorrect pg_bin_dir path: {}", e),
)
})?;
let pg_lib_dir_path = conf.pg_lib_dir(pg_version).map_err(|e| {
Error::new(
ErrorKind::Other,
format!("incorrect pg_lib_dir path: {}", e),
)
})?;
let pg_bin_dir_path = conf
.pg_bin_dir(pg_version)
.map_err(|e| Error::new(ErrorKind::Other, format!("incorrect pg_bin_dir path: {e}")))?;
let pg_lib_dir_path = conf
.pg_lib_dir(pg_version)
.map_err(|e| Error::new(ErrorKind::Other, format!("incorrect pg_lib_dir path: {e}")))?;
info!("running initdb in {}", datadir.display());
let initdb = Command::new(pg_bin_dir_path.join("initdb"))

View File

@@ -1206,6 +1206,9 @@ class PageserverHttpClient(requests.Session):
return res_json
def tenant_size(self, tenant_id: TenantId) -> int:
return self.tenant_size_and_modelinputs(tenant_id)[0]
def tenant_size_and_modelinputs(self, tenant_id: TenantId) -> Tuple[int, Dict[str, Any]]:
"""
Returns the tenant size, together with the model inputs as the second tuple item.
"""
@@ -1216,9 +1219,9 @@ class PageserverHttpClient(requests.Session):
assert TenantId(res["id"]) == tenant_id
size = res["size"]
assert type(size) == int
# there are additional inputs, which are the collected raw information before being fed to the tenant_size_model
# there are no tests for those right now.
return size
inputs = res["inputs"]
assert type(inputs) is dict
return (size, inputs)
def timeline_list(
self,

View File

@@ -194,7 +194,7 @@ def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
)
except TimeoutExpired as exc:
ctl_logs = (exc.stderr or b"").decode("utf-8")
log.info("compute_ctl output:\n{ctl_logs}")
log.info(f"compute_ctl stderr:\n{ctl_logs}")
with ExternalProcessManager(Path(pgdata) / "postmaster.pid"):
start = "starting safekeepers syncing"

View File

@@ -1,5 +1,6 @@
from typing import List, Tuple
from typing import Any, List, Tuple
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, wait_for_last_flush_lsn
from fixtures.types import Lsn
@@ -9,28 +10,247 @@ def test_empty_tenant_size(neon_simple_env: NeonEnv):
env = neon_simple_env
(tenant_id, _) = env.neon_cli.create_tenant()
http_client = env.pageserver.http_client()
size = http_client.tenant_size(tenant_id)
initial_size = http_client.tenant_size(tenant_id)
# we should never have zero, because there should be the initdb however
# this is questionable if we should have anything in this case, as the
# gc_cutoff is negative
assert (
size == 0
), "initial implementation returns zero tenant_size before last_record_lsn is past gc_horizon"
# we should never have zero, because there should be the initdb "changes"
assert initial_size > 0, "initial implementation returns ~initdb tenant_size"
with env.postgres.create_start("main", tenant_id=tenant_id) as pg:
main_branch_name = "main"
with env.postgres.create_start(
main_branch_name,
tenant_id=tenant_id,
config_lines=["autovacuum=off", "checkpoint_timeout=10min"],
) as pg:
with pg.cursor() as cur:
cur.execute("SELECT 1")
row = cur.fetchone()
assert row is not None
assert row[0] == 1
size = http_client.tenant_size(tenant_id)
assert size == 0, "starting idle compute should not change the tenant size"
# we've disabled the autovacuum and checkpoint
# so background processes should not change the size.
# If this test will flake we should probably loosen the check
assert size == initial_size, "starting idle compute should not change the tenant size"
# the size should be the same, until we increase the size over the
# gc_horizon
size = http_client.tenant_size(tenant_id)
assert size == 0, "tenant_size should not be affected by shutdown of compute"
size, inputs = http_client.tenant_size_and_modelinputs(tenant_id)
assert size == initial_size, "tenant_size should not be affected by shutdown of compute"
expected_commands: List[Any] = [{"branch_from": None}, "end_of_branch"]
actual_commands: List[Any] = list(map(lambda x: x["command"], inputs["updates"])) # type: ignore
assert actual_commands == expected_commands
def test_branched_empty_timeline_size(neon_simple_env: NeonEnv):
"""
Issue found in production. Because the ancestor branch was under
gc_horizon, the branchpoint was "dangling" and the computation could not be
done.
Assuming gc_horizon = 50
root: I 0---10------>20
branch: |-------------------I---------->150
gc_horizon
"""
env = neon_simple_env
(tenant_id, _) = env.neon_cli.create_tenant()
http_client = env.pageserver.http_client()
initial_size = http_client.tenant_size(tenant_id)
first_branch_timeline_id = env.neon_cli.create_branch("first-branch", tenant_id=tenant_id)
with env.postgres.create_start("first-branch", tenant_id=tenant_id) as pg:
with pg.cursor() as cur:
cur.execute(
"CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, 1000000) s(i)"
)
wait_for_last_flush_lsn(env, pg, tenant_id, first_branch_timeline_id)
size_after_branching = http_client.tenant_size(tenant_id)
log.info(f"size_after_branching: {size_after_branching}")
assert size_after_branching > initial_size
def test_branched_from_many_empty_parents_size(neon_simple_env: NeonEnv):
"""
More general version of test_branched_empty_timeline_size
Assuming gc_horizon = 50
root: I 0------10
first: I 10
nth_0: I 10
nth_1: I 10
nth_n: 10------------I--------100
"""
env = neon_simple_env
(tenant_id, _) = env.neon_cli.create_tenant()
http_client = env.pageserver.http_client()
initial_size = http_client.tenant_size(tenant_id)
first_branch_name = "first"
env.neon_cli.create_branch(first_branch_name, tenant_id=tenant_id)
size_after_branching = http_client.tenant_size(tenant_id)
# this might be flaky like test_get_tenant_size_with_multiple_branches
# https://github.com/neondatabase/neon/issues/2962
assert size_after_branching == initial_size
last_branch_name = first_branch_name
last_branch = None
for i in range(0, 4):
latest_branch_name = f"nth_{i}"
last_branch = env.neon_cli.create_branch(
latest_branch_name, ancestor_branch_name=last_branch_name, tenant_id=tenant_id
)
last_branch_name = latest_branch_name
size_after_branching = http_client.tenant_size(tenant_id)
assert size_after_branching == initial_size
assert last_branch is not None
with env.postgres.create_start(last_branch_name, tenant_id=tenant_id) as pg:
with pg.cursor() as cur:
cur.execute(
"CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, 1000000) s(i)"
)
wait_for_last_flush_lsn(env, pg, tenant_id, last_branch)
size_after_writes = http_client.tenant_size(tenant_id)
assert size_after_writes > initial_size
@pytest.mark.skip("This should work, but is left out because assumed covered by other tests")
def test_branch_point_within_horizon(neon_simple_env: NeonEnv):
"""
gc_horizon = 15
main: 0--I-10------>20
branch: |-------------------I---------->150
gc_horizon
"""
env = neon_simple_env
gc_horizon = 20_000
(tenant_id, main_id) = env.neon_cli.create_tenant(conf={"gc_horizon": str(gc_horizon)})
http_client = env.pageserver.http_client()
with env.postgres.create_start("main", tenant_id=tenant_id) as pg:
initdb_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, main_id)
with pg.cursor() as cur:
cur.execute("CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, 1000) s(i)")
flushed_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, main_id)
size_before_branching = http_client.tenant_size(tenant_id)
assert flushed_lsn.lsn_int - gc_horizon > initdb_lsn.lsn_int
branch_id = env.neon_cli.create_branch(
"branch", tenant_id=tenant_id, ancestor_start_lsn=flushed_lsn
)
with env.postgres.create_start("branch", tenant_id=tenant_id) as pg:
with pg.cursor() as cur:
cur.execute("CREATE TABLE t1 AS SELECT i::bigint n FROM generate_series(0, 1000) s(i)")
wait_for_last_flush_lsn(env, pg, tenant_id, branch_id)
size_after = http_client.tenant_size(tenant_id)
assert size_before_branching < size_after
@pytest.mark.skip("This should work, but is left out because assumed covered by other tests")
def test_parent_within_horizon(neon_simple_env: NeonEnv):
"""
gc_horizon = 5
main: 0----10----I->20
branch: |-------------------I---------->150
gc_horizon
"""
env = neon_simple_env
gc_horizon = 200_000
(tenant_id, main_id) = env.neon_cli.create_tenant(conf={"gc_horizon": str(gc_horizon)})
http_client = env.pageserver.http_client()
with env.postgres.create_start("main", tenant_id=tenant_id) as pg:
initdb_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, main_id)
with pg.cursor() as cur:
cur.execute("CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, 1000) s(i)")
flushed_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, main_id)
with pg.cursor() as cur:
cur.execute("CREATE TABLE t00 AS SELECT i::bigint n FROM generate_series(0, 2000) s(i)")
wait_for_last_flush_lsn(env, pg, tenant_id, main_id)
size_before_branching = http_client.tenant_size(tenant_id)
assert flushed_lsn.lsn_int - gc_horizon > initdb_lsn.lsn_int
branch_id = env.neon_cli.create_branch(
"branch", tenant_id=tenant_id, ancestor_start_lsn=flushed_lsn
)
with env.postgres.create_start("branch", tenant_id=tenant_id) as pg:
with pg.cursor() as cur:
cur.execute("CREATE TABLE t1 AS SELECT i::bigint n FROM generate_series(0, 10000) s(i)")
wait_for_last_flush_lsn(env, pg, tenant_id, branch_id)
size_after = http_client.tenant_size(tenant_id)
assert size_before_branching < size_after
@pytest.mark.skip("This should work, but is left out because assumed covered by other tests")
def test_only_heads_within_horizon(neon_simple_env: NeonEnv):
"""
gc_horizon = small
main: 0--------10-----I>20
first: |-----------------------------I>150
second: |---------I>30
"""
env = neon_simple_env
(tenant_id, main_id) = env.neon_cli.create_tenant(conf={"gc_horizon": "1024"})
http_client = env.pageserver.http_client()
initial_size = http_client.tenant_size(tenant_id)
first_id = env.neon_cli.create_branch("first", tenant_id=tenant_id)
second_id = env.neon_cli.create_branch("second", tenant_id=tenant_id)
ids = {"main": main_id, "first": first_id, "second": second_id}
latest_size = None
# gc is not expected to change the results
for branch_name, amount in [("main", 2000), ("first", 15000), ("second", 3000)]:
with env.postgres.create_start(branch_name, tenant_id=tenant_id) as pg:
with pg.cursor() as cur:
cur.execute(
f"CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, {amount}) s(i)"
)
wait_for_last_flush_lsn(env, pg, tenant_id, ids[branch_name])
size_now = http_client.tenant_size(tenant_id)
if latest_size is not None:
assert size_now > latest_size
else:
assert size_now > initial_size
latest_size = size_now
def test_single_branch_get_tenant_size_grows(neon_env_builder: NeonEnvBuilder):

View File

@@ -26,7 +26,7 @@ futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
indexmap = { version = "1", default-features = false, features = ["std"] }
itertools = { version = "0.10" }
libc = { version = "0.2", features = ["extra_traits"] }
log = { version = "0.4", default-features = false, features = ["serde", "std"] }
log = { version = "0.4", default-features = false, features = ["std"] }
memchr = { version = "2" }
nom = { version = "7" }
num-bigint = { version = "0.4" }
@@ -45,6 +45,7 @@ tokio-util = { version = "0.7", features = ["codec", "io"] }
tower = { version = "0.4", features = ["balance", "buffer", "limit", "retry", "timeout", "util"] }
tracing = { version = "0.1", features = ["log"] }
tracing-core = { version = "0.1" }
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
url = { version = "2", features = ["serde"] }
[build-dependencies]
@@ -54,7 +55,7 @@ either = { version = "1" }
indexmap = { version = "1", default-features = false, features = ["std"] }
itertools = { version = "0.10" }
libc = { version = "0.2", features = ["extra_traits"] }
log = { version = "0.4", default-features = false, features = ["serde", "std"] }
log = { version = "0.4", default-features = false, features = ["std"] }
memchr = { version = "2" }
nom = { version = "7" }
prost = { version = "0.11" }