Compare commits

..

2 Commits

Author SHA1 Message Date
Christian Schwarz
85445cde14 idea: concurrency-limit initial logical size calculation
Before this patch, there was no concurrency limit on initial logical
size computations.

In an experiment with a PS with 20k tenants, 1 timeline each,
all tenants inactive in SKs / not present in storage broker,
all logical size calculations are spawned by MetricsCollection,
i.e., consumption metrics worker.

Before this patch, these timelines would all do their initial logical
size calculation in parallel, leading to extreme thrashing in page cache
and virtual file cache.

With this patch, the virtual file cache thrashing is reduced
signficantly (from 80k `open`-system-calls/second to ~500
`open`-system-calls/second during loading).

This patch uses the existing background tasks semaphore to limit
concurrency, which generally is the right call for background activity.
However, due to logical size's involvement in PageserverFeedback towards
safekeepers, I think we need a priority-boosting mechanism, e.g., if
we're still calculating but walreceiver is actively asking, skip the
semaphore. That's fairly easy to implement, but, want to some feedback
on the general idea first before implementing it.
See also the FIXME in the block comment added in this commit.

NB: when evaluating, keep in mind that consumption metrics worker
persists its interval across restarts; delete the state file on disk
to get predictable (and I believe worst-case in terms of concurrency
during PS restart) behavior.
2023-11-28 15:19:13 +00:00
Shany Pozin
8625466144 Move run_initdb to be async and guarded by max of 8 running tasks. Fixes #5895. Use tenant.cancel for cancellation (#5921)
## Problem
https://github.com/neondatabase/neon/issues/5895
2023-11-28 14:49:31 +00:00
5 changed files with 218 additions and 116 deletions

View File

