Compare commits

...

7 Commits

Author SHA1 Message Date
Christian Schwarz
9655709739 fix: compaction: no index upload scheduled if no on-demand downloads
Commit 0cf7fd0fb8 (Compaction with on-demand download, #3598)
introduced a subtle bug: if we don't have to do on-demand downloads,
we only take one round in fn compact() and exit early.
Thereby, we miss scheduling the index part upload for any layers
created by fn compact_inner().

Before that commit, we didn't have this problem.
So, this patch fixes it.

Since no regression test caught this, I went ahead and extended the
timeline size tests to assert that, if remote storage is configured,
1. pageserver_remote_physical_size matches the other physical sizes
2. file sizes reported by the layer map info endpoint match the other
   physical size metrics

Without the pageserver code fix, the regression test fails at the
physical size assertion, complaining that the resident physical size
does not match the remote physical size metric:
    50790400.0 != 18399232.0
I figured out the problem by comparing the remote storage and local
directories and noticing that the image layer in the local directory
wasn't present on the remote side.
Its size was exactly the difference:
    50790400.0 - 18399232.0 = 32391168.0

fixes https://github.com/neondatabase/neon/issues/3738
2023-03-03 15:12:31 +01:00
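
The comparison described in the commit message above can be sketched roughly as follows. This is a hypothetical reconstruction, not the exact commands used: the paths and the LOCAL_FS remote storage layout (mirroring the timelines/<tenant>/<timeline> directory) are assumptions, and the layer-file glob pattern is borrowed from the tests below.

from pathlib import Path

# Example paths, assuming LOCAL_FS remote storage that mirrors the local layout.
local_dir = Path("repo/tenants/<tenant_id>/timelines/<timeline_id>")
remote_dir = Path("repo/local_fs_remote_storage/tenants/<tenant_id>/timelines/<timeline_id>")

local_layers = {p.name: p.stat().st_size for p in local_dir.glob("*-*_*")}
remote_layers = {p.name for p in remote_dir.iterdir() if p.is_file()}

missing_on_remote = {
    name: size for name, size in local_layers.items() if name not in remote_layers
}
print(missing_on_remote)
# With the bug, the image layer produced by compaction shows up here, and its
# size accounts for the whole metric gap: 50790400 - 18399232 = 32391168.
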
Christian Schwarz
bd87245abc tests: use parse_metrics everywhere (#3737)
- use parse_metrics() in all places where we parse Prometheus metrics
- query_all: make `filter` argument optional
- encourage using properly parsed, typed metrics by changing get_metrics()
  to return already-parsed metrics. The new get_metrics_str() method,
  like the one on the Safekeeper type, returns the raw text response.
2023-03-03 14:55:09 +01:00
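
Roughly, the refactored fixture API from this commit is used like this (a sketch based on the diffs below; client, tenant_id and timeline_id stand in for the usual test fixtures):

metrics = client.get_metrics()  # parsed Metrics object instead of raw text
last_record_lsn = metrics.query_one(
    "pageserver_last_record_lsn",
    filter={"tenant_id": str(tenant_id), "timeline_id": str(timeline_id)},
).value

# query_all's filter argument is now optional; omitting it returns every sample.
remote_sizes = metrics.query_all("pageserver_remote_physical_size")

# The raw Prometheus text exposition remains available where tests still need
# it, e.g. test_build_info_metric:
raw_text = client.get_metrics_str()
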
Christian Schwarz
fd73e138b4 timeline_checkpoint_handler: add span with tenant and timeline id
Before this patch, the logs written by freeze_and_flush() and compact()
didn't have any span, which made the test logs annoying to read.
2023-03-03 14:33:47 +01:00
Christian Schwarz
9217e7cce7 gc: only decrement resident size if GC'd layer is resident
Before this patch, GC would call PersistentLayer::delete()
on every GC'd layer.
RemoteLayer::delete() returned Ok(()) unconditionally, so GC would then
proceed to decrement the resident size metric, even though the layer was
a RemoteLayer with no local layer file.

This patch makes the following changes:
- Rename PersistentLayer::delete() to delete_resident_layer_file().
  That name is unambiguous.
- Make RemoteLayer::delete_resident_layer_file return an Err().
  We would have uncovered this bug if we had done that from the start.
- Change GC / Timeline::delete_historic_layer to check whether
  the layer is remote, and only call delete_resident_layer_file()
  if it is not. This brings us in line with how eviction does it.
- Add a regression test.

fixes https://github.com/neondatabase/neon/issues/3722
2023-03-03 14:33:31 +01:00
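
A condensed sketch of the invariant the new regression test checks (see test_gc_of_remote_layers in the diffs below); ps_http, tenant_id and timeline_id stand in for the test fixtures:

ps_http.evict_all_layers(tenant_id, timeline_id)  # resident layers become RemoteLayers
resident = ps_http.get_timeline_metric(
    tenant_id, timeline_id, "pageserver_resident_physical_size"
)
assert resident == 0  # eviction already decremented the gauge

ps_http.timeline_gc(tenant_id, timeline_id, 0)  # GC now removes the remote layers

# Before the fix, GC decremented the gauge again for layers that had no local
# file; with the fix, only resident layers are counted, so the gauge stays 0.
assert (
    ps_http.get_timeline_metric(
        tenant_id, timeline_id, "pageserver_resident_physical_size"
    )
    == 0
)
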
Christian Schwarz
d646ddcd07 eviction: add comment explaining resident size decrement on error
https://github.com/neondatabase/neon/issues/3722
2023-03-03 14:33:25 +01:00
Christian Schwarz
c47af7ea9a eviction: remove needless if-let around resident size decrement
The branch was always taken at runtime, so this should not
constitute a behavioral change.

refs https://github.com/neondatabase/neon/issues/3722
2023-03-03 14:33:20 +01:00
Christian Schwarz
1167aee661 fix checkpoint_timeout serialization in TenantConf
Without this change, actually setting this config option would leave the
tenant Broken the next time we load it.
Why?
The serde/toml representation that persist_tenant_conf would write out
would be a TOML inline table of `secs` and `nsecs`,
but our hand-rolled TenantConf parser expects a TOML string.

I checked that all other `Duration` values in TenantConfOpt
use the humantime serialization.

Issues like this would likely be systematically prevented by
https://github.com/neondatabase/neon/issues/3682
2023-03-03 14:33:13 +01:00
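
The user-visible effect can be sketched with the test fixtures like this (hypothetical; the create_tenant/conf usage is taken from the tests below, and the pageserver restart helpers are assumed):

# Duration options in TenantConfOpt are humantime strings, e.g. "10m" or "24h".
tenant_id, timeline_id = env.neon_cli.create_tenant(conf={"checkpoint_timeout": "24h"})

# Before this fix, persist_tenant_conf wrote the value back out as an inline
# TOML table of secs/nsecs, which the hand-rolled TenantConf parser could not
# read, so the tenant came back Broken after a pageserver restart.
env.pageserver.stop()
env.pageserver.start()
# With humantime (de)serialization the value round-trips and the tenant loads
# normally.
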
19 changed files with 435 additions and 120 deletions

View File

@@ -971,19 +971,22 @@ async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
timeline
.freeze_and_flush()
.await
.map_err(ApiError::InternalServerError)?;
timeline
.compact(&ctx)
.await
.map_err(ApiError::InternalServerError)?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
timeline
.freeze_and_flush()
.await
.map_err(ApiError::InternalServerError)?;
timeline
.compact(&ctx)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
json_response(StatusCode::OK, ())
}
.instrument(info_span!("manual_checkpoint", tenant_id = %tenant_id, timeline_id = %timeline_id))
.await
}
async fn timeline_download_remote_layers_handler_post(

View File

@@ -103,6 +103,7 @@ pub struct TenantConfOpt {
pub checkpoint_distance: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "humantime_serde")]
#[serde(default)]
pub checkpoint_timeout: Option<Duration>,

View File

@@ -364,7 +364,7 @@ pub trait PersistentLayer: Layer {
}
/// Permanently remove this layer from disk.
fn delete(&self) -> Result<()>;
fn delete_resident_layer_file(&self) -> Result<()>;
fn downcast_remote_layer(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
None

View File

@@ -438,7 +438,7 @@ impl PersistentLayer for DeltaLayer {
))
}
fn delete(&self) -> Result<()> {
fn delete_resident_layer_file(&self) -> Result<()> {
// delete underlying file
fs::remove_file(self.path())?;
Ok(())

View File

@@ -252,7 +252,7 @@ impl PersistentLayer for ImageLayer {
unimplemented!();
}
fn delete(&self) -> Result<()> {
fn delete_resident_layer_file(&self) -> Result<()> {
// delete underlying file
fs::remove_file(self.path())?;
Ok(())

View File

@@ -155,8 +155,8 @@ impl PersistentLayer for RemoteLayer {
bail!("cannot iterate a remote layer");
}
fn delete(&self) -> Result<()> {
Ok(())
fn delete_resident_layer_file(&self) -> Result<()> {
bail!("remote layer has no layer file");
}
fn downcast_remote_layer<'a>(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {

View File

@@ -662,8 +662,8 @@ impl Timeline {
// update the index file on next flush iteration too. But it
// could take a while until that happens.
//
// Additionally, only do this on the terminal round before sleeping.
if last_round {
// Additionally, only do this once before we return from this function.
if last_round || res.is_ok() {
if let Some(remote_client) = &self.remote_client {
remote_client.schedule_index_upload_for_file_changes()?;
}
@@ -1047,11 +1047,12 @@ impl Timeline {
return Ok(false);
}
let layer_metadata = LayerFileMetadata::new(
local_layer
.file_size()
.expect("Local layer should have a file size"),
);
let layer_file_size = local_layer
.file_size()
.expect("Local layer should have a file size");
let layer_metadata = LayerFileMetadata::new(layer_file_size);
let new_remote_layer = Arc::new(match local_layer.filename() {
LayerFileName::Image(image_name) => RemoteLayer::new_img(
self.tenant_id,
@@ -1075,15 +1076,22 @@ impl Timeline {
let replaced = match batch_updates.replace_historic(local_layer, new_remote_layer)? {
Replacement::Replaced { .. } => {
let layer_size = local_layer.file_size();
if let Err(e) = local_layer.delete() {
if let Err(e) = local_layer.delete_resident_layer_file() {
error!("failed to remove layer file on evict after replacement: {e:#?}");
}
if let Some(layer_size) = layer_size {
self.metrics.resident_physical_size_gauge.sub(layer_size);
}
// Always decrement the physical size gauge, even if we failed to delete the file.
// Rationale: we already replaced the layer with a remote layer in the layer map,
// and any subsequent download_remote_layer will
// 1. overwrite the file on disk and
// 2. add the downloaded size to the resident size gauge.
//
// If there is no re-download, and we restart the pageserver, then load_layer_map
// will treat the file as a local layer again, count it towards resident size,
// and it'll be like the layer removal never happened.
// The bump in resident size is perhaps unexpected but overall a robust behavior.
self.metrics
.resident_physical_size_gauge
.sub(layer_file_size);
true
}
@@ -1942,11 +1950,14 @@ impl Timeline {
layer: Arc<dyn PersistentLayer>,
updates: &mut BatchedUpdates<'_, dyn PersistentLayer>,
) -> anyhow::Result<()> {
let layer_size = layer.file_size();
layer.delete()?;
if let Some(layer_size) = layer_size {
self.metrics.resident_physical_size_gauge.sub(layer_size);
if !layer.is_remote_layer() {
layer.delete_resident_layer_file()?;
let layer_file_size = layer
.file_size()
.expect("Local layer should have a file size");
self.metrics
.resident_physical_size_gauge
.sub(layer_file_size);
}
// TODO Removing from the bottom of the layer map is expensive.

View File

@@ -366,17 +366,9 @@ class NeonBenchmarker:
def get_int_counter_value(self, pageserver: NeonPageserver, metric_name: str) -> int:
"""Fetch the value of given int counter from pageserver metrics."""
# TODO: If we start to collect more of the prometheus metrics in the
# performance test suite like this, we should refactor this to load and
# parse all the metrics into a more convenient structure in one go.
#
# The metric should be an integer, as it's a number of bytes. But in general
# all prometheus metrics are floats. So to be pedantic, read it as a float
# and round to integer.
all_metrics = pageserver.http_client().get_metrics()
matches = re.search(rf"^{metric_name} (\S+)$", all_metrics, re.MULTILINE)
assert matches, f"metric {metric_name} not found"
return int(round(float(matches.group(1))))
sample = all_metrics.query_one(metric_name)
return int(round(sample.value))
def get_timeline_size(
self, repo_dir: Path, tenant_id: TenantId, timeline_id: TimelineId

View File

@@ -13,7 +13,8 @@ class Metrics:
self.metrics = defaultdict(list)
self.name = name
def query_all(self, name: str, filter: Dict[str, str]) -> List[Sample]:
def query_all(self, name: str, filter: Optional[Dict[str, str]] = None) -> List[Sample]:
filter = filter or {}
res = []
for sample in self.metrics[name]:
try:

View File

@@ -14,6 +14,7 @@ import tempfile
import textwrap
import time
import uuid
from collections import defaultdict
from contextlib import closing, contextmanager
from dataclasses import dataclass, field
from enum import Flag, auto
@@ -28,7 +29,6 @@ import asyncpg
import backoff # type: ignore
import boto3
import jwt
import prometheus_client
import psycopg2
import pytest
import requests
@@ -36,7 +36,7 @@ from _pytest.config import Config
from _pytest.config.argparsing import Parser
from _pytest.fixtures import FixtureRequest
from fixtures.log_helper import log
from fixtures.metrics import parse_metrics
from fixtures.metrics import Metrics, parse_metrics
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import (
ATTACHMENT_NAME_REGEX,
@@ -45,7 +45,6 @@ from fixtures.utils import (
get_self_dir,
subprocess_capture,
)
from prometheus_client.parser import text_string_to_metric_families
# Type-related stuff
from psycopg2.extensions import connection as PgConnection
@@ -1436,22 +1435,27 @@ class PageserverHttpClient(requests.Session):
assert completed["successful_download_count"] > 0
return completed
def get_metrics(self) -> str:
def get_metrics_str(self) -> str:
"""You probably want to use get_metrics() instead."""
res = self.get(f"http://localhost:{self.port}/metrics")
self.verbose_error(res)
return res.text
def get_timeline_metric(self, tenant_id: TenantId, timeline_id: TimelineId, metric_name: str):
raw = self.get_metrics()
family: List[prometheus_client.Metric] = list(text_string_to_metric_families(raw))
[metric] = [m for m in family if m.name == metric_name]
[sample] = [
s
for s in metric.samples
if s.labels["tenant_id"] == str(tenant_id)
and s.labels["timeline_id"] == str(timeline_id)
]
return sample.value
def get_metrics(self) -> Metrics:
res = self.get_metrics_str()
return parse_metrics(res)
def get_timeline_metric(
self, tenant_id: TenantId, timeline_id: TimelineId, metric_name: str
) -> float:
metrics = self.get_metrics()
return metrics.query_one(
metric_name,
filter={
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
},
).value
def get_remote_timeline_client_metric(
self,
@@ -1461,7 +1465,7 @@ class PageserverHttpClient(requests.Session):
file_kind: str,
op_kind: str,
) -> Optional[float]:
metrics = parse_metrics(self.get_metrics(), "pageserver")
metrics = self.get_metrics()
matches = metrics.query_all(
name=metric_name,
filter={
@@ -1480,14 +1484,16 @@ class PageserverHttpClient(requests.Session):
assert len(matches) < 2, "above filter should uniquely identify metric"
return value
def get_metric_value(self, name: str) -> Optional[str]:
def get_metric_value(
self, name: str, filter: Optional[Dict[str, str]] = None
) -> Optional[float]:
metrics = self.get_metrics()
relevant = [line for line in metrics.splitlines() if line.startswith(name)]
if len(relevant) == 0:
results = metrics.query_all(name, filter=filter)
if not results:
log.info(f'could not find metric "{name}"')
return None
assert len(relevant) == 1
return relevant[0].lstrip(name).strip()
assert len(results) == 1, f"metric {name} with given filters is not unique, got: {results}"
return results[0].value
def layer_map_info(
self,
@@ -1516,6 +1522,11 @@ class PageserverHttpClient(requests.Session):
assert res.status_code == 200
def evict_all_layers(self, tenant_id: TenantId, timeline_id: TimelineId):
info = self.layer_map_info(tenant_id, timeline_id)
for layer in info.historic_layers:
self.evict_layer(tenant_id, timeline_id, layer.layer_file_name)
@dataclass
class TenantConfig:
@@ -1551,6 +1562,14 @@ class LayerMapInfo:
return info
def kind_count(self) -> Dict[str, int]:
counts: Dict[str, int] = defaultdict(int)
for inmem_layer in self.in_memory_layers:
counts[inmem_layer.kind] += 1
for hist_layer in self.historic_layers:
counts[hist_layer.kind] += 1
return counts
@dataclass
class InMemoryLayerInfo:
@@ -1567,7 +1586,7 @@ class InMemoryLayerInfo:
)
@dataclass
@dataclass(frozen=True)
class HistoricLayerInfo:
kind: str
layer_file_name: str
@@ -3516,3 +3535,23 @@ def wait_for_sk_commit_lsn_to_reach_remote_storage(
ps_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(ps_http, tenant_id, timeline_id, lsn)
return lsn
def wait_for_upload_queue_empty(
pageserver: NeonPageserver, tenant_id: TenantId, timeline_id: TimelineId
):
ps_http = pageserver.http_client()
while True:
all_metrics = ps_http.get_metrics()
tl = all_metrics.query_all(
"pageserver_remote_timeline_client_calls_unfinished",
{
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
},
)
assert len(tl) > 0
log.info(f"upload queue for {tenant_id}/{timeline_id}: {tl}")
if all(m.value == 0 for m in tl):
return
time.sleep(0.2)

View File

@@ -8,7 +8,7 @@ def test_build_info_metric(neon_env_builder: NeonEnvBuilder, link_proxy: NeonPro
parsed_metrics = {}
parsed_metrics["pageserver"] = parse_metrics(env.pageserver.http_client().get_metrics())
parsed_metrics["pageserver"] = parse_metrics(env.pageserver.http_client().get_metrics_str())
parsed_metrics["safekeeper"] = parse_metrics(env.safekeepers[0].http_client().get_metrics_str())
parsed_metrics["proxy"] = parse_metrics(link_proxy.get_metrics())

View File

@@ -4,7 +4,6 @@ import random
import pytest
from fixtures.log_helper import log
from fixtures.metrics import parse_metrics
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
@@ -134,7 +133,7 @@ def test_gc_index_upload(neon_env_builder: NeonEnvBuilder, remote_storage_kind:
# Helper function that gets the number of given kind of remote ops from the metrics
def get_num_remote_ops(file_kind: str, op_kind: str) -> int:
ps_metrics = parse_metrics(env.pageserver.http_client().get_metrics(), "pageserver")
ps_metrics = env.pageserver.http_client().get_metrics()
total = 0.0
for sample in ps_metrics.query_all(
name="pageserver_remote_operation_seconds_count",

View File

@@ -1,8 +1,13 @@
import time
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
RemoteStorageKind,
wait_for_last_flush_lsn,
wait_for_last_record_lsn,
wait_for_sk_commit_lsn_to_reach_remote_storage,
wait_for_upload,
)
from fixtures.types import Lsn, TenantId, TimelineId
@@ -138,3 +143,160 @@ def test_basic_eviction(
assert (
redownloaded_layer_map_info == initial_layer_map_info
), "Should have the same layer map after redownloading the evicted layers"
def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.LOCAL_FS,
test_name="test_gc_of_remote_layers",
)
env = neon_env_builder.init_start()
tenant_config = {
"pitr_interval": "1s", # set to non-zero, so GC actually does something
"gc_period": "0s", # we want to control when GC runs
"compaction_period": "0s", # we want to control when compaction runs
"checkpoint_timeout": "24h", # something we won't reach
"checkpoint_distance": f"{50 * (1024**2)}", # something we won't reach, we checkpoint manually
"compaction_threshold": "3",
# "image_creation_threshold": set at runtime
"compaction_target_size": f"{128 * (1024**2)}", # make it so that we only have 1 partition => image coverage for delta layers => enables gc of delta layers
}
def tenant_update_config(changes):
tenant_config.update(changes)
env.neon_cli.config_tenant(tenant_id, tenant_config)
tenant_id, timeline_id = env.neon_cli.create_tenant(conf=tenant_config)
log.info("tenant id is %s", tenant_id)
env.initial_tenant = tenant_id # update_and_gc relies on this
ps_http = env.pageserver.http_client()
pg = env.postgres.create_start("main")
log.info("fill with data, creating delta & image layers, some of which are GC'able after")
# no particular reason to create the layers like this, but we are sure
# not to hit the image_creation_threshold here.
with pg.cursor() as cur:
cur.execute("create table a (id bigserial primary key, some_value bigint not null)")
cur.execute("insert into a(some_value) select i from generate_series(1, 10000) s(i)")
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
ps_http.timeline_checkpoint(tenant_id, timeline_id)
# Create delta layers, then turn them into image layers.
# Do it multiple times so that there's something to GC.
for k in range(0, 2):
# produce delta layers => disable image layer creation by setting high threshold
tenant_update_config({"image_creation_threshold": "100"})
for i in range(0, 2):
for j in range(0, 3):
# create a minimal amount of "delta difficulty" for this table
with pg.cursor() as cur:
cur.execute("update a set some_value = -some_value + %s", (j,))
with pg.cursor() as cur:
# vacuuming should aid to reuse keys, though it's not really important
# with image_creation_threshold=1 which we will use on the last compaction
cur.execute("vacuum")
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
if i == 1 and j == 2 and k == 1:
# last iteration; stop before checkpoint to avoid leaving an inmemory layer
pg.stop_and_destroy()
ps_http.timeline_checkpoint(tenant_id, timeline_id)
# images should not yet be created, because threshold is too high,
# but these will be reshuffled to L1 layers
ps_http.timeline_compact(tenant_id, timeline_id)
for _ in range(0, 20):
# loop in case flushing is still in progress
layers = ps_http.layer_map_info(tenant_id, timeline_id)
if not layers.in_memory_layers:
break
time.sleep(0.2)
# now that we've grown some delta layers, turn them into image layers
tenant_update_config({"image_creation_threshold": "1"})
ps_http.timeline_compact(tenant_id, timeline_id)
# wait for all uploads to finish
wait_for_sk_commit_lsn_to_reach_remote_storage(
tenant_id, timeline_id, env.safekeepers, env.pageserver
)
# shutdown safekeepers to avoid on-demand downloads from walreceiver
for sk in env.safekeepers:
sk.stop()
ps_http.timeline_checkpoint(tenant_id, timeline_id)
log.info("ensure the code above produced image and delta layers")
pre_evict_info = ps_http.layer_map_info(tenant_id, timeline_id)
log.info("layer map dump: %s", pre_evict_info)
by_kind = pre_evict_info.kind_count()
log.info("by kind: %s", by_kind)
assert by_kind["Image"] > 0
assert by_kind["Delta"] > 0
assert by_kind["InMemory"] == 0
resident_layers = list(env.timeline_dir(tenant_id, timeline_id).glob("*-*_*"))
log.info("resident layers count before eviction: %s", len(resident_layers))
log.info("evict all layers")
ps_http.evict_all_layers(tenant_id, timeline_id)
def ensure_resident_and_remote_size_metrics():
log.info("ensure that all the layers are gone")
resident_layers = list(env.timeline_dir(tenant_id, timeline_id).glob("*-*_*"))
# we have disabled all background loops, so, this should hold
assert len(resident_layers) == 0
info = ps_http.layer_map_info(tenant_id, timeline_id)
log.info("layer map dump: %s", info)
log.info("ensure that resident_physical_size metric is zero")
resident_physical_size_metric = ps_http.get_timeline_metric(
tenant_id, timeline_id, "pageserver_resident_physical_size"
)
assert resident_physical_size_metric == 0
log.info("ensure that resident_physical_size metric corresponds to layer map dump")
assert resident_physical_size_metric == sum(
[layer.layer_file_size or 0 for layer in info.historic_layers if not layer.remote]
)
log.info("ensure that remote_physical_size metric matches layer map")
remote_physical_size_metric = ps_http.get_timeline_metric(
tenant_id, timeline_id, "pageserver_remote_physical_size"
)
log.info("ensure that remote_physical_size metric corresponds to layer map dump")
assert remote_physical_size_metric == sum(
layer.layer_file_size or 0 for layer in info.historic_layers if layer.remote
)
log.info("before runnning GC, ensure that remote_physical size is zero")
ensure_resident_and_remote_size_metrics()
log.info("run GC")
time.sleep(2) # let pitr_interval + 1 second pass
ps_http.timeline_gc(tenant_id, timeline_id, 0)
time.sleep(1)
assert not env.pageserver.log_contains("Nothing to GC")
log.info("ensure GC deleted some layers, otherwise this test is pointless")
post_gc_info = ps_http.layer_map_info(tenant_id, timeline_id)
log.info("layer map dump: %s", post_gc_info)
log.info("by kind: %s", post_gc_info.kind_count())
pre_evict_layers = set([layer.layer_file_name for layer in pre_evict_info.historic_layers])
post_gc_layers = set([layer.layer_file_name for layer in post_gc_info.historic_layers])
assert post_gc_layers.issubset(pre_evict_layers)
assert len(post_gc_layers) < len(pre_evict_layers)
log.info("update_gc_info might download some layers. Evict them again.")
ps_http.evict_all_layers(tenant_id, timeline_id)
log.info("after running GC, ensure that resident size is still zero")
ensure_resident_and_remote_size_metrics()

View File

@@ -9,7 +9,6 @@ from typing import Iterator
import pytest
from fixtures.log_helper import log
from fixtures.metrics import parse_metrics
from fixtures.neon_fixtures import (
PSQL,
NeonEnvBuilder,
@@ -143,7 +142,7 @@ def test_metric_collection(
# Helper function that gets the number of given kind of remote ops from the metrics
def get_num_remote_ops(file_kind: str, op_kind: str) -> int:
ps_metrics = parse_metrics(env.pageserver.http_client().get_metrics(), "pageserver")
ps_metrics = env.pageserver.http_client().get_metrics()
total = 0.0
for sample in ps_metrics.query_all(
name="pageserver_remote_operation_seconds_count",

View File

@@ -11,6 +11,7 @@ from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PageserverApiException,
PageserverHttpClient,
RemoteStorageKind,
assert_tenant_status,
available_remote_storages,
@@ -25,9 +26,16 @@ from fixtures.types import Lsn
from fixtures.utils import query_scalar
def get_num_downloaded_layers(client, tenant_id, timeline_id):
def get_num_downloaded_layers(client: PageserverHttpClient, tenant_id, timeline_id):
value = client.get_metric_value(
f'pageserver_remote_operation_seconds_count{{file_kind="layer",op_kind="download",status="success",tenant_id="{tenant_id}",timeline_id="{timeline_id}"}}'
"pageserver_remote_operation_seconds_count",
{
"file_kind": "layer",
"op_kind": "download",
"status": "success",
"tenant_id": tenant_id,
"timeline_id": timeline_id,
},
)
if value is None:
return 0

View File

@@ -6,7 +6,6 @@ from threading import Thread
import asyncpg
import pytest
from fixtures.log_helper import log
from fixtures.metrics import parse_metrics
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
@@ -79,7 +78,7 @@ def test_tenant_reattach(
".*failed to perform remote task UploadMetadata.*, will retry.*"
)
ps_metrics = parse_metrics(pageserver_http.get_metrics(), "pageserver")
ps_metrics = pageserver_http.get_metrics()
tenant_metric_filter = {
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
@@ -93,7 +92,7 @@ def test_tenant_reattach(
time.sleep(1) # for metrics propagation
ps_metrics = parse_metrics(pageserver_http.get_metrics(), "pageserver")
ps_metrics = pageserver_http.get_metrics()
pageserver_last_record_lsn = int(
ps_metrics.query_one("pageserver_last_record_lsn", filter=tenant_metric_filter).value
)

View File

@@ -50,16 +50,22 @@ def test_tenant_tasks(neon_env_builder: NeonEnvBuilder):
wait_until(10, 0.2, lambda: assert_active(tenant_id))
# Assert that all tasks finish quickly after tenant is detached
task_starts = client.get_metric_value('pageserver_tenant_task_events{event="start"}')
task_starts = client.get_metric_value("pageserver_tenant_task_events_total", {"event": "start"})
assert task_starts is not None
assert int(task_starts) > 0
client.tenant_detach(tenant)
client.tenant_detach(env.initial_tenant)
def assert_tasks_finish():
tasks_started = client.get_metric_value('pageserver_tenant_task_events{event="start"}')
tasks_ended = client.get_metric_value('pageserver_tenant_task_events{event="stop"}')
tasks_panicked = client.get_metric_value('pageserver_tenant_task_events{event="panic"}')
tasks_started = client.get_metric_value(
"pageserver_tenant_task_events_total", {"event": "start"}
)
tasks_ended = client.get_metric_value(
"pageserver_tenant_task_events_total", {"event": "stop"}
)
tasks_panicked = client.get_metric_value(
"pageserver_tenant_task_events_total", {"event": "panic"}
)
log.info(f"started {tasks_started}, ended {tasks_ended}, panicked {tasks_panicked}")
assert tasks_started == tasks_ended
assert tasks_panicked is None or int(tasks_panicked) == 0

View File

@@ -107,7 +107,7 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
assert cur.fetchone() == (5000050000,)
collected_metrics = {
"pageserver": env.pageserver.http_client().get_metrics(),
"pageserver": env.pageserver.http_client().get_metrics_str(),
}
for sk in env.safekeepers:
collected_metrics[f"safekeeper{sk.id}"] = sk.http_client().get_metrics_str()
@@ -207,7 +207,7 @@ def test_pageserver_metrics_removed_after_detach(
assert cur.fetchone() == (5000050000,)
def get_ps_metric_samples_for_tenant(tenant_id: TenantId) -> List[Sample]:
ps_metrics = parse_metrics(env.pageserver.http_client().get_metrics(), "pageserver")
ps_metrics = env.pageserver.http_client().get_metrics()
samples = []
for metric_name in ps_metrics.metrics:
for sample in ps_metrics.query_all(
@@ -307,7 +307,7 @@ def test_pageserver_with_empty_tenants(
time.sleep(1) # to allow metrics propagation
ps_metrics = parse_metrics(client.get_metrics(), "pageserver")
ps_metrics = client.get_metrics()
broken_tenants_metric_filter = {
"tenant_id": str(tenant_without_timelines_dir),
"state": "broken",

View File

@@ -1,11 +1,11 @@
import math
import queue
import random
import re
import threading
import time
from contextlib import closing
from pathlib import Path
from typing import Optional
import psycopg2.errors
import psycopg2.extras
@@ -19,9 +19,11 @@ from fixtures.neon_fixtures import (
PgBin,
PortDistributor,
Postgres,
RemoteStorageKind,
VanillaPostgres,
assert_tenant_status,
wait_for_last_flush_lsn,
wait_for_upload_queue_empty,
wait_until,
)
from fixtures.types import TenantId, TimelineId
@@ -302,8 +304,18 @@ def test_timeline_initial_logical_size_calculation_cancellation(
# message emitted by the code behind failpoint "timeline-calculate-logical-size-check-dir-exists"
def test_timeline_physical_size_init(neon_simple_env: NeonEnv):
env = neon_simple_env
@pytest.mark.parametrize("remote_storage_kind", [None, RemoteStorageKind.LOCAL_FS])
def test_timeline_physical_size_init(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: Optional[RemoteStorageKind]
):
if remote_storage_kind is not None:
neon_env_builder.enable_remote_storage(
remote_storage_kind, "test_timeline_physical_size_init"
)
env = neon_env_builder.init_start()
new_timeline_id = env.neon_cli.create_branch("test_timeline_physical_size_init")
pg = env.postgres.create_start("test_timeline_physical_size_init")
@@ -331,12 +343,22 @@ def test_timeline_physical_size_init(neon_simple_env: NeonEnv):
)
assert_physical_size_invariants(
get_physical_size_values(env, env.initial_tenant, new_timeline_id)
get_physical_size_values(env, env.initial_tenant, new_timeline_id, remote_storage_kind),
remote_storage_kind,
)
def test_timeline_physical_size_post_checkpoint(neon_simple_env: NeonEnv):
env = neon_simple_env
@pytest.mark.parametrize("remote_storage_kind", [None, RemoteStorageKind.LOCAL_FS])
def test_timeline_physical_size_post_checkpoint(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: Optional[RemoteStorageKind]
):
if remote_storage_kind is not None:
neon_env_builder.enable_remote_storage(
remote_storage_kind, "test_timeline_physical_size_init"
)
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
new_timeline_id = env.neon_cli.create_branch("test_timeline_physical_size_post_checkpoint")
pg = env.postgres.create_start("test_timeline_physical_size_post_checkpoint")
@@ -354,11 +376,21 @@ def test_timeline_physical_size_post_checkpoint(neon_simple_env: NeonEnv):
pageserver_http.timeline_checkpoint(env.initial_tenant, new_timeline_id)
assert_physical_size_invariants(
get_physical_size_values(env, env.initial_tenant, new_timeline_id)
get_physical_size_values(env, env.initial_tenant, new_timeline_id, remote_storage_kind),
remote_storage_kind,
)
def test_timeline_physical_size_post_compaction(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize("remote_storage_kind", [None, RemoteStorageKind.LOCAL_FS])
def test_timeline_physical_size_post_compaction(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: Optional[RemoteStorageKind]
):
if remote_storage_kind is not None:
neon_env_builder.enable_remote_storage(
remote_storage_kind, "test_timeline_physical_size_init"
)
# Disable background compaction as we don't want it to happen after `get_physical_size` request
# and before checking the expected size on disk, which makes the assertion failed
neon_env_builder.pageserver_config_override = (
@@ -387,15 +419,33 @@ def test_timeline_physical_size_post_compaction(neon_env_builder: NeonEnvBuilder
)
wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
# shutdown safekeepers to prevent new data from coming in
for sk in env.safekeepers:
sk.stop()
pageserver_http.timeline_checkpoint(env.initial_tenant, new_timeline_id)
pageserver_http.timeline_compact(env.initial_tenant, new_timeline_id)
if remote_storage_kind is not None:
wait_for_upload_queue_empty(env.pageserver, env.initial_tenant, new_timeline_id)
assert_physical_size_invariants(
get_physical_size_values(env, env.initial_tenant, new_timeline_id)
get_physical_size_values(env, env.initial_tenant, new_timeline_id, remote_storage_kind),
remote_storage_kind,
)
def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize("remote_storage_kind", [None, RemoteStorageKind.LOCAL_FS])
def test_timeline_physical_size_post_gc(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: Optional[RemoteStorageKind]
):
if remote_storage_kind is not None:
neon_env_builder.enable_remote_storage(
remote_storage_kind, "test_timeline_physical_size_init"
)
# Disable background compaction and GC as we don't want it to happen after `get_physical_size` request
# and before checking the expected size on disk, which makes the assertion failed
neon_env_builder.pageserver_config_override = "tenant_config={checkpoint_distance=100000, compaction_period='0s', gc_period='0s', pitr_interval='1s'}"
@@ -431,8 +481,12 @@ def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder):
pageserver_http.timeline_checkpoint(env.initial_tenant, new_timeline_id)
pageserver_http.timeline_gc(env.initial_tenant, new_timeline_id, gc_horizon=None)
if remote_storage_kind is not None:
wait_for_upload_queue_empty(env.pageserver, env.initial_tenant, new_timeline_id)
assert_physical_size_invariants(
get_physical_size_values(env, env.initial_tenant, new_timeline_id)
get_physical_size_values(env, env.initial_tenant, new_timeline_id, remote_storage_kind),
remote_storage_kind,
)
@@ -465,26 +519,26 @@ def test_timeline_size_metrics(
# get the metrics and parse the metric for the current timeline's physical size
metrics = env.pageserver.http_client().get_metrics()
matches = re.search(
f'^pageserver_resident_physical_size{{tenant_id="{env.initial_tenant}",timeline_id="{new_timeline_id}"}} (\\S+)$',
metrics,
re.MULTILINE,
)
assert matches
tl_physical_size_metric = int(matches.group(1))
tl_physical_size_metric = metrics.query_one(
name="pageserver_resident_physical_size",
filter={
"tenant_id": str(env.initial_tenant),
"timeline_id": str(new_timeline_id),
},
).value
# assert that the physical size metric matches the actual physical size on disk
timeline_path = env.timeline_dir(env.initial_tenant, new_timeline_id)
assert tl_physical_size_metric == get_timeline_dir_size(timeline_path)
# Check that the logical size metric is sane, and matches
matches = re.search(
f'^pageserver_current_logical_size{{tenant_id="{env.initial_tenant}",timeline_id="{new_timeline_id}"}} (\\S+)$',
metrics,
re.MULTILINE,
)
assert matches
tl_logical_size_metric = int(matches.group(1))
tl_logical_size_metric = metrics.query_one(
name="pageserver_current_logical_size",
filter={
"tenant_id": str(env.initial_tenant),
"timeline_id": str(new_timeline_id),
},
).value
pgdatadir = test_output_dir / "pgdata-vanilla"
pg_bin = PgBin(test_output_dir, pg_distrib_dir, pg_version)
@@ -516,18 +570,29 @@ def test_timeline_size_metrics(
assert math.isclose(dbsize_sum, tl_logical_size_metric, abs_tol=2 * 1024 * 1024)
def test_tenant_physical_size(neon_simple_env: NeonEnv):
@pytest.mark.parametrize("remote_storage_kind", [None, RemoteStorageKind.LOCAL_FS])
def test_tenant_physical_size(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: Optional[RemoteStorageKind]
):
random.seed(100)
env = neon_simple_env
if remote_storage_kind is not None:
neon_env_builder.enable_remote_storage(
remote_storage_kind, "test_timeline_physical_size_init"
)
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
client = env.pageserver.http_client()
tenant, timeline = env.neon_cli.create_tenant()
if remote_storage_kind is not None:
wait_for_upload_queue_empty(env.pageserver, tenant, timeline)
def get_timeline_resident_physical_size(timeline: TimelineId):
sizes = get_physical_size_values(env, tenant, timeline)
assert_physical_size_invariants(sizes)
sizes = get_physical_size_values(env, tenant, timeline, remote_storage_kind)
assert_physical_size_invariants(sizes, remote_storage_kind)
return sizes.prometheus_resident_physical
timeline_total_resident_physical_size = get_timeline_resident_physical_size(timeline)
@@ -547,6 +612,9 @@ def test_tenant_physical_size(neon_simple_env: NeonEnv):
wait_for_last_flush_lsn(env, pg, tenant, timeline)
pageserver_http.timeline_checkpoint(tenant, timeline)
if remote_storage_kind is not None:
wait_for_upload_queue_empty(env.pageserver, tenant, timeline)
timeline_total_resident_physical_size += get_timeline_resident_physical_size(timeline)
pg.stop()
@@ -564,21 +632,39 @@ def test_tenant_physical_size(neon_simple_env: NeonEnv):
class TimelinePhysicalSizeValues:
api_current_physical: int
prometheus_resident_physical: int
prometheus_resident_physical: float
prometheus_remote_physical: Optional[float] = None
python_timelinedir_layerfiles_physical: int
layer_map_file_size_sum: int
def get_physical_size_values(
env: NeonEnv, tenant_id: TenantId, timeline_id: TimelineId
env: NeonEnv,
tenant_id: TenantId,
timeline_id: TimelineId,
remote_storage_kind: Optional[RemoteStorageKind],
) -> TimelinePhysicalSizeValues:
res = TimelinePhysicalSizeValues()
client = env.pageserver.http_client()
res.prometheus_resident_physical = client.get_timeline_metric(
tenant_id, timeline_id, "pageserver_resident_physical_size"
res.layer_map_file_size_sum = sum(
layer.layer_file_size or 0
for layer in client.layer_map_info(tenant_id, timeline_id).historic_layers
)
metrics = client.get_metrics()
metrics_filter = {"tenant_id": str(tenant_id), "timeline_id": str(timeline_id)}
res.prometheus_resident_physical = metrics.query_one(
"pageserver_resident_physical_size", metrics_filter
).value
if remote_storage_kind is not None:
res.prometheus_remote_physical = metrics.query_one(
"pageserver_remote_physical_size", metrics_filter
).value
else:
res.prometheus_remote_physical = None
detail = client.timeline_detail(
tenant_id, timeline_id, include_timeline_dir_layer_file_size_sum=True
)
@@ -590,11 +676,20 @@ def get_physical_size_values(
return res
def assert_physical_size_invariants(sizes: TimelinePhysicalSizeValues):
def assert_physical_size_invariants(
sizes: TimelinePhysicalSizeValues, remote_storage_kind: Optional[RemoteStorageKind]
):
# resident phyiscal size is defined as
assert sizes.python_timelinedir_layerfiles_physical == sizes.prometheus_resident_physical
assert sizes.python_timelinedir_layerfiles_physical == sizes.layer_map_file_size_sum
# we don't do layer eviction, so, all layers are resident
assert sizes.api_current_physical == sizes.prometheus_resident_physical
if remote_storage_kind is not None:
assert sizes.prometheus_resident_physical == sizes.prometheus_remote_physical
# XXX would be nice to assert layer file physical storage utilization here as well, but we can only do that for LocalFS
else:
assert sizes.prometheus_remote_physical is None
# Timeline logical size initialization is an asynchronous background task that runs once,