Add safekeeper option to patch control file.

https://github.com/neondatabase/neon/issues/6397
This commit is contained in:
Arseny Sher
2024-01-21 00:19:47 +03:00
parent f003dd6ad5
commit ee46acee26
3 changed files with 110 additions and 45 deletions

View File

@@ -8,6 +8,8 @@ use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use remote_storage::RemoteStorageConfig;
use safekeeper::control_file::FileStorage;
use safekeeper::state::TimelinePersistentState;
use sd_notify::NotifyState;
use tokio::runtime::Handle;
use tokio::signal::unix::{signal, SignalKind};
@@ -30,12 +32,12 @@ use safekeeper::defaults::{
DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
DEFAULT_PG_LISTEN_ADDR,
};
use safekeeper::wal_service;
use safekeeper::GlobalTimelines;
use safekeeper::SafeKeeperConf;
use safekeeper::{broker, WAL_SERVICE_RUNTIME};
use safekeeper::{control_file, BROKER_RUNTIME};
use safekeeper::{http, WAL_REMOVER_RUNTIME};
use safekeeper::{json_merge, wal_service};
use safekeeper::{remove_wal, WAL_BACKUP_RUNTIME};
use safekeeper::{wal_backup, HTTP_RUNTIME};
use storage_broker::DEFAULT_ENDPOINT;
@@ -105,9 +107,6 @@ struct Args {
/// Do not wait for changes to be written safely to disk. Unsafe.
#[arg(short, long)]
no_sync: bool,
/// Dump control file at path specified by this argument and exit.
#[arg(long)]
dump_control_file: Option<Utf8PathBuf>,
/// Broker endpoint for storage nodes coordination in the form
/// http[s]://host:port. In case of https schema TLS is connection is
/// established; plaintext otherwise.
@@ -166,6 +165,21 @@ struct Args {
/// useful for debugging.
#[arg(long)]
current_thread_runtime: bool,
/// Dump control file at path specified by this argument and exit.
#[arg(long)]
dump_control_file: Option<Utf8PathBuf>,
/// Patch control file at path specified by this argument and exit.
/// Patch is specified in --patch option and imposed over
/// control file as per rfc7386.
/// Without --write-patched the result is only printed.
#[arg(long, verbatim_doc_comment)]
patch_control_file: Option<Utf8PathBuf>,
/// The patch to apply to control file at --patch-control-file, in JSON.
#[arg(long, default_value = None)]
patch: Option<String>,
/// Write --patch-control-file result back in place.
#[arg(long, default_value = "false")]
write_patched: bool,
}
// Like PathBufValueParser, but allows empty string.
@@ -207,7 +221,13 @@ async fn main() -> anyhow::Result<()> {
if let Some(addr) = args.dump_control_file {
let state = control_file::FileStorage::load_control_file(addr)?;
let json = serde_json::to_string(&state)?;
print!("{json}");
println!("{json}");
return Ok(());
}
if let Some(cfile_path) = args.patch_control_file {
let patch = args.patch.ok_or(anyhow::anyhow!("patch is missing"))?;
patch_control_file(cfile_path, patch, args.write_patched).await?;
return Ok(());
}
@@ -529,6 +549,26 @@ fn parse_remote_storage(storage_conf: &str) -> anyhow::Result<RemoteStorageConfi
})
}
async fn patch_control_file(
cfile_path: Utf8PathBuf,
patch: String,
write: bool,
) -> anyhow::Result<()> {
let state = control_file::FileStorage::load_control_file(&cfile_path)?;
// serialize to json, impose patch and deserialize back
let mut state_json =
serde_json::to_value(state).context("failed to serialize state to json")?;
let patch_json = serde_json::from_str(&patch).context("failed to parse patch")?;
json_merge(&mut state_json, patch_json);
let patched_state: TimelinePersistentState =
serde_json::from_value(state_json.clone()).context("failed to deserialize patched json")?;
println!("{state_json}");
if write {
FileStorage::do_persist(&patched_state, &cfile_path, true).await?;
}
return Ok(());
}
#[test]
fn verify_cli() {
use clap::CommandFactory;

View File

@@ -2,7 +2,7 @@
use anyhow::{bail, ensure, Context, Result};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use camino::Utf8PathBuf;
use camino::{Utf8Path, Utf8PathBuf};
use tokio::fs::{self, File};
use tokio::io::AsyncWriteExt;
@@ -155,6 +155,46 @@ impl FileStorage {
})?;
Ok(state)
}
/// Persist state s to dst_path, optionally fsyncing file.
pub async fn do_persist(
s: &TimelinePersistentState,
dst_path: &Utf8Path,
sync: bool,
) -> Result<()> {
let mut f = File::create(&dst_path)
.await
.with_context(|| format!("failed to create partial control file at: {}", &dst_path))?;
let mut buf: Vec<u8> = Vec::new();
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
s.ser_into(&mut buf)?;
// calculate checksum before resize
let checksum = crc32c::crc32c(&buf);
buf.extend_from_slice(&checksum.to_le_bytes());
f.write_all(&buf).await.with_context(|| {
format!(
"failed to write safekeeper state into control file at: {}",
dst_path
)
})?;
f.flush().await.with_context(|| {
format!(
"failed to flush safekeeper state into control file at: {}",
dst_path
)
})?;
// fsync the file
if sync {
f.sync_all()
.await
.with_context(|| format!("failed to sync partial control file at {}", dst_path))?;
}
Ok(())
}
}
impl Deref for FileStorage {
@@ -167,7 +207,7 @@ impl Deref for FileStorage {
#[async_trait::async_trait]
impl Storage for FileStorage {
/// Persists state durably to the underlying storage.
/// Atomically persists state durably to the underlying storage.
///
/// For a description, see <https://lwn.net/Articles/457667/>.
async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
@@ -175,46 +215,9 @@ impl Storage for FileStorage {
// write data to safekeeper.control.partial
let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
let mut control_partial = File::create(&control_partial_path).await.with_context(|| {
format!(
"failed to create partial control file at: {}",
&control_partial_path
)
})?;
let mut buf: Vec<u8> = Vec::new();
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
s.ser_into(&mut buf)?;
// calculate checksum before resize
let checksum = crc32c::crc32c(&buf);
buf.extend_from_slice(&checksum.to_le_bytes());
control_partial.write_all(&buf).await.with_context(|| {
format!(
"failed to write safekeeper state into control file at: {}",
control_partial_path
)
})?;
control_partial.flush().await.with_context(|| {
format!(
"failed to flush safekeeper state into control file at: {}",
control_partial_path
)
})?;
// fsync the file
if !self.conf.no_sync {
control_partial.sync_all().await.with_context(|| {
format!(
"failed to sync partial control file at {}",
control_partial_path
)
})?;
}
FileStorage::do_persist(s, &control_partial_path, !self.conf.no_sync).await?;
let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
// rename should be atomic
fs::rename(&control_partial_path, &control_path).await?;
// this sync is not required by any standard but postgres does this (see durable_rename)

View File

@@ -2,6 +2,7 @@
use camino::Utf8PathBuf;
use once_cell::sync::Lazy;
use remote_storage::RemoteStorageConfig;
use serde_json::Value;
use tokio::runtime::Runtime;
use std::time::Duration;
@@ -175,3 +176,24 @@ pub static METRICS_SHIFTER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
.build()
.expect("Failed to create broker runtime")
});
/// Merge json b into json a according to
/// https://www.rfc-editor.org/rfc/rfc7396
/// https://stackoverflow.com/a/54118457/4014587
pub fn json_merge(a: &mut Value, b: Value) {
if let Value::Object(a) = a {
if let Value::Object(b) = b {
for (k, v) in b {
if v.is_null() {
a.remove(&k);
} else {
json_merge(a.entry(k).or_insert(Value::Null), v);
}
}
return;
}
}
*a = b;
}