@@ -19,6 +19,7 @@ use futures::FutureExt;
use pageserver_api::models::TimelineState;
use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
use std::fmt;
use storage_broker::BrokerClientChannel;
use tokio::runtime::Handle;
use tokio::sync::watch;
@@ -31,26 +32,6 @@ use utils::crashsafe::path_with_suffix_extension;
use utils::fs_ext;
use utils::sync::gate::Gate;
use std::cmp::min;
use std::collections::hash_map::Entry;
use std::collections::BTreeSet;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Debug;
use std::fmt::Display;
use std::fs;
use std::fs::File;
use std::io;
use std::ops::Bound::Included;
use std::process::Command;
use std::process::Stdio;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::MutexGuard;
use std::sync::{Mutex, RwLock};
use std::time::{Duration, Instant};
use self::config::AttachedLocationConfig;
use self::config::AttachmentMode;
use self::config::LocationConf;
@@ -84,14 +65,35 @@ use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart;
use crate::tenant::storage_layer::DeltaLayer;
use crate::tenant::storage_layer::ImageLayer;
use crate::InitializationOrder;
use std::cmp::min;
use std::collections::hash_map::Entry;
use std::collections::BTreeSet;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Debug;
use std::fmt::Display;
use std::fs;
use std::fs::File;
use std::io;
use std::ops::Bound::Included;
use std::process::Stdio;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::MutexGuard;
use std::sync::{Mutex, RwLock};
use std::time::{Duration, Instant};
use crate::tenant::timeline::delete::DeleteTimelineFlow;
use crate::tenant::timeline::uninit::cleanup_timeline_directory;
use crate::virtual_file::VirtualFile;
use crate::walredo::PostgresRedoManager;
use crate::TEMP_FILE_SUFFIX;
use once_cell::sync::Lazy;
pub use pageserver_api::models::TenantState;
use tokio::sync::Semaphore;
static INIT_DB_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(8));
use toml_edit;
use utils::{
crashsafe,
@@ -403,6 +405,36 @@ pub enum CreateTimelineError {
Other(#[from] anyhow::Error),
}
#[derive(thiserror::Error, Debug)]
enum InitdbError {
Other(anyhow::Error),
Cancelled,
Spawn(std::io::Result<()>),
Failed(std::process::ExitStatus, Vec<u8>),
}
impl fmt::Display for InitdbError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
InitdbError::Cancelled => write!(f, "Operation was cancelled"),
InitdbError::Spawn(e) => write!(f, "Spawn error: {:?}", e),
InitdbError::Failed(status, stderr) => write!(
f,
"Command failed with status {:?}: {}",
status,
String::from_utf8_lossy(stderr)
),
InitdbError::Other(e) => write!(f, "Error: {:?}", e),
}
}
}
impl From<std::io::Error> for InitdbError {
fn from(error: std::io::Error) -> Self {
InitdbError::Spawn(Err(error))
}
}
struct TenantDirectoryScan {
sorted_timelines_to_load: Vec<(TimelineId, TimelineMetadata)>,
timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)>,
@@ -2922,8 +2954,8 @@ impl Tenant {
format!("Failed to remove already existing initdb directory: {pgdata_path}")
})?;
}
// Init temporarily repo to get bootstrap data, this creates a directory in the `pgdata_path` path
run_initdb(self.conf, &pgdata_path, pg_version)?;
// Init temporarily repo to get bootstrap data, this creates a directory in the `initdb_path` path
run_initdb(self.conf, &pgdata_path, pg_version, &self.cancel).await?;
// this new directory is very temporary, set to remove it immediately after bootstrap, we don't need it
scopeguard::defer! {
if let Err(e) = fs::remove_dir_all(&pgdata_path) {
@@ -3387,42 +3419,54 @@ fn rebase_directory(
/// Create the cluster temporarily in 'initdbpath' directory inside the repository
/// to get bootstrap data for timeline initialization.
fn run_initdb(
async fn run_initdb(
conf: &'static PageServerConf,
initdb_target_dir: &Utf8Path,
pg_version: u32,
) -> anyhow::Result<()> {
let initdb_bin_path = conf.pg_bin_dir(pg_version)?.join("initdb");
let initdb_lib_dir = conf.pg_lib_dir(pg_version)?;
cancel: &CancellationToken,
) -> Result<(), InitdbError> {
let initdb_bin_path = conf
.pg_bin_dir(pg_version)
.map_err(InitdbError::Other)?
.join("initdb");
let initdb_lib_dir = conf.pg_lib_dir(pg_version).map_err(InitdbError::Other)?;
info!(
"running {} in {}, libdir: {}",
initdb_bin_path, initdb_target_dir, initdb_lib_dir,
);
let initdb_output = Command::new(&initdb_bin_path)
let _permit = INIT_DB_SEMAPHORE.acquire().await;
let initdb_command = tokio::process::Command::new(&initdb_bin_path)
.args(["-D", initdb_target_dir.as_ref()])
.args(["-U", &conf.superuser])
.args(["-E", "utf8"])
.arg("--no-instructions")
// This is only used for a temporary installation that is deleted shortly after,
// so no need to fsync it
.arg("--no-sync")
.env_clear()
.env("LD_LIBRARY_PATH", &initdb_lib_dir)
.env("DYLD_LIBRARY_PATH", &initdb_lib_dir)
.stdout(Stdio::null())
.output()
.with_context(|| {
format!(
"failed to execute {} at target dir {}",
initdb_bin_path, initdb_target_dir,
)
})?;
if !initdb_output.status.success() {
bail!(
"initdb failed: '{}'",
String::from_utf8_lossy(&initdb_output.stderr)
);
.stdout(Stdio::piped())
.stderr(Stdio::piped())
// If the `select!` below doesn't finish the `wait_with_output`,
// let the task get `wait()`ed for asynchronously by tokio.
// This means there is a slim chance we can go over the INIT_DB_SEMAPHORE.
// TODO: fix for this is non-trivial, see
// https://github.com/neondatabase/neon/pull/5921#pullrequestreview-1750858021
//
.kill_on_drop(true)
.spawn()?;
tokio::select! {
initdb_output = initdb_command.wait_with_output() => {
let initdb_output = initdb_output?;
if !initdb_output.status.success() {
return Err(InitdbError::Failed(initdb_output.status, initdb_output.stderr));
}
}
_ = cancel.cancelled() => {
return Err(InitdbError::Cancelled);
}
}
Ok(())

View File

@@ -44,6 +44,7 @@ pub(crate) enum BackgroundLoopKind {
Eviction,
ConsumptionMetricsCollectMetrics,
ConsumptionMetricsSyntheticSizeWorker,
InitialLogicalSizeCalculation,
}
impl BackgroundLoopKind {

View File

@@ -1825,6 +1825,29 @@ impl Timeline {
// delay will be terminated by a timeout regardless.
let _completion = { self_clone.initial_logical_size_attempt.lock().expect("unexpected initial_logical_size_attempt poisoned").take() };
// In prod, initial logical size calucalation is spawned either by
// WalReceiverConnectionHandler if the timeline is active according to storage broker,
// or by the first consumption metrics worker (MetricsCollection).
// The latter runs every `metric_collection_interval` and checkpoints to disk, i.e.,
// if pageserver gets restarted, the consumption metrics worker will resume waiting
// for the correct remaining time, as if the pageserver had not been restarted.
//
// FIXME: with the current code, walreceiver requests would also hit this semaphore
// and get queued behind other background operations. That's bad because walreceiver_connection
// will push the not-precise value as `current_timeline_size` in the `PageserverFeedback`
// while this calculation is stuck.
// We need some way to priority-boost the initial size calculation if walreceiver is asking.
// Or, should we maybe revisit the use of logical size in `PageserverFeedback`?
// It seems broken the way it is.
//
// Example query to show different causes of initial size calculation spawning:
//
// https://neonprod.grafana.net/explore?panes=%7B%22wSx%22:%7B%22datasource%22:%22grafanacloud-logs%22,%22queries%22:%5B%7B%22refId%22:%22A%22,%22expr%22:%22sum%20by%20%28task_kind%29%20%28count_over_time%28%7Bneon_service%3D%5C%22pageserver%5C%22,%20neon_region%3D%5C%22us-west-2%5C%22%7D%20%7C%3D%20%60logical%20size%20computation%20from%20context%20of%20task%20kind%60%20%7C%20regexp%20%60logical%20size%20computation%20from%20context%20of%20task%20kind%20%28%3FP%3Ctask_kind%3E.%2A%29%60%20%5B1m%5D%29%29%22,%22queryType%22:%22range%22,%22datasource%22:%7B%22type%22:%22loki%22,%22uid%22:%22grafanacloud-logs%22%7D,%22editorMode%22:%22code%22,%22step%22:%221m%22%7D%5D,%22range%22:%7B%22from%22:%221700637500615%22,%22to%22:%221700639648743%22%7D%7D%7D&schemaVersion=1&orgId=1
let _permit = match crate::tenant::tasks::concurrent_background_tasks_rate_limit(BackgroundLoopKind::InitialLogicalSizeCalculation,&background_ctx, &cancel).await {
Ok(permit) => permit,
Err(RateLimitError::Cancelled) => return Ok(()),
};
let calculated_size = match self_clone
.logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx)
.await

View File

@@ -18,7 +18,6 @@ from datetime import datetime
from functools import cached_property
from itertools import chain, product
from pathlib import Path
from queue import SimpleQueue
from types import TracebackType
from typing import Any, Dict, Iterator, List, Optional, Tuple, Type, cast
from urllib.parse import urlparse
@@ -37,11 +36,8 @@ from _pytest.fixtures import FixtureRequest
from psycopg2.extensions import connection as PgConnection
from psycopg2.extensions import cursor as PgCursor
from psycopg2.extensions import make_dsn, parse_dsn
from pytest_httpserver import HTTPServer
from typing_extensions import Literal
from urllib3.util.retry import Retry
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
from fixtures.broker import NeonBroker
from fixtures.log_helper import log
@@ -1009,68 +1005,6 @@ def neon_env_builder(
yield builder
@pytest.fixture(scope="function")
def neon_env_and_metrics_server(
httpserver: HTTPServer,
neon_env_builder: NeonEnvBuilder,
httpserver_listen_address,
) -> Tuple[NeonEnv, HTTPServer, SimpleQueue[Any]]:
"""
Fixture to create a Neon environment and metrics server.
"""
(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
# this should be Union[str, Tuple[List[Any], bool]], but it will make unpacking much more verbose
uploads: SimpleQueue[Any] = SimpleQueue()
def metrics_handler(request: Request) -> Response:
if request.json is None:
return Response(status=400)
events = request.json["events"]
is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
assert is_last in ["true", "false"]
uploads.put((events, is_last == "true"))
return Response(status=200)
# Require collecting metrics frequently, since we change
# the timeline and want something to be logged about it.
#
# Disable time-based pitr, we will use the manual GC calls
# to trigger remote storage operations in a controlled way
neon_env_builder.pageserver_config_override = f"""
metric_collection_interval="1s"
metric_collection_endpoint="{metric_collection_endpoint}"
cached_metric_collection_interval="0s"
synthetic_size_calculation_interval="3s"
"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}")
# mock http server that returns OK for the metrics
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
metrics_handler
)
# spin up neon, after http server is ready
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
# httpserver is shut down before pageserver during passing run
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
# we have a fast rate of calculation, these can happen at shutdown
env.pageserver.allowed_errors.append(
".*synthetic_size_worker:calculate_synthetic_size.*:gather_size_inputs.*: failed to calculate logical size at .*: cancelled.*"
)
env.pageserver.allowed_errors.append(
".*synthetic_size_worker: failed to calculate synthetic size for tenant .*: failed to calculate some logical_sizes"
)
return (env, httpserver, uploads)
@dataclass
class PageserverPort:
pg: int

View File

@@ -3,21 +3,75 @@ import time
from dataclasses import dataclass
from pathlib import Path
from queue import SimpleQueue
from typing import Any, Dict, Set, Tuple
from typing import Any, Dict, Set
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
wait_for_last_flush_lsn,
)
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import TenantId, TimelineId
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
# TODO: collect all of the env setup *AFTER* removal of RemoteStorageKind.NOOP
def test_metric_collection(
neon_env_and_metrics_server: Tuple[NeonEnv, HTTPServer, SimpleQueue[Any]]
httpserver: HTTPServer,
neon_env_builder: NeonEnvBuilder,
httpserver_listen_address,
):
(env, httpserver, uploads) = neon_env_and_metrics_server
(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
# this should be Union[str, Tuple[List[Any], bool]], but it will make unpacking much more verbose
uploads: SimpleQueue[Any] = SimpleQueue()
def metrics_handler(request: Request) -> Response:
if request.json is None:
return Response(status=400)
events = request.json["events"]
is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
assert is_last in ["true", "false"]
uploads.put((events, is_last == "true"))
return Response(status=200)
# Require collecting metrics frequently, since we change
# the timeline and want something to be logged about it.
#
# Disable time-based pitr, we will use the manual GC calls
# to trigger remote storage operations in a controlled way
neon_env_builder.pageserver_config_override = f"""
metric_collection_interval="1s"
metric_collection_endpoint="{metric_collection_endpoint}"
cached_metric_collection_interval="0s"
synthetic_size_calculation_interval="3s"
"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}")
# mock http server that returns OK for the metrics
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
metrics_handler
)
# spin up neon, after http server is ready
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
# httpserver is shut down before pageserver during passing run
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
# we have a fast rate of calculation, these can happen at shutdown
env.pageserver.allowed_errors.append(
".*synthetic_size_worker:calculate_synthetic_size.*:gather_size_inputs.*: failed to calculate logical size at .*: cancelled.*"
)
env.pageserver.allowed_errors.append(
".*synthetic_size_worker: failed to calculate synthetic size for tenant .*: failed to calculate some logical_sizes"
)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
@@ -114,11 +168,59 @@ def test_metric_collection(
def test_metric_collection_cleans_up_tempfile(
neon_env_and_metrics_server: Tuple[NeonEnv, HTTPServer, SimpleQueue[Any]]
httpserver: HTTPServer,
neon_env_builder: NeonEnvBuilder,
httpserver_listen_address,
):
(env, httpserver, uploads) = neon_env_and_metrics_server
(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
# this should be Union[str, Tuple[List[Any], bool]], but it will make unpacking much more verbose
uploads: SimpleQueue[Any] = SimpleQueue()
def metrics_handler(request: Request) -> Response:
if request.json is None:
return Response(status=400)
events = request.json["events"]
is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
assert is_last in ["true", "false"]
uploads.put((events, is_last == "true"))
return Response(status=200)
# Require collecting metrics frequently, since we change
# the timeline and want something to be logged about it.
#
# Disable time-based pitr, we will use the manual GC calls
# to trigger remote storage operations in a controlled way
neon_env_builder.pageserver_config_override = f"""
metric_collection_interval="1s"
metric_collection_endpoint="{metric_collection_endpoint}"
cached_metric_collection_interval="0s"
synthetic_size_calculation_interval="3s"
"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
# mock http server that returns OK for the metrics
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
metrics_handler
)
# spin up neon, after http server is ready
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
pageserver_http = env.pageserver.http_client()
# httpserver is shut down before pageserver during passing run
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
# we have a fast rate of calculation, these can happen at shutdown
env.pageserver.allowed_errors.append(
".*synthetic_size_worker:calculate_synthetic_size.*:gather_size_inputs.*: failed to calculate logical size at .*: cancelled.*"
)
env.pageserver.allowed_errors.append(
".*synthetic_size_worker: failed to calculate synthetic size for tenant .*: failed to calculate some logical_sizes"
)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
@@ -188,8 +290,6 @@ def test_metric_collection_cleans_up_tempfile(
), "only initial tempfile should had been removed"
assert initially.other.issuperset(later.other), "no other files should had been removed"
httpserver.check()
@dataclass
class PrefixPartitionedFiles: