diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 33047051df..9a84c584bc 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -8,6 +8,8 @@ use futures::future::BoxFuture; use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; use remote_storage::RemoteStorageConfig; +use safekeeper::control_file::FileStorage; +use safekeeper::state::TimelinePersistentState; use sd_notify::NotifyState; use tokio::runtime::Handle; use tokio::signal::unix::{signal, SignalKind}; @@ -30,12 +32,12 @@ use safekeeper::defaults::{ DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_PG_LISTEN_ADDR, }; -use safekeeper::wal_service; use safekeeper::GlobalTimelines; use safekeeper::SafeKeeperConf; use safekeeper::{broker, WAL_SERVICE_RUNTIME}; use safekeeper::{control_file, BROKER_RUNTIME}; use safekeeper::{http, WAL_REMOVER_RUNTIME}; +use safekeeper::{json_merge, wal_service}; use safekeeper::{remove_wal, WAL_BACKUP_RUNTIME}; use safekeeper::{wal_backup, HTTP_RUNTIME}; use storage_broker::DEFAULT_ENDPOINT; @@ -105,9 +107,6 @@ struct Args { /// Do not wait for changes to be written safely to disk. Unsafe. #[arg(short, long)] no_sync: bool, - /// Dump control file at path specified by this argument and exit. - #[arg(long)] - dump_control_file: Option, /// Broker endpoint for storage nodes coordination in the form /// http[s]://host:port. In case of https schema TLS is connection is /// established; plaintext otherwise. @@ -166,6 +165,21 @@ struct Args { /// useful for debugging. #[arg(long)] current_thread_runtime: bool, + /// Dump control file at path specified by this argument and exit. + #[arg(long)] + dump_control_file: Option, + /// Patch control file at path specified by this argument and exit. + /// Patch is specified in --patch option and imposed over + /// control file as per rfc7386. + /// Without --write-patched the result is only printed. 
+ #[arg(long, verbatim_doc_comment)] + patch_control_file: Option, + /// The patch to apply to control file at --patch-control-file, in JSON. + #[arg(long, default_value = None)] + patch: Option, + /// Write --patch-control-file result back in place. + #[arg(long, default_value = "false")] + write_patched: bool, } // Like PathBufValueParser, but allows empty string. @@ -207,7 +221,13 @@ async fn main() -> anyhow::Result<()> { if let Some(addr) = args.dump_control_file { let state = control_file::FileStorage::load_control_file(addr)?; let json = serde_json::to_string(&state)?; - print!("{json}"); + println!("{json}"); + return Ok(()); + } + + if let Some(cfile_path) = args.patch_control_file { + let patch = args.patch.ok_or(anyhow::anyhow!("patch is missing"))?; + patch_control_file(cfile_path, patch, args.write_patched).await?; return Ok(()); } @@ -529,6 +549,26 @@ fn parse_remote_storage(storage_conf: &str) -> anyhow::Result anyhow::Result<()> { + let state = control_file::FileStorage::load_control_file(&cfile_path)?; + // serialize to json, impose patch and deserialize back + let mut state_json = + serde_json::to_value(state).context("failed to serialize state to json")?; + let patch_json = serde_json::from_str(&patch).context("failed to parse patch")?; + json_merge(&mut state_json, patch_json); + let patched_state: TimelinePersistentState = + serde_json::from_value(state_json.clone()).context("failed to deserialize patched json")?; + println!("{state_json}"); + if write { + FileStorage::do_persist(&patched_state, &cfile_path, true).await?; + } + return Ok(()); +} + #[test] fn verify_cli() { use clap::CommandFactory; diff --git a/safekeeper/src/control_file.rs b/safekeeper/src/control_file.rs index f1daddd7c3..0644240c22 100644 --- a/safekeeper/src/control_file.rs +++ b/safekeeper/src/control_file.rs @@ -2,7 +2,7 @@ use anyhow::{bail, ensure, Context, Result}; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; -use camino::Utf8PathBuf; +use 
camino::{Utf8Path, Utf8PathBuf}; use tokio::fs::{self, File}; use tokio::io::AsyncWriteExt; @@ -155,6 +155,46 @@ impl FileStorage { })?; Ok(state) }
+
+    /// Serialize state `s` and write it to `dst_path`, optionally fsyncing the file.
+    ///
+    /// The file content is, in order: `SK_MAGIC` and `SK_FORMAT_VERSION` (each a
+    /// little-endian `u32`), the state as emitted by `ser_into`, and a
+    /// little-endian CRC32C checksum of all preceding bytes.
+    ///
+    /// The write is not atomic by itself: `dst_path` is created (or truncated)
+    /// first and filled afterwards. Callers that must never expose a
+    /// half-written file should write to a temporary path and rename it over
+    /// the target.
+    ///
+    /// # Errors
+    ///
+    /// Fails if the state cannot be serialized, or if the file cannot be
+    /// created, written, flushed or (when `sync` is set) synced.
+    pub async fn do_persist(
+        s: &TimelinePersistentState,
+        dst_path: &Utf8Path,
+        sync: bool,
+    ) -> Result<()> {
+        let mut f = File::create(dst_path)
+            .await
+            .with_context(|| format!("failed to create control file at: {dst_path}"))?;
+
+        let mut buf: Vec<u8> = Vec::new();
+        WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
+        WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
+        s.ser_into(&mut buf)?;
+
+        // The checksum covers everything serialized so far and is appended last.
+        let checksum = crc32c::crc32c(&buf);
+        buf.extend_from_slice(&checksum.to_le_bytes());
+
+        f.write_all(&buf).await.with_context(|| {
+            format!("failed to write safekeeper state into control file at: {dst_path}")
+        })?;
+        f.flush().await.with_context(|| {
+            format!("failed to flush safekeeper state into control file at: {dst_path}")
+        })?;
+
+        // fsync the file
+        if sync {
+            f.sync_all()
+                .await
+                .with_context(|| format!("failed to sync control file at {dst_path}"))?;
+        }
+        Ok(())
+    }
 } impl Deref for FileStorage { @@ -167,7 +207,7 @@ impl Deref for FileStorage { #[async_trait::async_trait] impl Storage for FileStorage { - /// Persists state durably to the underlying storage. + /// Atomically persists state durably to the underlying storage. /// /// For a description, see . 
async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> { @@ -175,46 +215,9 @@ impl Storage for FileStorage { // write data to safekeeper.control.partial let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL); - let mut control_partial = File::create(&control_partial_path).await.with_context(|| { - format!( - "failed to create partial control file at: {}", - &control_partial_path - ) - })?; - let mut buf: Vec = Vec::new(); - WriteBytesExt::write_u32::(&mut buf, SK_MAGIC)?; - WriteBytesExt::write_u32::(&mut buf, SK_FORMAT_VERSION)?; - s.ser_into(&mut buf)?; - - // calculate checksum before resize - let checksum = crc32c::crc32c(&buf); - buf.extend_from_slice(&checksum.to_le_bytes()); - - control_partial.write_all(&buf).await.with_context(|| { - format!( - "failed to write safekeeper state into control file at: {}", - control_partial_path - ) - })?; - control_partial.flush().await.with_context(|| { - format!( - "failed to flush safekeeper state into control file at: {}", - control_partial_path - ) - })?; - - // fsync the file - if !self.conf.no_sync { - control_partial.sync_all().await.with_context(|| { - format!( - "failed to sync partial control file at {}", - control_partial_path - ) - })?; - } + FileStorage::do_persist(s, &control_partial_path, !self.conf.no_sync).await?; let control_path = self.timeline_dir.join(CONTROL_FILE_NAME); - // rename should be atomic fs::rename(&control_partial_path, &control_path).await?; // this sync is not required by any standard but postgres does this (see durable_rename) diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index f18a1ec22d..83b16dadb3 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -2,6 +2,7 @@ use camino::Utf8PathBuf; use once_cell::sync::Lazy; use remote_storage::RemoteStorageConfig; +use serde_json::Value; use tokio::runtime::Runtime; use std::time::Duration; @@ -175,3 +176,24 @@ pub static METRICS_SHIFTER_RUNTIME: Lazy = Lazy::new(|| { .build() 
.expect("Failed to create broker runtime") });
+
+/// Apply JSON Merge Patch `b` to `a` in place, following RFC 7396:
+/// https://www.rfc-editor.org/rfc/rfc7396
+///
+/// * If `b` is not an object, it replaces `a` wholesale.
+/// * If `b` is an object, `a` is first reset to an empty object unless it
+///   already is one. Then every member of `b` is applied: a `null` value
+///   removes that key from `a`, any other value is merged recursively into
+///   the existing (or freshly inserted) member.
+///
+/// Derived from https://stackoverflow.com/a/54118457/4014587, extended with
+/// the RFC's handling of a non-object target so that `null` members of a
+/// patch object never end up in the result.
+pub fn json_merge(a: &mut Value, b: Value) {
+    let patch = match b {
+        Value::Object(patch) => patch,
+        // Scalars, arrays and null replace the target as a whole.
+        other => {
+            *a = other;
+            return;
+        }
+    };
+
+    // RFC 7396: "if Target is not an Object: Target = {}".
+    if !a.is_object() {
+        *a = Value::Object(serde_json::Map::new());
+    }
+
+    if let Value::Object(target) = a {
+        for (key, value) in patch {
+            if value.is_null() {
+                target.remove(&key);
+            } else {
+                json_merge(target.entry(key).or_insert(Value::Null), value);
+            }
+        }
+    }
+}