Compare commits

..

1 Commits

Author SHA1 Message Date
Bojan Serafimov
0abfc72f1c Revert postgres prewarming 2023-07-31 14:15:26 -04:00
10 changed files with 69 additions and 323 deletions

View File

@@ -198,7 +198,7 @@ fn main() -> Result<()> {
// this compute node while it's busy prewarming. It's not too
// bad because it's just 100ms and unlikely, but it's an
// avoidable problem.
compute.prewarm_postgres()?;
// compute.prewarm_postgres()?;
let mut state = compute.state.lock().unwrap();
while state.status != ComputeStatus::ConfigurationPending {

View File

@@ -5,7 +5,7 @@ use chrono::{DateTime, Utc};
use rand::Rng;
use serde::Serialize;
#[derive(Serialize, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[derive(Serialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
#[serde(tag = "type")]
pub enum EventType {
#[serde(rename = "absolute")]
@@ -17,32 +17,6 @@ pub enum EventType {
},
}
impl EventType {
pub fn absolute_time(&self) -> Option<&DateTime<Utc>> {
use EventType::*;
match self {
Absolute { time } => Some(time),
_ => None,
}
}
pub fn incremental_timerange(&self) -> Option<std::ops::Range<&DateTime<Utc>>> {
// these can most likely be thought of as Range or RangeFull
use EventType::*;
match self {
Incremental {
start_time,
stop_time,
} => Some(start_time..stop_time),
_ => None,
}
}
pub fn is_incremental(&self) -> bool {
matches!(self, EventType::Incremental { .. })
}
}
#[derive(Serialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct Event<Extra> {
#[serde(flatten)]

View File

@@ -7,7 +7,7 @@ use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::{mgr, LogicalSizeCalculationCause};
use anyhow;
use chrono::{DateTime, Utc};
use chrono::Utc;
use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
use pageserver_api::models::TenantState;
use reqwest::Url;
@@ -18,6 +18,12 @@ use std::time::Duration;
use tracing::*;
use utils::id::{NodeId, TenantId, TimelineId};
const WRITTEN_SIZE: &str = "written_size";
const SYNTHETIC_STORAGE_SIZE: &str = "synthetic_storage_size";
const RESIDENT_SIZE: &str = "resident_size";
const REMOTE_STORAGE_SIZE: &str = "remote_storage_size";
const TIMELINE_LOGICAL_SIZE: &str = "timeline_logical_size";
const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
#[serde_as]
@@ -38,121 +44,6 @@ pub struct PageserverConsumptionMetricsKey {
pub metric: &'static str,
}
impl PageserverConsumptionMetricsKey {
const fn absolute_values(self) -> AbsoluteValueFactory {
AbsoluteValueFactory(self)
}
const fn incremental_values(self) -> IncrementalValueFactory {
IncrementalValueFactory(self)
}
}
/// Helper type which each individual metric kind can return to produce only absolute values.
struct AbsoluteValueFactory(PageserverConsumptionMetricsKey);
impl AbsoluteValueFactory {
fn now(self, val: u64) -> (PageserverConsumptionMetricsKey, (EventType, u64)) {
let key = self.0;
let time = Utc::now();
(key, (EventType::Absolute { time }, val))
}
}
/// Helper type which each individual metric kind can return to produce only incremental values.
struct IncrementalValueFactory(PageserverConsumptionMetricsKey);
impl IncrementalValueFactory {
#[allow(clippy::wrong_self_convention)]
fn from_previous_up_to(
self,
prev_end: DateTime<Utc>,
up_to: DateTime<Utc>,
val: u64,
) -> (PageserverConsumptionMetricsKey, (EventType, u64)) {
let key = self.0;
// cannot assert prev_end < up_to because these are realtime clock based
(
key,
(
EventType::Incremental {
start_time: prev_end,
stop_time: up_to,
},
val,
),
)
}
fn key(&self) -> &PageserverConsumptionMetricsKey {
&self.0
}
}
// the static part of a PageserverConsumptionMetricsKey
impl PageserverConsumptionMetricsKey {
const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
metric: "written_size",
}
.absolute_values()
}
/// Values will be the difference of the latest written_size (last_record_lsn) to what we
/// previously sent.
const fn written_size_delta(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> IncrementalValueFactory {
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
metric: "written_size_bytes_delta",
}
.incremental_values()
}
const fn timeline_logical_size(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> AbsoluteValueFactory {
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
metric: "timeline_logical_size",
}
.absolute_values()
}
const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: None,
metric: "remote_storage_size",
}
.absolute_values()
}
const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: None,
metric: "resident_size",
}
.absolute_values()
}
const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: None,
metric: "synthetic_storage_size",
}
.absolute_values()
}
}
/// Main thread that serves metrics collection
pub async fn collect_metrics(
metric_collection_endpoint: &Url,
@@ -188,7 +79,7 @@ pub async fn collect_metrics(
.timeout(DEFAULT_HTTP_REPORTING_TIMEOUT)
.build()
.expect("Failed to create http client with timeout");
let mut cached_metrics = HashMap::new();
let mut cached_metrics: HashMap<PageserverConsumptionMetricsKey, u64> = HashMap::new();
let mut prev_iteration_time: std::time::Instant = std::time::Instant::now();
loop {
@@ -230,13 +121,13 @@ pub async fn collect_metrics(
/// - refactor this function (chunking+sending part) to reuse it in proxy module;
pub async fn collect_metrics_iteration(
client: &reqwest::Client,
cached_metrics: &mut HashMap<PageserverConsumptionMetricsKey, (EventType, u64)>,
cached_metrics: &mut HashMap<PageserverConsumptionMetricsKey, u64>,
metric_collection_endpoint: &reqwest::Url,
node_id: NodeId,
ctx: &RequestContext,
send_cached: bool,
) {
let mut current_metrics: Vec<(PageserverConsumptionMetricsKey, (EventType, u64))> = Vec::new();
let mut current_metrics: Vec<(PageserverConsumptionMetricsKey, u64)> = Vec::new();
trace!(
"starting collect_metrics_iteration. metric_collection_endpoint: {}",
metric_collection_endpoint
@@ -275,80 +166,27 @@ pub async fn collect_metrics_iteration(
if timeline.is_active() {
let timeline_written_size = u64::from(timeline.get_last_record_lsn());
let (key, written_size_now) =
PageserverConsumptionMetricsKey::written_size(tenant_id, timeline.timeline_id)
.now(timeline_written_size);
// last_record_lsn can only go up, right now at least, TODO: #2592 or related
// features might change this.
let written_size_delta_key = PageserverConsumptionMetricsKey::written_size_delta(
tenant_id,
timeline.timeline_id,
);
// use this when available, because in a stream of incremental values, it will be
// accurate where as when last_record_lsn stops moving, we will only cache the last
// one of those.
let last_stop_time =
cached_metrics
.get(written_size_delta_key.key())
.map(|(until, _val)| {
until
.incremental_timerange()
.expect("never create EventType::Absolute for written_size_delta")
.end
});
// by default, use the last sent written_size as the basis for
// calculating the delta. if we don't yet have one, use the load time value.
let prev = cached_metrics
.get(&key)
.map(|(prev_at, prev)| {
// use the prev time from our last incremental update, or default to latest
// absolute update on the first round.
let prev_at = prev_at
.absolute_time()
.expect("never create EventType::Incremental for written_size");
let prev_at = last_stop_time.unwrap_or(prev_at);
(*prev_at, *prev)
})
.unwrap_or_else(|| {
// if we don't have a previous point of comparison, compare to the load time
// lsn.
let (disk_consistent_lsn, loaded_at) = &timeline.loaded_at;
(DateTime::from(*loaded_at), disk_consistent_lsn.0)
});
// written_size_delta_bytes
current_metrics.extend(
if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
let up_to = written_size_now
.0
.absolute_time()
.expect("never create EventType::Incremental for written_size");
let key_value =
written_size_delta_key.from_previous_up_to(prev.0, *up_to, delta);
Some(key_value)
} else {
None
current_metrics.push((
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: Some(timeline.timeline_id),
metric: WRITTEN_SIZE,
},
);
// written_size
current_metrics.push((key, written_size_now));
timeline_written_size,
));
let span = info_span!("collect_metrics_iteration", tenant_id = %timeline.tenant_id, timeline_id = %timeline.timeline_id);
match span.in_scope(|| timeline.get_current_logical_size(ctx)) {
// Only send timeline logical size when it is fully calculated.
Ok((size, is_exact)) if is_exact => {
current_metrics.push(
PageserverConsumptionMetricsKey::timeline_logical_size(
current_metrics.push((
PageserverConsumptionMetricsKey {
tenant_id,
timeline.timeline_id,
)
.now(size),
);
timeline_id: Some(timeline.timeline_id),
metric: TIMELINE_LOGICAL_SIZE,
},
size,
));
}
Ok((_, _)) => {}
Err(err) => {
@@ -367,10 +205,14 @@ pub async fn collect_metrics_iteration(
match tenant.get_remote_size().await {
Ok(tenant_remote_size) => {
current_metrics.push(
PageserverConsumptionMetricsKey::remote_storage_size(tenant_id)
.now(tenant_remote_size),
);
current_metrics.push((
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: None,
metric: REMOTE_STORAGE_SIZE,
},
tenant_remote_size,
));
}
Err(err) => {
error!(
@@ -380,9 +222,14 @@ pub async fn collect_metrics_iteration(
}
}
current_metrics.push(
PageserverConsumptionMetricsKey::resident_size(tenant_id).now(tenant_resident_size),
);
current_metrics.push((
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: None,
metric: RESIDENT_SIZE,
},
tenant_resident_size,
));
// Note that this metric is calculated in a separate bgworker
// Here we only use cached value, which may lag behind the real latest one
@@ -390,27 +237,23 @@ pub async fn collect_metrics_iteration(
if tenant_synthetic_size != 0 {
// only send non-zeroes because otherwise these show up as errors in logs
current_metrics.push(
PageserverConsumptionMetricsKey::synthetic_size(tenant_id)
.now(tenant_synthetic_size),
);
current_metrics.push((
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: None,
metric: SYNTHETIC_STORAGE_SIZE,
},
tenant_synthetic_size,
));
}
}
// Filter metrics, unless we want to send all metrics, including cached ones.
// See: https://github.com/neondatabase/neon/issues/3485
if !send_cached {
current_metrics.retain(|(curr_key, (kind, curr_val))| {
if kind.is_incremental() {
// incremental values (currently only written_size_delta) should not get any cache
// deduplication because they will be used by upstream for "is still alive."
true
} else {
match cached_metrics.get(curr_key) {
Some((_, val)) => val != curr_val,
None => true,
}
}
current_metrics.retain(|(curr_key, curr_val)| match cached_metrics.get(curr_key) {
Some(val) => val != curr_val,
None => true,
});
}
@@ -429,8 +272,8 @@ pub async fn collect_metrics_iteration(
chunk_to_send.clear();
// enrich metrics with type,timestamp and idempotency key before sending
chunk_to_send.extend(chunk.iter().map(|(curr_key, (when, curr_val))| Event {
kind: *when,
chunk_to_send.extend(chunk.iter().map(|(curr_key, curr_val)| Event {
kind: EventType::Absolute { time: Utc::now() },
metric: curr_key.metric,
idempotency_key: idempotency_key(node_id.to_string()),
value: *curr_val,

View File

@@ -294,10 +294,6 @@ pub struct Timeline {
/// Completion shared between all timelines loaded during startup; used to delay heavier
/// background tasks until some logical sizes have been calculated.
initial_logical_size_attempt: Mutex<Option<completion::Completion>>,
/// Load or creation time information about the disk_consistent_lsn and when the loading
/// happened. Used for consumption metrics.
pub(crate) loaded_at: (Lsn, SystemTime),
}
pub struct WalReceiverInfo {
@@ -1408,8 +1404,6 @@ impl Timeline {
last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
last_freeze_ts: RwLock::new(Instant::now()),
loaded_at: (disk_consistent_lsn, SystemTime::now()),
ancestor_timeline: ancestor,
ancestor_lsn: metadata.ancestor_lsn(),

View File

@@ -1,9 +1,7 @@
use std::sync::Arc;
use anyhow::bail;
use futures::pin_mut;
use futures::StreamExt;
use hashbrown::HashMap;
use hyper::body::HttpBody;
use hyper::http::HeaderName;
use hyper::http::HeaderValue;
@@ -14,7 +12,6 @@ use serde_json::Value;
use tokio_postgres::types::Kind;
use tokio_postgres::types::Type;
use tokio_postgres::GenericClient;
use tokio_postgres::IsolationLevel;
use tokio_postgres::Row;
use url::Url;
@@ -40,8 +37,6 @@ const MAX_REQUEST_SIZE: u64 = 1024 * 1024; // 1 MB
static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode");
static ALLOW_POOL: HeaderName = HeaderName::from_static("neon-pool-opt-in");
static TXN_ISOLATION_LEVEL: HeaderName = HeaderName::from_static("neon-batch-isolation-level");
static TXN_READ_ONLY: HeaderName = HeaderName::from_static("neon-batch-read-only");
static HEADER_VALUE_TRUE: HeaderValue = HeaderValue::from_static("true");
@@ -175,7 +170,7 @@ pub async fn handle(
request: Request<Body>,
sni_hostname: Option<String>,
conn_pool: Arc<GlobalConnPool>,
) -> anyhow::Result<(Value, HashMap<HeaderName, HeaderValue>)> {
) -> anyhow::Result<Value> {
//
// Determine the destination and connection params
//
@@ -190,23 +185,6 @@ pub async fn handle(
// Allow connection pooling only if explicitly requested
let allow_pool = headers.get(&ALLOW_POOL) == Some(&HEADER_VALUE_TRUE);
// isolation level and read only
let txn_isolation_level_raw = headers.get(&TXN_ISOLATION_LEVEL).cloned();
let txn_isolation_level = match txn_isolation_level_raw {
Some(ref x) => Some(match x.as_bytes() {
b"Serializable" => IsolationLevel::Serializable,
b"ReadUncommitted" => IsolationLevel::ReadUncommitted,
b"ReadCommitted" => IsolationLevel::ReadCommitted,
b"RepeatableRead" => IsolationLevel::RepeatableRead,
_ => bail!("invalid isolation level"),
}),
None => None,
};
let txn_read_only_raw = headers.get(&TXN_READ_ONLY).cloned();
let txn_read_only = txn_read_only_raw.as_ref() == Some(&HEADER_VALUE_TRUE);
let request_content_length = match request.body().size_hint().upper() {
Some(v) => v,
None => MAX_REQUEST_SIZE + 1,
@@ -230,19 +208,10 @@ pub async fn handle(
// Now execute the query and return the result
//
let result = match payload {
Payload::Single(query) => query_to_json(&client, query, raw_output, array_mode)
.await
.map(|x| (x, HashMap::default())),
Payload::Single(query) => query_to_json(&client, query, raw_output, array_mode).await,
Payload::Batch(queries) => {
let mut results = Vec::new();
let mut builder = client.build_transaction();
if let Some(isolation_level) = txn_isolation_level {
builder = builder.isolation_level(isolation_level);
}
if txn_read_only {
builder = builder.read_only(true);
}
let transaction = builder.start().await?;
let transaction = client.transaction().await?;
for query in queries {
let result = query_to_json(&transaction, query, raw_output, array_mode).await;
match result {
@@ -254,15 +223,7 @@ pub async fn handle(
}
}
transaction.commit().await?;
let mut headers = HashMap::default();
headers.insert(
TXN_READ_ONLY.clone(),
HeaderValue::try_from(txn_read_only.to_string())?,
);
if let Some(txn_isolation_level_raw) = txn_isolation_level_raw {
headers.insert(TXN_ISOLATION_LEVEL.clone(), txn_isolation_level_raw);
}
Ok((json!({ "results": results }), headers))
Ok(json!({ "results": results }))
}
};

View File

@@ -6,7 +6,6 @@ use crate::{
};
use bytes::{Buf, Bytes};
use futures::{Sink, Stream, StreamExt};
use hashbrown::HashMap;
use hyper::{
server::{
accept,
@@ -206,7 +205,7 @@ async fn ws_handler(
Ok(_) => StatusCode::OK,
Err(_) => StatusCode::BAD_REQUEST,
};
let (json, headers) = match result {
let json = match result {
Ok(r) => r,
Err(e) => {
let message = format!("{:?}", e);
@@ -217,10 +216,7 @@ async fn ws_handler(
},
None => Value::Null,
};
(
json!({ "message": message, "code": code }),
HashMap::default(),
)
json!({ "message": message, "code": code })
}
};
json_response(status_code, json).map(|mut r| {
@@ -228,9 +224,6 @@ async fn ws_handler(
"Access-Control-Allow-Origin",
hyper::http::HeaderValue::from_static("*"),
);
for (k, v) in headers {
r.headers_mut().insert(k, v);
}
r
})
} else if request.uri().path() == "/sql" && request.method() == Method::OPTIONS {

View File

@@ -265,23 +265,18 @@ def test_sql_over_http_output_options(static_proxy: NeonProxy):
def test_sql_over_http_batch(static_proxy: NeonProxy):
static_proxy.safe_psql("create role http with login password 'http' superuser")
def qq(queries: List[Tuple[str, Optional[List[Any]]]], read_only: bool = False) -> Any:
def qq(queries: List[Tuple[str, Optional[List[Any]]]]) -> Any:
connstr = f"postgresql://http:http@{static_proxy.domain}:{static_proxy.proxy_port}/postgres"
response = requests.post(
f"https://{static_proxy.domain}:{static_proxy.external_http_port}/sql",
data=json.dumps(list(map(lambda x: {"query": x[0], "params": x[1] or []}, queries))),
headers={
"Content-Type": "application/sql",
"Neon-Connection-String": connstr,
"Neon-Batch-Isolation-Level": "Serializable",
"Neon-Batch-Read-Only": "true" if read_only else "false",
},
headers={"Content-Type": "application/sql", "Neon-Connection-String": connstr},
verify=str(static_proxy.test_output_dir / "proxy.crt"),
)
assert response.status_code == 200
return response.json()["results"], response.headers
return response.json()["results"]
result, headers = qq(
result = qq(
[
("select 42 as answer", None),
("select $1 as answer", [42]),
@@ -296,9 +291,6 @@ def test_sql_over_http_batch(static_proxy: NeonProxy):
]
)
assert headers["Neon-Batch-Isolation-Level"] == "Serializable"
assert headers["Neon-Batch-Read-Only"] == "false"
assert result[0]["rows"] == [{"answer": 42}]
assert result[1]["rows"] == [{"answer": "42"}]
assert result[2]["rows"] == [{"answer": 42}]
@@ -319,14 +311,3 @@ def test_sql_over_http_batch(static_proxy: NeonProxy):
assert res["command"] == "DROP"
assert res["rowCount"] is None
assert len(result) == 10
result, headers = qq(
[
("select 42 as answer", None),
],
True,
)
assert headers["Neon-Batch-Isolation-Level"] == "Serializable"
assert headers["Neon-Batch-Read-Only"] == "true"
assert result[0]["rows"] == [{"answer": 42}]

View File

@@ -1,4 +1,4 @@
{
"postgres-v15": "770c6dffc5ef6aac05bf049693877fb377eea6fc",
"postgres-v14": "da3885c34db312afd555802be2ce985fafd1d8ad"
"postgres-v15": "1220c8a63f00101829f9222a5821fc084b4384c7",
"postgres-v14": "ebedb34d01c8ac9c31e8ea4628b9854103a1dc8f"
}