mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-13 16:32:56 +00:00
Switch to 'tracing' for logging, restructure code to make use of spans.
Refactors Compute::prepare_and_run. It's split into subroutines differently, to make it easier to attach tracing spans to the different stages. The high-level logic for waiting for Postgres to exit is moved to the caller. Replace 'env_logger' with 'tracing', and add `#[instrument]` directives to different stages of the startup process. This is a fairly mechanical change, except for the changes in 'spec.rs'. 'spec.rs' contained some complicated formatting, where parts of log messages were printed directly to stdout with `print`s. That was a bit messed up because the log normally goes to stderr, but those lines were printed to stdout. In our docker images, stderr and stdout both go to the same place so you wouldn't notice, but I don't think it was intentional. This changes the log format to the default 'tracing_subscriber::fmt' format. It's different from the Postgres log format, however, and because both compute_tools and Postgres print to the same log, it's now a mix of two different formats. I'm not sure how the Grafana log parsing pipeline can handle that. If it's a problem, we can build a custom formatter to change the compute_tools log format to be the same as Postgres's, like it was before this commit, or we can change the Postgres log format to match tracing_subscriber's, or we can start printing compute_tools' log output to a different destination than Postgres.
This commit is contained in:
committed by
Heikki Linnakangas
parent
90f66aa51b
commit
e5cc2f92c4
6
Cargo.lock
generated
6
Cargo.lock
generated
@@ -833,10 +833,8 @@ dependencies = [
|
|||||||
"anyhow",
|
"anyhow",
|
||||||
"chrono",
|
"chrono",
|
||||||
"clap 4.0.32",
|
"clap 4.0.32",
|
||||||
"env_logger",
|
|
||||||
"futures",
|
"futures",
|
||||||
"hyper",
|
"hyper",
|
||||||
"log",
|
|
||||||
"notify",
|
"notify",
|
||||||
"postgres",
|
"postgres",
|
||||||
"regex",
|
"regex",
|
||||||
@@ -845,6 +843,8 @@ dependencies = [
|
|||||||
"tar",
|
"tar",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-postgres",
|
"tokio-postgres",
|
||||||
|
"tracing",
|
||||||
|
"tracing-subscriber",
|
||||||
"url",
|
"url",
|
||||||
"workspace_hack",
|
"workspace_hack",
|
||||||
]
|
]
|
||||||
@@ -1954,7 +1954,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||||||
checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e"
|
checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"cfg-if",
|
"cfg-if",
|
||||||
"serde",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -4565,6 +4564,7 @@ dependencies = [
|
|||||||
"tower",
|
"tower",
|
||||||
"tracing",
|
"tracing",
|
||||||
"tracing-core",
|
"tracing-core",
|
||||||
|
"tracing-subscriber",
|
||||||
"url",
|
"url",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|||||||
@@ -8,10 +8,8 @@ license.workspace = true
|
|||||||
anyhow.workspace = true
|
anyhow.workspace = true
|
||||||
chrono.workspace = true
|
chrono.workspace = true
|
||||||
clap.workspace = true
|
clap.workspace = true
|
||||||
env_logger.workspace = true
|
|
||||||
futures.workspace = true
|
futures.workspace = true
|
||||||
hyper = { workspace = true, features = ["full"] }
|
hyper = { workspace = true, features = ["full"] }
|
||||||
log = { workspace = true, features = ["std", "serde"] }
|
|
||||||
notify.workspace = true
|
notify.workspace = true
|
||||||
postgres.workspace = true
|
postgres.workspace = true
|
||||||
regex.workspace = true
|
regex.workspace = true
|
||||||
@@ -20,6 +18,8 @@ serde_json.workspace = true
|
|||||||
tar.workspace = true
|
tar.workspace = true
|
||||||
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
|
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
|
||||||
tokio-postgres.workspace = true
|
tokio-postgres.workspace = true
|
||||||
|
tracing.workspace = true
|
||||||
|
tracing-subscriber.workspace = true
|
||||||
url.workspace = true
|
url.workspace = true
|
||||||
|
|
||||||
workspace_hack.workspace = true
|
workspace_hack.workspace = true
|
||||||
|
|||||||
@@ -40,7 +40,7 @@ use std::{thread, time::Duration};
|
|||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use clap::Arg;
|
use clap::Arg;
|
||||||
use log::{error, info};
|
use tracing::{error, info};
|
||||||
|
|
||||||
use compute_tools::compute::{ComputeMetrics, ComputeNode, ComputeState, ComputeStatus};
|
use compute_tools::compute::{ComputeMetrics, ComputeNode, ComputeState, ComputeStatus};
|
||||||
use compute_tools::http::api::launch_http_server;
|
use compute_tools::http::api::launch_http_server;
|
||||||
@@ -53,7 +53,6 @@ use compute_tools::spec::*;
|
|||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
fn main() -> Result<()> {
|
fn main() -> Result<()> {
|
||||||
// TODO: re-use `utils::logging` later
|
|
||||||
init_logger(DEFAULT_LOG_LEVEL)?;
|
init_logger(DEFAULT_LOG_LEVEL)?;
|
||||||
|
|
||||||
let matches = cli().get_matches();
|
let matches = cli().get_matches();
|
||||||
@@ -122,29 +121,45 @@ fn main() -> Result<()> {
|
|||||||
// Also spawn the thread responsible for handling the VM informant -- if it's present
|
// Also spawn the thread responsible for handling the VM informant -- if it's present
|
||||||
let _vm_informant_handle = spawn_vm_informant_if_present().expect("cannot launch VM informant");
|
let _vm_informant_handle = spawn_vm_informant_if_present().expect("cannot launch VM informant");
|
||||||
|
|
||||||
// Run compute (Postgres) and hang waiting on it.
|
// Start Postgres
|
||||||
match compute.prepare_and_run() {
|
let mut delay_exit = false;
|
||||||
Ok(ec) => {
|
let mut exit_code = None;
|
||||||
let code = ec.code().unwrap_or(1);
|
let pg = match compute.start_compute() {
|
||||||
info!("Postgres exited with code {}, shutting down", code);
|
Ok(pg) => Some(pg),
|
||||||
exit(code)
|
Err(err) => {
|
||||||
}
|
error!("could not start the compute node: {:?}", err);
|
||||||
Err(error) => {
|
|
||||||
error!("could not start the compute node: {:?}", error);
|
|
||||||
|
|
||||||
let mut state = compute.state.write().unwrap();
|
let mut state = compute.state.write().unwrap();
|
||||||
state.error = Some(format!("{:?}", error));
|
state.error = Some(format!("{:?}", err));
|
||||||
state.status = ComputeStatus::Failed;
|
state.status = ComputeStatus::Failed;
|
||||||
drop(state);
|
drop(state);
|
||||||
|
delay_exit = true;
|
||||||
// Keep serving HTTP requests, so the cloud control plane was able to
|
None
|
||||||
// get the actual error.
|
|
||||||
info!("giving control plane 30s to collect the error before shutdown");
|
|
||||||
thread::sleep(Duration::from_secs(30));
|
|
||||||
info!("shutting down");
|
|
||||||
Err(error)
|
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Wait for the child Postgres process forever. In this state Ctrl+C will
|
||||||
|
// propagate to Postgres and it will be shut down as well.
|
||||||
|
if let Some(mut pg) = pg {
|
||||||
|
let ecode = pg
|
||||||
|
.wait()
|
||||||
|
.expect("failed to start waiting on Postgres process");
|
||||||
|
info!("Postgres exited with code {}, shutting down", ecode);
|
||||||
|
exit_code = ecode.code()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Err(err) = compute.check_for_core_dumps() {
|
||||||
|
error!("error while checking for core dumps: {err:?}");
|
||||||
|
}
|
||||||
|
|
||||||
|
// If launch failed, keep serving HTTP requests for a while, so the cloud
|
||||||
|
// control plane can get the actual error.
|
||||||
|
if delay_exit {
|
||||||
|
info!("giving control plane 30s to collect the error before shutdown");
|
||||||
|
thread::sleep(Duration::from_secs(30));
|
||||||
|
info!("shutting down");
|
||||||
|
}
|
||||||
|
|
||||||
|
exit(exit_code.unwrap_or(1))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn cli() -> clap::Command {
|
fn cli() -> clap::Command {
|
||||||
|
|||||||
@@ -1,10 +1,11 @@
|
|||||||
use anyhow::{anyhow, Result};
|
use anyhow::{anyhow, Result};
|
||||||
use log::error;
|
|
||||||
use postgres::Client;
|
use postgres::Client;
|
||||||
use tokio_postgres::NoTls;
|
use tokio_postgres::NoTls;
|
||||||
|
use tracing::{error, instrument};
|
||||||
|
|
||||||
use crate::compute::ComputeNode;
|
use crate::compute::ComputeNode;
|
||||||
|
|
||||||
|
#[instrument(skip_all)]
|
||||||
pub fn create_writability_check_data(client: &mut Client) -> Result<()> {
|
pub fn create_writability_check_data(client: &mut Client) -> Result<()> {
|
||||||
let query = "
|
let query = "
|
||||||
CREATE TABLE IF NOT EXISTS health_check (
|
CREATE TABLE IF NOT EXISTS health_check (
|
||||||
@@ -21,6 +22,7 @@ pub fn create_writability_check_data(client: &mut Client) -> Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(skip_all)]
|
||||||
pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
|
pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
|
||||||
let (client, connection) = tokio_postgres::connect(compute.connstr.as_str(), NoTls).await?;
|
let (client, connection) = tokio_postgres::connect(compute.connstr.as_str(), NoTls).await?;
|
||||||
if client.is_closed() {
|
if client.is_closed() {
|
||||||
|
|||||||
@@ -17,15 +17,15 @@
|
|||||||
use std::fs;
|
use std::fs;
|
||||||
use std::os::unix::fs::PermissionsExt;
|
use std::os::unix::fs::PermissionsExt;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::process::{Command, ExitStatus, Stdio};
|
use std::process::{Command, Stdio};
|
||||||
use std::sync::atomic::{AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
use std::sync::RwLock;
|
use std::sync::RwLock;
|
||||||
|
|
||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use log::{error, info, warn};
|
|
||||||
use postgres::{Client, NoTls};
|
use postgres::{Client, NoTls};
|
||||||
use serde::{Serialize, Serializer};
|
use serde::{Serialize, Serializer};
|
||||||
|
use tracing::{info, instrument, warn};
|
||||||
|
|
||||||
use crate::checker::create_writability_check_data;
|
use crate::checker::create_writability_check_data;
|
||||||
use crate::config;
|
use crate::config;
|
||||||
@@ -121,6 +121,7 @@ impl ComputeNode {
|
|||||||
|
|
||||||
// Get basebackup from the libpq connection to pageserver using `connstr` and
|
// Get basebackup from the libpq connection to pageserver using `connstr` and
|
||||||
// unarchive it to `pgdata` directory overriding all its previous content.
|
// unarchive it to `pgdata` directory overriding all its previous content.
|
||||||
|
#[instrument(skip(self))]
|
||||||
fn get_basebackup(&self, lsn: &str) -> Result<()> {
|
fn get_basebackup(&self, lsn: &str) -> Result<()> {
|
||||||
let start_time = Utc::now();
|
let start_time = Utc::now();
|
||||||
|
|
||||||
@@ -154,6 +155,7 @@ impl ComputeNode {
|
|||||||
|
|
||||||
// Run `postgres` in a special mode with `--sync-safekeepers` argument
|
// Run `postgres` in a special mode with `--sync-safekeepers` argument
|
||||||
// and return the reported LSN back to the caller.
|
// and return the reported LSN back to the caller.
|
||||||
|
#[instrument(skip(self))]
|
||||||
fn sync_safekeepers(&self) -> Result<String> {
|
fn sync_safekeepers(&self) -> Result<String> {
|
||||||
let start_time = Utc::now();
|
let start_time = Utc::now();
|
||||||
|
|
||||||
@@ -196,6 +198,7 @@ impl ComputeNode {
|
|||||||
|
|
||||||
/// Do all the preparations like PGDATA directory creation, configuration,
|
/// Do all the preparations like PGDATA directory creation, configuration,
|
||||||
/// safekeepers sync, basebackup, etc.
|
/// safekeepers sync, basebackup, etc.
|
||||||
|
#[instrument(skip(self))]
|
||||||
pub fn prepare_pgdata(&self) -> Result<()> {
|
pub fn prepare_pgdata(&self) -> Result<()> {
|
||||||
let spec = &self.spec;
|
let spec = &self.spec;
|
||||||
let pgdata_path = Path::new(&self.pgdata);
|
let pgdata_path = Path::new(&self.pgdata);
|
||||||
@@ -229,9 +232,8 @@ impl ComputeNode {
|
|||||||
|
|
||||||
/// Start Postgres as a child process and manage DBs/roles.
|
/// Start Postgres as a child process and manage DBs/roles.
|
||||||
/// After that this will hang waiting on the postmaster process to exit.
|
/// After that this will hang waiting on the postmaster process to exit.
|
||||||
pub fn run(&self) -> Result<ExitStatus> {
|
#[instrument(skip(self))]
|
||||||
let start_time = Utc::now();
|
pub fn start_postgres(&self) -> Result<std::process::Child> {
|
||||||
|
|
||||||
let pgdata_path = Path::new(&self.pgdata);
|
let pgdata_path = Path::new(&self.pgdata);
|
||||||
|
|
||||||
// Run postgres as a child process.
|
// Run postgres as a child process.
|
||||||
@@ -242,6 +244,11 @@ impl ComputeNode {
|
|||||||
|
|
||||||
wait_for_postgres(&mut pg, pgdata_path)?;
|
wait_for_postgres(&mut pg, pgdata_path)?;
|
||||||
|
|
||||||
|
Ok(pg)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[instrument(skip(self))]
|
||||||
|
pub fn apply_config(&self) -> Result<()> {
|
||||||
// If connection fails,
|
// If connection fails,
|
||||||
// it may be the old node with `zenith_admin` superuser.
|
// it may be the old node with `zenith_admin` superuser.
|
||||||
//
|
//
|
||||||
@@ -279,8 +286,34 @@ impl ComputeNode {
|
|||||||
|
|
||||||
// 'Close' connection
|
// 'Close' connection
|
||||||
drop(client);
|
drop(client);
|
||||||
let startup_end_time = Utc::now();
|
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"finished configuration of compute for project {}",
|
||||||
|
self.spec.cluster.cluster_id
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[instrument(skip(self))]
|
||||||
|
pub fn start_compute(&self) -> Result<std::process::Child> {
|
||||||
|
info!(
|
||||||
|
"starting compute for project {}, operation {}, tenant {}, timeline {}",
|
||||||
|
self.spec.cluster.cluster_id,
|
||||||
|
self.spec.operation_uuid.as_ref().unwrap(),
|
||||||
|
self.tenant,
|
||||||
|
self.timeline,
|
||||||
|
);
|
||||||
|
|
||||||
|
self.prepare_pgdata()?;
|
||||||
|
|
||||||
|
let start_time = Utc::now();
|
||||||
|
|
||||||
|
let pg = self.start_postgres()?;
|
||||||
|
|
||||||
|
self.apply_config()?;
|
||||||
|
|
||||||
|
let startup_end_time = Utc::now();
|
||||||
self.metrics.config_ms.store(
|
self.metrics.config_ms.store(
|
||||||
startup_end_time
|
startup_end_time
|
||||||
.signed_duration_since(start_time)
|
.signed_duration_since(start_time)
|
||||||
@@ -300,35 +333,7 @@ impl ComputeNode {
|
|||||||
|
|
||||||
self.set_status(ComputeStatus::Running);
|
self.set_status(ComputeStatus::Running);
|
||||||
|
|
||||||
info!(
|
Ok(pg)
|
||||||
"finished configuration of compute for project {}",
|
|
||||||
self.spec.cluster.cluster_id
|
|
||||||
);
|
|
||||||
|
|
||||||
// Wait for child Postgres process basically forever. In this state Ctrl+C
|
|
||||||
// will propagate to Postgres and it will be shut down as well.
|
|
||||||
let ecode = pg
|
|
||||||
.wait()
|
|
||||||
.expect("failed to start waiting on Postgres process");
|
|
||||||
|
|
||||||
if let Err(err) = self.check_for_core_dumps() {
|
|
||||||
error!("error while checking for core dumps: {err:?}");
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(ecode)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn prepare_and_run(&self) -> Result<ExitStatus> {
|
|
||||||
info!(
|
|
||||||
"starting compute for project {}, operation {}, tenant {}, timeline {}",
|
|
||||||
self.spec.cluster.cluster_id,
|
|
||||||
self.spec.operation_uuid.as_ref().unwrap(),
|
|
||||||
self.tenant,
|
|
||||||
self.timeline,
|
|
||||||
);
|
|
||||||
|
|
||||||
self.prepare_pgdata()?;
|
|
||||||
self.run()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Look for core dumps and collect backtraces.
|
// Look for core dumps and collect backtraces.
|
||||||
@@ -341,7 +346,7 @@ impl ComputeNode {
|
|||||||
//
|
//
|
||||||
// Use that as a default location and pattern, except macos where core dumps are written
|
// Use that as a default location and pattern, except macos where core dumps are written
|
||||||
// to /cores/ directory by default.
|
// to /cores/ directory by default.
|
||||||
fn check_for_core_dumps(&self) -> Result<()> {
|
pub fn check_for_core_dumps(&self) -> Result<()> {
|
||||||
let core_dump_dir = match std::env::consts::OS {
|
let core_dump_dir = match std::env::consts::OS {
|
||||||
"macos" => Path::new("/cores/"),
|
"macos" => Path::new("/cores/"),
|
||||||
_ => Path::new(&self.pgdata),
|
_ => Path::new(&self.pgdata),
|
||||||
|
|||||||
@@ -6,8 +6,8 @@ use std::thread;
|
|||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use hyper::service::{make_service_fn, service_fn};
|
use hyper::service::{make_service_fn, service_fn};
|
||||||
use hyper::{Body, Method, Request, Response, Server, StatusCode};
|
use hyper::{Body, Method, Request, Response, Server, StatusCode};
|
||||||
use log::{error, info};
|
|
||||||
use serde_json;
|
use serde_json;
|
||||||
|
use tracing::{error, info};
|
||||||
|
|
||||||
use crate::compute::ComputeNode;
|
use crate::compute::ComputeNode;
|
||||||
|
|
||||||
|
|||||||
@@ -1,8 +1,8 @@
|
|||||||
use log::{info, warn};
|
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::process;
|
use std::process;
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
use tracing::{info, warn};
|
||||||
|
|
||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
|
|
||||||
|
|||||||
@@ -1,42 +1,20 @@
|
|||||||
use std::io::Write;
|
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use chrono::Utc;
|
use tracing_subscriber::layer::SubscriberExt;
|
||||||
use env_logger::{Builder, Env};
|
use tracing_subscriber::prelude::*;
|
||||||
|
|
||||||
macro_rules! info_println {
|
|
||||||
($($tts:tt)*) => {
|
|
||||||
if log_enabled!(Level::Info) {
|
|
||||||
println!($($tts)*);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
macro_rules! info_print {
|
|
||||||
($($tts:tt)*) => {
|
|
||||||
if log_enabled!(Level::Info) {
|
|
||||||
print!($($tts)*);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Initialize `env_logger` using either `default_level` or
|
/// Initialize `env_logger` using either `default_level` or
|
||||||
/// `RUST_LOG` environment variable as default log level.
|
/// `RUST_LOG` environment variable as default log level.
|
||||||
pub fn init_logger(default_level: &str) -> Result<()> {
|
pub fn init_logger(default_level: &str) -> Result<()> {
|
||||||
let env = Env::default().filter_or("RUST_LOG", default_level);
|
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
|
||||||
|
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_level));
|
||||||
|
|
||||||
Builder::from_env(env)
|
let fmt_layer = tracing_subscriber::fmt::layer()
|
||||||
.format(|buf, record| {
|
.with_target(false)
|
||||||
let thread_handle = std::thread::current();
|
.with_writer(std::io::stderr);
|
||||||
writeln!(
|
|
||||||
buf,
|
tracing_subscriber::registry()
|
||||||
"{} [{}] {}: {}",
|
.with(env_filter)
|
||||||
Utc::now().format("%Y-%m-%d %H:%M:%S%.3f %Z"),
|
.with(fmt_layer)
|
||||||
thread_handle.name().unwrap_or("main"),
|
|
||||||
record.level(),
|
|
||||||
record.args()
|
|
||||||
)
|
|
||||||
})
|
|
||||||
.init();
|
.init();
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
@@ -3,8 +3,8 @@ use std::{thread, time};
|
|||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use log::{debug, info};
|
|
||||||
use postgres::{Client, NoTls};
|
use postgres::{Client, NoTls};
|
||||||
|
use tracing::{debug, info};
|
||||||
|
|
||||||
use crate::compute::ComputeNode;
|
use crate::compute::ComputeNode;
|
||||||
|
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ use anyhow::{bail, Result};
|
|||||||
use notify::{RecursiveMode, Watcher};
|
use notify::{RecursiveMode, Watcher};
|
||||||
use postgres::{Client, Transaction};
|
use postgres::{Client, Transaction};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
|
use tracing::{debug, instrument};
|
||||||
|
|
||||||
const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds
|
const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds
|
||||||
|
|
||||||
@@ -229,6 +230,7 @@ pub fn get_existing_dbs(client: &mut Client) -> Result<Vec<Database>> {
|
|||||||
/// Wait for Postgres to become ready to accept connections. It's ready to
|
/// Wait for Postgres to become ready to accept connections. It's ready to
|
||||||
/// accept connections when the state-field in `pgdata/postmaster.pid` says
|
/// accept connections when the state-field in `pgdata/postmaster.pid` says
|
||||||
/// 'ready'.
|
/// 'ready'.
|
||||||
|
#[instrument(skip(pg))]
|
||||||
pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
|
pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
|
||||||
let pid_path = pgdata.join("postmaster.pid");
|
let pid_path = pgdata.join("postmaster.pid");
|
||||||
|
|
||||||
@@ -287,18 +289,18 @@ pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let res = rx.recv_timeout(Duration::from_millis(100));
|
let res = rx.recv_timeout(Duration::from_millis(100));
|
||||||
log::debug!("woken up by notify: {res:?}");
|
debug!("woken up by notify: {res:?}");
|
||||||
// If there are multiple events in the channel already, we only need to be
|
// If there are multiple events in the channel already, we only need to be
|
||||||
// check once. Swallow the extra events before we go ahead to check the
|
// check once. Swallow the extra events before we go ahead to check the
|
||||||
// pid file.
|
// pid file.
|
||||||
while let Ok(res) = rx.try_recv() {
|
while let Ok(res) = rx.try_recv() {
|
||||||
log::debug!("swallowing extra event: {res:?}");
|
debug!("swallowing extra event: {res:?}");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check that we can open pid file first.
|
// Check that we can open pid file first.
|
||||||
if let Ok(file) = File::open(&pid_path) {
|
if let Ok(file) = File::open(&pid_path) {
|
||||||
if !postmaster_pid_seen {
|
if !postmaster_pid_seen {
|
||||||
log::debug!("postmaster.pid appeared");
|
debug!("postmaster.pid appeared");
|
||||||
watcher
|
watcher
|
||||||
.unwatch(pgdata)
|
.unwatch(pgdata)
|
||||||
.expect("Failed to remove pgdata dir watch");
|
.expect("Failed to remove pgdata dir watch");
|
||||||
@@ -314,7 +316,7 @@ pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
|
|||||||
// Pid file could be there and we could read it, but it could be empty, for example.
|
// Pid file could be there and we could read it, but it could be empty, for example.
|
||||||
if let Some(Ok(line)) = last_line {
|
if let Some(Ok(line)) = last_line {
|
||||||
let status = line.trim();
|
let status = line.trim();
|
||||||
log::debug!("last line of postmaster.pid: {status:?}");
|
debug!("last line of postmaster.pid: {status:?}");
|
||||||
|
|
||||||
// Now Postgres is ready to accept connections
|
// Now Postgres is ready to accept connections
|
||||||
if status == "ready" {
|
if status == "ready" {
|
||||||
@@ -330,7 +332,7 @@ pub fn wait_for_postgres(pg: &mut Child, pgdata: &Path) -> Result<()> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log::info!("PostgreSQL is now running, continuing to configure it");
|
tracing::info!("PostgreSQL is now running, continuing to configure it");
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,12 +1,11 @@
|
|||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::time::Instant;
|
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use log::{info, log_enabled, warn, Level};
|
|
||||||
use postgres::config::Config;
|
use postgres::config::Config;
|
||||||
use postgres::{Client, NoTls};
|
use postgres::{Client, NoTls};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
|
use tracing::{info, info_span, instrument, span_enabled, warn, Level};
|
||||||
|
|
||||||
use crate::compute::ComputeNode;
|
use crate::compute::ComputeNode;
|
||||||
use crate::config;
|
use crate::config;
|
||||||
@@ -80,23 +79,25 @@ pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
|
|||||||
|
|
||||||
/// Given a cluster spec json and open transaction it handles roles creation,
|
/// Given a cluster spec json and open transaction it handles roles creation,
|
||||||
/// deletion and update.
|
/// deletion and update.
|
||||||
|
#[instrument(skip_all)]
|
||||||
pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
||||||
let mut xact = client.transaction()?;
|
let mut xact = client.transaction()?;
|
||||||
let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
|
let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
|
||||||
|
|
||||||
// Print a list of existing Postgres roles (only in debug mode)
|
// Print a list of existing Postgres roles (only in debug mode)
|
||||||
info!("postgres roles:");
|
if span_enabled!(Level::INFO) {
|
||||||
for r in &existing_roles {
|
info!("postgres roles:");
|
||||||
info_println!(
|
for r in &existing_roles {
|
||||||
"{} - {}:{}",
|
info!(
|
||||||
" ".repeat(27 + 5),
|
" - {}:{}",
|
||||||
r.name,
|
r.name,
|
||||||
if r.encrypted_password.is_some() {
|
if r.encrypted_password.is_some() {
|
||||||
"[FILTERED]"
|
"[FILTERED]"
|
||||||
} else {
|
} else {
|
||||||
"(null)"
|
"(null)"
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process delta operations first
|
// Process delta operations first
|
||||||
@@ -137,58 +138,68 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
|||||||
info!("cluster spec roles:");
|
info!("cluster spec roles:");
|
||||||
for role in &spec.cluster.roles {
|
for role in &spec.cluster.roles {
|
||||||
let name = &role.name;
|
let name = &role.name;
|
||||||
|
|
||||||
info_print!(
|
|
||||||
"{} - {}:{}",
|
|
||||||
" ".repeat(27 + 5),
|
|
||||||
name,
|
|
||||||
if role.encrypted_password.is_some() {
|
|
||||||
"[FILTERED]"
|
|
||||||
} else {
|
|
||||||
"(null)"
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
// XXX: with a limited number of roles it is fine, but consider making it a HashMap
|
// XXX: with a limited number of roles it is fine, but consider making it a HashMap
|
||||||
let pg_role = existing_roles.iter().find(|r| r.name == *name);
|
let pg_role = existing_roles.iter().find(|r| r.name == *name);
|
||||||
|
|
||||||
if let Some(r) = pg_role {
|
enum RoleAction {
|
||||||
let mut update_role = false;
|
None,
|
||||||
|
Update,
|
||||||
|
Create,
|
||||||
|
}
|
||||||
|
let action = if let Some(r) = pg_role {
|
||||||
if (r.encrypted_password.is_none() && role.encrypted_password.is_some())
|
if (r.encrypted_password.is_none() && role.encrypted_password.is_some())
|
||||||
|| (r.encrypted_password.is_some() && role.encrypted_password.is_none())
|
|| (r.encrypted_password.is_some() && role.encrypted_password.is_none())
|
||||||
{
|
{
|
||||||
update_role = true;
|
RoleAction::Update
|
||||||
} else if let Some(pg_pwd) = &r.encrypted_password {
|
} else if let Some(pg_pwd) = &r.encrypted_password {
|
||||||
// Check whether password changed or not (trim 'md5:' prefix first)
|
// Check whether password changed or not (trim 'md5:' prefix first)
|
||||||
update_role = pg_pwd[3..] != *role.encrypted_password.as_ref().unwrap();
|
if pg_pwd[3..] != *role.encrypted_password.as_ref().unwrap() {
|
||||||
|
RoleAction::Update
|
||||||
|
} else {
|
||||||
|
RoleAction::None
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
RoleAction::None
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
RoleAction::Create
|
||||||
|
};
|
||||||
|
|
||||||
if update_role {
|
match action {
|
||||||
|
RoleAction::None => {}
|
||||||
|
RoleAction::Update => {
|
||||||
let mut query: String = format!("ALTER ROLE {} ", name.pg_quote());
|
let mut query: String = format!("ALTER ROLE {} ", name.pg_quote());
|
||||||
info_print!(" -> update");
|
|
||||||
|
|
||||||
query.push_str(&role.to_pg_options());
|
query.push_str(&role.to_pg_options());
|
||||||
xact.execute(query.as_str(), &[])?;
|
xact.execute(query.as_str(), &[])?;
|
||||||
}
|
}
|
||||||
} else {
|
RoleAction::Create => {
|
||||||
info!("role name: '{}'", &name);
|
let mut query: String = format!("CREATE ROLE {} ", name.pg_quote());
|
||||||
let mut query: String = format!("CREATE ROLE {} ", name.pg_quote());
|
info!("role create query: '{}'", &query);
|
||||||
info!("role create query: '{}'", &query);
|
query.push_str(&role.to_pg_options());
|
||||||
info_print!(" -> create");
|
xact.execute(query.as_str(), &[])?;
|
||||||
|
|
||||||
query.push_str(&role.to_pg_options());
|
let grant_query = format!(
|
||||||
xact.execute(query.as_str(), &[])?;
|
"GRANT pg_read_all_data, pg_write_all_data TO {}",
|
||||||
|
name.pg_quote()
|
||||||
let grant_query = format!(
|
);
|
||||||
"GRANT pg_read_all_data, pg_write_all_data TO {}",
|
xact.execute(grant_query.as_str(), &[])?;
|
||||||
name.pg_quote()
|
info!("role grant query: '{}'", &grant_query);
|
||||||
);
|
}
|
||||||
xact.execute(grant_query.as_str(), &[])?;
|
|
||||||
info!("role grant query: '{}'", &grant_query);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
info_print!("\n");
|
if span_enabled!(Level::INFO) {
|
||||||
|
let pwd = if role.encrypted_password.is_some() {
|
||||||
|
"[FILTERED]"
|
||||||
|
} else {
|
||||||
|
"(null)"
|
||||||
|
};
|
||||||
|
let action_str = match action {
|
||||||
|
RoleAction::None => "",
|
||||||
|
RoleAction::Create => " -> create",
|
||||||
|
RoleAction::Update => " -> update",
|
||||||
|
};
|
||||||
|
info!(" - {}:{}{}", name, pwd, action_str);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
xact.commit()?;
|
xact.commit()?;
|
||||||
@@ -197,6 +208,7 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Reassign all dependent objects and delete requested roles.
|
/// Reassign all dependent objects and delete requested roles.
|
||||||
|
#[instrument(skip_all)]
|
||||||
pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result<()> {
|
pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result<()> {
|
||||||
if let Some(ops) = &node.spec.delta_operations {
|
if let Some(ops) = &node.spec.delta_operations {
|
||||||
// First, reassign all dependent objects to db owners.
|
// First, reassign all dependent objects to db owners.
|
||||||
@@ -261,13 +273,16 @@ fn reassign_owned_objects(node: &ComputeNode, role_name: &PgIdent) -> Result<()>
|
|||||||
/// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level
|
/// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level
|
||||||
/// atomicity should be enough here due to the order of operations and various checks,
|
/// atomicity should be enough here due to the order of operations and various checks,
|
||||||
/// which together provide us idempotency.
|
/// which together provide us idempotency.
|
||||||
|
#[instrument(skip_all)]
|
||||||
pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
||||||
let existing_dbs: Vec<Database> = get_existing_dbs(client)?;
|
let existing_dbs: Vec<Database> = get_existing_dbs(client)?;
|
||||||
|
|
||||||
// Print a list of existing Postgres databases (only in debug mode)
|
// Print a list of existing Postgres databases (only in debug mode)
|
||||||
info!("postgres databases:");
|
if span_enabled!(Level::INFO) {
|
||||||
for r in &existing_dbs {
|
info!("postgres databases:");
|
||||||
info_println!("{} - {}:{}", " ".repeat(27 + 5), r.name, r.owner);
|
for r in &existing_dbs {
|
||||||
|
info!(" {}:{}", r.name, r.owner);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process delta operations first
|
// Process delta operations first
|
||||||
@@ -310,13 +325,15 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
|||||||
for db in &spec.cluster.databases {
|
for db in &spec.cluster.databases {
|
||||||
let name = &db.name;
|
let name = &db.name;
|
||||||
|
|
||||||
info_print!("{} - {}:{}", " ".repeat(27 + 5), db.name, db.owner);
|
|
||||||
|
|
||||||
// XXX: with a limited number of databases it is fine, but consider making it a HashMap
|
// XXX: with a limited number of databases it is fine, but consider making it a HashMap
|
||||||
let pg_db = existing_dbs.iter().find(|r| r.name == *name);
|
let pg_db = existing_dbs.iter().find(|r| r.name == *name);
|
||||||
|
|
||||||
let start_time = Instant::now();
|
enum DatabaseAction {
|
||||||
if let Some(r) = pg_db {
|
None,
|
||||||
|
Update,
|
||||||
|
Create,
|
||||||
|
}
|
||||||
|
let action = if let Some(r) = pg_db {
|
||||||
// XXX: db owner name is returned as quoted string from Postgres,
|
// XXX: db owner name is returned as quoted string from Postgres,
|
||||||
// when quoting is needed.
|
// when quoting is needed.
|
||||||
let new_owner = if r.owner.starts_with('"') {
|
let new_owner = if r.owner.starts_with('"') {
|
||||||
@@ -326,29 +343,42 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
|||||||
};
|
};
|
||||||
|
|
||||||
if new_owner != r.owner {
|
if new_owner != r.owner {
|
||||||
|
// Update the owner
|
||||||
|
DatabaseAction::Update
|
||||||
|
} else {
|
||||||
|
DatabaseAction::None
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
DatabaseAction::Create
|
||||||
|
};
|
||||||
|
|
||||||
|
match action {
|
||||||
|
DatabaseAction::None => {}
|
||||||
|
DatabaseAction::Update => {
|
||||||
let query: String = format!(
|
let query: String = format!(
|
||||||
"ALTER DATABASE {} OWNER TO {}",
|
"ALTER DATABASE {} OWNER TO {}",
|
||||||
name.pg_quote(),
|
name.pg_quote(),
|
||||||
db.owner.pg_quote()
|
db.owner.pg_quote()
|
||||||
);
|
);
|
||||||
info_print!(" -> update");
|
let _ = info_span!("executing", query).entered();
|
||||||
|
|
||||||
client.execute(query.as_str(), &[])?;
|
client.execute(query.as_str(), &[])?;
|
||||||
let elapsed = start_time.elapsed().as_millis();
|
|
||||||
info_print!(" ({} ms)", elapsed);
|
|
||||||
}
|
}
|
||||||
} else {
|
DatabaseAction::Create => {
|
||||||
let mut query: String = format!("CREATE DATABASE {} ", name.pg_quote());
|
let mut query: String = format!("CREATE DATABASE {} ", name.pg_quote());
|
||||||
info_print!(" -> create");
|
query.push_str(&db.to_pg_options());
|
||||||
|
let _ = info_span!("executing", query).entered();
|
||||||
|
client.execute(query.as_str(), &[])?;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
query.push_str(&db.to_pg_options());
|
if span_enabled!(Level::INFO) {
|
||||||
client.execute(query.as_str(), &[])?;
|
let action_str = match action {
|
||||||
|
DatabaseAction::None => "",
|
||||||
let elapsed = start_time.elapsed().as_millis();
|
DatabaseAction::Create => " -> create",
|
||||||
info_print!(" ({} ms)", elapsed);
|
DatabaseAction::Update => " -> update",
|
||||||
|
};
|
||||||
|
info!(" - {}:{}{}", db.name, db.owner, action_str);
|
||||||
}
|
}
|
||||||
|
|
||||||
info_print!("\n");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -356,6 +386,7 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
|||||||
|
|
||||||
/// Grant CREATE ON DATABASE to the database owner and do some other alters and grants
|
/// Grant CREATE ON DATABASE to the database owner and do some other alters and grants
|
||||||
/// to allow users creating trusted extensions and re-creating `public` schema, for example.
|
/// to allow users creating trusted extensions and re-creating `public` schema, for example.
|
||||||
|
#[instrument(skip_all)]
|
||||||
pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> {
|
pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> {
|
||||||
let spec = &node.spec;
|
let spec = &node.spec;
|
||||||
|
|
||||||
|
|||||||
@@ -194,7 +194,7 @@ def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
|||||||
)
|
)
|
||||||
except TimeoutExpired as exc:
|
except TimeoutExpired as exc:
|
||||||
ctl_logs = (exc.stderr or b"").decode("utf-8")
|
ctl_logs = (exc.stderr or b"").decode("utf-8")
|
||||||
log.info("compute_ctl output:\n{ctl_logs}")
|
log.info(f"compute_ctl stderr:\n{ctl_logs}")
|
||||||
|
|
||||||
with ExternalProcessManager(Path(pgdata) / "postmaster.pid"):
|
with ExternalProcessManager(Path(pgdata) / "postmaster.pid"):
|
||||||
start = "starting safekeepers syncing"
|
start = "starting safekeepers syncing"
|
||||||
|
|||||||
@@ -26,7 +26,7 @@ futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
|
|||||||
indexmap = { version = "1", default-features = false, features = ["std"] }
|
indexmap = { version = "1", default-features = false, features = ["std"] }
|
||||||
itertools = { version = "0.10" }
|
itertools = { version = "0.10" }
|
||||||
libc = { version = "0.2", features = ["extra_traits"] }
|
libc = { version = "0.2", features = ["extra_traits"] }
|
||||||
log = { version = "0.4", default-features = false, features = ["serde", "std"] }
|
log = { version = "0.4", default-features = false, features = ["std"] }
|
||||||
memchr = { version = "2" }
|
memchr = { version = "2" }
|
||||||
nom = { version = "7" }
|
nom = { version = "7" }
|
||||||
num-bigint = { version = "0.4" }
|
num-bigint = { version = "0.4" }
|
||||||
@@ -45,6 +45,7 @@ tokio-util = { version = "0.7", features = ["codec", "io"] }
|
|||||||
tower = { version = "0.4", features = ["balance", "buffer", "limit", "retry", "timeout", "util"] }
|
tower = { version = "0.4", features = ["balance", "buffer", "limit", "retry", "timeout", "util"] }
|
||||||
tracing = { version = "0.1", features = ["log"] }
|
tracing = { version = "0.1", features = ["log"] }
|
||||||
tracing-core = { version = "0.1" }
|
tracing-core = { version = "0.1" }
|
||||||
|
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
|
||||||
url = { version = "2", features = ["serde"] }
|
url = { version = "2", features = ["serde"] }
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
@@ -54,7 +55,7 @@ either = { version = "1" }
|
|||||||
indexmap = { version = "1", default-features = false, features = ["std"] }
|
indexmap = { version = "1", default-features = false, features = ["std"] }
|
||||||
itertools = { version = "0.10" }
|
itertools = { version = "0.10" }
|
||||||
libc = { version = "0.2", features = ["extra_traits"] }
|
libc = { version = "0.2", features = ["extra_traits"] }
|
||||||
log = { version = "0.4", default-features = false, features = ["serde", "std"] }
|
log = { version = "0.4", default-features = false, features = ["std"] }
|
||||||
memchr = { version = "2" }
|
memchr = { version = "2" }
|
||||||
nom = { version = "7" }
|
nom = { version = "7" }
|
||||||
prost = { version = "0.11" }
|
prost = { version = "0.11" }
|
||||||
|
|||||||
Reference in New Issue
Block a user