mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-26 01:20:38 +00:00
Compare commits
5 Commits
release-83
...
problame/a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1155945a3e | ||
|
|
42922cebe0 | ||
|
|
69e0c65393 | ||
|
|
2f0677be26 | ||
|
|
062c7b9a76 |
@@ -29,12 +29,13 @@
|
||||
//! ```sh
|
||||
//! compute_ctl -D /var/db/postgres/compute \
|
||||
//! -C 'postgresql://cloud_admin@localhost/postgres' \
|
||||
//! -c /var/db/postgres/configs/config.json \
|
||||
//! -S /var/db/postgres/specs/current.json \
|
||||
//! -b /usr/local/bin/postgres \
|
||||
//! -r http://pg-ext-s3-gateway \
|
||||
//! ```
|
||||
use std::ffi::OsString;
|
||||
use std::fs::File;
|
||||
use std::path::Path;
|
||||
use std::process::exit;
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
@@ -42,7 +43,8 @@ use std::time::Duration;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use clap::Parser;
|
||||
use compute_api::responses::ComputeConfig;
|
||||
use compute_api::responses::ComputeCtlConfig;
|
||||
use compute_api::spec::ComputeSpec;
|
||||
use compute_tools::compute::{
|
||||
BUILD_TAG, ComputeNode, ComputeNodeParams, forward_termination_signal,
|
||||
};
|
||||
@@ -116,10 +118,8 @@ struct Cli {
|
||||
#[arg(long)]
|
||||
pub set_disk_quota_for_fs: Option<String>,
|
||||
|
||||
// TODO(tristan957): remove alias after compatibility tests are no longer
|
||||
// an issue
|
||||
#[arg(short = 'c', long, alias = "spec-path")]
|
||||
pub config: Option<OsString>,
|
||||
#[arg(short = 'S', long, group = "spec-path")]
|
||||
pub spec_path: Option<OsString>,
|
||||
|
||||
#[arg(short = 'i', long, group = "compute-id")]
|
||||
pub compute_id: String,
|
||||
@@ -127,9 +127,8 @@ struct Cli {
|
||||
#[arg(
|
||||
short = 'p',
|
||||
long,
|
||||
conflicts_with = "config",
|
||||
value_name = "CONTROL_PLANE_API_BASE_URL",
|
||||
requires = "compute-id"
|
||||
conflicts_with = "spec-path",
|
||||
value_name = "CONTROL_PLANE_API_BASE_URL"
|
||||
)]
|
||||
pub control_plane_uri: Option<String>,
|
||||
}
|
||||
@@ -155,7 +154,7 @@ fn main() -> Result<()> {
|
||||
|
||||
let connstr = Url::parse(&cli.connstr).context("cannot parse connstr as a URL")?;
|
||||
|
||||
let cli_spec = get_config(&cli)?;
|
||||
let cli_spec = try_spec_from_cli(&cli)?;
|
||||
|
||||
let compute_node = ComputeNode::new(
|
||||
ComputeNodeParams {
|
||||
@@ -202,17 +201,27 @@ async fn init() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_config(cli: &Cli) -> Result<ComputeConfig> {
|
||||
// First, read the config from the path if provided
|
||||
if let Some(ref config) = cli.config {
|
||||
let file = File::open(config)?;
|
||||
return Ok(serde_json::from_reader(&file)?);
|
||||
fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
|
||||
// First, read spec from the path if provided
|
||||
if let Some(ref spec_path) = cli.spec_path {
|
||||
let file = File::open(Path::new(spec_path))?;
|
||||
return Ok(CliSpecParams {
|
||||
spec: Some(serde_json::from_reader(file)?),
|
||||
compute_ctl_config: ComputeCtlConfig::default(),
|
||||
});
|
||||
}
|
||||
|
||||
// If the config wasn't provided in the CLI arguments, then retrieve it from
|
||||
if cli.control_plane_uri.is_none() {
|
||||
panic!("must specify --control-plane-uri");
|
||||
};
|
||||
|
||||
// If the spec wasn't provided in the CLI arguments, then retrieve it from
|
||||
// the control plane
|
||||
match get_config_from_control_plane(cli.control_plane_uri.as_ref().unwrap(), &cli.compute_id) {
|
||||
Ok(config) => Ok(config),
|
||||
match get_spec_from_control_plane(cli.control_plane_uri.as_ref().unwrap(), &cli.compute_id) {
|
||||
Ok(resp) => Ok(CliSpecParams {
|
||||
spec: resp.0,
|
||||
compute_ctl_config: resp.1,
|
||||
}),
|
||||
Err(e) => {
|
||||
error!(
|
||||
"cannot get response from control plane: {}\n\
|
||||
@@ -224,6 +233,13 @@ fn get_config(cli: &Cli) -> Result<ComputeConfig> {
|
||||
}
|
||||
}
|
||||
|
||||
struct CliSpecParams {
|
||||
/// If a spec was provided via CLI or file, the [`ComputeSpec`]
|
||||
spec: Option<ComputeSpec>,
|
||||
#[allow(dead_code)]
|
||||
compute_ctl_config: ComputeCtlConfig,
|
||||
}
|
||||
|
||||
fn deinit_and_exit(exit_code: Option<i32>) -> ! {
|
||||
// Shutdown trace pipeline gracefully, so that it has a chance to send any
|
||||
// pending traces before we exit. Shutting down OTEL tracing provider may
|
||||
|
||||
@@ -19,13 +19,13 @@ pub(crate) static INSTALLED_EXTENSIONS: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
// but for all our APIs we defined a 'slug'/method/operationId in the OpenAPI spec.
|
||||
// And it's fair to call it a 'RPC' (Remote Procedure Call).
|
||||
pub enum CPlaneRequestRPC {
|
||||
GetConfig,
|
||||
GetSpec,
|
||||
}
|
||||
|
||||
impl CPlaneRequestRPC {
|
||||
pub fn as_str(&self) -> &str {
|
||||
match self {
|
||||
CPlaneRequestRPC::GetConfig => "GetConfig",
|
||||
CPlaneRequestRPC::GetSpec => "GetSpec",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,8 +3,9 @@ use std::path::Path;
|
||||
|
||||
use anyhow::{Result, anyhow, bail};
|
||||
use compute_api::responses::{
|
||||
ComputeConfig, ControlPlaneComputeStatus, ControlPlaneConfigResponse,
|
||||
ComputeCtlConfig, ControlPlaneComputeStatus, ControlPlaneSpecResponse,
|
||||
};
|
||||
use compute_api::spec::ComputeSpec;
|
||||
use reqwest::StatusCode;
|
||||
use tokio_postgres::Client;
|
||||
use tracing::{error, info, instrument};
|
||||
@@ -20,7 +21,7 @@ use crate::params::PG_HBA_ALL_MD5;
|
||||
fn do_control_plane_request(
|
||||
uri: &str,
|
||||
jwt: &str,
|
||||
) -> Result<ControlPlaneConfigResponse, (bool, String, String)> {
|
||||
) -> Result<ControlPlaneSpecResponse, (bool, String, String)> {
|
||||
let resp = reqwest::blocking::Client::new()
|
||||
.get(uri)
|
||||
.header("Authorization", format!("Bearer {}", jwt))
|
||||
@@ -28,14 +29,14 @@ fn do_control_plane_request(
|
||||
.map_err(|e| {
|
||||
(
|
||||
true,
|
||||
format!("could not perform request to control plane: {:?}", e),
|
||||
format!("could not perform spec request to control plane: {:?}", e),
|
||||
UNKNOWN_HTTP_STATUS.to_string(),
|
||||
)
|
||||
})?;
|
||||
|
||||
let status = resp.status();
|
||||
match status {
|
||||
StatusCode::OK => match resp.json::<ControlPlaneConfigResponse>() {
|
||||
StatusCode::OK => match resp.json::<ControlPlaneSpecResponse>() {
|
||||
Ok(spec_resp) => Ok(spec_resp),
|
||||
Err(e) => Err((
|
||||
true,
|
||||
@@ -68,35 +69,40 @@ fn do_control_plane_request(
|
||||
}
|
||||
}
|
||||
|
||||
/// Request config from the control-plane by compute_id. If
|
||||
/// `NEON_CONTROL_PLANE_TOKEN` env variable is set, it will be used for
|
||||
/// authorization.
|
||||
pub fn get_config_from_control_plane(base_uri: &str, compute_id: &str) -> Result<ComputeConfig> {
|
||||
/// Request spec from the control-plane by compute_id. If `NEON_CONTROL_PLANE_TOKEN`
|
||||
/// env variable is set, it will be used for authorization.
|
||||
pub fn get_spec_from_control_plane(
|
||||
base_uri: &str,
|
||||
compute_id: &str,
|
||||
) -> Result<(Option<ComputeSpec>, ComputeCtlConfig)> {
|
||||
let cp_uri = format!("{base_uri}/compute/api/v2/computes/{compute_id}/spec");
|
||||
let jwt: String = std::env::var("NEON_CONTROL_PLANE_TOKEN").unwrap_or_default();
|
||||
let jwt: String = match std::env::var("NEON_CONTROL_PLANE_TOKEN") {
|
||||
Ok(v) => v,
|
||||
Err(_) => "".to_string(),
|
||||
};
|
||||
let mut attempt = 1;
|
||||
|
||||
info!("getting config from control plane: {}", cp_uri);
|
||||
info!("getting spec from control plane: {}", cp_uri);
|
||||
|
||||
// Do 3 attempts to get spec from the control plane using the following logic:
|
||||
// - network error -> then retry
|
||||
// - compute id is unknown or any other error -> bail out
|
||||
// - no spec for compute yet (Empty state) -> return Ok(None)
|
||||
// - got config -> return Ok(Some(config))
|
||||
// - got spec -> return Ok(Some(spec))
|
||||
while attempt < 4 {
|
||||
let result = match do_control_plane_request(&cp_uri, &jwt) {
|
||||
Ok(config_resp) => {
|
||||
Ok(spec_resp) => {
|
||||
CPLANE_REQUESTS_TOTAL
|
||||
.with_label_values(&[
|
||||
CPlaneRequestRPC::GetConfig.as_str(),
|
||||
CPlaneRequestRPC::GetSpec.as_str(),
|
||||
&StatusCode::OK.to_string(),
|
||||
])
|
||||
.inc();
|
||||
match config_resp.status {
|
||||
ControlPlaneComputeStatus::Empty => Ok(config_resp.into()),
|
||||
match spec_resp.status {
|
||||
ControlPlaneComputeStatus::Empty => Ok((None, spec_resp.compute_ctl_config)),
|
||||
ControlPlaneComputeStatus::Attached => {
|
||||
if config_resp.spec.is_some() {
|
||||
Ok(config_resp.into())
|
||||
if let Some(spec) = spec_resp.spec {
|
||||
Ok((Some(spec), spec_resp.compute_ctl_config))
|
||||
} else {
|
||||
bail!("compute is attached, but spec is empty")
|
||||
}
|
||||
@@ -105,7 +111,7 @@ pub fn get_config_from_control_plane(base_uri: &str, compute_id: &str) -> Result
|
||||
}
|
||||
Err((retry, msg, status)) => {
|
||||
CPLANE_REQUESTS_TOTAL
|
||||
.with_label_values(&[CPlaneRequestRPC::GetConfig.as_str(), &status])
|
||||
.with_label_values(&[CPlaneRequestRPC::GetSpec.as_str(), &status])
|
||||
.inc();
|
||||
if retry {
|
||||
Err(anyhow!(msg))
|
||||
@@ -116,7 +122,7 @@ pub fn get_config_from_control_plane(base_uri: &str, compute_id: &str) -> Result
|
||||
};
|
||||
|
||||
if let Err(e) = &result {
|
||||
error!("attempt {} to get config failed with: {}", attempt, e);
|
||||
error!("attempt {} to get spec failed with: {}", attempt, e);
|
||||
} else {
|
||||
return result;
|
||||
}
|
||||
@@ -127,13 +133,13 @@ pub fn get_config_from_control_plane(base_uri: &str, compute_id: &str) -> Result
|
||||
|
||||
// All attempts failed, return error.
|
||||
Err(anyhow::anyhow!(
|
||||
"Exhausted all attempts to retrieve the config from the control plane"
|
||||
"Exhausted all attempts to retrieve the spec from the control plane"
|
||||
))
|
||||
}
|
||||
|
||||
/// Check `pg_hba.conf` and update if needed to allow external connections.
|
||||
pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
|
||||
// XXX: consider making it a part of config.json
|
||||
// XXX: consider making it a part of spec.json
|
||||
let pghba_path = pgdata_path.join("pg_hba.conf");
|
||||
|
||||
if config::line_in_file(&pghba_path, PG_HBA_ALL_MD5)? {
|
||||
@@ -147,7 +153,7 @@ pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
|
||||
|
||||
/// Create a standby.signal file
|
||||
pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
|
||||
// XXX: consider making it a part of config.json
|
||||
// XXX: consider making it a part of spec.json
|
||||
let signalfile = pgdata_path.join("standby.signal");
|
||||
|
||||
if !signalfile.exists() {
|
||||
|
||||
@@ -29,7 +29,7 @@
|
||||
//! compute.log - log output of `compute_ctl` and `postgres`
|
||||
//! endpoint.json - serialized `EndpointConf` struct
|
||||
//! postgresql.conf - postgresql settings
|
||||
//! config.json - passed to `compute_ctl`
|
||||
//! spec.json - passed to `compute_ctl`
|
||||
//! pgdata/
|
||||
//! postgresql.conf - copy of postgresql.conf created by `compute_ctl`
|
||||
//! zenith.signal
|
||||
@@ -46,9 +46,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use anyhow::{Context, Result, anyhow, bail};
|
||||
use compute_api::requests::ConfigurationRequest;
|
||||
use compute_api::responses::{
|
||||
ComputeConfig, ComputeCtlConfig, ComputeStatus, ComputeStatusResponse,
|
||||
};
|
||||
use compute_api::responses::{ComputeCtlConfig, ComputeStatus, ComputeStatusResponse};
|
||||
use compute_api::spec::{
|
||||
Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PgIdent,
|
||||
RemoteExtSpec, Role,
|
||||
@@ -621,101 +619,90 @@ impl Endpoint {
|
||||
remote_extensions = None;
|
||||
};
|
||||
|
||||
// Create config file
|
||||
let config = {
|
||||
let mut spec = ComputeSpec {
|
||||
skip_pg_catalog_updates: self.skip_pg_catalog_updates,
|
||||
format_version: 1.0,
|
||||
operation_uuid: None,
|
||||
features: self.features.clone(),
|
||||
swap_size_bytes: None,
|
||||
disk_quota_bytes: None,
|
||||
disable_lfc_resizing: None,
|
||||
cluster: Cluster {
|
||||
cluster_id: None, // project ID: not used
|
||||
name: None, // project name: not used
|
||||
state: None,
|
||||
roles: if create_test_user {
|
||||
vec![Role {
|
||||
name: PgIdent::from_str("test").unwrap(),
|
||||
encrypted_password: None,
|
||||
options: None,
|
||||
}]
|
||||
} else {
|
||||
Vec::new()
|
||||
},
|
||||
databases: if create_test_user {
|
||||
vec![Database {
|
||||
name: PgIdent::from_str("neondb").unwrap(),
|
||||
owner: PgIdent::from_str("test").unwrap(),
|
||||
options: None,
|
||||
restrict_conn: false,
|
||||
invalid: false,
|
||||
}]
|
||||
} else {
|
||||
Vec::new()
|
||||
},
|
||||
settings: None,
|
||||
postgresql_conf: Some(postgresql_conf.clone()),
|
||||
},
|
||||
delta_operations: None,
|
||||
tenant_id: Some(self.tenant_id),
|
||||
timeline_id: Some(self.timeline_id),
|
||||
project_id: None,
|
||||
branch_id: None,
|
||||
endpoint_id: Some(self.endpoint_id.clone()),
|
||||
mode: self.mode,
|
||||
pageserver_connstring: Some(pageserver_connstring),
|
||||
safekeepers_generation: safekeepers_generation.map(|g| g.into_inner()),
|
||||
safekeeper_connstrings,
|
||||
storage_auth_token: auth_token.clone(),
|
||||
remote_extensions,
|
||||
pgbouncer_settings: None,
|
||||
shard_stripe_size: Some(shard_stripe_size),
|
||||
local_proxy_config: None,
|
||||
reconfigure_concurrency: self.reconfigure_concurrency,
|
||||
drop_subscriptions_before_start: self.drop_subscriptions_before_start,
|
||||
audit_log_level: ComputeAudit::Disabled,
|
||||
logs_export_host: None::<String>,
|
||||
};
|
||||
|
||||
// this strange code is needed to support respec() in tests
|
||||
if self.cluster.is_some() {
|
||||
debug!("Cluster is already set in the endpoint spec, using it");
|
||||
spec.cluster = self.cluster.clone().unwrap();
|
||||
|
||||
debug!("spec.cluster {:?}", spec.cluster);
|
||||
|
||||
// fill missing fields again
|
||||
if create_test_user {
|
||||
spec.cluster.roles.push(Role {
|
||||
// Create spec file
|
||||
let mut spec = ComputeSpec {
|
||||
skip_pg_catalog_updates: self.skip_pg_catalog_updates,
|
||||
format_version: 1.0,
|
||||
operation_uuid: None,
|
||||
features: self.features.clone(),
|
||||
swap_size_bytes: None,
|
||||
disk_quota_bytes: None,
|
||||
disable_lfc_resizing: None,
|
||||
cluster: Cluster {
|
||||
cluster_id: None, // project ID: not used
|
||||
name: None, // project name: not used
|
||||
state: None,
|
||||
roles: if create_test_user {
|
||||
vec![Role {
|
||||
name: PgIdent::from_str("test").unwrap(),
|
||||
encrypted_password: None,
|
||||
options: None,
|
||||
});
|
||||
spec.cluster.databases.push(Database {
|
||||
}]
|
||||
} else {
|
||||
Vec::new()
|
||||
},
|
||||
databases: if create_test_user {
|
||||
vec![Database {
|
||||
name: PgIdent::from_str("neondb").unwrap(),
|
||||
owner: PgIdent::from_str("test").unwrap(),
|
||||
options: None,
|
||||
restrict_conn: false,
|
||||
invalid: false,
|
||||
});
|
||||
}
|
||||
spec.cluster.postgresql_conf = Some(postgresql_conf);
|
||||
}
|
||||
|
||||
ComputeConfig {
|
||||
spec: Some(spec),
|
||||
compute_ctl_config: ComputeCtlConfig::default(),
|
||||
}
|
||||
}]
|
||||
} else {
|
||||
Vec::new()
|
||||
},
|
||||
settings: None,
|
||||
postgresql_conf: Some(postgresql_conf.clone()),
|
||||
},
|
||||
delta_operations: None,
|
||||
tenant_id: Some(self.tenant_id),
|
||||
timeline_id: Some(self.timeline_id),
|
||||
project_id: None,
|
||||
branch_id: None,
|
||||
endpoint_id: Some(self.endpoint_id.clone()),
|
||||
mode: self.mode,
|
||||
pageserver_connstring: Some(pageserver_connstring),
|
||||
safekeepers_generation: safekeepers_generation.map(|g| g.into_inner()),
|
||||
safekeeper_connstrings,
|
||||
storage_auth_token: auth_token.clone(),
|
||||
remote_extensions,
|
||||
pgbouncer_settings: None,
|
||||
shard_stripe_size: Some(shard_stripe_size),
|
||||
local_proxy_config: None,
|
||||
reconfigure_concurrency: self.reconfigure_concurrency,
|
||||
drop_subscriptions_before_start: self.drop_subscriptions_before_start,
|
||||
audit_log_level: ComputeAudit::Disabled,
|
||||
logs_export_host: None::<String>,
|
||||
};
|
||||
|
||||
// TODO(tristan957): Remove the write to spec.json after compatibility
|
||||
// tests work themselves out
|
||||
// this strange code is needed to support respec() in tests
|
||||
if self.cluster.is_some() {
|
||||
debug!("Cluster is already set in the endpoint spec, using it");
|
||||
spec.cluster = self.cluster.clone().unwrap();
|
||||
|
||||
debug!("spec.cluster {:?}", spec.cluster);
|
||||
|
||||
// fill missing fields again
|
||||
if create_test_user {
|
||||
spec.cluster.roles.push(Role {
|
||||
name: PgIdent::from_str("test").unwrap(),
|
||||
encrypted_password: None,
|
||||
options: None,
|
||||
});
|
||||
spec.cluster.databases.push(Database {
|
||||
name: PgIdent::from_str("neondb").unwrap(),
|
||||
owner: PgIdent::from_str("test").unwrap(),
|
||||
options: None,
|
||||
restrict_conn: false,
|
||||
invalid: false,
|
||||
});
|
||||
}
|
||||
spec.cluster.postgresql_conf = Some(postgresql_conf);
|
||||
}
|
||||
|
||||
let spec_path = self.endpoint_path().join("spec.json");
|
||||
std::fs::write(spec_path, serde_json::to_string_pretty(&config.spec)?)?;
|
||||
let config_path = self.endpoint_path().join("config.json");
|
||||
std::fs::write(config_path, serde_json::to_string_pretty(&config)?)?;
|
||||
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
|
||||
|
||||
// Open log file. We'll redirect the stdout and stderr of `compute_ctl` to it.
|
||||
let logfile = std::fs::OpenOptions::new()
|
||||
@@ -723,16 +710,6 @@ impl Endpoint {
|
||||
.append(true)
|
||||
.open(self.endpoint_path().join("compute.log"))?;
|
||||
|
||||
// TODO(tristan957): Remove when compatibility tests are no longer an
|
||||
// issue
|
||||
let old_compute_ctl = {
|
||||
let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
|
||||
let help_output = cmd.arg("--help").output()?;
|
||||
let help_output = String::from_utf8_lossy(&help_output.stdout);
|
||||
|
||||
!help_output.contains("--config")
|
||||
};
|
||||
|
||||
// Launch compute_ctl
|
||||
let conn_str = self.connstr("cloud_admin", "postgres");
|
||||
println!("Starting postgres node at '{}'", conn_str);
|
||||
@@ -751,18 +728,9 @@ impl Endpoint {
|
||||
])
|
||||
.args(["--pgdata", self.pgdata().to_str().unwrap()])
|
||||
.args(["--connstr", &conn_str])
|
||||
// TODO(tristan957): Change this to --config when compatibility tests
|
||||
// are no longer an issue
|
||||
.args([
|
||||
"--spec-path",
|
||||
self.endpoint_path()
|
||||
.join(if old_compute_ctl {
|
||||
"spec.json"
|
||||
} else {
|
||||
"config.json"
|
||||
})
|
||||
.to_str()
|
||||
.unwrap(),
|
||||
self.endpoint_path().join("spec.json").to_str().unwrap(),
|
||||
])
|
||||
.args([
|
||||
"--pgbin",
|
||||
@@ -905,12 +873,10 @@ impl Endpoint {
|
||||
stripe_size: Option<ShardStripeSize>,
|
||||
safekeepers: Option<Vec<NodeId>>,
|
||||
) -> Result<()> {
|
||||
let (mut spec, compute_ctl_config) = {
|
||||
let config_path = self.endpoint_path().join("config.json");
|
||||
let file = std::fs::File::open(config_path)?;
|
||||
let config: ComputeConfig = serde_json::from_reader(file)?;
|
||||
|
||||
(config.spec.unwrap(), config.compute_ctl_config)
|
||||
let mut spec: ComputeSpec = {
|
||||
let spec_path = self.endpoint_path().join("spec.json");
|
||||
let file = std::fs::File::open(spec_path)?;
|
||||
serde_json::from_reader(file)?
|
||||
};
|
||||
|
||||
let postgresql_conf = self.read_postgresql_conf()?;
|
||||
@@ -960,7 +926,7 @@ impl Endpoint {
|
||||
.body(
|
||||
serde_json::to_string(&ConfigurationRequest {
|
||||
spec,
|
||||
compute_ctl_config,
|
||||
compute_ctl_config: ComputeCtlConfig::default(),
|
||||
})
|
||||
.unwrap(),
|
||||
)
|
||||
|
||||
@@ -535,11 +535,6 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'gc_compaction_enabled' as bool")?,
|
||||
gc_compaction_verification: settings
|
||||
.remove("gc_compaction_verification")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'gc_compaction_verification' as bool")?,
|
||||
gc_compaction_initial_threshold_kb: settings
|
||||
.remove("gc_compaction_initial_threshold_kb")
|
||||
.map(|x| x.parse::<u64>())
|
||||
|
||||
@@ -11,8 +11,8 @@ generate_id() {
|
||||
|
||||
PG_VERSION=${PG_VERSION:-14}
|
||||
|
||||
CONFIG_FILE_ORG=/var/db/postgres/configs/config.json
|
||||
CONFIG_FILE=/tmp/config.json
|
||||
SPEC_FILE_ORG=/var/db/postgres/specs/spec.json
|
||||
SPEC_FILE=/tmp/spec.json
|
||||
|
||||
echo "Waiting pageserver become ready."
|
||||
while ! nc -z pageserver 6400; do
|
||||
@@ -20,7 +20,7 @@ while ! nc -z pageserver 6400; do
|
||||
done
|
||||
echo "Page server is ready."
|
||||
|
||||
cp ${CONFIG_FILE_ORG} ${CONFIG_FILE}
|
||||
cp ${SPEC_FILE_ORG} ${SPEC_FILE}
|
||||
|
||||
if [ -n "${TENANT_ID:-}" ] && [ -n "${TIMELINE_ID:-}" ]; then
|
||||
tenant_id=${TENANT_ID}
|
||||
@@ -73,27 +73,17 @@ else
|
||||
ulid_extension=ulid
|
||||
fi
|
||||
echo "Adding pgx_ulid"
|
||||
shared_libraries=$(jq -r '.spec.cluster.settings[] | select(.name=="shared_preload_libraries").value' ${CONFIG_FILE})
|
||||
sed -i "s/${shared_libraries}/${shared_libraries},${ulid_extension}/" ${CONFIG_FILE}
|
||||
shared_libraries=$(jq -r '.cluster.settings[] | select(.name=="shared_preload_libraries").value' ${SPEC_FILE})
|
||||
sed -i "s/${shared_libraries}/${shared_libraries},${ulid_extension}/" ${SPEC_FILE}
|
||||
echo "Overwrite tenant id and timeline id in spec file"
|
||||
sed -i "s/TENANT_ID/${tenant_id}/" ${CONFIG_FILE}
|
||||
sed -i "s/TIMELINE_ID/${timeline_id}/" ${CONFIG_FILE}
|
||||
sed -i "s/TENANT_ID/${tenant_id}/" ${SPEC_FILE}
|
||||
sed -i "s/TIMELINE_ID/${timeline_id}/" ${SPEC_FILE}
|
||||
|
||||
cat ${CONFIG_FILE}
|
||||
|
||||
# TODO(tristan957): Remove these workarounds for backwards compatibility after
|
||||
# the next compute release. That includes these next few lines and the
|
||||
# --spec-path in the compute_ctl invocation.
|
||||
if compute_ctl --help | grep --quiet -- '--config'; then
|
||||
SPEC_PATH="$CONFIG_FILE"
|
||||
else
|
||||
jq '.spec' < "$CONFIG_FILE" > /tmp/spec.json
|
||||
SPEC_PATH=/tmp/spec.json
|
||||
fi
|
||||
cat ${SPEC_FILE}
|
||||
|
||||
echo "Start compute node"
|
||||
/usr/local/bin/compute_ctl --pgdata /var/db/postgres/compute \
|
||||
-C "postgresql://cloud_admin@localhost:55433/postgres" \
|
||||
-b /usr/local/bin/postgres \
|
||||
--compute-id "compute-$RANDOM" \
|
||||
--spec-path "$SPEC_PATH"
|
||||
-S ${SPEC_FILE}
|
||||
|
||||
@@ -1,148 +0,0 @@
|
||||
{
|
||||
"spec": {
|
||||
"format_version": 1.0,
|
||||
|
||||
"timestamp": "2022-10-12T18:00:00.000Z",
|
||||
"operation_uuid": "0f657b36-4b0f-4a2d-9c2e-1dcd615e7d8c",
|
||||
|
||||
"cluster": {
|
||||
"cluster_id": "docker_compose",
|
||||
"name": "docker_compose_test",
|
||||
"state": "restarted",
|
||||
"roles": [
|
||||
{
|
||||
"name": "cloud_admin",
|
||||
"encrypted_password": "b093c0d3b281ba6da1eacc608620abd8",
|
||||
"options": null
|
||||
}
|
||||
],
|
||||
"databases": [
|
||||
],
|
||||
"settings": [
|
||||
{
|
||||
"name": "fsync",
|
||||
"value": "off",
|
||||
"vartype": "bool"
|
||||
},
|
||||
{
|
||||
"name": "wal_level",
|
||||
"value": "logical",
|
||||
"vartype": "enum"
|
||||
},
|
||||
{
|
||||
"name": "wal_log_hints",
|
||||
"value": "on",
|
||||
"vartype": "bool"
|
||||
},
|
||||
{
|
||||
"name": "log_connections",
|
||||
"value": "on",
|
||||
"vartype": "bool"
|
||||
},
|
||||
{
|
||||
"name": "port",
|
||||
"value": "55433",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "shared_buffers",
|
||||
"value": "1MB",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "max_connections",
|
||||
"value": "100",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "listen_addresses",
|
||||
"value": "0.0.0.0",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "max_wal_senders",
|
||||
"value": "10",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "max_replication_slots",
|
||||
"value": "10",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "wal_sender_timeout",
|
||||
"value": "5s",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "wal_keep_size",
|
||||
"value": "0",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "password_encryption",
|
||||
"value": "md5",
|
||||
"vartype": "enum"
|
||||
},
|
||||
{
|
||||
"name": "restart_after_crash",
|
||||
"value": "off",
|
||||
"vartype": "bool"
|
||||
},
|
||||
{
|
||||
"name": "synchronous_standby_names",
|
||||
"value": "walproposer",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "shared_preload_libraries",
|
||||
"value": "neon,pg_cron,timescaledb,pg_stat_statements",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "neon.safekeepers",
|
||||
"value": "safekeeper1:5454,safekeeper2:5454,safekeeper3:5454",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "neon.timeline_id",
|
||||
"value": "TIMELINE_ID",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "neon.tenant_id",
|
||||
"value": "TENANT_ID",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "neon.pageserver_connstring",
|
||||
"value": "host=pageserver port=6400",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "max_replication_write_lag",
|
||||
"value": "500MB",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "max_replication_flush_lag",
|
||||
"value": "10GB",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "cron.database",
|
||||
"value": "postgres",
|
||||
"vartype": "string"
|
||||
}
|
||||
]
|
||||
},
|
||||
|
||||
"delta_operations": [
|
||||
]
|
||||
},
|
||||
"compute_ctl_config": {
|
||||
"jwks": {
|
||||
"keys": []
|
||||
}
|
||||
}
|
||||
}
|
||||
141
docker-compose/compute_wrapper/var/db/postgres/specs/spec.json
Normal file
141
docker-compose/compute_wrapper/var/db/postgres/specs/spec.json
Normal file
@@ -0,0 +1,141 @@
|
||||
{
|
||||
"format_version": 1.0,
|
||||
|
||||
"timestamp": "2022-10-12T18:00:00.000Z",
|
||||
"operation_uuid": "0f657b36-4b0f-4a2d-9c2e-1dcd615e7d8c",
|
||||
|
||||
"cluster": {
|
||||
"cluster_id": "docker_compose",
|
||||
"name": "docker_compose_test",
|
||||
"state": "restarted",
|
||||
"roles": [
|
||||
{
|
||||
"name": "cloud_admin",
|
||||
"encrypted_password": "b093c0d3b281ba6da1eacc608620abd8",
|
||||
"options": null
|
||||
}
|
||||
],
|
||||
"databases": [
|
||||
],
|
||||
"settings": [
|
||||
{
|
||||
"name": "fsync",
|
||||
"value": "off",
|
||||
"vartype": "bool"
|
||||
},
|
||||
{
|
||||
"name": "wal_level",
|
||||
"value": "logical",
|
||||
"vartype": "enum"
|
||||
},
|
||||
{
|
||||
"name": "wal_log_hints",
|
||||
"value": "on",
|
||||
"vartype": "bool"
|
||||
},
|
||||
{
|
||||
"name": "log_connections",
|
||||
"value": "on",
|
||||
"vartype": "bool"
|
||||
},
|
||||
{
|
||||
"name": "port",
|
||||
"value": "55433",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "shared_buffers",
|
||||
"value": "1MB",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "max_connections",
|
||||
"value": "100",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "listen_addresses",
|
||||
"value": "0.0.0.0",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "max_wal_senders",
|
||||
"value": "10",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "max_replication_slots",
|
||||
"value": "10",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "wal_sender_timeout",
|
||||
"value": "5s",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "wal_keep_size",
|
||||
"value": "0",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "password_encryption",
|
||||
"value": "md5",
|
||||
"vartype": "enum"
|
||||
},
|
||||
{
|
||||
"name": "restart_after_crash",
|
||||
"value": "off",
|
||||
"vartype": "bool"
|
||||
},
|
||||
{
|
||||
"name": "synchronous_standby_names",
|
||||
"value": "walproposer",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "shared_preload_libraries",
|
||||
"value": "neon,pg_cron,timescaledb,pg_stat_statements",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "neon.safekeepers",
|
||||
"value": "safekeeper1:5454,safekeeper2:5454,safekeeper3:5454",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "neon.timeline_id",
|
||||
"value": "TIMELINE_ID",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "neon.tenant_id",
|
||||
"value": "TENANT_ID",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "neon.pageserver_connstring",
|
||||
"value": "host=pageserver port=6400",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "max_replication_write_lag",
|
||||
"value": "500MB",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "max_replication_flush_lag",
|
||||
"value": "10GB",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "cron.database",
|
||||
"value": "postgres",
|
||||
"vartype": "string"
|
||||
}
|
||||
]
|
||||
},
|
||||
|
||||
"delta_operations": [
|
||||
]
|
||||
}
|
||||
@@ -159,7 +159,7 @@ services:
|
||||
#- RUST_BACKTRACE=1
|
||||
# Mount the test files directly, for faster editing cycle.
|
||||
volumes:
|
||||
- ./compute_wrapper/var/db/postgres/configs/:/var/db/postgres/configs/
|
||||
- ./compute_wrapper/var/db/postgres/specs/:/var/db/postgres/specs/
|
||||
- ./compute_wrapper/shell/:/shell/
|
||||
ports:
|
||||
- 55433:55433 # pg protocol handler
|
||||
|
||||
@@ -14,32 +14,6 @@ pub struct GenericAPIError {
|
||||
pub error: String,
|
||||
}
|
||||
|
||||
/// All configuration parameters necessary for a compute. When
|
||||
/// [`ComputeConfig::spec`] is provided, it means that the compute is attached
|
||||
/// to a tenant. [`ComputeConfig::compute_ctl_config`] will always be provided
|
||||
/// and contains parameters necessary for operating `compute_ctl` independently
|
||||
/// of whether a tenant is attached to the compute or not.
|
||||
///
|
||||
/// This also happens to be the body of `compute_ctl`'s /configure request.
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
pub struct ComputeConfig {
|
||||
/// The compute spec
|
||||
pub spec: Option<ComputeSpec>,
|
||||
|
||||
/// The compute_ctl configuration
|
||||
#[allow(dead_code)]
|
||||
pub compute_ctl_config: ComputeCtlConfig,
|
||||
}
|
||||
|
||||
impl From<ControlPlaneConfigResponse> for ComputeConfig {
|
||||
fn from(value: ControlPlaneConfigResponse) -> Self {
|
||||
Self {
|
||||
spec: value.spec,
|
||||
compute_ctl_config: value.compute_ctl_config,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct ExtensionInstallResponse {
|
||||
pub extension: PgIdent,
|
||||
@@ -187,7 +161,7 @@ pub struct TlsConfig {
|
||||
|
||||
/// Response of the `/computes/{compute_id}/spec` control-plane API.
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub struct ControlPlaneConfigResponse {
|
||||
pub struct ControlPlaneSpecResponse {
|
||||
pub spec: Option<ComputeSpec>,
|
||||
pub status: ControlPlaneComputeStatus,
|
||||
pub compute_ctl_config: ComputeCtlConfig,
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
//! The ComputeSpec contains all the information needed to start up
|
||||
//! the right version of PostgreSQL, and connect it to the storage nodes.
|
||||
//! It can be passed as part of the `config.json`, or the control plane can
|
||||
//! provide it by calling the compute_ctl's `/compute_ctl` endpoint, or
|
||||
//! compute_ctl can fetch it by calling the control plane's API.
|
||||
//! `ComputeSpec` represents the contents of the spec.json file.
|
||||
//!
|
||||
//! The spec.json file is used to pass information to 'compute_ctl'. It contains
|
||||
//! all the information needed to start up the right version of PostgreSQL,
|
||||
//! and connect it to the storage nodes.
|
||||
use std::collections::HashMap;
|
||||
|
||||
use indexmap::IndexMap;
|
||||
|
||||
@@ -452,8 +452,6 @@ pub struct TenantConfigToml {
|
||||
// gc-compaction related configs
|
||||
/// Enable automatic gc-compaction trigger on this tenant.
|
||||
pub gc_compaction_enabled: bool,
|
||||
/// Enable verification of gc-compaction results.
|
||||
pub gc_compaction_verification: bool,
|
||||
/// The initial threshold for gc-compaction in KB. Once the total size of layers below the gc-horizon is above this threshold,
|
||||
/// gc-compaction will be triggered.
|
||||
pub gc_compaction_initial_threshold_kb: u64,
|
||||
@@ -694,7 +692,6 @@ pub mod tenant_conf_defaults {
|
||||
// image layers should be created.
|
||||
pub const DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD: u8 = 2;
|
||||
pub const DEFAULT_GC_COMPACTION_ENABLED: bool = false;
|
||||
pub const DEFAULT_GC_COMPACTION_VERIFICATION: bool = true;
|
||||
pub const DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB: u64 = 5 * 1024 * 1024; // 5GB
|
||||
pub const DEFAULT_GC_COMPACTION_RATIO_PERCENT: u64 = 100;
|
||||
}
|
||||
@@ -749,7 +746,6 @@ impl Default for TenantConfigToml {
|
||||
wal_receiver_protocol_override: None,
|
||||
rel_size_v2_enabled: false,
|
||||
gc_compaction_enabled: DEFAULT_GC_COMPACTION_ENABLED,
|
||||
gc_compaction_verification: DEFAULT_GC_COMPACTION_VERIFICATION,
|
||||
gc_compaction_initial_threshold_kb: DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB,
|
||||
gc_compaction_ratio_percent: DEFAULT_GC_COMPACTION_RATIO_PERCENT,
|
||||
sampling_ratio: None,
|
||||
|
||||
@@ -576,8 +576,6 @@ pub struct TenantConfigPatch {
|
||||
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
|
||||
pub gc_compaction_enabled: FieldPatch<bool>,
|
||||
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
|
||||
pub gc_compaction_verification: FieldPatch<bool>,
|
||||
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
|
||||
pub gc_compaction_initial_threshold_kb: FieldPatch<u64>,
|
||||
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
|
||||
pub gc_compaction_ratio_percent: FieldPatch<u64>,
|
||||
@@ -698,9 +696,6 @@ pub struct TenantConfig {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub gc_compaction_enabled: Option<bool>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub gc_compaction_verification: Option<bool>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub gc_compaction_initial_threshold_kb: Option<u64>,
|
||||
|
||||
@@ -749,7 +744,6 @@ impl TenantConfig {
|
||||
mut wal_receiver_protocol_override,
|
||||
mut rel_size_v2_enabled,
|
||||
mut gc_compaction_enabled,
|
||||
mut gc_compaction_verification,
|
||||
mut gc_compaction_initial_threshold_kb,
|
||||
mut gc_compaction_ratio_percent,
|
||||
mut sampling_ratio,
|
||||
@@ -841,9 +835,6 @@ impl TenantConfig {
|
||||
patch
|
||||
.gc_compaction_enabled
|
||||
.apply(&mut gc_compaction_enabled);
|
||||
patch
|
||||
.gc_compaction_verification
|
||||
.apply(&mut gc_compaction_verification);
|
||||
patch
|
||||
.gc_compaction_initial_threshold_kb
|
||||
.apply(&mut gc_compaction_initial_threshold_kb);
|
||||
@@ -885,7 +876,6 @@ impl TenantConfig {
|
||||
wal_receiver_protocol_override,
|
||||
rel_size_v2_enabled,
|
||||
gc_compaction_enabled,
|
||||
gc_compaction_verification,
|
||||
gc_compaction_initial_threshold_kb,
|
||||
gc_compaction_ratio_percent,
|
||||
sampling_ratio,
|
||||
@@ -984,9 +974,6 @@ impl TenantConfig {
|
||||
gc_compaction_enabled: self
|
||||
.gc_compaction_enabled
|
||||
.unwrap_or(global_conf.gc_compaction_enabled),
|
||||
gc_compaction_verification: self
|
||||
.gc_compaction_verification
|
||||
.unwrap_or(global_conf.gc_compaction_verification),
|
||||
gc_compaction_initial_threshold_kb: self
|
||||
.gc_compaction_initial_threshold_kb
|
||||
.unwrap_or(global_conf.gc_compaction_initial_threshold_kb),
|
||||
|
||||
@@ -126,7 +126,7 @@ async fn ingest(
|
||||
max_concurrency: NonZeroUsize::new(1).unwrap(),
|
||||
});
|
||||
let (_desc, path) = layer
|
||||
.write_to_disk(&ctx, None, l0_flush_state.inner())
|
||||
.write_to_disk(&ctx, None, l0_flush_state.inner(), &gate, cancel.clone())
|
||||
.await?
|
||||
.unwrap();
|
||||
tokio::fs::remove_file(path).await?;
|
||||
|
||||
@@ -9257,7 +9257,6 @@ mod tests {
|
||||
&[Lsn(0x20), Lsn(0x40), Lsn(0x50)],
|
||||
3,
|
||||
None,
|
||||
true,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -9382,15 +9381,7 @@ mod tests {
|
||||
),
|
||||
];
|
||||
let res = tline
|
||||
.generate_key_retention(
|
||||
key,
|
||||
&history,
|
||||
Lsn(0x60),
|
||||
&[Lsn(0x40), Lsn(0x50)],
|
||||
3,
|
||||
None,
|
||||
true,
|
||||
)
|
||||
.generate_key_retention(key, &history, Lsn(0x60), &[Lsn(0x40), Lsn(0x50)], 3, None)
|
||||
.await
|
||||
.unwrap();
|
||||
let expected_res = KeyHistoryRetention {
|
||||
@@ -9469,7 +9460,6 @@ mod tests {
|
||||
&[],
|
||||
3,
|
||||
Some((key, Lsn(0x10), Bytes::copy_from_slice(b"0x10"))),
|
||||
true,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -9518,7 +9508,6 @@ mod tests {
|
||||
&[Lsn(0x30)],
|
||||
3,
|
||||
Some((key, Lsn(0x10), Bytes::copy_from_slice(b"0x10"))),
|
||||
true,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -22,6 +22,7 @@ use bytes::{BufMut, BytesMut};
|
||||
use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::context::RequestContext;
|
||||
@@ -169,7 +170,13 @@ pub struct BlobWriter<const BUFFERED: bool> {
|
||||
}
|
||||
|
||||
impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
|
||||
pub fn new(
|
||||
inner: VirtualFile,
|
||||
start_offset: u64,
|
||||
_gate: &utils::sync::gate::Gate,
|
||||
_cancel: CancellationToken,
|
||||
_ctx: &RequestContext,
|
||||
) -> Self {
|
||||
Self {
|
||||
inner,
|
||||
offset: start_offset,
|
||||
@@ -432,12 +439,14 @@ pub(crate) mod tests {
|
||||
) -> Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>), Error> {
|
||||
let temp_dir = camino_tempfile::tempdir()?;
|
||||
let pathbuf = temp_dir.path().join("file");
|
||||
let gate = utils::sync::gate::Gate::default();
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
// Write part (in block to drop the file)
|
||||
let mut offsets = Vec::new();
|
||||
{
|
||||
let file = VirtualFile::create(pathbuf.as_path(), ctx).await?;
|
||||
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
|
||||
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0, &gate, cancel.clone(), ctx);
|
||||
for blob in blobs.iter() {
|
||||
let (_, res) = if compression {
|
||||
let res = wtr
|
||||
|
||||
@@ -28,6 +28,11 @@ pub struct EphemeralFile {
|
||||
_timeline_id: TimelineId,
|
||||
page_cache_file_id: page_cache::FileId,
|
||||
bytes_written: u64,
|
||||
// Always Some except during Drop
|
||||
inner: Option<Inner>,
|
||||
}
|
||||
|
||||
struct Inner {
|
||||
buffered_writer: owned_buffers_io::write::BufferedWriter<IoBufferMut, VirtualFile>,
|
||||
/// Gate guard is held on as long as we need to do operations in the path (delete on drop)
|
||||
_gate_guard: utils::sync::gate::GateGuard,
|
||||
@@ -44,9 +49,9 @@ impl EphemeralFile {
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<EphemeralFile> {
|
||||
static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
|
||||
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
|
||||
let filename_disambiguator =
|
||||
NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
|
||||
let filename = conf
|
||||
.timeline_path(&tenant_shard_id, &timeline_id)
|
||||
@@ -73,34 +78,68 @@ impl EphemeralFile {
|
||||
_timeline_id: timeline_id,
|
||||
page_cache_file_id,
|
||||
bytes_written: 0,
|
||||
buffered_writer: owned_buffers_io::write::BufferedWriter::new(
|
||||
file,
|
||||
|| IoBufferMut::with_capacity(TAIL_SZ),
|
||||
gate.enter()?,
|
||||
cancel.child_token(),
|
||||
ctx,
|
||||
info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
|
||||
),
|
||||
_gate_guard: gate.enter()?,
|
||||
inner: Some(Inner {
|
||||
buffered_writer: owned_buffers_io::write::BufferedWriter::new(
|
||||
file,
|
||||
|| IoBufferMut::with_capacity(TAIL_SZ),
|
||||
gate.enter()?,
|
||||
cancel.child_token(),
|
||||
ctx,
|
||||
info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
|
||||
),
|
||||
_gate_guard: gate.enter()?,
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
fn buffered_writer(
|
||||
&self,
|
||||
) -> &owned_buffers_io::write::BufferedWriter<IoBufferMut, VirtualFile> {
|
||||
&self
|
||||
.inner
|
||||
.as_ref()
|
||||
.expect("we never take out except during drop")
|
||||
.buffered_writer
|
||||
}
|
||||
fn buffered_writer_mut(
|
||||
&mut self,
|
||||
) -> &mut owned_buffers_io::write::BufferedWriter<IoBufferMut, VirtualFile> {
|
||||
&mut self
|
||||
.inner
|
||||
.as_mut()
|
||||
.expect("we never take out except during drop")
|
||||
.buffered_writer
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for EphemeralFile {
|
||||
fn drop(&mut self) {
|
||||
// unlink the file
|
||||
// we are clear to do this, because we have entered a gate
|
||||
let path = self.buffered_writer.as_inner().path();
|
||||
let res = std::fs::remove_file(path);
|
||||
if let Err(e) = res {
|
||||
if e.kind() != std::io::ErrorKind::NotFound {
|
||||
// just never log the not found errors, we cannot do anything for them; on detach
|
||||
// the tenant directory is already gone.
|
||||
//
|
||||
// not found files might also be related to https://github.com/neondatabase/neon/issues/2442
|
||||
error!("could not remove ephemeral file '{path}': {e}");
|
||||
let inner = self.inner.take().expect("we never take out except here");
|
||||
|
||||
tokio::spawn(async move {
|
||||
let Inner {
|
||||
buffered_writer,
|
||||
_gate_guard,
|
||||
} = inner;
|
||||
|
||||
// XXX kinda ugly that we have this Arc here, would like to call VirtualFile::remove()
|
||||
let virtual_file: Arc<VirtualFile> = buffered_writer.into_inner_no_flush().await;
|
||||
let path = virtual_file.path();
|
||||
let res = std::fs::remove_file(path);
|
||||
if let Err(e) = res {
|
||||
if e.kind() != std::io::ErrorKind::NotFound {
|
||||
// TODO: can we retry?
|
||||
|
||||
// just never log the not found errors, we cannot do anything for them; on detach
|
||||
// the tenant directory is already gone.
|
||||
//
|
||||
// not found files might also be related to https://github.com/neondatabase/neon/issues/2442
|
||||
error!("could not remove ephemeral file '{path}': {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
drop(_gate_guard);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -168,7 +207,7 @@ impl EphemeralFile {
|
||||
|
||||
// Write the payload
|
||||
let (nwritten, control) = self
|
||||
.buffered_writer
|
||||
.buffered_writer_mut()
|
||||
.write_buffered_borrowed_controlled(srcbuf, ctx)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
@@ -193,9 +232,9 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
|
||||
dst: tokio_epoll_uring::Slice<B>,
|
||||
ctx: &RequestContext,
|
||||
) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
|
||||
let submitted_offset = self.buffered_writer.bytes_submitted();
|
||||
let submitted_offset = self.buffered_writer().bytes_submitted();
|
||||
|
||||
let mutable = match self.buffered_writer.inspect_mutable() {
|
||||
let mutable = match self.buffered_writer().inspect_mutable() {
|
||||
Some(mutable) => &mutable[0..mutable.pending()],
|
||||
None => {
|
||||
// Timeline::cancel and hence buffered writer flush was cancelled.
|
||||
@@ -204,7 +243,7 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
|
||||
}
|
||||
};
|
||||
|
||||
let maybe_flushed = self.buffered_writer.inspect_maybe_flushed();
|
||||
let maybe_flushed = self.buffered_writer().inspect_maybe_flushed();
|
||||
|
||||
let dst_cap = dst.bytes_total().into_u64();
|
||||
let end = {
|
||||
@@ -262,7 +301,7 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
|
||||
let mutable_range = Range(std::cmp::max(start, submitted_offset), end);
|
||||
|
||||
let dst = if written_range.len() > 0 {
|
||||
let file: &VirtualFile = self.buffered_writer.as_inner();
|
||||
let file: &VirtualFile = self.buffered_writer().as_inner();
|
||||
let bounds = dst.bounds();
|
||||
let slice = file
|
||||
.read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
|
||||
@@ -419,7 +458,7 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mutable = file.buffered_writer.mutable();
|
||||
let mutable = file.buffered_writer().mutable();
|
||||
let cap = mutable.capacity();
|
||||
let align = mutable.align();
|
||||
|
||||
@@ -456,13 +495,13 @@ mod tests {
|
||||
assert_eq!(&buf, &content[range]);
|
||||
}
|
||||
|
||||
let file_contents = std::fs::read(file.buffered_writer.as_inner().path()).unwrap();
|
||||
let file_contents = std::fs::read(file.buffered_writer().as_inner().path()).unwrap();
|
||||
assert!(file_contents == content[0..cap * 2]);
|
||||
|
||||
let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
|
||||
let maybe_flushed_buffer_contents = file.buffered_writer().inspect_maybe_flushed().unwrap();
|
||||
assert_eq!(&maybe_flushed_buffer_contents[..], &content[cap..cap * 2]);
|
||||
|
||||
let mutable_buffer_contents = file.buffered_writer.mutable();
|
||||
let mutable_buffer_contents = file.buffered_writer().mutable();
|
||||
assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]);
|
||||
}
|
||||
|
||||
@@ -477,7 +516,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
// mutable buffer and maybe_flushed buffer each has `cap` bytes.
|
||||
let cap = file.buffered_writer.mutable().capacity();
|
||||
let cap = file.buffered_writer().mutable().capacity();
|
||||
|
||||
let content: Vec<u8> = rand::thread_rng()
|
||||
.sample_iter(rand::distributions::Standard)
|
||||
@@ -489,18 +528,18 @@ mod tests {
|
||||
// assert the state is as this test expects it to be
|
||||
let load_io_buf_res = file.load_to_io_buf(&ctx).await.unwrap();
|
||||
assert_eq!(&load_io_buf_res[..], &content[0..cap * 2 + cap / 2]);
|
||||
let md = file.buffered_writer.as_inner().path().metadata().unwrap();
|
||||
let md = file.buffered_writer().as_inner().path().metadata().unwrap();
|
||||
assert_eq!(
|
||||
md.len(),
|
||||
2 * cap.into_u64(),
|
||||
"buffered writer requires one write to be flushed if we write 2.5x buffer capacity"
|
||||
);
|
||||
assert_eq!(
|
||||
&file.buffered_writer.inspect_maybe_flushed().unwrap()[0..cap],
|
||||
&file.buffered_writer().inspect_maybe_flushed().unwrap()[0..cap],
|
||||
&content[cap..cap * 2]
|
||||
);
|
||||
assert_eq!(
|
||||
&file.buffered_writer.mutable()[0..cap / 2],
|
||||
&file.buffered_writer().mutable()[0..cap / 2],
|
||||
&content[cap * 2..cap * 2 + cap / 2]
|
||||
);
|
||||
}
|
||||
@@ -522,7 +561,7 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mutable = file.buffered_writer.mutable();
|
||||
let mutable = file.buffered_writer().mutable();
|
||||
let cap = mutable.capacity();
|
||||
let align = mutable.align();
|
||||
let content: Vec<u8> = rand::thread_rng()
|
||||
|
||||
@@ -5,6 +5,7 @@ use std::sync::Arc;
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::key::{KEY_SIZE, Key};
|
||||
use pageserver_api::value::Value;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::shard::TenantShardId;
|
||||
@@ -179,7 +180,7 @@ impl BatchLayerWriter {
|
||||
|
||||
/// An image writer that takes images and produces multiple image layers.
|
||||
#[must_use]
|
||||
pub struct SplitImageLayerWriter {
|
||||
pub struct SplitImageLayerWriter<'a> {
|
||||
inner: ImageLayerWriter,
|
||||
target_layer_size: u64,
|
||||
lsn: Lsn,
|
||||
@@ -188,9 +189,12 @@ pub struct SplitImageLayerWriter {
|
||||
tenant_shard_id: TenantShardId,
|
||||
batches: BatchLayerWriter,
|
||||
start_key: Key,
|
||||
gate: &'a utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
impl SplitImageLayerWriter {
|
||||
impl<'a> SplitImageLayerWriter<'a> {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
@@ -198,6 +202,8 @@ impl SplitImageLayerWriter {
|
||||
start_key: Key,
|
||||
lsn: Lsn,
|
||||
target_layer_size: u64,
|
||||
gate: &'a utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
@@ -208,6 +214,8 @@ impl SplitImageLayerWriter {
|
||||
tenant_shard_id,
|
||||
&(start_key..Key::MAX),
|
||||
lsn,
|
||||
gate,
|
||||
cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
@@ -217,6 +225,8 @@ impl SplitImageLayerWriter {
|
||||
batches: BatchLayerWriter::new(conf).await?,
|
||||
lsn,
|
||||
start_key,
|
||||
gate,
|
||||
cancel,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -239,6 +249,8 @@ impl SplitImageLayerWriter {
|
||||
self.tenant_shard_id,
|
||||
&(key..Key::MAX),
|
||||
self.lsn,
|
||||
self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -291,7 +303,7 @@ impl SplitImageLayerWriter {
|
||||
/// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
|
||||
/// will split them into multiple files based on size.
|
||||
#[must_use]
|
||||
pub struct SplitDeltaLayerWriter {
|
||||
pub struct SplitDeltaLayerWriter<'a> {
|
||||
inner: Option<(Key, DeltaLayerWriter)>,
|
||||
target_layer_size: u64,
|
||||
conf: &'static PageServerConf,
|
||||
@@ -300,15 +312,19 @@ pub struct SplitDeltaLayerWriter {
|
||||
lsn_range: Range<Lsn>,
|
||||
last_key_written: Key,
|
||||
batches: BatchLayerWriter,
|
||||
gate: &'a utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
impl SplitDeltaLayerWriter {
|
||||
impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
pub async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
lsn_range: Range<Lsn>,
|
||||
target_layer_size: u64,
|
||||
gate: &'a utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
target_layer_size,
|
||||
@@ -319,6 +335,8 @@ impl SplitDeltaLayerWriter {
|
||||
lsn_range,
|
||||
last_key_written: Key::MIN,
|
||||
batches: BatchLayerWriter::new(conf).await?,
|
||||
gate,
|
||||
cancel,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -344,6 +362,8 @@ impl SplitDeltaLayerWriter {
|
||||
self.tenant_shard_id,
|
||||
key,
|
||||
self.lsn_range.clone(),
|
||||
self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
@@ -362,6 +382,8 @@ impl SplitDeltaLayerWriter {
|
||||
self.tenant_shard_id,
|
||||
key,
|
||||
self.lsn_range.clone(),
|
||||
self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -469,6 +491,8 @@ mod tests {
|
||||
get_key(0),
|
||||
Lsn(0x18),
|
||||
4 * 1024 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
@@ -480,6 +504,8 @@ mod tests {
|
||||
tenant.tenant_shard_id,
|
||||
Lsn(0x18)..Lsn(0x20),
|
||||
4 * 1024 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -546,6 +572,8 @@ mod tests {
|
||||
get_key(0),
|
||||
Lsn(0x18),
|
||||
4 * 1024 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
@@ -556,6 +584,8 @@ mod tests {
|
||||
tenant.tenant_shard_id,
|
||||
Lsn(0x18)..Lsn(0x20),
|
||||
4 * 1024 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -643,6 +673,8 @@ mod tests {
|
||||
get_key(0),
|
||||
Lsn(0x18),
|
||||
4 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
@@ -654,6 +686,8 @@ mod tests {
|
||||
tenant.tenant_shard_id,
|
||||
Lsn(0x18)..Lsn(0x20),
|
||||
4 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -730,6 +764,8 @@ mod tests {
|
||||
tenant.tenant_shard_id,
|
||||
Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
|
||||
4 * 1024 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -34,6 +34,7 @@ use std::ops::Range;
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
|
||||
use anyhow::{Context, Result, bail, ensure};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
@@ -45,11 +46,10 @@ use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_api::value::Value;
|
||||
use rand::Rng;
|
||||
use rand::distributions::Alphanumeric;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio_epoll_uring::IoBuf;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::bin_ser::BeSer;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
@@ -287,19 +287,19 @@ impl DeltaLayer {
|
||||
key_start: Key,
|
||||
lsn_range: &Range<Lsn>,
|
||||
) -> Utf8PathBuf {
|
||||
let rand_string: String = rand::thread_rng()
|
||||
.sample_iter(&Alphanumeric)
|
||||
.take(8)
|
||||
.map(char::from)
|
||||
.collect();
|
||||
// Never reuse a filename in the lifetime of a pageserver process so that we need
|
||||
// not worry about laggard Drop impl's async unlink hitting an already reused filename.
|
||||
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
|
||||
let filename_disambiguator =
|
||||
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
|
||||
conf.timeline_path(tenant_shard_id, timeline_id)
|
||||
.join(format!(
|
||||
"{}-XXX__{:016X}-{:016X}.{}.{}",
|
||||
"{}-XXX__{:016X}-{:016X}.{:x}.{}",
|
||||
key_start,
|
||||
u64::from(lsn_range.start),
|
||||
u64::from(lsn_range.end),
|
||||
rand_string,
|
||||
filename_disambiguator,
|
||||
TEMP_FILE_SUFFIX,
|
||||
))
|
||||
}
|
||||
@@ -394,18 +394,23 @@ struct DeltaLayerWriterInner {
|
||||
|
||||
// Number of key-lsns in the layer.
|
||||
num_keys: usize,
|
||||
|
||||
_gate_guard: utils::sync::gate::GateGuard,
|
||||
}
|
||||
|
||||
impl DeltaLayerWriterInner {
|
||||
///
|
||||
/// Start building a new delta layer.
|
||||
///
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
key_start: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
// Create the file initially with a temporary filename. We don't know
|
||||
@@ -420,7 +425,7 @@ impl DeltaLayerWriterInner {
|
||||
let mut file = VirtualFile::create(&path, ctx).await?;
|
||||
// make room for the header block
|
||||
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
|
||||
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
|
||||
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
|
||||
|
||||
// Initialize the b-tree index builder
|
||||
let block_buf = BlockBuf::new();
|
||||
@@ -435,6 +440,7 @@ impl DeltaLayerWriterInner {
|
||||
tree: tree_builder,
|
||||
blob_writer,
|
||||
num_keys: 0,
|
||||
_gate_guard: gate.enter()?,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -628,12 +634,15 @@ impl DeltaLayerWriter {
|
||||
///
|
||||
/// Start building a new delta layer.
|
||||
///
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
key_start: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
@@ -644,6 +653,8 @@ impl DeltaLayerWriter {
|
||||
tenant_shard_id,
|
||||
key_start,
|
||||
lsn_range,
|
||||
gate,
|
||||
cancel,
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
@@ -719,12 +730,22 @@ impl DeltaLayerWriter {
|
||||
|
||||
impl Drop for DeltaLayerWriter {
|
||||
fn drop(&mut self) {
|
||||
if let Some(inner) = self.inner.take() {
|
||||
// We want to remove the virtual file here, so it's fine to not
|
||||
// having completely flushed unwritten data.
|
||||
let vfile = inner.blob_writer.into_inner_no_flush();
|
||||
let Some(inner) = self.inner.take() else {
|
||||
return;
|
||||
};
|
||||
|
||||
tokio::spawn(async move {
|
||||
let DeltaLayerWriterInner {
|
||||
blob_writer,
|
||||
_gate_guard,
|
||||
..
|
||||
} = inner;
|
||||
|
||||
let vfile = blob_writer.into_inner_no_flush();
|
||||
vfile.remove();
|
||||
}
|
||||
|
||||
drop(_gate_guard);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1600,8 +1621,8 @@ pub(crate) mod test {
|
||||
use bytes::Bytes;
|
||||
use itertools::MinMaxResult;
|
||||
use pageserver_api::value::Value;
|
||||
use rand::RngCore;
|
||||
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
|
||||
use rand::{Rng, RngCore};
|
||||
|
||||
use super::*;
|
||||
use crate::DEFAULT_PG_VERSION;
|
||||
@@ -1885,6 +1906,8 @@ pub(crate) mod test {
|
||||
harness.tenant_shard_id,
|
||||
entries_meta.key_range.start,
|
||||
entries_meta.lsn_range.clone(),
|
||||
&timeline.gate,
|
||||
timeline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -2079,6 +2102,8 @@ pub(crate) mod test {
|
||||
tenant.tenant_shard_id,
|
||||
Key::MIN,
|
||||
Lsn(0x11)..truncate_at,
|
||||
&branch.gate,
|
||||
branch.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -2213,6 +2238,8 @@ pub(crate) mod test {
|
||||
tenant.tenant_shard_id,
|
||||
*key_start,
|
||||
(*lsn_min)..lsn_end,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -32,6 +32,7 @@ use std::ops::Range;
|
||||
use std::os::unix::prelude::FileExt;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
|
||||
use anyhow::{Context, Result, bail, ensure};
|
||||
use bytes::Bytes;
|
||||
@@ -43,11 +44,10 @@ use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key};
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::shard::{ShardIdentity, TenantShardId};
|
||||
use pageserver_api::value::Value;
|
||||
use rand::Rng;
|
||||
use rand::distributions::Alphanumeric;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio_stream::StreamExt;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::bin_ser::BeSer;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
@@ -251,14 +251,17 @@ impl ImageLayer {
|
||||
tenant_shard_id: TenantShardId,
|
||||
fname: &ImageLayerName,
|
||||
) -> Utf8PathBuf {
|
||||
let rand_string: String = rand::thread_rng()
|
||||
.sample_iter(&Alphanumeric)
|
||||
.take(8)
|
||||
.map(char::from)
|
||||
.collect();
|
||||
// Never reuse a filename in the lifetime of a pageserver process so that we need
|
||||
// not worry about laggard Drop impl's async unlink hitting an already reused filename.
|
||||
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
|
||||
let filename_disambiguator =
|
||||
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
|
||||
conf.timeline_path(&tenant_shard_id, &timeline_id)
|
||||
.join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
|
||||
.join(format!(
|
||||
"{fname}.{:x}.{TEMP_FILE_SUFFIX}",
|
||||
filename_disambiguator
|
||||
))
|
||||
}
|
||||
|
||||
///
|
||||
@@ -742,18 +745,23 @@ struct ImageLayerWriterInner {
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
last_written_key: Key,
|
||||
|
||||
_gate_guard: utils::sync::gate::GateGuard,
|
||||
}
|
||||
|
||||
impl ImageLayerWriterInner {
|
||||
///
|
||||
/// Start building a new image layer.
|
||||
///
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
key_range: &Range<Key>,
|
||||
lsn: Lsn,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
// Create the file initially with a temporary filename.
|
||||
@@ -780,7 +788,7 @@ impl ImageLayerWriterInner {
|
||||
};
|
||||
// make room for the header block
|
||||
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
|
||||
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
|
||||
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
|
||||
|
||||
// Initialize the b-tree index builder
|
||||
let block_buf = BlockBuf::new();
|
||||
@@ -801,6 +809,7 @@ impl ImageLayerWriterInner {
|
||||
num_keys: 0,
|
||||
#[cfg(feature = "testing")]
|
||||
last_written_key: Key::MIN,
|
||||
_gate_guard: gate.enter()?,
|
||||
};
|
||||
|
||||
Ok(writer)
|
||||
@@ -988,18 +997,30 @@ impl ImageLayerWriter {
|
||||
///
|
||||
/// Start building a new image layer.
|
||||
///
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
key_range: &Range<Key>,
|
||||
lsn: Lsn,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ImageLayerWriter> {
|
||||
Ok(Self {
|
||||
inner: Some(
|
||||
ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx)
|
||||
.await?,
|
||||
ImageLayerWriterInner::new(
|
||||
conf,
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
key_range,
|
||||
lsn,
|
||||
gate,
|
||||
cancel,
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
),
|
||||
})
|
||||
}
|
||||
@@ -1050,9 +1071,22 @@ impl ImageLayerWriter {
|
||||
|
||||
impl Drop for ImageLayerWriter {
|
||||
fn drop(&mut self) {
|
||||
if let Some(inner) = self.inner.take() {
|
||||
inner.blob_writer.into_inner().remove();
|
||||
}
|
||||
let Some(inner) = self.inner.take() else {
|
||||
return;
|
||||
};
|
||||
|
||||
tokio::spawn(async move {
|
||||
let ImageLayerWriterInner {
|
||||
blob_writer,
|
||||
_gate_guard,
|
||||
..
|
||||
} = inner;
|
||||
|
||||
let vfile = blob_writer.into_inner();
|
||||
vfile.remove();
|
||||
|
||||
drop(_gate_guard);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1203,6 +1237,8 @@ mod test {
|
||||
harness.tenant_shard_id,
|
||||
&range,
|
||||
lsn,
|
||||
&timeline.gate,
|
||||
timeline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
@@ -1268,6 +1304,8 @@ mod test {
|
||||
harness.tenant_shard_id,
|
||||
&range,
|
||||
lsn,
|
||||
&timeline.gate,
|
||||
timeline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
@@ -1346,6 +1384,8 @@ mod test {
|
||||
tenant.tenant_shard_id,
|
||||
&key_range,
|
||||
lsn,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -719,6 +719,8 @@ impl InMemoryLayer {
|
||||
ctx: &RequestContext,
|
||||
key_range: Option<Range<Key>>,
|
||||
l0_flush_global_state: &l0_flush::Inner,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
|
||||
// Grab the lock in read-mode. We hold it over the I/O, but because this
|
||||
// layer is not writeable anymore, no one should be trying to acquire the
|
||||
@@ -759,6 +761,8 @@ impl InMemoryLayer {
|
||||
self.tenant_shard_id,
|
||||
Key::MIN,
|
||||
self.start_lsn..end_lsn,
|
||||
gate,
|
||||
cancel,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -2723,10 +2723,6 @@ impl Timeline {
|
||||
.tenant_conf
|
||||
.gc_compaction_enabled
|
||||
.unwrap_or(self.conf.default_tenant_conf.gc_compaction_enabled);
|
||||
let gc_compaction_verification = tenant_conf
|
||||
.tenant_conf
|
||||
.gc_compaction_verification
|
||||
.unwrap_or(self.conf.default_tenant_conf.gc_compaction_verification);
|
||||
let gc_compaction_initial_threshold_kb = tenant_conf
|
||||
.tenant_conf
|
||||
.gc_compaction_initial_threshold_kb
|
||||
@@ -2741,7 +2737,6 @@ impl Timeline {
|
||||
.unwrap_or(self.conf.default_tenant_conf.gc_compaction_ratio_percent);
|
||||
GcCompactionCombinedSettings {
|
||||
gc_compaction_enabled,
|
||||
gc_compaction_verification,
|
||||
gc_compaction_initial_threshold_kb,
|
||||
gc_compaction_ratio_percent,
|
||||
}
|
||||
@@ -4810,7 +4805,13 @@ impl Timeline {
|
||||
let ctx = ctx.attached_child();
|
||||
let work = async move {
|
||||
let Some((desc, path)) = frozen_layer
|
||||
.write_to_disk(&ctx, key_range, self_clone.l0_flush_global_state.inner())
|
||||
.write_to_disk(
|
||||
&ctx,
|
||||
key_range,
|
||||
self_clone.l0_flush_global_state.inner(),
|
||||
&self_clone.gate,
|
||||
self_clone.cancel.clone(),
|
||||
)
|
||||
.await?
|
||||
else {
|
||||
return Ok(None);
|
||||
@@ -5348,6 +5349,8 @@ impl Timeline {
|
||||
self.tenant_shard_id,
|
||||
&img_range,
|
||||
lsn,
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -6712,6 +6715,8 @@ impl Timeline {
|
||||
self.tenant_shard_id,
|
||||
&(min_key..end_key),
|
||||
lsn,
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -6773,6 +6778,8 @@ impl Timeline {
|
||||
self.tenant_shard_id,
|
||||
deltas.key_range.start,
|
||||
deltas.lsn_range,
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -80,7 +80,6 @@ impl std::fmt::Display for GcCompactionJobId {
|
||||
|
||||
pub struct GcCompactionCombinedSettings {
|
||||
pub gc_compaction_enabled: bool,
|
||||
pub gc_compaction_verification: bool,
|
||||
pub gc_compaction_initial_threshold_kb: u64,
|
||||
pub gc_compaction_ratio_percent: u64,
|
||||
}
|
||||
@@ -226,7 +225,6 @@ impl GcCompactionQueue {
|
||||
gc_compaction_enabled,
|
||||
gc_compaction_initial_threshold_kb,
|
||||
gc_compaction_ratio_percent,
|
||||
..
|
||||
} = timeline.get_gc_compaction_settings();
|
||||
if !gc_compaction_enabled {
|
||||
return Ok(());
|
||||
@@ -749,8 +747,8 @@ impl KeyHistoryRetention {
|
||||
async fn pipe_to(
|
||||
self,
|
||||
key: Key,
|
||||
delta_writer: &mut SplitDeltaLayerWriter,
|
||||
mut image_writer: Option<&mut SplitImageLayerWriter>,
|
||||
delta_writer: &mut SplitDeltaLayerWriter<'_>,
|
||||
mut image_writer: Option<&mut SplitImageLayerWriter<'_>>,
|
||||
stat: &mut CompactionStatistics,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
@@ -790,114 +788,6 @@ impl KeyHistoryRetention {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Verify if every key in the retention is readable by replaying the logs.
|
||||
async fn verify(
|
||||
&self,
|
||||
key: Key,
|
||||
base_img_from_ancestor: &Option<(Key, Lsn, Bytes)>,
|
||||
full_history: &[(Key, Lsn, Value)],
|
||||
tline: &Arc<Timeline>,
|
||||
) -> anyhow::Result<()> {
|
||||
// Usually the min_lsn should be the first record but we do a full iteration to be safe.
|
||||
let Some(min_lsn) = full_history.iter().map(|(_, lsn, _)| *lsn).min() else {
|
||||
// This should never happen b/c if we don't have any history of a key, we won't even do `generate_key_retention`.
|
||||
return Ok(());
|
||||
};
|
||||
let Some(max_lsn) = full_history.iter().map(|(_, lsn, _)| *lsn).max() else {
|
||||
// This should never happen b/c if we don't have any history of a key, we won't even do `generate_key_retention`.
|
||||
return Ok(());
|
||||
};
|
||||
let mut base_img = base_img_from_ancestor
|
||||
.as_ref()
|
||||
.map(|(_, lsn, img)| (*lsn, img));
|
||||
let mut history = Vec::new();
|
||||
|
||||
async fn collect_and_verify(
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
base_img: &Option<(Lsn, &Bytes)>,
|
||||
history: &[(Lsn, &NeonWalRecord)],
|
||||
tline: &Arc<Timeline>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut records = history
|
||||
.iter()
|
||||
.map(|(lsn, val)| (*lsn, (*val).clone()))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// WAL redo requires records in the reverse LSN order
|
||||
records.reverse();
|
||||
let data = ValueReconstructState {
|
||||
img: base_img.as_ref().map(|(lsn, img)| (*lsn, (*img).clone())),
|
||||
records,
|
||||
};
|
||||
|
||||
tline
|
||||
.reconstruct_value(key, lsn, data, RedoAttemptType::GcCompaction)
|
||||
.await
|
||||
.with_context(|| format!("verification failed for key {} at lsn {}", key, lsn))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
for (retain_lsn, KeyLogAtLsn(logs)) in &self.below_horizon {
|
||||
for (lsn, val) in logs {
|
||||
match val {
|
||||
Value::Image(img) => {
|
||||
base_img = Some((*lsn, img));
|
||||
history.clear();
|
||||
}
|
||||
Value::WalRecord(rec) if val.will_init() => {
|
||||
base_img = None;
|
||||
history.clear();
|
||||
history.push((*lsn, rec));
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
history.push((*lsn, rec));
|
||||
}
|
||||
}
|
||||
}
|
||||
if *retain_lsn >= min_lsn {
|
||||
// Only verify after the key appears in the full history for the first time.
|
||||
|
||||
if base_img.is_none() && history.is_empty() {
|
||||
anyhow::bail!(
|
||||
"verificatoin failed: key {} has no history at {}",
|
||||
key,
|
||||
retain_lsn
|
||||
);
|
||||
};
|
||||
// We don't modify history: in theory, we could replace the history with a single
|
||||
// image as in `generate_key_retention` to make redos at later LSNs faster. But we
|
||||
// want to verify everything as if they are read from the real layer map.
|
||||
collect_and_verify(key, *retain_lsn, &base_img, &history, tline).await?;
|
||||
}
|
||||
}
|
||||
|
||||
for (lsn, val) in &self.above_horizon.0 {
|
||||
match val {
|
||||
Value::Image(img) => {
|
||||
// Above the GC horizon, we verify every time we see an image.
|
||||
collect_and_verify(key, *lsn, &base_img, &history, tline).await?;
|
||||
base_img = Some((*lsn, img));
|
||||
history.clear();
|
||||
}
|
||||
Value::WalRecord(rec) if val.will_init() => {
|
||||
// Above the GC horizon, we verify every time we see an init record.
|
||||
collect_and_verify(key, *lsn, &base_img, &history, tline).await?;
|
||||
base_img = None;
|
||||
history.clear();
|
||||
history.push((*lsn, rec));
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
history.push((*lsn, rec));
|
||||
}
|
||||
}
|
||||
}
|
||||
// Ensure the latest record is readable.
|
||||
collect_and_verify(key, max_lsn, &base_img, &history, tline).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Default)]
|
||||
@@ -1229,17 +1119,7 @@ impl Timeline {
|
||||
// being potentially much longer.
|
||||
let rewrite_max = partition_count;
|
||||
|
||||
let outcome = self
|
||||
.compact_shard_ancestors(
|
||||
rewrite_max,
|
||||
options.flags.contains(CompactFlags::YieldForL0),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
match outcome {
|
||||
CompactionOutcome::Pending | CompactionOutcome::YieldForL0 => return Ok(outcome),
|
||||
CompactionOutcome::Done | CompactionOutcome::Skipped => {}
|
||||
}
|
||||
self.compact_shard_ancestors(rewrite_max, ctx).await?;
|
||||
}
|
||||
|
||||
Ok(CompactionOutcome::Done)
|
||||
@@ -1256,12 +1136,11 @@ impl Timeline {
|
||||
async fn compact_shard_ancestors(
|
||||
self: &Arc<Self>,
|
||||
rewrite_max: usize,
|
||||
yield_for_l0: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<CompactionOutcome, CompactionError> {
|
||||
let mut outcome = CompactionOutcome::Done;
|
||||
) -> Result<(), CompactionError> {
|
||||
let mut drop_layers = Vec::new();
|
||||
let mut layers_to_rewrite: Vec<Layer> = Vec::new();
|
||||
let mut rewrite_max_exceeded: bool = false;
|
||||
|
||||
// We will use the Lsn cutoff of the last GC as a threshold for rewriting layers: if a
|
||||
// layer is behind this Lsn, it indicates that the layer is being retained beyond the
|
||||
@@ -1354,8 +1233,8 @@ impl Timeline {
|
||||
debug!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
|
||||
layers_to_rewrite.len()
|
||||
);
|
||||
outcome = CompactionOutcome::Pending;
|
||||
break;
|
||||
rewrite_max_exceeded = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Fall through: all our conditions for doing a rewrite passed.
|
||||
@@ -1367,7 +1246,7 @@ impl Timeline {
|
||||
|
||||
// Drop out early if there's nothing to do.
|
||||
if layers_to_rewrite.is_empty() && drop_layers.is_empty() {
|
||||
return Ok(CompactionOutcome::Done);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
info!(
|
||||
@@ -1394,6 +1273,8 @@ impl Timeline {
|
||||
self.tenant_shard_id,
|
||||
&layer.layer_desc().key_range,
|
||||
layer.layer_desc().image_layer_lsn(),
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -1435,20 +1316,6 @@ impl Timeline {
|
||||
// the layer has no data for us with the ShardedRange check above, but
|
||||
drop_layers.push(layer);
|
||||
}
|
||||
|
||||
// Yield for L0 compaction if necessary, but make sure we update the layer map below
|
||||
// with the work we've already done.
|
||||
if yield_for_l0
|
||||
&& self
|
||||
.l0_compaction_trigger
|
||||
.notified()
|
||||
.now_or_never()
|
||||
.is_some()
|
||||
{
|
||||
info!("shard ancestor compaction yielding for L0 compaction");
|
||||
outcome = CompactionOutcome::YieldForL0;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
for layer in &drop_layers {
|
||||
@@ -1472,36 +1339,27 @@ impl Timeline {
|
||||
// necessary for correctness, but it simplifies testing, and avoids proceeding with another
|
||||
// Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
|
||||
// load.
|
||||
if outcome != CompactionOutcome::YieldForL0 {
|
||||
info!("shard ancestor compaction waiting for uploads");
|
||||
tokio::select! {
|
||||
result = self.remote_client.wait_completion() => match result {
|
||||
Ok(()) => {},
|
||||
Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
|
||||
Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
}
|
||||
},
|
||||
// Don't wait if there's L0 compaction to do. We don't need to update the outcome
|
||||
// here, because we've already done the actual work.
|
||||
_ = self.l0_compaction_trigger.notified(), if yield_for_l0 => {},
|
||||
info!("shard ancestor compaction waiting for uploads");
|
||||
match self.remote_client.wait_completion().await {
|
||||
Ok(()) => (),
|
||||
Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
|
||||
Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
"shard ancestor compaction done in {:.3}s{}",
|
||||
started.elapsed().as_secs_f64(),
|
||||
match outcome {
|
||||
CompactionOutcome::Pending =>
|
||||
format!(", with pending work (rewrite_max={rewrite_max})"),
|
||||
CompactionOutcome::YieldForL0 => String::from(", yielding for L0 compaction"),
|
||||
CompactionOutcome::Skipped | CompactionOutcome::Done => String::new(),
|
||||
match rewrite_max_exceeded {
|
||||
true => format!(", more work pending due to rewrite_max={rewrite_max}"),
|
||||
false => String::new(),
|
||||
}
|
||||
);
|
||||
|
||||
fail::fail_point!("compact-shard-ancestors-persistent");
|
||||
|
||||
Ok(outcome)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
|
||||
@@ -2033,6 +1891,8 @@ impl Timeline {
|
||||
debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
|
||||
lsn_range.clone()
|
||||
},
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -2320,7 +2180,6 @@ impl Timeline {
|
||||
/// ```
|
||||
///
|
||||
/// Note that `accumulated_values` must be sorted by LSN and should belong to a single key.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) async fn generate_key_retention(
|
||||
self: &Arc<Timeline>,
|
||||
key: Key,
|
||||
@@ -2329,7 +2188,6 @@ impl Timeline {
|
||||
retain_lsn_below_horizon: &[Lsn],
|
||||
delta_threshold_cnt: usize,
|
||||
base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
|
||||
verification: bool,
|
||||
) -> anyhow::Result<KeyHistoryRetention> {
|
||||
// Pre-checks for the invariants
|
||||
|
||||
@@ -2416,8 +2274,8 @@ impl Timeline {
|
||||
"should have at least below + above horizon batches"
|
||||
);
|
||||
let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
|
||||
if let Some((key, lsn, ref img)) = base_img_from_ancestor {
|
||||
replay_history.push((key, lsn, Value::Image(img.clone())));
|
||||
if let Some((key, lsn, img)) = base_img_from_ancestor {
|
||||
replay_history.push((key, lsn, Value::Image(img)));
|
||||
}
|
||||
|
||||
/// Generate debug information for the replay history
|
||||
@@ -2531,15 +2389,22 @@ impl Timeline {
|
||||
// Whether to reconstruct the image. In debug mode, we will generate an image
|
||||
// at every retain_lsn to ensure data is not corrupted, but we won't put the
|
||||
// image into the final layer.
|
||||
let img_and_lsn = if produce_image {
|
||||
let generate_image = produce_image || debug_mode;
|
||||
if produce_image {
|
||||
records_since_last_image = 0;
|
||||
}
|
||||
let img_and_lsn = if generate_image {
|
||||
let replay_history_for_debug = if debug_mode {
|
||||
Some(replay_history.clone())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
|
||||
let history = std::mem::take(&mut replay_history);
|
||||
let history = if produce_image {
|
||||
std::mem::take(&mut replay_history)
|
||||
} else {
|
||||
replay_history.clone()
|
||||
};
|
||||
let mut img = None;
|
||||
let mut records = Vec::with_capacity(history.len());
|
||||
if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
|
||||
@@ -2574,7 +2439,6 @@ impl Timeline {
|
||||
records.push((lsn, rec));
|
||||
}
|
||||
}
|
||||
// WAL redo requires records in the reverse LSN order
|
||||
records.reverse();
|
||||
let state = ValueReconstructState { img, records };
|
||||
// last batch does not generate image so i is always in range, unless we force generate
|
||||
@@ -2607,16 +2471,10 @@ impl Timeline {
|
||||
assert_eq!(retention.len(), lsn_split_points.len() + 1);
|
||||
for (idx, logs) in retention.into_iter().enumerate() {
|
||||
if idx == lsn_split_points.len() {
|
||||
let retention = KeyHistoryRetention {
|
||||
return Ok(KeyHistoryRetention {
|
||||
below_horizon: result,
|
||||
above_horizon: KeyLogAtLsn(logs),
|
||||
};
|
||||
if verification {
|
||||
retention
|
||||
.verify(key, &base_img_from_ancestor, full_history, self)
|
||||
.await?;
|
||||
}
|
||||
return Ok(retention);
|
||||
});
|
||||
} else {
|
||||
result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
|
||||
}
|
||||
@@ -3083,9 +2941,6 @@ impl Timeline {
|
||||
}
|
||||
(false, res)
|
||||
};
|
||||
|
||||
let verification = self.get_gc_compaction_settings().gc_compaction_verification;
|
||||
|
||||
info!(
|
||||
"picked {} layers for compaction ({} layers need rewriting) with max_layer_lsn={} min_layer_lsn={} gc_cutoff={} lowest_retain_lsn={}, key_range={}..{}, has_data_below={}",
|
||||
job_desc.selected_layers.len(),
|
||||
@@ -3232,6 +3087,8 @@ impl Timeline {
|
||||
job_desc.compaction_key_range.start,
|
||||
lowest_retain_lsn,
|
||||
self.get_compaction_target_size(),
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -3248,6 +3105,8 @@ impl Timeline {
|
||||
self.tenant_shard_id,
|
||||
lowest_retain_lsn..end_lsn,
|
||||
self.get_compaction_target_size(),
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
)
|
||||
.await
|
||||
.context("failed to create delta layer writer")
|
||||
@@ -3344,6 +3203,8 @@ impl Timeline {
|
||||
self.tenant_shard_id,
|
||||
desc.key_range.start,
|
||||
desc.lsn_range.clone(),
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -3361,6 +3222,8 @@ impl Timeline {
|
||||
self.tenant_shard_id,
|
||||
job_desc.compaction_key_range.end,
|
||||
desc.lsn_range.clone(),
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -3402,7 +3265,6 @@ impl Timeline {
|
||||
.await
|
||||
.context("failed to get ancestor image")
|
||||
.map_err(CompactionError::Other)?,
|
||||
verification,
|
||||
)
|
||||
.await
|
||||
.context("failed to generate key retention")
|
||||
@@ -3443,7 +3305,6 @@ impl Timeline {
|
||||
.await
|
||||
.context("failed to get ancestor image")
|
||||
.map_err(CompactionError::Other)?,
|
||||
verification,
|
||||
)
|
||||
.await
|
||||
.context("failed to generate key retention")
|
||||
@@ -3932,6 +3793,8 @@ impl CompactionJobExecutor for TimelineAdaptor {
|
||||
self.timeline.tenant_shard_id,
|
||||
key_range.start,
|
||||
lsn_range.clone(),
|
||||
&self.timeline.gate,
|
||||
self.timeline.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -4007,6 +3870,8 @@ impl TimelineAdaptor {
|
||||
self.timeline.tenant_shard_id,
|
||||
key_range,
|
||||
lsn,
|
||||
&self.timeline.gate,
|
||||
self.timeline.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -231,6 +231,8 @@ async fn generate_tombstone_image_layer(
|
||||
detached.tenant_shard_id,
|
||||
&key_range,
|
||||
image_lsn,
|
||||
&detached.gate,
|
||||
detached.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -779,6 +781,8 @@ async fn copy_lsn_prefix(
|
||||
target_timeline.tenant_shard_id,
|
||||
layer.layer_desc().key_range.start,
|
||||
layer.layer_desc().lsn_range.start..end_lsn,
|
||||
&target_timeline.gate,
|
||||
target_timeline.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -738,6 +738,8 @@ impl ChunkProcessingJob {
|
||||
self.timeline.tenant_shard_id,
|
||||
&self.range,
|
||||
self.pgdata_lsn,
|
||||
&self.timeline.gate,
|
||||
self.timeline.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -134,6 +134,20 @@ where
|
||||
Ok((bytes_amount, writer))
|
||||
}
|
||||
|
||||
pub async fn into_inner_no_flush(self) -> Arc<W> {
|
||||
let Self {
|
||||
mutable: buf,
|
||||
maybe_flushed: _,
|
||||
writer,
|
||||
mut flush_handle,
|
||||
bytes_submitted: _,
|
||||
} = self;
|
||||
// If the flush task panicked, that's fine.
|
||||
let _ = flush_handle.shutdown().await;
|
||||
assert!(buf.is_some());
|
||||
writer
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn mutable(&self) -> &B {
|
||||
self.mutable.as_ref().expect("must not use after an error")
|
||||
|
||||
@@ -20,6 +20,11 @@ pub struct FlushHandleInner<Buf, W> {
|
||||
/// A bi-directional channel that sends (buffer, offset) for writes,
|
||||
/// and receives recyled buffer.
|
||||
channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
|
||||
/// The flush task is sometimes sensitive to channel disconnection
|
||||
/// (i.e. when we drop [`Self::channel`]), other times sensitive to
|
||||
/// [`FlushBackgroundTask::cancel`], but never both.
|
||||
/// So, also store this drop guard.
|
||||
set_flush_task_cancelled: tokio_util::sync::DropGuard,
|
||||
/// Join handle for the background flush task.
|
||||
join_handle: tokio::task::JoinHandle<Result<Arc<W>, FlushTaskError>>,
|
||||
}
|
||||
@@ -134,8 +139,10 @@ where
|
||||
back.try_send(buf.flush())
|
||||
.expect("we just created it with capacity 1");
|
||||
|
||||
let cancel = cancel.child_token();
|
||||
|
||||
let join_handle = tokio::spawn(
|
||||
FlushBackgroundTask::new(back, file, gate_guard, cancel, ctx)
|
||||
FlushBackgroundTask::new(back, file, gate_guard, cancel.clone(), ctx)
|
||||
.run()
|
||||
.instrument(span),
|
||||
);
|
||||
@@ -143,6 +150,7 @@ where
|
||||
FlushHandle {
|
||||
inner: Some(FlushHandleInner {
|
||||
channel: front,
|
||||
set_flush_task_cancelled: cancel.drop_guard(),
|
||||
join_handle,
|
||||
}),
|
||||
}
|
||||
@@ -189,6 +197,7 @@ where
|
||||
.take()
|
||||
.expect("must not use after we returned an error");
|
||||
drop(handle.channel.tx);
|
||||
drop(handle.set_flush_task_cancelled);
|
||||
handle.join_handle.await.unwrap()
|
||||
}
|
||||
|
||||
|
||||
@@ -4302,10 +4302,10 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
|
||||
def respec_deep(self, **kwargs: Any) -> None:
|
||||
"""
|
||||
Update the endpoint.json file taking into account nested keys.
|
||||
Distinct method from respec() to do not break existing functionality.
|
||||
NOTE: This method also updates the config.json file, not endpoint.json.
|
||||
We need it because neon_local also writes to config.json, so intended
|
||||
Update the spec.json file taking into account nested keys.
|
||||
Distinct method from respec() to not break existing functionality.
|
||||
NOTE: This method also updates the spec.json file, not endpoint.json.
|
||||
We need it because neon_local also writes to spec.json, so intended
|
||||
use-case is i) start endpoint with some config, ii) respec_deep(),
|
||||
iii) call reconfigure() to apply the changes.
|
||||
"""
|
||||
@@ -4318,17 +4318,17 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
curr[k] = v
|
||||
return curr
|
||||
|
||||
config_path = os.path.join(self.endpoint_path(), "config.json")
|
||||
config_path = os.path.join(self.endpoint_path(), "spec.json")
|
||||
with open(config_path) as f:
|
||||
config: dict[str, Any] = json.load(f)
|
||||
data_dict: dict[str, Any] = json.load(f)
|
||||
|
||||
log.debug("Current compute config: %s", json.dumps(config, indent=4))
|
||||
log.debug("Current compute spec: %s", json.dumps(data_dict, indent=4))
|
||||
|
||||
update(config, kwargs)
|
||||
update(data_dict, kwargs)
|
||||
|
||||
with open(config_path, "w") as file:
|
||||
log.debug("Updating compute config to: %s", json.dumps(config, indent=4))
|
||||
json.dump(config, file, indent=4)
|
||||
log.debug("Updating compute spec to: %s", json.dumps(data_dict, indent=4))
|
||||
json.dump(data_dict, file, indent=4)
|
||||
|
||||
def wait_for_migrations(self, wait_for: int = NUM_COMPUTE_MIGRATIONS) -> None:
|
||||
"""
|
||||
@@ -4345,7 +4345,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
wait_until(check_migrations_done)
|
||||
|
||||
# Mock the extension part of spec passed from control plane for local testing
|
||||
# endpooint.rs adds content of this file as a part of the config.json
|
||||
# endpooint.rs adds content of this file as a part of the spec.json
|
||||
def create_remote_extension_spec(self, spec: dict[str, Any]):
|
||||
"""Create a remote extension spec file for the endpoint."""
|
||||
remote_extensions_spec_path = os.path.join(
|
||||
|
||||
@@ -187,7 +187,6 @@ def test_fully_custom_config(positive_env: NeonEnv):
|
||||
},
|
||||
"rel_size_v2_enabled": False, # test suite enables it by default as of https://github.com/neondatabase/neon/issues/11081, so, custom config means disabling it
|
||||
"gc_compaction_enabled": True,
|
||||
"gc_compaction_verification": False,
|
||||
"gc_compaction_initial_threshold_kb": 1024000,
|
||||
"gc_compaction_ratio_percent": 200,
|
||||
"image_creation_preempt_threshold": 5,
|
||||
|
||||
@@ -162,8 +162,6 @@ def test_pageserver_compaction_preempt(
|
||||
conf = PREEMPT_COMPACTION_TENANT_CONF.copy()
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=conf)
|
||||
|
||||
env.pageserver.allowed_errors.append(".*The timeline or pageserver is shutting down.*")
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
|
||||
@@ -90,12 +90,10 @@ def test_compute_catalog(neon_simple_env: NeonEnv):
|
||||
# and reconfigure the endpoint to create some test databases.
|
||||
endpoint.respec_deep(
|
||||
**{
|
||||
"spec": {
|
||||
"skip_pg_catalog_updates": False,
|
||||
"cluster": {
|
||||
"roles": TEST_ROLE_NAMES,
|
||||
"databases": TEST_DB_NAMES,
|
||||
},
|
||||
"skip_pg_catalog_updates": False,
|
||||
"cluster": {
|
||||
"roles": TEST_ROLE_NAMES,
|
||||
"databases": TEST_DB_NAMES,
|
||||
},
|
||||
}
|
||||
)
|
||||
@@ -157,12 +155,10 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
|
||||
# and reconfigure the endpoint to apply the changes.
|
||||
endpoint.respec_deep(
|
||||
**{
|
||||
"spec": {
|
||||
"skip_pg_catalog_updates": False,
|
||||
"cluster": {
|
||||
"roles": TEST_ROLE_NAMES,
|
||||
"databases": TEST_DB_NAMES,
|
||||
},
|
||||
"skip_pg_catalog_updates": False,
|
||||
"cluster": {
|
||||
"roles": TEST_ROLE_NAMES,
|
||||
"databases": TEST_DB_NAMES,
|
||||
},
|
||||
}
|
||||
)
|
||||
@@ -200,14 +196,12 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
|
||||
|
||||
endpoint.respec_deep(
|
||||
**{
|
||||
"spec": {
|
||||
"skip_pg_catalog_updates": False,
|
||||
"cluster": {
|
||||
"roles": [],
|
||||
"databases": [],
|
||||
},
|
||||
"delta_operations": delta_operations,
|
||||
"skip_pg_catalog_updates": False,
|
||||
"cluster": {
|
||||
"roles": [],
|
||||
"databases": [],
|
||||
},
|
||||
"delta_operations": delta_operations,
|
||||
}
|
||||
)
|
||||
endpoint.reconfigure()
|
||||
@@ -256,11 +250,9 @@ def test_dropdb_with_subscription(neon_simple_env: NeonEnv):
|
||||
# and reconfigure the endpoint to apply the changes.
|
||||
endpoint.respec_deep(
|
||||
**{
|
||||
"spec": {
|
||||
"skip_pg_catalog_updates": False,
|
||||
"cluster": {
|
||||
"databases": TEST_DB_NAMES,
|
||||
},
|
||||
"skip_pg_catalog_updates": False,
|
||||
"cluster": {
|
||||
"databases": TEST_DB_NAMES,
|
||||
},
|
||||
}
|
||||
)
|
||||
@@ -314,19 +306,17 @@ def test_dropdb_with_subscription(neon_simple_env: NeonEnv):
|
||||
# and reconfigure the endpoint to apply the changes.
|
||||
endpoint.respec_deep(
|
||||
**{
|
||||
"spec": {
|
||||
"skip_pg_catalog_updates": False,
|
||||
"cluster": {
|
||||
"databases": TEST_DB_NAMES_NEW,
|
||||
},
|
||||
"delta_operations": [
|
||||
{"action": "delete_db", "name": SUB_DB_NAME},
|
||||
# also test the case when we try to delete a non-existent database
|
||||
# shouldn't happen in normal operation,
|
||||
# but can occur when failed operations are retried
|
||||
{"action": "delete_db", "name": "nonexistent_db"},
|
||||
],
|
||||
"skip_pg_catalog_updates": False,
|
||||
"cluster": {
|
||||
"databases": TEST_DB_NAMES_NEW,
|
||||
},
|
||||
"delta_operations": [
|
||||
{"action": "delete_db", "name": SUB_DB_NAME},
|
||||
# also test the case when we try to delete a non-existent database
|
||||
# shouldn't happen in normal operation,
|
||||
# but can occur when failed operations are retried
|
||||
{"action": "delete_db", "name": "nonexistent_db"},
|
||||
],
|
||||
}
|
||||
)
|
||||
|
||||
@@ -364,27 +354,25 @@ def test_drop_role_with_table_privileges_from_neon_superuser(neon_simple_env: Ne
|
||||
|
||||
endpoint.respec_deep(
|
||||
**{
|
||||
"spec": {
|
||||
"skip_pg_catalog_updates": False,
|
||||
"cluster": {
|
||||
"roles": [
|
||||
{
|
||||
# We need to create role via compute_ctl, because in this case it will receive
|
||||
# additional grants equivalent to our real environment, so we can repro some
|
||||
# issues.
|
||||
"name": "neon",
|
||||
# Some autocomplete-suggested hash, no specific meaning.
|
||||
"encrypted_password": "SCRAM-SHA-256$4096:hBT22QjqpydQWqEulorfXA==$miBogcoj68JWYdsNB5PW1X6PjSLBEcNuctuhtGkb4PY=:hxk2gxkwxGo6P7GCtfpMlhA9zwHvPMsCz+NQf2HfvWk=",
|
||||
"options": [],
|
||||
},
|
||||
],
|
||||
"databases": [
|
||||
{
|
||||
"name": TEST_DB_NAME,
|
||||
"owner": "neon",
|
||||
},
|
||||
],
|
||||
},
|
||||
"skip_pg_catalog_updates": False,
|
||||
"cluster": {
|
||||
"roles": [
|
||||
{
|
||||
# We need to create role via compute_ctl, because in this case it will receive
|
||||
# additional grants equivalent to our real environment, so we can repro some
|
||||
# issues.
|
||||
"name": "neon",
|
||||
# Some autocomplete-suggested hash, no specific meaning.
|
||||
"encrypted_password": "SCRAM-SHA-256$4096:hBT22QjqpydQWqEulorfXA==$miBogcoj68JWYdsNB5PW1X6PjSLBEcNuctuhtGkb4PY=:hxk2gxkwxGo6P7GCtfpMlhA9zwHvPMsCz+NQf2HfvWk=",
|
||||
"options": [],
|
||||
},
|
||||
],
|
||||
"databases": [
|
||||
{
|
||||
"name": TEST_DB_NAME,
|
||||
"owner": "neon",
|
||||
},
|
||||
],
|
||||
},
|
||||
}
|
||||
)
|
||||
@@ -427,15 +415,13 @@ def test_drop_role_with_table_privileges_from_neon_superuser(neon_simple_env: Ne
|
||||
# Drop role via compute_ctl
|
||||
endpoint.respec_deep(
|
||||
**{
|
||||
"spec": {
|
||||
"skip_pg_catalog_updates": False,
|
||||
"delta_operations": [
|
||||
{
|
||||
"action": "delete_role",
|
||||
"name": TEST_GRANTEE,
|
||||
},
|
||||
],
|
||||
},
|
||||
"skip_pg_catalog_updates": False,
|
||||
"delta_operations": [
|
||||
{
|
||||
"action": "delete_role",
|
||||
"name": TEST_GRANTEE,
|
||||
},
|
||||
],
|
||||
}
|
||||
)
|
||||
endpoint.reconfigure()
|
||||
@@ -458,15 +444,13 @@ def test_drop_role_with_table_privileges_from_neon_superuser(neon_simple_env: Ne
|
||||
|
||||
endpoint.respec_deep(
|
||||
**{
|
||||
"spec": {
|
||||
"skip_pg_catalog_updates": False,
|
||||
"delta_operations": [
|
||||
{
|
||||
"action": "delete_role",
|
||||
"name": "readonly2",
|
||||
},
|
||||
],
|
||||
},
|
||||
"skip_pg_catalog_updates": False,
|
||||
"delta_operations": [
|
||||
{
|
||||
"action": "delete_role",
|
||||
"name": "readonly2",
|
||||
},
|
||||
],
|
||||
}
|
||||
)
|
||||
endpoint.reconfigure()
|
||||
@@ -491,27 +475,25 @@ def test_drop_role_with_table_privileges_from_non_neon_superuser(neon_simple_env
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
endpoint.respec_deep(
|
||||
**{
|
||||
"spec": {
|
||||
"skip_pg_catalog_updates": False,
|
||||
"cluster": {
|
||||
"roles": [
|
||||
{
|
||||
# We need to create role via compute_ctl, because in this case it will receive
|
||||
# additional grants equivalent to our real environment, so we can repro some
|
||||
# issues.
|
||||
"name": TEST_GRANTOR,
|
||||
# Some autocomplete-suggested hash, no specific meaning.
|
||||
"encrypted_password": "SCRAM-SHA-256$4096:hBT22QjqpydQWqEulorfXA==$miBogcoj68JWYdsNB5PW1X6PjSLBEcNuctuhtGkb4PY=:hxk2gxkwxGo6P7GCtfpMlhA9zwHvPMsCz+NQf2HfvWk=",
|
||||
"options": [],
|
||||
},
|
||||
],
|
||||
"databases": [
|
||||
{
|
||||
"name": TEST_DB_NAME,
|
||||
"owner": TEST_GRANTOR,
|
||||
},
|
||||
],
|
||||
},
|
||||
"skip_pg_catalog_updates": False,
|
||||
"cluster": {
|
||||
"roles": [
|
||||
{
|
||||
# We need to create role via compute_ctl, because in this case it will receive
|
||||
# additional grants equivalent to our real environment, so we can repro some
|
||||
# issues.
|
||||
"name": TEST_GRANTOR,
|
||||
# Some autocomplete-suggested hash, no specific meaning.
|
||||
"encrypted_password": "SCRAM-SHA-256$4096:hBT22QjqpydQWqEulorfXA==$miBogcoj68JWYdsNB5PW1X6PjSLBEcNuctuhtGkb4PY=:hxk2gxkwxGo6P7GCtfpMlhA9zwHvPMsCz+NQf2HfvWk=",
|
||||
"options": [],
|
||||
},
|
||||
],
|
||||
"databases": [
|
||||
{
|
||||
"name": TEST_DB_NAME,
|
||||
"owner": TEST_GRANTOR,
|
||||
},
|
||||
],
|
||||
},
|
||||
}
|
||||
)
|
||||
@@ -525,15 +507,13 @@ def test_drop_role_with_table_privileges_from_non_neon_superuser(neon_simple_env
|
||||
|
||||
endpoint.respec_deep(
|
||||
**{
|
||||
"spec": {
|
||||
"skip_pg_catalog_updates": False,
|
||||
"delta_operations": [
|
||||
{
|
||||
"action": "delete_role",
|
||||
"name": TEST_GRANTEE,
|
||||
},
|
||||
],
|
||||
},
|
||||
"skip_pg_catalog_updates": False,
|
||||
"delta_operations": [
|
||||
{
|
||||
"action": "delete_role",
|
||||
"name": TEST_GRANTEE,
|
||||
},
|
||||
],
|
||||
}
|
||||
)
|
||||
endpoint.reconfigure()
|
||||
|
||||
@@ -31,17 +31,15 @@ def test_compute_reconfigure(neon_simple_env: NeonEnv):
|
||||
|
||||
endpoint.respec_deep(
|
||||
**{
|
||||
"spec": {
|
||||
"skip_pg_catalog_updates": True,
|
||||
"cluster": {
|
||||
"settings": [
|
||||
{
|
||||
"name": "log_line_prefix",
|
||||
"vartype": "string",
|
||||
"value": TEST_LOG_LINE_PREFIX,
|
||||
}
|
||||
]
|
||||
},
|
||||
"skip_pg_catalog_updates": True,
|
||||
"cluster": {
|
||||
"settings": [
|
||||
{
|
||||
"name": "log_line_prefix",
|
||||
"vartype": "string",
|
||||
"value": TEST_LOG_LINE_PREFIX,
|
||||
}
|
||||
]
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
@@ -251,7 +251,7 @@ def test_multiple_subscription_branching(neon_simple_env: NeonEnv):
|
||||
NUMBER_OF_DBS = 5
|
||||
|
||||
# Create and start endpoint so that neon_local put all the generated
|
||||
# stuff into the config.json file.
|
||||
# stuff into the spec.json file.
|
||||
endpoint = env.endpoints.create_start(
|
||||
"main",
|
||||
config_lines=[
|
||||
@@ -280,15 +280,13 @@ def test_multiple_subscription_branching(neon_simple_env: NeonEnv):
|
||||
}
|
||||
)
|
||||
|
||||
# Update the config.json file to create the databases
|
||||
# Update the spec.json file to create the databases
|
||||
# and reconfigure the endpoint to apply the changes.
|
||||
endpoint.respec_deep(
|
||||
**{
|
||||
"spec": {
|
||||
"skip_pg_catalog_updates": False,
|
||||
"cluster": {
|
||||
"databases": TEST_DB_NAMES,
|
||||
},
|
||||
"skip_pg_catalog_updates": False,
|
||||
"cluster": {
|
||||
"databases": TEST_DB_NAMES,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user