pageserver: implement auto-splitting (#7681)

## Problem

Currently tenants are only split into multiple shards if a human being
calls the API to do it.

Issue: #7388 

## Summary of changes

- Add a pageserver API for returning the top tenants by size
- Add a step to the controller's background loop: if there is no
reconciliation or optimization work to be done, it looks for tenants to split.
- Add a test that runs pgbench on many tenants concurrently, and checks
that splitting happens as expected as tenants grow, without interrupting
client I/O.

This PR is quite basic: there is a task list in
https://github.com/neondatabase/neon/issues/7388 for further work. This
PR is meant to be safe (off by default), and sufficient to enable our
staging environment to run lots of sharded tenants without a human
having to set them up.
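
For a concrete sense of the new API, here is a sketch of building and serializing a request against the new `/v1/top_tenants` endpoint. Field names and types come from the `pageserver_api` models added below; the values mirror the controller's defaults in this PR (10 results, only tenants below 8 shards and above a size floor), the 500MB floor itself is illustrative, and `serde_json` is assumed as a dependency:

```rust
use pageserver_api::models::{TenantSorting, TopTenantShardsRequest};
use pageserver_api::shard::ShardCount;

fn main() {
    // Sketch of the body POSTed to a pageserver's /v1/top_tenants endpoint.
    let request = TopTenantShardsRequest {
        order_by: TenantSorting::MaxLogicalSize, // serializes as "max_logical_size"
        limit: 10,
        where_shards_lt: Some(ShardCount::new(8)),
        where_gt: Some(500 * 1024 * 1024), // illustrative 500MB floor
    };
    println!("{}", serde_json::to_string_pretty(&request).unwrap());
}
```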
Commit c84656a53e (parent af99c959ef), authored by John Spray on 2024-05-17 17:01:24 +01:00 and committed via GitHub.
15 changed files with 689 additions and 46 deletions.

---

@@ -152,6 +152,9 @@ pub struct NeonStorageControllerConf {
/// Heartbeat timeout before marking a node offline
#[serde(with = "humantime_serde")]
pub max_unavailable: Duration,
/// Threshold for auto-splitting a tenant into shards
pub split_threshold: Option<u64>,
}
impl NeonStorageControllerConf {
@@ -164,6 +167,7 @@ impl Default for NeonStorageControllerConf {
fn default() -> Self {
Self {
max_unavailable: Self::DEFAULT_MAX_UNAVAILABLE_INTERVAL,
split_threshold: None,
}
}
}

---

@@ -305,6 +305,10 @@ impl StorageController {
));
}
if let Some(split_threshold) = self.config.split_threshold.as_ref() {
args.push(format!("--split-threshold={split_threshold}"))
}
background_process::start_process(
COMMAND,
&self.env.base_data_dir,

---

@@ -824,6 +824,55 @@ pub struct TenantScanRemoteStorageResponse {
pub shards: Vec<TenantScanRemoteStorageShard>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "snake_case")]
pub enum TenantSorting {
ResidentSize,
MaxLogicalSize,
}
impl Default for TenantSorting {
fn default() -> Self {
Self::ResidentSize
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TopTenantShardsRequest {
// How would you like to sort the tenants?
pub order_by: TenantSorting,
// How many results?
pub limit: usize,
// Omit tenants with this many shards or more (e.g. if this is the max number of shards
// that the caller would ever split to)
pub where_shards_lt: Option<ShardCount>,
// Omit tenants where the ordering metric is less than or equal to this (this is an optimization to
// let us quickly exclude numerous tiny shards)
pub where_gt: Option<u64>,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub struct TopTenantShardItem {
pub id: TenantShardId,
/// Total size of layers on local disk for all timelines in this tenant
pub resident_size: u64,
/// Total size of layers in remote storage for all timelines in this tenant
pub physical_size: u64,
/// The largest logical size of a timeline within this tenant
pub max_logical_size: u64,
}
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct TopTenantShardsResponse {
pub shards: Vec<TopTenantShardItem>,
}
pub mod virtual_file {
#[derive(
Copy,

---

@@ -125,7 +125,7 @@ impl ShardCount {
/// `val` may be zero, or the number of shards in the tenant. `val` is what
/// [`Self::literal`] would return.
-    pub fn new(val: u8) -> Self {
+    pub const fn new(val: u8) -> Self {
Self(val)
}
}

---

@@ -486,6 +486,18 @@ impl Client {
.map_err(Error::ReceiveBody)
}
pub async fn top_tenant_shards(
&self,
request: TopTenantShardsRequest,
) -> Result<TopTenantShardsResponse> {
let uri = format!("{}/v1/top_tenants", self.mgmt_api_endpoint);
self.request(Method::POST, uri, request)
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
pub async fn layer_map_info(
&self,
tenant_shard_id: TenantShardId,

---
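
For illustration, the new `Client::top_tenant_shards` method above can be driven as in the sketch below. It assumes the `pageserver_client::mgmt_api` module path and its `Result` alias, and elides construction of the client itself:

```rust
use pageserver_api::models::{TenantSorting, TopTenantShardsRequest};
use pageserver_api::shard::ShardCount;
use pageserver_client::mgmt_api;

// Sketch: ask one pageserver for its biggest tenants, the same query the
// storage controller issues in the diff further down.
async fn print_biggest_tenants(client: &mgmt_api::Client) -> mgmt_api::Result<()> {
    let top = client
        .top_tenant_shards(TopTenantShardsRequest {
            order_by: TenantSorting::MaxLogicalSize,
            limit: 10,
            where_shards_lt: Some(ShardCount::new(8)),
            where_gt: Some(500 * 1024 * 1024), // illustrative size floor
        })
        .await?;
    for item in top.shards {
        println!("{} max_logical_size={}", item.id, item.max_logical_size);
    }
    Ok(())
}
```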

@@ -1,6 +1,8 @@
//!
//! Management HTTP API
//!
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
@@ -24,7 +26,11 @@ use pageserver_api::models::TenantScanRemoteStorageShard;
use pageserver_api::models::TenantShardLocation;
use pageserver_api::models::TenantShardSplitRequest;
use pageserver_api::models::TenantShardSplitResponse;
use pageserver_api::models::TenantSorting;
use pageserver_api::models::TenantState;
use pageserver_api::models::TopTenantShardItem;
use pageserver_api::models::TopTenantShardsRequest;
use pageserver_api::models::TopTenantShardsResponse;
use pageserver_api::models::{
DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
TenantLoadRequest, TenantLocationConfigRequest,
@@ -2323,6 +2329,97 @@ async fn get_utilization(
.map_err(ApiError::InternalServerError)
}
/// Report on the largest tenants on this pageserver, for the storage controller to identify
/// candidates for splitting
async fn post_top_tenants(
mut r: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&r, None)?;
let request: TopTenantShardsRequest = json_request(&mut r).await?;
let state = get_state(&r);
fn get_size_metric(sizes: &TopTenantShardItem, order_by: &TenantSorting) -> u64 {
match order_by {
TenantSorting::ResidentSize => sizes.resident_size,
TenantSorting::MaxLogicalSize => sizes.max_logical_size,
}
}
#[derive(Eq, PartialEq)]
struct HeapItem {
metric: u64,
sizes: TopTenantShardItem,
}
impl PartialOrd for HeapItem {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
/// Heap items have reverse ordering on their metric: this enables using BinaryHeap, which
/// supports popping the greatest item but not the smallest.
impl Ord for HeapItem {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
Reverse(self.metric).cmp(&Reverse(other.metric))
}
}
let mut top_n: BinaryHeap<HeapItem> = BinaryHeap::with_capacity(request.limit);
// FIXME: this is a lot of clones to take this tenant list
for (tenant_shard_id, tenant_slot) in state.tenant_manager.list() {
if let Some(shards_lt) = request.where_shards_lt {
// Ignore tenants which already have >= this many shards
if tenant_shard_id.shard_count >= shards_lt {
continue;
}
}
let sizes = match tenant_slot {
TenantSlot::Attached(tenant) => tenant.get_sizes(),
TenantSlot::Secondary(_) | TenantSlot::InProgress(_) => {
continue;
}
};
let metric = get_size_metric(&sizes, &request.order_by);
if let Some(gt) = request.where_gt {
// Ignore tenants whose metric is <= the lower size threshold, to do less sorting work
if metric <= gt {
continue;
}
};
match top_n.peek() {
None => {
// Top N list is empty: candidate becomes first member
top_n.push(HeapItem { metric, sizes });
}
Some(i) if i.metric > metric && top_n.len() < request.limit => {
// Lowest item in the heap is greater than our candidate, but we aren't at limit yet: push it
top_n.push(HeapItem { metric, sizes });
}
Some(i) if i.metric > metric => {
// List is at limit and lowest value is greater than our candidate, drop it.
}
Some(_) => top_n.push(HeapItem { metric, sizes }),
}
while top_n.len() > request.limit {
top_n.pop();
}
}
json_response(
StatusCode::OK,
TopTenantShardsResponse {
shards: top_n.into_iter().map(|i| i.sizes).collect(),
},
)
}
/// Common functionality of all the HTTP API handlers.
///
/// - Adds a tracing span to each request (by `request_span`)
@@ -2609,5 +2706,6 @@ pub fn make_router(
)
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
.get("/v1/utilization", |r| api_handler(r, get_utilization))
.post("/v1/top_tenants", |r| api_handler(r, post_top_tenants))
.any(handler_404))
}
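
A note on the handler above: wrapping the metric in `Reverse` turns Rust's max-heap `BinaryHeap` into a min-heap, which is the standard fixed-size top-N trick: the root is always the smallest retained item, so each new candidate either evicts it in O(log n) or is rejected cheaply. A stripped-down sketch of the same idea (illustrative, not the PR's code):

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Keep the `n` largest values from a stream without sorting everything:
/// a BinaryHeap<Reverse<u64>> is a min-heap, so its root is the smallest
/// retained value and is the one evicted when a bigger candidate arrives.
fn top_n(values: impl IntoIterator<Item = u64>, n: usize) -> Vec<u64> {
    let mut heap: BinaryHeap<Reverse<u64>> = BinaryHeap::with_capacity(n + 1);
    for v in values {
        heap.push(Reverse(v));
        if heap.len() > n {
            heap.pop(); // drop the current smallest
        }
    }
    // Like the handler's response, the result is in no particular order.
    heap.into_iter().map(|Reverse(v)| v).collect()
}
```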

---

@@ -2098,7 +2098,7 @@ pub(crate) struct TimelineMetrics {
pub garbage_collect_histo: StorageTimeMetrics,
pub find_gc_cutoffs_histo: StorageTimeMetrics,
pub last_record_gauge: IntGauge,
-    resident_physical_size_gauge: UIntGauge,
+    pub resident_physical_size_gauge: UIntGauge,
/// copy of LayeredTimeline.current_logical_size
pub current_logical_size_gauge: UIntGauge,
pub aux_file_size_gauge: IntGauge,
@@ -2312,6 +2312,7 @@ use pin_project_lite::pin_project;
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
@@ -2321,35 +2322,35 @@ use crate::task_mgr::TaskKind;
use crate::tenant::mgr::TenantSlot;
/// Maintain a per timeline gauge in addition to the global gauge.
- struct PerTimelineRemotePhysicalSizeGauge {
-     last_set: u64,
+ pub(crate) struct PerTimelineRemotePhysicalSizeGauge {
+     last_set: AtomicU64,
gauge: UIntGauge,
}
impl PerTimelineRemotePhysicalSizeGauge {
fn new(per_timeline_gauge: UIntGauge) -> Self {
Self {
-     last_set: per_timeline_gauge.get(),
+     last_set: AtomicU64::new(0),
gauge: per_timeline_gauge,
}
}
- fn set(&mut self, sz: u64) {
+ pub(crate) fn set(&self, sz: u64) {
self.gauge.set(sz);
- if sz < self.last_set {
-     REMOTE_PHYSICAL_SIZE_GLOBAL.sub(self.last_set - sz);
+ let prev = self.last_set.swap(sz, std::sync::atomic::Ordering::Relaxed);
+ if sz < prev {
+     REMOTE_PHYSICAL_SIZE_GLOBAL.sub(prev - sz);
  } else {
-     REMOTE_PHYSICAL_SIZE_GLOBAL.add(sz - self.last_set);
+     REMOTE_PHYSICAL_SIZE_GLOBAL.add(sz - prev);
  };
- self.last_set = sz;
}
- fn get(&self) -> u64 {
+ pub(crate) fn get(&self) -> u64 {
self.gauge.get()
}
}
impl Drop for PerTimelineRemotePhysicalSizeGauge {
fn drop(&mut self) {
- REMOTE_PHYSICAL_SIZE_GLOBAL.sub(self.last_set);
+ REMOTE_PHYSICAL_SIZE_GLOBAL.sub(self.last_set.load(std::sync::atomic::Ordering::Relaxed));
}
}
@@ -2357,7 +2358,7 @@ pub(crate) struct RemoteTimelineClientMetrics {
tenant_id: String,
shard_id: String,
timeline_id: String,
- remote_physical_size_gauge: Mutex<Option<PerTimelineRemotePhysicalSizeGauge>>,
+ pub(crate) remote_physical_size_gauge: PerTimelineRemotePhysicalSizeGauge,
calls: Mutex<HashMap<(&'static str, &'static str), IntCounterPair>>,
bytes_started_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
bytes_finished_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
@@ -2365,38 +2366,27 @@ pub(crate) struct RemoteTimelineClientMetrics {
impl RemoteTimelineClientMetrics {
pub fn new(tenant_shard_id: &TenantShardId, timeline_id: &TimelineId) -> Self {
+ let tenant_id_str = tenant_shard_id.tenant_id.to_string();
+ let shard_id_str = format!("{}", tenant_shard_id.shard_slug());
+ let timeline_id_str = timeline_id.to_string();
+ let remote_physical_size_gauge = PerTimelineRemotePhysicalSizeGauge::new(
+     REMOTE_PHYSICAL_SIZE
+         .get_metric_with_label_values(&[&tenant_id_str, &shard_id_str, &timeline_id_str])
+         .unwrap(),
+ );
  RemoteTimelineClientMetrics {
-     tenant_id: tenant_shard_id.tenant_id.to_string(),
-     shard_id: format!("{}", tenant_shard_id.shard_slug()),
-     timeline_id: timeline_id.to_string(),
+     tenant_id: tenant_id_str,
+     shard_id: shard_id_str,
+     timeline_id: timeline_id_str,
      calls: Mutex::new(HashMap::default()),
      bytes_started_counter: Mutex::new(HashMap::default()),
      bytes_finished_counter: Mutex::new(HashMap::default()),
-     remote_physical_size_gauge: Mutex::new(None),
+     remote_physical_size_gauge,
}
}
- pub(crate) fn remote_physical_size_set(&self, sz: u64) {
-     let mut guard = self.remote_physical_size_gauge.lock().unwrap();
-     let gauge = guard.get_or_insert_with(|| {
-         PerTimelineRemotePhysicalSizeGauge::new(
-             REMOTE_PHYSICAL_SIZE
-                 .get_metric_with_label_values(&[
-                     &self.tenant_id,
-                     &self.shard_id,
-                     &self.timeline_id,
-                 ])
-                 .unwrap(),
-         )
-     });
-     gauge.set(sz);
- }
- pub(crate) fn remote_physical_size_get(&self) -> u64 {
-     let guard = self.remote_physical_size_gauge.lock().unwrap();
-     guard.as_ref().map(|gauge| gauge.get()).unwrap_or(0)
- }
pub fn remote_operation_time(
&self,
file_kind: &RemoteOpFileKind,

---
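
The `PerTimelineRemotePhysicalSizeGauge` change above drops the `Mutex<Option<..>>` wrapper by storing `last_set` in an `AtomicU64`, so a shared reference can still keep the global gauge equal to the sum of the per-timeline gauges. A self-contained sketch of that delta-accounting pattern (the names here are illustrative, not the real metrics types):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Stands in for REMOTE_PHYSICAL_SIZE_GLOBAL: the sum of all per-timeline values.
static GLOBAL_SIZE: AtomicU64 = AtomicU64::new(0);

struct SizeGauge {
    // The value this gauge last contributed to GLOBAL_SIZE.
    last_set: AtomicU64,
}

impl SizeGauge {
    fn new() -> Self {
        SizeGauge { last_set: AtomicU64::new(0) }
    }

    fn set(&self, sz: u64) {
        // swap() hands back the previous value exactly once even under
        // concurrent callers, so each delta is applied to the global sum once.
        let prev = self.last_set.swap(sz, Ordering::Relaxed);
        if sz < prev {
            GLOBAL_SIZE.fetch_sub(prev - sz, Ordering::Relaxed);
        } else {
            GLOBAL_SIZE.fetch_add(sz - prev, Ordering::Relaxed);
        }
    }
}

impl Drop for SizeGauge {
    fn drop(&mut self) {
        // On teardown, remove this gauge's remaining contribution.
        GLOBAL_SIZE.fetch_sub(self.last_set.load(Ordering::Relaxed), Ordering::Relaxed);
    }
}

fn main() {
    let g = SizeGauge::new();
    g.set(100);
    g.set(40); // the global sum decreases by the delta of 60
    assert_eq!(GLOBAL_SIZE.load(Ordering::Relaxed), 40);
    drop(g);
    assert_eq!(GLOBAL_SIZE.load(Ordering::Relaxed), 0);
}
```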

@@ -21,6 +21,7 @@ use futures::FutureExt;
use futures::StreamExt;
use pageserver_api::models;
use pageserver_api::models::TimelineState;
use pageserver_api::models::TopTenantShardItem;
use pageserver_api::models::WalRedoManagerStatus;
use pageserver_api::shard::ShardIdentity;
use pageserver_api::shard::ShardStripeSize;
@@ -2196,6 +2197,31 @@ impl Tenant {
Ok(())
}
pub(crate) fn get_sizes(&self) -> TopTenantShardItem {
let mut result = TopTenantShardItem {
id: self.tenant_shard_id,
resident_size: 0,
physical_size: 0,
max_logical_size: 0,
};
for timeline in self.timelines.lock().unwrap().values() {
result.resident_size += timeline.metrics.resident_physical_size_gauge.get();
result.physical_size += timeline
.remote_client
.metrics
.remote_physical_size_gauge
.get();
result.max_logical_size = std::cmp::max(
result.max_logical_size,
timeline.metrics.current_logical_size_gauge.get(),
);
}
result
}
}
/// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),

---

@@ -317,7 +317,7 @@ pub struct RemoteTimelineClient {
upload_queue: Mutex<UploadQueue>,
- metrics: Arc<RemoteTimelineClientMetrics>,
+ pub(crate) metrics: Arc<RemoteTimelineClientMetrics>,
storage_impl: GenericRemoteStorage,
@@ -461,11 +461,11 @@ impl RemoteTimelineClient {
} else {
0
};
- self.metrics.remote_physical_size_set(size);
+ self.metrics.remote_physical_size_gauge.set(size);
}
pub fn get_remote_physical_size(&self) -> u64 {
- self.metrics.remote_physical_size_get()
+ self.metrics.remote_physical_size_gauge.get()
}
//

---

@@ -66,6 +66,10 @@ struct Cli {
#[arg(long)]
max_unavailable_interval: Option<humantime::Duration>,
/// Size threshold for automatically splitting shards (disabled by default)
#[arg(long)]
split_threshold: Option<u64>,
/// Maximum number of reconcilers that may run in parallel
#[arg(long)]
reconciler_concurrency: Option<usize>,
@@ -255,6 +259,7 @@ async fn async_main() -> anyhow::Result<()> {
reconciler_concurrency: args
.reconciler_concurrency
.unwrap_or(RECONCILER_CONCURRENCY_DEFAULT),
split_threshold: args.split_threshold,
};
// After loading secrets & config, but before starting anything else, apply database migrations

---

@@ -2,7 +2,7 @@ use pageserver_api::{
models::{
LocationConfig, LocationConfigListResponse, PageserverUtilization, SecondaryProgress,
TenantScanRemoteStorageResponse, TenantShardSplitRequest, TenantShardSplitResponse,
-         TimelineCreateRequest, TimelineInfo,
+         TimelineCreateRequest, TimelineInfo, TopTenantShardsRequest, TopTenantShardsResponse,
},
shard::TenantShardId,
};
@@ -234,4 +234,16 @@ impl PageserverClient {
self.inner.get_utilization().await
)
}
pub(crate) async fn top_tenant_shards(
&self,
request: TopTenantShardsRequest,
) -> Result<TopTenantShardsResponse> {
measured_request!(
"top_tenants",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner.top_tenant_shards(request).await
)
}
}

---

@@ -32,10 +32,10 @@ use pageserver_api::{
TenantPolicyRequest, TenantShardMigrateRequest, TenantShardMigrateResponse,
UtilizationScore,
},
-     models::{SecondaryProgress, TenantConfigRequest},
+     models::{SecondaryProgress, TenantConfigRequest, TopTenantShardsRequest},
};
use reqwest::StatusCode;
- use tracing::instrument;
+ use tracing::{instrument, Instrument};
use crate::pageserver_client::PageserverClient;
use pageserver_api::{
@@ -222,6 +222,10 @@ pub struct Config {
/// How many Reconcilers may be spawned concurrently
pub reconciler_concurrency: usize,
/// How large must a shard grow in bytes before we split it?
/// None disables auto-splitting.
pub split_threshold: Option<u64>,
}
impl From<DatabaseError> for ApiError {
@@ -699,7 +703,7 @@ impl Service {
/// e.g. a tenant create/attach/migrate must eventually be retried: this task is responsible
/// for those retries.
#[instrument(skip_all)]
- async fn background_reconcile(&self) {
+ async fn background_reconcile(self: &Arc<Self>) {
self.startup_complete.clone().wait().await;
const BACKGROUND_RECONCILE_PERIOD: Duration = Duration::from_secs(20);
@@ -711,7 +715,11 @@ impl Service {
let reconciles_spawned = self.reconcile_all();
if reconciles_spawned == 0 {
// Run optimizer only when we didn't find any other work to do
- self.optimize_all().await;
+ let optimizations = self.optimize_all().await;
+ if optimizations == 0 {
+     // Run new splits only when no optimizations are pending
+     self.autosplit_tenants().await;
+ }
}
}
_ = self.cancel.cancelled() => return
@@ -4766,6 +4774,104 @@ impl Service {
validated_work
}
/// Look for shards which are oversized and in need of splitting
async fn autosplit_tenants(self: &Arc<Self>) {
let Some(split_threshold) = self.config.split_threshold else {
// Auto-splitting is disabled
return;
};
let nodes = self.inner.read().unwrap().nodes.clone();
const SPLIT_TO_MAX: ShardCount = ShardCount::new(8);
let mut top_n = Vec::new();
// Call into each node to look for big tenants
let top_n_request = TopTenantShardsRequest {
// We currently split based on logical size, for simplicity: logical size is a signal of
// the user's intent to run a large database, whereas physical/resident size can be symptoms
// of compaction issues. Eventually we should switch to using resident size to bound the
// disk space impact of one shard.
order_by: models::TenantSorting::MaxLogicalSize,
limit: 10,
where_shards_lt: Some(SPLIT_TO_MAX),
where_gt: Some(split_threshold),
};
for node in nodes.values() {
let request_ref = &top_n_request;
match node
.with_client_retries(
|client| async move {
let request = request_ref.clone();
client.top_tenant_shards(request.clone()).await
},
&self.config.jwt_token,
3,
3,
Duration::from_secs(5),
&self.cancel,
)
.await
{
Some(Ok(node_top_n)) => {
top_n.extend(node_top_n.shards.into_iter());
}
Some(Err(mgmt_api::Error::Cancelled)) => {
continue;
}
Some(Err(e)) => {
tracing::warn!("Failed to fetch top N tenants from {node}: {e}");
continue;
}
None => {
// Node is shutting down
continue;
}
};
}
// Pick the biggest tenant to split first
top_n.sort_by_key(|i| std::cmp::Reverse(i.resident_size));
let Some(split_candidate) = top_n.into_iter().next() else {
tracing::debug!("No split-eligible shards found");
return;
};
// We spawn a task to run this, so it's exactly like some external API client requesting it. We don't
// want to block the background reconcile loop on this.
tracing::info!("Auto-splitting tenant for size threshold {split_threshold}: current size {split_candidate:?}");
let this = self.clone();
tokio::spawn(
async move {
match this
.tenant_shard_split(
split_candidate.id.tenant_id,
TenantShardSplitRequest {
// Always split to the max number of shards: this avoids stepping through
// intervening shard counts and encountering the overhead of a split+cleanup
// each time as a tenant grows, and is not too expensive because our max shard
// count is relatively low anyway.
// This policy will be adjusted in future once we support higher shard count.
new_shard_count: SPLIT_TO_MAX.literal(),
new_stripe_size: Some(ShardParameters::DEFAULT_STRIPE_SIZE),
},
)
.await
{
Ok(_) => {
tracing::info!("Successful auto-split");
}
Err(e) => {
tracing::error!("Auto-split failed: {e}");
}
}
}
.instrument(tracing::info_span!("auto_split", tenant_id=%split_candidate.id.tenant_id)),
);
}
/// Useful for tests: run whatever work a background [`Self::reconcile_all`] would have done, but
/// also wait for any generated Reconcilers to complete. Calling this until it returns zero should
/// put the system into a quiescent state where future background reconciliations won't do anything.

---
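
Since each node returns at most `limit` items, the merge in `autosplit_tenants` above stays small, and selecting the candidate is equivalent to one max-by-key over the concatenated per-node lists. A sketch of that selection (assuming `TopTenantShardItem` from the models diff earlier):

```rust
use pageserver_api::models::TopTenantShardItem;

// Equivalent to sorting descending and taking the head, as the controller does.
fn pick_split_candidate(merged: Vec<TopTenantShardItem>) -> Option<TopTenantShardItem> {
    merged.into_iter().max_by_key(|i| i.resident_size)
}
```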

@@ -890,3 +890,18 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
assert current_logical_size == non_incremental
assert isinstance(current_logical_size, int)
return current_logical_size
def top_tenants(
self, order_by: str, limit: int, where_shards_lt: int, where_gt: int
) -> dict[Any, Any]:
res = self.post(
f"http://localhost:{self.port}/v1/top_tenants",
json={
"order_by": order_by,
"limit": limit,
"where_shards_lt": where_shards_lt,
"where_gt": where_gt,
},
)
self.verbose_error(res)
return res.json() # type: ignore

---

@@ -0,0 +1,280 @@
import concurrent.futures
import re
from pathlib import Path
import pytest
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
tenant_get_shards,
)
@pytest.mark.timeout(600)
def test_sharding_autosplit(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
"""
Check that sharding, including auto-splitting, "just works" under pgbench workloads.
This is not a benchmark, but it lives in the same place as benchmarks in order to be run
on a dedicated node that can sustain some significant throughput.
Other tests validate the details of shard splitting, error cases etc. This test is
the sanity check that it all really works as expected with realistic amounts of data
and under load.
Success conditions:
- Tenants auto-split when their size grows past the split threshold
- Client workloads are not interrupted while that happens
"""
neon_env_builder.num_pageservers = 8
neon_env_builder.storage_controller_config = {
# Split tenants at 500MB: it's up to the storage controller how it interprets this (logical
# sizes, physical sizes, etc). We will write this much data logically, therefore other sizes
# will reliably be greater.
"split_threshold": 1024 * 1024 * 500
}
tenant_conf = {
# We want layer rewrites to happen as soon as possible (this is the most stressful
# case for the system), so set PITR interval to something tiny.
"pitr_interval": "5s",
# Scaled down thresholds. We will run at ~1GB scale but would like to emulate
# the behavior of a system running at ~100GB scale.
"checkpoint_distance": f"{1024 * 1024}",
"compaction_threshold": "1",
"compaction_target_size": f"{1024 * 1024}",
"image_creation_threshold": "2",
"image_layer_creation_check_threshold": "0",
}
env = neon_env_builder.init_start()
for ps in env.pageservers:
ps.allowed_errors.extend(
[
# We shut down pageservers while they might have some compaction work going on
".*Compaction failed.*shutting down.*"
]
)
env.storage_controller.allowed_errors.extend(
[
# The neon_local functionality for updating computes is flaky for unknown reasons
".*Local notification hook failed.*",
".*Marking shard.*for notification retry.*",
".*Failed to notify compute.*",
]
)
# Total tenants
tenant_count = 4
# Transaction rate: we set this rather than running at full-speed because we
# might run on a slow node that doesn't cope well with many full-speed pgbenches running concurrently.
transaction_rate = 100
class TenantState:
def __init__(self, timeline_id, endpoint):
self.timeline_id = timeline_id
self.endpoint = endpoint
# Create tenants
tenants = {}
for tenant_id in set(TenantId.generate() for _i in range(0, tenant_count)):
timeline_id = TimelineId.generate()
env.neon_cli.create_tenant(tenant_id, timeline_id, conf=tenant_conf)
endpoint = env.endpoints.create("main", tenant_id=tenant_id)
tenants[tenant_id] = TenantState(timeline_id, endpoint)
endpoint.start()
def run_pgbench_init(endpoint):
pg_bin.run_capture(
[
"pgbench",
"-s50",
"-i",
f"postgres://cloud_admin@localhost:{endpoint.pg_port}/postgres",
]
)
def check_pgbench_output(out_path: str):
"""
When we run pgbench, we want not just an absence of errors, but also continuous evidence
of I/O progressing: our shard splitting and migration should not interrupt the benchmark.
"""
matched_lines = 0
stderr = Path(f"{out_path}.stderr").read_text()
low_watermark = None
# Apply this as a threshold for what we consider an unacceptable interruption to I/O
min_tps = transaction_rate // 10
for line in stderr.split("\n"):
match = re.match(r"progress: ([0-9\.]+) s, ([0-9\.]+) tps, .* ([0-9]+) failed", line)
if match is None:
# Fall back to older-version pgbench output (omits failure count)
match = re.match(r"progress: ([0-9\.]+) s, ([0-9\.]+) tps, .*", line)
if match is None:
continue
else:
(_time, tps) = match.groups()
tps = float(tps)
failed = 0
else:
(_time, tps, failed) = match.groups() # type: ignore
tps = float(tps)
failed = int(failed)
matched_lines += 1
if failed > 0:
raise RuntimeError(
f"pgbench on tenant {endpoint.tenant_id} run at {out_path} has failed > 0"
)
if low_watermark is None or low_watermark > tps:
low_watermark = tps
# Temporarily disabled: have seen some 0 tps regions on Hetzner runners, but not
# at the same time as a shard split.
# if tps < min_tps:
# raise RuntimeError(
# f"pgbench on tenant {endpoint.tenant_id} run at {out_path} has tps < {min_tps}"
# )
log.info(f"Checked {matched_lines} progress lines, lowest TPS was {min_tps}")
if matched_lines == 0:
raise RuntimeError(f"pgbench output at {out_path} contained no progress lines")
def run_pgbench_main(endpoint):
out_path = pg_bin.run_capture(
[
"pgbench",
"-s50",
"-T",
"180",
"-R",
f"{transaction_rate}",
"-P",
"1",
f"postgres://cloud_admin@localhost:{endpoint.pg_port}/postgres",
]
)
check_pgbench_output(out_path)
def run_pgbench_read(endpoint):
out_path = pg_bin.run_capture(
[
"pgbench",
"-s50",
"-T",
"30",
"-R",
f"{transaction_rate}",
"-S",
"-P",
"1",
f"postgres://cloud_admin@localhost:{endpoint.pg_port}/postgres",
]
)
check_pgbench_output(out_path)
with concurrent.futures.ThreadPoolExecutor(max_workers=tenant_count) as pgbench_threads:
pgbench_futs = []
for tenant_state in tenants.values():
fut = pgbench_threads.submit(run_pgbench_init, tenant_state.endpoint)
pgbench_futs.append(fut)
log.info("Waiting for pgbench inits")
for fut in pgbench_futs:
fut.result()
pgbench_futs = []
for tenant_state in tenants.values():
fut = pgbench_threads.submit(run_pgbench_main, tenant_state.endpoint)
pgbench_futs.append(fut)
log.info("Waiting for pgbench read/write pass")
for fut in pgbench_futs:
fut.result()
def assert_all_split():
for tenant_id in tenants.keys():
shards = tenant_get_shards(env, tenant_id)
assert len(shards) == 8
# This is not a wait_until, because we wanted the splits to happen _while_ pgbench is running: otherwise
# this test is not properly doing its job of validating that splits work nicely under load.
assert_all_split()
env.storage_controller.assert_log_contains(".*Successful auto-split.*")
# Log timeline sizes, useful for debug, and implicitly validates that the shards
# are available in the places the controller thinks they should be.
for tenant_id, tenant_state in tenants.items():
(shard_zero_id, shard_zero_ps) = tenant_get_shards(env, tenant_id)[0]
timeline_info = shard_zero_ps.http_client().timeline_detail(
shard_zero_id, tenant_state.timeline_id
)
log.info(f"{shard_zero_id} timeline: {timeline_info}")
# Run compaction for all tenants, restart endpoint so that on subsequent reads we will
# definitely hit pageserver for reads. This compaction pass is expected to drop unwanted
# layers but not do any rewrites (we're still in the same generation)
for tenant_id, tenant_state in tenants.items():
tenant_state.endpoint.stop()
for shard_id, shard_ps in tenant_get_shards(env, tenant_id):
shard_ps.http_client().timeline_gc(shard_id, tenant_state.timeline_id, gc_horizon=None)
shard_ps.http_client().timeline_compact(shard_id, tenant_state.timeline_id)
tenant_state.endpoint.start()
with concurrent.futures.ThreadPoolExecutor(max_workers=tenant_count) as pgbench_threads:
pgbench_futs = []
for tenant_state in tenants.values():
fut = pgbench_threads.submit(run_pgbench_read, tenant_state.endpoint)
pgbench_futs.append(fut)
log.info("Waiting for pgbench read pass")
for fut in pgbench_futs:
fut.result()
env.storage_controller.consistency_check()
# Restart the storage controller
env.storage_controller.stop()
env.storage_controller.start()
env.storage_controller.consistency_check()
# Restart all pageservers
for ps in env.pageservers:
ps.stop()
ps.start()
# Freshen gc_info in Timeline, so that when compaction runs in the background during the
# subsequent pgbench period, the last_gc_cutoff is updated, enabling the conditions for a rewrite to pass.
for tenant_id, tenant_state in tenants.items():
for shard_id, shard_ps in tenant_get_shards(env, tenant_id):
shard_ps.http_client().timeline_gc(shard_id, tenant_state.timeline_id, gc_horizon=None)
# One last check that data remains readable after everything has restarted
with concurrent.futures.ThreadPoolExecutor(max_workers=tenant_count) as pgbench_threads:
pgbench_futs = []
for tenant_state in tenants.values():
fut = pgbench_threads.submit(run_pgbench_read, tenant_state.endpoint)
pgbench_futs.append(fut)
log.info("Waiting for pgbench read pass")
for fut in pgbench_futs:
fut.result()
# Assert that some rewrites happened
# TODO: uncomment this after https://github.com/neondatabase/neon/pull/7531 is merged
# assert any(ps.log_contains(".*Rewriting layer after shard split.*") for ps in env.pageservers)

---

@@ -1326,3 +1326,45 @@ def test_sharding_unlogged_relation(neon_env_builder: NeonEnvBuilder):
# Ensure that post-endpoint-restart modifications are ingested happily by pageserver
wait_for_last_flush_lsn(env, ep, tenant_id, timeline_id)
def test_top_tenants(neon_env_builder: NeonEnvBuilder):
"""
The top_tenants API is used in shard auto-splitting to find candidates.
"""
env = neon_env_builder.init_configs()
env.start()
tenants = []
n_tenants = 8
for i in range(0, n_tenants):
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
env.neon_cli.create_tenant(tenant_id, timeline_id)
# Write a different amount of data to each tenant
w = Workload(env, tenant_id, timeline_id)
w.init()
w.write_rows(i * 1000)
w.stop()
logical_size = env.pageserver.http_client().timeline_detail(tenant_id, timeline_id)[
"current_logical_size"
]
tenants.append((tenant_id, timeline_id, logical_size))
log.info(f"Created {tenant_id}/{timeline_id} with size {logical_size}")
# Ask for the single largest tenant
top_1 = env.pageserver.http_client().top_tenants("max_logical_size", 1, 8, 0)
assert len(top_1["shards"]) == 1
assert top_1["shards"][0]["id"] == str(tenants[-1][0])
assert top_1["shards"][0]["max_logical_size"] == tenants[-1][2]
# Apply a lower bound limit
top = env.pageserver.http_client().top_tenants(
"max_logical_size", 100, 8, where_gt=tenants[3][2]
)
assert len(top["shards"]) == n_tenants - 4
assert set(i["id"] for i in top["shards"]) == set(str(i[0]) for i in tenants[4:])