mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 03:52:56 +00:00
remote_storage config: move handling of empty inline table {} to callers (#8193)
Before this PR, `RemoteStorageConfig::from_toml` would support
deserializing an
empty `{}` TOML inline table to a `None`, otherwise try `Some()`.
We can instead let the callers handle it:
* in proxy: let clap derive handle the Option
* in PS & SK: assume that if the field is specified, it must be a valid
RemoteStorageConfig
(This PR started with a much simpler goal of factoring out the
`deserialize_item` function because I need that in another PR).
This commit is contained in:
committed by
GitHub
parent
0497b99f3a
commit
7dcdbaa25e
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -6811,6 +6811,7 @@ dependencies = [
|
||||
"tokio-stream",
|
||||
"tokio-tar",
|
||||
"tokio-util",
|
||||
"toml_edit 0.19.10",
|
||||
"tracing",
|
||||
"tracing-error",
|
||||
"tracing-subscriber",
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use std::{fmt::Debug, num::NonZeroUsize, str::FromStr, time::Duration};
|
||||
|
||||
use anyhow::bail;
|
||||
use aws_sdk_s3::types::StorageClass;
|
||||
use camino::Utf8PathBuf;
|
||||
|
||||
@@ -176,20 +175,8 @@ fn serialize_storage_class<S: serde::Serializer>(
|
||||
impl RemoteStorageConfig {
|
||||
pub const DEFAULT_TIMEOUT: Duration = std::time::Duration::from_secs(120);
|
||||
|
||||
pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<Option<RemoteStorageConfig>> {
|
||||
let document: toml_edit::Document = match toml {
|
||||
toml_edit::Item::Table(toml) => toml.clone().into(),
|
||||
toml_edit::Item::Value(toml_edit::Value::InlineTable(toml)) => {
|
||||
toml.clone().into_table().into()
|
||||
}
|
||||
_ => bail!("toml not a table or inline table"),
|
||||
};
|
||||
|
||||
if document.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
Ok(Some(toml_edit::de::from_document(document)?))
|
||||
pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<RemoteStorageConfig> {
|
||||
Ok(utils::toml_edit_ext::deserialize_item(toml)?)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -197,7 +184,7 @@ impl RemoteStorageConfig {
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn parse(input: &str) -> anyhow::Result<Option<RemoteStorageConfig>> {
|
||||
fn parse(input: &str) -> anyhow::Result<RemoteStorageConfig> {
|
||||
let toml = input.parse::<toml_edit::Document>().unwrap();
|
||||
RemoteStorageConfig::from_toml(toml.as_item())
|
||||
}
|
||||
@@ -207,7 +194,7 @@ mod tests {
|
||||
let input = "local_path = '.'
|
||||
timeout = '5s'";
|
||||
|
||||
let config = parse(input).unwrap().expect("it exists");
|
||||
let config = parse(input).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
config,
|
||||
@@ -229,7 +216,7 @@ timeout = '5s'";
|
||||
timeout = '7s'
|
||||
";
|
||||
|
||||
let config = parse(toml).unwrap().expect("it exists");
|
||||
let config = parse(toml).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
config,
|
||||
@@ -257,7 +244,7 @@ timeout = '5s'";
|
||||
timeout = '7s'
|
||||
";
|
||||
|
||||
let config = parse(toml).unwrap().expect("it exists");
|
||||
let config = parse(toml).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
config,
|
||||
|
||||
@@ -40,6 +40,7 @@ thiserror.workspace = true
|
||||
tokio.workspace = true
|
||||
tokio-tar.workspace = true
|
||||
tokio-util.workspace = true
|
||||
toml_edit.workspace = true
|
||||
tracing.workspace = true
|
||||
tracing-error.workspace = true
|
||||
tracing-subscriber = { workspace = true, features = ["json", "registry"] }
|
||||
|
||||
@@ -94,6 +94,8 @@ pub mod env;
|
||||
|
||||
pub mod poison;
|
||||
|
||||
pub mod toml_edit_ext;
|
||||
|
||||
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
|
||||
///
|
||||
/// we have several cases:
|
||||
|
||||
22
libs/utils/src/toml_edit_ext.rs
Normal file
22
libs/utils/src/toml_edit_ext.rs
Normal file
@@ -0,0 +1,22 @@
|
||||
/// Errors that [`deserialize_item`] can return.
#[derive(Debug, thiserror::Error)]
|
||||
pub enum Error {
|
||||
/// The item was neither a table nor an inline table.
#[error("item is not a document")]
|
||||
ItemIsNotADocument,
|
||||
/// Deserializing the document into the target type failed.
#[error(transparent)]
|
||||
Serde(toml_edit::de::Error),
|
||||
}
|
||||
|
||||
/// Deserializes a `toml_edit::Item` into a `T`.
///
/// The item must be a table or an inline table; it is cloned into a standalone
/// `toml_edit::Document`, which is then deserialized.
///
/// # Errors
///
/// Returns [`Error::ItemIsNotADocument`] for any other kind of item, and
/// [`Error::Serde`] if deserializing the document fails.
pub fn deserialize_item<T>(item: &toml_edit::Item) -> Result<T, Error>
|
||||
where
|
||||
T: serde::de::DeserializeOwned,
|
||||
{
|
||||
// The item is only borrowed, so it is cloned to build an owned document.
let document: toml_edit::Document = match item {
|
||||
toml_edit::Item::Table(toml) => toml.clone().into(),
|
||||
toml_edit::Item::Value(toml_edit::Value::InlineTable(toml)) => {
|
||||
toml.clone().into_table().into()
|
||||
}
|
||||
// Anything that is not table-shaped cannot be turned into a document.
_ => return Err(Error::ItemIsNotADocument),
|
||||
};
|
||||
|
||||
toml_edit::de::from_document(document).map_err(Error::Serde)
|
||||
}
|
||||
@@ -178,7 +178,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
let toml_item = toml_document
|
||||
.get("remote_storage")
|
||||
.expect("need remote_storage");
|
||||
let config = RemoteStorageConfig::from_toml(toml_item)?.expect("incomplete config");
|
||||
let config = RemoteStorageConfig::from_toml(toml_item)?;
|
||||
let storage = remote_storage::GenericRemoteStorage::from_config(&config);
|
||||
let cancel = CancellationToken::new();
|
||||
storage
|
||||
|
||||
@@ -159,7 +159,7 @@ pub mod defaults {
|
||||
|
||||
#ephemeral_bytes_per_memory_kb = {DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB}
|
||||
|
||||
[remote_storage]
|
||||
#[remote_storage]
|
||||
|
||||
"#
|
||||
);
|
||||
@@ -918,7 +918,7 @@ impl PageServerConf {
|
||||
"http_auth_type" => builder.http_auth_type(parse_toml_from_str(key, item)?),
|
||||
"pg_auth_type" => builder.pg_auth_type(parse_toml_from_str(key, item)?),
|
||||
"remote_storage" => {
|
||||
builder.remote_storage_config(RemoteStorageConfig::from_toml(item)?)
|
||||
builder.remote_storage_config(Some(RemoteStorageConfig::from_toml(item).context("remote_storage")?))
|
||||
}
|
||||
"tenant_config" => {
|
||||
t_conf = TenantConfOpt::try_from(item.to_owned()).context(format!("failed to parse: '{key}'"))?;
|
||||
@@ -946,7 +946,7 @@ impl PageServerConf {
|
||||
builder.metric_collection_endpoint(Some(endpoint));
|
||||
},
|
||||
"metric_collection_bucket" => {
|
||||
builder.metric_collection_bucket(RemoteStorageConfig::from_toml(item)?)
|
||||
builder.metric_collection_bucket(Some(RemoteStorageConfig::from_toml(item)?))
|
||||
}
|
||||
"synthetic_size_calculation_interval" =>
|
||||
builder.synthetic_size_calculation_interval(parse_toml_duration(key, item)?),
|
||||
@@ -1681,6 +1681,19 @@ threshold = "20m"
|
||||
}
|
||||
}
|
||||
|
||||
// An empty `remote_storage = {}` inline table must be rejected rather than
// being treated as "no remote storage configured".
#[test]
|
||||
fn empty_remote_storage_is_error() {
|
||||
let tempdir = tempdir().unwrap();
|
||||
let (workdir, _) = prepare_fs(&tempdir).unwrap();
|
||||
let input = r#"
|
||||
remote_storage = {}
|
||||
"#;
|
||||
let doc = toml_edit::Document::from_str(input).unwrap();
|
||||
let err = PageServerConf::parse_and_validate(&doc, &workdir)
|
||||
.expect_err("empty remote_storage field should fail, don't specify it if you want no remote_storage");
|
||||
// The error message should name the offending config key.
assert!(format!("{err}").contains("remote_storage"), "{err}");
|
||||
}
|
||||
|
||||
fn prepare_fs(tempdir: &Utf8TempDir) -> anyhow::Result<(Utf8PathBuf, Utf8PathBuf)> {
|
||||
let tempdir_path = tempdir.path();
|
||||
|
||||
|
||||
@@ -35,6 +35,7 @@ use proxy::usage_metrics;
|
||||
use anyhow::bail;
|
||||
use proxy::config::{self, ProxyConfig};
|
||||
use proxy::serverless;
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
use std::net::SocketAddr;
|
||||
use std::pin::pin;
|
||||
use std::sync::Arc;
|
||||
@@ -205,8 +206,8 @@ struct ProxyCliArgs {
|
||||
/// remote storage configuration for backup metric collection
|
||||
/// Encoded as toml (same format as pageservers), eg
|
||||
/// `{bucket_name='the-bucket',bucket_region='us-east-1',prefix_in_bucket='proxy',endpoint='http://minio:9000'}`
|
||||
#[clap(long, default_value = "{}")]
|
||||
metric_backup_collection_remote_storage: String,
|
||||
#[clap(long, value_parser = remote_storage_from_toml)]
|
||||
metric_backup_collection_remote_storage: Option<RemoteStorageConfig>,
|
||||
/// chunk size for backup metric collection
|
||||
/// Size of each event is no more than 400 bytes, so 2**22 is about 200MB before the compression.
|
||||
#[clap(long, default_value = "4194304")]
|
||||
@@ -511,9 +512,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
}
|
||||
let backup_metric_collection_config = config::MetricBackupCollectionConfig {
|
||||
interval: args.metric_backup_collection_interval,
|
||||
remote_storage_config: remote_storage_from_toml(
|
||||
&args.metric_backup_collection_remote_storage,
|
||||
)?,
|
||||
remote_storage_config: args.metric_backup_collection_remote_storage.clone(),
|
||||
chunk_size: args.metric_backup_collection_chunk_size,
|
||||
};
|
||||
|
||||
|
||||
@@ -399,15 +399,11 @@ impl FromStr for EndpointCacheConfig {
|
||||
#[derive(Debug)]
|
||||
pub struct MetricBackupCollectionConfig {
|
||||
pub interval: Duration,
|
||||
pub remote_storage_config: OptRemoteStorageConfig,
|
||||
pub remote_storage_config: Option<RemoteStorageConfig>,
|
||||
pub chunk_size: usize,
|
||||
}
|
||||
|
||||
/// Hack to avoid clap being smarter. If you don't use this type alias, clap assumes more about the optional state and you get
|
||||
/// runtime type errors from the value parser we use.
|
||||
pub type OptRemoteStorageConfig = Option<RemoteStorageConfig>;
|
||||
|
||||
pub fn remote_storage_from_toml(s: &str) -> anyhow::Result<OptRemoteStorageConfig> {
|
||||
pub fn remote_storage_from_toml(s: &str) -> anyhow::Result<RemoteStorageConfig> {
|
||||
RemoteStorageConfig::from_toml(&s.parse()?)
|
||||
}
|
||||
|
||||
|
||||
@@ -14,17 +14,14 @@ use parquet::{
|
||||
record::RecordWriter,
|
||||
};
|
||||
use pq_proto::StartupMessageParams;
|
||||
use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
|
||||
use remote_storage::{GenericRemoteStorage, RemotePath, RemoteStorageConfig, TimeoutOrCancel};
|
||||
use serde::ser::SerializeMap;
|
||||
use tokio::{sync::mpsc, time};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, info, Span};
|
||||
use utils::backoff;
|
||||
|
||||
use crate::{
|
||||
config::{remote_storage_from_toml, OptRemoteStorageConfig},
|
||||
context::LOG_CHAN_DISCONNECT,
|
||||
};
|
||||
use crate::{config::remote_storage_from_toml, context::LOG_CHAN_DISCONNECT};
|
||||
|
||||
use super::{RequestMonitoring, LOG_CHAN};
|
||||
|
||||
@@ -33,11 +30,11 @@ pub struct ParquetUploadArgs {
|
||||
/// Storage location to upload the parquet files to.
|
||||
/// Encoded as toml (same format as pageservers), eg
|
||||
/// `{bucket_name='the-bucket',bucket_region='us-east-1',prefix_in_bucket='proxy',endpoint='http://minio:9000'}`
|
||||
#[clap(long, default_value = "{}", value_parser = remote_storage_from_toml)]
|
||||
parquet_upload_remote_storage: OptRemoteStorageConfig,
|
||||
#[clap(long, value_parser = remote_storage_from_toml)]
|
||||
parquet_upload_remote_storage: Option<RemoteStorageConfig>,
|
||||
|
||||
#[clap(long, default_value = "{}", value_parser = remote_storage_from_toml)]
|
||||
parquet_upload_disconnect_events_remote_storage: OptRemoteStorageConfig,
|
||||
#[clap(long, value_parser = remote_storage_from_toml)]
|
||||
parquet_upload_disconnect_events_remote_storage: Option<RemoteStorageConfig>,
|
||||
|
||||
/// How many rows to include in a row group
|
||||
#[clap(long, default_value_t = 8192)]
|
||||
|
||||
@@ -12,7 +12,6 @@ use sd_notify::NotifyState;
|
||||
use tokio::runtime::Handle;
|
||||
use tokio::signal::unix::{signal, SignalKind};
|
||||
use tokio::task::JoinError;
|
||||
use toml_edit::Document;
|
||||
use utils::logging::SecretString;
|
||||
|
||||
use std::env::{var, VarError};
|
||||
@@ -126,7 +125,7 @@ struct Args {
|
||||
peer_recovery: bool,
|
||||
/// Remote storage configuration for WAL backup (offloading to s3) as TOML
|
||||
/// inline table, e.g.
|
||||
/// {"max_concurrent_syncs" = 17, "max_sync_errors": 13, "bucket_name": "<BUCKETNAME>", "bucket_region":"<REGION>", "concurrency_limit": 119}
|
||||
/// {max_concurrent_syncs = 17, max_sync_errors = 13, bucket_name = "<BUCKETNAME>", bucket_region = "<REGION>", concurrency_limit = 119}
|
||||
/// Safekeeper offloads WAL to
|
||||
/// [prefix_in_bucket/]<tenant_id>/<timeline_id>/<segment_file>, mirroring
|
||||
/// structure on the file system.
|
||||
@@ -553,16 +552,8 @@ fn set_id(workdir: &Utf8Path, given_id: Option<NodeId>) -> Result<NodeId> {
|
||||
Ok(my_id)
|
||||
}
|
||||
|
||||
// Parse RemoteStorage from TOML table.
|
||||
fn parse_remote_storage(storage_conf: &str) -> anyhow::Result<RemoteStorageConfig> {
|
||||
// funny toml doesn't consider plain inline table as valid document, so wrap in a key to parse
|
||||
let storage_conf_toml = format!("remote_storage = {storage_conf}");
|
||||
let parsed_toml = storage_conf_toml.parse::<Document>()?; // parse
|
||||
let (_, storage_conf_parsed_toml) = parsed_toml.iter().next().unwrap(); // and strip key off again
|
||||
RemoteStorageConfig::from_toml(storage_conf_parsed_toml).and_then(|parsed_config| {
|
||||
// XXX: Don't print the original toml here, there might be some sensitive data
|
||||
parsed_config.context("Incorrectly parsed remote storage toml as no remote storage config")
|
||||
})
|
||||
RemoteStorageConfig::from_toml(&storage_conf.parse()?)
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -1167,7 +1167,9 @@ class NeonEnv:
|
||||
if config.auth_enabled:
|
||||
sk_cfg["auth_enabled"] = True
|
||||
if self.safekeepers_remote_storage is not None:
|
||||
sk_cfg["remote_storage"] = self.safekeepers_remote_storage.to_toml_inline_table()
|
||||
sk_cfg[
|
||||
"remote_storage"
|
||||
] = self.safekeepers_remote_storage.to_toml_inline_table().strip()
|
||||
self.safekeepers.append(Safekeeper(env=self, id=id, port=port))
|
||||
cfg["safekeepers"].append(sk_cfg)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user