mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-30 03:20:36 +00:00
Merge branch 'communicator-rewrite' of https://github.com/neondatabase/neon into communicator-rewrite
This commit is contained in:
@@ -46,16 +46,45 @@ pub struct ExtensionInstallResponse {
|
||||
pub version: ExtVersion,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Default, Debug, Clone)]
|
||||
/// Status of the LFC prewarm process. The same state machine is reused for
|
||||
/// both autoprewarm (prewarm after compute/Postgres start using the previously
|
||||
/// stored LFC state) and explicit prewarming via API.
|
||||
#[derive(Serialize, Default, Debug, Clone, PartialEq)]
|
||||
#[serde(tag = "status", rename_all = "snake_case")]
|
||||
pub enum LfcPrewarmState {
|
||||
/// Default value when compute boots up.
|
||||
#[default]
|
||||
NotPrewarmed,
|
||||
/// Prewarming thread is active and loading pages into LFC.
|
||||
Prewarming,
|
||||
/// We found requested LFC state in the endpoint storage and
|
||||
/// completed prewarming successfully.
|
||||
Completed,
|
||||
Failed {
|
||||
error: String,
|
||||
},
|
||||
/// Unexpected error happened during prewarming. Note, `Not Found 404`
|
||||
/// response from the endpoint storage is explicitly excluded here
|
||||
/// because it can normally happen on the first compute start,
|
||||
/// since LFC state is not available yet.
|
||||
Failed { error: String },
|
||||
/// We tried to fetch the corresponding LFC state from the endpoint storage,
|
||||
/// but received `Not Found 404`. This should normally happen only during the
|
||||
/// first endpoint start after creation with `autoprewarm: true`.
|
||||
///
|
||||
/// During the orchestrated prewarm via API, when a caller explicitly
|
||||
/// provides the LFC state key to prewarm from, it's the caller responsibility
|
||||
/// to handle this status as an error state in this case.
|
||||
Skipped,
|
||||
}
|
||||
|
||||
impl Display for LfcPrewarmState {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
LfcPrewarmState::NotPrewarmed => f.write_str("NotPrewarmed"),
|
||||
LfcPrewarmState::Prewarming => f.write_str("Prewarming"),
|
||||
LfcPrewarmState::Completed => f.write_str("Completed"),
|
||||
LfcPrewarmState::Skipped => f.write_str("Skipped"),
|
||||
LfcPrewarmState::Failed { error } => write!(f, "Error({error})"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Default, Debug, Clone, PartialEq)]
|
||||
@@ -70,6 +99,23 @@ pub enum LfcOffloadState {
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug, Clone, PartialEq)]
|
||||
#[serde(tag = "status", rename_all = "snake_case")]
|
||||
/// Response of /promote
|
||||
pub enum PromoteState {
|
||||
NotPromoted,
|
||||
Completed,
|
||||
Failed { error: String },
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Default, Debug, Clone)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
/// Result of /safekeepers_lsn
|
||||
pub struct SafekeepersLsn {
|
||||
pub safekeepers: String,
|
||||
pub wal_flush_lsn: utils::lsn::Lsn,
|
||||
}
|
||||
|
||||
/// Response of the /status API
|
||||
#[derive(Serialize, Debug, Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
@@ -93,6 +139,15 @@ pub enum TerminateMode {
|
||||
Immediate,
|
||||
}
|
||||
|
||||
impl From<TerminateMode> for ComputeStatus {
|
||||
fn from(mode: TerminateMode) -> Self {
|
||||
match mode {
|
||||
TerminateMode::Fast => ComputeStatus::TerminationPendingFast,
|
||||
TerminateMode::Immediate => ComputeStatus::TerminationPendingImmediate,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Clone, Copy, Debug, Deserialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum ComputeStatus {
|
||||
@@ -113,7 +168,9 @@ pub enum ComputeStatus {
|
||||
// control-plane to terminate it.
|
||||
Failed,
|
||||
// Termination requested
|
||||
TerminationPending { mode: TerminateMode },
|
||||
TerminationPendingFast,
|
||||
// Termination requested, without waiting 30s before returning from /terminate
|
||||
TerminationPendingImmediate,
|
||||
// Terminated Postgres
|
||||
Terminated,
|
||||
}
|
||||
@@ -132,7 +189,10 @@ impl Display for ComputeStatus {
|
||||
ComputeStatus::Running => f.write_str("running"),
|
||||
ComputeStatus::Configuration => f.write_str("configuration"),
|
||||
ComputeStatus::Failed => f.write_str("failed"),
|
||||
ComputeStatus::TerminationPending { .. } => f.write_str("termination-pending"),
|
||||
ComputeStatus::TerminationPendingFast => f.write_str("termination-pending-fast"),
|
||||
ComputeStatus::TerminationPendingImmediate => {
|
||||
f.write_str("termination-pending-immediate")
|
||||
}
|
||||
ComputeStatus::Terminated => f.write_str("terminated"),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ use serde::{Deserialize, Serialize};
|
||||
use url::Url;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::shard::{ShardCount, ShardIndex};
|
||||
|
||||
use crate::responses::TlsConfig;
|
||||
|
||||
@@ -106,11 +107,18 @@ pub struct ComputeSpec {
|
||||
pub tenant_id: Option<TenantId>,
|
||||
pub timeline_id: Option<TimelineId>,
|
||||
|
||||
// Pageserver information can be passed in two different ways:
|
||||
// 1. Here
|
||||
// 2. in cluster.settings. This is legacy, we are switching to method 1.
|
||||
/// Pageserver information can be passed in three different ways:
|
||||
/// 1. Here in `pageserver_connection_info`
|
||||
/// 2. In the `pageserver_connstring` field.
|
||||
/// 3. in `cluster.settings`.
|
||||
///
|
||||
/// The goal is to use method 1. everywhere. But for backwards-compatibility with old
|
||||
/// versions of the control plane, `compute_ctl` will check 2. and 3. if the
|
||||
/// `pageserver_connection_info` field is missing.
|
||||
pub pageserver_connection_info: Option<PageserverConnectionInfo>,
|
||||
|
||||
pub pageserver_connstring: Option<String>,
|
||||
|
||||
// More neon ids that we expose to the compute_ctl
|
||||
// and to postgres as neon extension GUCs.
|
||||
pub project_id: Option<String>,
|
||||
@@ -145,7 +153,7 @@ pub struct ComputeSpec {
|
||||
|
||||
// Stripe size for pageserver sharding, in pages
|
||||
#[serde(default)]
|
||||
pub shard_stripe_size: Option<usize>,
|
||||
pub shard_stripe_size: Option<u32>,
|
||||
|
||||
/// Local Proxy configuration used for JWT authentication
|
||||
#[serde(default)]
|
||||
@@ -218,16 +226,28 @@ pub enum ComputeFeature {
|
||||
UnknownFeature,
|
||||
}
|
||||
|
||||
/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.
|
||||
#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
|
||||
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
|
||||
pub struct PageserverConnectionInfo {
|
||||
pub shards: HashMap<u32, PageserverShardConnectionInfo>,
|
||||
/// NB: 0 for unsharded tenants, 1 for sharded tenants with 1 shard, following storage
|
||||
pub shard_count: ShardCount,
|
||||
|
||||
pub prefer_grpc: bool,
|
||||
/// INVARIANT: null if shard_count is 0, otherwise non-null and immutable
|
||||
pub stripe_size: Option<u32>,
|
||||
|
||||
pub shards: HashMap<ShardIndex, PageserverShardInfo>,
|
||||
|
||||
#[serde(default)]
|
||||
pub prefer_protocol: PageserverProtocol,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
|
||||
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
|
||||
pub struct PageserverShardInfo {
|
||||
pub pageservers: Vec<PageserverShardConnectionInfo>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
|
||||
pub struct PageserverShardConnectionInfo {
|
||||
pub id: Option<String>,
|
||||
pub libpq_url: Option<String>,
|
||||
pub grpc_url: Option<String>,
|
||||
}
|
||||
@@ -465,13 +485,15 @@ pub struct JwksSettings {
|
||||
pub jwt_audience: Option<String>,
|
||||
}
|
||||
|
||||
/// Protocol used to connect to a Pageserver. Parsed from the connstring scheme.
|
||||
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
|
||||
/// Protocol used to connect to a Pageserver.
|
||||
#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
|
||||
pub enum PageserverProtocol {
|
||||
/// The original protocol based on libpq and COPY. Uses postgresql:// or postgres:// scheme.
|
||||
#[default]
|
||||
#[serde(rename = "libpq")]
|
||||
Libpq,
|
||||
/// A newer, gRPC-based protocol. Uses grpc:// scheme.
|
||||
#[serde(rename = "grpc")]
|
||||
Grpc,
|
||||
}
|
||||
|
||||
|
||||
@@ -20,6 +20,7 @@ use tokio_stream::wrappers::ReceiverStream;
|
||||
use tokio_util::io::ReaderStream;
|
||||
use tracing::{Instrument, debug, info, info_span, warn};
|
||||
use utils::auth::{AuthError, Claims, SwappableJwtAuth};
|
||||
use utils::metrics_collector::{METRICS_COLLECTOR, METRICS_STALE_MILLIS};
|
||||
|
||||
use crate::error::{ApiError, api_error_handler, route_error_handler};
|
||||
use crate::request::{get_query_param, parse_query_param};
|
||||
@@ -250,9 +251,28 @@ impl std::io::Write for ChannelWriter {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
pub async fn prometheus_metrics_handler(
|
||||
req: Request<Body>,
|
||||
force_metric_collection_on_scrape: bool,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
SERVE_METRICS_COUNT.inc();
|
||||
|
||||
// HADRON
|
||||
let requested_use_latest = parse_query_param(&req, "use_latest")?;
|
||||
|
||||
let use_latest = match requested_use_latest {
|
||||
None => force_metric_collection_on_scrape,
|
||||
Some(true) => true,
|
||||
Some(false) => {
|
||||
if force_metric_collection_on_scrape {
|
||||
// We don't cache in this case
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let started_at = std::time::Instant::now();
|
||||
|
||||
let (tx, rx) = mpsc::channel(1);
|
||||
@@ -277,12 +297,18 @@ pub async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<
|
||||
|
||||
let _span = span.entered();
|
||||
|
||||
let metrics = metrics::gather();
|
||||
// HADRON
|
||||
let collected = if use_latest {
|
||||
// Skip caching the results if we always force metric collection on scrape.
|
||||
METRICS_COLLECTOR.run_once(!force_metric_collection_on_scrape)
|
||||
} else {
|
||||
METRICS_COLLECTOR.last_collected()
|
||||
};
|
||||
|
||||
let gathered_at = std::time::Instant::now();
|
||||
|
||||
let res = encoder
|
||||
.encode(&metrics, &mut writer)
|
||||
.encode(&collected.metrics, &mut writer)
|
||||
.and_then(|_| writer.flush().map_err(|e| e.into()));
|
||||
|
||||
// this instant is not when we finally got the full response sent, sending is done by hyper
|
||||
@@ -295,6 +321,10 @@ pub async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<
|
||||
let encoded_in = encoded_at - gathered_at - writer.wait_time();
|
||||
let total = encoded_at - started_at;
|
||||
|
||||
// HADRON
|
||||
let staleness_ms = (encoded_at - collected.collected_at).as_millis();
|
||||
METRICS_STALE_MILLIS.set(staleness_ms as i64);
|
||||
|
||||
match res {
|
||||
Ok(()) => {
|
||||
tracing::info!(
|
||||
@@ -303,6 +333,7 @@ pub async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<
|
||||
spawning_ms = spawned_in.as_millis(),
|
||||
collection_ms = collected_in.as_millis(),
|
||||
encoding_ms = encoded_in.as_millis(),
|
||||
stalenss_ms = staleness_ms,
|
||||
"responded /metrics"
|
||||
);
|
||||
}
|
||||
|
||||
@@ -41,17 +41,35 @@ pub fn get_query_param<'a>(
|
||||
Some(q) => q,
|
||||
None => return Ok(None),
|
||||
};
|
||||
let mut values = url::form_urlencoded::parse(query.as_bytes())
|
||||
let values = url::form_urlencoded::parse(query.as_bytes())
|
||||
.filter_map(|(k, v)| if k == param_name { Some(v) } else { None })
|
||||
// we call .next() twice below. If it's None the first time, .fuse() ensures it's None afterwards
|
||||
.fuse();
|
||||
|
||||
let value1 = values.next();
|
||||
if values.next().is_some() {
|
||||
return Err(ApiError::BadRequest(anyhow!(
|
||||
"param {param_name} specified more than once"
|
||||
)));
|
||||
}
|
||||
// Work around an issue with Alloy's pyroscope scrape where the "seconds"
|
||||
// parameter is added several times. https://github.com/grafana/alloy/issues/3026
|
||||
// TODO: revert after Alloy is fixed.
|
||||
let value1 = values
|
||||
.map(Ok)
|
||||
.reduce(|acc, i| {
|
||||
match acc {
|
||||
Err(_) => acc,
|
||||
|
||||
// It's okay to have duplicates as along as they have the same value.
|
||||
Ok(ref a) if a == &i.unwrap() => acc,
|
||||
|
||||
_ => Err(ApiError::BadRequest(anyhow!(
|
||||
"param {param_name} specified more than once"
|
||||
))),
|
||||
}
|
||||
})
|
||||
.transpose()?;
|
||||
// if values.next().is_some() {
|
||||
// return Err(ApiError::BadRequest(anyhow!(
|
||||
// "param {param_name} specified more than once"
|
||||
// )));
|
||||
// }
|
||||
|
||||
Ok(value1)
|
||||
}
|
||||
|
||||
@@ -92,3 +110,39 @@ pub async fn ensure_no_body(request: &mut Request<Body>) -> Result<(), ApiError>
|
||||
None => Ok(()),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_get_query_param_duplicate() {
|
||||
let req = Request::builder()
|
||||
.uri("http://localhost:12345/testuri?testparam=1")
|
||||
.body(hyper::Body::empty())
|
||||
.unwrap();
|
||||
let value = get_query_param(&req, "testparam").unwrap();
|
||||
assert_eq!(value.unwrap(), "1");
|
||||
|
||||
let req = Request::builder()
|
||||
.uri("http://localhost:12345/testuri?testparam=1&testparam=1")
|
||||
.body(hyper::Body::empty())
|
||||
.unwrap();
|
||||
let value = get_query_param(&req, "testparam").unwrap();
|
||||
assert_eq!(value.unwrap(), "1");
|
||||
|
||||
let req = Request::builder()
|
||||
.uri("http://localhost:12345/testuri")
|
||||
.body(hyper::Body::empty())
|
||||
.unwrap();
|
||||
let value = get_query_param(&req, "testparam").unwrap();
|
||||
assert!(value.is_none());
|
||||
|
||||
let req = Request::builder()
|
||||
.uri("http://localhost:12345/testuri?testparam=1&testparam=2&testparam=3")
|
||||
.body(hyper::Body::empty())
|
||||
.unwrap();
|
||||
let value = get_query_param(&req, "testparam");
|
||||
assert!(value.is_err());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,12 +4,14 @@
|
||||
//! a default registry.
|
||||
#![deny(clippy::undocumented_unsafe_blocks)]
|
||||
|
||||
use std::sync::RwLock;
|
||||
|
||||
use measured::label::{LabelGroupSet, LabelGroupVisitor, LabelName, NoLabels};
|
||||
use measured::metric::counter::CounterState;
|
||||
use measured::metric::gauge::GaugeState;
|
||||
use measured::metric::group::Encoding;
|
||||
use measured::metric::name::{MetricName, MetricNameEncoder};
|
||||
use measured::metric::{MetricEncoding, MetricFamilyEncoding};
|
||||
use measured::metric::{MetricEncoding, MetricFamilyEncoding, MetricType};
|
||||
use measured::{FixedCardinalityLabel, LabelGroup, MetricGroup};
|
||||
use once_cell::sync::Lazy;
|
||||
use prometheus::Registry;
|
||||
@@ -116,12 +118,52 @@ pub fn pow2_buckets(start: usize, end: usize) -> Vec<f64> {
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub struct InfoMetric<L: LabelGroup, M: MetricType = GaugeState> {
|
||||
label: RwLock<L>,
|
||||
metric: M,
|
||||
}
|
||||
|
||||
impl<L: LabelGroup> InfoMetric<L> {
|
||||
pub fn new(label: L) -> Self {
|
||||
Self::with_metric(label, GaugeState::new(1))
|
||||
}
|
||||
}
|
||||
|
||||
impl<L: LabelGroup, M: MetricType<Metadata = ()>> InfoMetric<L, M> {
|
||||
pub fn with_metric(label: L, metric: M) -> Self {
|
||||
Self {
|
||||
label: RwLock::new(label),
|
||||
metric,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_label(&self, label: L) {
|
||||
*self.label.write().unwrap() = label;
|
||||
}
|
||||
}
|
||||
|
||||
impl<L, M, E> MetricFamilyEncoding<E> for InfoMetric<L, M>
|
||||
where
|
||||
L: LabelGroup,
|
||||
M: MetricEncoding<E, Metadata = ()>,
|
||||
E: Encoding,
|
||||
{
|
||||
fn collect_family_into(
|
||||
&self,
|
||||
name: impl measured::metric::name::MetricNameEncoder,
|
||||
enc: &mut E,
|
||||
) -> Result<(), E::Err> {
|
||||
M::write_type(&name, enc)?;
|
||||
self.metric
|
||||
.collect_into(&(), &*self.label.read().unwrap(), name, enc)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BuildInfo {
|
||||
pub revision: &'static str,
|
||||
pub build_tag: &'static str,
|
||||
}
|
||||
|
||||
// todo: allow label group without the set
|
||||
impl LabelGroup for BuildInfo {
|
||||
fn visit_values(&self, v: &mut impl LabelGroupVisitor) {
|
||||
const REVISION: &LabelName = LabelName::from_str("revision");
|
||||
@@ -131,24 +173,6 @@ impl LabelGroup for BuildInfo {
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Encoding> MetricFamilyEncoding<T> for BuildInfo
|
||||
where
|
||||
GaugeState: MetricEncoding<T>,
|
||||
{
|
||||
fn collect_family_into(
|
||||
&self,
|
||||
name: impl measured::metric::name::MetricNameEncoder,
|
||||
enc: &mut T,
|
||||
) -> Result<(), T::Err> {
|
||||
enc.write_help(&name, "Build/version information")?;
|
||||
GaugeState::write_type(&name, enc)?;
|
||||
GaugeState {
|
||||
count: std::sync::atomic::AtomicI64::new(1),
|
||||
}
|
||||
.collect_into(&(), self, name, enc)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(MetricGroup)]
|
||||
#[metric(new(build_info: BuildInfo))]
|
||||
pub struct NeonMetrics {
|
||||
@@ -165,8 +189,8 @@ pub struct NeonMetrics {
|
||||
#[derive(MetricGroup)]
|
||||
#[metric(new(build_info: BuildInfo))]
|
||||
pub struct LibMetrics {
|
||||
#[metric(init = build_info)]
|
||||
build_info: BuildInfo,
|
||||
#[metric(init = InfoMetric::new(build_info))]
|
||||
build_info: InfoMetric<BuildInfo>,
|
||||
|
||||
#[metric(flatten)]
|
||||
rusage: Rusage,
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
//!
|
||||
//! Concurrency is managed very simply: the entire map is guarded by one shared-memory RwLock.
|
||||
|
||||
use std::fmt::Debug;
|
||||
use std::hash::{BuildHasher, Hash};
|
||||
use std::mem::MaybeUninit;
|
||||
|
||||
@@ -56,6 +57,22 @@ pub struct HashMapInit<'a, K, V, S = rustc_hash::FxBuildHasher> {
|
||||
num_buckets: u32,
|
||||
}
|
||||
|
||||
impl<'a, K, V, S> Debug for HashMapInit<'a, K, V, S>
|
||||
where
|
||||
K: Debug,
|
||||
V: Debug,
|
||||
{
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("HashMapInit")
|
||||
.field("shmem_handle", &self.shmem_handle)
|
||||
.field("shared_ptr", &self.shared_ptr)
|
||||
.field("shared_size", &self.shared_size)
|
||||
// .field("hasher", &self.hasher)
|
||||
.field("num_buckets", &self.num_buckets)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
/// This is a per-process handle to a hash table that (possibly) lives in shared memory.
|
||||
/// If a child process is launched with fork(), the child process should
|
||||
/// get its own HashMapAccess by calling HashMapInit::attach_writer/reader().
|
||||
@@ -71,6 +88,20 @@ pub struct HashMapAccess<'a, K, V, S = rustc_hash::FxBuildHasher> {
|
||||
unsafe impl<K: Sync, V: Sync, S> Sync for HashMapAccess<'_, K, V, S> {}
|
||||
unsafe impl<K: Send, V: Send, S> Send for HashMapAccess<'_, K, V, S> {}
|
||||
|
||||
impl<'a, K, V, S> Debug for HashMapAccess<'a, K, V, S>
|
||||
where
|
||||
K: Debug,
|
||||
V: Debug,
|
||||
{
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("HashMapAccess")
|
||||
.field("shmem_handle", &self.shmem_handle)
|
||||
.field("shared_ptr", &self.shared_ptr)
|
||||
// .field("hasher", &self.hasher)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, K: Clone + Hash + Eq, V, S> HashMapInit<'a, K, V, S> {
|
||||
/// Change the 'hasher' used by the hash table.
|
||||
///
|
||||
@@ -298,7 +329,7 @@ where
|
||||
|
||||
/// Get a reference to the entry containing a key.
|
||||
///
|
||||
/// NB: THis takes a write lock as there's no way to distinguish whether the intention
|
||||
/// NB: This takes a write lock as there's no way to distinguish whether the intention
|
||||
/// is to use the entry for reading or for writing in advance.
|
||||
pub fn entry(&self, key: K) -> Entry<'a, '_, K, V> {
|
||||
let hash = self.get_hash_value(&key);
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
//! Simple hash table with chaining.
|
||||
|
||||
use std::fmt::Debug;
|
||||
use std::hash::Hash;
|
||||
use std::mem::MaybeUninit;
|
||||
|
||||
@@ -17,6 +18,19 @@ pub(crate) struct Bucket<K, V> {
|
||||
pub(crate) inner: Option<(K, V)>,
|
||||
}
|
||||
|
||||
impl<K, V> Debug for Bucket<K, V>
|
||||
where
|
||||
K: Debug,
|
||||
V: Debug,
|
||||
{
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("Bucket")
|
||||
.field("next", &self.next)
|
||||
.field("inner", &self.inner)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
/// Core hash table implementation.
|
||||
pub(crate) struct CoreHashMap<'a, K, V> {
|
||||
/// Dictionary used to map hashes to bucket indices.
|
||||
@@ -31,6 +45,22 @@ pub(crate) struct CoreHashMap<'a, K, V> {
|
||||
pub(crate) buckets_in_use: u32,
|
||||
}
|
||||
|
||||
impl<'a, K, V> Debug for CoreHashMap<'a, K, V>
|
||||
where
|
||||
K: Debug,
|
||||
V: Debug,
|
||||
{
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("CoreHashMap")
|
||||
.field("dictionary", &self.dictionary)
|
||||
.field("buckets", &self.buckets)
|
||||
.field("free_head", &self.free_head)
|
||||
.field("alloc_limit", &self.alloc_limit)
|
||||
.field("buckets_in_use", &self.buckets_in_use)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
/// Error for when there are no empty buckets left but one is needed.
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct FullError;
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
//! Shared memory utilities for neon communicator
|
||||
|
||||
pub mod hash;
|
||||
pub mod shmem;
|
||||
pub mod sync;
|
||||
|
||||
@@ -21,6 +21,7 @@ use nix::unistd::ftruncate as nix_ftruncate;
|
||||
/// the underlying file is resized. Do not access the area beyond the current size. Currently, that
|
||||
/// will cause the file to be expanded, but we might use `mprotect()` etc. to enforce that in the
|
||||
/// future.
|
||||
#[derive(Debug)]
|
||||
pub struct ShmemHandle {
|
||||
/// memfd file descriptor
|
||||
fd: OwnedFd,
|
||||
@@ -35,6 +36,7 @@ pub struct ShmemHandle {
|
||||
}
|
||||
|
||||
/// This is stored at the beginning in the shared memory area.
|
||||
#[derive(Debug)]
|
||||
struct SharedStruct {
|
||||
max_size: usize,
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ mod tests;
|
||||
|
||||
use const_format::formatcp;
|
||||
use posthog_client_lite::PostHogClientConfig;
|
||||
use utils::serde_percent::Percent;
|
||||
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
|
||||
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
|
||||
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
|
||||
@@ -223,8 +224,9 @@ pub struct ConfigToml {
|
||||
pub metric_collection_bucket: Option<RemoteStorageConfig>,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub synthetic_size_calculation_interval: Duration,
|
||||
pub disk_usage_based_eviction: Option<DiskUsageEvictionTaskConfig>,
|
||||
pub disk_usage_based_eviction: DiskUsageEvictionTaskConfig,
|
||||
pub test_remote_failures: u64,
|
||||
pub test_remote_failures_probability: u64,
|
||||
pub ondemand_download_behavior_treat_error_as_warn: bool,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub background_task_maximum_delay: Duration,
|
||||
@@ -270,9 +272,13 @@ pub struct ConfigToml {
|
||||
pub timeline_import_config: TimelineImportConfig,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub basebackup_cache_config: Option<BasebackupCacheConfig>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub image_layer_generation_large_timeline_threshold: Option<u64>,
|
||||
pub force_metric_collection_on_scrape: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct DiskUsageEvictionTaskConfig {
|
||||
pub max_usage_pct: utils::serde_percent::Percent,
|
||||
pub min_avail_bytes: u64,
|
||||
@@ -283,6 +289,21 @@ pub struct DiskUsageEvictionTaskConfig {
|
||||
/// Select sorting for evicted layers
|
||||
#[serde(default)]
|
||||
pub eviction_order: EvictionOrder,
|
||||
pub enabled: bool,
|
||||
}
|
||||
|
||||
impl Default for DiskUsageEvictionTaskConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
max_usage_pct: Percent::new(80).unwrap(),
|
||||
min_avail_bytes: 2_000_000_000,
|
||||
period: Duration::from_secs(60),
|
||||
#[cfg(feature = "testing")]
|
||||
mock_statvfs: None,
|
||||
eviction_order: EvictionOrder::default(),
|
||||
enabled: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
@@ -543,6 +564,11 @@ pub struct TenantConfigToml {
|
||||
pub gc_period: Duration,
|
||||
// Delta layer churn threshold to create L1 image layers.
|
||||
pub image_creation_threshold: usize,
|
||||
// HADRON
|
||||
// When the timeout is reached, PageServer will (1) force compact any remaining L0 deltas and
|
||||
// (2) create image layers if there are any L1 deltas.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub image_layer_force_creation_period: Option<Duration>,
|
||||
// Determines how much history is retained, to allow
|
||||
// branching and read replicas at an older point in time.
|
||||
// The unit is time.
|
||||
@@ -738,9 +764,10 @@ impl Default for ConfigToml {
|
||||
|
||||
metric_collection_bucket: (None),
|
||||
|
||||
disk_usage_based_eviction: (None),
|
||||
disk_usage_based_eviction: DiskUsageEvictionTaskConfig::default(),
|
||||
|
||||
test_remote_failures: (0),
|
||||
test_remote_failures_probability: (100),
|
||||
|
||||
ondemand_download_behavior_treat_error_as_warn: (false),
|
||||
|
||||
@@ -804,6 +831,8 @@ impl Default for ConfigToml {
|
||||
},
|
||||
basebackup_cache_config: None,
|
||||
posthog_config: None,
|
||||
image_layer_generation_large_timeline_threshold: Some(2 * 1024 * 1024 * 1024),
|
||||
force_metric_collection_on_scrape: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -897,6 +926,7 @@ impl Default for TenantConfigToml {
|
||||
gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD)
|
||||
.expect("cannot parse default gc period"),
|
||||
image_creation_threshold: DEFAULT_IMAGE_CREATION_THRESHOLD,
|
||||
image_layer_force_creation_period: None,
|
||||
pitr_interval: humantime::parse_duration(DEFAULT_PITR_INTERVAL)
|
||||
.expect("cannot parse default PITR interval"),
|
||||
walreceiver_connect_timeout: humantime::parse_duration(
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fmt::Display;
|
||||
use std::net::IpAddr;
|
||||
use std::str::FromStr;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
@@ -10,7 +11,7 @@ use serde::{Deserialize, Serialize};
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::models::{PageserverUtilization, ShardParameters, TenantConfig};
|
||||
use crate::models::{PageserverUtilization, ShardParameters, TenantConfig, TimelineInfo};
|
||||
use crate::shard::{ShardStripeSize, TenantShardId};
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
@@ -60,6 +61,11 @@ pub struct NodeRegisterRequest {
|
||||
pub listen_https_port: Option<u16>,
|
||||
|
||||
pub availability_zone_id: AvailabilityZone,
|
||||
|
||||
// Reachable IP address of the PS/SK registering, if known.
|
||||
// Hadron Cluster Coordiantor will update the DNS record of the registering node
|
||||
// with this IP address.
|
||||
pub node_ip_addr: Option<IpAddr>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
@@ -126,6 +132,13 @@ pub struct TenantDescribeResponse {
|
||||
pub config: TenantConfig,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct TenantTimelineDescribeResponse {
|
||||
pub shards: Vec<TimelineInfo>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub image_consistent_lsn: Option<Lsn>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct NodeShardResponse {
|
||||
pub node_id: NodeId,
|
||||
@@ -538,6 +551,39 @@ pub struct SafekeeperDescribeResponse {
|
||||
pub scheduling_policy: SkSchedulingPolicy,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct TimelineSafekeeperPeer {
|
||||
pub node_id: NodeId,
|
||||
pub listen_http_addr: String,
|
||||
pub http_port: i32,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct SCSafekeeperTimeline {
|
||||
// SC does not know the tenant id.
|
||||
pub timeline_id: TimelineId,
|
||||
pub peers: Vec<NodeId>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct SCSafekeeperTimelinesResponse {
|
||||
pub timelines: Vec<SCSafekeeperTimeline>,
|
||||
pub safekeeper_peers: Vec<TimelineSafekeeperPeer>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct SafekeeperTimeline {
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub peers: Vec<NodeId>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct SafekeeperTimelinesResponse {
|
||||
pub timelines: Vec<SafekeeperTimeline>,
|
||||
pub safekeeper_peers: Vec<TimelineSafekeeperPeer>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct SafekeeperSchedulingPolicyRequest {
|
||||
pub scheduling_policy: SkSchedulingPolicy,
|
||||
|
||||
@@ -384,7 +384,7 @@ pub struct SafekeepersInfo {
|
||||
pub safekeepers: Vec<SafekeeperInfo>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct SafekeeperInfo {
|
||||
pub id: NodeId,
|
||||
pub hostname: String,
|
||||
@@ -597,6 +597,9 @@ pub struct TenantConfigPatch {
|
||||
pub gc_period: FieldPatch<String>,
|
||||
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
|
||||
pub image_creation_threshold: FieldPatch<usize>,
|
||||
// HADRON
|
||||
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
|
||||
pub image_layer_force_creation_period: FieldPatch<String>,
|
||||
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
|
||||
pub pitr_interval: FieldPatch<String>,
|
||||
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
|
||||
@@ -700,6 +703,11 @@ pub struct TenantConfig {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub image_creation_threshold: Option<usize>,
|
||||
|
||||
// HADRON
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub image_layer_force_creation_period: Option<Duration>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub pitr_interval: Option<Duration>,
|
||||
@@ -798,6 +806,7 @@ impl TenantConfig {
|
||||
mut gc_horizon,
|
||||
mut gc_period,
|
||||
mut image_creation_threshold,
|
||||
mut image_layer_force_creation_period,
|
||||
mut pitr_interval,
|
||||
mut walreceiver_connect_timeout,
|
||||
mut lagging_wal_timeout,
|
||||
@@ -861,6 +870,11 @@ impl TenantConfig {
|
||||
patch
|
||||
.image_creation_threshold
|
||||
.apply(&mut image_creation_threshold);
|
||||
// HADRON
|
||||
patch
|
||||
.image_layer_force_creation_period
|
||||
.map(|v| humantime::parse_duration(&v))?
|
||||
.apply(&mut image_layer_force_creation_period);
|
||||
patch
|
||||
.pitr_interval
|
||||
.map(|v| humantime::parse_duration(&v))?
|
||||
@@ -942,6 +956,7 @@ impl TenantConfig {
|
||||
gc_horizon,
|
||||
gc_period,
|
||||
image_creation_threshold,
|
||||
image_layer_force_creation_period,
|
||||
pitr_interval,
|
||||
walreceiver_connect_timeout,
|
||||
lagging_wal_timeout,
|
||||
@@ -1016,6 +1031,9 @@ impl TenantConfig {
|
||||
image_creation_threshold: self
|
||||
.image_creation_threshold
|
||||
.unwrap_or(global_conf.image_creation_threshold),
|
||||
image_layer_force_creation_period: self
|
||||
.image_layer_force_creation_period
|
||||
.or(global_conf.image_layer_force_creation_period),
|
||||
pitr_interval: self.pitr_interval.unwrap_or(global_conf.pitr_interval),
|
||||
walreceiver_connect_timeout: self
|
||||
.walreceiver_connect_timeout
|
||||
@@ -1604,6 +1622,9 @@ pub struct TimelineInfo {
|
||||
|
||||
/// Whether the timeline is invisible in synthetic size calculations.
|
||||
pub is_invisible: Option<bool>,
|
||||
// HADRON: the largest LSN below which all page updates have been included in the image layers.
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub image_consistent_lsn: Option<Lsn>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
|
||||
@@ -749,7 +749,18 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
|
||||
trace!("got query {query_string:?}");
|
||||
if let Err(e) = handler.process_query(self, query_string).await {
|
||||
match e {
|
||||
QueryError::Shutdown => return Ok(ProcessMsgResult::Break),
|
||||
err @ QueryError::Shutdown => {
|
||||
// Notify postgres of the connection shutdown at the libpq
|
||||
// protocol level. This avoids postgres having to tell apart
|
||||
// from an idle connection and a stale one, which is bug prone.
|
||||
let shutdown_error = short_error(&err);
|
||||
self.write_message_noflush(&BeMessage::ErrorResponse(
|
||||
&shutdown_error,
|
||||
Some(err.pg_error_code()),
|
||||
))?;
|
||||
|
||||
return Ok(ProcessMsgResult::Break);
|
||||
}
|
||||
QueryError::SimulatedConnectionError => {
|
||||
return Err(QueryError::SimulatedConnectionError);
|
||||
}
|
||||
|
||||
@@ -110,7 +110,6 @@ fn main() -> anyhow::Result<()> {
|
||||
.allowlist_type("XLogRecPtr")
|
||||
.allowlist_type("XLogSegNo")
|
||||
.allowlist_type("TimeLineID")
|
||||
.allowlist_type("TimestampTz")
|
||||
.allowlist_type("MultiXactId")
|
||||
.allowlist_type("MultiXactOffset")
|
||||
.allowlist_type("MultiXactStatus")
|
||||
|
||||
@@ -227,8 +227,7 @@ pub mod walrecord;
|
||||
// Export some widely used datatypes that are unlikely to change across Postgres versions
|
||||
pub use v14::bindings::{
|
||||
BlockNumber, CheckPoint, ControlFileData, MultiXactId, OffsetNumber, Oid, PageHeaderData,
|
||||
RepOriginId, TimeLineID, TimestampTz, TransactionId, XLogRecPtr, XLogRecord, XLogSegNo, uint32,
|
||||
uint64,
|
||||
RepOriginId, TimeLineID, TransactionId, XLogRecPtr, XLogRecord, XLogSegNo, uint32, uint64,
|
||||
};
|
||||
// Likewise for these, although the assumption that these don't change is a little more iffy.
|
||||
pub use v14::bindings::{MultiXactOffset, MultiXactStatus};
|
||||
|
||||
@@ -4,13 +4,14 @@
|
||||
//! TODO: Generate separate types for each supported PG version
|
||||
|
||||
use bytes::{Buf, Bytes};
|
||||
use postgres_ffi_types::TimestampTz;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::bin_ser::DeserializeError;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::{
|
||||
BLCKSZ, BlockNumber, MultiXactId, MultiXactOffset, MultiXactStatus, Oid, PgMajorVersion,
|
||||
RepOriginId, TimestampTz, TransactionId, XLOG_SIZE_OF_XLOG_RECORD, XLogRecord, pg_constants,
|
||||
RepOriginId, TransactionId, XLOG_SIZE_OF_XLOG_RECORD, XLogRecord, pg_constants,
|
||||
};
|
||||
|
||||
#[repr(C)]
|
||||
@@ -863,7 +864,8 @@ pub mod v17 {
|
||||
XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapMultiInsert, XlHeapUpdate, XlParameterChange,
|
||||
rm_neon,
|
||||
};
|
||||
pub use crate::{TimeLineID, TimestampTz};
|
||||
pub use crate::TimeLineID;
|
||||
pub use postgres_ffi_types::TimestampTz;
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -9,10 +9,11 @@
|
||||
|
||||
use super::super::waldecoder::WalStreamDecoder;
|
||||
use super::bindings::{
|
||||
CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID, TimestampTz,
|
||||
CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID,
|
||||
XLogLongPageHeaderData, XLogPageHeaderData, XLogRecPtr, XLogRecord, XLogSegNo, XLOG_PAGE_MAGIC,
|
||||
MY_PGVERSION
|
||||
};
|
||||
use postgres_ffi_types::TimestampTz;
|
||||
use super::wal_generator::LogicalMessageGenerator;
|
||||
use crate::pg_constants;
|
||||
use crate::PG_TLI;
|
||||
|
||||
@@ -11,3 +11,4 @@ pub mod forknum;
|
||||
|
||||
pub type Oid = u32;
|
||||
pub type RepOriginId = u16;
|
||||
pub type TimestampTz = i64;
|
||||
|
||||
12
libs/proxy/json/Cargo.toml
Normal file
12
libs/proxy/json/Cargo.toml
Normal file
@@ -0,0 +1,12 @@
|
||||
[package]
|
||||
name = "json"
|
||||
version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
ryu = "1"
|
||||
itoa = "1"
|
||||
|
||||
[dev-dependencies]
|
||||
futures = "0.3"
|
||||
412
libs/proxy/json/src/lib.rs
Normal file
412
libs/proxy/json/src/lib.rs
Normal file
@@ -0,0 +1,412 @@
|
||||
//! A JSON serialization lib, designed for more flexibility than `serde_json` offers.
|
||||
//!
|
||||
//! Features:
|
||||
//!
|
||||
//! ## Dynamic construction
|
||||
//!
|
||||
//! Sometimes you have dynamic values you want to serialize, that are not already in a serde-aware model like a struct or a Vec etc.
|
||||
//! To achieve this with serde, you need to implement a lot of different traits on a lot of different new-types.
|
||||
//! Because of this, it's often easier to give-in and pull all the data into a serde-aware model (`serde_json::Value` or some intermediate struct),
|
||||
//! but that is often not very efficient.
|
||||
//!
|
||||
//! This crate allows full control over the JSON encoding without needing to implement any extra traits. Just call the
|
||||
//! relevant functions, and it will guarantee a correctly encoded JSON value.
|
||||
//!
|
||||
//! ## Async construction
|
||||
//!
|
||||
//! Similar to the above, sometimes the values arrive asynchronously. Often collecting those values in memory
|
||||
//! is more expensive than writing them as JSON, since the overheads of `Vec` and `String` is much higher, however
|
||||
//! there are exceptions.
|
||||
//!
|
||||
//! Serializing to JSON all in one go is also more CPU intensive and can cause lag spikes,
|
||||
//! whereas serializing values incrementally spreads out the CPU load and reduces lag.
|
||||
//!
|
||||
//! ## Examples
|
||||
//!
|
||||
//! To represent the following JSON as a compact string
|
||||
//!
|
||||
//! ```json
|
||||
//! {
|
||||
//! "results": {
|
||||
//! "rows": [
|
||||
//! {
|
||||
//! "id": 1,
|
||||
//! "value": null
|
||||
//! },
|
||||
//! {
|
||||
//! "id": 2,
|
||||
//! "value": "hello"
|
||||
//! }
|
||||
//! ]
|
||||
//! }
|
||||
//! }
|
||||
//! ```
|
||||
//!
|
||||
//! We can use the following code:
|
||||
//!
|
||||
//! ```
|
||||
//! // create the outer object
|
||||
//! let s = json::value_to_string!(|v| json::value_as_object!(|v| {
|
||||
//! // create an entry with key "results" and start an object value associated with it.
|
||||
//! let results = v.key("results");
|
||||
//! json::value_as_object!(|results| {
|
||||
//! // create an entry with key "rows" and start an list value associated with it.
|
||||
//! let rows = results.key("rows");
|
||||
//! json::value_as_list!(|rows| {
|
||||
//! // create a list entry and start an object value associated with it.
|
||||
//! let row = rows.entry();
|
||||
//! json::value_as_object!(|row| {
|
||||
//! // add entry "id": 1
|
||||
//! row.entry("id", 1);
|
||||
//! // add entry "value": null
|
||||
//! row.entry("value", json::Null);
|
||||
//! });
|
||||
//!
|
||||
//! // create a list entry and start an object value associated with it.
|
||||
//! let row = rows.entry();
|
||||
//! json::value_as_object!(|row| {
|
||||
//! // add entry "id": 2
|
||||
//! row.entry("id", 2);
|
||||
//! // add entry "value": "hello"
|
||||
//! row.entry("value", "hello");
|
||||
//! });
|
||||
//! });
|
||||
//! });
|
||||
//! }));
|
||||
//!
|
||||
//! assert_eq!(s, r#"{"results":{"rows":[{"id":1,"value":null},{"id":2,"value":"hello"}]}}"#);
|
||||
//! ```
|
||||
|
||||
mod macros;
|
||||
mod str;
|
||||
mod value;
|
||||
|
||||
pub use value::{Null, ValueEncoder};
|
||||
|
||||
#[must_use]
|
||||
/// Serialize a single json value.
|
||||
pub struct ValueSer<'buf> {
|
||||
buf: &'buf mut Vec<u8>,
|
||||
start: usize,
|
||||
}
|
||||
|
||||
impl<'buf> ValueSer<'buf> {
|
||||
/// Create a new json value serializer.
|
||||
pub fn new(buf: &'buf mut Vec<u8>) -> Self {
|
||||
Self { buf, start: 0 }
|
||||
}
|
||||
|
||||
/// Borrow the underlying buffer
|
||||
pub fn as_buffer(&self) -> &[u8] {
|
||||
self.buf
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn value(self, e: impl ValueEncoder) {
|
||||
e.encode(self);
|
||||
}
|
||||
|
||||
/// Write raw bytes to the buf. This must be already JSON encoded.
|
||||
#[inline]
|
||||
pub fn write_raw_json(self, data: &[u8]) {
|
||||
self.buf.extend_from_slice(data);
|
||||
self.finish();
|
||||
}
|
||||
|
||||
/// Start a new object serializer.
|
||||
#[inline]
|
||||
pub fn object(self) -> ObjectSer<'buf> {
|
||||
ObjectSer::new(self)
|
||||
}
|
||||
|
||||
/// Start a new list serializer.
|
||||
#[inline]
|
||||
pub fn list(self) -> ListSer<'buf> {
|
||||
ListSer::new(self)
|
||||
}
|
||||
|
||||
/// Finish the value ser.
|
||||
#[inline]
|
||||
fn finish(self) {
|
||||
// don't trigger the drop handler which triggers a rollback.
|
||||
// this won't cause memory leaks because `ValueSet` owns no allocations.
|
||||
std::mem::forget(self);
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ValueSer<'_> {
|
||||
fn drop(&mut self) {
|
||||
self.buf.truncate(self.start);
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
/// Serialize a json object.
|
||||
pub struct ObjectSer<'buf> {
|
||||
value: ValueSer<'buf>,
|
||||
start: usize,
|
||||
}
|
||||
|
||||
impl<'buf> ObjectSer<'buf> {
|
||||
/// Start a new object serializer.
|
||||
#[inline]
|
||||
pub fn new(value: ValueSer<'buf>) -> Self {
|
||||
value.buf.push(b'{');
|
||||
let start = value.buf.len();
|
||||
Self { value, start }
|
||||
}
|
||||
|
||||
/// Borrow the underlying buffer
|
||||
pub fn as_buffer(&self) -> &[u8] {
|
||||
self.value.as_buffer()
|
||||
}
|
||||
|
||||
/// Start a new object entry with the given string key, returning a [`ValueSer`] for the associated value.
|
||||
#[inline]
|
||||
pub fn key(&mut self, key: impl KeyEncoder) -> ValueSer<'_> {
|
||||
key.write_key(self)
|
||||
}
|
||||
|
||||
/// Write an entry (key-value pair) to the object.
|
||||
#[inline]
|
||||
pub fn entry(&mut self, key: impl KeyEncoder, val: impl ValueEncoder) {
|
||||
self.key(key).value(val);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn entry_inner(&mut self, f: impl FnOnce(&mut Vec<u8>)) -> ValueSer<'_> {
|
||||
// track before the separator so we the value is rolled back it also removes the separator.
|
||||
let start = self.value.buf.len();
|
||||
|
||||
// push separator if necessary
|
||||
if self.value.buf.len() > self.start {
|
||||
self.value.buf.push(b',');
|
||||
}
|
||||
// push key
|
||||
f(self.value.buf);
|
||||
// push value separator
|
||||
self.value.buf.push(b':');
|
||||
|
||||
// return value writer.
|
||||
ValueSer {
|
||||
buf: self.value.buf,
|
||||
start,
|
||||
}
|
||||
}
|
||||
|
||||
/// Reset the buffer back to before this object was started.
|
||||
#[inline]
|
||||
pub fn rollback(self) -> ValueSer<'buf> {
|
||||
// Do not fully reset the value, only reset it to before the `{`.
|
||||
// This ensures any `,` before this value are not clobbered.
|
||||
self.value.buf.truncate(self.start - 1);
|
||||
self.value
|
||||
}
|
||||
|
||||
/// Finish the object ser.
|
||||
#[inline]
|
||||
pub fn finish(self) {
|
||||
self.value.buf.push(b'}');
|
||||
self.value.finish();
|
||||
}
|
||||
}
|
||||
|
||||
pub trait KeyEncoder {
|
||||
fn write_key<'a>(self, obj: &'a mut ObjectSer) -> ValueSer<'a>;
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
/// Serialize a json object.
|
||||
pub struct ListSer<'buf> {
|
||||
value: ValueSer<'buf>,
|
||||
start: usize,
|
||||
}
|
||||
|
||||
impl<'buf> ListSer<'buf> {
|
||||
/// Start a new list serializer.
|
||||
#[inline]
|
||||
pub fn new(value: ValueSer<'buf>) -> Self {
|
||||
value.buf.push(b'[');
|
||||
let start = value.buf.len();
|
||||
Self { value, start }
|
||||
}
|
||||
|
||||
/// Borrow the underlying buffer
|
||||
pub fn as_buffer(&self) -> &[u8] {
|
||||
self.value.as_buffer()
|
||||
}
|
||||
|
||||
/// Write an value to the list.
|
||||
#[inline]
|
||||
pub fn push(&mut self, val: impl ValueEncoder) {
|
||||
self.entry().value(val);
|
||||
}
|
||||
|
||||
/// Start a new value entry in this list.
|
||||
#[inline]
|
||||
pub fn entry(&mut self) -> ValueSer<'_> {
|
||||
// track before the separator so we the value is rolled back it also removes the separator.
|
||||
let start = self.value.buf.len();
|
||||
|
||||
// push separator if necessary
|
||||
if self.value.buf.len() > self.start {
|
||||
self.value.buf.push(b',');
|
||||
}
|
||||
|
||||
// return value writer.
|
||||
ValueSer {
|
||||
buf: self.value.buf,
|
||||
start,
|
||||
}
|
||||
}
|
||||
|
||||
/// Reset the buffer back to before this object was started.
|
||||
#[inline]
|
||||
pub fn rollback(self) -> ValueSer<'buf> {
|
||||
// Do not fully reset the value, only reset it to before the `[`.
|
||||
// This ensures any `,` before this value are not clobbered.
|
||||
self.value.buf.truncate(self.start - 1);
|
||||
self.value
|
||||
}
|
||||
|
||||
/// Finish the object ser.
|
||||
#[inline]
|
||||
pub fn finish(self) {
|
||||
self.value.buf.push(b']');
|
||||
self.value.finish();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::{Null, ValueSer};
|
||||
|
||||
#[test]
|
||||
fn object() {
|
||||
let mut buf = vec![];
|
||||
let mut object = ValueSer::new(&mut buf).object();
|
||||
object.entry("foo", "bar");
|
||||
object.entry("baz", Null);
|
||||
object.finish();
|
||||
|
||||
assert_eq!(buf, br#"{"foo":"bar","baz":null}"#);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn list() {
|
||||
let mut buf = vec![];
|
||||
let mut list = ValueSer::new(&mut buf).list();
|
||||
list.entry().value("bar");
|
||||
list.entry().value(Null);
|
||||
list.finish();
|
||||
|
||||
assert_eq!(buf, br#"["bar",null]"#);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn object_macro() {
|
||||
let res = crate::value_to_string!(|obj| {
|
||||
crate::value_as_object!(|obj| {
|
||||
obj.entry("foo", "bar");
|
||||
obj.entry("baz", Null);
|
||||
})
|
||||
});
|
||||
|
||||
assert_eq!(res, r#"{"foo":"bar","baz":null}"#);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn list_macro() {
|
||||
let res = crate::value_to_string!(|list| {
|
||||
crate::value_as_list!(|list| {
|
||||
list.entry().value("bar");
|
||||
list.entry().value(Null);
|
||||
})
|
||||
});
|
||||
|
||||
assert_eq!(res, r#"["bar",null]"#);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rollback_on_drop() {
|
||||
let res = crate::value_to_string!(|list| {
|
||||
crate::value_as_list!(|list| {
|
||||
list.entry().value("bar");
|
||||
|
||||
'cancel: {
|
||||
let nested_list = list.entry();
|
||||
crate::value_as_list!(|nested_list| {
|
||||
nested_list.entry().value(1);
|
||||
|
||||
assert_eq!(nested_list.as_buffer(), br#"["bar",[1"#);
|
||||
if true {
|
||||
break 'cancel;
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
assert_eq!(list.as_buffer(), br#"["bar""#);
|
||||
|
||||
list.entry().value(Null);
|
||||
})
|
||||
});
|
||||
|
||||
assert_eq!(res, r#"["bar",null]"#);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rollback_object() {
|
||||
let res = crate::value_to_string!(|obj| {
|
||||
crate::value_as_object!(|obj| {
|
||||
let entry = obj.key("1");
|
||||
entry.value(1_i32);
|
||||
|
||||
let entry = obj.key("2");
|
||||
let entry = {
|
||||
let mut nested_obj = entry.object();
|
||||
nested_obj.entry("foo", "bar");
|
||||
nested_obj.rollback()
|
||||
};
|
||||
|
||||
entry.value(2_i32);
|
||||
})
|
||||
});
|
||||
|
||||
assert_eq!(res, r#"{"1":1,"2":2}"#);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rollback_list() {
|
||||
let res = crate::value_to_string!(|list| {
|
||||
crate::value_as_list!(|list| {
|
||||
let entry = list.entry();
|
||||
entry.value(1_i32);
|
||||
|
||||
let entry = list.entry();
|
||||
let entry = {
|
||||
let mut nested_list = entry.list();
|
||||
nested_list.push("foo");
|
||||
nested_list.rollback()
|
||||
};
|
||||
|
||||
entry.value(2_i32);
|
||||
})
|
||||
});
|
||||
|
||||
assert_eq!(res, r#"[1,2]"#);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn string_escaping() {
|
||||
let mut buf = vec![];
|
||||
let mut object = ValueSer::new(&mut buf).object();
|
||||
|
||||
let key = "hello";
|
||||
let value = "\n world";
|
||||
|
||||
object.entry(format_args!("{key:?}"), value);
|
||||
object.finish();
|
||||
|
||||
assert_eq!(buf, br#"{"\"hello\"":"\n world"}"#);
|
||||
}
|
||||
}
|
||||
86
libs/proxy/json/src/macros.rs
Normal file
86
libs/proxy/json/src/macros.rs
Normal file
@@ -0,0 +1,86 @@
|
||||
//! # Examples
|
||||
//!
|
||||
//! ```
|
||||
//! use futures::{StreamExt, TryStream, TryStreamExt};
|
||||
//!
|
||||
//! async fn stream_to_json_list<S, T, E>(mut s: S) -> Result<String, E>
|
||||
//! where
|
||||
//! S: TryStream<Ok = T, Error = E> + Unpin,
|
||||
//! T: json::ValueEncoder
|
||||
//! {
|
||||
//! Ok(json::value_to_string!(|val| json::value_as_list!(|val| {
|
||||
//! // note how we can use `.await` and `?` in here.
|
||||
//! while let Some(value) = s.try_next().await? {
|
||||
//! val.push(value);
|
||||
//! }
|
||||
//! })))
|
||||
//! }
|
||||
//!
|
||||
//! let stream = futures::stream::iter([1, 2, 3]).map(Ok::<i32, ()>);
|
||||
//! let json_string = futures::executor::block_on(stream_to_json_list(stream)).unwrap();
|
||||
//! assert_eq!(json_string, "[1,2,3]");
|
||||
//! ```
|
||||
|
||||
/// A helper to create a new JSON vec.
|
||||
///
|
||||
/// Implemented as a macro to preserve all control flow.
|
||||
#[macro_export]
|
||||
macro_rules! value_to_vec {
|
||||
(|$val:ident| $body:expr) => {{
|
||||
let mut buf = vec![];
|
||||
let $val = $crate::ValueSer::new(&mut buf);
|
||||
let _: () = $body;
|
||||
buf
|
||||
}};
|
||||
}
|
||||
|
||||
/// A helper to create a new JSON string.
|
||||
///
|
||||
/// Implemented as a macro to preserve all control flow.
|
||||
#[macro_export]
|
||||
macro_rules! value_to_string {
|
||||
(|$val:ident| $body:expr) => {{
|
||||
::std::string::String::from_utf8($crate::value_to_vec!(|$val| $body))
|
||||
.expect("json should be valid utf8")
|
||||
}};
|
||||
}
|
||||
|
||||
/// A helper that ensures the [`ObjectSer::finish`](crate::ObjectSer::finish) method is called on completion.
|
||||
///
|
||||
/// Consumes `$val` and assigns it as an [`ObjectSer`](crate::ObjectSer) serializer.
|
||||
/// The serializer is only 'finished' if the body completes.
|
||||
/// The serializer is rolled back if `break`/`return` escapes the body.
|
||||
///
|
||||
/// Implemented as a macro to preserve all control flow.
|
||||
#[macro_export]
|
||||
macro_rules! value_as_object {
|
||||
(|$val:ident| $body:expr) => {{
|
||||
let mut obj = $crate::ObjectSer::new($val);
|
||||
|
||||
let $val = &mut obj;
|
||||
let res = $body;
|
||||
|
||||
obj.finish();
|
||||
res
|
||||
}};
|
||||
}
|
||||
|
||||
/// A helper that ensures the [`ListSer::finish`](crate::ListSer::finish) method is called on completion.
|
||||
///
|
||||
/// Consumes `$val` and assigns it as an [`ListSer`](crate::ListSer) serializer.
|
||||
/// The serializer is only 'finished' if the body completes.
|
||||
/// The serializer is rolled back if `break`/`return` escapes the body.
|
||||
///
|
||||
/// Implemented as a macro to preserve all control flow.
|
||||
#[macro_export]
|
||||
macro_rules! value_as_list {
|
||||
(|$val:ident| $body:expr) => {{
|
||||
let mut list = $crate::ListSer::new($val);
|
||||
|
||||
let $val = &mut list;
|
||||
let res = $body;
|
||||
|
||||
list.finish();
|
||||
res
|
||||
}};
|
||||
}
|
||||
166
libs/proxy/json/src/str.rs
Normal file
166
libs/proxy/json/src/str.rs
Normal file
@@ -0,0 +1,166 @@
|
||||
//! Helpers for serializing escaped strings.
|
||||
//!
|
||||
//! ## License
|
||||
//!
|
||||
//! <https://github.com/serde-rs/json/blob/c1826ebcccb1a520389c6b78ad3da15db279220d/src/ser.rs#L1514-L1552>
|
||||
//! <https://github.com/serde-rs/json/blob/c1826ebcccb1a520389c6b78ad3da15db279220d/src/ser.rs#L2081-L2157>
|
||||
//! Licensed by David Tolnay under MIT or Apache-2.0.
|
||||
//!
|
||||
//! With modifications by Conrad Ludgate on behalf of Databricks.
|
||||
|
||||
use std::fmt::{self, Write};
|
||||
|
||||
/// Represents a character escape code in a type-safe manner.
|
||||
pub enum CharEscape {
|
||||
/// An escaped quote `"`
|
||||
Quote,
|
||||
/// An escaped reverse solidus `\`
|
||||
ReverseSolidus,
|
||||
// /// An escaped solidus `/`
|
||||
// Solidus,
|
||||
/// An escaped backspace character (usually escaped as `\b`)
|
||||
Backspace,
|
||||
/// An escaped form feed character (usually escaped as `\f`)
|
||||
FormFeed,
|
||||
/// An escaped line feed character (usually escaped as `\n`)
|
||||
LineFeed,
|
||||
/// An escaped carriage return character (usually escaped as `\r`)
|
||||
CarriageReturn,
|
||||
/// An escaped tab character (usually escaped as `\t`)
|
||||
Tab,
|
||||
/// An escaped ASCII plane control character (usually escaped as
|
||||
/// `\u00XX` where `XX` are two hex characters)
|
||||
AsciiControl(u8),
|
||||
}
|
||||
|
||||
impl CharEscape {
|
||||
#[inline]
|
||||
fn from_escape_table(escape: u8, byte: u8) -> CharEscape {
|
||||
match escape {
|
||||
self::BB => CharEscape::Backspace,
|
||||
self::TT => CharEscape::Tab,
|
||||
self::NN => CharEscape::LineFeed,
|
||||
self::FF => CharEscape::FormFeed,
|
||||
self::RR => CharEscape::CarriageReturn,
|
||||
self::QU => CharEscape::Quote,
|
||||
self::BS => CharEscape::ReverseSolidus,
|
||||
self::UU => CharEscape::AsciiControl(byte),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn format_escaped_str(writer: &mut Vec<u8>, value: &str) {
|
||||
writer.reserve(2 + value.len());
|
||||
|
||||
writer.push(b'"');
|
||||
|
||||
let rest = format_escaped_str_contents(writer, value);
|
||||
writer.extend_from_slice(rest);
|
||||
|
||||
writer.push(b'"');
|
||||
}
|
||||
|
||||
pub(crate) fn format_escaped_fmt(writer: &mut Vec<u8>, args: fmt::Arguments) {
|
||||
writer.push(b'"');
|
||||
|
||||
Collect { buf: writer }
|
||||
.write_fmt(args)
|
||||
.expect("formatting should not error");
|
||||
|
||||
writer.push(b'"');
|
||||
}
|
||||
|
||||
struct Collect<'buf> {
|
||||
buf: &'buf mut Vec<u8>,
|
||||
}
|
||||
|
||||
impl fmt::Write for Collect<'_> {
|
||||
fn write_str(&mut self, s: &str) -> fmt::Result {
|
||||
let last = format_escaped_str_contents(self.buf, s);
|
||||
self.buf.extend(last);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// writes any escape sequences, and returns the suffix still needed to be written.
|
||||
fn format_escaped_str_contents<'a>(writer: &mut Vec<u8>, value: &'a str) -> &'a [u8] {
|
||||
let bytes = value.as_bytes();
|
||||
|
||||
let mut start = 0;
|
||||
|
||||
for (i, &byte) in bytes.iter().enumerate() {
|
||||
let escape = ESCAPE[byte as usize];
|
||||
if escape == 0 {
|
||||
continue;
|
||||
}
|
||||
|
||||
writer.extend_from_slice(&bytes[start..i]);
|
||||
|
||||
let char_escape = CharEscape::from_escape_table(escape, byte);
|
||||
write_char_escape(writer, char_escape);
|
||||
|
||||
start = i + 1;
|
||||
}
|
||||
|
||||
&bytes[start..]
|
||||
}
|
||||
|
||||
const BB: u8 = b'b'; // \x08
|
||||
const TT: u8 = b't'; // \x09
|
||||
const NN: u8 = b'n'; // \x0A
|
||||
const FF: u8 = b'f'; // \x0C
|
||||
const RR: u8 = b'r'; // \x0D
|
||||
const QU: u8 = b'"'; // \x22
|
||||
const BS: u8 = b'\\'; // \x5C
|
||||
const UU: u8 = b'u'; // \x00...\x1F except the ones above
|
||||
const __: u8 = 0;
|
||||
|
||||
// Lookup table of escape sequences. A value of b'x' at index i means that byte
|
||||
// i is escaped as "\x" in JSON. A value of 0 means that byte i is not escaped.
|
||||
static ESCAPE: [u8; 256] = [
|
||||
// 1 2 3 4 5 6 7 8 9 A B C D E F
|
||||
UU, UU, UU, UU, UU, UU, UU, UU, BB, TT, NN, UU, FF, RR, UU, UU, // 0
|
||||
UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, // 1
|
||||
__, __, QU, __, __, __, __, __, __, __, __, __, __, __, __, __, // 2
|
||||
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 3
|
||||
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 4
|
||||
__, __, __, __, __, __, __, __, __, __, __, __, BS, __, __, __, // 5
|
||||
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 6
|
||||
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 7
|
||||
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 8
|
||||
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 9
|
||||
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // A
|
||||
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // B
|
||||
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // C
|
||||
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // D
|
||||
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // E
|
||||
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // F
|
||||
];
|
||||
|
||||
fn write_char_escape(writer: &mut Vec<u8>, char_escape: CharEscape) {
|
||||
let s = match char_escape {
|
||||
CharEscape::Quote => b"\\\"",
|
||||
CharEscape::ReverseSolidus => b"\\\\",
|
||||
// CharEscape::Solidus => b"\\/",
|
||||
CharEscape::Backspace => b"\\b",
|
||||
CharEscape::FormFeed => b"\\f",
|
||||
CharEscape::LineFeed => b"\\n",
|
||||
CharEscape::CarriageReturn => b"\\r",
|
||||
CharEscape::Tab => b"\\t",
|
||||
CharEscape::AsciiControl(byte) => {
|
||||
static HEX_DIGITS: [u8; 16] = *b"0123456789abcdef";
|
||||
let bytes = &[
|
||||
b'\\',
|
||||
b'u',
|
||||
b'0',
|
||||
b'0',
|
||||
HEX_DIGITS[(byte >> 4) as usize],
|
||||
HEX_DIGITS[(byte & 0xF) as usize],
|
||||
];
|
||||
return writer.extend_from_slice(bytes);
|
||||
}
|
||||
};
|
||||
|
||||
writer.extend_from_slice(s);
|
||||
}
|
||||
168
libs/proxy/json/src/value.rs
Normal file
168
libs/proxy/json/src/value.rs
Normal file
@@ -0,0 +1,168 @@
|
||||
use core::fmt;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
|
||||
use crate::str::{format_escaped_fmt, format_escaped_str};
|
||||
use crate::{KeyEncoder, ObjectSer, ValueSer, value_as_list, value_as_object};
|
||||
|
||||
/// Write a value to the underlying json representation.
|
||||
pub trait ValueEncoder {
|
||||
fn encode(self, v: ValueSer<'_>);
|
||||
}
|
||||
|
||||
pub(crate) fn write_int(x: impl itoa::Integer, b: &mut Vec<u8>) {
|
||||
b.extend_from_slice(itoa::Buffer::new().format(x).as_bytes());
|
||||
}
|
||||
|
||||
pub(crate) fn write_float(x: impl ryu::Float, b: &mut Vec<u8>) {
|
||||
b.extend_from_slice(ryu::Buffer::new().format(x).as_bytes());
|
||||
}
|
||||
|
||||
impl<T: Copy + ValueEncoder> ValueEncoder for &T {
|
||||
#[inline]
|
||||
fn encode(self, v: ValueSer<'_>) {
|
||||
T::encode(*self, v);
|
||||
}
|
||||
}
|
||||
|
||||
impl ValueEncoder for &str {
|
||||
#[inline]
|
||||
fn encode(self, v: ValueSer<'_>) {
|
||||
format_escaped_str(v.buf, self);
|
||||
v.finish();
|
||||
}
|
||||
}
|
||||
|
||||
impl ValueEncoder for fmt::Arguments<'_> {
|
||||
#[inline]
|
||||
fn encode(self, v: ValueSer<'_>) {
|
||||
if let Some(s) = self.as_str() {
|
||||
format_escaped_str(v.buf, s);
|
||||
} else {
|
||||
format_escaped_fmt(v.buf, self);
|
||||
}
|
||||
v.finish();
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! int {
|
||||
[$($t:ty),*] => {
|
||||
$(
|
||||
impl ValueEncoder for $t {
|
||||
#[inline]
|
||||
fn encode(self, v: ValueSer<'_>) {
|
||||
write_int(self, v.buf);
|
||||
v.finish();
|
||||
}
|
||||
}
|
||||
)*
|
||||
};
|
||||
}
|
||||
|
||||
int![u8, u16, u32, u64, usize, u128];
|
||||
int![i8, i16, i32, i64, isize, i128];
|
||||
|
||||
macro_rules! float {
|
||||
[$($t:ty),*] => {
|
||||
$(
|
||||
impl ValueEncoder for $t {
|
||||
#[inline]
|
||||
fn encode(self, v: ValueSer<'_>) {
|
||||
write_float(self, v.buf);
|
||||
v.finish();
|
||||
}
|
||||
}
|
||||
)*
|
||||
};
|
||||
}
|
||||
|
||||
float![f32, f64];
|
||||
|
||||
impl ValueEncoder for bool {
|
||||
#[inline]
|
||||
fn encode(self, v: ValueSer<'_>) {
|
||||
v.write_raw_json(if self { b"true" } else { b"false" });
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ValueEncoder> ValueEncoder for Option<T> {
|
||||
#[inline]
|
||||
fn encode(self, v: ValueSer<'_>) {
|
||||
match self {
|
||||
Some(value) => value.encode(v),
|
||||
None => Null.encode(v),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl KeyEncoder for &str {
|
||||
#[inline]
|
||||
fn write_key<'a>(self, obj: &'a mut ObjectSer) -> ValueSer<'a> {
|
||||
let obj = &mut *obj;
|
||||
obj.entry_inner(|b| format_escaped_str(b, self))
|
||||
}
|
||||
}
|
||||
|
||||
impl KeyEncoder for fmt::Arguments<'_> {
|
||||
#[inline]
|
||||
fn write_key<'a>(self, obj: &'a mut ObjectSer) -> ValueSer<'a> {
|
||||
if let Some(key) = self.as_str() {
|
||||
obj.entry_inner(|b| format_escaped_str(b, key))
|
||||
} else {
|
||||
obj.entry_inner(|b| format_escaped_fmt(b, self))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents the JSON null value.
|
||||
pub struct Null;
|
||||
|
||||
impl ValueEncoder for Null {
|
||||
#[inline]
|
||||
fn encode(self, v: ValueSer<'_>) {
|
||||
v.write_raw_json(b"null");
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ValueEncoder> ValueEncoder for Vec<T> {
|
||||
#[inline]
|
||||
fn encode(self, v: ValueSer<'_>) {
|
||||
value_as_list!(|v| {
|
||||
for t in self {
|
||||
v.entry().value(t);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Copy + ValueEncoder> ValueEncoder for &[T] {
|
||||
#[inline]
|
||||
fn encode(self, v: ValueSer<'_>) {
|
||||
value_as_list!(|v| {
|
||||
for t in self {
|
||||
v.entry().value(t);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl<K: KeyEncoder, V: ValueEncoder, S> ValueEncoder for HashMap<K, V, S> {
|
||||
#[inline]
|
||||
fn encode(self, o: ValueSer<'_>) {
|
||||
value_as_object!(|o| {
|
||||
for (k, v) in self {
|
||||
o.entry(k, v);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl<K: KeyEncoder, V: ValueEncoder> ValueEncoder for BTreeMap<K, V> {
|
||||
#[inline]
|
||||
fn encode(self, o: ValueSer<'_>) {
|
||||
value_as_object!(|o| {
|
||||
for (k, v) in self {
|
||||
o.entry(k, v);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -13,6 +13,7 @@ aws-smithy-async.workspace = true
|
||||
aws-smithy-types.workspace = true
|
||||
aws-config.workspace = true
|
||||
aws-sdk-s3.workspace = true
|
||||
base64.workspace = true
|
||||
bytes.workspace = true
|
||||
camino = { workspace = true, features = ["serde1"] }
|
||||
humantime-serde.workspace = true
|
||||
@@ -41,6 +42,9 @@ http-body-util.workspace = true
|
||||
itertools.workspace = true
|
||||
sync_wrapper = { workspace = true, features = ["futures"] }
|
||||
|
||||
byteorder = "1.4"
|
||||
rand = "0.8.5"
|
||||
|
||||
[dev-dependencies]
|
||||
camino-tempfile.workspace = true
|
||||
test-context.workspace = true
|
||||
|
||||
@@ -14,17 +14,25 @@ use anyhow::{Context, Result, anyhow};
|
||||
use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range};
|
||||
use azure_core::{Continuable, HttpClient, RetryOptions, TransportOptions};
|
||||
use azure_storage::StorageCredentials;
|
||||
use azure_storage_blobs::blob::operations::GetBlobBuilder;
|
||||
use azure_storage_blobs::blob::BlobBlockType;
|
||||
use azure_storage_blobs::blob::BlockList;
|
||||
use azure_storage_blobs::blob::{Blob, CopyStatus};
|
||||
use azure_storage_blobs::container::operations::ListBlobsBuilder;
|
||||
use azure_storage_blobs::prelude::{ClientBuilder, ContainerClient};
|
||||
use azure_storage_blobs::prelude::ClientBuilder;
|
||||
use azure_storage_blobs::{blob::operations::GetBlobBuilder, prelude::ContainerClient};
|
||||
use base64::{Engine as _, engine::general_purpose::URL_SAFE};
|
||||
use byteorder::{BigEndian, ByteOrder};
|
||||
use bytes::Bytes;
|
||||
use camino::Utf8Path;
|
||||
use futures::FutureExt;
|
||||
use futures::future::Either;
|
||||
use futures::stream::Stream;
|
||||
use futures_util::{StreamExt, TryStreamExt};
|
||||
use http_types::{StatusCode, Url};
|
||||
use scopeguard::ScopeGuard;
|
||||
use tokio::fs::File;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::AsyncSeekExt;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::debug;
|
||||
use utils::backoff;
|
||||
@@ -51,6 +59,9 @@ pub struct AzureBlobStorage {
|
||||
|
||||
// Alternative timeout used for metadata objects which are expected to be small
|
||||
pub small_timeout: Duration,
|
||||
/* BEGIN_HADRON */
|
||||
pub put_block_size_mb: Option<usize>,
|
||||
/* END_HADRON */
|
||||
}
|
||||
|
||||
impl AzureBlobStorage {
|
||||
@@ -107,6 +118,9 @@ impl AzureBlobStorage {
|
||||
concurrency_limiter: ConcurrencyLimiter::new(azure_config.concurrency_limit.get()),
|
||||
timeout,
|
||||
small_timeout,
|
||||
/* BEGIN_HADRON */
|
||||
put_block_size_mb: azure_config.put_block_size_mb,
|
||||
/* END_HADRON */
|
||||
})
|
||||
}
|
||||
|
||||
@@ -583,31 +597,137 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
|
||||
let started_at = start_measuring_requests(kind);
|
||||
|
||||
let op = async {
|
||||
let mut metadata_map = metadata.unwrap_or([].into());
|
||||
let timeline_file_path = metadata_map.0.remove("databricks_azure_put_block");
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
let op = async move {
|
||||
let blob_client = self.client.blob_client(self.relative_path_to_name(to));
|
||||
let put_block_size = self.put_block_size_mb.unwrap_or(0) * 1024 * 1024;
|
||||
if timeline_file_path.is_none() || put_block_size == 0 {
|
||||
// Use put_block_blob directly.
|
||||
let from: Pin<
|
||||
Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static>,
|
||||
> = Box::pin(from);
|
||||
let from = NonSeekableStream::new(from, data_size_bytes);
|
||||
let body = azure_core::Body::SeekableStream(Box::new(from));
|
||||
|
||||
let from: Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static>> =
|
||||
Box::pin(from);
|
||||
let mut builder = blob_client.put_block_blob(body);
|
||||
if !metadata_map.0.is_empty() {
|
||||
builder = builder.metadata(to_azure_metadata(metadata_map));
|
||||
}
|
||||
let fut = builder.into_future();
|
||||
let fut = tokio::time::timeout(self.timeout, fut);
|
||||
let result = fut.await;
|
||||
match result {
|
||||
Ok(Ok(_response)) => return Ok(()),
|
||||
Ok(Err(azure)) => return Err(azure.into()),
|
||||
Err(_timeout) => return Err(TimeoutOrCancel::Timeout.into()),
|
||||
};
|
||||
}
|
||||
// Upload chunks concurrently using Put Block.
|
||||
// Each PutBlock uploads put_block_size bytes of the file.
|
||||
let mut upload_futures: Vec<tokio::task::JoinHandle<Result<(), azure_core::Error>>> =
|
||||
vec![];
|
||||
let mut block_list = BlockList::default();
|
||||
let mut start_bytes = 0u64;
|
||||
let mut remaining_bytes = data_size_bytes;
|
||||
let mut block_list_count = 0;
|
||||
|
||||
let from = NonSeekableStream::new(from, data_size_bytes);
|
||||
while remaining_bytes > 0 {
|
||||
let block_size = std::cmp::min(remaining_bytes, put_block_size);
|
||||
let end_bytes = start_bytes + block_size as u64;
|
||||
let block_id = block_list_count;
|
||||
let timeout = self.timeout;
|
||||
let blob_client = blob_client.clone();
|
||||
let timeline_file = timeline_file_path.clone().unwrap().clone();
|
||||
|
||||
let body = azure_core::Body::SeekableStream(Box::new(from));
|
||||
let mut encoded_block_id = [0u8; 8];
|
||||
BigEndian::write_u64(&mut encoded_block_id, block_id);
|
||||
URL_SAFE.encode(encoded_block_id);
|
||||
|
||||
let mut builder = blob_client.put_block_blob(body);
|
||||
// Put one block.
|
||||
let part_fut = async move {
|
||||
let mut file = File::open(Utf8Path::new(&timeline_file.clone())).await?;
|
||||
file.seek(io::SeekFrom::Start(start_bytes)).await?;
|
||||
let limited_reader = file.take(block_size as u64);
|
||||
let file_chunk_stream =
|
||||
tokio_util::io::ReaderStream::with_capacity(limited_reader, 1024 * 1024);
|
||||
let file_chunk_stream_pin: Pin<
|
||||
Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static>,
|
||||
> = Box::pin(file_chunk_stream);
|
||||
let stream_wrapper = NonSeekableStream::new(file_chunk_stream_pin, block_size);
|
||||
let body = azure_core::Body::SeekableStream(Box::new(stream_wrapper));
|
||||
// Azure put block takes URL-encoded block ids and all blocks must have the same byte length.
|
||||
// https://learn.microsoft.com/en-us/rest/api/storageservices/put-block?tabs=microsoft-entra-id#uri-parameters
|
||||
let builder = blob_client.put_block(encoded_block_id.to_vec(), body);
|
||||
let fut = builder.into_future();
|
||||
let fut = tokio::time::timeout(timeout, fut);
|
||||
let result = fut.await;
|
||||
tracing::debug!(
|
||||
"azure put block id-{} size {} start {} end {} file {} response {:#?}",
|
||||
block_id,
|
||||
block_size,
|
||||
start_bytes,
|
||||
end_bytes,
|
||||
timeline_file,
|
||||
result
|
||||
);
|
||||
match result {
|
||||
Ok(Ok(_response)) => Ok(()),
|
||||
Ok(Err(azure)) => Err(azure),
|
||||
Err(_timeout) => Err(azure_core::Error::new(
|
||||
azure_core::error::ErrorKind::Io,
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::TimedOut,
|
||||
"Operation timed out",
|
||||
),
|
||||
)),
|
||||
}
|
||||
};
|
||||
upload_futures.push(tokio::spawn(part_fut));
|
||||
|
||||
if let Some(metadata) = metadata {
|
||||
builder = builder.metadata(to_azure_metadata(metadata));
|
||||
block_list_count += 1;
|
||||
remaining_bytes -= block_size;
|
||||
start_bytes += block_size as u64;
|
||||
|
||||
block_list
|
||||
.blocks
|
||||
.push(BlobBlockType::Uncommitted(encoded_block_id.to_vec().into()));
|
||||
}
|
||||
|
||||
tracing::debug!(
|
||||
"azure put blocks {} total MB: {} chunk size MB: {}",
|
||||
block_list_count,
|
||||
data_size_bytes / 1024 / 1024,
|
||||
put_block_size / 1024 / 1024
|
||||
);
|
||||
// Wait for all blocks to be uploaded.
|
||||
let upload_results = futures::future::try_join_all(upload_futures).await;
|
||||
if upload_results.is_err() {
|
||||
return Err(anyhow::anyhow!(format!(
|
||||
"Failed to upload all blocks {:#?}",
|
||||
upload_results.unwrap_err()
|
||||
)));
|
||||
}
|
||||
|
||||
// Commit the blocks.
|
||||
let mut builder = blob_client.put_block_list(block_list);
|
||||
if !metadata_map.0.is_empty() {
|
||||
builder = builder.metadata(to_azure_metadata(metadata_map));
|
||||
}
|
||||
let fut = builder.into_future();
|
||||
let fut = tokio::time::timeout(self.timeout, fut);
|
||||
let result = fut.await;
|
||||
tracing::debug!("azure put block list response {:#?}", result);
|
||||
|
||||
match fut.await {
|
||||
match result {
|
||||
Ok(Ok(_response)) => Ok(()),
|
||||
Ok(Err(azure)) => Err(azure.into()),
|
||||
Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
|
||||
}
|
||||
};
|
||||
/* END_HADRON */
|
||||
|
||||
let res = tokio::select! {
|
||||
res = op => res,
|
||||
@@ -622,7 +742,6 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
crate::metrics::BUCKET_METRICS
|
||||
.req_seconds
|
||||
.observe_elapsed(kind, outcome, started_at);
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
|
||||
@@ -195,8 +195,19 @@ pub struct AzureConfig {
|
||||
pub max_keys_per_list_response: Option<i32>,
|
||||
#[serde(default = "default_azure_conn_pool_size")]
|
||||
pub conn_pool_size: usize,
|
||||
/* BEGIN_HADRON */
|
||||
#[serde(default = "default_azure_put_block_size_mb")]
|
||||
pub put_block_size_mb: Option<usize>,
|
||||
/* END_HADRON */
|
||||
}
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
fn default_azure_put_block_size_mb() -> Option<usize> {
|
||||
// Disable parallel upload by default.
|
||||
Some(0)
|
||||
}
|
||||
/* END_HADRON */
|
||||
|
||||
fn default_remote_storage_azure_concurrency_limit() -> NonZeroUsize {
|
||||
NonZeroUsize::new(DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT).unwrap()
|
||||
}
|
||||
@@ -213,6 +224,9 @@ impl Debug for AzureConfig {
|
||||
"max_keys_per_list_response",
|
||||
&self.max_keys_per_list_response,
|
||||
)
|
||||
/* BEGIN_HADRON */
|
||||
.field("put_block_size_mb", &self.put_block_size_mb)
|
||||
/* END_HADRON */
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@@ -352,6 +366,7 @@ timeout = '5s'";
|
||||
upload_storage_class = 'INTELLIGENT_TIERING'
|
||||
timeout = '7s'
|
||||
conn_pool_size = 8
|
||||
put_block_size_mb = 1024
|
||||
";
|
||||
|
||||
let config = parse(toml).unwrap();
|
||||
@@ -367,6 +382,9 @@ timeout = '5s'";
|
||||
concurrency_limit: default_remote_storage_azure_concurrency_limit(),
|
||||
max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
|
||||
conn_pool_size: 8,
|
||||
/* BEGIN_HADRON */
|
||||
put_block_size_mb: Some(1024),
|
||||
/* END_HADRON */
|
||||
}),
|
||||
timeout: Duration::from_secs(7),
|
||||
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT
|
||||
|
||||
@@ -732,9 +732,15 @@ impl GenericRemoteStorage {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn unreliable_wrapper(s: Self, fail_first: u64) -> Self {
|
||||
Self::Unreliable(Arc::new(UnreliableWrapper::new(s, fail_first)))
|
||||
/* BEGIN_HADRON */
|
||||
pub fn unreliable_wrapper(s: Self, fail_first: u64, fail_probability: u64) -> Self {
|
||||
Self::Unreliable(Arc::new(UnreliableWrapper::new(
|
||||
s,
|
||||
fail_first,
|
||||
fail_probability,
|
||||
)))
|
||||
}
|
||||
/* END_HADRON */
|
||||
|
||||
/// See [`RemoteStorage::upload`], which this method calls with `None` as metadata.
|
||||
pub async fn upload_storage_object(
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
//! This module provides a wrapper around a real RemoteStorage implementation that
|
||||
//! causes the first N attempts at each upload or download operatio to fail. For
|
||||
//! testing purposes.
|
||||
use rand::Rng;
|
||||
use std::cmp;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::num::NonZeroU32;
|
||||
@@ -25,6 +27,13 @@ pub struct UnreliableWrapper {
|
||||
|
||||
// Tracks how many failed attempts of each operation has been made.
|
||||
attempts: Mutex<HashMap<RemoteOp, u64>>,
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
// This the probability of failure for each operation, ranged from [0, 100].
|
||||
// The probability is default to 100, which means that all operations will fail.
|
||||
// Storage will fail by probability up to attempts_to_fail times.
|
||||
attempt_failure_probability: u64,
|
||||
/* END_HADRON */
|
||||
}
|
||||
|
||||
/// Used to identify retries of different unique operation.
|
||||
@@ -40,7 +49,11 @@ enum RemoteOp {
|
||||
}
|
||||
|
||||
impl UnreliableWrapper {
|
||||
pub fn new(inner: crate::GenericRemoteStorage, attempts_to_fail: u64) -> Self {
|
||||
pub fn new(
|
||||
inner: crate::GenericRemoteStorage,
|
||||
attempts_to_fail: u64,
|
||||
attempt_failure_probability: u64,
|
||||
) -> Self {
|
||||
assert!(attempts_to_fail > 0);
|
||||
let inner = match inner {
|
||||
GenericRemoteStorage::AwsS3(s) => GenericRemoteStorage::AwsS3(s),
|
||||
@@ -51,9 +64,11 @@ impl UnreliableWrapper {
|
||||
panic!("Can't wrap unreliable wrapper unreliably")
|
||||
}
|
||||
};
|
||||
let actual_attempt_failure_probability = cmp::min(attempt_failure_probability, 100);
|
||||
UnreliableWrapper {
|
||||
inner,
|
||||
attempts_to_fail,
|
||||
attempt_failure_probability: actual_attempt_failure_probability,
|
||||
attempts: Mutex::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
@@ -66,6 +81,7 @@ impl UnreliableWrapper {
|
||||
///
|
||||
fn attempt(&self, op: RemoteOp) -> anyhow::Result<u64> {
|
||||
let mut attempts = self.attempts.lock().unwrap();
|
||||
let mut rng = rand::thread_rng();
|
||||
|
||||
match attempts.entry(op) {
|
||||
Entry::Occupied(mut e) => {
|
||||
@@ -75,15 +91,19 @@ impl UnreliableWrapper {
|
||||
*p
|
||||
};
|
||||
|
||||
if attempts_before_this >= self.attempts_to_fail {
|
||||
// let it succeed
|
||||
e.remove();
|
||||
Ok(attempts_before_this)
|
||||
} else {
|
||||
/* BEGIN_HADRON */
|
||||
// If there are more attempts to fail, fail the request by probability.
|
||||
if (attempts_before_this < self.attempts_to_fail)
|
||||
&& (rng.gen_range(0..=100) < self.attempt_failure_probability)
|
||||
{
|
||||
let error =
|
||||
anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
|
||||
Err(error)
|
||||
} else {
|
||||
e.remove();
|
||||
Ok(attempts_before_this)
|
||||
}
|
||||
/* END_HADRON */
|
||||
}
|
||||
Entry::Vacant(e) => {
|
||||
let error = anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
|
||||
|
||||
@@ -165,10 +165,42 @@ pub(crate) async fn upload_remote_data(
|
||||
|
||||
let (data, data_len) =
|
||||
upload_stream(format!("remote blob data {i}").into_bytes().into());
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
let mut metadata = None;
|
||||
if matches!(&*task_client, GenericRemoteStorage::AzureBlob(_)) {
|
||||
let file_path = "/tmp/dbx_upload_tmp_file.txt";
|
||||
{
|
||||
// Open the file in append mode
|
||||
let mut file = std::fs::OpenOptions::new()
|
||||
.append(true)
|
||||
.create(true) // Create the file if it doesn't exist
|
||||
.open(file_path)?;
|
||||
// Append some bytes to the file
|
||||
std::io::Write::write_all(
|
||||
&mut file,
|
||||
&format!("remote blob data {i}").into_bytes(),
|
||||
)?;
|
||||
file.sync_all()?;
|
||||
}
|
||||
metadata = Some(remote_storage::StorageMetadata::from([(
|
||||
"databricks_azure_put_block",
|
||||
file_path,
|
||||
)]));
|
||||
}
|
||||
/* END_HADRON */
|
||||
|
||||
task_client
|
||||
.upload(data, data_len, &blob_path, None, &cancel)
|
||||
.upload(data, data_len, &blob_path, metadata, &cancel)
|
||||
.await?;
|
||||
|
||||
// TODO: Check upload is using the put_block upload.
|
||||
// We cannot consume data here since data is moved inside the upload.
|
||||
// let total_bytes = data.fold(0, |acc, chunk| async move {
|
||||
// acc + chunk.map(|bytes| bytes.len()).unwrap_or(0)
|
||||
// }).await;
|
||||
// assert_eq!(total_bytes, data_len);
|
||||
|
||||
Ok::<_, anyhow::Error>((blob_prefix, blob_path))
|
||||
});
|
||||
}
|
||||
|
||||
@@ -219,6 +219,9 @@ async fn create_azure_client(
|
||||
concurrency_limit: NonZeroUsize::new(100).unwrap(),
|
||||
max_keys_per_list_response,
|
||||
conn_pool_size: 8,
|
||||
/* BEGIN_HADRON */
|
||||
put_block_size_mb: Some(1),
|
||||
/* END_HADRON */
|
||||
}),
|
||||
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
|
||||
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
|
||||
|
||||
@@ -9,7 +9,7 @@ anyhow.workspace = true
|
||||
const_format.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
postgres_ffi.workspace = true
|
||||
postgres_ffi_types.workspace = true
|
||||
postgres_versioninfo.workspace = true
|
||||
pq_proto.workspace = true
|
||||
tokio.workspace = true
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use postgres_ffi::TimestampTz;
|
||||
use postgres_ffi_types::TimestampTz;
|
||||
use postgres_versioninfo::PgVersionId;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::time::Instant;
|
||||
@@ -11,7 +11,7 @@ use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::pageserver_feedback::PageserverFeedback;
|
||||
|
||||
use crate::membership::Configuration;
|
||||
use crate::membership::{Configuration, SafekeeperGeneration};
|
||||
use crate::{ServerInfo, Term};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
@@ -221,7 +221,7 @@ pub struct TimelineMembershipSwitchRequest {
|
||||
pub struct TimelineMembershipSwitchResponse {
|
||||
pub previous_conf: Configuration,
|
||||
pub current_conf: Configuration,
|
||||
pub term: Term,
|
||||
pub last_log_term: Term,
|
||||
pub flush_lsn: Lsn,
|
||||
}
|
||||
|
||||
@@ -311,3 +311,12 @@ pub struct PullTimelineResponse {
|
||||
pub safekeeper_host: Option<String>,
|
||||
// TODO: add more fields?
|
||||
}
|
||||
|
||||
/// Response to a timeline locate request.
|
||||
/// Storcon-only API.
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct TimelineLocateResponse {
|
||||
pub generation: SafekeeperGeneration,
|
||||
pub sk_set: Vec<NodeId>,
|
||||
pub new_sk_set: Option<Vec<NodeId>>,
|
||||
}
|
||||
|
||||
@@ -47,6 +47,7 @@ tracing-subscriber = { workspace = true, features = ["json", "registry"] }
|
||||
tracing-utils.workspace = true
|
||||
rand.workspace = true
|
||||
scopeguard.workspace = true
|
||||
uuid.workspace = true
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
walkdir.workspace = true
|
||||
|
||||
@@ -12,7 +12,8 @@ use jsonwebtoken::{
|
||||
Algorithm, DecodingKey, EncodingKey, Header, TokenData, Validation, decode, encode,
|
||||
};
|
||||
use pem::Pem;
|
||||
use serde::{Deserialize, Serialize, de::DeserializeOwned};
|
||||
use serde::{Deserialize, Deserializer, Serialize, de::DeserializeOwned};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::id::TenantId;
|
||||
|
||||
@@ -25,6 +26,11 @@ pub enum Scope {
|
||||
/// Provides access to all data for a specific tenant (specified in `struct Claims` below)
|
||||
// TODO: join these two?
|
||||
Tenant,
|
||||
/// Provides access to all data for a specific tenant, but based on endpoint ID. This token scope
|
||||
/// is only used by compute to fetch the spec for a specific endpoint. The spec contains a Tenant-scoped
|
||||
/// token authorizing access to all data of a tenant, so the spec-fetch API requires a TenantEndpoint
|
||||
/// scope token to ensure that untrusted compute nodes can't fetch spec for arbitrary endpoints.
|
||||
TenantEndpoint,
|
||||
/// Provides blanket access to all tenants on the pageserver plus pageserver-wide APIs.
|
||||
/// Should only be used e.g. for status check/tenant creation/list.
|
||||
PageServerApi,
|
||||
@@ -51,17 +57,43 @@ pub enum Scope {
|
||||
ControllerPeer,
|
||||
}
|
||||
|
||||
fn deserialize_empty_string_as_none_uuid<'de, D>(deserializer: D) -> Result<Option<Uuid>, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
let opt = Option::<String>::deserialize(deserializer)?;
|
||||
match opt.as_deref() {
|
||||
Some("") => Ok(None),
|
||||
Some(s) => Uuid::parse_str(s)
|
||||
.map(Some)
|
||||
.map_err(serde::de::Error::custom),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
/// JWT payload. See docs/authentication.md for the format
|
||||
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
|
||||
pub struct Claims {
|
||||
#[serde(default)]
|
||||
pub tenant_id: Option<TenantId>,
|
||||
#[serde(
|
||||
default,
|
||||
skip_serializing_if = "Option::is_none",
|
||||
// Neon control plane includes this field as empty in the claims.
|
||||
// Consider it None in those cases.
|
||||
deserialize_with = "deserialize_empty_string_as_none_uuid"
|
||||
)]
|
||||
pub endpoint_id: Option<Uuid>,
|
||||
pub scope: Scope,
|
||||
}
|
||||
|
||||
impl Claims {
|
||||
pub fn new(tenant_id: Option<TenantId>, scope: Scope) -> Self {
|
||||
Self { tenant_id, scope }
|
||||
Self {
|
||||
tenant_id,
|
||||
scope,
|
||||
endpoint_id: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -212,6 +244,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
|
||||
let expected_claims = Claims {
|
||||
tenant_id: Some(TenantId::from_str("3d1f7595b468230304e0b73cecbcb081").unwrap()),
|
||||
scope: Scope::Tenant,
|
||||
endpoint_id: None,
|
||||
};
|
||||
|
||||
// A test token containing the following payload, signed using TEST_PRIV_KEY_ED25519:
|
||||
@@ -240,6 +273,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
|
||||
let claims = Claims {
|
||||
tenant_id: Some(TenantId::from_str("3d1f7595b468230304e0b73cecbcb081").unwrap()),
|
||||
scope: Scope::Tenant,
|
||||
endpoint_id: None,
|
||||
};
|
||||
|
||||
let pem = pem::parse(TEST_PRIV_KEY_ED25519).unwrap();
|
||||
|
||||
@@ -44,3 +44,63 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
pub enum DeploymentMode {
|
||||
Local,
|
||||
Dev,
|
||||
Staging,
|
||||
Prod,
|
||||
}
|
||||
|
||||
pub fn get_deployment_mode() -> Option<DeploymentMode> {
|
||||
match std::env::var("DEPLOYMENT_MODE") {
|
||||
Ok(env) => match env.as_str() {
|
||||
"development" => Some(DeploymentMode::Dev),
|
||||
"staging" => Some(DeploymentMode::Staging),
|
||||
"production" => Some(DeploymentMode::Prod),
|
||||
_ => {
|
||||
tracing::error!("Unexpected DEPLOYMENT_MODE: {}", env);
|
||||
None
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
// tracing::error!("DEPLOYMENT_MODE not set");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_dev_or_staging() -> bool {
|
||||
matches!(
|
||||
get_deployment_mode(),
|
||||
Some(DeploymentMode::Dev) | Some(DeploymentMode::Staging)
|
||||
)
|
||||
}
|
||||
|
||||
pub enum TestingMode {
|
||||
Chaos,
|
||||
Stress,
|
||||
}
|
||||
|
||||
pub fn get_test_mode() -> Option<TestingMode> {
|
||||
match std::env::var("HADRON_TEST_MODE") {
|
||||
Ok(env) => match env.as_str() {
|
||||
"chaos" => Some(TestingMode::Chaos),
|
||||
"stress" => Some(TestingMode::Stress),
|
||||
_ => {
|
||||
tracing::error!("Unexpected HADRON_TEST_MODE: {}", env);
|
||||
None
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
tracing::error!("HADRON_TEST_MODE not set");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_chaos_testing() -> bool {
|
||||
matches!(get_test_mode(), Some(TestingMode::Chaos))
|
||||
}
|
||||
/* END_HADRON */
|
||||
|
||||
73
libs/utils/src/ip_address.rs
Normal file
73
libs/utils/src/ip_address.rs
Normal file
@@ -0,0 +1,73 @@
|
||||
use std::env::{VarError, var};
|
||||
use std::error::Error;
|
||||
use std::net::IpAddr;
|
||||
use std::str::FromStr;
|
||||
|
||||
/// Name of the environment variable containing the reachable IP address of the node. If set, the IP address contained in this
|
||||
/// environment variable is used as the reachable IP address of the pageserver or safekeeper node during node registration.
|
||||
/// In a Kubernetes environment, this environment variable should be set by Kubernetes to the Pod IP (specified in the Pod
|
||||
/// template).
|
||||
pub const HADRON_NODE_IP_ADDRESS: &str = "HADRON_NODE_IP_ADDRESS";
|
||||
|
||||
/// Read the reachable IP address of this page server from env var HADRON_NODE_IP_ADDRESS.
|
||||
/// In Kubernetes this environment variable is set to the Pod IP (specified in the Pod template).
|
||||
pub fn read_node_ip_addr_from_env() -> Result<Option<IpAddr>, Box<dyn Error>> {
|
||||
match var(HADRON_NODE_IP_ADDRESS) {
|
||||
Ok(v) => {
|
||||
if let Ok(addr) = IpAddr::from_str(&v) {
|
||||
Ok(Some(addr))
|
||||
} else {
|
||||
Err(format!("Invalid IP address string: {v}. Cannot be parsed as either an IPv4 or an IPv6 address.").into())
|
||||
}
|
||||
}
|
||||
Err(VarError::NotPresent) => Ok(None),
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::env;
|
||||
use std::net::{Ipv4Addr, Ipv6Addr};
|
||||
|
||||
#[test]
|
||||
fn test_read_node_ip_addr_from_env() {
|
||||
// SAFETY: test code
|
||||
unsafe {
|
||||
// Test with a valid IPv4 address
|
||||
env::set_var(HADRON_NODE_IP_ADDRESS, "192.168.1.1");
|
||||
let result = read_node_ip_addr_from_env().unwrap();
|
||||
assert_eq!(result, Some(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1))));
|
||||
|
||||
// Test with a valid IPv6 address
|
||||
env::set_var(
|
||||
HADRON_NODE_IP_ADDRESS,
|
||||
"2001:0db8:85a3:0000:0000:8a2e:0370:7334",
|
||||
);
|
||||
}
|
||||
let result = read_node_ip_addr_from_env().unwrap();
|
||||
assert_eq!(
|
||||
result,
|
||||
Some(IpAddr::V6(
|
||||
Ipv6Addr::from_str("2001:0db8:85a3:0000:0000:8a2e:0370:7334").unwrap()
|
||||
))
|
||||
);
|
||||
|
||||
// Test with an invalid IP address
|
||||
// SAFETY: test code
|
||||
unsafe {
|
||||
env::set_var(HADRON_NODE_IP_ADDRESS, "invalid_ip");
|
||||
}
|
||||
let result = read_node_ip_addr_from_env();
|
||||
assert!(result.is_err());
|
||||
|
||||
// Test with no environment variable set
|
||||
// SAFETY: test code
|
||||
unsafe {
|
||||
env::remove_var(HADRON_NODE_IP_ADDRESS);
|
||||
}
|
||||
let result = read_node_ip_addr_from_env().unwrap();
|
||||
assert_eq!(result, None);
|
||||
}
|
||||
}
|
||||
@@ -26,6 +26,9 @@ pub mod auth;
|
||||
// utility functions and helper traits for unified unique id generation/serialization etc.
|
||||
pub mod id;
|
||||
|
||||
// utility functions to obtain reachable IP addresses in PS/SK nodes.
|
||||
pub mod ip_address;
|
||||
|
||||
pub mod shard;
|
||||
|
||||
mod hex;
|
||||
@@ -99,6 +102,8 @@ pub mod elapsed_accum;
|
||||
#[cfg(target_os = "linux")]
|
||||
pub mod linux_socket_ioctl;
|
||||
|
||||
pub mod metrics_collector;
|
||||
|
||||
// Re-export used in macro. Avoids adding git-version as dep in target crates.
|
||||
#[doc(hidden)]
|
||||
pub use git_version;
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -7,7 +8,7 @@ use metrics::{IntCounter, IntCounterVec};
|
||||
use once_cell::sync::Lazy;
|
||||
use strum_macros::{EnumString, VariantNames};
|
||||
use tokio::time::Instant;
|
||||
use tracing::info;
|
||||
use tracing::{info, warn};
|
||||
|
||||
/// Logs a critical error, similarly to `tracing::error!`. This will:
|
||||
///
|
||||
@@ -377,10 +378,11 @@ impl std::fmt::Debug for SecretString {
|
||||
///
|
||||
/// TODO: consider upgrading this to a warning, but currently it fires too often.
|
||||
#[inline]
|
||||
pub async fn log_slow<F, O>(name: &str, threshold: Duration, f: std::pin::Pin<&mut F>) -> O
|
||||
where
|
||||
F: Future<Output = O>,
|
||||
{
|
||||
pub async fn log_slow<O>(
|
||||
name: &str,
|
||||
threshold: Duration,
|
||||
f: Pin<&mut impl Future<Output = O>>,
|
||||
) -> O {
|
||||
monitor_slow_future(
|
||||
threshold,
|
||||
threshold, // period = threshold
|
||||
@@ -394,16 +396,42 @@ where
|
||||
if !is_slow {
|
||||
return;
|
||||
}
|
||||
let elapsed = elapsed_total.as_secs_f64();
|
||||
if ready {
|
||||
info!(
|
||||
"slow {name} completed after {:.3}s",
|
||||
elapsed_total.as_secs_f64()
|
||||
);
|
||||
info!("slow {name} completed after {elapsed:.3}s");
|
||||
} else {
|
||||
info!(
|
||||
"slow {name} still running after {:.3}s",
|
||||
elapsed_total.as_secs_f64()
|
||||
);
|
||||
info!("slow {name} still running after {elapsed:.3}s");
|
||||
}
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Logs a periodic warning if a future is slow to complete.
|
||||
#[inline]
|
||||
pub async fn warn_slow<O>(
|
||||
name: &str,
|
||||
threshold: Duration,
|
||||
f: Pin<&mut impl Future<Output = O>>,
|
||||
) -> O {
|
||||
monitor_slow_future(
|
||||
threshold,
|
||||
threshold, // period = threshold
|
||||
f,
|
||||
|MonitorSlowFutureCallback {
|
||||
ready,
|
||||
is_slow,
|
||||
elapsed_total,
|
||||
elapsed_since_last_callback: _,
|
||||
}| {
|
||||
if !is_slow {
|
||||
return;
|
||||
}
|
||||
let elapsed = elapsed_total.as_secs_f64();
|
||||
if ready {
|
||||
warn!("slow {name} completed after {elapsed:.3}s");
|
||||
} else {
|
||||
warn!("slow {name} still running after {elapsed:.3}s");
|
||||
}
|
||||
},
|
||||
)
|
||||
@@ -416,7 +444,7 @@ where
|
||||
pub async fn monitor_slow_future<F, O>(
|
||||
threshold: Duration,
|
||||
period: Duration,
|
||||
mut fut: std::pin::Pin<&mut F>,
|
||||
mut fut: Pin<&mut F>,
|
||||
mut cb: impl FnMut(MonitorSlowFutureCallback),
|
||||
) -> O
|
||||
where
|
||||
|
||||
75
libs/utils/src/metrics_collector.rs
Normal file
75
libs/utils/src/metrics_collector.rs
Normal file
@@ -0,0 +1,75 @@
|
||||
use std::{
|
||||
sync::{Arc, RwLock},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use metrics::{IntGauge, proto::MetricFamily, register_int_gauge};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
pub static METRICS_STALE_MILLIS: Lazy<IntGauge> = Lazy::new(|| {
|
||||
register_int_gauge!(
|
||||
"metrics_metrics_stale_milliseconds",
|
||||
"The current metrics stale time in milliseconds"
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct CollectedMetrics {
|
||||
pub metrics: Vec<MetricFamily>,
|
||||
pub collected_at: Instant,
|
||||
}
|
||||
|
||||
impl CollectedMetrics {
|
||||
fn new(metrics: Vec<MetricFamily>) -> Self {
|
||||
Self {
|
||||
metrics,
|
||||
collected_at: Instant::now(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct MetricsCollector {
|
||||
last_collected: RwLock<Arc<CollectedMetrics>>,
|
||||
}
|
||||
|
||||
impl MetricsCollector {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
last_collected: RwLock::new(Arc::new(CollectedMetrics::new(vec![]))),
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "metrics_collector", skip_all)]
|
||||
pub fn run_once(&self, cache_metrics: bool) -> Arc<CollectedMetrics> {
|
||||
let started = Instant::now();
|
||||
let metrics = metrics::gather();
|
||||
let collected = Arc::new(CollectedMetrics::new(metrics));
|
||||
if cache_metrics {
|
||||
let mut guard = self.last_collected.write().unwrap();
|
||||
*guard = collected.clone();
|
||||
}
|
||||
tracing::info!(
|
||||
"Collected {} metric families in {} ms",
|
||||
collected.metrics.len(),
|
||||
started.elapsed().as_millis()
|
||||
);
|
||||
collected
|
||||
}
|
||||
|
||||
pub fn last_collected(&self) -> Arc<CollectedMetrics> {
|
||||
self.last_collected.read().unwrap().clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for MetricsCollector {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
// Interval for metrics collection. Currently hard-coded to be the same as the metrics scape interval from the obs agent
|
||||
pub static METRICS_COLLECTION_INTERVAL: Duration = Duration::from_secs(30);
|
||||
|
||||
pub static METRICS_COLLECTOR: Lazy<MetricsCollector> = Lazy::new(MetricsCollector::default);
|
||||
@@ -49,16 +49,14 @@ pub struct TenantShardId {
|
||||
pub shard_count: ShardCount,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ShardCount {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
self.0.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl ShardCount {
|
||||
pub const MAX: Self = Self(u8::MAX);
|
||||
pub const MIN: Self = Self(0);
|
||||
|
||||
pub fn unsharded() -> Self {
|
||||
ShardCount(0)
|
||||
}
|
||||
|
||||
/// The internal value of a ShardCount may be zero, which means "1 shard, but use
|
||||
/// legacy format for TenantShardId that excludes the shard suffix", also known
|
||||
/// as [`TenantShardId::unsharded`].
|
||||
@@ -177,6 +175,12 @@ impl std::fmt::Display for ShardNumber {
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ShardCount {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
self.0.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ShardSlug<'_> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
|
||||
@@ -2,7 +2,8 @@
|
||||
|
||||
use bytes::Bytes;
|
||||
use postgres_ffi::walrecord::{MultiXactMember, describe_postgres_wal_record};
|
||||
use postgres_ffi::{MultiXactId, MultiXactOffset, TimestampTz, TransactionId};
|
||||
use postgres_ffi::{MultiXactId, MultiXactOffset, TransactionId};
|
||||
use postgres_ffi_types::TimestampTz;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::bin_ser::DeserializeError;
|
||||
|
||||
|
||||
@@ -428,6 +428,12 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
|
||||
shard_number: 0,
|
||||
};
|
||||
|
||||
let empty_wal_rate_limiter = crate::bindings::WalRateLimiter {
|
||||
should_limit: crate::bindings::pg_atomic_uint32 { value: 0 },
|
||||
sent_bytes: 0,
|
||||
last_recorded_time_us: crate::bindings::pg_atomic_uint64 { value: 0 },
|
||||
};
|
||||
|
||||
crate::bindings::WalproposerShmemState {
|
||||
propEpochStartLsn: crate::bindings::pg_atomic_uint64 { value: 0 },
|
||||
donor_name: [0; 64],
|
||||
@@ -441,6 +447,7 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
|
||||
num_shards: 0,
|
||||
replica_promote: false,
|
||||
min_ps_feedback: empty_feedback,
|
||||
wal_rate_limiter: empty_wal_rate_limiter,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user