mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-10 22:20:38 +00:00
Compare commits
72 Commits
bodobolero
...
issue_3387
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
91b6ac2043 | ||
|
|
8bd70a3d30 | ||
|
|
eb2b8ab3b4 | ||
|
|
5bdf6ef378 | ||
|
|
c8367b1ea5 | ||
|
|
3c6f779698 | ||
|
|
f67f0c1c11 | ||
|
|
edb02d3299 | ||
|
|
664a69e65b | ||
|
|
478322ebf9 | ||
|
|
802f174072 | ||
|
|
47f9890bae | ||
|
|
262265daad | ||
|
|
300da5b872 | ||
|
|
7b22b5c433 | ||
|
|
ffca97bc1e | ||
|
|
cb356f3259 | ||
|
|
c85374295f | ||
|
|
4992160677 | ||
|
|
bd535b3371 | ||
|
|
d90c5a03af | ||
|
|
2d02cc9079 | ||
|
|
49ad94b99f | ||
|
|
948a217398 | ||
|
|
125381eae7 | ||
|
|
cd01bbc715 | ||
|
|
d8b5e3b88d | ||
|
|
06d25f2186 | ||
|
|
f759b561f3 | ||
|
|
ece0555600 | ||
|
|
73ea0a0b01 | ||
|
|
d8f6d6fd6f | ||
|
|
d24de169a7 | ||
|
|
0816168296 | ||
|
|
277b44d57a | ||
|
|
68c2c3880e | ||
|
|
49da498f65 | ||
|
|
2c76ba3dd7 | ||
|
|
dbe3dc69ad | ||
|
|
8e5bb3ed49 | ||
|
|
ab0be7b8da | ||
|
|
b4c55f5d24 | ||
|
|
ede70d833c | ||
|
|
70c3d18bb0 | ||
|
|
7a491f52c4 | ||
|
|
323c4ecb4f | ||
|
|
3d2466607e | ||
|
|
ed478b39f4 | ||
|
|
91585a558d | ||
|
|
93467eae1f | ||
|
|
f3aac81d19 | ||
|
|
979ad60c19 | ||
|
|
9316cb1b1f | ||
|
|
e7939a527a | ||
|
|
36d26665e1 | ||
|
|
873347f977 | ||
|
|
e814ac16f9 | ||
|
|
ad3055d386 | ||
|
|
94e03eb452 | ||
|
|
380f26ef79 | ||
|
|
3c5b7f59d7 | ||
|
|
fee89f80b5 | ||
|
|
41cce8eaf1 | ||
|
|
f88fe0218d | ||
|
|
cc856eca85 | ||
|
|
cf350c6002 | ||
|
|
0ce6b6a0a3 | ||
|
|
73f247d537 | ||
|
|
960be82183 | ||
|
|
806e5a6c19 | ||
|
|
8d5df07cce | ||
|
|
df7a9d1407 |
2
.github/ansible/production.hosts.yaml
vendored
2
.github/ansible/production.hosts.yaml
vendored
@@ -7,7 +7,7 @@ storage:
|
||||
broker_endpoint: http://storage-broker.prod.local:50051
|
||||
pageserver_config_stub:
|
||||
pg_distrib_dir: /usr/local
|
||||
metric_collection_endpoint: http://console-staging.local/billing/api/v1/usage_events
|
||||
metric_collection_endpoint: http://console-release.local/billing/api/v1/usage_events
|
||||
metric_collection_interval: 10min
|
||||
remote_storage:
|
||||
bucket_name: "{{ bucket_name }}"
|
||||
|
||||
2
.github/ansible/staging.eu-west-1.hosts.yaml
vendored
2
.github/ansible/staging.eu-west-1.hosts.yaml
vendored
@@ -18,7 +18,7 @@ storage:
|
||||
ansible_aws_ssm_region: eu-west-1
|
||||
ansible_aws_ssm_bucket_name: neon-dev-storage-eu-west-1
|
||||
console_region_id: aws-eu-west-1
|
||||
sentry_environment: development
|
||||
sentry_environment: staging
|
||||
|
||||
children:
|
||||
pageservers:
|
||||
|
||||
4
.github/ansible/staging.us-east-2.hosts.yaml
vendored
4
.github/ansible/staging.us-east-2.hosts.yaml
vendored
@@ -18,7 +18,7 @@ storage:
|
||||
ansible_aws_ssm_region: us-east-2
|
||||
ansible_aws_ssm_bucket_name: neon-staging-storage-us-east-2
|
||||
console_region_id: aws-us-east-2
|
||||
sentry_environment: development
|
||||
sentry_environment: staging
|
||||
|
||||
children:
|
||||
pageservers:
|
||||
@@ -29,6 +29,8 @@ storage:
|
||||
ansible_host: i-0565a8b4008aa3f40
|
||||
pageserver-2.us-east-2.aws.neon.build:
|
||||
ansible_host: i-01e31cdf7e970586a
|
||||
pageserver-3.us-east-2.aws.neon.build:
|
||||
ansible_host: i-0602a0291365ef7cc
|
||||
|
||||
safekeepers:
|
||||
hosts:
|
||||
|
||||
@@ -8,7 +8,7 @@ settings:
|
||||
authBackend: "console"
|
||||
authEndpoint: "http://console-staging.local/management/api/v2"
|
||||
domain: "*.eu-west-1.aws.neon.build"
|
||||
sentryEnvironment: "development"
|
||||
sentryEnvironment: "staging"
|
||||
wssPort: 8443
|
||||
metricCollectionEndpoint: "http://console-staging.local/billing/api/v1/usage_events"
|
||||
metricCollectionInterval: "1min"
|
||||
|
||||
@@ -49,4 +49,4 @@ extraManifests:
|
||||
- "{{ .Release.Namespace }}"
|
||||
|
||||
settings:
|
||||
sentryEnvironment: "development"
|
||||
sentryEnvironment: "staging"
|
||||
|
||||
@@ -8,7 +8,7 @@ settings:
|
||||
authBackend: "link"
|
||||
authEndpoint: "https://console.stage.neon.tech/authenticate_proxy_request/"
|
||||
uri: "https://console.stage.neon.tech/psql_session/"
|
||||
sentryEnvironment: "development"
|
||||
sentryEnvironment: "staging"
|
||||
metricCollectionEndpoint: "http://console-staging.local/billing/api/v1/usage_events"
|
||||
metricCollectionInterval: "1min"
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ settings:
|
||||
authBackend: "console"
|
||||
authEndpoint: "http://console-staging.local/management/api/v2"
|
||||
domain: "*.cloud.stage.neon.tech"
|
||||
sentryEnvironment: "development"
|
||||
sentryEnvironment: "staging"
|
||||
wssPort: 8443
|
||||
metricCollectionEndpoint: "http://console-staging.local/billing/api/v1/usage_events"
|
||||
metricCollectionInterval: "1min"
|
||||
|
||||
@@ -8,7 +8,7 @@ settings:
|
||||
authBackend: "console"
|
||||
authEndpoint: "http://console-staging.local/management/api/v2"
|
||||
domain: "*.us-east-2.aws.neon.build"
|
||||
sentryEnvironment: "development"
|
||||
sentryEnvironment: "staging"
|
||||
wssPort: 8443
|
||||
metricCollectionEndpoint: "http://console-staging.local/billing/api/v1/usage_events"
|
||||
metricCollectionInterval: "1min"
|
||||
|
||||
@@ -49,4 +49,4 @@ extraManifests:
|
||||
- "{{ .Release.Namespace }}"
|
||||
|
||||
settings:
|
||||
sentryEnvironment: "development"
|
||||
sentryEnvironment: "staging"
|
||||
|
||||
6
Cargo.lock
generated
6
Cargo.lock
generated
@@ -833,10 +833,8 @@ dependencies = [
|
||||
"anyhow",
|
||||
"chrono",
|
||||
"clap 4.0.32",
|
||||
"env_logger",
|
||||
"futures",
|
||||
"hyper",
|
||||
"log",
|
||||
"notify",
|
||||
"postgres",
|
||||
"regex",
|
||||
@@ -845,6 +843,8 @@ dependencies = [
|
||||
"tar",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"url",
|
||||
"workspace_hack",
|
||||
]
|
||||
@@ -1954,7 +1954,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -4565,6 +4564,7 @@ dependencies = [
|
||||
"tower",
|
||||
"tracing",
|
||||
"tracing-core",
|
||||
"tracing-subscriber",
|
||||
"url",
|
||||
]
|
||||
|
||||
|
||||
@@ -8,10 +8,8 @@ license.workspace = true
|
||||
anyhow.workspace = true
|
||||
chrono.workspace = true
|
||||
clap.workspace = true
|
||||
env_logger.workspace = true
|
||||
futures.workspace = true
|
||||
hyper = { workspace = true, features = ["full"] }
|
||||
log = { workspace = true, features = ["std", "serde"] }
|
||||
notify.workspace = true
|
||||
postgres.workspace = true
|
||||
regex.workspace = true
|
||||
@@ -20,6 +18,8 @@ serde_json.workspace = true
|
||||
tar.workspace = true
|
||||
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
|
||||
tokio-postgres.workspace = true
|
||||
tracing.workspace = true
|
||||
tracing-subscriber.workspace = true
|
||||
url.workspace = true
|
||||
|
||||
workspace_hack.workspace = true
|
||||
|
||||
@@ -40,7 +40,7 @@ use std::{thread, time::Duration};
|
||||
use anyhow::{Context, Result};
|
||||
use chrono::Utc;
|
||||
use clap::Arg;
|
||||
use log::{error, info};
|
||||
use tracing::{error, info};
|
||||
|
||||
use compute_tools::compute::{ComputeMetrics, ComputeNode, ComputeState, ComputeStatus};
|
||||
use compute_tools::http::api::launch_http_server;
|
||||
@@ -53,7 +53,6 @@ use compute_tools::spec::*;
|
||||
use url::Url;
|
||||
|
||||
fn main() -> Result<()> {
|
||||
// TODO: re-use `utils::logging` later
|
||||
init_logger(DEFAULT_LOG_LEVEL)?;
|
||||
|
||||
let matches = cli().get_matches();
|
||||
@@ -122,29 +121,45 @@ fn main() -> Result<()> {
|
||||
// Also spawn the thread responsible for handling the VM informant -- if it's present
|
||||
let _vm_informant_handle = spawn_vm_informant_if_present().expect("cannot launch VM informant");
|
||||
|
||||
// Run compute (Postgres) and hang waiting on it.
|
||||
match compute.prepare_and_run() {
|
||||
Ok(ec) => {
|
||||
let code = ec.code().unwrap_or(1);
|
||||
info!("Postgres exited with code {}, shutting down", code);
|
||||
exit(code)
|
||||
}
|
||||
Err(error) => {
|
||||
error!("could not start the compute node: {:?}", error);
|
||||
|
||||
// Start Postgres
|
||||
let mut delay_exit = false;
|
||||
let mut exit_code = None;
|
||||
let pg = match compute.start_compute() {
|
||||
Ok(pg) => Some(pg),
|
||||
Err(err) => {
|
||||
error!("could not start the compute node: {:?}", err);
|
||||
let mut state = compute.state.write().unwrap();
|
||||
state.error = Some(format!("{:?}", error));
|
||||
state.error = Some(format!("{:?}", err));
|
||||
state.status = ComputeStatus::Failed;
|
||||
drop(state);
|
||||
|
||||
// Keep serving HTTP requests, so the cloud control plane was able to
|
||||
// get the actual error.
|
||||
info!("giving control plane 30s to collect the error before shutdown");
|
||||
thread::sleep(Duration::from_secs(30));
|
||||
info!("shutting down");
|
||||
Err(error)
|
||||
delay_exit = true;
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
// Wait for the child Postgres process forever. In this state Ctrl+C will
|
||||
// propagate to Postgres and it will be shut down as well.
|
||||
if let Some(mut pg) = pg {
|
||||
let ecode = pg
|
||||
.wait()
|
||||
.expect("failed to start waiting on Postgres process");
|
||||
info!("Postgres exited with code {}, shutting down", ecode);
|
||||
exit_code = ecode.code()
|
||||
}
|
||||
|
||||
if let Err(err) = compute.check_for_core_dumps() {
|
||||
error!("error while checking for core dumps: {err:?}");
|
||||
}
|
||||
|
||||
// If launch failed, keep serving HTTP requests for a while, so the cloud
|
||||
// control plane can get the actual error.
|
||||
if delay_exit {
|
||||
info!("giving control plane 30s to collect the error before shutdown");
|
||||
thread::sleep(Duration::from_secs(30));
|
||||
info!("shutting down");
|
||||
}
|
||||
|
||||
exit(exit_code.unwrap_or(1))
|
||||
}
|
||||
|
||||
fn cli() -> clap::Command {
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
use anyhow::{anyhow, Result};
|
||||
use log::error;
|
||||
use postgres::Client;
|
||||
use tokio_postgres::NoTls;
|
||||
use tracing::{error, instrument};
|
||||
|
||||
use crate::compute::ComputeNode;
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub fn create_writability_check_data(client: &mut Client) -> Result<()> {
|
||||
let query = "
|
||||
CREATE TABLE IF NOT EXISTS health_check (
|
||||
@@ -21,6 +22,7 @@ pub fn create_writability_check_data(client: &mut Client) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
|
||||
let (client, connection) = tokio_postgres::connect(compute.connstr.as_str(), NoTls).await?;
|
||||
if client.is_closed() {
|
||||
|
||||
@@ -17,15 +17,15 @@
|
||||
use std::fs;
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
use std::path::Path;
|
||||
use std::process::{Command, ExitStatus, Stdio};
|
||||
use std::process::{Command, Stdio};
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::RwLock;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use chrono::{DateTime, Utc};
|
||||
use log::{info, warn};
|
||||
use postgres::{Client, NoTls};
|
||||
use serde::{Serialize, Serializer};
|
||||
use tracing::{info, instrument, warn};
|
||||
|
||||
use crate::checker::create_writability_check_data;
|
||||
use crate::config;
|
||||
@@ -121,6 +121,7 @@ impl ComputeNode {
|
||||
|
||||
// Get basebackup from the libpq connection to pageserver using `connstr` and
|
||||
// unarchive it to `pgdata` directory overriding all its previous content.
|
||||
#[instrument(skip(self))]
|
||||
fn get_basebackup(&self, lsn: &str) -> Result<()> {
|
||||
let start_time = Utc::now();
|
||||
|
||||
@@ -154,6 +155,7 @@ impl ComputeNode {
|
||||
|
||||
// Run `postgres` in a special mode with `--sync-safekeepers` argument
|
||||
// and return the reported LSN back to the caller.
|
||||
#[instrument(skip(self))]
|
||||
fn sync_safekeepers(&self) -> Result<String> {
|
||||
let start_time = Utc::now();
|
||||
|
||||
@@ -196,6 +198,7 @@ impl ComputeNode {
|
||||
|
||||
/// Do all the preparations like PGDATA directory creation, configuration,
|
||||
/// safekeepers sync, basebackup, etc.
|
||||
#[instrument(skip(self))]
|
||||
pub fn prepare_pgdata(&self) -> Result<()> {
|
||||
let spec = &self.spec;
|
||||
let pgdata_path = Path::new(&self.pgdata);
|
||||
@@ -229,9 +232,8 @@ impl ComputeNode {
|
||||
|
||||
/// Start Postgres as a child process and manage DBs/roles.
|
||||
/// After that this will hang waiting on the postmaster process to exit.
|
||||
pub fn run(&self) -> Result<ExitStatus> {
|
||||
let start_time = Utc::now();
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub fn start_postgres(&self) -> Result<std::process::Child> {
|
||||
let pgdata_path = Path::new(&self.pgdata);
|
||||
|
||||
// Run postgres as a child process.
|
||||
@@ -242,10 +244,15 @@ impl ComputeNode {
|
||||
|
||||
wait_for_postgres(&mut pg, pgdata_path)?;
|
||||
|
||||
Ok(pg)
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub fn apply_config(&self) -> Result<()> {
|
||||
// If connection fails,
|
||||
// it may be the old node with `zenith_admin` superuser.
|
||||
//
|
||||
// In this case we need to connect with old `zenith_admin`name
|
||||
// In this case we need to connect with old `zenith_admin` name
|
||||
// and create new user. We cannot simply rename connected user,
|
||||
// but we can create a new one and grant it all privileges.
|
||||
let mut client = match Client::connect(self.connstr.as_str(), NoTls) {
|
||||
@@ -271,6 +278,7 @@ impl ComputeNode {
|
||||
Ok(client) => client,
|
||||
};
|
||||
|
||||
// Proceed with post-startup configuration. Note, that order of operations is important.
|
||||
handle_roles(&self.spec, &mut client)?;
|
||||
handle_databases(&self.spec, &mut client)?;
|
||||
handle_role_deletions(self, &mut client)?;
|
||||
@@ -279,8 +287,34 @@ impl ComputeNode {
|
||||
|
||||
// 'Close' connection
|
||||
drop(client);
|
||||
let startup_end_time = Utc::now();
|
||||
|
||||
info!(
|
||||
"finished configuration of compute for project {}",
|
||||
self.spec.cluster.cluster_id
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub fn start_compute(&self) -> Result<std::process::Child> {
|
||||
info!(
|
||||
"starting compute for project {}, operation {}, tenant {}, timeline {}",
|
||||
self.spec.cluster.cluster_id,
|
||||
self.spec.operation_uuid.as_ref().unwrap(),
|
||||
self.tenant,
|
||||
self.timeline,
|
||||
);
|
||||
|
||||
self.prepare_pgdata()?;
|
||||
|
||||
let start_time = Utc::now();
|
||||
|
||||
let pg = self.start_postgres()?;
|
||||
|
||||
self.apply_config()?;
|
||||
|
||||
let startup_end_time = Utc::now();
|
||||
self.metrics.config_ms.store(
|
||||
startup_end_time
|
||||
.signed_duration_since(start_time)
|
||||
@@ -300,34 +334,7 @@ impl ComputeNode {
|
||||
|
||||
self.set_status(ComputeStatus::Running);
|
||||
|
||||
info!(
|
||||
"finished configuration of compute for project {}",
|
||||
self.spec.cluster.cluster_id
|
||||
);
|
||||
|
||||
// Wait for child Postgres process basically forever. In this state Ctrl+C
|
||||
// will propagate to Postgres and it will be shut down as well.
|
||||
let ecode = pg
|
||||
.wait()
|
||||
.expect("failed to start waiting on Postgres process");
|
||||
|
||||
self.check_for_core_dumps()
|
||||
.expect("failed to check for core dumps");
|
||||
|
||||
Ok(ecode)
|
||||
}
|
||||
|
||||
pub fn prepare_and_run(&self) -> Result<ExitStatus> {
|
||||
info!(
|
||||
"starting compute for project {}, operation {}, tenant {}, timeline {}",
|
||||
self.spec.cluster.cluster_id,
|
||||
self.spec.operation_uuid.as_ref().unwrap(),
|
||||
self.tenant,
|
||||
self.timeline,
|
||||
);
|
||||
|
||||
self.prepare_pgdata()?;
|
||||
self.run()
|
||||
Ok(pg)
|
||||
}
|
||||
|
||||
// Look for core dumps and collect backtraces.
|
||||
@@ -340,7 +347,7 @@ impl ComputeNode {
|
||||
//
|
||||
// Use that as a default location and pattern, except macos where core dumps are written
|
||||
// to /cores/ directory by default.
|
||||
fn check_for_core_dumps(&self) -> Result<()> {
|
||||
pub fn check_for_core_dumps(&self) -> Result<()> {
|
||||
let core_dump_dir = match std::env::consts::OS {
|
||||
"macos" => Path::new("/cores/"),
|
||||
_ => Path::new(&self.pgdata),
|
||||
|
||||
@@ -6,8 +6,8 @@ use std::thread;
|
||||
use anyhow::Result;
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use hyper::{Body, Method, Request, Response, Server, StatusCode};
|
||||
use log::{error, info};
|
||||
use serde_json;
|
||||
use tracing::{error, info};
|
||||
|
||||
use crate::compute::ComputeNode;
|
||||
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use log::{info, warn};
|
||||
use std::path::Path;
|
||||
use std::process;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use tracing::{info, warn};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
|
||||
|
||||
@@ -1,42 +1,20 @@
|
||||
use std::io::Write;
|
||||
|
||||
use anyhow::Result;
|
||||
use chrono::Utc;
|
||||
use env_logger::{Builder, Env};
|
||||
|
||||
macro_rules! info_println {
|
||||
($($tts:tt)*) => {
|
||||
if log_enabled!(Level::Info) {
|
||||
println!($($tts)*);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! info_print {
|
||||
($($tts:tt)*) => {
|
||||
if log_enabled!(Level::Info) {
|
||||
print!($($tts)*);
|
||||
}
|
||||
}
|
||||
}
|
||||
use tracing_subscriber::layer::SubscriberExt;
|
||||
use tracing_subscriber::prelude::*;
|
||||
|
||||
/// Initialize `env_logger` using either `default_level` or
|
||||
/// `RUST_LOG` environment variable as default log level.
|
||||
pub fn init_logger(default_level: &str) -> Result<()> {
|
||||
let env = Env::default().filter_or("RUST_LOG", default_level);
|
||||
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_level));
|
||||
|
||||
Builder::from_env(env)
|
||||
.format(|buf, record| {
|
||||
let thread_handle = std::thread::current();
|
||||
writeln!(
|
||||
buf,
|
||||
"{} [{}] {}: {}",
|
||||
Utc::now().format("%Y-%m-%d %H:%M:%S%.3f %Z"),
|
||||
thread_handle.name().unwrap_or("main"),
|
||||
record.level(),
|
||||
record.args()
|
||||
)
|
||||
})
|
||||
let fmt_layer = tracing_subscriber::fmt::layer()
|
||||
.with_target(false)
|
||||
.with_writer(std::io::stderr);
|
||||
|
||||
tracing_subscriber::registry()
|
||||
.with(env_filter)
|
||||
.with(fmt_layer)
|
||||
.init();
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -3,8 +3,8 @@ use std::{thread, time};
|
||||
|
||||
use anyhow::Result;
|
||||
use chrono::{DateTime, Utc};
|
||||
use log::{debug, info};
|
||||
use postgres::{Client, NoTls};
|
||||
use tracing::{debug, info};
|
||||
|
||||
use crate::compute::ComputeNode;
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@ use anyhow::{bail, Result};
|
||||
use notify::{RecursiveMode, Watcher};
|
||||
use postgres::{Client, Transaction};
|
||||
use serde::Deserialize;
|
||||
use tracing::{debug, instrument};
|
||||
|
||||
const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds
|
||||
|
||||
@@ -129,8 +130,8 @@ impl Role {
|
||||
/// Serialize a list of role parameters into a Postgres-acceptable
|
||||
/// string of arguments.
|
||||
pub fn to_pg_options(&self) -> String {
|
||||
// XXX: consider putting LOGIN as a default option somewhere higher, e.g. in Rails.
|
||||
// For now we do not use generic `options` for roles. Once used, add
|
||||
// XXX: consider putting LOGIN as a default option somewhere higher, e.g. in control-plane.
|
||||
// For now, we do not use generic `options` for roles. Once used, add
|
||||
// `self.options.as_pg_options()` somewhere here.
|
||||
let mut params: String = "LOGIN".to_string();
|
||||
|
||||
@@ -229,6 +230,7 @@ pub fn get_existing_dbs(client: &mut Client) -> Result<Vec<Database>> {
|
||||
/// Wait for Postgres to become ready to accept connections. It's ready to
|
||||
/// accept connections when the state-field in `pgdata/postmaster.pid` says
|
||||
/// 'ready'.
|
||||
#[instrument(skip(pg))]
|
||||
pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
|
||||
let pid_path = pgdata.join("postmaster.pid");
|
||||
|
||||
@@ -287,18 +289,18 @@ pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
|
||||
}
|
||||
|
||||
let res = rx.recv_timeout(Duration::from_millis(100));
|
||||
log::debug!("woken up by notify: {res:?}");
|
||||
debug!("woken up by notify: {res:?}");
|
||||
// If there are multiple events in the channel already, we only need to be
|
||||
// check once. Swallow the extra events before we go ahead to check the
|
||||
// pid file.
|
||||
while let Ok(res) = rx.try_recv() {
|
||||
log::debug!("swallowing extra event: {res:?}");
|
||||
debug!("swallowing extra event: {res:?}");
|
||||
}
|
||||
|
||||
// Check that we can open pid file first.
|
||||
if let Ok(file) = File::open(&pid_path) {
|
||||
if !postmaster_pid_seen {
|
||||
log::debug!("postmaster.pid appeared");
|
||||
debug!("postmaster.pid appeared");
|
||||
watcher
|
||||
.unwatch(pgdata)
|
||||
.expect("Failed to remove pgdata dir watch");
|
||||
@@ -314,7 +316,7 @@ pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
|
||||
// Pid file could be there and we could read it, but it could be empty, for example.
|
||||
if let Some(Ok(line)) = last_line {
|
||||
let status = line.trim();
|
||||
log::debug!("last line of postmaster.pid: {status:?}");
|
||||
debug!("last line of postmaster.pid: {status:?}");
|
||||
|
||||
// Now Postgres is ready to accept connections
|
||||
if status == "ready" {
|
||||
@@ -330,7 +332,7 @@ pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
log::info!("PostgreSQL is now running, continuing to configure it");
|
||||
tracing::info!("PostgreSQL is now running, continuing to configure it");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,12 +1,11 @@
|
||||
use std::path::Path;
|
||||
use std::str::FromStr;
|
||||
use std::time::Instant;
|
||||
|
||||
use anyhow::Result;
|
||||
use log::{info, log_enabled, warn, Level};
|
||||
use postgres::config::Config;
|
||||
use postgres::{Client, NoTls};
|
||||
use serde::Deserialize;
|
||||
use tracing::{info, info_span, instrument, span_enabled, warn, Level};
|
||||
|
||||
use crate::compute::ComputeNode;
|
||||
use crate::config;
|
||||
@@ -80,23 +79,25 @@ pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
|
||||
|
||||
/// Given a cluster spec json and open transaction it handles roles creation,
|
||||
/// deletion and update.
|
||||
#[instrument(skip_all)]
|
||||
pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
||||
let mut xact = client.transaction()?;
|
||||
let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
|
||||
|
||||
// Print a list of existing Postgres roles (only in debug mode)
|
||||
info!("postgres roles:");
|
||||
for r in &existing_roles {
|
||||
info_println!(
|
||||
"{} - {}:{}",
|
||||
" ".repeat(27 + 5),
|
||||
r.name,
|
||||
if r.encrypted_password.is_some() {
|
||||
"[FILTERED]"
|
||||
} else {
|
||||
"(null)"
|
||||
}
|
||||
);
|
||||
if span_enabled!(Level::INFO) {
|
||||
info!("postgres roles:");
|
||||
for r in &existing_roles {
|
||||
info!(
|
||||
" - {}:{}",
|
||||
r.name,
|
||||
if r.encrypted_password.is_some() {
|
||||
"[FILTERED]"
|
||||
} else {
|
||||
"(null)"
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Process delta operations first
|
||||
@@ -137,58 +138,68 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
||||
info!("cluster spec roles:");
|
||||
for role in &spec.cluster.roles {
|
||||
let name = &role.name;
|
||||
|
||||
info_print!(
|
||||
"{} - {}:{}",
|
||||
" ".repeat(27 + 5),
|
||||
name,
|
||||
if role.encrypted_password.is_some() {
|
||||
"[FILTERED]"
|
||||
} else {
|
||||
"(null)"
|
||||
}
|
||||
);
|
||||
|
||||
// XXX: with a limited number of roles it is fine, but consider making it a HashMap
|
||||
let pg_role = existing_roles.iter().find(|r| r.name == *name);
|
||||
|
||||
if let Some(r) = pg_role {
|
||||
let mut update_role = false;
|
||||
|
||||
enum RoleAction {
|
||||
None,
|
||||
Update,
|
||||
Create,
|
||||
}
|
||||
let action = if let Some(r) = pg_role {
|
||||
if (r.encrypted_password.is_none() && role.encrypted_password.is_some())
|
||||
|| (r.encrypted_password.is_some() && role.encrypted_password.is_none())
|
||||
{
|
||||
update_role = true;
|
||||
RoleAction::Update
|
||||
} else if let Some(pg_pwd) = &r.encrypted_password {
|
||||
// Check whether password changed or not (trim 'md5:' prefix first)
|
||||
update_role = pg_pwd[3..] != *role.encrypted_password.as_ref().unwrap();
|
||||
if pg_pwd[3..] != *role.encrypted_password.as_ref().unwrap() {
|
||||
RoleAction::Update
|
||||
} else {
|
||||
RoleAction::None
|
||||
}
|
||||
} else {
|
||||
RoleAction::None
|
||||
}
|
||||
} else {
|
||||
RoleAction::Create
|
||||
};
|
||||
|
||||
if update_role {
|
||||
match action {
|
||||
RoleAction::None => {}
|
||||
RoleAction::Update => {
|
||||
let mut query: String = format!("ALTER ROLE {} ", name.pg_quote());
|
||||
info_print!(" -> update");
|
||||
|
||||
query.push_str(&role.to_pg_options());
|
||||
xact.execute(query.as_str(), &[])?;
|
||||
}
|
||||
} else {
|
||||
info!("role name: '{}'", &name);
|
||||
let mut query: String = format!("CREATE ROLE {} ", name.pg_quote());
|
||||
info!("role create query: '{}'", &query);
|
||||
info_print!(" -> create");
|
||||
RoleAction::Create => {
|
||||
let mut query: String = format!("CREATE ROLE {} ", name.pg_quote());
|
||||
info!("role create query: '{}'", &query);
|
||||
query.push_str(&role.to_pg_options());
|
||||
xact.execute(query.as_str(), &[])?;
|
||||
|
||||
query.push_str(&role.to_pg_options());
|
||||
xact.execute(query.as_str(), &[])?;
|
||||
|
||||
let grant_query = format!(
|
||||
"GRANT pg_read_all_data, pg_write_all_data TO {}",
|
||||
name.pg_quote()
|
||||
);
|
||||
xact.execute(grant_query.as_str(), &[])?;
|
||||
info!("role grant query: '{}'", &grant_query);
|
||||
let grant_query = format!(
|
||||
"GRANT pg_read_all_data, pg_write_all_data TO {}",
|
||||
name.pg_quote()
|
||||
);
|
||||
xact.execute(grant_query.as_str(), &[])?;
|
||||
info!("role grant query: '{}'", &grant_query);
|
||||
}
|
||||
}
|
||||
|
||||
info_print!("\n");
|
||||
if span_enabled!(Level::INFO) {
|
||||
let pwd = if role.encrypted_password.is_some() {
|
||||
"[FILTERED]"
|
||||
} else {
|
||||
"(null)"
|
||||
};
|
||||
let action_str = match action {
|
||||
RoleAction::None => "",
|
||||
RoleAction::Create => " -> create",
|
||||
RoleAction::Update => " -> update",
|
||||
};
|
||||
info!(" - {}:{}{}", name, pwd, action_str);
|
||||
}
|
||||
}
|
||||
|
||||
xact.commit()?;
|
||||
@@ -197,12 +208,25 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
||||
}
|
||||
|
||||
/// Reassign all dependent objects and delete requested roles.
|
||||
#[instrument(skip_all)]
|
||||
pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result<()> {
|
||||
if let Some(ops) = &node.spec.delta_operations {
|
||||
// First, reassign all dependent objects to db owners.
|
||||
info!("reassigning dependent objects of to-be-deleted roles");
|
||||
|
||||
// Fetch existing roles. We could've exported and used `existing_roles` from
|
||||
// `handle_roles()`, but we only make this list there before creating new roles.
|
||||
// Which is probably fine as we never create to-be-deleted roles, but that'd
|
||||
// just look a bit untidy. Anyway, the entire `pg_roles` should be in shared
|
||||
// buffers already, so this shouldn't be a big deal.
|
||||
let mut xact = client.transaction()?;
|
||||
let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
|
||||
xact.commit()?;
|
||||
|
||||
for op in ops {
|
||||
if op.action == "delete_role" {
|
||||
// Check that role is still present in Postgres, as this could be a
|
||||
// restart with the same spec after role deletion.
|
||||
if op.action == "delete_role" && existing_roles.iter().any(|r| r.name == op.name) {
|
||||
reassign_owned_objects(node, &op.name)?;
|
||||
}
|
||||
}
|
||||
@@ -261,13 +285,16 @@ fn reassign_owned_objects(node: &ComputeNode, role_name: &PgIdent) -> Result<()>
|
||||
/// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level
|
||||
/// atomicity should be enough here due to the order of operations and various checks,
|
||||
/// which together provide us idempotency.
|
||||
#[instrument(skip_all)]
|
||||
pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
||||
let existing_dbs: Vec<Database> = get_existing_dbs(client)?;
|
||||
|
||||
// Print a list of existing Postgres databases (only in debug mode)
|
||||
info!("postgres databases:");
|
||||
for r in &existing_dbs {
|
||||
info_println!("{} - {}:{}", " ".repeat(27 + 5), r.name, r.owner);
|
||||
if span_enabled!(Level::INFO) {
|
||||
info!("postgres databases:");
|
||||
for r in &existing_dbs {
|
||||
info!(" {}:{}", r.name, r.owner);
|
||||
}
|
||||
}
|
||||
|
||||
// Process delta operations first
|
||||
@@ -310,13 +337,15 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
||||
for db in &spec.cluster.databases {
|
||||
let name = &db.name;
|
||||
|
||||
info_print!("{} - {}:{}", " ".repeat(27 + 5), db.name, db.owner);
|
||||
|
||||
// XXX: with a limited number of databases it is fine, but consider making it a HashMap
|
||||
let pg_db = existing_dbs.iter().find(|r| r.name == *name);
|
||||
|
||||
let start_time = Instant::now();
|
||||
if let Some(r) = pg_db {
|
||||
enum DatabaseAction {
|
||||
None,
|
||||
Update,
|
||||
Create,
|
||||
}
|
||||
let action = if let Some(r) = pg_db {
|
||||
// XXX: db owner name is returned as quoted string from Postgres,
|
||||
// when quoting is needed.
|
||||
let new_owner = if r.owner.starts_with('"') {
|
||||
@@ -326,29 +355,42 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
||||
};
|
||||
|
||||
if new_owner != r.owner {
|
||||
// Update the owner
|
||||
DatabaseAction::Update
|
||||
} else {
|
||||
DatabaseAction::None
|
||||
}
|
||||
} else {
|
||||
DatabaseAction::Create
|
||||
};
|
||||
|
||||
match action {
|
||||
DatabaseAction::None => {}
|
||||
DatabaseAction::Update => {
|
||||
let query: String = format!(
|
||||
"ALTER DATABASE {} OWNER TO {}",
|
||||
name.pg_quote(),
|
||||
db.owner.pg_quote()
|
||||
);
|
||||
info_print!(" -> update");
|
||||
|
||||
let _ = info_span!("executing", query).entered();
|
||||
client.execute(query.as_str(), &[])?;
|
||||
let elapsed = start_time.elapsed().as_millis();
|
||||
info_print!(" ({} ms)", elapsed);
|
||||
}
|
||||
} else {
|
||||
let mut query: String = format!("CREATE DATABASE {} ", name.pg_quote());
|
||||
info_print!(" -> create");
|
||||
DatabaseAction::Create => {
|
||||
let mut query: String = format!("CREATE DATABASE {} ", name.pg_quote());
|
||||
query.push_str(&db.to_pg_options());
|
||||
let _ = info_span!("executing", query).entered();
|
||||
client.execute(query.as_str(), &[])?;
|
||||
}
|
||||
};
|
||||
|
||||
query.push_str(&db.to_pg_options());
|
||||
client.execute(query.as_str(), &[])?;
|
||||
|
||||
let elapsed = start_time.elapsed().as_millis();
|
||||
info_print!(" ({} ms)", elapsed);
|
||||
if span_enabled!(Level::INFO) {
|
||||
let action_str = match action {
|
||||
DatabaseAction::None => "",
|
||||
DatabaseAction::Create => " -> create",
|
||||
DatabaseAction::Update => " -> update",
|
||||
};
|
||||
info!(" - {}:{}{}", db.name, db.owner, action_str);
|
||||
}
|
||||
|
||||
info_print!("\n");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -356,6 +398,7 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
||||
|
||||
/// Grant CREATE ON DATABASE to the database owner and do some other alters and grants
|
||||
/// to allow users creating trusted extensions and re-creating `public` schema, for example.
|
||||
#[instrument(skip_all)]
|
||||
pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> {
|
||||
let spec = &node.spec;
|
||||
|
||||
|
||||
@@ -134,22 +134,25 @@ impl<K: std::hash::Hash + Eq + 'static> Storage<K> {
|
||||
op: Cow<'static, str>,
|
||||
lsn: u64,
|
||||
size: Option<u64>,
|
||||
) where
|
||||
) -> anyhow::Result<()>
|
||||
where
|
||||
K: std::borrow::Borrow<Q>,
|
||||
Q: std::hash::Hash + Eq + std::fmt::Debug,
|
||||
{
|
||||
let lastseg_id = *self.branches.get(branch).unwrap();
|
||||
let Some(lastseg_id) = self.branches.get(branch).copied() else { anyhow::bail!("branch not found: {branch:?}") };
|
||||
let newseg_id = self.segments.len();
|
||||
let lastseg = &mut self.segments[lastseg_id];
|
||||
|
||||
assert!(lsn > lastseg.end_lsn);
|
||||
|
||||
let Some(start_size) = lastseg.end_size else { anyhow::bail!("no end_size on latest segment for {branch:?}") };
|
||||
|
||||
let newseg = Segment {
|
||||
op,
|
||||
parent: Some(lastseg_id),
|
||||
start_lsn: lastseg.end_lsn,
|
||||
end_lsn: lsn,
|
||||
start_size: lastseg.end_size.unwrap(),
|
||||
start_size,
|
||||
end_size: size,
|
||||
children_after: Vec::new(),
|
||||
needed: false,
|
||||
@@ -158,6 +161,8 @@ impl<K: std::hash::Hash + Eq + 'static> Storage<K> {
|
||||
|
||||
self.segments.push(newseg);
|
||||
*self.branches.get_mut(branch).expect("read already") = newseg_id;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Advances the branch with the named operation, by the relative LSN and logical size bytes.
|
||||
@@ -167,21 +172,24 @@ impl<K: std::hash::Hash + Eq + 'static> Storage<K> {
|
||||
op: Cow<'static, str>,
|
||||
lsn_bytes: u64,
|
||||
size_bytes: i64,
|
||||
) where
|
||||
) -> anyhow::Result<()>
|
||||
where
|
||||
K: std::borrow::Borrow<Q>,
|
||||
Q: std::hash::Hash + Eq,
|
||||
Q: std::hash::Hash + Eq + std::fmt::Debug,
|
||||
{
|
||||
let lastseg_id = *self.branches.get(branch).unwrap();
|
||||
let Some(lastseg_id) = self.branches.get(branch).copied() else { anyhow::bail!("branch not found: {branch:?}") };
|
||||
let newseg_id = self.segments.len();
|
||||
let lastseg = &mut self.segments[lastseg_id];
|
||||
|
||||
let Some(last_end_size) = lastseg.end_size else { anyhow::bail!("no end_size on latest segment for {branch:?}") };
|
||||
|
||||
let newseg = Segment {
|
||||
op,
|
||||
parent: Some(lastseg_id),
|
||||
start_lsn: lastseg.end_lsn,
|
||||
end_lsn: lastseg.end_lsn + lsn_bytes,
|
||||
start_size: lastseg.end_size.unwrap(),
|
||||
end_size: Some((lastseg.end_size.unwrap() as i64 + size_bytes) as u64),
|
||||
start_size: last_end_size,
|
||||
end_size: Some((last_end_size as i64 + size_bytes) as u64),
|
||||
children_after: Vec::new(),
|
||||
needed: false,
|
||||
};
|
||||
@@ -189,33 +197,33 @@ impl<K: std::hash::Hash + Eq + 'static> Storage<K> {
|
||||
|
||||
self.segments.push(newseg);
|
||||
*self.branches.get_mut(branch).expect("read already") = newseg_id;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn insert<Q: ?Sized>(&mut self, branch: &Q, bytes: u64)
|
||||
pub fn insert<Q: ?Sized>(&mut self, branch: &Q, bytes: u64) -> anyhow::Result<()>
|
||||
where
|
||||
K: std::borrow::Borrow<Q>,
|
||||
Q: std::hash::Hash + Eq,
|
||||
Q: std::hash::Hash + Eq + std::fmt::Debug,
|
||||
{
|
||||
self.modify_branch(branch, "insert".into(), bytes, bytes as i64);
|
||||
self.modify_branch(branch, "insert".into(), bytes, bytes as i64)
|
||||
}
|
||||
|
||||
pub fn update<Q: ?Sized>(&mut self, branch: &Q, bytes: u64)
|
||||
pub fn update<Q: ?Sized>(&mut self, branch: &Q, bytes: u64) -> anyhow::Result<()>
|
||||
where
|
||||
K: std::borrow::Borrow<Q>,
|
||||
Q: std::hash::Hash + Eq,
|
||||
Q: std::hash::Hash + Eq + std::fmt::Debug,
|
||||
{
|
||||
self.modify_branch(branch, "update".into(), bytes, 0i64);
|
||||
self.modify_branch(branch, "update".into(), bytes, 0i64)
|
||||
}
|
||||
|
||||
pub fn delete<Q: ?Sized>(&mut self, branch: &Q, bytes: u64)
|
||||
pub fn delete<Q: ?Sized>(&mut self, branch: &Q, bytes: u64) -> anyhow::Result<()>
|
||||
where
|
||||
K: std::borrow::Borrow<Q>,
|
||||
Q: std::hash::Hash + Eq,
|
||||
Q: std::hash::Hash + Eq + std::fmt::Debug,
|
||||
{
|
||||
self.modify_branch(branch, "delete".into(), bytes, -(bytes as i64));
|
||||
self.modify_branch(branch, "delete".into(), bytes, -(bytes as i64))
|
||||
}
|
||||
|
||||
/// Panics if the parent branch cannot be found.
|
||||
pub fn branch<Q: ?Sized>(&mut self, parent: &Q, name: K) -> anyhow::Result<()>
|
||||
where
|
||||
K: std::borrow::Borrow<Q> + std::fmt::Debug,
|
||||
@@ -236,7 +244,7 @@ impl<K: std::hash::Hash + Eq + 'static> Storage<K> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn calculate(&mut self, retention_period: u64) -> SegmentSize {
|
||||
pub fn calculate(&mut self, retention_period: u64) -> anyhow::Result<SegmentSize> {
|
||||
// Phase 1: Mark all the segments that need to be retained
|
||||
for (_branch, &last_seg_id) in self.branches.iter() {
|
||||
let last_seg = &self.segments[last_seg_id];
|
||||
@@ -261,7 +269,7 @@ impl<K: std::hash::Hash + Eq + 'static> Storage<K> {
|
||||
self.size_from_snapshot_later(0)
|
||||
}
|
||||
|
||||
fn size_from_wal(&self, seg_id: usize) -> SegmentSize {
|
||||
fn size_from_wal(&self, seg_id: usize) -> anyhow::Result<SegmentSize> {
|
||||
let seg = &self.segments[seg_id];
|
||||
|
||||
let this_size = seg.end_lsn - seg.start_lsn;
|
||||
@@ -272,10 +280,10 @@ impl<K: std::hash::Hash + Eq + 'static> Storage<K> {
|
||||
for &child_id in seg.children_after.iter() {
|
||||
// try each child both ways
|
||||
let child = &self.segments[child_id];
|
||||
let p1 = self.size_from_wal(child_id);
|
||||
let p1 = self.size_from_wal(child_id)?;
|
||||
|
||||
let p = if !child.needed {
|
||||
let p2 = self.size_from_snapshot_later(child_id);
|
||||
let p2 = self.size_from_snapshot_later(child_id)?;
|
||||
if p1.total() < p2.total() {
|
||||
p1
|
||||
} else {
|
||||
@@ -286,15 +294,15 @@ impl<K: std::hash::Hash + Eq + 'static> Storage<K> {
|
||||
};
|
||||
children.push(p);
|
||||
}
|
||||
SegmentSize {
|
||||
Ok(SegmentSize {
|
||||
seg_id,
|
||||
method: if seg.needed { WalNeeded } else { Wal },
|
||||
this_size,
|
||||
children,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn size_from_snapshot_later(&self, seg_id: usize) -> SegmentSize {
|
||||
fn size_from_snapshot_later(&self, seg_id: usize) -> anyhow::Result<SegmentSize> {
|
||||
// If this is needed, then it's time to do the snapshot and continue
|
||||
// with wal method.
|
||||
let seg = &self.segments[seg_id];
|
||||
@@ -305,10 +313,10 @@ impl<K: std::hash::Hash + Eq + 'static> Storage<K> {
|
||||
for &child_id in seg.children_after.iter() {
|
||||
// try each child both ways
|
||||
let child = &self.segments[child_id];
|
||||
let p1 = self.size_from_wal(child_id);
|
||||
let p1 = self.size_from_wal(child_id)?;
|
||||
|
||||
let p = if !child.needed {
|
||||
let p2 = self.size_from_snapshot_later(child_id);
|
||||
let p2 = self.size_from_snapshot_later(child_id)?;
|
||||
if p1.total() < p2.total() {
|
||||
p1
|
||||
} else {
|
||||
@@ -319,12 +327,12 @@ impl<K: std::hash::Hash + Eq + 'static> Storage<K> {
|
||||
};
|
||||
children.push(p);
|
||||
}
|
||||
SegmentSize {
|
||||
Ok(SegmentSize {
|
||||
seg_id,
|
||||
method: WalNeeded,
|
||||
this_size: seg.start_size,
|
||||
children,
|
||||
}
|
||||
})
|
||||
} else {
|
||||
// If any of the direct children are "needed", need to be able to reconstruct here
|
||||
let mut children_needed = false;
|
||||
@@ -339,7 +347,7 @@ impl<K: std::hash::Hash + Eq + 'static> Storage<K> {
|
||||
let method1 = if !children_needed {
|
||||
let mut children = Vec::new();
|
||||
for child in seg.children_after.iter() {
|
||||
children.push(self.size_from_snapshot_later(*child));
|
||||
children.push(self.size_from_snapshot_later(*child)?);
|
||||
}
|
||||
Some(SegmentSize {
|
||||
seg_id,
|
||||
@@ -355,20 +363,25 @@ impl<K: std::hash::Hash + Eq + 'static> Storage<K> {
|
||||
let method2 = if children_needed || seg.children_after.len() >= 2 {
|
||||
let mut children = Vec::new();
|
||||
for child in seg.children_after.iter() {
|
||||
children.push(self.size_from_wal(*child));
|
||||
children.push(self.size_from_wal(*child)?);
|
||||
}
|
||||
let Some(this_size) = seg.end_size else { anyhow::bail!("no end_size at junction {seg_id}") };
|
||||
Some(SegmentSize {
|
||||
seg_id,
|
||||
method: SnapshotAfter,
|
||||
this_size: seg.end_size.unwrap(),
|
||||
this_size,
|
||||
children,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
match (method1, method2) {
|
||||
(None, None) => panic!(),
|
||||
Ok(match (method1, method2) {
|
||||
(None, None) => anyhow::bail!(
|
||||
"neither method was applicable: children_after={}, children_needed={}",
|
||||
seg.children_after.len(),
|
||||
children_needed
|
||||
),
|
||||
(Some(method), None) => method,
|
||||
(None, Some(method)) => method,
|
||||
(Some(method1), Some(method2)) => {
|
||||
@@ -378,7 +391,7 @@ impl<K: std::hash::Hash + Eq + 'static> Storage<K> {
|
||||
method2
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -7,118 +7,118 @@
|
||||
use tenant_size_model::{Segment, SegmentSize, Storage};
|
||||
|
||||
// Main branch only. Some updates on it.
|
||||
fn scenario_1() -> (Vec<Segment>, SegmentSize) {
|
||||
fn scenario_1() -> anyhow::Result<(Vec<Segment>, SegmentSize)> {
|
||||
// Create main branch
|
||||
let mut storage = Storage::new("main");
|
||||
|
||||
// Bulk load 5 GB of data to it
|
||||
storage.insert("main", 5_000);
|
||||
storage.insert("main", 5_000)?;
|
||||
|
||||
// Stream of updates
|
||||
for _ in 0..5 {
|
||||
storage.update("main", 1_000);
|
||||
storage.update("main", 1_000)?;
|
||||
}
|
||||
|
||||
let size = storage.calculate(1000);
|
||||
let size = storage.calculate(1000)?;
|
||||
|
||||
(storage.into_segments(), size)
|
||||
Ok((storage.into_segments(), size))
|
||||
}
|
||||
|
||||
// Main branch only. Some updates on it.
|
||||
fn scenario_2() -> (Vec<Segment>, SegmentSize) {
|
||||
fn scenario_2() -> anyhow::Result<(Vec<Segment>, SegmentSize)> {
|
||||
// Create main branch
|
||||
let mut storage = Storage::new("main");
|
||||
|
||||
// Bulk load 5 GB of data to it
|
||||
storage.insert("main", 5_000);
|
||||
storage.insert("main", 5_000)?;
|
||||
|
||||
// Stream of updates
|
||||
for _ in 0..5 {
|
||||
storage.update("main", 1_000);
|
||||
storage.update("main", 1_000)?;
|
||||
}
|
||||
|
||||
// Branch
|
||||
storage.branch("main", "child").unwrap();
|
||||
storage.update("child", 1_000);
|
||||
storage.branch("main", "child")?;
|
||||
storage.update("child", 1_000)?;
|
||||
|
||||
// More updates on parent
|
||||
storage.update("main", 1_000);
|
||||
storage.update("main", 1_000)?;
|
||||
|
||||
let size = storage.calculate(1000);
|
||||
let size = storage.calculate(1000)?;
|
||||
|
||||
(storage.into_segments(), size)
|
||||
Ok((storage.into_segments(), size))
|
||||
}
|
||||
|
||||
// Like 2, but more updates on main
|
||||
fn scenario_3() -> (Vec<Segment>, SegmentSize) {
|
||||
fn scenario_3() -> anyhow::Result<(Vec<Segment>, SegmentSize)> {
|
||||
// Create main branch
|
||||
let mut storage = Storage::new("main");
|
||||
|
||||
// Bulk load 5 GB of data to it
|
||||
storage.insert("main", 5_000);
|
||||
storage.insert("main", 5_000)?;
|
||||
|
||||
// Stream of updates
|
||||
for _ in 0..5 {
|
||||
storage.update("main", 1_000);
|
||||
storage.update("main", 1_000)?;
|
||||
}
|
||||
|
||||
// Branch
|
||||
storage.branch("main", "child").unwrap();
|
||||
storage.update("child", 1_000);
|
||||
storage.branch("main", "child")?;
|
||||
storage.update("child", 1_000)?;
|
||||
|
||||
// More updates on parent
|
||||
for _ in 0..5 {
|
||||
storage.update("main", 1_000);
|
||||
storage.update("main", 1_000)?;
|
||||
}
|
||||
|
||||
let size = storage.calculate(1000);
|
||||
let size = storage.calculate(1000)?;
|
||||
|
||||
(storage.into_segments(), size)
|
||||
Ok((storage.into_segments(), size))
|
||||
}
|
||||
|
||||
// Diverged branches
|
||||
fn scenario_4() -> (Vec<Segment>, SegmentSize) {
|
||||
fn scenario_4() -> anyhow::Result<(Vec<Segment>, SegmentSize)> {
|
||||
// Create main branch
|
||||
let mut storage = Storage::new("main");
|
||||
|
||||
// Bulk load 5 GB of data to it
|
||||
storage.insert("main", 5_000);
|
||||
storage.insert("main", 5_000)?;
|
||||
|
||||
// Stream of updates
|
||||
for _ in 0..5 {
|
||||
storage.update("main", 1_000);
|
||||
storage.update("main", 1_000)?;
|
||||
}
|
||||
|
||||
// Branch
|
||||
storage.branch("main", "child").unwrap();
|
||||
storage.update("child", 1_000);
|
||||
storage.branch("main", "child")?;
|
||||
storage.update("child", 1_000)?;
|
||||
|
||||
// More updates on parent
|
||||
for _ in 0..8 {
|
||||
storage.update("main", 1_000);
|
||||
storage.update("main", 1_000)?;
|
||||
}
|
||||
|
||||
let size = storage.calculate(1000);
|
||||
let size = storage.calculate(1000)?;
|
||||
|
||||
(storage.into_segments(), size)
|
||||
Ok((storage.into_segments(), size))
|
||||
}
|
||||
|
||||
fn scenario_5() -> (Vec<Segment>, SegmentSize) {
|
||||
fn scenario_5() -> anyhow::Result<(Vec<Segment>, SegmentSize)> {
|
||||
let mut storage = Storage::new("a");
|
||||
storage.insert("a", 5000);
|
||||
storage.branch("a", "b").unwrap();
|
||||
storage.update("b", 4000);
|
||||
storage.update("a", 2000);
|
||||
storage.branch("a", "c").unwrap();
|
||||
storage.insert("c", 4000);
|
||||
storage.insert("a", 2000);
|
||||
storage.insert("a", 5000)?;
|
||||
storage.branch("a", "b")?;
|
||||
storage.update("b", 4000)?;
|
||||
storage.update("a", 2000)?;
|
||||
storage.branch("a", "c")?;
|
||||
storage.insert("c", 4000)?;
|
||||
storage.insert("a", 2000)?;
|
||||
|
||||
let size = storage.calculate(5000);
|
||||
let size = storage.calculate(5000)?;
|
||||
|
||||
(storage.into_segments(), size)
|
||||
Ok((storage.into_segments(), size))
|
||||
}
|
||||
|
||||
fn scenario_6() -> (Vec<Segment>, SegmentSize) {
|
||||
fn scenario_6() -> anyhow::Result<(Vec<Segment>, SegmentSize)> {
|
||||
use std::borrow::Cow;
|
||||
|
||||
const NO_OP: Cow<'static, str> = Cow::Borrowed("");
|
||||
@@ -133,18 +133,18 @@ fn scenario_6() -> (Vec<Segment>, SegmentSize) {
|
||||
|
||||
let mut storage = Storage::new(None);
|
||||
|
||||
storage.branch(&None, branches[0]).unwrap(); // at 0
|
||||
storage.modify_branch(&branches[0], NO_OP, 108951064, 43696128); // at 108951064
|
||||
storage.branch(&branches[0], branches[1]).unwrap(); // at 108951064
|
||||
storage.modify_branch(&branches[1], NO_OP, 15560408, -1851392); // at 124511472
|
||||
storage.modify_branch(&branches[0], NO_OP, 174464360, -1531904); // at 283415424
|
||||
storage.branch(&branches[0], branches[2]).unwrap(); // at 283415424
|
||||
storage.modify_branch(&branches[2], NO_OP, 15906192, 8192); // at 299321616
|
||||
storage.modify_branch(&branches[0], NO_OP, 18909976, 32768); // at 302325400
|
||||
storage.branch(&None, branches[0])?; // at 0
|
||||
storage.modify_branch(&branches[0], NO_OP, 108951064, 43696128)?; // at 108951064
|
||||
storage.branch(&branches[0], branches[1])?; // at 108951064
|
||||
storage.modify_branch(&branches[1], NO_OP, 15560408, -1851392)?; // at 124511472
|
||||
storage.modify_branch(&branches[0], NO_OP, 174464360, -1531904)?; // at 283415424
|
||||
storage.branch(&branches[0], branches[2])?; // at 283415424
|
||||
storage.modify_branch(&branches[2], NO_OP, 15906192, 8192)?; // at 299321616
|
||||
storage.modify_branch(&branches[0], NO_OP, 18909976, 32768)?; // at 302325400
|
||||
|
||||
let size = storage.calculate(100_000);
|
||||
let size = storage.calculate(100_000)?;
|
||||
|
||||
(storage.into_segments(), size)
|
||||
Ok((storage.into_segments(), size))
|
||||
}
|
||||
|
||||
fn main() {
|
||||
@@ -163,7 +163,8 @@ fn main() {
|
||||
eprintln!("invalid scenario {}", other);
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
}
|
||||
.unwrap();
|
||||
|
||||
graphviz_tree(&segments, &size);
|
||||
}
|
||||
@@ -251,7 +252,7 @@ fn graphviz_tree(segments: &[Segment], tree: &SegmentSize) {
|
||||
|
||||
#[test]
|
||||
fn scenarios_return_same_size() {
|
||||
type ScenarioFn = fn() -> (Vec<Segment>, SegmentSize);
|
||||
type ScenarioFn = fn() -> anyhow::Result<(Vec<Segment>, SegmentSize)>;
|
||||
let truths: &[(u32, ScenarioFn, _)] = &[
|
||||
(line!(), scenario_1, 8000),
|
||||
(line!(), scenario_2, 9000),
|
||||
@@ -262,7 +263,7 @@ fn scenarios_return_same_size() {
|
||||
];
|
||||
|
||||
for (line, scenario, expected) in truths {
|
||||
let (_, size) = scenario();
|
||||
let (_, size) = scenario().unwrap();
|
||||
assert_eq!(*expected, size.total_children(), "scenario on line {line}");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ use strum_macros::{EnumString, EnumVariantNames};
|
||||
pub enum LogFormat {
|
||||
Plain,
|
||||
Json,
|
||||
Test,
|
||||
}
|
||||
|
||||
impl LogFormat {
|
||||
@@ -39,6 +40,7 @@ pub fn init(log_format: LogFormat) -> anyhow::Result<()> {
|
||||
match log_format {
|
||||
LogFormat::Json => base_logger.json().init(),
|
||||
LogFormat::Plain => base_logger.init(),
|
||||
LogFormat::Test => base_logger.with_test_writer().init(),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -59,7 +59,7 @@ pub async fn collect_metrics(
|
||||
None,
|
||||
None,
|
||||
"synthetic size calculation",
|
||||
true,
|
||||
false,
|
||||
async move {
|
||||
calculate_synthetic_size_worker(synthetic_size_calculation_interval)
|
||||
.instrument(info_span!("synthetic_size_worker"))
|
||||
|
||||
@@ -916,6 +916,7 @@ pub fn make_router(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/download_remote_layers",
|
||||
timeline_download_remote_layers_handler_post,
|
||||
)
|
||||
.post("/add_forced_now", handle_add_forced_now)
|
||||
.get(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/download_remote_layers",
|
||||
timeline_download_remote_layers_handler_get,
|
||||
@@ -926,3 +927,14 @@ pub fn make_router(
|
||||
)
|
||||
.any(handler_404))
|
||||
}
|
||||
|
||||
async fn handle_add_forced_now(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let now = get_query_param(&req, "now")?;
|
||||
|
||||
let now = chrono::DateTime::parse_from_rfc3339(&now).unwrap();
|
||||
let now = now.with_timezone(&chrono::Utc);
|
||||
|
||||
crate::tenant::timeline::Timeline::force_next_now(now.into());
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
@@ -1405,15 +1405,15 @@ fn slru_segment_key_range(kind: SlruKind, segno: u32) -> Range<Key> {
|
||||
Key {
|
||||
field1: 0x01,
|
||||
field2,
|
||||
field3: segno,
|
||||
field4: 0,
|
||||
field3: 1,
|
||||
field4: segno,
|
||||
field5: 0,
|
||||
field6: 0,
|
||||
}..Key {
|
||||
field1: 0x01,
|
||||
field2,
|
||||
field3: segno,
|
||||
field4: 0,
|
||||
field3: 1,
|
||||
field4: segno,
|
||||
field5: 1,
|
||||
field6: 0,
|
||||
}
|
||||
|
||||
@@ -90,7 +90,7 @@ pub mod mgr;
|
||||
pub mod tasks;
|
||||
pub mod upload_queue;
|
||||
|
||||
mod timeline;
|
||||
pub mod timeline;
|
||||
|
||||
pub mod size;
|
||||
|
||||
@@ -1797,9 +1797,12 @@ impl Tenant {
|
||||
let mut target_config_file = VirtualFile::open_with_options(
|
||||
target_config_path,
|
||||
OpenOptions::new()
|
||||
.truncate(true) // This needed for overwriting with small config files
|
||||
// This needed for overwriting with small config files
|
||||
.truncate(true)
|
||||
.write(true)
|
||||
.create_new(first_save),
|
||||
.create_new(first_save)
|
||||
// this will be ignored if create_new(true)
|
||||
.create(true),
|
||||
)?;
|
||||
|
||||
target_config_file
|
||||
@@ -2627,8 +2630,10 @@ where
|
||||
pub mod harness {
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use once_cell::sync::Lazy;
|
||||
use once_cell::sync::OnceCell;
|
||||
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use std::{fs, path::PathBuf};
|
||||
use utils::logging;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::{
|
||||
@@ -2692,6 +2697,8 @@ pub mod harness {
|
||||
),
|
||||
}
|
||||
|
||||
static LOG_HANDLE: OnceCell<()> = OnceCell::new();
|
||||
|
||||
impl<'a> TenantHarness<'a> {
|
||||
pub fn create(test_name: &'static str) -> anyhow::Result<Self> {
|
||||
Self::create_internal(test_name, false)
|
||||
@@ -2706,6 +2713,10 @@ pub mod harness {
|
||||
(Some(LOCK.read().unwrap()), None)
|
||||
};
|
||||
|
||||
LOG_HANDLE.get_or_init(|| {
|
||||
logging::init(logging::LogFormat::Test).expect("Failed to init test logging")
|
||||
});
|
||||
|
||||
let repo_dir = PageServerConf::test_repo_dir(test_name);
|
||||
let _ = fs::remove_dir_all(&repo_dir);
|
||||
fs::create_dir_all(&repo_dir)?;
|
||||
|
||||
@@ -202,6 +202,13 @@ impl<T: ?Sized> PartialEq for LayerRTreeObject<T> {
|
||||
// references. Clippy complains about this. In practice it
|
||||
// seems to work, the assertion below would be triggered
|
||||
// otherwise but this ought to be fixed.
|
||||
|
||||
{
|
||||
let left = Arc::as_ptr(&self.layer);
|
||||
let right = Arc::as_ptr(&other.layer);
|
||||
tracing::info!(?left, ?right, "comparing ptr_eq");
|
||||
}
|
||||
|
||||
#[allow(clippy::vtable_address_comparisons)]
|
||||
Arc::ptr_eq(&self.layer, &other.layer)
|
||||
}
|
||||
@@ -250,15 +257,32 @@ where
|
||||
L: ?Sized + Layer,
|
||||
{
|
||||
///
|
||||
/// Find the latest layer that covers the given 'key', with lsn <
|
||||
/// 'end_lsn'.
|
||||
/// Find the latest layer (by lsn.end) that covers the given
|
||||
/// 'key', with lsn.start < 'end_lsn'.
|
||||
///
|
||||
/// Returns the layer, if any, and an 'lsn_floor' value that
|
||||
/// indicates which portion of the layer the caller should
|
||||
/// check. 'lsn_floor' is normally the start-LSN of the layer, but
|
||||
/// can be greater if there is an overlapping layer that might
|
||||
/// contain the version, even if it's missing from the returned
|
||||
/// layer.
|
||||
/// The caller of this function is the page reconstruction
|
||||
/// algorithm looking for the next relevant delta layer, or
|
||||
/// the terminal image layer. The caller will pass the lsn_floor
|
||||
/// value as end_lsn in the next call to search.
|
||||
///
|
||||
/// If there's an image layer exactly below the given end_lsn,
|
||||
/// search should return that layer regardless if there are
|
||||
/// overlapping deltas.
|
||||
///
|
||||
/// If the latest layer is a delta and there is an overlapping
|
||||
/// image with it below, the lsn_floor returned should be right
|
||||
/// above that image so we don't skip it in the search. Otherwise
|
||||
/// the lsn_floor returned should be the bottom of the delta layer
|
||||
/// because we should make as much progress down the lsn axis
|
||||
/// as possible. It's fine if this way we skip some overlapping
|
||||
/// deltas, because the delta we returned would contain the same
|
||||
/// wal content.
|
||||
///
|
||||
/// TODO: This API is convoluted and inefficient. If the caller
|
||||
/// makes N search calls, we'll end up finding the same latest
|
||||
/// image layer N times. We should either cache the latest image
|
||||
/// layer result, or simplify the api to `get_latest_image` and
|
||||
/// `get_latest_delta`, and only call `get_latest_image` once.
|
||||
///
|
||||
/// NOTE: This only searches the 'historic' layers, *not* the
|
||||
/// 'open' and 'frozen' layers!
|
||||
@@ -401,7 +425,9 @@ where
|
||||
NUM_ONDISK_LAYERS.dec();
|
||||
}
|
||||
|
||||
/// Is there a newer image layer for given key- and LSN-range?
|
||||
/// Is there a newer image layer for given key- and LSN-range? Or a set
|
||||
/// of image layers within the specified lsn range that cover the entire
|
||||
/// specified key range?
|
||||
///
|
||||
/// This is used for garbage collection, to determine if an old layer can
|
||||
/// be deleted.
|
||||
@@ -488,8 +514,8 @@ where
|
||||
|
||||
///
|
||||
/// Divide the whole given range of keys into sub-ranges based on the latest
|
||||
/// image layer that covers each range. (This is used when creating new
|
||||
/// image layers)
|
||||
/// image layer that covers each range at the specified lsn (inclusive).
|
||||
/// This is used when creating new image layers.
|
||||
///
|
||||
// FIXME: clippy complains that the result type is very complex. She's probably
|
||||
// right...
|
||||
@@ -541,8 +567,15 @@ where
|
||||
Ok(ranges)
|
||||
}
|
||||
|
||||
/// Count how many L1 delta layers there are that overlap with the
|
||||
/// given key and LSN range.
|
||||
/// Count the height of the tallest stack of deltas in this 2d region.
|
||||
///
|
||||
/// This number is used to compute the largest number of deltas that
|
||||
/// we'll need to visit for any page reconstruction in this region.
|
||||
/// We use this heuristic to decide whether to create an image layer.
|
||||
///
|
||||
/// TODO currently we just return the total number of deltas in the
|
||||
/// region, no matter if they're stacked on top of each other
|
||||
/// or next to each other.
|
||||
pub fn count_deltas(&self, key_range: &Range<Key>, lsn_range: &Range<Lsn>) -> Result<usize> {
|
||||
let mut result = 0;
|
||||
if lsn_range.start >= lsn_range.end {
|
||||
|
||||
@@ -23,7 +23,13 @@ use tracing::*;
|
||||
pub struct ModelInputs {
|
||||
updates: Vec<Update>,
|
||||
retention_period: u64,
|
||||
|
||||
/// Relevant lsns per timeline.
|
||||
///
|
||||
/// This field is not required for deserialization purposes, which is mostly used in tests. The
|
||||
/// LSNs explain the outcome (updates) but are not needed in size calculation.
|
||||
#[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
|
||||
#[serde(default)]
|
||||
timeline_inputs: HashMap<TimelineId, TimelineInputs>,
|
||||
}
|
||||
|
||||
@@ -32,6 +38,8 @@ pub struct ModelInputs {
|
||||
#[serde_with::serde_as]
|
||||
#[derive(Debug, serde::Serialize, serde::Deserialize)]
|
||||
struct TimelineInputs {
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
ancestor_lsn: Lsn,
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
last_record: Lsn,
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
@@ -178,19 +186,20 @@ pub(super) async fn gather_inputs(
|
||||
// our advantage with `?` error handling.
|
||||
let mut joinset = tokio::task::JoinSet::new();
|
||||
|
||||
let timelines = tenant
|
||||
// refresh is needed to update gc related pitr_cutoff and horizon_cutoff
|
||||
tenant
|
||||
.refresh_gc_info()
|
||||
.await
|
||||
.context("Failed to refresh gc_info before gathering inputs")?;
|
||||
|
||||
let timelines = tenant.list_timelines();
|
||||
|
||||
if timelines.is_empty() {
|
||||
// All timelines are below tenant's gc_horizon; alternative would be to use
|
||||
// Tenant::list_timelines but then those gc_info's would not be updated yet, possibly
|
||||
// missing GcInfo::retain_lsns or having obsolete values for cutoff's.
|
||||
// perhaps the tenant has just been created, and as such doesn't have any data yet
|
||||
return Ok(ModelInputs {
|
||||
updates: vec![],
|
||||
retention_period: 0,
|
||||
timeline_inputs: HashMap::new(),
|
||||
timeline_inputs: HashMap::default(),
|
||||
});
|
||||
}
|
||||
|
||||
@@ -201,13 +210,25 @@ pub(super) async fn gather_inputs(
|
||||
|
||||
let mut updates = Vec::new();
|
||||
|
||||
// record the per timline values used to determine `retention_period`
|
||||
// record the per timeline values useful to debug the model inputs, also used to track
|
||||
// ancestor_lsn without keeping a hold of Timeline
|
||||
let mut timeline_inputs = HashMap::with_capacity(timelines.len());
|
||||
|
||||
// used to determine the `retention_period` for the size model
|
||||
let mut max_cutoff_distance = None;
|
||||
|
||||
// mapping from (TimelineId, Lsn) => if this branch point has been handled already via
|
||||
// GcInfo::retain_lsns or if it needs to have its logical_size calculated.
|
||||
let mut referenced_branch_froms = HashMap::<(TimelineId, Lsn), bool>::new();
|
||||
|
||||
for timeline in timelines {
|
||||
if !timeline.is_active() {
|
||||
anyhow::bail!(
|
||||
"timeline {} is not active, cannot calculate tenant_size now",
|
||||
timeline.timeline_id
|
||||
);
|
||||
}
|
||||
|
||||
let last_record_lsn = timeline.get_last_record_lsn();
|
||||
|
||||
let (interesting_lsns, horizon_cutoff, pitr_cutoff, next_gc_cutoff) = {
|
||||
@@ -273,13 +294,30 @@ pub(super) async fn gather_inputs(
|
||||
|
||||
// all timelines branch from something, because it might be impossible to pinpoint
|
||||
// which is the tenant_size_model's "default" branch.
|
||||
|
||||
let ancestor_lsn = timeline.get_ancestor_lsn();
|
||||
|
||||
updates.push(Update {
|
||||
lsn: timeline.get_ancestor_lsn(),
|
||||
lsn: ancestor_lsn,
|
||||
command: Command::BranchFrom(timeline.get_ancestor_timeline_id()),
|
||||
timeline_id: timeline.timeline_id,
|
||||
});
|
||||
|
||||
if let Some(parent_timeline_id) = timeline.get_ancestor_timeline_id() {
|
||||
// refresh_gc_info will update branchpoints and pitr_cutoff but only do it for branches
|
||||
// which are over gc_horizon. for example, a "main" branch which never received any
|
||||
// updates apart from initdb not have branch points recorded.
|
||||
referenced_branch_froms
|
||||
.entry((parent_timeline_id, timeline.get_ancestor_lsn()))
|
||||
.or_default();
|
||||
}
|
||||
|
||||
for (lsn, _kind) in &interesting_lsns {
|
||||
// mark this visited so don't need to re-process this parent
|
||||
*referenced_branch_froms
|
||||
.entry((timeline.timeline_id, *lsn))
|
||||
.or_default() = true;
|
||||
|
||||
if let Some(size) = logical_size_cache.get(&(timeline.timeline_id, *lsn)) {
|
||||
updates.push(Update {
|
||||
lsn: *lsn,
|
||||
@@ -295,22 +333,10 @@ pub(super) async fn gather_inputs(
|
||||
}
|
||||
}
|
||||
|
||||
// all timelines also have an end point if they have made any progress
|
||||
if last_record_lsn > timeline.get_ancestor_lsn()
|
||||
&& !interesting_lsns
|
||||
.iter()
|
||||
.any(|(lsn, _)| lsn == &last_record_lsn)
|
||||
{
|
||||
updates.push(Update {
|
||||
lsn: last_record_lsn,
|
||||
command: Command::EndOfBranch,
|
||||
timeline_id: timeline.timeline_id,
|
||||
});
|
||||
}
|
||||
|
||||
timeline_inputs.insert(
|
||||
timeline.timeline_id,
|
||||
TimelineInputs {
|
||||
ancestor_lsn,
|
||||
last_record: last_record_lsn,
|
||||
// this is not used above, because it might not have updated recently enough
|
||||
latest_gc_cutoff: *timeline.get_latest_gc_cutoff_lsn(),
|
||||
@@ -321,6 +347,80 @@ pub(super) async fn gather_inputs(
|
||||
);
|
||||
}
|
||||
|
||||
// iterate over discovered branch points and make sure we are getting logical sizes at those
|
||||
// points.
|
||||
for ((timeline_id, lsn), handled) in referenced_branch_froms.iter() {
|
||||
if *handled {
|
||||
continue;
|
||||
}
|
||||
|
||||
let timeline_id = *timeline_id;
|
||||
let lsn = *lsn;
|
||||
|
||||
match timeline_inputs.get(&timeline_id) {
|
||||
Some(inputs) if inputs.ancestor_lsn == lsn => {
|
||||
// we don't need an update at this branch point which is also point where
|
||||
// timeline_id branch was branched from.
|
||||
continue;
|
||||
}
|
||||
Some(_) => {}
|
||||
None => {
|
||||
// we should have this because we have iterated through all of the timelines
|
||||
anyhow::bail!("missing timeline_input for {timeline_id}")
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(size) = logical_size_cache.get(&(timeline_id, lsn)) {
|
||||
updates.push(Update {
|
||||
lsn,
|
||||
timeline_id,
|
||||
command: Command::Update(*size),
|
||||
});
|
||||
|
||||
needed_cache.insert((timeline_id, lsn));
|
||||
} else {
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, false)
|
||||
.context("find referenced ancestor timeline")?;
|
||||
let parallel_size_calcs = Arc::clone(limit);
|
||||
joinset.spawn(calculate_logical_size(
|
||||
parallel_size_calcs,
|
||||
timeline.clone(),
|
||||
lsn,
|
||||
));
|
||||
|
||||
if let Some(parent_id) = timeline.get_ancestor_timeline_id() {
|
||||
// we should not find new ones because we iterated tenants all timelines
|
||||
anyhow::ensure!(
|
||||
timeline_inputs.contains_key(&parent_id),
|
||||
"discovered new timeline {parent_id} (parent of {timeline_id})"
|
||||
);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// finally add in EndOfBranch for all timelines where their last_record_lsn is not a branch
|
||||
// point. this is needed by the model.
|
||||
for (timeline_id, inputs) in timeline_inputs.iter() {
|
||||
let lsn = inputs.last_record;
|
||||
|
||||
if referenced_branch_froms.contains_key(&(*timeline_id, lsn)) {
|
||||
// this means that the (timeline_id, last_record_lsn) represents a branch point
|
||||
// we do not want to add EndOfBranch updates for these points because it doesn't fit
|
||||
// into the current tenant_size_model.
|
||||
continue;
|
||||
}
|
||||
|
||||
if lsn > inputs.ancestor_lsn {
|
||||
// all timelines also have an end point if they have made any progress
|
||||
updates.push(Update {
|
||||
lsn,
|
||||
command: Command::EndOfBranch,
|
||||
timeline_id: *timeline_id,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let mut have_any_error = false;
|
||||
|
||||
while let Some(res) = joinset.join_next().await {
|
||||
@@ -379,6 +479,7 @@ pub(super) async fn gather_inputs(
|
||||
// handled by the variant order in `Command`.
|
||||
//
|
||||
updates.sort_unstable();
|
||||
|
||||
// And another sort to handle Command::BranchFrom ordering
|
||||
// in case when there are multiple branches at the same LSN.
|
||||
let sorted_updates = sort_updates_in_tree_order(updates)?;
|
||||
@@ -413,10 +514,10 @@ impl ModelInputs {
|
||||
let Lsn(now) = *lsn;
|
||||
match op {
|
||||
Command::Update(sz) => {
|
||||
storage.insert_point(&Some(*timeline_id), "".into(), now, Some(*sz));
|
||||
storage.insert_point(&Some(*timeline_id), "".into(), now, Some(*sz))?;
|
||||
}
|
||||
Command::EndOfBranch => {
|
||||
storage.insert_point(&Some(*timeline_id), "".into(), now, None);
|
||||
storage.insert_point(&Some(*timeline_id), "".into(), now, None)?;
|
||||
}
|
||||
Command::BranchFrom(parent) => {
|
||||
// This branch command may fail if it cannot find a parent to branch from.
|
||||
@@ -425,7 +526,7 @@ impl ModelInputs {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(storage.calculate(self.retention_period).total_children())
|
||||
Ok(storage.calculate(self.retention_period)?.total_children())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -574,7 +675,10 @@ fn updates_sort() {
|
||||
fn verify_size_for_multiple_branches() {
|
||||
// this is generated from integration test test_tenant_size_with_multiple_branches, but this way
|
||||
// it has the stable lsn's
|
||||
let doc = r#"{"updates":[{"lsn":"0/0","command":{"branch_from":null},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/176FA40","command":{"update":25763840},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/176FA40","command":{"branch_from":"cd9d9409c216e64bf580904facedb01b"},"timeline_id":"10b532a550540bc15385eac4edde416a"},{"lsn":"0/1819818","command":{"update":26075136},"timeline_id":"10b532a550540bc15385eac4edde416a"},{"lsn":"0/18B5E40","command":{"update":26427392},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/18D3DF0","command":{"update":26492928},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/18D3DF0","command":{"branch_from":"cd9d9409c216e64bf580904facedb01b"},"timeline_id":"230fc9d756f7363574c0d66533564dcc"},{"lsn":"0/220F438","command":{"update":25239552},"timeline_id":"230fc9d756f7363574c0d66533564dcc"}],"retention_period":131072,"timeline_inputs":{"cd9d9409c216e64bf580904facedb01b":{"last_record":"0/18D5E40","latest_gc_cutoff":"0/169ACF0","horizon_cutoff":"0/18B5E40","pitr_cutoff":"0/18B5E40","next_gc_cutoff":"0/18B5E40"},"10b532a550540bc15385eac4edde416a":{"last_record":"0/1839818","latest_gc_cutoff":"0/169ACF0","horizon_cutoff":"0/1819818","pitr_cutoff":"0/1819818","next_gc_cutoff":"0/1819818"},"230fc9d756f7363574c0d66533564dcc":{"last_record":"0/222F438","latest_gc_cutoff":"0/169ACF0","horizon_cutoff":"0/220F438","pitr_cutoff":"0/220F438","next_gc_cutoff":"0/220F438"}}}"#;
|
||||
//
|
||||
// timelineinputs have been left out, because those explain the inputs, but don't participate
|
||||
// in further size calculations.
|
||||
let doc = r#"{"updates":[{"lsn":"0/0","command":{"branch_from":null},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/176FA40","command":{"update":25763840},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/176FA40","command":{"branch_from":"cd9d9409c216e64bf580904facedb01b"},"timeline_id":"10b532a550540bc15385eac4edde416a"},{"lsn":"0/1819818","command":{"update":26075136},"timeline_id":"10b532a550540bc15385eac4edde416a"},{"lsn":"0/18B5E40","command":{"update":26427392},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/18D3DF0","command":{"update":26492928},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/18D3DF0","command":{"branch_from":"cd9d9409c216e64bf580904facedb01b"},"timeline_id":"230fc9d756f7363574c0d66533564dcc"},{"lsn":"0/220F438","command":{"update":25239552},"timeline_id":"230fc9d756f7363574c0d66533564dcc"}],"retention_period":131072}"#;
|
||||
|
||||
let inputs: ModelInputs = serde_json::from_str(doc).unwrap();
|
||||
|
||||
|
||||
@@ -52,6 +52,8 @@ async fn compaction_loop(tenant_id: TenantId) {
|
||||
info!("starting");
|
||||
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
|
||||
async {
|
||||
let mut first = true;
|
||||
|
||||
loop {
|
||||
trace!("waking up");
|
||||
|
||||
@@ -68,10 +70,14 @@ async fn compaction_loop(tenant_id: TenantId) {
|
||||
|
||||
let mut sleep_duration = tenant.get_compaction_period();
|
||||
if sleep_duration == Duration::ZERO {
|
||||
info!("automatic compaction is disabled");
|
||||
if first {
|
||||
info!("automatic compaction is disabled");
|
||||
}
|
||||
first = false;
|
||||
// check again in 10 seconds, in case it's been enabled again.
|
||||
sleep_duration = Duration::from_secs(10);
|
||||
} else {
|
||||
first = true;
|
||||
// Run compaction
|
||||
if let Err(e) = tenant.compaction_iteration().await {
|
||||
sleep_duration = wait_duration;
|
||||
@@ -103,6 +109,7 @@ async fn gc_loop(tenant_id: TenantId) {
|
||||
info!("starting");
|
||||
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
|
||||
async {
|
||||
let mut first = true;
|
||||
loop {
|
||||
trace!("waking up");
|
||||
|
||||
@@ -121,10 +128,14 @@ async fn gc_loop(tenant_id: TenantId) {
|
||||
let gc_horizon = tenant.get_gc_horizon();
|
||||
let mut sleep_duration = gc_period;
|
||||
if sleep_duration == Duration::ZERO {
|
||||
info!("automatic GC is disabled");
|
||||
if first {
|
||||
info!("automatic GC is disabled");
|
||||
}
|
||||
first = false;
|
||||
// check again in 10 seconds, in case it's been enabled again.
|
||||
sleep_duration = Duration::from_secs(10);
|
||||
} else {
|
||||
first = true;
|
||||
// Run gc
|
||||
if gc_horizon > 0 {
|
||||
if let Err(e) = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval()).await
|
||||
|
||||
@@ -15,7 +15,7 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
|
||||
use std::cmp::{max, min, Ordering};
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::fs;
|
||||
use std::ops::{Deref, Range};
|
||||
use std::path::{Path, PathBuf};
|
||||
@@ -75,6 +75,9 @@ enum FlushLoopState {
|
||||
Exited,
|
||||
}
|
||||
|
||||
pub static PENDING_NOWS: once_cell::sync::Lazy<Mutex<VecDeque<SystemTime>>> =
|
||||
once_cell::sync::Lazy::new(|| Default::default());
|
||||
|
||||
pub struct Timeline {
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: Arc<RwLock<TenantConfOpt>>,
|
||||
@@ -1370,9 +1373,13 @@ impl Timeline {
|
||||
let self_calculation = Arc::clone(self);
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let blocking_span = tracing::info_span!("blocking");
|
||||
|
||||
let calculation = async {
|
||||
let cancel = cancel.child_token();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
// spans cannot be automatically moved/hoisted to spawn_blocking, do that manually
|
||||
let _entered = blocking_span.entered();
|
||||
// Run in a separate thread since this can do a lot of
|
||||
// synchronous file IO without .await inbetween
|
||||
// if there are no RemoteLayers that would require downloading.
|
||||
@@ -2623,6 +2630,10 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn force_next_now(next: SystemTime) {
|
||||
PENDING_NOWS.lock().unwrap().push_back(next)
|
||||
}
|
||||
|
||||
/// Update information about which layer files need to be retained on
|
||||
/// garbage collection. This is separate from actually performing the GC,
|
||||
/// and is updated more frequently, so that compaction can remove obsolete
|
||||
@@ -2670,10 +2681,28 @@ impl Timeline {
|
||||
// work, so avoid calling it altogether if time-based retention is not
|
||||
// configured. It would be pointless anyway.
|
||||
let pitr_cutoff = if pitr != Duration::ZERO {
|
||||
let now = SystemTime::now();
|
||||
let now = PENDING_NOWS.lock().unwrap().pop_front();
|
||||
let now = if let Some(now) = now {
|
||||
let dt = chrono::DateTime::<chrono::Utc>::from(now);
|
||||
let dt = dt.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
|
||||
tracing::warn!(now = dt, "using forced now");
|
||||
now
|
||||
} else {
|
||||
SystemTime::now()
|
||||
};
|
||||
|
||||
if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
|
||||
let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);
|
||||
|
||||
{
|
||||
let dt = chrono::DateTime::<chrono::Utc>::from(now);
|
||||
let dt = dt.to_rfc3339_opts(chrono::SecondsFormat::Micros, true);
|
||||
info!(
|
||||
?pitr,
|
||||
pitr_cutoff_timestamp = dt,
|
||||
"searching lsn for timestamp"
|
||||
);
|
||||
}
|
||||
match self.find_lsn_for_timestamp(pitr_timestamp).await? {
|
||||
LsnForTimestamp::Present(lsn) => lsn,
|
||||
LsnForTimestamp::Future(lsn) => {
|
||||
|
||||
@@ -626,24 +626,20 @@ impl PostgresRedoProcess {
|
||||
|
||||
// Create empty data directory for wal-redo postgres, deleting old one first.
|
||||
if datadir.exists() {
|
||||
info!(
|
||||
"old temporary datadir {} exists, removing",
|
||||
datadir.display()
|
||||
);
|
||||
fs::remove_dir_all(&datadir)?;
|
||||
info!("old temporary datadir {datadir:?} exists, removing");
|
||||
fs::remove_dir_all(&datadir).map_err(|e| {
|
||||
Error::new(
|
||||
e.kind(),
|
||||
format!("Old temporary dir {datadir:?} removal failure: {e}"),
|
||||
)
|
||||
})?;
|
||||
}
|
||||
let pg_bin_dir_path = conf.pg_bin_dir(pg_version).map_err(|e| {
|
||||
Error::new(
|
||||
ErrorKind::Other,
|
||||
format!("incorrect pg_bin_dir path: {}", e),
|
||||
)
|
||||
})?;
|
||||
let pg_lib_dir_path = conf.pg_lib_dir(pg_version).map_err(|e| {
|
||||
Error::new(
|
||||
ErrorKind::Other,
|
||||
format!("incorrect pg_lib_dir path: {}", e),
|
||||
)
|
||||
})?;
|
||||
let pg_bin_dir_path = conf
|
||||
.pg_bin_dir(pg_version)
|
||||
.map_err(|e| Error::new(ErrorKind::Other, format!("incorrect pg_bin_dir path: {e}")))?;
|
||||
let pg_lib_dir_path = conf
|
||||
.pg_lib_dir(pg_version)
|
||||
.map_err(|e| Error::new(ErrorKind::Other, format!("incorrect pg_lib_dir path: {e}")))?;
|
||||
|
||||
info!("running initdb in {}", datadir.display());
|
||||
let initdb = Command::new(pg_bin_dir_path.join("initdb"))
|
||||
|
||||
@@ -1206,6 +1206,9 @@ class PageserverHttpClient(requests.Session):
|
||||
return res_json
|
||||
|
||||
def tenant_size(self, tenant_id: TenantId) -> int:
|
||||
return self.tenant_size_and_modelinputs(tenant_id)[0]
|
||||
|
||||
def tenant_size_and_modelinputs(self, tenant_id: TenantId) -> Tuple[int, Dict[str, Any]]:
|
||||
"""
|
||||
Returns the tenant size, together with the model inputs as the second tuple item.
|
||||
"""
|
||||
@@ -1216,9 +1219,9 @@ class PageserverHttpClient(requests.Session):
|
||||
assert TenantId(res["id"]) == tenant_id
|
||||
size = res["size"]
|
||||
assert type(size) == int
|
||||
# there are additional inputs, which are the collected raw information before being fed to the tenant_size_model
|
||||
# there are no tests for those right now.
|
||||
return size
|
||||
inputs = res["inputs"]
|
||||
assert type(inputs) is dict
|
||||
return (size, inputs)
|
||||
|
||||
def timeline_list(
|
||||
self,
|
||||
|
||||
@@ -194,7 +194,7 @@ def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
)
|
||||
except TimeoutExpired as exc:
|
||||
ctl_logs = (exc.stderr or b"").decode("utf-8")
|
||||
log.info("compute_ctl output:\n{ctl_logs}")
|
||||
log.info(f"compute_ctl stderr:\n{ctl_logs}")
|
||||
|
||||
with ExternalProcessManager(Path(pgdata) / "postmaster.pid"):
|
||||
start = "starting safekeepers syncing"
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from typing import List, Tuple
|
||||
from typing import Any, List, Tuple
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, wait_for_last_flush_lsn
|
||||
from fixtures.types import Lsn
|
||||
@@ -9,28 +10,247 @@ def test_empty_tenant_size(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
(tenant_id, _) = env.neon_cli.create_tenant()
|
||||
http_client = env.pageserver.http_client()
|
||||
size = http_client.tenant_size(tenant_id)
|
||||
initial_size = http_client.tenant_size(tenant_id)
|
||||
|
||||
# we should never have zero, because there should be the initdb however
|
||||
# this is questionable if we should have anything in this case, as the
|
||||
# gc_cutoff is negative
|
||||
assert (
|
||||
size == 0
|
||||
), "initial implementation returns zero tenant_size before last_record_lsn is past gc_horizon"
|
||||
# we should never have zero, because there should be the initdb "changes"
|
||||
assert initial_size > 0, "initial implementation returns ~initdb tenant_size"
|
||||
|
||||
with env.postgres.create_start("main", tenant_id=tenant_id) as pg:
|
||||
main_branch_name = "main"
|
||||
|
||||
with env.postgres.create_start(
|
||||
main_branch_name,
|
||||
tenant_id=tenant_id,
|
||||
config_lines=["autovacuum=off", "checkpoint_timeout=10min"],
|
||||
) as pg:
|
||||
with pg.cursor() as cur:
|
||||
cur.execute("SELECT 1")
|
||||
row = cur.fetchone()
|
||||
assert row is not None
|
||||
assert row[0] == 1
|
||||
size = http_client.tenant_size(tenant_id)
|
||||
assert size == 0, "starting idle compute should not change the tenant size"
|
||||
# we've disabled the autovacuum and checkpoint
|
||||
# so background processes should not change the size.
|
||||
# If this test will flake we should probably loosen the check
|
||||
assert size == initial_size, "starting idle compute should not change the tenant size"
|
||||
|
||||
# the size should be the same, until we increase the size over the
|
||||
# gc_horizon
|
||||
size = http_client.tenant_size(tenant_id)
|
||||
assert size == 0, "tenant_size should not be affected by shutdown of compute"
|
||||
size, inputs = http_client.tenant_size_and_modelinputs(tenant_id)
|
||||
assert size == initial_size, "tenant_size should not be affected by shutdown of compute"
|
||||
|
||||
expected_commands: List[Any] = [{"branch_from": None}, "end_of_branch"]
|
||||
actual_commands: List[Any] = list(map(lambda x: x["command"], inputs["updates"])) # type: ignore
|
||||
assert actual_commands == expected_commands
|
||||
|
||||
|
||||
def test_branched_empty_timeline_size(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Issue found in production. Because the ancestor branch was under
|
||||
gc_horizon, the branchpoint was "dangling" and the computation could not be
|
||||
done.
|
||||
|
||||
Assuming gc_horizon = 50
|
||||
root: I 0---10------>20
|
||||
branch: |-------------------I---------->150
|
||||
gc_horizon
|
||||
"""
|
||||
env = neon_simple_env
|
||||
(tenant_id, _) = env.neon_cli.create_tenant()
|
||||
http_client = env.pageserver.http_client()
|
||||
|
||||
initial_size = http_client.tenant_size(tenant_id)
|
||||
|
||||
first_branch_timeline_id = env.neon_cli.create_branch("first-branch", tenant_id=tenant_id)
|
||||
|
||||
with env.postgres.create_start("first-branch", tenant_id=tenant_id) as pg:
|
||||
with pg.cursor() as cur:
|
||||
cur.execute(
|
||||
"CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, 1000000) s(i)"
|
||||
)
|
||||
wait_for_last_flush_lsn(env, pg, tenant_id, first_branch_timeline_id)
|
||||
|
||||
size_after_branching = http_client.tenant_size(tenant_id)
|
||||
log.info(f"size_after_branching: {size_after_branching}")
|
||||
|
||||
assert size_after_branching > initial_size
|
||||
|
||||
|
||||
def test_branched_from_many_empty_parents_size(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
More general version of test_branched_empty_timeline_size
|
||||
|
||||
Assuming gc_horizon = 50
|
||||
|
||||
root: I 0------10
|
||||
first: I 10
|
||||
nth_0: I 10
|
||||
nth_1: I 10
|
||||
nth_n: 10------------I--------100
|
||||
"""
|
||||
env = neon_simple_env
|
||||
(tenant_id, _) = env.neon_cli.create_tenant()
|
||||
http_client = env.pageserver.http_client()
|
||||
|
||||
initial_size = http_client.tenant_size(tenant_id)
|
||||
|
||||
first_branch_name = "first"
|
||||
env.neon_cli.create_branch(first_branch_name, tenant_id=tenant_id)
|
||||
|
||||
size_after_branching = http_client.tenant_size(tenant_id)
|
||||
|
||||
# this might be flaky like test_get_tenant_size_with_multiple_branches
|
||||
# https://github.com/neondatabase/neon/issues/2962
|
||||
assert size_after_branching == initial_size
|
||||
|
||||
last_branch_name = first_branch_name
|
||||
last_branch = None
|
||||
|
||||
for i in range(0, 4):
|
||||
latest_branch_name = f"nth_{i}"
|
||||
last_branch = env.neon_cli.create_branch(
|
||||
latest_branch_name, ancestor_branch_name=last_branch_name, tenant_id=tenant_id
|
||||
)
|
||||
last_branch_name = latest_branch_name
|
||||
|
||||
size_after_branching = http_client.tenant_size(tenant_id)
|
||||
assert size_after_branching == initial_size
|
||||
|
||||
assert last_branch is not None
|
||||
|
||||
with env.postgres.create_start(last_branch_name, tenant_id=tenant_id) as pg:
|
||||
with pg.cursor() as cur:
|
||||
cur.execute(
|
||||
"CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, 1000000) s(i)"
|
||||
)
|
||||
wait_for_last_flush_lsn(env, pg, tenant_id, last_branch)
|
||||
|
||||
size_after_writes = http_client.tenant_size(tenant_id)
|
||||
assert size_after_writes > initial_size
|
||||
|
||||
|
||||
@pytest.mark.skip("This should work, but is left out because assumed covered by other tests")
|
||||
def test_branch_point_within_horizon(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
gc_horizon = 15
|
||||
|
||||
main: 0--I-10------>20
|
||||
branch: |-------------------I---------->150
|
||||
gc_horizon
|
||||
"""
|
||||
|
||||
env = neon_simple_env
|
||||
gc_horizon = 20_000
|
||||
(tenant_id, main_id) = env.neon_cli.create_tenant(conf={"gc_horizon": str(gc_horizon)})
|
||||
http_client = env.pageserver.http_client()
|
||||
|
||||
with env.postgres.create_start("main", tenant_id=tenant_id) as pg:
|
||||
initdb_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, main_id)
|
||||
with pg.cursor() as cur:
|
||||
cur.execute("CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, 1000) s(i)")
|
||||
flushed_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, main_id)
|
||||
|
||||
size_before_branching = http_client.tenant_size(tenant_id)
|
||||
|
||||
assert flushed_lsn.lsn_int - gc_horizon > initdb_lsn.lsn_int
|
||||
|
||||
branch_id = env.neon_cli.create_branch(
|
||||
"branch", tenant_id=tenant_id, ancestor_start_lsn=flushed_lsn
|
||||
)
|
||||
|
||||
with env.postgres.create_start("branch", tenant_id=tenant_id) as pg:
|
||||
with pg.cursor() as cur:
|
||||
cur.execute("CREATE TABLE t1 AS SELECT i::bigint n FROM generate_series(0, 1000) s(i)")
|
||||
wait_for_last_flush_lsn(env, pg, tenant_id, branch_id)
|
||||
|
||||
size_after = http_client.tenant_size(tenant_id)
|
||||
|
||||
assert size_before_branching < size_after
|
||||
|
||||
|
||||
@pytest.mark.skip("This should work, but is left out because assumed covered by other tests")
|
||||
def test_parent_within_horizon(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
gc_horizon = 5
|
||||
|
||||
main: 0----10----I->20
|
||||
branch: |-------------------I---------->150
|
||||
gc_horizon
|
||||
"""
|
||||
|
||||
env = neon_simple_env
|
||||
gc_horizon = 200_000
|
||||
(tenant_id, main_id) = env.neon_cli.create_tenant(conf={"gc_horizon": str(gc_horizon)})
|
||||
http_client = env.pageserver.http_client()
|
||||
|
||||
with env.postgres.create_start("main", tenant_id=tenant_id) as pg:
|
||||
initdb_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, main_id)
|
||||
with pg.cursor() as cur:
|
||||
cur.execute("CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, 1000) s(i)")
|
||||
|
||||
flushed_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, main_id)
|
||||
|
||||
with pg.cursor() as cur:
|
||||
cur.execute("CREATE TABLE t00 AS SELECT i::bigint n FROM generate_series(0, 2000) s(i)")
|
||||
|
||||
wait_for_last_flush_lsn(env, pg, tenant_id, main_id)
|
||||
|
||||
size_before_branching = http_client.tenant_size(tenant_id)
|
||||
|
||||
assert flushed_lsn.lsn_int - gc_horizon > initdb_lsn.lsn_int
|
||||
|
||||
branch_id = env.neon_cli.create_branch(
|
||||
"branch", tenant_id=tenant_id, ancestor_start_lsn=flushed_lsn
|
||||
)
|
||||
|
||||
with env.postgres.create_start("branch", tenant_id=tenant_id) as pg:
|
||||
with pg.cursor() as cur:
|
||||
cur.execute("CREATE TABLE t1 AS SELECT i::bigint n FROM generate_series(0, 10000) s(i)")
|
||||
wait_for_last_flush_lsn(env, pg, tenant_id, branch_id)
|
||||
|
||||
size_after = http_client.tenant_size(tenant_id)
|
||||
|
||||
assert size_before_branching < size_after
|
||||
|
||||
|
||||
@pytest.mark.skip("This should work, but is left out because assumed covered by other tests")
|
||||
def test_only_heads_within_horizon(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
gc_horizon = small
|
||||
|
||||
main: 0--------10-----I>20
|
||||
first: |-----------------------------I>150
|
||||
second: |---------I>30
|
||||
"""
|
||||
|
||||
env = neon_simple_env
|
||||
(tenant_id, main_id) = env.neon_cli.create_tenant(conf={"gc_horizon": "1024"})
|
||||
http_client = env.pageserver.http_client()
|
||||
|
||||
initial_size = http_client.tenant_size(tenant_id)
|
||||
|
||||
first_id = env.neon_cli.create_branch("first", tenant_id=tenant_id)
|
||||
second_id = env.neon_cli.create_branch("second", tenant_id=tenant_id)
|
||||
|
||||
ids = {"main": main_id, "first": first_id, "second": second_id}
|
||||
|
||||
latest_size = None
|
||||
|
||||
# gc is not expected to change the results
|
||||
|
||||
for branch_name, amount in [("main", 2000), ("first", 15000), ("second", 3000)]:
|
||||
with env.postgres.create_start(branch_name, tenant_id=tenant_id) as pg:
|
||||
with pg.cursor() as cur:
|
||||
cur.execute(
|
||||
f"CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, {amount}) s(i)"
|
||||
)
|
||||
wait_for_last_flush_lsn(env, pg, tenant_id, ids[branch_name])
|
||||
size_now = http_client.tenant_size(tenant_id)
|
||||
if latest_size is not None:
|
||||
assert size_now > latest_size
|
||||
else:
|
||||
assert size_now > initial_size
|
||||
|
||||
latest_size = size_now
|
||||
|
||||
|
||||
def test_single_branch_get_tenant_size_grows(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
@@ -26,7 +26,7 @@ futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
|
||||
indexmap = { version = "1", default-features = false, features = ["std"] }
|
||||
itertools = { version = "0.10" }
|
||||
libc = { version = "0.2", features = ["extra_traits"] }
|
||||
log = { version = "0.4", default-features = false, features = ["serde", "std"] }
|
||||
log = { version = "0.4", default-features = false, features = ["std"] }
|
||||
memchr = { version = "2" }
|
||||
nom = { version = "7" }
|
||||
num-bigint = { version = "0.4" }
|
||||
@@ -45,6 +45,7 @@ tokio-util = { version = "0.7", features = ["codec", "io"] }
|
||||
tower = { version = "0.4", features = ["balance", "buffer", "limit", "retry", "timeout", "util"] }
|
||||
tracing = { version = "0.1", features = ["log"] }
|
||||
tracing-core = { version = "0.1" }
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
|
||||
url = { version = "2", features = ["serde"] }
|
||||
|
||||
[build-dependencies]
|
||||
@@ -54,7 +55,7 @@ either = { version = "1" }
|
||||
indexmap = { version = "1", default-features = false, features = ["std"] }
|
||||
itertools = { version = "0.10" }
|
||||
libc = { version = "0.2", features = ["extra_traits"] }
|
||||
log = { version = "0.4", default-features = false, features = ["serde", "std"] }
|
||||
log = { version = "0.4", default-features = false, features = ["std"] }
|
||||
memchr = { version = "2" }
|
||||
nom = { version = "7" }
|
||||
prost = { version = "0.11" }
|
||||
|
||||
Reference in New Issue
Block a user